king/bravo

Support Flink 1.7.0

Closed this issue · 6 comments

Now that Flink 1.7.0 has been released [0], can we upgrade bravo to use Flink 1.7.0? I'm updating my Flink cluster to 1.7 and would love to continue to be able to use bravo

[0] https://flink.apache.org/news/2018/11/30/release-1.7.0.html

Does it not work with 1.7.0? I would assume the savepoint/checkpoint format hasn't changed much

It might be that I'm doing something wrong, but I now get this error:

Caused by: java.lang.IllegalStateException: Cannot read state of a stateless operator.
	at com.king.bravo.reader.OperatorStateReader.lambda$readKeyedStates$0(OperatorStateReader.java:101)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at com.king.bravo.reader.OperatorStateReader.readKeyedStates(OperatorStateReader.java:101)

And when I try doing something super simple like

val savepoint = StateMetadataUtils.loadSavepoint(savepointPath)
val env = ExecutionEnvironment.getExecutionEnvironment
val reader = new OperatorStateReader(env, savepoint, stateOperatorUid)
reader.getAllKeyedStateRows.first(3).print()

I get

Caused by: java.lang.NoSuchMethodError: org.apache.flink.runtime.state.KeyedBackendSerializationProxy.<init>(Ljava/lang/ClassLoader;Z)V
	at com.king.bravo.utils.StateMetadataUtils.getKeyedBackendSerializationProxy(StateMetadataUtils.java:177)
	at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.openIfNeeded(RocksDBSavepointIterator.java:136)
	at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.nextRecord(RocksDBSavepointIterator.java:102)
	at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.next(RocksDBSavepointIterator.java:87)
	at com.king.bravo.reader.inputformat.RocksDBSavepointIterator.next(RocksDBSavepointIterator.java:54)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
	at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1812)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
	at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
	at com.king.bravo.reader.inputformat.RocksDBKeyedStateInputFormat.reachedEnd(RocksDBKeyedStateInputFormat.java:104)

To be clear, I'm compiling with flink 1.7.0 and bravo 0.3 in the same code base. It's possible if I used Flink 1.6.2 and bravo 0.3 together to read flink 1.7 savepoints, it would work, but that's much more challenging to operate in this code base.

Alright, thanks for checking. I will try to fix this and release 0.4 tomorrow

I have released the 0.4 to maven central that should work for Flink 1.7.0.
Please try it when you have some time to make sure everything still works as expected.

Works now, thanks!