twitter/scalding

Scalding on Beam discussion about Joins

tlazaro opened this issue · 1 comment

Multi-way joins in Scalding on Beam perform poorly on Dataflow, or get stuck, when the join has many inputs and the collections are very large with many keys that each carry many values. For example, a very large production Scalding job with a 7-way join, with billions of elements in a few inputs, millions in the others, and many hot keys, performs well on MapReduce but cannot complete the join when running on Dataflow using the Beam backend.

It's possible that the use of Iterators in the joins causes unnecessary materialization when running on Beam, or that the order of iteration is unfavorable to Dataflow, as the Scio project discovered.

The Scio project offers a user-facing API very similar to Scalding's and has been used to convert Scalding jobs to run on Beam with few modifications. The convention in Scalding and Scio is that the larger collection, or the collection with more values per key, goes on the left: big.join(small). For Beam and Dataflow the convention seems to be the reverse, as explored in spotify/scio#289. Following the same approach, we could reverse the order in which we do the joins.
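To make the reversal concrete, here is a minimal sketch using plain in-memory collections rather than Scalding or Beam types. The names `innerJoin` and `reversedJoin` are hypothetical and only illustrate that swapping the sides and flipping the value tuple back preserves what the user sees:

```scala
object JoinOrderSketch {
  // Plain in-memory inner join, used only to illustrate the idea.
  // This is NOT the Scalding or Beam implementation.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val rightByKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, a) <- left
      b <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  // Same result for the caller, but the join runs with the sides swapped
  // and the value tuple is flipped back afterwards.
  def reversedJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    innerJoin(right, left).map { case (k, (b, a)) => (k, (a, b)) }
}
```

The two functions return the same elements, possibly in a different order, so a backend could pick whichever side ordering the runner prefers without changing the user-facing big.join(small) convention.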

We could write a custom implementation of MultiJoinFunction specific to the Beam backend. The one in Scalding (here and here) currently depends on an Iterable (which can be turned into an Iterator) on one side and an Iterator (which can only be iterated once) on the other. When implementing the joins in Beam we have Iterables for the values coming from all inputs, so we could use Iterables for everything in MultiJoinFunction and Joiner, making the functions symmetrical and the reversal simpler.
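As a rough sketch of what "Iterables for everything" could look like: the type alias `IterableJoin` and the helpers `inner` and `flip` below are hypothetical names, not existing Scalding or Beam APIs. Once both sides are Iterable, swapping them is a one-liner:

```scala
object SymmetricJoinSketch {
  // Hypothetical join-function shape where both sides can be re-iterated.
  type IterableJoin[K, A, B, C] = (K, Iterable[A], Iterable[B]) => Iterable[C]

  // Inner join for a single key: the cross product of the two value groups.
  def inner[K, A, B]: IterableJoin[K, A, B, (A, B)] =
    (_, as, bs) => for { a <- as; b <- bs } yield (a, b)

  // Because both inputs are Iterable, the argument order can be swapped
  // without changing what the wrapped function computes.
  def flip[K, A, B, C](f: IterableJoin[K, A, B, C]): IterableJoin[K, B, A, C] =
    (k, bs, as) => f(k, as, bs)
}
```

With an Iterator on one side this flip would not be safe in general, because the function may need to traverse the other side once per element of the Iterator.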

I'm creating this issue so we can ground follow-up code changes on this discussion.

To fully map MultiJoinFunction and Joiner to a Beam backend version using only Iterable, we have a problem:

While JoinFunction is a sealed trait, we still accept arbitrary functions from users when creating a MultiJoinFunction:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala#L121

The same applies to JoinFunction: FlatMappedJoin, MappedGroupJoin and JoinFromHashJoin take arbitrary functions that depend on Iterator.
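One possible way around this, sketched here with hypothetical names (`IteratorJoin`, `IterableJoin`, `adapt`) rather than the actual Scalding types, is to wrap a user-supplied Iterator-based function so it presents an Iterable-based interface. A fresh Iterator is handed to the user function on every traversal:

```scala
object IteratorAdapterSketch {
  // Assumed shape of a user-supplied function: Iterator on the left, Iterable on the right.
  type IteratorJoin[K, A, B, C] = (K, Iterator[A], Iterable[B]) => Iterator[C]
  // Hypothetical Iterable-only shape for the Beam backend.
  type IterableJoin[K, A, B, C] = (K, Iterable[A], Iterable[B]) => Iterable[C]

  // Wrap the user function; it is re-run with a new left Iterator
  // each time the resulting Iterable is traversed.
  def adapt[K, A, B, C](f: IteratorJoin[K, A, B, C]): IterableJoin[K, A, B, C] =
    (k, as, bs) =>
      new Iterable[C] {
        def iterator: Iterator[C] = f(k, as.iterator, bs)
      }
}
```

This only adapts the interface. The wrapped function is still opaque, so it cannot be made symmetric or reversed automatically, and it still consumes its left side as a single-pass Iterator.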