samelamin/spark-bigquery

Spark 2.1 - Structured Streaming Watermarking Exception


Hi guys,

I am trying to use the structured streaming BQ sink to execute this sliding window:

import org.apache.spark.sql.functions.{date_format, window}
// (assumes spark.implicits._ is in scope for the $ column syntax)

val orderCreatedEventsSlidingWindowStreamingDF =
  orderCreatedEventsStreamingInputDF
    // "yyyy"/"HH" rather than "YYY"/"hh": "Y" is the week-based year and "hh" is a
    // 12-hour clock, both classic date_format pitfalls
    .withColumn("timestamp", date_format($"event.eventOccurredTime", "yyyy-MM-dd'T'HH:mm").cast("timestamp"))
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window($"timestamp", "10 minutes", "5 minutes")
    )
    .count()

orderCreatedEventsSlidingWindowStreamingDF
  .writeStream
  .option("checkpointLocation", "/mnt/streaming-checkpoints")
  .option("tableReferenceSink", s"$bigQueryProjectId:$sdvDataset.$orderCreatedSlidingWindowByHourStreamingTable")
  .format("com.samelamin.spark.bigquery")
  .outputMode(OutputMode.Append)
  .start()

(Note: 'append' mode is needed to have watermarks, and some other boilerplate code is left out above.) I have run into this exception when setting a watermark for my streaming query:

java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#364: timestamp, interval 10 minutes

Firstly, is there anything wrong with my watermarking code? It appears right to me. Has anyone created a sliding window similar to this in Append mode and got it to work with this library?

Secondly, if I remove the watermark and change this to 'Complete' mode (I do want the watermarked sliding window in append mode, but I'm playing around with all the modes), I can get data streamed into BQ; some sample results are in the link:

https://www.dropbox.com/s/8kvct3oftbu3e9n/Screenshot%202017-05-12%2014.35.26.png?dl=0

However, the data being written to BQ seems to be appended, not overwritten (refer to rows 4 and 5, which show the same window but different BQ load times).

I'd expect there to be only one bq_load_timestamp at any given time in this BQ table (the last load time), and only ever one unique row per window. This indicates neither of those holds true.

Am I missing something here? I'd appreciate your thoughts.

(Note: I'm using 2.2 RC2 to access the soon-to-be-GAed Kinesis stream connector, but this shouldn't matter.)

Thanks!

Can anyone help on this please?

Hi @kurtmaile, sorry for the late reply; I am away at a conference.

At the moment the connector always appends to the BQ table, hence the output mode is always append.

I personally have not used watermarking yet, but I can have a look at replicating it when I am back home next week.

Regards
Sam

Hi,

No problem, and thanks for looking into it when you can; it would be a very cool feature addition.

Good luck with the presentation and conference; I'm hoping it's recorded so I can watch it later. :)

Cheers mate!

Hi Sam,

Hope you had fun at the conference! Where are you based BTW?

Quick one on this topic. Thanks so much for offering to look at it next week. Any chance this would be early in the week? Of course it's up to you, as it's your time you are offering :) I have proposed and built a solution using Structured Streaming which depends on this feature, so I'm obviously very grateful for your help. This library is very cool; hopefully I can talk you through our solution at some point in the next few months on a hangout or something.

I'm not sure on the etiquette here, so I don't mean to offend in any way, but I'd be happy to donate a little bit of money to you or any cause you'd want for your help in closing some of the gaps like this one and the MapType one (still a blocker). I know you would do it anyway, time permitting; I guess I'm just trying to acknowledge and thank you for your time and expertise in helping to close such gaps. Just a thought anyway.

Cool and thanks so much.

P.S. Is there a recording of your presentation from the conference? How did it go / was it received well? I'd love to watch it.

Hey @kurtmaile, thanks for the kind words. I'm based in London. Very kind of you to offer to help, and I might take you up on the offer to donate! That said, you don't have to, as I will be implementing it, time permitting, over the next few weeks.

I personally have not used watermarking, so perhaps if we have a call to discuss what exactly you are trying to build, there might be a solution while you wait on a fix. Same for the MapType.

With regards to the conference, it was at Riga DevDays; the recording should be up shortly.

Cool, happy to donate for sure given my immediate need and benefit! Legend.

I'll put a few slides together to discuss my streaming use case then. Briefly, in the meantime: I am doing a lot of order sliding-window aggregations (e.g. sales per 10 mins), which need watermarks for late event handling and so Spark can clean/free up memory. These aggregations (plus the raw events) make their way to BQ. Kinesis is the source (events pushed from microservices), which is in v2.2.

Re: the MapType, will you just convert a Spark Map to a BQ array of key/value tuples? On my brief look, that seemed to be how you'd map it to BQ types.
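
(For illustration, here's a minimal sketch of how that mapping might look on the Spark side; this isn't the library's code, just one way to turn a hypothetical string-to-string MapType column, `attributesMap`, into an array of key/value structs, which BQ would represent as a REPEATED RECORD:)

import org.apache.spark.sql.functions.udf

// Hypothetical key/value pair type; BQ would see ARRAY<STRUCT<key, value>>.
case class KV(key: String, value: String)

// Convert a MapType(StringType, StringType) column into an array of KV structs.
val mapToKVArray = udf((m: Map[String, String]) =>
  if (m == null) Seq.empty[KV] else m.toSeq.map { case (k, v) => KV(k, v) })

// "attributesMap" and `df` are assumed example names, not from the library.
val dfWithKVArray = df.withColumn("attributes", mapToKVArray($"attributesMap"))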

Hey Sam,

Let me know where you'd like a donation sent, BTW. Structured Streaming in particular will be so key going forward, and I'm really excited to be using this with BigQuery. I'm based in London too; we should look out for a Spark meetup to meet at (and present?). Really cool stuff, so I'm enjoying it.

I'm sure you've seen these, but just in case you haven't:

https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

FYI, you can contact me on kurt.maile@xiatech.co.uk if email is preferred for any comms.

Hey, sounds great, let's pencil in Sunday. Although, I have a baby, so given it's a bank holiday weekend and she's a tad unpredictable, I can't guarantee it, but let's aim for it! Do you think you will get any time to start on the features this week? Not trying to be pushy, just some tight timelines on my project... cheers!

Hey Sam,

Hope you had a nice long weekend!

Are you going to Spark Summit next week? Wish I was; pretty excited to listen to the keynotes and sessions on replay, at least.

Quick question: any progress on the Structured Streaming 2.1 watermark feature addition, by any chance? I think supporting the 'Complete' output mode would be good too, which I guess translates to a WRITE_TRUNCATE write disposition?

Cheers mate! :)

Hi @kurtmaile, I think I might be able to pick it up this weekend, but it would be useful to have that call first to try and understand what you are attempting to do.

Hi Sam, sorry, I've only just seen this! Apologies; it was my daughter's first birthday last weekend and it was hectic organising a load of things... and I was a bit poorly myself.

Did you make any progress? I'll try and get something to you first for context, and we can have a call soon. Do you have a contact email?

Once again, really sorry I missed your comment above!

Kurt

Hi, I think it's easiest to recap the original post now while I have a work break, as it's a pretty basic Structured Streaming example: a sliding window of length 10 minutes, sliding/triggering every 5, that counts the number of orders in these windows. My team want this moving graph visualised, showing order counts for our new Hybris ecom platform, by the end of June.

So my basic df is as follows:

val orderCreatedEventsSlidingWindowStreamingDF =
  orderCreatedEventsStreamingInputDF
    .withColumn("timestamp", date_format($"event.eventOccurredTime", "yyyy-MM-dd'T'HH:mm").cast("timestamp"))
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window($"timestamp", "10 minutes", "5 minutes")
    )
    .count()

Then when I write the streaming query I get the error as originally posted.

So this is an aggregation on 'event time' (event.eventOccurredTime), where the full timestamp is formatted down to minute granularity for window grouping.

For BQ we have an append mode, which is fine provided I can set a watermark, as the semantics are to only write out the result once (and Spark will throw an exception anyway). So with a watermark of 10 minutes in this example, I can guarantee (pretty much) that all orders for this period have been sent by Hybris, and thus the prefix of data can be appended to the BQ table.

That's really it, to be honest. The reference, which I'm sure you've read, is below.

As the Spark documentation mentions, the supported output modes are as follows:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

With an event-time aggregation like this, I need Append output mode (Complete is not needed/appropriate; Update would be nice but is new, and the SQL update capability for BQ standard SQL DML has limited quota... bummer... so Append is the natural choice). But a watermark is needed, and Spark now throws an assertion error, as the first post shows; you used to not have to provide it.

I really hope this is simple enough to do, as I'm pretty desperate for the functionality; I have a number of order-like streaming metrics to build in the next few weeks.

Once again, I'm so sorry to have missed you last weekend; I hope I haven't missed your only near-term window to introduce this feature! As mentioned, I'm happy to contribute for your work too.

P.S. LOVING the library so far; it's rock solid, no issues yet. I've built a simple-ish raw event pipeline where I don't do any aggregations, just some classic 'ETL' of a Kinesis event stream into BQ. It all uses Structured Streaming; this hasn't needed watermarks yet, so I haven't been blocked, but I will need them very soon. I also removed the 'toLower' on the casing of the schema fields, and this has been working fine. However, due to time pressures, I didn't try to do anything neat (yet) that considers backwards compatibility for others who have used this and thus have lowercase schemas. But it works fine.

Cheers!!!!

Hi @kurtmaile, sorry for the late reply; it's conference season so I have been away. I am struggling to find time to fix this, TBH. I will try to sit down this weekend and see if I can write something up.

With regards to the lowercase change, it's fine, you can send a PR. Just bump up the version so users know it's a breaking change.

Hey Sam, thanks for the reply! Yeah, loads going on, all interesting, and it's great you are presenting too! Re: the watermarks, understood, you can only do what you can, but I would be VERY indebted to you if you do, and the donation offer stands. Another use case I've got is streaming de-duplication:

streamingDf.dropDuplicates("eventId") 

as the upstream can occasionally send duplicates into Kinesis. But this requires watermarks too.
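
(For what it's worth, a minimal sketch of the watermarked version on Spark 2.2+, assuming a hypothetical event-time column named `timestamp`; including the event-time column in dropDuplicates lets Spark bound the de-dup state it keeps:)

val dedupedDF = streamingDf
  .withWatermark("timestamp", "10 minutes")   // bound how long de-dup state is kept
  .dropDuplicates("eventId", "timestamp")     // drop duplicates seen within the watermark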

Cheers for your help and understanding!
Kurt

P.S. Good luck with your presentations too.

P.P.S. Are any of them recorded, so I could watch?

FYI, as I mentioned, I am keen to donate generously for your efforts, particularly as it's your precious time in a busy period, and the rest of the community can benefit of course! Structured Streaming is picking up big time with its GA.

I'm also hopefully going to get to write a blog post on Databricks/BQ for our project, in which you and this project will most notably be mentioned :)

Cool, excited.

Question: have you looked at BQ streaming inserts at all for Structured Streaming?

Hiya,

Hope your presentations have been going well, Sam.

Any chance you've been able to progress the watermarking?

Cheers mate!

Hi @kurtmaile, I just got back; yes, the presentations have been going well.

Sorry for the slow progress on this; it's just me working on this connector, and I could use some help.

I still have to read up on watermarking and understand how best to implement it with BQ, which is why I would have liked the call to understand the requirements.

DM me on Twitter so we can sort something out

Also, the talks from RigaDevDays are out, so if you are still interested you can stream them online.

Hi @kurtmaile, I am just sitting down to try and understand your requirements.

Just a few questions. I am assuming the source data you are reading from is Kinesis?

I suggest enabling watermarking for SS to BQ with append for now.

I think that's achievable in the short term, and we can iteratively move forward.

So, just so I can understand the requirement: are you attempting to update the count in BQ and overwrite, or append?

So every 5 minutes we trigger (sliding window) a count of the last 10 minutes of orders, then write to BQ with a load timestamp of the trigger time.

Is that correct?

Hi Sam,

Hope you had a nice weekend.

Did you manage to make progress with watermarking?

If you create a branch/PR for it, let me know and I can take a look.

Cheers mate!

Hi Sam,

How are you getting on with the watermarking, mate?

Cheers
Kurt

Hi Sam,

Oh, sorry to hear that; I hope it's not too serious! I wish you well in your recovery. Health is the most important thing, so take it easy, and all the best, mate.

Kurt

@kurtmaile Really good news for you!
I am sure you will be glad to know this issue has now been resolved. Please use version 0.1.8, and give me a shout if you have any more issues :)

It was a Spark bug; the relevant code is here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala#L45

TL;DR: The reason is that Dataset.foreachPartition() would in turn call Dataset.rdd(), but Dataset.rdd() creates a new plan.

Because StreamExecution uses the existing plan to collect metrics and update the watermark, we should never create a new plan. Otherwise, metrics and the watermark are updated in the new plan, and StreamExecution cannot retrieve them.
Hence, we need to manually convert internal rows to objects using the encoder.
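
For anyone curious, a rough sketch of that workaround pattern on the Spark 2.x API (simplified, not the connector's exact code): reuse the already-planned query via queryExecution.toRdd and decode the InternalRows manually with an encoder, instead of calling Dataset.rdd(), which would build a fresh plan:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Sketch only: decode InternalRows ourselves rather than calling Dataset.rdd(),
// so StreamExecution's existing plan (and its watermark/metrics state) is reused.
def writeToSink(df: DataFrame): Unit = {
  val encoder = RowEncoder(df.schema).resolveAndBind()
  df.queryExecution.toRdd.foreachPartition { iter =>
    iter.foreach { internalRow =>
      val row = encoder.fromRow(internalRow) // manual InternalRow -> Row conversion
      // ... hand `row` off to the BigQuery writer here ...
    }
  }
}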

Hi Sam,

Oh brilliant, very timely, thanks so much for this. Looking forward to giving it a crack! Will let you know how I get on.

Thoughts on the BQ quota issue I created?

I hope the recovery is going well and you didn't exert too much energy :) I feel for you, as eyes are not something you want to have issues with. I have a very bad back which gives me a lot of grief, so I know the feeling.

When we both get some bandwidth, we should look at BQ streaming inserts for SS too, as they have the nice property of exactly-once delivery and removing duplicates. I have quite a number of duplicates in BQ, for which I create a view over the raw table to dedup so consumers only see the latest. Future enhancement. Google Cloud Dataproc uses this for its streaming sink; have you looked at this service at all?

Regards,
Kurt