Input sources: Kafka, file systems (CSV, delimited text, Parquet, ORC, Avro, JSON), socket
Output sinks: Kafka, console, memory, foreach
#IMP: A schema definition is mandatory to process the data.
Structured Streaming is a stream processing engine built on the Spark SQL engine.
StructuredNetworkWordCount maintains a running word count of text data received from a TCP socket. The DataFrame lines represents an unbounded table containing the streaming text. The table contains one column of strings named value, and each line in the streaming text data becomes a row in the table. The DataFrame is converted to a Dataset of String using .as[String], so that we can apply flatMap and split each line into a Dataset of words. Finally, we define the wordCounts DataFrame by grouping by the word and counting; wordCounts is a streaming DataFrame that represents the running word counts.
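A minimal sketch of that word count, assuming text is being sent to a socket on localhost:9999:
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// The unbounded "lines" table: one string column named "value"
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words, then keep a running count per word
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the complete, continuously updated Result Table to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```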
#1. Input sources:
```
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
socketDF.printSchema
```
```
// Read all the csv files written atomically in a directory
import org.apache.spark.sql.types._

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
```
val outputPathDir = workingDir + "/output.parquet" // A subdirectory for our output
val checkpointPath = workingDir + "/checkpoint"    // A subdirectory for our checkpoint & write-ahead logs
val myStreamName = "lesson02_ss"                   // An arbitrary name for the stream
| Mode | Example | Notes |
|------|---------|-------|
| Complete | dsw.outputMode("complete") | The entire updated Result Table is written to the sink. The individual sink implementation decides how to handle writing the entire table. |
| Append | dsw.outputMode("append") | Only the new rows appended to the Result Table since the last trigger are written to the sink. |
| Update | dsw.outputMode("update") | Only the rows in the Result Table that were updated since the last trigger are written to the sink. Available since Spark 2.1.1. |
In the example below, we write to a Parquet directory, which only supports append mode:
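A minimal sketch of such a query, assuming a streaming DataFrame named streamingDF and the outputPathDir, checkpointPath, and myStreamName values defined above:
```
val streamingQuery = streamingDF                // streamingDF: an assumed streaming DataFrame
  .writeStream
  .queryName(myStreamName)                      // name the query for easier monitoring
  .format("parquet")                            // the file sink only supports append mode
  .outputMode("append")
  .option("checkpointLocation", checkpointPath) // required for file sinks
  .option("path", outputPathDir)
  .start()
```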
The StreamingQuery object returned by start() exposes the following:

| Member | Description |
|--------|-------------|
| id | Unique identifier of the running query that persists across restarts from checkpoint data |
| runId | Unique id of this run of the query, generated at every start/restart |
| name | The auto-generated or user-specified name of the query |
| explain() | Print detailed explanations of the query |
| stop() | Stop the query |
| awaitTermination() | Block until the query is terminated, either with stop() or with an error |
| exception | The exception, if the query was terminated with an error |
| recentProgress | An array of the most recent progress updates for this query |
| lastProgress | The most recent progress update of this streaming query |
DataStreamWriter.format accepts the following values, among others:
| Output Sink | Example | Notes |
|-------------|---------|-------|
| File | dsw.format("parquet"), dsw.format("csv"), ... | Dumps the Result Table to a file. Supports Parquet, JSON, CSV, etc. |
| Kafka | dsw.format("kafka") | Writes the output to one or more topics in Kafka |
| Console | dsw.format("console") | Prints data to the console (useful for debugging) |
| Memory | dsw.format("memory") | Updates an in-memory table, which can be queried through Spark SQL or the DataFrame API |
| foreach | dsw.foreach(writer: ForeachWriter) | Your "escape hatch", allowing you to write your own type of sink |
| Delta | dsw.format("delta") | The Databricks Delta (Delta Lake) sink |
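For the foreach sink, here is a hedged sketch of a custom ForeachWriter; it only prints rows, standing in for writes to an external system, and the streamingDF name is an assumption:
```
import org.apache.spark.sql.{ForeachWriter, Row}

// A trivial ForeachWriter: a real one would open a connection in open(),
// write each record in process(), and release resources in close().
val printWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(record: Row): Unit = println(record)
  override def close(errorOrNull: Throwable): Unit = ()
}

streamingDF.writeStream.foreach(printWriter).start()
```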
```
import spark.implicits._ // needed for .as[(String, String)] below

// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
```
Each record read from the Kafka source has the following schema:

| Column | Type |
|--------|------|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
Complete Mode - the entire Result Table is written. Append Mode - only newly appended rows are written (assuming existing rows do not change). Update Mode - only the rows in the Result Table that were updated are written.
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
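A brief sketch of emitting one of these running aggregations (the untyped count per deviceType) to the console:
```
// Running count per device type, reprinted in full on every trigger
val deviceCounts = df.groupBy("deviceType").count()

deviceCounts.writeStream
  .outputMode("complete") // aggregations without a watermark require complete (or update) mode
  .format("console")
  .start()
```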
We want a 10-minute window that reports every 5 minutes:
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
Late-arriving data can be updated in the correct window:
In update mode, rows in the Result Table can be updated. To bound the amount of intermediate in-memory state that must be maintained, we define a watermark as a threshold on how late data is allowed to arrive.
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
Some sinks (like files) do not support the updates that Update Mode requires. In Append Mode, new rows are not appended until the watermark period has passed, so late-arriving data can still be accounted for.
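A hedged sketch of appending the watermarked windowed counts above to a Parquet file sink (the output and checkpoint paths are placeholders):
```
// With a watermark defined, finalized windows can be appended to a file sink
windowedCounts.writeStream
  .outputMode("append")                               // rows are emitted once their window is finalized
  .format("parquet")
  .option("path", "path/to/windowed-counts")          // placeholder output directory
  .option("checkpointLocation", "path/to/checkpoint") // placeholder checkpoint directory
  .start()
```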
A static DataFrame can be joined with a streaming DataFrame:
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF
To filter duplicate records:
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// Without watermark using guid column
streamingDf.dropDuplicates("guid")
// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
#9. Sinks
# i. File-based
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
# ii. Console (for debugging)
writeStream
  .format("console")
  .start()
# iii. Memory
writeStream
  .format("memory")
  .queryName("tableName") // the in-memory table can then be queried as "tableName"
  .start()
# Async offset commits (commitAsync) in Spark Streaming:
https://stackoverflow.com/questions/49375384/spark-kafka-streaming-multi-partition-commitasync-issue
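The linked question concerns the DStream-based Kafka integration (spark-streaming-kafka-0-10); a minimal sketch of the usual commitAsync pattern there, where the stream variable and the processing step are assumptions:
```
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// 'stream' is assumed to be a direct Kafka DStream created with KafkaUtils.createDirectStream(...)
stream.foreachRDD { rdd =>
  // Capture the offset ranges of this batch before processing it
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit the offsets back to Kafka asynchronously once processing succeeds
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```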
# iv. Write to a Kafka topic:
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "kafkaTopic")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS json", "timestamp")
  .select(from_json(col("json"), streamctl).alias("parsed")) // streamctl: schema of the JSON payload
  .select("parsed.*")
  .withColumn("results", explode($"results"))
  .select("results.user.username")
  .withColumn("value", regexp_replace(col("username"), "([0-9])", ""))
  .select("value")
  .writeStream
  .option("checkpointLocation", "D:/dev_path/checkpoint")
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("topic", "spark-report-topic")
  .start()
  .awaitTermination()
# Write with foreachBatch (MongoDB)
ageAverage
  .writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode("complete")
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val df = batchDf.withColumn("batchId", lit(batchId))
    df.printSchema()
    df.write.format("mongo")
      .mode(SaveMode.Append)
      .option("uri", MongoDBConstants.spark_mongodb_output_uri)
      .option("database", MongoDBConstants.mongodb_database)
      .option("collection", MongoDBConstants.mongodb_collection_tbl)
      .save()
    df.show(20, false)
  }
  .start()
## Write with foreachBatch (MySQL)
ageAverage
.writeStream
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("complete").foreachBatch{(batchDf:DataFrame,batchId:Long) =>
val df=batchDf.withColumn("batchId",lit(batchId))
df.printSchema()
df.write.mode(SaveMode.Append).jdbc(url,"meetup_rsvp_tbl",prop)
df.show(20,false);
#10. Managing Streaming Queries
val query = df.writeStream.format("console").start() // get the query object
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
query.explain() // print detailed explanations of the query
query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query
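Queries running in the same SparkSession can also be managed collectively through spark.streams; a brief sketch using the query object from above:
```
spark.streams.active                // all currently active streaming queries
spark.streams.get(query.id)         // look up a query by its unique id
spark.streams.awaitAnyTermination() // block until any one of them terminates
```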
#11. Asynchronous API
Query listener:
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.id)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.id)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.progress)
  }
})
#12. Recovering from Failures with Checkpointing
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()