[BUG] Flink pulsar source upgrade from 1.13.1.4 to 1.13.6.2 fails
nikolasten opened this issue · 1 comment
When upgrading a Flink pipeline that used the 1.13.1.4 pulsar-flink connector (specifically org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource) to the 1.13.6.2 connector, the upgrade fails with the following error:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e851a344fc332b3e7b727e57889fc262_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
... 10 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:485)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 12 common frames omitted
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange; local class incompatible: stream classdesc serialVersionUID = -6297347936093846291, local class serialVersionUID = -4628744661831747115
at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.toObject(TopicSubscriptionSerializer.java:103)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:121)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:32)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
... 16 common frames omitted
To Reproduce
Steps to reproduce the behavior:
- Deploy a Flink pipeline with pulsar-flink connector 1.13.1.4, using org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource as the streaming source.
- Enable checkpointing, then stop the Flink job with a savepoint.
- Upgrade the flink-pulsar dependency to 1.13.6.2.
- Deploy the Flink job from the savepoint.
Expected behavior
The upgrade succeeds and the job restores from the savepoint.
Additional context
The same issue seems to be happening in apache/flink-cdc#78.
I have run into this exact issue. Unfortunately, it doesn't seem fixable: the operator state is not compatible between the two versions. The TopicSubscription class in the state did not originally define a serialVersionUID, so the JVM generated a default version number for the serialized class. A later commit added an explicit serialVersionUID to all the serializable classes, which broke state compatibility for every version after 1.13.1.4.
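To illustrate the mechanism, here is a minimal, self-contained sketch in plain Java. It does not use the connector's classes: ImplicitUid and ExplicitUid are hypothetical stand-ins, and the "old state" is simulated by flipping one bit of the serialVersionUID stored in the serialized class descriptor. The byte offsets assume the standard Java serialization stream layout for a single top-level object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    // Hypothetical stand-in for a class that omits serialVersionUID;
    // the JVM derives a default value from the class structure.
    static class ImplicitUid implements Serializable {
        String topic = "t";
    }

    // Same shape, but with an explicitly declared serialVersionUID.
    static class ExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L;
        String topic = "t";
    }

    // Returns the serialVersionUID the JVM will use for this class.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Simulates bytes written by a class version with a different UID.
    // Assumed layout: 4-byte stream header, TC_OBJECT, TC_CLASSDESC,
    // 2-byte class-name length, class name, then the 8-byte UID.
    static byte[] withDifferentUid(byte[] stream) {
        byte[] copy = stream.clone();
        int nameLength = ((copy[6] & 0xFF) << 8) | (copy[7] & 0xFF);
        int uidOffset = 8 + nameLength;
        copy[uidOffset + 7] ^= 0x01; // flip the lowest bit of the stored UID
        return copy;
    }

    // Returns true if deserialization is rejected with InvalidClassException.
    static boolean restoreFails(byte[] stream) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stream))) {
            in.readObject();
            return false;
        } catch (InvalidClassException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("implicit UID: " + uidOf(ImplicitUid.class));
        System.out.println("explicit UID: " + uidOf(ExplicitUid.class));

        byte[] current = serialize(new ExplicitUid());
        System.out.println("same UID fails:      " + restoreFails(current));
        System.out.println("different UID fails: " + restoreFails(withDifferentUid(current)));
    }
}
```

The point of the sketch is that the UID check happens before any field is read, so even a structurally identical class is rejected once the stored and local UIDs differ.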
Not sure how critical it is that you upgrade, but you can always try manipulating the Flink state ... I haven't tried this myself. If you can tolerate restarting your application and ignoring existing state, that's probably the simplest option. Version 1.13.1.4 seems to be a dead end.