BigDecMaper not serializable
Closed this issue ยท 9 comments
Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
case class Item(
id: Long,
price: BigDecimal,
currency: String,
)
object Item {
implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}
This compiles well, but when I execute my test suite, this is being thrown:
org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1997)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1980)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1959)
at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
at com.vinted.search.commons.test.fixtures.FlinkTest$TestSourceExtensions.fromList(FlinkTest.scala:84)
...
Cause: java.io.NotSerializableException: org.apache.flinkx.api.mapper.BigDecMapper
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1369)
I tried marking typeInformation as @transient so that flink would not send it over the wire, but that did not help, how could I solve this?
Thank you ๐
Hi @buinauskas
It seems we have forgotten to make BigDecMapper Serializable.
Can you quickly check in your code base the following code? If it works fine, then I can make the same change in this library:
import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
import java.math.{BigDecimal => JBigDecimal}
// adding Serializable interface
class BigDecMapper2 extends TypeMapper[scala.BigDecimal, java.math.BigDecimal] with Serializable {
override def map(a: BigDecimal): java.math.BigDecimal = a.bigDecimal
override def contramap(b: java.math.BigDecimal): BigDecimal = BigDecimal(b)
}
object Item {
implicit lazy val bigDecMapper2: TypeMapper[scala.BigDecimal, JBigDecimal] = new BigDecMapper2()
implicit lazy val bigDecInfo2: TypeInformation[scala.BigDecimal] = mappedTypeInfo[scala.BigDecimal, JBigDecimal]
implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}
And I think it is missing the java.time.Instant
Serializable as well
Hey @novakov-alexey sorry for the late response, I was away for the weekend. Making type mappers serializable solves the problem ๐
Thanks for confirming. We will do that in the next release as well.
Anything I could help with? I would like this to be released sooner to avoid implementing type information manually for these types.
@buinauskas yes, do you want to make a PR ? It will speed up the things
@novakov-alexey good, I've submitted a fix.
released: https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api_2.12/1.18.1_1.1.3/
@buinauskas thank you for your contribution!
Excellent, thank you!