BigDecMaper not serializable
Closed this issue ยท 9 comments
Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._
case class Item(
id: Long,
price: BigDecimal,
currency: String,
object Item {
implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
This compiles well, but when I execute my test suite, this is being thrown:
org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(
at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
Cause: org.apache.flinkx.api.mapper.BigDecMapper
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
at java.base/
I tried marking typeInformation as @transient so that flink would not send it over the wire, but that did not help, how could I solve this?
Thank you ๐
Hi @buinauskas
It seems we have forgotten to make BigDecMapper Serializable.
Can you quickly check in your code base the following code? If it works fine, then I can make the same change in this library:
import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
import java.math.{BigDecimal => JBigDecimal}
// adding Serializable interface
class BigDecMapper2 extends TypeMapper[scala.BigDecimal, java.math.BigDecimal] with Serializable {
override def map(a: BigDecimal): java.math.BigDecimal = a.bigDecimal
override def contramap(b: java.math.BigDecimal): BigDecimal = BigDecimal(b)
object Item {
implicit lazy val bigDecMapper2: TypeMapper[scala.BigDecimal, JBigDecimal] = new BigDecMapper2()
implicit lazy val bigDecInfo2: TypeInformation[scala.BigDecimal] = mappedTypeInfo[scala.BigDecimal, JBigDecimal]
implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
And I think it is missing the java.time.Instant
Serializable as well
Hey @novakov-alexey sorry for the late response, I was away for the weekend. Making type mappers serializable solves the problem ๐
Thanks for confirming. We will do that in the next release as well.
Anything I could help with? I would like this to be released sooner to avoid implementing type information manually for these types.
@buinauskas yes, do you want to make a PR ? It will speed up the things
@novakov-alexey good, I've submitted a fix.
@buinauskas thank you for your contribution!
Excellent, thank you!