Streams Spark

This project is based on streams-flink, which slightly modifies the streams-storm project in order to adapt it to Flink. Here we follow the same approach to adapt it to Spark Streaming: XML configuration files of the streams framework are parsed and translated into Spark's data flow graph.

The XML definition of a streams process has not been changed. Services and queues are supported. The copies attribute of the <process ...> tag can still be used to control the level of parallelism; each copy is then mapped to an executor core inside the Spark cluster. Each process, e.g. the following

```xml
<process input="data" id="extraction" copies="1">
```

is translated to a flatMap function.

```java
JavaDStream<Data> dataJavaDStream = receiver.flatMap(function);
```
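
As a rough sketch of what such a flatMap function might look like (assuming the Spark 1.x FlatMapFunction interface; the class name ProcessFunction and its constructor are illustrative assumptions, not the actual implementation), the inner processors of a process element could be applied to each data item like this:

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;

import stream.Data;
import stream.Processor;

// Illustrative sketch only: applies the inner processors of one <process ...>
// element to every data item of the stream. Names are assumptions.
public class ProcessFunction implements FlatMapFunction<Data, Data> {

    private final List<Processor> processors;

    public ProcessFunction(List<Processor> processors) {
        this.processors = processors;
    }

    @Override
    public Iterable<Data> call(Data item) throws Exception {
        // Run the item through the processor chain of the process element.
        for (Processor p : processors) {
            item = p.process(item);
            if (item == null) {
                // A processor may drop an item entirely.
                return Collections.emptyList();
            }
        }
        return Collections.singletonList(item);
    }
}
```

Note that the processors have to be serializable so that Spark can ship the function to the executors.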

The level of parallelism is thus set via the copies attribute. In Spark Streaming it is determined by the number of partitions in an RDD. This number can either be set implicitly through the ratio of batch interval to block interval, or the data can be repartitioned manually as follows:

```java
if (element.hasAttribute("copies")) {
    receiver = receiver.repartition(Integer.parseInt(element.getAttribute("copies")));
}
```

This value (the number of copies) should be computed as the number of all available cores minus one core reserved for the driver.
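
For example, with the submit command below (2 executors with 5 cores each) this gives 2 * 5 - 1 = 9 copies. A minimal sketch, assuming the receiver DStream from above and purely illustrative variable names:

```java
// Implicit route: Spark Streaming creates roughly one block (= one partition)
// per spark.streaming.blockInterval, i.e. about batch interval / block interval
// partitions per batch and receiver.

// Explicit route, following the rule above: all cores minus one for the driver.
int numExecutors = 2;         // e.g. --num-executors 2
int coresPerExecutor = 5;     // e.g. --executor-cores 5
int copies = numExecutors * coresPerExecutor - 1;   // = 9

receiver = receiver.repartition(copies);
```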

The easiest way to start a Spark job is to use Spark's own spark-submit script (many configuration options can be set here or directly in code):

```bash
./bin/spark-submit --class spark.deploy_on_spark --master spark://129.217.30.197:6066 \
            --deploy-mode cluster \
            --executor-memory 9G \
            --driver-memory 2G \
            --num-executors 2 --executor-cores 5 \
            --conf "spark.eventLog.dir=file:///home/egorov/spark-eventlog" \
            --conf "spark.eventLog.enabled=true" \
            --conf "spark.streaming.unpersist=true" \
            --conf "spark.ui.showConsoleProgress=false" \
            --conf "spark.streaming.backpressure.enabled=true" \
            --conf "spark.streaming.ui.retainedBatches=300" \
            --conf "spark.ui.retainedStages=300" \
            --conf "spark.locality.wait=1s" \
            --conf "spark.locality.wait.node=0s" \
            --conf "spark.locality.wait.rack=1s" \
            --conf "spark.worker.cleanup.enabled=true" \
            --conf "spark.worker.cleanup.interval=1800" \
            --conf "spark.executor.logs.rolling.maxRetainedFiles=4" \
            --conf "spark.executor.logs.rolling.strategy=time" \
            --conf "spark.executor.logs.rolling.time.interval=hourly" \
            --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=2G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \
            file:///home/egorov/streams-spark-0.9.25-SNAPSHOT-spark-provided.jar /home/egorov/example.xml
```
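
The same options can also be set programmatically instead of on the command line; a minimal sketch using SparkConf (the application name and the batch interval are only example values):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Minimal sketch: a few of the --conf options from above set directly in code.
SparkConf conf = new SparkConf()
        .setAppName("streams-spark-example")                  // example name
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.unpersist", "true")
        .set("spark.locality.wait", "1s")
        .set("spark.locality.wait.node", "0s")
        .set("spark.locality.wait.rack", "1s");

// A batch interval of 2 seconds is an arbitrary example value.
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
```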

The spark.locality.wait{,.node,.rack} settings can be used to allow tasks to be spread across several nodes. Otherwise the default value of 3 seconds is used, and all tasks may end up running on the node where the data is received.

Some of the further optimizations mentioned above have been described here.