databricks/Spark-The-Definitive-Guide

Sessionization Example never times out

aremash opened this issue · 2 comments

When running https://github.com/databricks/Spark-The-Definitive-Guide/blob/master/code/Streaming-Chapter_22_Event-Time_and_Stateful_Processing.scala lines 239-319 (plus a few lines of boilerplate, on Spark 2.2.0), the "if (oldState.hasTimedOut)" branch on line 281 is never true for me, i.e. the timer never expires. The only output I see comes from the "if (state.values.length > 1000)" branch on line 287. The only change I made to the code is switching the writeStream.format from "memory" to "json" (and adding the path and checkpointLocation options). I'm reading the first 6 files/parts of the activity-data, both with and without readStream.option("maxFilesPerTrigger", 1). I also tried changing line 309 to "CURRENT_TIMESTAMP as timestamp", but that didn't help. Do you get any output (other than empty files) if you comment out lines 287-292? Could you confirm whether this is a bug in the code, a bug in Spark, or something I'm doing wrong?

I also tried changing the GroupStateTimeout on line 315 from EventTimeTimeout to ProcessingTimeTimeout, and correspondingly changing oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds") on line 295 to oldState.setTimeoutDuration("5 seconds"), but I still don't observe any timeouts.
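
For reference, here is a stripped-down sketch of how I understand the timeout path is supposed to fire with ProcessingTimeTimeout. The case classes, column names, and paths below are my own simplifications and assumptions, not the book's exact code:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(user: String, timestamp: Timestamp)
case class SessionState(var count: Int)
case class SessionOutput(user: String, count: Int, expired: Boolean)

// Called once per key per micro-batch. The hasTimedOut branch only runs when a
// later micro-batch fires after the timeout has elapsed; if no further batches
// run, the branch is never reached.
def updateSession(
    user: String,
    events: Iterator[Event],
    state: GroupState[SessionState]): SessionOutput = {
  if (state.hasTimedOut) {
    val finished = state.get
    state.remove()
    SessionOutput(user, finished.count, expired = true)
  } else {
    val current = state.getOption.getOrElse(SessionState(0))
    current.count += events.size
    state.update(current)
    // Re-arm the processing-time timeout whenever this key receives data.
    state.setTimeoutDuration("5 seconds")
    SessionOutput(user, current.count, expired = false)
  }
}

val spark = SparkSession.builder.appName("session-timeout-sketch").getOrCreate()
import spark.implicits._

// Infer the schema from a static read so the file source can stream the directory.
val staticSchema = spark.read.json("/data/activity-data").schema

val events = spark.readStream
  .schema(staticSchema)
  .option("maxFilesPerTrigger", 1)
  .json("/data/activity-data") // path is illustrative
  .selectExpr("User as user",
    "cast(Creation_Time / 1000000000 as timestamp) as timestamp")
  .as[Event]

val sessions = events
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateSession)

val query = sessions.writeStream
  .outputMode("update") // mapGroupsWithState supports update mode, not append
  .format("console")
  .start()
```

With this setup I would expect the hasTimedOut branch to run once a key has been idle for 5 seconds, but I never see it.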

There might be a couple of issues here.

  1. That's because the file sink doesn't support complete mode. In append mode, a result is only emitted once the watermark has passed it, which can take some time; more info here. You'd start seeing results only after 10+ minutes in append mode, because the sink has to wait for the watermark to advance that far. See the sketch after this list for how the watermark gates append-mode output.

  2. Also, if there's no data left, or the data gets processed too quickly, Spark won't "age out" old state: an empty batch doesn't trigger the timeout path, so state that should have expired just sits there. This is also something that's slated to change in future releases.
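
To make point 1 concrete, here is a minimal, self-contained sketch of how a watermark gates what an append-mode file sink writes. The paths, column names, and the windowed aggregation are illustrative assumptions, not the book's exact code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder.appName("append-watermark-sketch").getOrCreate()

// Infer the schema from a static read so the file source can stream the directory.
val staticSchema = spark.read.json("/data/activity-data").schema

val events = spark.readStream
  .schema(staticSchema)
  .option("maxFilesPerTrigger", 1)
  .json("/data/activity-data") // assumed path
  .selectExpr("User as user",
    "cast(Creation_Time / 1000000000 as timestamp) as event_time")

// With a 10-minute watermark, a 5-minute window's counts are only finalized once
// the watermark (max event_time seen, minus 10 minutes) moves past the window's end.
val counts = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(col("user"), window(col("event_time"), "5 minutes"))
  .count()

// The file sink only supports append mode, so each row is written exactly once,
// after its window has been closed by the watermark. If no newer data arrives,
// the watermark never advances and the output directory stays empty.
val query = counts.writeStream
  .format("json")
  .outputMode("append")
  .option("path", "/tmp/activity-counts")             // assumed path
  .option("checkpointLocation", "/tmp/activity-checkpoint")
  .start()
```

In other words, nothing shows up in the output directory until enough newer event-time data has arrived to push the watermark past the windows (or timeouts) you're waiting on, which also ties into point 2: an idle stream never advances and never emits anything.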