flink-extended/flink-scala-api

BigDecMapper not serializable

Closed this issue · 9 comments

Hey, I've got this (simplified) class and encountered an issue with BigDecMapper not being serializable.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

case class Item(
  id: Long,
  price: BigDecimal,
  currency: String,
)

object Item {
  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}

This compiles fine, but when I run my test suite, the following exception is thrown:

org.apache.flink.api.common.InvalidProgramException: The implementation of the CaseClassSerializer is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2360)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1997)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1980)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1959)
  at org.apache.flinkx.api.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:618)
  at com.vinted.search.commons.test.fixtures.FlinkTest$TestSourceExtensions.fromList(FlinkTest.scala:84)
  ...
  Cause: java.io.NotSerializableException: org.apache.flinkx.api.mapper.BigDecMapper
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1543)
  at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1500)
  at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1423)
  at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1169)
  at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1369)
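The failing object graph can be reproduced without Flink. The sketch below uses hypothetical stand-in classes (`PlainMapper`, `SerializableMapper`, `FakeCaseClassSerializer`; none of them are flink-scala-api types) and plain `java.io.ObjectOutputStream`, which is what `ClosureCleaner` ends up calling in the trace above. The outer serializer is `Serializable`, but one field that points at a non-serializable mapper is enough to make the whole write fail.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in: does NOT extend Serializable (like BigDecMapper before the fix)
class PlainMapper {
  def map(a: BigDecimal): java.math.BigDecimal = a.bigDecimal
}

// Hypothetical stand-in: identical, but marked Serializable
class SerializableMapper extends Serializable {
  def map(a: BigDecimal): java.math.BigDecimal = a.bigDecimal
}

// Stand-in for a case class serializer: Serializable itself, but Java serialization
// still walks every non-transient field, including the mapper it holds.
class FakeCaseClassSerializer(val fieldMapper: AnyRef) extends Serializable

object SerializationCheck {
  /** Returns None if `obj` serializes, otherwise the name of the class that failed. */
  def notSerializableClass(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      // The exception message is the offending class name, as in the "Cause:" line above
      case e: NotSerializableException => Some(e.getMessage)
    } finally out.close()
  }
}
```

With `new FakeCaseClassSerializer(new PlainMapper)` the helper reports the `PlainMapper` class name, mirroring the `Cause:` line in the trace; with `SerializableMapper` it returns `None`.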

I tried marking `typeInformation` as `@transient` so that Flink would not send it over the wire, but that did not help. How can I solve this?
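This is likely why `@transient` did not help: Java serialization only honours `@transient` on fields of the object actually being written. The companion object that holds `typeInformation` is never serialized; Flink serializes the serializer instance obtained from that value, and that instance still keeps the mapper in an ordinary field. A minimal sketch with hypothetical stand-in classes (`NonSerializableMapper`, `WrappingSerializer`, `TransientFieldSerializer`, `Holder`; not the library's types):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a mapper that is not Serializable
class NonSerializableMapper

// Serializer stand-in that keeps the mapper in a regular field
class WrappingSerializer(val mapper: NonSerializableMapper) extends Serializable

// Serializer stand-in whose own field is transient: Java serialization skips it
class TransientFieldSerializer(m: NonSerializableMapper) extends Serializable {
  @transient val mapper: NonSerializableMapper = m
}

object Holder {
  // @transient here does not change what is written: Holder itself is never
  // serialized, only the WrappingSerializer instance this val points to.
  @transient lazy val serializer: WrappingSerializer =
    new WrappingSerializer(new NonSerializableMapper)
}

object TransientDemo {
  /** True if Java serialization of `obj` succeeds. */
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

Marking the field inside the serializer as transient does avoid the exception, but Java serialization restores transient fields as `null`, so the serializer would have to rebuild the mapper after deserialization. Making the mapper itself `Serializable`, as suggested in the reply, avoids that.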

Thank you 🙏

Hi @buinauskas

It seems we forgot to make BigDecMapper Serializable.
Could you quickly try the following code in your code base? If it works, I can make the same change in the library:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
import org.apache.flinkx.api.serializers._
import java.math.{BigDecimal => JBigDecimal}

// Same BigDecimal mapping, with the Serializable interface added
class BigDecMapper2 extends TypeMapper[scala.BigDecimal, JBigDecimal] with Serializable {
  override def map(a: scala.BigDecimal): JBigDecimal = a.bigDecimal
  override def contramap(b: JBigDecimal): scala.BigDecimal = scala.BigDecimal(b)
}

object Item {
  // Serializable mapper and the TypeInformation built from it, for scala.BigDecimal fields
  implicit lazy val bigDecMapper2: TypeMapper[scala.BigDecimal, JBigDecimal] = new BigDecMapper2()
  implicit lazy val bigDecInfo2: TypeInformation[scala.BigDecimal] = mappedTypeInfo[scala.BigDecimal, JBigDecimal]

  implicit lazy val typeInformation: TypeInformation[Item] = deriveTypeInformation
}
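To sanity-check the proposed mapper outside of Flink, the sketch below round-trips `BigDecMapper2` through Java serialization (the mechanism `ClosureCleaner` uses in the stack trace) and then checks that `map` and `contramap` are inverses. The `TypeMapper` trait here is a local stand-in that only models `map` and `contramap`, so the sketch compiles without flink-scala-api on the classpath; in a real project you would import the library's trait instead. `MapperCheck` and `roundTrip` are hypothetical helper names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.math.{BigDecimal => JBigDecimal}

// Local stand-in for org.apache.flinkx.api.serializer.MappedSerializer.TypeMapper
// (assumption: only the map/contramap shape is modelled here)
trait TypeMapper[A, B] {
  def map(a: A): B
  def contramap(b: B): A
}

// Same body as BigDecMapper2 in the reply, including `with Serializable`
class BigDecMapper2 extends TypeMapper[scala.BigDecimal, JBigDecimal] with Serializable {
  override def map(a: scala.BigDecimal): JBigDecimal = a.bigDecimal
  override def contramap(b: JBigDecimal): scala.BigDecimal = scala.BigDecimal(b)
}

object MapperCheck {
  /** Writes `obj` with Java serialization and reads it back; throws if it is not serializable. */
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

For example, after `MapperCheck.roundTrip(new BigDecMapper2)`, mapping `scala.BigDecimal("19.99")` to `java.math.BigDecimal` and back yields an equal value.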

I think the same Serializable fix is needed for java.time.Instant as well.

Hey @novakov-alexey, sorry for the late response; I was away for the weekend. Making the type mappers serializable solves the problem 👍

Thanks for confirming. We will do that in the next release as well.

Is there anything I can help with? I would like this to be released sooner so that I don't have to implement type information for these types manually.

@buinauskas yes, would you like to open a PR? That will speed things up.

@novakov-alexey good, I've submitted a fix.

Excellent, thank you!