Pass message to RecordReceivedMalformedEvent when validation fails
raza-matillion opened this issue · 12 comments
We have encountered a scenario where we want to send all messages that fail validation to a dead letter topic to be reviewed later. Currently I can't find a way to get the message if it fails validation. I think it would be useful to pass binding.Message to the ObservabilityService method RecordReceivedMalformedEvent, which would enable us to log the message or send it to a dead letter topic.
Line 68 in 70abff6
and
Line 74 in 70abff6
Quick question: since invoker.go returns an error in such cases, why not send that event to a configured DLQ (this SDK is not opinionated on DLQs, so you can use whatever SDK/logic you want in your app)? I have not seen many cases where the observability service is responsible for DLQ-ing, but I could be wrong here.
cc/ @duglin for thoughts
I like the idea of a DLQ that isn't tied to the ObservabilityService.
I agree with the idea. My plan is to use a custom error handler in ObservabilityService that will handle the error as we intend, so the DLQ logic will not live in ObservabilityService itself.
func (o Observability) RecordReceivedMalformedEvent(ctx context.Context, m binding.Message, err error) {
	o.ErrorHandler.Handle(ctx, m, err)
}
What I understand is that invoker.go does not return an error when we use client.StartReceive. Correct me if I am wrong.
The message is also passed as nil to the respFn; otherwise I could use client.Responder instead of client.StartReceive.
Line 75 in 70abff6
So you don't know what the CloudEvent is upfront? I was assuming you know the to-be-sent CloudEvent and, together with the validation error returned, you can construct a DLQ message? Can you describe a common flow of how you/your users are using the SDK?
No, I don't know the cloudevent upfront. I am consuming a cloudevent in a service, and it is produced by another service. I am assuming that producers will send the cloudevent format. If a producer sends a malformed cloudevent, I want to log the error and pass the malformed message to a DLQ. Now, if binding.ToEvent fails in invoker.go, I don't have a way to get hold of the malformed message so that I can pass it to the DLQ for further investigation.
I want to log error and pass the malformed message to DLQ
Shouldn't that be the responsibility of the producer/sender though? If they send a malformed event and you return an error, i.e., no ACK, it should be their (the sender's) responsibility to handle the error, no? I understand your scenario might be a special case then? I'm coming back to my initial feedback that using an observability service for DLQ capabilities could be considered an anti-pattern here. WDYT?
What I understand is that the sender sends and forgets, and the consumer tries to process the message and, upon failure, sends the message to a DLQ.
I totally agree with your point that DLQ in the Observability Service is an anti-pattern. My original request is to pass the binding.Message to RecordReceivedMalformedEvent in the Observability Service. Here is the PR if you want to have a look:
https://github.com/cloudevents/sdk-go/pull/965/files
What I understand is that the sender sends and forgets, and the consumer tries to process the message and, upon failure, sends the message to a DLQ.
With fire-and-forget, the sender is basically in at-most-once semantics land. So implementing DLQ on the consumer might be overkill from that perspective. If the sender uses at-least-once semantics, it needs to wait for a (N)ACK from the receiver.
Question: while not the full event/payload, do we log some metadata in the observability service for correlation purposes at all?
Just logging error message at the moment in observability service. I don't think I have access to any other info from the actual payload to log? Would be good to have some more info about the payload, as the consumer is unaware of the event/payload. Looking at another open issue, someone has encountered a similar situation: #757
Just logging error message at the moment in observability service. I don't think I have access to any other info from the actual payload to log?
Correct, we don't log any message detail.
Would be good to have some more info about the payload, as the consumer is unaware of the event/payload.
Agree, IMHO this is something we should improve. From that point, your PR sounds good to me (not the DLQ approach). However, I'm not an expert on the observability part and there could be issues with logging large, or malicious, payloads (which I guess is already true today, since we record a successfully transformed event to the observability service; still, it's a risk).
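One possible way to reduce the risk of logging large or malicious payloads is to log a bounded, escaped summary instead of the raw bytes. The sketch below is an assumption about how an application might do that and is not something the SDK provides; `PayloadSummary` and `summarizePayload` are hypothetical names, and the preview limit is arbitrary.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strconv"
)

// PayloadSummary is a log-safe description of a raw message payload.
type PayloadSummary struct {
	Size      int    // full payload size in bytes
	Digest    string // hex of the first 8 bytes of the SHA-256, for correlation
	Truncated bool   // true if the preview does not cover the whole payload
	Preview   string // quoted, escaped prefix of the payload
}

// summarizePayload caps the logged bytes at maxPreview and escapes control
// and non-printable characters so the payload cannot inject log lines.
func summarizePayload(payload []byte, maxPreview int) PayloadSummary {
	if maxPreview < 0 {
		maxPreview = 0
	}
	sum := sha256.Sum256(payload)
	preview := payload
	truncated := len(payload) > maxPreview
	if truncated {
		preview = payload[:maxPreview]
	}
	return PayloadSummary{
		Size:      len(payload),
		Digest:    hex.EncodeToString(sum[:8]),
		Truncated: truncated,
		Preview:   strconv.Quote(string(preview)),
	}
}

// String renders the summary as a single log line.
func (s PayloadSummary) String() string {
	return fmt.Sprintf("size=%d sha256_prefix=%s truncated=%t preview=%s",
		s.Size, s.Digest, s.Truncated, s.Preview)
}
```

The digest prefix gives a correlation handle for the full message (for example, one stored elsewhere) without putting the whole payload in the logs.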
This is the code in invoker.go which does not provide any information and I'd argue is almost useless since you don't know what really happened/there's no way to correlate.
e, eventErr := binding.ToEvent(ctx, m)
switch {
case eventErr != nil && r.fn.hasEventIn:
	r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
	return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
case r.fn != nil:
	// Check if event is valid before invoking the receiver function
	if e != nil {
		if validationErr := e.Validate(); validationErr != nil {
			r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
			return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
		}
	}
@embano1 You are absolutely right about this block of code. Even if you ignore my DLQ approach, the PR will still make this block of code useful. I would even suggest passing binding.Message to respFn where we are currently passing nil. I can update my PR with this change if you agree.
respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))