AbsaOSS/ABRiS

Do I have to restart the Spark streaming job for a new schema to take effect?

Opened this issue · 8 comments

I was trying the ABRiS library to consume CDC records generated by Debezium.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

val abrisConfig: FromAvroConfig = (AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topicName)
  .usingSchemaRegistry(schemaRegistryURL))

val df=(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load())

val deserializedAvro = (df
  .select(from_avro(col("value"), abrisConfig)
          .as("data"))
  .select(col("data.after.*")))
deserializedAvro.printSchema()

val query = (deserializedAvro
  .writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", s"s3://$bucketName/checkpoints/$tableName")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start())

I added a column while the streaming job was running. I was expecting it to print the new column I added, but it did not. Does it not dynamically refresh the schema from the version information in the payload?
Do I have to restart the Spark streaming job to process/view the new columns?

Hi @akshayar
In Spark, it's impossible to change the schema during a run. So, indeed you have to restart the streaming job to consume the new field.

Thanks @kevinwallimann. Is this a feature you are considering implementing, or is the nature of the problem such that it cannot be solved?

Hi @akshayar This is a limitation of Spark, so it's not in the scope of this project.

What could be done is to use Spark Structured Streaming's foreachBatch and initialize the AbrisConfig inside it, but it is a rather limiting approach. A rough sketch is below.
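
Something like this untested sketch (it reuses the spark, topicName, brokerServers and schemaRegistryURL values from your snippet and simply prints each micro-batch to the console):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val rawDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()

val query = rawDf
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // The reader config is rebuilt for every micro-batch, so the latest
    // registered schema version is fetched each time.
    val freshConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy(topicName)
      .usingSchemaRegistry(schemaRegistryURL)

    batchDf
      .select(from_avro(col("value"), freshConfig).as("data"))
      .select(col("data.after.*"))
      .show()
  }
  .start()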

I'm a bit surprised that there is no way to implement a refresh behaviour. What if AvroDataToCatalyst re-read the readerSchema from time to time and re-instantiated an AbrisAvroDeserializer based on a fresh schema? In Spark, UDFs can have side effects, and this problem seems quite similar to me.

Look at this code:

.select(from_avro(col("value"), abrisConfig)

from_avro is called here to create a Spark Expression. The expression has a dataType method that Spark calls to find out what type the column will have in the resulting DataFrame. ABRiS returns the type converted from the reader schema.

If we later decide to change the reader schema, we still have no way to change the Spark type, because Spark calls us, not the other way around.
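
To illustrate the mechanism with a rough sketch (not ABRiS internals, just the general idea, using spark-avro's SchemaConverters): the Catalyst type is derived from the reader schema once, when the expression is created, and the query plan relies on it from then on.

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// The reader schema is resolved once, when the expression is built...
val readerSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Example","fields":[{"name":"id","type":"long"}]}""")

// ...and converted to a Spark SQL type, which dataType then returns unchanged
// for the lifetime of the query.
val sparkType = SchemaConverters.toSqlType(readerSchema).dataType
println(sparkType)  // StructType(StructField(id,LongType,false))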

As far as I know, a DataFrame's dataType cannot be changed; you can only create a new DataFrame, so that is another limitation here.

I am not an expert on Spark streaming, so if you think this can be done, please provide a code example of how it should work.

Thanks for the answer. I see the issue: Spark infers the DataFrame schema only once and it cannot be changed.

An alternative would be to stop the application automatically when a schema change happens, so that a watchdog could restart it with the new schema. A sketch of that idea follows.
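
Something like this rough, untested sketch (it talks to the registry directly via Confluent's CachedSchemaRegistryClient, reuses the query, topicName and schemaRegistryURL values from above, and the polling interval is arbitrary):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Poll the Schema Registry and stop the running query when the latest
// schema version for the topic's value subject changes; an external
// supervisor (or an enclosing loop) then restarts the job, which will
// download the new latest reader schema on startup.
val registryClient = new CachedSchemaRegistryClient(schemaRegistryURL, 128)
val subject = s"$topicName-value" // subject name under TopicNameStrategy
val initialVersion = registryClient.getLatestSchemaMetadata(subject).getVersion

while (query.isActive) {
  Thread.sleep(60000) // check once a minute (arbitrary interval)
  val latestVersion = registryClient.getLatestSchemaMetadata(subject).getVersion
  if (latestVersion != initialVersion) {
    query.stop()
  }
}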

Hi @github-raphael-douyere. Spot on. Theoretically, it may be possible to change the logical plan from one microbatch to the next, but I don't know how this could be implemented or whether it would break something else in Spark.

Your suggestion to restart a query when a schema change has been detected can work. In case of a schema change in the middle of a microbatch, some messages might still be consumed using the old schema.

Maybe these constraints aren't present in continuous processing mode, but I'm no expert on that mode.

If you find a solution to your problem, please feel free to share it here.

Hi, I am facing the same issue. I just wanted to know if there was a solution to this problem.
Thanks
@akshayar @github-raphael-douyere