Support Flink 1.7.0
Now that Flink 1.7.0 has been released [0], can we upgrade bravo to use it? I'm updating my Flink cluster to 1.7 and would love to keep using bravo.
[0] https://flink.apache.org/news/2018/11/30/release-1.7.0.html
Does it not work with 1.7.0? I would assume the savepoint/checkpoint format hasn't changed much
It might be that I'm doing something wrong, but I now get this error:
Caused by: java.lang.IllegalStateException: Cannot read state of a stateless operator.
at com.king.bravo.reader.OperatorStateReader.lambda$readKeyedStates$0(OperatorStateReader.java:101)
at java.util.Optional.orElseThrow(Optional.java:290)
at com.king.bravo.reader.OperatorStateReader.readKeyedStates(OperatorStateReader.java:101)
And when I try doing something super simple like
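// Imports assumed from the package names in the stack traces; the Java ExecutionEnvironment is an assumption.
import com.king.bravo.reader.OperatorStateReader
import com.king.bravo.utils.StateMetadataUtils
import org.apache.flink.api.java.ExecutionEnvironment

val savepoint = StateMetadataUtils.loadSavepoint(savepointPath)
val env = ExecutionEnvironment.getExecutionEnvironment
val reader = new OperatorStateReader(env, savepoint, stateOperatorUid)
reader.getAllKeyedStateRows.first(3).print()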
I get
Caused by: java.lang.NoSuchMethodError: org.apache.flink.runtime.state.KeyedBackendSerializationProxy.<init>(Ljava/lang/ClassLoader;Z)V
at com.king.bravo.utils.StateMetadataUtils.getKeyedBackendSerializationProxy(StateMetadataUtils.java:177)
at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.openIfNeeded(RocksDBSavepointIterator.java:136)
at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.nextRecord(RocksDBSavepointIterator.java:102)
at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.next(RocksDBSavepointIterator.java:87)
at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.next(RocksDBSavepointIterator.java:54)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812)
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
at com.king.bravo.reader.inputformat.RocksDBKeyedStateInputFormat.reachedEnd(RocksDBKeyedStateInputFormat.java:104)
To be clear, I'm compiling against Flink 1.7.0 and bravo 0.3 in the same code base. Using Flink 1.6.2 with bravo 0.3 to read Flink 1.7 savepoints might work, but that would be much harder to maintain in this code base.
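For anyone debugging a similar NoSuchMethodError: the descriptor `<init>(Ljava/lang/ClassLoader;Z)V` refers to a constructor taking `(ClassLoader, boolean)`, so the error suggests that bravo 0.3 was compiled against a Flink version where `KeyedBackendSerializationProxy` had that constructor and that the class on the 1.7.0 classpath does not. A minimal sketch for checking this by reflection follows; `ConstructorCheck` and `hasConstructor` are hypothetical names made up for illustration, not part of bravo or Flink, and the check only tells you something when the Flink runtime jar is on the classpath.

```java
final class ConstructorCheck {

    // Returns true if the named class can be loaded and declares a
    // constructor with exactly the given parameter types.
    static boolean hasConstructor(String className, Class<?>... paramTypes) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> clazz = Class.forName(
                    className, false, ConstructorCheck.class.getClassLoader());
            clazz.getDeclaredConstructor(paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            // Class missing from the classpath, or no constructor with that signature
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace above
        String proxy = "org.apache.flink.runtime.state.KeyedBackendSerializationProxy";
        // (Ljava/lang/ClassLoader;Z)V corresponds to (ClassLoader, boolean)
        System.out.println(hasConstructor(proxy, ClassLoader.class, boolean.class));
    }
}
```

A `false` result is consistent with a binary incompatibility between the library and the Flink version on the classpath, but it is also what you get when the class is not on the classpath at all, so check that first.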
Alright, thanks for checking. I will try to fix this and release 0.4 tomorrow
I have released 0.4 to Maven Central; it should work with Flink 1.7.0.
Please try it when you have some time to make sure everything still works as expected.
Works now, thanks!