dfdx/Spark.jl

Adding support for key value Pair RDDs

Closed this issue · 15 comments

I'm working on a project that needs support for the Spark functions cartesian and groupByKey, and I've been looking into how to extend the interface to support key-value pair RDDs via the JavaPairRDD class.

At the moment it looks like this would be possible by duplicating the Scala RDD and iterator code to add support for the pair variant. One side effect is that there would need to be pair variants of the Julia map and map_partitions functions, similar to how JavaRDD handles this. I'll also have to change the way objects are passed between Julia and Scala to add support for tuples.
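
To make this concrete, a pair variant of map might look roughly like this (purely a sketch: map_pair and PipelinedPairRDD are names I'm assuming here, and the real constructor arguments may differ):

    # Hypothetical sketch: map_pair mirrors how map builds a PipelinedRDD,
    # but consumes and yields (key, value) tuples.
    function map_pair(rdd::JavaPairRDD, f::Function)
        # f takes one (key, value) tuple and returns another
        PipelinedPairRDD(rdd, (idx, it) -> (f(kv) for kv in it))
    end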

Can you foresee any issues or have any problems with extending the interface in this way?

dfdx commented

I see two possible solutions here:

  1. Creating a method for constructing a JavaPairRDD from JuliaRDD, plus converters between Java's Pair and Julia's Tuple{Any,Any} (see the sketch after this list).
  2. Following the PySpark implementation.
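
For illustration, the converters in option 1 could look something like this (a minimal sketch using JavaCall.jl, which Spark.jl already depends on; on the JVM side a JavaPairRDD's element type is scala.Tuple2, and to_pair/from_pair are hypothetical names):

    using JavaCall

    const JTuple2 = @jimport scala.Tuple2

    # Julia pair -> scala.Tuple2; k and v are assumed to already be JObject
    # references (boxing of primitives is elided in this sketch).
    to_pair(k::JObject, v::JObject) = JTuple2((JObject, JObject), k, v)

    # scala.Tuple2 -> Julia tuple of JObject references; Tuple2 exposes
    # its elements via the _1()/_2() accessors.
    from_pair(p::JTuple2) = (jcall(p, "_1", JObject, ()),
                             jcall(p, "_2", JObject, ()))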

The first case is roughly equivalent to what you propose. The most important issue here is that currently JuliaRDD is always the last RDD in a chain, i.e. once you dive into Julia, you never append any more Java RDDs. It's not a show stopper (in the end, JuliaRDD is still a valid Spark RDD), but I've never tested anything like this. In particular, I expect we will need a more powerful deserialization mechanism on the Java side: currently Spark thinks that JuliaRDD is just an RDD[Array[Byte]], and we want to make it an RDD[(K,V)]. Though we might be lucky and Spark won't notice the difference because of type erasure.

The second option is more or less proven to work, but it is based on shuffling, which is another big chunk of work.

I suggest you pick the option that plays better with your future needs (e.g. whether you're more likely to need to call JuliaRDD from other RDDs or to shuffle data across the cluster), start a new branch, and ping me early so that we can figure out the details and possible issues faster.

Thanks a lot for that detailed response.

The PySpark approach might be a long-term goal, but for now I'm happy with something relatively quick and dirty, and I can live without shuffling.

I'll be working in the pair_rdd branch on my fork, and I'll let you know when there is anything either working or, more likely, failing in an interesting way.

Thanks for the heads-up that using JuliaRDD as a source RDD isn't really tested; I think I'll hit this quite soon.

dfdx commented

Note that PySpark isn't perfect either. In fact, I believe we can do better with Julia, so experiments with alternative approaches are highly welcome.

Another thing you should know if you plan to spend more time on Spark.jl is that once Julia 0.6 is out, I'm going to update the code and also upgrade Spark to version 2.x from the current 1.6. Not that I expect many issues, but it's worth keeping in mind.

Thanks. I did try to get things working with Spark 2.1 (see spark_v2 in my fork). I managed it, but I had to remove a lot of the logging, as that API has gone private.

OK, I've just committed a first wave of changes.

So far, cartesian works for creating a pair RDD from two non-pair RDDs.

To get this to work I've had to mess about a lot with the typing and serialization, which I think I've now simplified, at the expense (for now) of having the types completely opaque to the Java side. That could probably be changed fairly simply for strings, doubles and ints if you think that's important.
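
For reference, usage looks roughly like this (a hypothetical sketch; apart from cartesian itself, the constructor and helper names may differ):

    using Spark

    sc = SparkContext(master="local")
    rdd1 = parallelize(sc, [1, 2, 3])
    rdd2 = parallelize(sc, ["a", "b"])

    pairs = cartesian(rdd1, rdd2)   # pair RDD of (Int, String) tuples
    collect(pairs)                  # [(1, "a"), (1, "b"), (2, "a"), ...]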

dfdx commented

Wow, that's a whole lot of changes! Looks very promising.

Iterator[Array[Byte]] was itself opaque to the Java side, so changing it now doesn't make the code worse. On the other hand, we may want to improve things by having Iterator[T] to enable fully typed communication between Julia and the JVM, but that's absolutely not a requirement right now.

One thing that caught my eye was defining identical methods for several RDD subtypes. E.g.:

Base.parent(rdd::RDD) = rdd
Base.parent(rdd::PipelinedRDD) = rdd.parentrdd
Base.parent(rdd::PipelinedPairRDD) = rdd.parentrdd

Could equally be written:

Base.parent(rdd::RDD) = rdd.parentrdd

Because you can't have an instance of abstract RDD anyway, and all subtypes use the same code.

Thanks for taking a look.

On the Base.parent overrides: I need different behaviour for JavaRDD and JavaPairRDD, where the parent is the RDD itself, hence the current rdd result. I was using the RDD method as the default and then overriding it for the pipelined variants. The behaviour I want is:

Base.parent(rdd::JavaRDD) = rdd
Base.parent(rdd::JavaPairRDD) = rdd
Base.parent(rdd::PipelinedRDD) = rdd.parentrdd
Base.parent(rdd::PipelinedPairRDD) = rdd.parentrdd

which I don't think can be simplified down to a single method; three seemed the minimum.

By the way, I've just added group_by_key.
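
Usage is along the same lines as the cartesian sketch above (again with hypothetical helper names):

    # Group the values of a pair RDD by key: one (key, [values...])
    # entry per distinct key.
    pairs = cartesian(parallelize(sc, [1, 2]), parallelize(sc, ["a", "b"]))
    grouped = group_by_key(pairs)
    collect(grouped)   # e.g. [(1, ["a", "b"]), (2, ["a", "b"])]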

dfdx commented

It looks error-prone that JavaRDD references itself as its parent, e.g. it may lead to loops during RDD chain traversal. Why do you need it this way?

Thanks for the comment. I agree it's confusing, so I've removed the use of Base.parent in my recent commits.

I'm fairly happy with what's there now. Is there anything else you'd like me to look at before I open a pull request?

One thing to note: at the moment, group by key only works if the key is a string or an integer, as these are converted to native Java types. For the Julia types serialized as byte arrays, the default equals only checks object reference equality, not byte-by-byte equality. I think fixing this will require a custom type with a deep equals override, but I can live without it for now.
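
In the meantime the restriction can be worked around by forcing keys to strings before grouping (a sketch; map_pair stands in for whatever the pair map function ends up being called):

    # Convert keys to String so the JVM side compares them by value
    # rather than by reference; map_pair is a hypothetical name.
    str_keyed = map_pair(pairs, kv -> (string(kv[1]), kv[2]))
    grouped = group_by_key(str_keyed)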

dfdx commented

For the Julia types serialized as byte arrays, the default equals only checks object reference equality, not byte-by-byte equality.

On the Julia or Java side? If Java considers two byte arrays with the same content equal during groupByKey, I believe it should work fine.

You can create a PR at any moment and push to it any time later if changes are needed. In fact, it's even more convenient because we can add inline comments (if they work this time, of course).

One thing I've noticed is that there's a lot of code duplication. I haven't checked them line by line, but JuliaPairRDD seems to share a lot of identical code with JuliaRDD. I even think it might be possible to merge them, treating Pair as a special case of Any and casting the type where needed. Or maybe not, but I'd like to see why, because maintaining two classes instead of one is almost twice as hard.

Also, many functions on pairs are just copies of their counterparts for ordinary values. One example is flat_map_par(), which is identical to flat_map().

I also believe you've mixed up data transfer and serialization. I split them as follows:

  • writeobj and readobj are for data transfer; they only work with byte arrays + control codes;
  • to_bytes and from_bytes are used for serialization and deserialization, and this is where you should plug in additional data types.

This way we can keep data transfer between Java and Julia simple and stupid, and at the same time provide an extensible mechanism for plugging in new data types (e.g. floats, collections or even custom classes).
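
For example, plugging in Float64 could look like this (a sketch only; the actual method signatures in Spark.jl may differ):

    # Hypothetical extension of the serialization layer for Float64.
    to_bytes(x::Float64) = collect(reinterpret(UInt8, [x]))
    from_bytes(::Type{Float64}, bytes::Vector{UInt8}) =
        reinterpret(Float64, bytes)[1]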

On the Julia or Java side? If Java considers two byte arrays with the same content equal during groupByKey, I believe it should work fine.
Java side: Java uses object reference equality, so it doesn't work as expected with custom objects.

You can create a PR at any moment and push to it any time later if changes are needed. In fact, it's even more convenient because we can add inline comments (if they work this time, of course).

Thanks, I'll do that once I've cleaned up a bit based on your comments. Hopefully the inline comments will work this time.

One thing I've noticed is that there's a lot of code duplication. I haven't checked them line by line, but JuliaPairRDD seems to share a lot of identical code with JuliaRDD. I even think it might be possible to merge them, treating Pair as a special case of Any and casting the type where needed. Or maybe not, but I'd like to see why, because maintaining two classes instead of one is almost twice as hard.

JuliaPairRDD inherits from a different parent class and needs a different iterator. I suppose we could try to parametrize it, but JavaCall and generic classes seem hard to combine. I'll see what I can do, though, as I agree that code duplication is evil.

Also, many functions on pairs are just copies of their counterparts for ordinary values. One example is flat_map_par(), which is identical to flat_map().

That's a bug, I think: the returned class should be different. But I agree the common code should probably be factored out.

I also believe you've mixed up data transfer and serialization. I split them as follows:
• writeobj and readobj are for data transfer; they only work with byte arrays + control codes;
• to_bytes and from_bytes are used for serialization and deserialization, and this is where you should plug in additional data types.
This way we can keep data transfer between Java and Julia simple and stupid, and at the same time provide an extensible mechanism for plugging in new data types (e.g. floats, collections or even custom classes).

Thanks for clarifying your intent; I'll try to conform to that more closely. I'll also keep things a bit cleaner and avoid being clever when moving strings.

dfdx commented

JuliaPairRDD inherits from a different parent class and needs a different iterator. I suppose we could try to parametrize it, but JavaCall and generic classes seem hard to combine. I'll see what I can do, though, as I agree that code duplication is evil.

One way is to have AbstractJuliaRDD[T] and put all the common logic there, e.g.:

    class JuliaRDD extends AbstractJuliaRDD[Any] { ... }
    class JuliaPairRDD extends AbstractJuliaRDD[(Any, Any)] { ... }

A more sophisticated, but presumably more elegant, way is to put all the logic into JuliaRDD and cast the iterator when needed, e.g. something like:

    it: Iterator[Any]
    pairIt = it.asInstanceOf[Iterator[(Any, Any)]]

This one is risky, though, so we may try refactoring towards it later, once the code is more stable.

That's a bug, I think: the returned class should be different. But I agree the common code should probably be factored out.

Julia specializes on the actual parameter types. E.g. if you have a function foo(x) = x + 1 and call it as:

foo(42)
foo(1.0)

then Julia will compile two completely separate specializations, one for Int and another for Float64. Likewise, any function func(rdd::RDD) will eventually be compiled separately as func(rdd::SingleRDD) or func(rdd::PairRDD).

Thanks for clarifying your intent; I'll try to conform to that more closely. I'll also keep things a bit cleaner and avoid being clever when moving strings.

By the way, is there a reason you prefer marking the starts and ends of arrays and strings instead of first writing their length and then reading that many bytes from the stream?

By the way, is there a reason you prefer marking the starts and ends of arrays and strings instead of first writing their length and then reading that many bytes from the stream?

At write time I wanted to avoid having to collect all the elements before writing, to save an extra layer of copying. This way seems more stream-like.
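
To illustrate the trade-off (a sketch with made-up control codes; in practice the markers would need escaping or per-element length prefixes, and to_bytes is the serialization function discussed above):

    const ARR_START = 0xA1   # made-up control codes for this sketch
    const ARR_END   = 0xA2

    # Marker-based framing: elements can be streamed as they are
    # produced; nothing has to be collected or counted up front.
    function write_marked(io::IO, it)
        write(io, ARR_START)
        for x in it
            write(io, to_bytes(x))
        end
        write(io, ARR_END)
    end

    # Length-prefixed framing: the element count must be known before
    # anything is written, which can force collecting a lazy iterator.
    function write_prefixed(io::IO, xs::Vector)
        write(io, hton(Int32(length(xs))))
        for x in xs
            write(io, to_bytes(x))
        end
    end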

One way is to have AbstractJuliaRDD[T] and put all the common logic

I'll try this approach. While working with the code I did find some odd behaviour with JavaCall, so I may factor out the static code a bit more.

Closing this, as I've now created the pull request.