dfdx/Spark.jl

Can we abandon RDD API?

Closed this issue · 5 comments

dfdx commented

Originally, Spark.jl was developed for Spark 1.x and its RDD interface, where the most useful functions were map, filter, etc. In Spark 2.0, the default interface changed to Dataset (DataFrame in PySpark). And now, with Spark 3.0, I meet more and more people who don't even know about the RDD interface.

Another part of this equation is that our implementation of the RDD API is quite unstable due to complicated interprocess communication. Compare the steps for running a map function over an RDD (a sketch of the corresponding user-facing code follows the list):

  1. Start the Julia program and instantiate the corresponding driver in the JVM.
  2. Serialize the Julia function and data and send them to the Spark workers (JVM).
  3. In each worker, start a new Julia process, connect to it and pass the serialized function and data.
  4. Serialize the result and send it back to the JVM.
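For context, this is roughly what that workflow looks like from the user's side. The names below follow the old RDD-era Spark.jl API (SparkContext, parallelize, map, collect); treat it as a sketch rather than exact current code:

```julia
using Spark
Spark.init()

# Step 1: the Julia driver starts and instantiates its JVM-side counterpart.
sc = SparkContext(master="local")

# Steps 2-3: the closure below has to be serialized, shipped to each JVM
# worker, and executed in a separate Julia process started next to it.
rdd = parallelize(sc, 1:1000)
squares = map(rdd, x -> x * x)

# Step 4: results are serialized in the worker-side Julia processes and
# travel back through the JVM to the driver.
collect(squares)

close(sc)
```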

We have to implement and support each of these communication steps, as well as handle all possible errors and use cases. Function serialization is a disaster. Different versions of Julia on the driver and workers are not covered at all. In comparison, the DataFrame API consists of only these steps (a minimal JavaCall sketch follows the list):

  1. Start the Julia program and instantiate the corresponding driver in the JVM.
  2. Call JVM functions via JavaCall.
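To make step 2 concrete, here is a minimal JavaCall sketch. It uses a plain java.lang.Math call purely as an illustration of the mechanism, not the actual code inside Spark.jl:

```julia
using JavaCall

# Step 1: start a JVM inside the Julia driver process.
JavaCall.init(["-Xmx1g"])

# Step 2: everything else is ordinary method calls into that JVM.
JMath = @jimport java.lang.Math
jcall(JMath, "sin", jdouble, (jdouble,), pi / 2)   # returns 1.0

# Spark.jl's DataFrame functions are thin wrappers of exactly this kind:
# a call like `count(df)` forwards to the underlying JVM Dataset object
# via jcall, so no Julia code ever needs to run on the workers.
```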

The only place where we (may) need to serialize Julia functions is UDFs, which are much more controllable than arbitrary functions.

So my question to Spark / Spark.jl users is: do you actually need RDD API?

aviks commented

So to me, the biggest reason to use Spark.jl is the ability to run Julia UDFs in a distributed fashion. That is, we use Spark to partition the data among nodes, and run a Julia function on the local data in each node. If that is easy and fast, we probably do not care whether the RDD or Dataset features are used.

dfdx commented

A quick update on UDFs. Although a UDF in Spark can be implemented as a single class that evaluates its inputs, UDFs that call external processes need more sophisticated handling. In particular, PythonUDF implements the Expression interface and can be included in the logical query plan, but when Spark translates the logical plan into a physical one (either during analysis or optimization - I'm not sure yet), PythonUDF is replaced with something like ArrowEvalPythonExec, which applies the serialized Python functions to input of type Seq[InternalRow].

Translation from a logical to a physical plan is implemented using strategies (instances of the Strategy == SparkStrategy class). For Python, it's the PythonEvals object. As far as I understand, all activated strategies are listed in the SparkPlanner.strategies sequence. At this point I'm not sure whether it's possible to alter the list of strategies from an external library, so we may need to add a strategy for Julia directly to the Spark source code.

Contributing directly to Apache Spark implies committing to maintaining high-quality code now and in the future, as well as closely following all the changes in new releases. Even without that, messing with Spark's query plans sounds like a huge time investment. So now I want to step back and reconsider the problems we solve with Spark.jl, and whether there's an easier way to achieve them in the long run.

Thoughts and opinions are welcome.

Drvi commented

At work we primarily use PySpark, which benefits greatly from the DataFrame API; because of that, I've used plain RDDs only a couple of times. If Spark.jl supported the DataFrame API, it would be easier for me to promote it to my colleagues :)

dfdx commented

@Drvi Note that the current version of Spark.jl already supports the most popular DataFrame functions, including select, group_by, join, etc. Other functions are usually easy to add by request. The only big issue is the UDFs, for the reasons I described above.
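To give a flavour of that, here is a rough sketch using the function names mentioned above. The session constructor and JSON reader are hypothetical placeholders; check the Spark.jl README for the exact current API:

```julia
using Spark
Spark.init()

# NOTE: the session constructor and JSON reader below are hypothetical
# placeholders; see the Spark.jl README for the exact current API.
spark = SparkSession(master="local")
people = read_json(spark, "people.json")
orders = read_json(spark, "orders.json")

# DataFrame operations like those mentioned above; each call is
# forwarded to the corresponding JVM Dataset method.
adults   = select(people, "name", "age")
by_age   = group_by(people, "age")
enriched = join(people, orders, "person_id")
```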

dfdx commented

The RDD API has been discontinued.