ctx.Value() changes initial message
merlor opened this issue · 5 comments
When I work with Avro messages (msg interface{} is a pointer), the function context.Value() overwrites the initial message with the value it returns. Here is a simple example.
I have a processor with a group configured like this:
goka.DefineGroup(
	"my-group",
	goka.Input(aTopic, aCodec, consumeA),
	goka.Persist(ACodec),
)
The Avro message corresponds to this struct:
type A struct {
	text string
}
This is the callback that consumes aTopic:
func consumeA(ctx goka.Context, msg interface{}) {
	newValue := msg.(*A)
	fmt.Println(fmt.Sprintf("%v", msg))
	oldValue := ctx.Value().(*A)
	fmt.Println(fmt.Sprintf("%v", msg))
	if newValue != oldValue {
		ctx.SetValue(newValue)
	}
}
When I send the first message {"text":"hello!"}, the output is:
&{hello!}
&{hello!}
and it correctly persists the value.
But if I send another message {"text":"hello world"}, the output is:
&{hello world}
&{hello!}
The condition newValue != oldValue will never be true, and no new value will be persisted.
Obviously I can assign the message to a variable as a workaround, but I have a couple of doubts about this:
- Is this the correct behavior of the context.Value() function? (The documentation does not mention it.)
- Am I correctly defining the GroupGraph and all the related components?
Hi @merlor,
To be honest, I'm not sure I understand the issue here.
You mentioned context.Value() overwrites the initial value returned by context.Value(). That's the same call; it will not change any value whatsoever.
The pointer comparison if newValue != oldValue { is a bad idea, as you said, because ctx.Value() will always return a new pointer after unmarshalling the bytes from storage. Unless the codec (in the example called ACodec) somehow reuses the structs for unmarshalling; then the decode calls may keep overwriting the same structs and you'll see very odd behavior. So make sure the codec works without side effects. But I doubt that's the real reason for the unexpected prints.
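The difference between comparing pointers and comparing contents can be sketched as follows. This is a minimal, standalone illustration that does not use goka; the exported field Text and the helper changed are assumptions made for the example, not part of the original code.

```go
package main

import "fmt"

// A mirrors the struct from the issue. The exported field name is an
// assumption made so the example is self-contained.
type A struct {
	Text string
}

// changed (hypothetical helper) reports whether the incoming value differs
// from the stored one by content rather than by pointer identity.
func changed(newValue, oldValue *A) bool {
	if newValue == nil || oldValue == nil {
		return newValue != oldValue
	}
	return *newValue != *oldValue
}

func main() {
	stored := &A{Text: "hello!"}
	same := &A{Text: "hello!"}
	other := &A{Text: "hello world"}

	fmt.Println(stored != same)         // true: two distinct pointers
	fmt.Println(changed(same, stored))  // false: same content
	fmt.Println(changed(other, stored)) // true: content differs
}
```

With a codec that allocates a new struct per decode, the pointer comparison is always true, so comparing the dereferenced values is the check that matches the intent of "persist only when the value changed".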
Your example actually indicates that the processor has already run and stored a value "hello!" for the same key. Otherwise the type assertion would panic, since the call to ctx.Value() returns nil if the value was never set before. Maybe you ran the example a bit differently and stored a value with the same key in a previous run?
Remember that ctx.SetValue() persists the value to Kafka, which then gets recovered when the processor restarts.
To start from scratch, use a new group name (here called "my-group") and delete the local storage (default is /tmp/goka).
Starting from scratch will definitely crash the processor on the type assertion (unless the codec returns a valid A for []byte{}).
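One way to avoid the panic on an empty table is the two-value form of the type assertion. The sketch below is standalone and does not call goka; the argument v stands in for whatever ctx.Value() returns, and the helper name storedA and the exported field Text are assumptions for illustration.

```go
package main

import "fmt"

// A mirrors the struct from the issue (exported field is an assumption).
type A struct {
	Text string
}

// storedA (hypothetical helper) converts the result of a table lookup into
// *A. It returns ok == false when nothing was stored, when the value has
// another type, or when the pointer itself is nil.
func storedA(v interface{}) (*A, bool) {
	a, ok := v.(*A) // two-value form: never panics
	return a, ok && a != nil
}

func main() {
	var empty interface{} // what a lookup yields when no value was set

	if _, ok := storedA(empty); !ok {
		fmt.Println("no previous value")
	}
	if a, ok := storedA(&A{Text: "hello!"}); ok {
		fmt.Println("previous value:", a.Text)
	}
}
```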
The second run is actually correct. The new value is hello world, and the previously stored value is hello!, so everything works as expected.
The processor definition looks fine btw.
Let me know if that solved the issue or if you're still having trouble.
Hi @frairon
Sorry, let me try to explain myself better. Here is a simpler version of the consumeA function (what I want to understand is the behavior of the parameter msg):
func consumeA(ctx goka.Context, msg interface{}) {
	fmt.Println(fmt.Sprintf("%v", msg)) // print the msg value
	newValue := msg.(*A)
	ctx.Value()
	fmt.Println(fmt.Sprintf("%v", msg)) // print the msg value again
	ctx.SetValue(newValue)
}
The first thing I want to point out is that (as in the initial example) the function fmt.Println() has the same input (the parameter msg) in both calls.
Now there is no condition, the variable newValue will always be persisted, and the call to ctx.Value() seems useless. But if I send the two messages mentioned before ({"text":"hello!"} and {"text":"hello world"}), I receive these outputs (in order):
first output:
&{hello!}
&{hello!}
second output:
&{hello world}
&{hello!}
N.B. the first message was sent in a "clean state" (the topic and the table created with persist are empty)
The first output seems legit to me, but the second one is my biggest concern.
You mentioned context.Value() overwrites the initial value returned by context.Value()
Not exactly. I tried to explain that the call to ctx.Value() overwrites the parameter msg. Let's see what happens to it:
when the second message arrives, the msg value is, obviously, "hello world", but after I call ctx.Value() its value becomes "hello!" (the value previously persisted). Isn't that a bit weird? Is this the normal behavior?
The msg variable is related to the topic, while the functions ctx.Value() and ctx.SetValue() are related to the persisted table, so I'd say no, but maybe I'm missing something.
Ok that's very odd and not legit and super weird and should not happen at all.
The parameter msg is the incoming kafka message, i.e. the result of the []byte-value from kafka passed to aCodec.Decode(). It shouldn't change from that point on.
Also the call to ctx.Value() has no notion of the incoming message, those two are independent things.
The only thing they might share is the codec. If a call to Decode has side-effects to something outside that call, it might corrupt existing data structures.
Could you maybe give some insight into how the codec is implemented?
Alternatively: can you reproduce the behavior if you use a different codec for struct A, e.g. one that encodes via JSON?
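A JSON-based codec for such a test could look roughly like the sketch below. It only assumes that a goka codec provides Encode(value interface{}) ([]byte, error) and Decode(data []byte) (interface{}, error); the type name jsonACodec, the exported field Text, and its JSON tag are hypothetical choices for the example. The important detail is that Decode allocates a new struct on every call.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// A mirrors the struct from the issue. The field is exported here (an
// assumption) because encoding/json ignores unexported fields.
type A struct {
	Text string `json:"text"`
}

// jsonACodec is a hypothetical codec with the Encode/Decode method pair.
type jsonACodec struct{}

// Encode serializes a *A to JSON and rejects any other type.
func (jsonACodec) Encode(value interface{}) ([]byte, error) {
	a, ok := value.(*A)
	if !ok {
		return nil, fmt.Errorf("jsonACodec: expected *A, got %T", value)
	}
	return json.Marshal(a)
}

// Decode allocates a fresh A on every call, so results never alias.
func (jsonACodec) Decode(data []byte) (interface{}, error) {
	a := new(A)
	if err := json.Unmarshal(data, a); err != nil {
		return nil, err
	}
	return a, nil
}

func main() {
	c := jsonACodec{}
	first, _ := c.Decode([]byte(`{"text":"hello!"}`))
	second, _ := c.Decode([]byte(`{"text":"hello world"}`))
	// Both values keep their own content.
	fmt.Println(first.(*A).Text, "|", second.(*A).Text)
}
```

If the odd prints disappear with a codec of this shape, the original codec is the likely culprit.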
This gave me the right hint:
The parameter msg is the incoming kafka message, i.e. the result of the []byte-value from kafka passed to aCodec.Decode(). It shouldn't change from that point on.
Also the call to ctx.Value() has no notion of the incoming message, those two are independent things.
The only thing they might share is the codec. If a call to Decode has side-effects to something outside that call, it might corrupt existing data structures.
Basically, the problem was how the codec was handled (it was returning the same pointer every time); I fixed that and now it works.
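For readers hitting the same symptom, here is a minimal standalone reproduction of that kind of bug. It is not the actual Avro codec from this issue: the decoders below simply treat the raw bytes as the text, and the names reusingDecoder and freshDecoder are hypothetical.

```go
package main

import "fmt"

// A mirrors the struct from the issue (exported field is an assumption).
type A struct {
	Text string
}

// reusingDecoder reproduces the bug: every Decode call writes into the
// same struct and returns the same pointer.
type reusingDecoder struct {
	shared A
}

func (d *reusingDecoder) Decode(data []byte) (interface{}, error) {
	d.shared.Text = string(data)
	return &d.shared, nil
}

// freshDecoder allocates a new struct per call, so results are independent.
type freshDecoder struct{}

func (freshDecoder) Decode(data []byte) (interface{}, error) {
	return &A{Text: string(data)}, nil
}

func main() {
	buggy := &reusingDecoder{}
	msg, _ := buggy.Decode([]byte("hello world")) // the incoming message
	fmt.Println(msg.(*A).Text)                     // hello world
	buggy.Decode([]byte("hello!"))                 // the table lookup done for ctx.Value()
	fmt.Println(msg.(*A).Text)                     // hello!

	fixed := freshDecoder{}
	msg, _ = fixed.Decode([]byte("hello world"))
	fixed.Decode([]byte("hello!"))
	fmt.Println(msg.(*A).Text) // hello world
}
```

The first pair of prints matches the second output reported above: the message appears to change because the lookup decoded the stored value into the struct that msg still points to.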
You can close, thank you so much for the help @frairon
Great! You're welcome :).