Invalid deserialized value for MapState
Closed · 19 comments
I'm using bravo to debug a MapState, and found that the deserialized value is wrong while the outkey and the mapkey are correct.
The MapState is used to record user login times in a day, so the outvalue should mostly be 1, but what I get is mostly 358.
Could you please help me narrow down the problem? Thanks a lot!
// MapState definition in Flink application
MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>(
"loggedInUsers",
String.class,
Integer.class
);
// relevant code in the bravo application
DataSet<Tuple3<String, String, Integer>> state = reader.readKeyedStates(
new MapStateKKVReader<>(stateId, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO));
Hi,
Your code looks good in general. Could you please try calling withSerializersFromState() on the reader?
DataSet<Tuple3<String, String, Integer>> state = reader.readKeyedStates(
new MapStateKKVReader<>(stateId, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO).withSerializersFromState());
The best thing would be if somehow you could reproduce this in a test case.
There is one that uses map state reading here: https://github.com/king/bravo/blob/master/src/test/java/com/king/bravo/MapStateReadingTest.java
Should be easy enough to modify the pipeline to match the types you were trying to use. That way we have a clear way to look at it and fix it :)
@gyfora Good to hear that! Please let me know when it's fixed. And also thanks for your help!
I pushed a commit, can you please try now?
Sure. I will report the result in a while.
Now I get the following exception with the same code.
Caused by: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple1 cannot be cast to java.lang.String
at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at com.king.bravo.reader.MapStateKKVReader.flatMap(MapStateKKVReader.java:77)
at com.king.bravo.reader.MapStateKKVReader.flatMap(MapStateKKVReader.java:33)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Technically, the problem is that when you specify keys by tuple field, the key gets wrapped in a Tuple1.
I changed the behaviour so these Tuple1s are unwrapped, so if you pull the latest commit it should work. Please try :)
And sorry for the rough edges...
No worries. I'm happy to make contributions to the project, and I'll give it a try later. Thank you.
@gyfora Hi, I still have the invalid deserialized value issue. It seems that I get 358 as the deserialized value no matter what the actual int value is. I also tried using String values in a new job, but got an error indicating the value is null, while I'm pretty sure there is no null value in the map.
It's very weird that the problem can't be reproduced in MapStateReadingTest, and everything works as expected in the tests. In addition, I'm using savepoints so there shouldn't be any Flink version compatibility issues (btw I'm using Flink 1.6.1).
Do you have any further ideas? It's OK if you don't have much time for this issue, since it looks like a corner case. Thanks a lot!
Very strange. Are you using the RocksDB state backend? Do you have TTL or anything special set on the state descriptor? Is this a window operator?
Given that you say the key and the mapkey are correct, I can only assume the bug is somehow in the value byte-array deserialization logic.
For a MapState it should work so that a boolean flag is written first, indicating a potentially null value, and then the value itself is serialized.
I am not sure how the logic in the MapStateKKVReader is broken, but you could put a breakpoint at the line I indicated, inspect what's in the valueBytes array, and maybe copy the contents into a test case where you could try applying the deserializers manually.
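To make that layout concrete, here is a small standalone sketch of decoding such a value byte array with plain `java.io` streams. This is an illustration of the "null flag, then value" format described above, not bravo's actual code, and the class and method names are made up for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MapValueDecodeSketch {

    // Decode a MapState value byte array: a null flag first, then the value.
    static Integer decodeIntValue(byte[] valueBytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(valueBytes));
        boolean isNull = in.readBoolean();
        return isNull ? null : in.readInt();
    }

    public static void main(String[] args) throws IOException {
        // Simulate the layout described above: flag = false, value = 1
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeBoolean(false);
        out.writeInt(1);

        System.out.println(decodeIntValue(bytes.toByteArray())); // prints 1
    }
}
```

If the reader's deserializer starts reading at the wrong offset in valueBytes, you would see exactly this kind of consistently wrong value, which is why inspecting the raw bytes at the breakpoint is useful.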
@gyfora Hi! Sorry for the delay, I've been a bit busy over the last few days.
I've found the problem: the deserialization logic does not work with the state TTL feature introduced in Flink 1.6.0. I can now reproduce the problem in a unit test, and you can see it here:
In addition, I greatly appreciate your help. 👍
Thanks a lot for the test case, let me know if you want to give a try fixing it. Otherwise I can look into it tomorrow.
I would assume there might be a timestamp written before the value in these cases.
I need to look into where Flink stores the info about TTL in the state itself and then we can add a simple flag :)
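If that guess is right, the fix amounts to skipping an 8-byte timestamp before the usual null flag and value. The sketch below extends the plain-`java.io` illustration from earlier in the thread; the exact layout (timestamp first, then flag, then value) is my assumption based on the comments above, not a verified description of Flink's TTL serializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TtlValueDecodeSketch {

    // Assumed layout: 8-byte TTL timestamp, then null flag, then value.
    static Integer decodeTtlIntValue(byte[] valueBytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(valueBytes));
        in.readLong(); // skip the TTL timestamp
        boolean isNull = in.readBoolean();
        return isNull ? null : in.readInt();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(System.currentTimeMillis()); // TTL timestamp
        out.writeBoolean(false);
        out.writeInt(1);

        System.out.println(decodeTtlIntValue(bytes.toByteArray())); // prints 1
    }
}
```

This would also explain the symptom: reading a plain value deserializer against TTL-wrapped bytes would interpret timestamp bytes as the value, producing a consistently wrong number like 358.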
I'd like to give it a try, but it would take me a while to study the serialization format, and for now I have my hands full with other work. I think it would be better to leave this to you experts :)
No worries, should be an easy fix if you know where to look but might take a while to dig through it if you are not very familiar.
I will fix it tomorrow
I pushed a fix to the master, now when reading MapStates you can set a flag to specify if the state is TTL or not:
https://github.com/king/bravo/blob/master/src/test/java/com/king/bravo/TtlStateTest.java#L75
For Value and List states TTL is recognised automatically, unfortunately this is not possible for map states right now :/
Please let me know if this works then we can close the issue
Yes, the problem is fixed and this issue can be closed now. Thank you very much! I'm looking forward to using bravo in more scenarios.
I found a fix so that the TTL can also be determined automatically for map states, so I dropped the now-unnecessary flag. It should be in 0.3.
Good to hear that! I'll switch to the newest version.