Spark-Structured-Streaming-Examples

Kafka / Cassandra / Spark Structured Streaming Example

Stream the number of times Drake is broadcast on each radio station, and see how easy Spark Structured Streaming 2.2.0 is to use through Spark SQL's DataFrame API.

Input data

The input data comes from radio stations and is stored in a Parquet file; the stream is emulated with the .option("maxFilesPerTrigger", 1) option.
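
A minimal sketch of how such a file-based stream can be declared (the schema fields match the JSON message shown later; the Parquet path is an assumption, not the repo's actual value):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .appName("spark-structured-streaming-examples")
  .master("local[*]")
  .getOrCreate()

// File sources require an explicit schema when streaming.
val radioSchema = StructType(Seq(
  StructField("radio",  StringType),
  StructField("artist", StringType),
  StructField("title",  StringType),
  StructField("count",  LongType)
))

// maxFilesPerTrigger = 1: one file per micro-batch, emulating a live stream.
val radioEvents = spark.readStream
  .schema(radioSchema)
  .option("maxFilesPerTrigger", 1)
  .parquet("data/radio.parquet") // hypothetical path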

The stream is then read and sunk into Kafka; from Kafka, the data flows into Cassandra.
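
A hedged sketch of the Parquet-to-Kafka leg, reusing spark and radioEvents from above (broker address and checkpoint path are assumptions):

import org.apache.spark.sql.functions.{struct, to_json}
import spark.implicits._

// The Kafka sink expects a "value" column, so each row is serialized to JSON.
radioEvents
  .select(to_json(struct($"radio", $"artist", $"title", $"count")).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "test")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // hypothetical path
  .start()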

Output data

The output is stored in Kafka and Cassandra, for example purposes only. The Cassandra sinks use both a ForeachWriter and a StreamSinkProvider, so the two kinds of sink can be compared.

One uses Datastax's saveToCassandra method. The other is messier (untyped) and executes CQL statements from a custom ForeachWriter, as sketched below.
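
A minimal sketch of the untyped variant, assuming the Datastax Java driver and a local cluster (the class name and connection handling here are simplifications, not the repo's exact code):

import com.datastax.driver.core.{Cluster, Session}
import org.apache.spark.sql.{ForeachWriter, Row}

// Untyped sink: every row is turned into a raw CQL INSERT.
class CassandraCqlWriter extends ForeachWriter[Row] {
  private var cluster: Cluster = _
  private var session: Session = _

  override def open(partitionId: Long, version: Long): Boolean = {
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    session = cluster.connect()
    true
  }

  override def process(row: Row): Unit =
    // String interpolation for brevity; a prepared statement would be safer.
    session.execute(
      s"""INSERT INTO test.radioOtherSink (radio, title, artist, count)
         |VALUES ('${row.getAs[String]("radio")}', '${row.getAs[String]("title")}',
         |'${row.getAs[String]("artist")}', ${row.getAs[Long]("count")})""".stripMargin)

  override def close(errorOrNull: Throwable): Unit = cluster.close()
}

Such a writer is attached with .writeStream.foreach(new CassandraCqlWriter).start(); the typed variant instead goes through a custom StreamSinkProvider whose Sink calls saveToCassandra on each micro-batch.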

From Spark's documentation about trigger intervals:

Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will attempt to trigger at the next trigger point, not immediately after the processing has completed.
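
In code, an explicit trigger interval looks like this (10 seconds is an arbitrary value; without .trigger(...) Spark applies the default behaviour quoted above):

import org.apache.spark.sql.streaming.Trigger

radioEvents.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()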

Kafka topic

One topic "test" with only one partition
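
If the topic does not exist yet, it can be created with the standard Kafka CLI (Zookeeper address assumed, matching the 0.10 requirement below):

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test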

Send a message

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 


{"radio":"skyrock","artist":"Drake","title":"Hold On We’Re Going Home","count":38} 

Cassandra Table

A table for the ForeachWriter

CREATE TABLE test.radio (
  radio varchar,
  title varchar,
  artist varchar,
  count bigint,
  PRIMARY KEY (radio, title, artist)
);

A second table, used to test the other sink.

CREATE TABLE test.radioOtherSink (
  radio varchar,
  title varchar,
  artist varchar,
  count bigint,
  PRIMARY KEY (radio, title, artist)
);

A third table to store Kafka metadata, in case checkpointing is not available (during an application upgrade, for example)

CREATE TABLE test.kafkaMetadata (
  partition int,
  offset bigint,
  PRIMARY KEY (partition)
);
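
For illustration only, persisting a (partition, offset) pair could look like the sketch below (the helper name and session handling are assumptions; rows read with format("kafka") expose these two columns natively):

// INSERT acts as an upsert in Cassandra: since "partition" is the primary
// key, only the latest offset per partition is kept.
def saveKafkaMetadata(session: Session, partition: Int, offset: Long): Unit =
  session.execute(
    s"INSERT INTO test.kafkaMetadata (partition, offset) VALUES ($partition, $offset)")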

Table Content

Radio
cqlsh> SELECT * FROM test.radio;

 radio   | title                    | artist | count
---------+--------------------------+--------+-------
 skyrock |                Controlla |  Drake |     1
 skyrock |                Fake Love |  Drake |     9
 skyrock | Hold On We’Re Going Home |  Drake |    35
 skyrock |            Hotline Bling |  Drake |  1052
 skyrock |  Started From The Bottom |  Drake |    39
    nova |         4pm In Calabasas |  Drake |     1
    nova |             Feel No Ways |  Drake |     2
    nova |                From Time |  Drake |    34
    nova |                     Hype |  Drake |     2

Kafka Metadata

When doing an application upgrade, we cannot reuse checkpointing, so we need to store our offsets in an external datastore; here, Cassandra is chosen. Then, when starting the Kafka source, we need to set the "startingOffsets" option with a JSON string like

""" {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

Learn more in the official Spark documentation.

If there is no Kafka metadata stored inside Cassandra, "earliest" is used.
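
A sketch of wiring this together, assuming the Datastax session from earlier (the JSON building is simplified to the single "test" topic):

import scala.collection.JavaConverters._

// Read the last committed offsets; fall back to "earliest" when none exist.
// Depending on what is stored, offset + 1 may be needed to avoid
// reprocessing the last record, since startingOffsets is inclusive.
val stored = session.execute("SELECT partition, offset FROM test.kafkaMetadata").all().asScala
val startingOffsets =
  if (stored.isEmpty) "earliest"
  else stored
    .map(r => s""""${r.getInt("partition")}":${r.getLong("offset")}""")
    .mkString("""{"test":{""", ",", "}}")

val fromKafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", startingOffsets)
  .load()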

cqlsh> SELECT * FROM test.kafkametadata;
 partition | offset
-----------+--------
         0 |    171


Requirements

@TODO docker compose

  • Cassandra 3.10
  • Kafka 0.10+ (with Zookeeper)