A functional wrapper around Spark to make it work with ZIO,
improve error management and increase performances.
You can find the documentation of zio-spark here.
The documentation covers additional subjects like CancellableJobs
, code generation, ...
You can find the scaladoc of zio-spark here.
- Exhaustive support of the apache.spark API (released april 2022, using ScalaMeta and code generation)
- Support for Scala 3 (released early december 2022)
- complete wrapper for SparkContext (small update)
- port of spark-test to zio-spark (creation of zio-spark-test)
- integration of typed-queries
You can ask us (Dylan, Jonathan) for some help if you want to use the lib or have questions around it : https://calendly.com/zio-spark/help
If you want to get the very last version of this library you can still download it using:
libraryDependencies += "io.univalence" %% "zio-spark" % "0.12.0"
You can use Gitter 8 to create an example application, with all the dependencies.
For Scala 2.13
sbt new univalence/zio-spark.g8
For Scala 3
sbt new univalence/zio-spark.g8 --useScala3=true
If you want to get the latest snapshots (the version associated with the last commit on master), you can still download it using:
resolvers += Resolver.sonatypeRepo("snapshots"),
libraryDependencies += "io.univalence" %% "zio-spark" % "<SNAPSHOT-VERSION>"
You can find the latest version on nexus repository manager.
ZIO-Spark is compatible with Scala 2.11, 2.12 and 2.13. Spark is provided, you must add your own Spark version in build.sbt (as you would usually).
libraryDependencies ++= Seq(
"io.univalence" %% "zio-spark" % "0.10.0",
"org.apache.spark" %% "spark-core" % "3.3.1" % Provided,
"org.apache.spark" %% "spark-sql" % "3.3.1" % Provided
)
We advise you to use the latest version of Spark for your scala version.
We worked to make zio-spark available for Scala 3, so it works with zio-direct.
import zio.*
import zio.direct.*
import zio.spark.sql.*
//import for syntax + spark encoders
import zio.spark.sql.implicits.*
import scala3encoders.given
//throwsAnalysisException directly
import zio.spark.sql.TryAnalysis.syntax.throwAnalysisException
object Main extends ZIOAppDefault {
val sparkSession = SparkSession.builder.master("local").asLayer
override def run = {
defer {
val readBuild: RIO[SparkSession,DataFrame] = SparkSession.read.text("./build.sbt")
val text: Dataset[String] = readBuild.run.as[String]
text.filter(_.contains("zio")).show(truncate = false).run
Console.printLine("what a time to be alive!").run
}.provideLayer(sparkSession)
}
}
build.sbt
scalaVersion := "3.2.1"
"dev.zio" %% "zio" % "2.0.5",
"dev.zio" % "zio-direct_3" % "1.0.0-RC1",
"io.univalence" %% "zio-spark" % "0.12.0",
("org.apache.spark" %% "spark-sql" % "3.3.1" % Provided).cross(CrossVersion.for3Use2_13),
("org.apache.hadoop" % "hadoop-client" % "3.3.1" % Provided),
"dev.zio" %% "zio-test" % "2.0.5" % Test
There are many reasons why we decide to build this library, such as:
- allowing user to build Spark pipeline with ZIO easily.
- making better code, pure FP, more composable, more readable Spark code.
- stopping the propagation of
implicit SparkSessions
. - improving some performances.
- taking advantage of ZIO allowing our jobs to retry and to be run in parallel.
"What if Spark was using better functional programming and an effect system?"
zio-spark is built with this main idea in mind, to rewrite the existing API in Spark using better functional programming principle. You will find a corresponding type for the existing API :
org.apache.spark | zio.spark |
---|---|
sql.Dataset | sql.Dataset |
sql.SparkSession | sql.SparkSession |
... | ... |
It comes with different API, for example :
/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(implicit trace: Trace): Task[Long]
compare to
/**
* Returns the number of rows in the Dataset.
* @group action
* @since 1.6.0
*/
def count(): Long
Another example, with errors, which allows you to handle the case where the column do not exist :
/**
* Selects column based on the column name and returns it as a
* [[Column]].
*
* @note
* The column name can also reference to a nested column like `a.b`.
*
* @group untypedrel
* @since 2.0.0
*/
def col(colName: String): TryAnalysis[Column]
compare to
def col(colName: String): Column
zio-spark can be use with existing Spark code, without modifications :
def existingCode(implicit ss:org.apache.spark.sql.SparkSession):org.apache.spark.sql.Dataset[String] = {
import ss.implicits._
ss.read.parquet("toto.parquet").as[String]
}
//...
val out=
zio.spark.sql.fromSpark(existingCode).flatMap(ds => ZIO.attempt(ds.count()))
//or lift using .zioSpark to start using the new API
val out =
zio.spark.sql.fromSpark(existingCode).flatMap(_.zioSpark.count)
One of the core principle is you should be able to integrate zio-spark into an existing codebase, without major modifications. In most case you can even just change the imports, and fix the compilation errors related to effects (dataset reads, job launches, ...).
It's not as battle tested as it should be at the moment, we are migrating progressively existing projects to this new version.
We did a conference talk at the end 2019 ( https://www.youtube.com/watch?v=1ttsi0YwMkI ) on it, but in French.
Strangely there have been fewer conferences in 2020 - 2021 - ... or we have been very busy at work.
With the rewrite in 2022, we will do some conference with the new design in 2023, in French and in English to present the project.
- ZparkIO a framework for Spark, ZIO
- iskra from VirtusLab, and interresting take and typesafety for Spark, without compromises on performance.
- spark-scala3, one of our dependency to support encoders for Spark in Scala3.
Pull requests are welcomed. We are open to organize pair-programming session to tackle improvements. If you want to add
new things in zio-spark
, don't hesitate to open an issue!
You can also talk to us directly using this link if you are interested to contribute https://calendly.com/zio-spark/contribution.