line/decaton

Retry tasks are discarded at task deserialization when DefaultTaskExtractor is used


We are consuming a Kafka topic using a Decaton processor with retrying enabled.

However, ProcessorsBuilder.consuming(String topic, TaskExtractor<T> taskExtractor) does not work correctly with DefaultTaskExtractor.

retryTaskExtractor unwraps the DecatonTaskRequest using DefaultTaskExtractor, then calls taskExtractor.extract() here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76
But if taskExtractor is DefaultTaskExtractor, or a TaskExtractor that delegates deserialization to DefaultTaskExtractor, deserialization fails in retryTaskExtractor and the retry task is discarded.
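
To make the failure mode concrete, here is a minimal, self-contained sketch of the double unwrap. It does not use Decaton or protobuf: the one-byte-marker envelope, the class name DoubleUnwrapSketch and all method names are hypothetical stand-ins, with envelopeExtractor playing the role of DefaultTaskExtractor and retryExtract playing the role of retryTaskExtractor as described above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class DoubleUnwrapSketch {
    // Hypothetical one-byte marker for the toy envelope (not part of Decaton).
    static final byte MAGIC = 0x7E;

    // Toy envelope layout: [MAGIC][4-byte payload length][payload].
    static byte[] wrap(byte[] payload) {
        return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
                .put(MAGIC).putInt(payload.length).put(payload).array();
    }

    // Parses the toy envelope and returns the inner payload bytes.
    static byte[] unwrap(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.remaining() < 1 + Integer.BYTES || buf.get() != MAGIC) {
            throw new IllegalArgumentException("not an envelope");
        }
        int length = buf.getInt();
        if (length != buf.remaining()) {
            throw new IllegalArgumentException("corrupt envelope");
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }

    // Extractor that expects the envelope (stand-in for DefaultTaskExtractor).
    static String envelopeExtractor(byte[] bytes) {
        return new String(unwrap(bytes), StandardCharsets.UTF_8);
    }

    // Extractor for an arbitrary raw format (here: plain UTF-8 text).
    static String rawExtractor(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Retry path as described in this issue: unwrap the envelope first,
    // then hand the inner bytes to the user-supplied extractor.
    static String retryExtract(byte[] retryRecord, Function<byte[], String> userExtractor) {
        return userExtractor.apply(unwrap(retryRecord));
    }

    public static void main(String[] args) {
        byte[] retryRecord = wrap("hello".getBytes(StandardCharsets.UTF_8));

        // A raw-format extractor receives the inner bytes and succeeds.
        System.out.println(retryExtract(retryRecord, DoubleUnwrapSketch::rawExtractor));

        // An envelope-expecting extractor tries to unwrap a second time and fails.
        try {
            retryExtract(retryRecord, DoubleUnwrapSketch::envelopeExtractor);
        } catch (IllegalArgumentException e) {
            System.out.println("retry task discarded: " + e.getMessage());
        }
    }
}
```

The second call fails because the inner bytes are no longer an envelope, which mirrors the InvalidProtocolBufferException in the stack trace below.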

Stacktrace:

java.lang.IllegalArgumentException: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:45)
	at com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor.extract(TimedTaskExtractor.kt:31)
	at com.linecorp.decaton.processor.runtime.ProcessorsBuilder.lambda$consuming$1(ProcessorsBuilder.java:83)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.extract(ProcessPipeline.java:96)
	at com.linecorp.decaton.processor.runtime.internal.ProcessPipeline.scheduleThenProcess(ProcessPipeline.java:68)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.processTask(ProcessorUnit.java:73)
	at com.linecorp.decaton.processor.runtime.internal.ProcessorUnit.lambda$putTask$1(ProcessorUnit.java:60)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
	at com.linecorp.decaton.com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:101)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.readTag(CodedInputStream.java:551)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipMessage(CodedInputStream.java:649)
	at com.linecorp.decaton.com.google.protobuf.CodedInputStream$ArrayDecoder.skipField(CodedInputStream.java:581)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1073)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.<init>(Decaton.java:1041)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1638)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest$1.parsePartialFrom(Decaton.java:1633)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214)
	at com.linecorp.decaton.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
	at com.linecorp.decaton.protocol.Decaton$DecatonTaskRequest.parseFrom(Decaton.java:1250)
	at com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor.extract(DefaultTaskExtractor.java:36)
	... 9 common frames omitted

We passed our own com.linecorp.bot.commons.decaton.processor.TimedTaskExtractor as taskExtractor; it looks like the following (written in Kotlin).
TimedTaskExtractor is for observing consumption delay.
The issue occurs when delegate is a DefaultTaskExtractor.

class TimedTaskExtractor<T>(
    private val delegate: TaskExtractor<T>,
    subscription: String,
    topic: String,
    meterRegistry: MeterRegistry
) : TaskExtractor<T> {
    private val timer = meterRegistry.timer(
        "decaton.processor.${TimedTaskExtractor::class.simpleName?.toLowerCase()}.timestamp_delay",
        Tags.of(Tag.of("topic", topic), Tag.of("subscription", subscription))
    )

    override fun extract(bytes: ByteArray): DecatonTask<T> {
        return delegate.extract(bytes).also {
            if (it.metadata().timestampMillis() > 0) {
                timer.record(
                    System.currentTimeMillis() - it.metadata().timestampMillis(),
                    TimeUnit.MILLISECONDS
                )
            }
        }
    }
}

Thanks for pointing this out.

Yeah, that's a legacy from when Decaton didn't support consuming arbitrary task formats. (Initially, Decaton only supported the DecatonTaskRequest protobuf format.)

When we introduced TaskExtractor to support arbitrary message formats, there was a discussion about how to support the retry feature.

  • Decaton's retry feature is implemented as follows:
    • Only the metadata part of DecatonTaskRequest is changed; the raw serializedTask is left as-is, and the result is produced to the retry topic.
  • The problem here is:
    • We have to treat serializedTask differently depending on whether messages are in DecatonTaskRequest format or in an arbitrary format.
    • In DecatonTaskRequest, serializedTask is the serialized_task field of the protobuf message.
    • On the other hand, when we consume an arbitrary format, serializedTask has to be the raw bytes of the entire message.
    • So, if we want to treat DecatonTaskRequest and arbitrary formats in exactly the same way, we have to migrate retried tasks' messages to store the entire message bytes in the serialized_task field.
      • However, that introduces a backward incompatibility, so we couldn't upgrade Decaton safely without stopping all consumers once (i.e. to prevent retry tasks from being produced).
  • That's the reason Decaton has special treatment for DefaultTaskExtractor (i.e. the ProcessorsBuilder#consuming(String, Deserializer) overload).
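
The two meanings of serializedTask described in the list above can be sketched roughly as follows. This is only an illustration under stated assumptions: Envelope is a toy stand-in for DecatonTaskRequest, retryCount is a hypothetical placeholder for "the metadata part", and none of these names come from Decaton's code.

```java
import java.nio.charset.StandardCharsets;

final class RetryEnvelopeSketch {
    // Toy stand-in for DecatonTaskRequest: metadata plus serialized task bytes.
    // retryCount is a hypothetical example of a metadata field.
    static final class Envelope {
        final int retryCount;
        final byte[] serializedTask;

        Envelope(int retryCount, byte[] serializedTask) {
            this.retryCount = retryCount;
            this.serializedTask = serializedTask;
        }
    }

    // Retrying an envelope-format task: only the metadata changes,
    // serializedTask is carried over untouched.
    static Envelope retryOfEnvelope(Envelope current) {
        return new Envelope(current.retryCount + 1, current.serializedTask);
    }

    // Retrying a task consumed in an arbitrary format: the entire raw
    // record has to become serializedTask, since there is no inner field.
    static Envelope retryOfRawRecord(byte[] entireRecord) {
        return new Envelope(1, entireRecord);
    }

    public static void main(String[] args) {
        byte[] task = "task".getBytes(StandardCharsets.UTF_8);
        Envelope retried = retryOfEnvelope(new Envelope(0, task));
        System.out.println(retried.retryCount + " " + (retried.serializedTask == task));

        byte[] raw = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Envelope fromRaw = retryOfRawRecord(raw);
        System.out.println(fromRaw.retryCount + " " + (fromRaw.serializedTask == raw));
    }
}
```

In both cases the retry record is an envelope, but what sits inside serializedTask differs, so the consumer of the retry topic has to know which of the two cases it is reading.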

In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format.

In short, you have to use ProcessorsBuilder#consuming(String, Deserializer) to consume DecatonTaskRequest format.

I guess we can solve this problem fundamentally with this attempt (#80), but unfortunately it's now stuck due to our lack of resources.

In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format.

We are using DecatonTaskRequest-format tasks with DefaultTaskExtractor (wrapped in another task extractor) and retrying.

I think the problem in the code is here:
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76

If we specify a Deserializer for .consuming(), new DefaultTaskExtractor<>(deserializer) is used for both taskExtractor and retryTaskExtractor.
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L63
But if we specify a TaskExtractor for .consuming(), retryTaskExtractor is wrapped with DefaultTaskExtractor, which is meant for consuming non-DecatonTaskRequest-format topics, even if taskExtractor is itself a DefaultTaskExtractor.
https://github.com/line/decaton/blob/v3.0.2/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java#L76

If we call .consuming() with DefaultTaskExtractor, the DecatonTaskRequest is deserialized twice and the second deserialization fails.
I know TaskExtractor is mainly for non-DecatonTaskRequest formats, but this will cause bugs.
If DefaultTaskExtractor isn't supported there, I think .consuming() should throw an error or log a warning.

In short, you have to use DefaultTaskExtractor only when you consume DecatonTaskRequest format

That comment of mine was a bit wrong. Precisely, you have to use ProcessorsBuilder#consuming(String, Deserializer) to consume DecatonTaskRequest format.

If we call .consuming() with DefaultTaskExtractor, deserialize DecatonTaskRequest 2 times and it failed.

Yeah, right.
I understand that this behavior could cause a bug, but since DefaultTaskExtractor is located in an internal package, we don't expect users to use it.

Though throwing an exception or logging is an option, it may not work if users "wrap" DefaultTaskExtractor in an outer extractor (like you did).
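
As a rough illustration of why such a check is easy to bypass, here is a minimal sketch assuming the guard would be a plain instanceof test. Extractor, EnvelopeExtractor (standing in for DefaultTaskExtractor), WrappingExtractor (standing in for something like TimedTaskExtractor) and rejectEnvelopeExtractor are all hypothetical names, not Decaton APIs.

```java
final class ExtractorGuardSketch {
    // Hypothetical, simplified extractor interface.
    interface Extractor {
        String extract(byte[] bytes);
    }

    // Stand-in for DefaultTaskExtractor.
    static final class EnvelopeExtractor implements Extractor {
        @Override
        public String extract(byte[] bytes) {
            return "envelope:" + bytes.length;
        }
    }

    // Stand-in for a user-defined wrapper that delegates to another extractor.
    static final class WrappingExtractor implements Extractor {
        private final Extractor delegate;

        WrappingExtractor(Extractor delegate) {
            this.delegate = delegate;
        }

        @Override
        public String extract(byte[] bytes) {
            return delegate.extract(bytes);
        }
    }

    // Hypothetical guard: it can only inspect the outermost object.
    static void rejectEnvelopeExtractor(Extractor extractor) {
        if (extractor instanceof EnvelopeExtractor) {
            throw new IllegalArgumentException("envelope extractor is not supported here");
        }
    }

    // Helper that reports whether the guard rejected the given extractor.
    static boolean isRejected(Extractor extractor) {
        try {
            rejectEnvelopeExtractor(extractor);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(isRejected(new EnvelopeExtractor()));
        System.out.println(isRejected(new WrappingExtractor(new EnvelopeExtractor())));
    }
}
```

The guard catches the direct case, but the wrapped extractor passes through unnoticed because the delegate is a private detail of the wrapper.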

I think adding a caution about its usage to DefaultTaskExtractor's javadoc is sufficient for now.