/spark-streaming-checkpoint

Spark Streaming Checkpoint File Manager for MinIO

Primary LanguageScalaApache License 2.0Apache-2.0

Spark Streaming Checkpoint File Manager for MinIO

This project implements a MinIO native CheckpointFileManager for Apache Spark Structured Streaming. MinIO is a strictly consistent S3-API compatible object store; all object operations are atomic and transactional. This native CheckpointFileManager takes full advantage of the native object APIs and eliminates the Hadoop HCFS emulation layer, which is inefficient and unnecessary on object stores.

Since filesystems did not support ACID transactions, applications wrote the files to a temporary location and used atomic renames to mimic the commit operation. Object stores do not have a rename API because the objects do not appear in the namespace until the put or put-multipart transaction is complete. The default CheckpointFileManager shipped with Apache Spark is designed for HDFS and POSIX-based filesystems and it emulates rename API on the object store using PUT-COPY-LIST-DELTE APIs.

Sample Code used in testing

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SparkStreamingFromDirectory {

  def main(args: Array[String]): Unit = {

    val spark:SparkSession = SparkSession.builder()
      .appName("SparkByExample")
      .config("spark.sql.streaming.checkpointFileManagerClass", "io.minio.spark.checkpoint.S3BasedCheckpointFileManager")
      .master("local[1]").getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://127.0.0.1:9000")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "minioadmin")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "minioadmin")

    val schema = StructType(
      List(
        StructField("RecordNumber", IntegerType, true),
        StructField("Zipcode", StringType, true),
        StructField("ZipCodeType", StringType, true),
        StructField("City", StringType, true),
        StructField("State", StringType, true),
        StructField("LocationType", StringType, true),
        StructField("Lat", StringType, true),
        StructField("Long", StringType, true),
        StructField("Xaxis", StringType, true),
        StructField("Yaxis", StringType, true),
        StructField("Zaxis", StringType, true),
        StructField("WorldRegion", StringType, true),
        StructField("Country", StringType, true),
        StructField("LocationText", StringType, true),
        StructField("Location", StringType, true),
        StructField("Decommisioned", StringType, true)
      )
    )

    val df = spark.readStream
      .schema(schema)
      .json("./resources/")

    df.printSchema()

    val groupDF = df.select("Zipcode")
        .groupBy("Zipcode").count()
    groupDF.printSchema()

    groupDF.writeStream
      .format("console")
      .outputMode("complete")
      .option("truncate", false)
      .option("newRows", 30)
      .option("checkpointLocation", "s3a://process-runner/checkpoints/")
      .start()
      .awaitTermination()
  }
}

The resources used for streaming inputs.

tree ../resources/
../resources/
├── zipcode10.json
├── zipcode11.json
├── zipcode12.json
├── zipcode1.json
├── zipcode2.json
├── zipcode3.json
├── zipcode4.json
├── zipcode5.json
├── zipcode6.json
├── zipcode7.json
├── zipcode8.json
└── zipcode9.json

0 directories, 12 files

Results (concise)

Optimization can be seen in terms of total time taken for Batch '0'

Without Optimization With Optimization
72secs 17secs

Total number of namespace pollution

Total DEL markers without optimization Total DEL markers with optimization
409 0

Total number of excess objects on namespace

Total excess objects without optimization Total excess objects with optimization
818 (out of which 409 are DEL markers) 0

Total number of API calls

Total number of API calls without optimization Total number of API calls with optimization
6938 224

The number of excess calls to object ratio

API Calls / Objects without optimization API Calls / objects with optimization
33.8x 1.09x

These results show the overall benefits of using this CheckpointFileManager, and why the upstream s3a based checkpointing is poorly designed to be used with object storage.

Results (detailed) with each steps

Spark-shell with S3A based checkpointing

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
         
scala> :load SparkStreamingFromDirectory-S3A.scala
Loading SparkStreamingFromDirectory-S3A.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:14:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

root
 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-------+-----+
|Zipcode|count|
+-------+-----+
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |
+-------+-----+

Amount of calls

mc support top api myminio/

API                             RX      TX      CALLS   ERRORS 
s3.CopyObject                   48 KiB  47 KiB  208     0     
s3.DeleteMultipleObjects        146 KiB 47 KiB  417     0     
s3.DeleteObject                 32 KiB  0 B     211     0     
s3.GetObject                    168 B   1.3 KiB 1       0     
s3.HeadObject                   441 KiB 0 B     2950    0     
s3.ListObjectsV2                408 KiB 1.4 MiB 2732    0     
s3.PutObject                    128 KiB 0 B     419     0     

Summary:

Total: 6938 CALLS, 1.2 MiB RX, 1.5 MiB TX - in 72.36s

The amount of files left over in the wake of this behavior on a versioned buckets.

~ mc ls -r --versions myminio/process-runner/ | wc -l
1023

Our of which 614 actual objects

~  mc ls -r --versions myminio/process-runner/ | grep PUT | wc -l
614

and almost 409 delete markers (soft deletes)

~ mc ls -r --versions myminio/process-runner/ | grep DEL | wc -l
409

Actual objects on namespace without versioning lookup

~ mc ls -r myminio/process-runner/  | wc -l
205

After Direct Checkpointing Write Optimization

...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load SparkStreamingFromDirectory.scala
Loading SparkStreamingFromDirectory.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:20:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

root
 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-------+-----+
|Zipcode|count|
+-------+-----+
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |
+-------+-----+
~ mc support top api myminio/

API                     RX      TX      CALLS   ERRORS 
s3.GetObject            159 B   1.3 KiB 1       0     
s3.HeadObject           1.5 KiB 0 B     10      0     
s3.ListObjectVersions   765 B   2.0 KiB 5       0     
s3.PutObject            88 KiB  0 B     208     0     

Summary:

Total: 224 CALLS, 90 KiB RX, 3.3 KiB TX - in 17.00s

Actual number of valid objects

~ mc ls -r --versions myminio/process-runner/ | wc -l
205

Actual objects on namespace without versioning lookup

~ mc ls -r myminio/process-runner/  | wc -l
205