Orchestrator infinite crash loop when a Kafka message is not deserializable
The error is triggered by a custom job parameter whose value is an array instead of a string. Deserialization fails, but the message is not dropped. The orchestrator therefore crashes again as soon as it retries reading the same message, which produces an infinite crash loop.
Here is the error message:
Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> System.Text.Json.JsonException: The JSON value could not be converted to System.String. Path: $.customJobParameters[1].value | LineNumber: 0 | BytePositionInLine: 1675.
---> System.InvalidOperationException: Cannot get the value of a token type 'StartArray' as a string.
at System.Text.Json.ThrowHelper.ThrowInvalidOperationException_ExpectedString(JsonTokenType tokenType)
at System.Text.Json.Utf8JsonReader.GetString()
at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonCollectionConverter`2.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, TCollection& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
--- End of inner exception stack trace ---
at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, Utf8JsonReader& reader, Exception ex)
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo jsonTypeInfo, Nullable`1 actualByteCount)
at System.Text.Json.JsonSerializer.Deserialize[TValue](ReadOnlySpan`1 utf8Json, JsonSerializerOptions options)
at Orchestrator.Queue.JobsConsumer.JobSerializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context) in /app/Orchestrator/Queue/JobsConsumer/JobSerializer.cs:line 29
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
--- End of inner exception stack trace ---
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
at Orchestrator.Queue.KafkaConsumer`1.<>c__DisplayClass1_1.<<-ctor>b__0>d.MoveNext() in /app/Orchestrator/Queue/KafkaConsumer.cs:line 34
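The failure in the trace can probably be reproduced outside Kafka. The sketch below is only an illustration: `Job` and `CustomJobParameter` are hypothetical stand-ins for the orchestrator's real types, and the use of `JsonSerializerDefaults.Web` (camelCase names) is an assumption based on the property names in the error path.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-ins for the orchestrator's job model (assumed names).
public class CustomJobParameter
{
    public string Name { get; set; } = "";
    public string Value { get; set; } = "";
}

public class Job
{
    public List<CustomJobParameter> CustomJobParameters { get; set; } = new();
}

public static class Repro
{
    // Returns true when the payload deserializes, false when System.Text.Json
    // rejects it with a JsonException.
    public static bool Deserializes(string json)
    {
        var options = new JsonSerializerOptions(JsonSerializerDefaults.Web);
        try
        {
            JsonSerializer.Deserialize<Job>(json, options);
            return true;
        }
        catch (JsonException)
        {
            return false;
        }
    }

    public static void Main()
    {
        // Second parameter carries an array where a string is expected.
        var bad = "{\"customJobParameters\":[{\"name\":\"a\",\"value\":\"x\"},{\"name\":\"b\",\"value\":[\"y\"]}]}";
        var good = "{\"customJobParameters\":[{\"name\":\"a\",\"value\":\"x\"}]}";
        Console.WriteLine(Deserializes(bad));
        Console.WriteLine(Deserializes(good));
    }
}
```

The point of the sketch is that the exception raised by the serializer itself is a `JsonException`. The `ConsumeException` at the top of the trace is the wrapper that Confluent.Kafka adds around it.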
A potential solution could be to catch the exception and skip the message. I don't know whether it works.
https://jonboulineau.me/blog/kafka/dealing-with-bad-records-in-kafka
Here is a draft of a potential implementation in JsonSerializer.cs:
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return default;
        try
        {
            return JsonSerializer.Deserialize<T>(data.ToArray()) ?? default;
        }
        catch (ConsumeException e)
        {
            Console.WriteLine("Error message here");
            return default;
        }
    }
Again, I don't know that it would work.
The modification to JsonSerializer.cs does not work.
The issue is in JobSerializer.cs. We now catch the exception, a JsonException, and soft fail. Better logging, retries, and similar handling would be an improvement, but it is no longer a bug in my branch.
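For readers who land here with the same problem, a soft-failing deserializer along those lines might look roughly like the sketch below. This is not the code from the branch: `SoftFailJson`, `DeserializeOrDefault`, and `JobParameter` are hypothetical names, and the Kafka `SerializationContext` argument is left out so the example depends only on System.Text.Json.

```csharp
#nullable enable
using System;
using System.Text;
using System.Text.Json;

// Hypothetical payload type, used only to exercise the helper.
public class JobParameter
{
    public string Value { get; set; } = "";
}

public static class SoftFailJson
{
    // Returns null instead of throwing when the payload is null or cannot be
    // converted to T, so a single bad message no longer takes the process down.
    public static T? DeserializeOrDefault<T>(ReadOnlySpan<byte> data, bool isNull)
        where T : class
    {
        if (isNull) return null;
        try
        {
            return JsonSerializer.Deserialize<T>(data);
        }
        catch (JsonException e)
        {
            // Log and skip; the caller must treat null as "message dropped".
            Console.Error.WriteLine($"Skipping undeserializable message: {e.Message}");
            return null;
        }
    }

    public static void Main()
    {
        byte[] bad = Encoding.UTF8.GetBytes("{\"Value\":[1,2]}");
        byte[] good = Encoding.UTF8.GetBytes("{\"Value\":\"x\"}");
        Console.WriteLine(DeserializeOrDefault<JobParameter>(bad, false) is null);
        Console.WriteLine(DeserializeOrDefault<JobParameter>(good, false)?.Value);
    }
}
```

One consequence of this design is that the consuming code receives a null value for a bad message and has to skip it explicitly. Routing such messages to a dead-letter topic would be an alternative, but that is beyond what this issue describes.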