pravega/flink-connectors

Keep schema registry up to date

thekingofcity opened this issue · 1 comment

Problem description
Schema Registry will upgrade its internal Avro dependency to 1.11, which will break compatibility with the current implementation.

Problem location
Avro deserialization schema

Suggestions for an improvement

To add some more detail, the Schema Registry update would break the tests that work with Avro's GenericRecord on both the reader and the writer paths; more specifically, this test case, for example:

public void testReaderWithAvroGenericRecordRegistryDeserializer() throws Exception {

Error stacktrace:

2022-04-27 17:27:43,740 ERROR io.pravega.connectors.flink.FlinkPravegaReader               [] - Exception occurred while creating a Pravega EventStreamReader to read events
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at io.pravega.connectors.flink.FlinkPravegaReader.emitEvent(FlinkPravegaReader.java:357) ~[main/:?]
    at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:321) [main/:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) [flink-streaming-java_2.12-1.14.0.jar:1.14.0]
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[kryo-2.24.0.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-core-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    ... 12 more
Caused by: java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1060) ~[?:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[kryo-2.24.0.jar:?]
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[kryo-2.24.0.jar:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273) ~[flink-core-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    ... 12 more

We first tried to offer a higher-level withDeserializationSchemaFromRegistry interface in #422, with much of the detail encapsulated so that it handles the JSON, Avro, and Protobuf formats. After Avro is passively upgraded through the Schema Registry dependency, we found that the way we handle the produced type for GenericRecord is not correct: it should carry the schema information, as new GenericRecordAvroTypeInfo(SCHEMA), for Flink to deserialize, but we just pass TypeInformation.of(GenericRecord.class), which causes this error under Avro 1.11.
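
To make the difference concrete, here is a minimal sketch (the Event schema and class name below are made up for illustration): TypeInformation.of(GenericRecord.class) yields a schema-less generic type that Flink serializes with Kryo, which is what trips over Avro 1.11's unmodifiable internal collections, while GenericRecordAvroTypeInfo carries the schema so Flink can use its Avro serializer instead.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class GenericRecordTypeInfoSketch {
    // Any Avro record schema works here; this one exists only for the example.
    private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                    + "{\"name\":\"value\",\"type\":\"string\"}]}");

    public static void main(String[] args) {
        // What the connector currently passes: no schema attached, so Flink falls
        // back to its Kryo-based generic serializer for GenericRecord.
        TypeInformation<GenericRecord> withoutSchema = TypeInformation.of(GenericRecord.class);

        // What the produced type should be: schema-aware, so Flink uses its Avro
        // serializer and never hands the record to Kryo.
        TypeInformation<GenericRecord> withSchema = new GenericRecordAvroTypeInfo(SCHEMA);

        System.out.println(withoutSchema); // a generic type backed by Kryo
        System.out.println(withSchema);    // an Avro type info carrying the schema
    }
}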

We have now decided to remove the withDeserializationSchemaFromRegistry API and keep only DeserializerFromSchemaRegistry, letting users wrap it freely with the correct type information. Similarly, on the writer path, we would remove the withSerializationSchemaFromRegistry API; addSource(source, typeInfo) can be used to specify the type information explicitly where needed, as in the sketch below.
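
As a rough sketch of what the user-facing wiring could look like after this change, under the assumption that DeserializerFromSchemaRegistry takes a PravegaConfig, a group name, and a target class (the scope, stream, and group names here are placeholders, and this constructor shape is an assumption, not a confirmed API):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.serialization.DeserializerFromSchemaRegistry;
import io.pravega.connectors.flink.serialization.PravegaDeserializationSchema;

public class RegistryReaderSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        PravegaConfig config = PravegaConfig.fromDefaults().withDefaultScope("my-scope");

        // The writer's Avro schema; in practice this comes from the application
        // or is fetched from the registry ahead of time.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"value\",\"type\":\"string\"}]}");
        GenericRecordAvroTypeInfo typeInfo = new GenericRecordAvroTypeInfo(schema);

        // The user wraps the low-level registry deserializer with the correct,
        // schema-aware type information themselves. The constructor arguments of
        // DeserializerFromSchemaRegistry are an assumption for illustration.
        PravegaDeserializationSchema<GenericRecord> deserializationSchema =
                new PravegaDeserializationSchema<>(
                        typeInfo,
                        new DeserializerFromSchemaRegistry<>(config, "my-group", GenericRecord.class));

        FlinkPravegaReader<GenericRecord> source = FlinkPravegaReader.<GenericRecord>builder()
                .withPravegaConfig(config)
                .forStream("my-stream")
                .withDeserializationSchema(deserializationSchema)
                .build();

        // addSource(source, typeInfo) keeps Flink's produced type aligned with the
        // schema-aware type info instead of falling back to a Kryo generic type.
        DataStream<GenericRecord> stream = env.addSource(source, typeInfo);
        stream.print();
        env.execute("registry-reader-sketch");
    }
}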