Keep schema registry up to date
thekingofcity opened this issue
Problem description
Schema Registry will upgrade its internal Avro dependency to 1.11, which breaks compatibility with the current implementation.
Problem location
Avro deserialization schema
Suggestions for an improvement
To add more detail: the Schema Registry update breaks the tests that work with Avro's GenericRecord in both the reader and the writer. More specifically, this test case, for example:
Error stack trace:
2022-04-27 17:27:43,740 ERROR io.pravega.connectors.flink.FlinkPravegaReader [] - Exception occurred while creating a Pravega EventStreamReader to read events
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at io.pravega.connectors.flink.FlinkPravegaReader.emitEvent(FlinkPravegaReader.java:357) ~[main/:?]
at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:321) [main/:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[kryo-2.24.0.jar:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-core-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
... 12 more
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1060) ~[?:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[kryo-2.24.0.jar:?]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-core-1.14.0.jar:1.14.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
... 12 more
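The innermost cause is plain JDK behavior: the trace shows Kryo's CollectionSerializer.read calling Collections$UnmodifiableCollection.add while rebuilding the `reserved` collection of Schema$Field, and the JDK's unmodifiable wrappers reject every add(). The stdlib-only sketch below models that failure. naiveCopy is a hypothetical, heavily simplified stand-in for Kryo's copy logic (not its actual code), and the two-element set is made-up data standing in for `reserved`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnmodifiableCopyDemo {

    // Hypothetical, simplified model of a collection copier: take an "empty"
    // target of the same shape and add() every element of the source into it.
    static <T> Collection<T> naiveCopy(Collection<T> source, Collection<T> emptyTarget) {
        for (T element : source) {
            emptyTarget.add(element); // unmodifiable wrappers throw here
        }
        return emptyTarget;
    }

    // Copying an unmodifiable set this way fails with the same exception
    // type as the root cause in the stack trace.
    public static boolean unmodifiableCopyFails() {
        Set<String> reserved =
                Collections.unmodifiableSet(new HashSet<>(Arrays.asList("name", "type")));
        // Model the fresh instance as an empty unmodifiable set: its add()
        // always throws UnsupportedOperationException, whatever it contains.
        Collection<String> freshInstance = Collections.unmodifiableSet(new HashSet<>());
        try {
            naiveCopy(reserved, freshInstance);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    // The same copy succeeds when the collection is an ordinary mutable set.
    public static boolean modifiableCopySucceeds() {
        Set<String> reserved = new HashSet<>(Arrays.asList("name", "type"));
        Collection<String> copy = naiveCopy(reserved, new HashSet<>());
        return copy.equals(reserved);
    }

    public static void main(String[] args) {
        System.out.println("unmodifiable copy fails: " + unmodifiableCopyFails());
        System.out.println("modifiable copy succeeds: " + modifiableCopySucceeds());
    }
}
```

This only illustrates why add() on such a collection throws; how Kryo obtains its fresh instance internally is not modeled.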
We first tried to offer a higher-level withDeserializationSchemaFromRegistry interface in #422 that encapsulates most of the details needed to handle the JSON, Avro and Protobuf formats. After Avro was upgraded transitively (through Schema Registry), we found that the way we handle the produced type for GenericRecord is not correct. It should carry the schema information, as in new GenericRecordAvroTypeInfo(SCHEMA), so that Flink can (de)serialize the records, but we only pass TypeInformation.of(GenericRecord.class). Without the schema, Flink treats GenericRecord as a generic type and falls back to Kryo, which, as the serialization trace above shows, fails while copying the unmodifiable `reserved` collection inside Avro 1.11's Schema.
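A sketch of the difference between the two produced types, assuming flink-avro and Avro versions matching Flink 1.14 are on the classpath. The User schema is a made-up example. TypeInformation.of(GenericRecord.class) yields a generic type whose serializer is Kryo-based under the default ExecutionConfig, whereas GenericRecordAvroTypeInfo carries the schema and is backed by Flink's Avro serializer.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class GenericRecordTypeInfoExample {

    // Hypothetical schema used only for illustration.
    static final Schema SCHEMA = SchemaBuilder.record("User")
            .fields()
            .requiredString("name")
            .requiredInt("age")
            .endRecord();

    // Produced type without a schema: GenericRecord is an interface, so Flink
    // treats it as a generic type and serializes/copies it with Kryo.
    public static TypeInformation<GenericRecord> withoutSchema() {
        return TypeInformation.of(GenericRecord.class);
    }

    // Produced type that carries the Avro schema, so Flink can use its Avro
    // serializer instead of reflecting over Schema internals with Kryo.
    public static TypeInformation<GenericRecord> withSchema() {
        return new GenericRecordAvroTypeInfo(SCHEMA);
    }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        TypeSerializer<GenericRecord> before = withoutSchema().createSerializer(config);
        TypeSerializer<GenericRecord> after = withSchema().createSerializer(config);
        System.out.println("without schema: " + before.getClass().getSimpleName());
        System.out.println("with schema:    " + after.getClass().getSimpleName());
    }
}
```

Running main prints the simple class names of the two serializers, which makes the Kryo fallback visible without starting a job.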
We now plan to remove the withDeserializationSchemaFromRegistry API and keep only DeserializerFromSchemaRegistry, letting users wrap it themselves with the correct type information. Similarly, in the writer path, we will remove the withSerializationSchemaFromRegistry API; users can call addSource(source, typeInfo) to specify the type information where needed.
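On the writer side, the upstream source can declare its output type explicitly. A minimal sketch, assuming Flink 1.14's legacy SourceFunction API and flink-avro; UserSource and the User schema are hypothetical, and the Pravega sink that would consume the stream is left out because its builder is not shown in this issue.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class AddSourceWithTypeInfoExample {

    // Hypothetical schema used only for illustration.
    static final Schema SCHEMA =
            SchemaBuilder.record("User").fields().requiredString("name").endRecord();

    // Hypothetical source emitting one GenericRecord. The schema is held as a
    // JSON string because the function itself must be serializable.
    static class UserSource implements SourceFunction<GenericRecord> {
        private final String schemaJson;
        private volatile boolean running = true;

        UserSource(String schemaJson) {
            this.schemaJson = schemaJson;
        }

        @Override
        public void run(SourceContext<GenericRecord> ctx) {
            Schema schema = new Schema.Parser().parse(schemaJson);
            GenericRecord record = new GenericData.Record(schema);
            record.put("name", "alice");
            if (running) {
                ctx.collect(record);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // Passes the schema-aware type information explicitly instead of letting
    // Flink infer a generic (Kryo-backed) type from the SourceFunction.
    public static DataStream<GenericRecord> buildStream(StreamExecutionEnvironment env) {
        TypeInformation<GenericRecord> typeInfo = new GenericRecordAvroTypeInfo(SCHEMA);
        return env.addSource(new UserSource(SCHEMA.toString()), typeInfo);
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<GenericRecord> stream = buildStream(env);
        System.out.println("stream type: " + stream.getType());
    }
}
```

Without the second argument, addSource would extract GenericRecord from the SourceFunction's type parameter and end up with the same generic type that triggers the Kryo failure above.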