flink-extended/flink-scala-api

Using uPickle library to deserialize JSON strings to case classes

Closed this issue · 2 comments

Hello, everyone! I was trying out Apache Flink for the first time with this awesome library. I built a pipeline that reads data from a Pub/Sub topic, deserializes the JSON messages, transforms them, and writes the results to another Pub/Sub topic.

However, I had a lot of trouble deserializing the JSON string messages into a case class. At first, I used the flink-json module's deserializers, but they didn't work for me, so I looked for another way to deserialize JSON messages into a case class.

Instead, I used uPickle to deserialize JSON strings into case classes, and it worked perfectly! But I still have a question: will I see a performance degradation by using uPickle to deserialize JSON instead of the flink-json library?

Below is a simple data pipeline that reads JSON strings from a socket (standing in for a broker) and deserializes them into case classes:

```scala
//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"

import org.apache.flinkx.api._
import org.apache.flinkx.api.serializers._
import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

@main def main =
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env
    .socketTextStream("localhost", 9999)
    .map(t => read[Person](t)) // parse each JSON string into a Person
    .map(p => p.copy(name = p.name.toUpperCase, age = p.age * 2)) // uppercase the name, double the age
    .map(p => s"Hello, ${p.name}, you're ${p.age} years old!")
    .print()

  env.execute("myStream")
```
```
# send these messages to the socket:
# {"name": "John", "age": 20}
# {"name": "Mary", "age": 25}
#
# Application console output:
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.
5> Hello, JOHN, you're 40 years old!
6> Hello, MARY, you're 50 years old!
```
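Because the parsing and the transformation are ordinary Scala code, one way to check them without starting a Flink job is to pull the three `map` steps out into plain functions. This is a minimal sketch, assuming scala-cli with the default toolkit (which bundles uPickle, as the pipeline above relies on); the helper names `parse`, `transform`, `greet`, `process` and `checkSteps` are hypothetical and not part of flink-scala-api or uPickle.

```scala
//> using toolkit default

import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

// Hypothetical helpers mirroring the three `map` steps of the Flink job.
def parse(json: String): Person = read[Person](json)

def transform(p: Person): Person =
  p.copy(name = p.name.toUpperCase, age = p.age * 2)

def greet(p: Person): String =
  s"Hello, ${p.name}, you're ${p.age} years old!"

// What a single element goes through in the job, end to end.
def process(json: String): String = greet(transform(parse(json)))

@main def checkSteps(): Unit =
  println(process("""{"name": "John", "age": 20}"""))
  println(process("""{"name": "Mary", "age": 25}"""))
```

Inside the job, each step can then be called from a lambda, for example `.map(t => parse(t))`, keeping the same lambda form the original pipeline uses. Running `checkSteps` prints the same two greetings as the console output shown for the job, without the Flink subtask prefixes.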

In short, is it OK to deserialize JSON strings into case classes this way instead of using the flink-json module? With uPickle, the code is cleaner too!

Thanks in advance.

Yes, it is OK to use any other library to parse a JSON string into a case class instance. I have never used flink-json.
The problems you had are probably related to its Scala support in the first place, but I'm just guessing here.
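Whichever library does the parsing, uPickle's `read` throws an exception on malformed JSON or on a missing field, and an exception thrown inside a Flink `map` function fails the task. Below is a minimal sketch of a more tolerant parsing step, assuming the same scala-cli toolkit setup as the pipeline; `parsePerson` and `tryParse` are hypothetical names, and printing to stderr stands in for whatever error handling a real job needs.

```scala
//> using toolkit default

import scala.util.{Failure, Success, Try}
import upickle.default.*

case class Person(name: String, age: Int) derives ReadWriter

// Hypothetical helper: turns a record that fails to parse into None
// instead of letting the exception escape and fail the task.
def parsePerson(json: String): Option[Person] =
  Try(read[Person](json)) match
    case Success(p) => Some(p)
    case Failure(e) =>
      System.err.println(s"Skipping malformed record: $json (${e.getMessage})")
      None

@main def tryParse(): Unit =
  val inputs = List(
    """{"name": "John", "age": 20}""",
    """{"name": "Mary"}""", // missing "age"
    """not json at all"""
  )
  // Only the well-formed record survives.
  println(inputs.flatMap(parsePerson))
```

In the job, this could replace the first `map` with something like `.flatMap(t => parsePerson(t))`, assuming your flink-scala-api version offers a `flatMap` overload that accepts a function returning a Scala collection (an `Option` is one). Dropping bad records is only one choice; routing them to a side output or a dead-letter topic keeps them available for inspection.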

Thank you so much for your answer, @novakov-alexey! And congrats on this awesome library. Please keep it up to date :)