Retrying task is discarded during task deserialization when using DefaultTaskExtractor
We are consuming a Kafka topic using a Decaton processor with retrying enabled.
However, `ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor)` does not work correctly with `DefaultTaskExtractor`.
`retryTaskExtractor` unwraps the `DecatonTaskRequest` using `DefaultTaskExtractor` and then calls `taskExtractor.extract()` on the unwrapped bytes, here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
But if `taskExtractor` is `DefaultTaskExtractor`, or a `TaskExtractor` that delegates deserialization to `DefaultTaskExtractor`, deserialization fails in `retryTaskExtractor` and the retrying task is discarded.
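To make the failure mode concrete, here is a minimal, self-contained Kotlin sketch of the retry path described above. It does not use Decaton: `Envelope`, `wrap`, `unwrap`, `Task`, `Extractor`, `envelopeExtractor`, `delegating` and `retryExtractorFor` are hypothetical stand-ins for `DecatonTaskRequest`, `DefaultTaskExtractor`, a wrapper like `TimedTaskExtractor`, and the `retryTaskExtractor` built by `consuming(String, TaskExtractor)`. A text envelope replaces the real protobuf encoding. The only assumption is the one stated above: the retry extractor unwraps the envelope once and then passes `serializedTask` to the user's extractor.

```kotlin
// Hypothetical stand-in for DecatonTaskRequest: "ENV|<retryCount>|<serializedTask>".
// The real format is protobuf; this text form is only for illustration (text payloads only).
class Envelope(val retryCount: Int, val serializedTask: ByteArray)

fun wrap(e: Envelope): ByteArray = "ENV|${e.retryCount}|".toByteArray() + e.serializedTask

fun unwrap(bytes: ByteArray): Envelope {
    val text = String(bytes)
    // Plays the role of the protobuf parse error in the stack trace.
    require(text.startsWith("ENV|")) { "not an envelope: $text" }
    val (_, retryCount, payload) = text.split("|", limit = 3)
    return Envelope(retryCount.toInt(), payload.toByteArray())
}

class Task<T>(val retryCount: Int, val data: T)

fun interface Extractor<T> {
    fun extract(bytes: ByteArray): Task<T>
}

// Hypothetical analogue of DefaultTaskExtractor: unwrap the envelope, then deserialize its payload.
fun <T> envelopeExtractor(deserialize: (ByteArray) -> T): Extractor<T> = Extractor<T> { bytes ->
    val e = unwrap(bytes)
    Task(e.retryCount, deserialize(e.serializedTask))
}

// Hypothetical analogue of a delegating wrapper such as TimedTaskExtractor.
fun <T> delegating(delegate: Extractor<T>): Extractor<T> = Extractor<T> { bytes -> delegate.extract(bytes) }

// Hypothetical analogue of retryTaskExtractor in consuming(String, TaskExtractor):
// unwrap the retry envelope once, then hand serializedTask to the user's extractor.
fun <T> retryExtractorFor(user: Extractor<T>): Extractor<T> = Extractor<T> { bytes ->
    val outer = unwrap(bytes)
    Task(outer.retryCount, user.extract(outer.serializedTask).data)
}
```

Feeding a retry message to `retryExtractorFor(delegating(envelopeExtractor(...)))` throws `IllegalArgumentException` because the inner payload is not an envelope, mirroring the `IllegalArgumentException` in the stack trace that follows; an extractor for a raw (non-envelope) format behind the same retry wrapper works.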
Stack trace:

```
java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
	at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
	at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
	... 9 common frames omitted
```
We passed our own `com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor`, shown below (written in Kotlin), as `taskExtractor`.
`TimedTaskExtractor` is used to observe consumption delay.
The issue occurs when `delegate` is a `DefaultTaskExtractor`.
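```kotlin
import com.linecorp.decaton.processor.runtime.DecatonTask
import com.linecorp.decaton.processor.runtime.TaskExtractor
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import java.util.concurrent.TimeUnit

// Wraps another TaskExtractor and records the delay between the task's timestamp and extraction time.
class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.lowercase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            // Only record when the task carries a producer timestamp.
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}
```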
Thanks for pointing this out.
Yeah, that's a legacy of the time when Decaton didn't support consuming arbitrary task formats (initially, Decaton only supported the `DecatonTaskRequest` protobuf format).
When we introduced `TaskExtractor` to support arbitrary message formats, there was a discussion about how to support the retry feature.
- Decaton's retry feature is implemented as follows:
  - Only the metadata part of `DecatonTaskRequest` is changed; the raw `serializedTask` is left as-is, and the result is produced to the retry topic.
- The problem here is:
  - We have to treat `serializedTask` differently depending on whether messages are in `DecatonTaskRequest` format or in an arbitrary format.
    - In `DecatonTaskRequest`, `serializedTask` is the `serialized_task` field of the protobuf message.
    - On the other hand, when we consume an arbitrary format, `serializedTask` has to be the raw bytes of the entire message.
  - So, if we want to treat `DecatonTaskRequest` and arbitrary formats in exactly the same way, we have to migrate retried tasks' messages to store the entire message bytes in their serialized task field.
  - However, that introduces a backward incompatibility, so we can't upgrade Decaton safely without stopping all consumers once (i.e. to prevent retry tasks from being produced).
- That's why Decaton treats `DefaultTaskExtractor` specially (i.e. the `ProcessorsBuilder#consuming(String, Deserializer)` overload).
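The difference in what `serializedTask` holds can be sketched as below. This is a hypothetical model, not Decaton's retry code. The text envelope `ENV|<retryCount>|<serializedTask>` stands in for the `DecatonTaskRequest` protobuf, and `retryCount` stands in for its metadata part. `Envelope`, `wrap`, `unwrap` and `buildRetryMessage` are made-up names. The sketch encodes only the behavior described in the list above.

```kotlin
// Hypothetical stand-in for DecatonTaskRequest (text payloads only; the real format is protobuf).
class Envelope(val retryCount: Int, val serializedTask: ByteArray)

fun wrap(e: Envelope): ByteArray = "ENV|${e.retryCount}|".toByteArray() + e.serializedTask

fun unwrap(bytes: ByteArray): Envelope {
    val text = String(bytes)
    require(text.startsWith("ENV|")) { "not an envelope: $text" }
    val (_, retryCount, payload) = text.split("|", limit = 3)
    return Envelope(retryCount.toInt(), payload.toByteArray())
}

// Builds the message produced to the retry topic for a message consumed from the original topic
// (hypothetical model of the behavior described above).
fun buildRetryMessage(consumed: ByteArray, envelopeFormat: Boolean): ByteArray =
    if (envelopeFormat) {
        // DecatonTaskRequest format: only the metadata changes; serialized_task is copied as-is.
        val original = unwrap(consumed)
        wrap(Envelope(original.retryCount + 1, original.serializedTask))
    } else {
        // Arbitrary format: there is no envelope yet, so the entire raw message becomes serializedTask.
        wrap(Envelope(1, consumed))
    }
```

In the first case the retry message's `serializedTask` is only the inner payload; in the second it is the whole original message. A single extractor cannot treat both the same way unless retry messages change what they store, which is the backward-incompatible migration mentioned above.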
In short, you have to use `DefaultTaskExtractor` only when you consume the `DecatonTaskRequest` format.
In short, you have to use `ProcessorsBuilder#consuming(String, Deserializer)` to consume the `DecatonTaskRequest` format.
I think this attempt (#80) could solve the problem fundamentally, but unfortunately it's currently stalled due to our lack of resources.
> In short, you have to use `DefaultTaskExtractor` only when you consume `DecatonTaskRequest` format.
We are consuming `DecatonTaskRequest`-format tasks with `DefaultTaskExtractor` (wrapped in our own task extractor), with retrying enabled.
I think the problematic code is here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
If we pass a `Deserializer` to `.consuming()`, `new DefaultTaskExtractor<>(deserializer)` is used for both `taskExtractor` and `retryTaskExtractor`:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L63
But if we pass a `TaskExtractor` to `.consuming()`, `retryTaskExtractor` is built by wrapping it with `DefaultTaskExtractor`, which is meant for consuming non-`DecatonTaskRequest`-format topics, even if `taskExtractor` is itself a `DefaultTaskExtractor`:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
So if we call `.consuming()` with a `DefaultTaskExtractor`, a retried `DecatonTaskRequest` is deserialized twice and the second deserialization fails.
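The two overloads, as described in this comment, can be contrasted in a small hypothetical model. `Extractors`, `defaultExtractor`, `consumingWithDeserializer` and `consumingWithExtractor` are made-up names, not Decaton API, and the text envelope stands in for the `DecatonTaskRequest` protobuf. The sketch assumes only the wiring described here: one shared extractor for the `Deserializer` overload, and an unwrap-then-delegate retry extractor for the `TaskExtractor` overload.

```kotlin
// Hypothetical stand-in for DecatonTaskRequest: "ENV|<retryCount>|<serializedTask>" (text payloads only).
class Envelope(val retryCount: Int, val serializedTask: ByteArray)

fun unwrap(bytes: ByteArray): Envelope {
    val text = String(bytes)
    require(text.startsWith("ENV|")) { "not an envelope: $text" }
    val (_, retryCount, payload) = text.split("|", limit = 3)
    return Envelope(retryCount.toInt(), payload.toByteArray())
}

class Task<T>(val retryCount: Int, val data: T)

fun interface Extractor<T> {
    fun extract(bytes: ByteArray): Task<T>
}

// (taskExtractor, retryTaskExtractor) pair chosen by each overload.
class Extractors<T>(val task: Extractor<T>, val retry: Extractor<T>)

// Hypothetical analogue of DefaultTaskExtractor.
fun <T> defaultExtractor(deserialize: (ByteArray) -> T): Extractor<T> = Extractor<T> { bytes ->
    val e = unwrap(bytes)
    Task(e.retryCount, deserialize(e.serializedTask))
}

// Analogue of consuming(String, Deserializer): the same envelope-aware extractor for both topics.
fun <T> consumingWithDeserializer(deserialize: (ByteArray) -> T): Extractors<T> =
    defaultExtractor(deserialize).let { Extractors(it, it) }

// Analogue of consuming(String, TaskExtractor): retry messages are unwrapped once before the user's extractor runs.
fun <T> consumingWithExtractor(user: Extractor<T>): Extractors<T> =
    Extractors(user, Extractor<T> { bytes ->
        val outer = unwrap(bytes)
        Task(outer.retryCount, user.extract(outer.serializedTask).data)
    })
```

With the `Deserializer` overload, both original and retry messages are unwrapped exactly once. With the `TaskExtractor` overload and a `DefaultTaskExtractor`-like extractor, the retry message's payload is unwrapped a second time and the extraction fails.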
I know `TaskExtractor` is intended for non-`DecatonTaskRequest` formats, but this behavior can easily cause bugs.
If `DefaultTaskExtractor` isn't supported there, I think `.consuming()` should throw an error or at least log a warning.
> In short, you have to use `DefaultTaskExtractor` only when you consume `DecatonTaskRequest` format
That comment of mine was a bit wrong. More precisely, you have to use `ProcessorsBuilder#consuming(String, Deserializer)` to consume the `DecatonTaskRequest` format.
> If we call `.consuming()` with `DefaultTaskExtractor`, deserialize `DecatonTaskRequest` 2 times and it failed.
Yeah, right.
I understand that this behavior could cause bugs, but since `DefaultTaskExtractor` is located in an `internal` package, we don't expect users to use it directly.
Throwing an exception or logging is an option, but it may not work if users "wrap" `DefaultTaskExtractor` in an outer extractor (as you did).
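Why such a check misses wrapped extractors can be sketched in a few lines. `Extractor`, `DefaultExtractor` (standing in for `DefaultTaskExtractor`), `TimedExtractor` (standing in for a wrapper like `TimedTaskExtractor`) and the `checkNotDefault` guard are all hypothetical; none of them is Decaton API. An `is` check only inspects the outermost object, so a wrapper that delegates to `DefaultExtractor` passes.

```kotlin
interface Extractor<T> {
    fun extract(bytes: ByteArray): T
}

// Hypothetical stand-in for DefaultTaskExtractor (its body is irrelevant to the check).
class DefaultExtractor : Extractor<String> {
    override fun extract(bytes: ByteArray): String = String(bytes)
}

// Hypothetical stand-in for a delegating wrapper such as TimedTaskExtractor.
class TimedExtractor<T>(private val delegate: Extractor<T>) : Extractor<T> {
    override fun extract(bytes: ByteArray): T = delegate.extract(bytes)
}

// Hypothetical guard a consuming(String, TaskExtractor) overload might run:
// it rejects a bare DefaultExtractor but cannot see through wrappers.
fun checkNotDefault(extractor: Extractor<*>) {
    require(extractor !is DefaultExtractor) {
        "use consuming(String, Deserializer) for DecatonTaskRequest-format topics"
    }
}
```

`checkNotDefault(DefaultExtractor())` fails, while `checkNotDefault(TimedExtractor(DefaultExtractor()))` passes, even though in the real scenario such a wrapper still delegates to `DefaultTaskExtractor` and would fail on retries.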
I think adding a caution about its usage to `DefaultTaskExtractor`'s javadoc is sufficient for now.