InitialDLab/Simba

Why in the partition folder of core(of sql) should add xxxPartition.scala file

pzz2011 opened this issue · 6 comments

the commit of Simba I mentioned in the tile is https://github.com/InitialDLab/Simba/commit/871e5930fbdcb7e5ab35e010ee6a41ed03dadb3c

I am trying to understand your implementation thoughts. Although I have readed some code of sparkcore,
still have plenty of confusion.

I know that sparkcore has implemented several partition and original sparksql seems has not implemented
partition. Why you should add your own partition implementation in the link above after adding Index for sparksql.

Hope to see your reply, thanks very much!

There are several reasons why we build something like that:

  • Building index needs special locality, such as spatial locality, on each partition. This is not provided by Spark by default.
  • Helper functions are needed by all different algorithms for partitioning data in different ways. We build these partitioners so that we can reuse them frequently.
  • Some important intermediate results are protected by the default Spark partitioner. We decouple these partitioners to make our own for acquiring these values.

It's not very clear to me. In Spark mail list I have found some simple custom Partitioner, but Simba code of
XXXXPartitioner.scala are still very strange to me.(I have compared to spark core's implementation of RangePartitioner.scala)

The following is my confusion at this moment:
1.not only in RangePartition Object but almost all partition implementation in Simba XXXpartition Object 's apply method , you use SortBasedShuffleOn to check whether to use MutablePair or not? I cannot make sense of it.
2. I cannot make good sense of the usage of ShuffledRDD, the snippet code:

   72       val shuffled = new ShuffledRDD[Double, InternalRow, InternalRow](rdd, part)
    73      shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    74      (shuffled, part.rangeBounds)
    75    }
    76  }

WhysetSerializer(new SparkSqlSerializer(new SparkConf(false)))?

Please send me your reply in your free time, thanks!

They are legacy codes migrated from Exchange execution plan of Spark SQL 1.3. At that particular version, these codes are used to make sure compatibility for old hash based spark shuffler and a better default serializer (Kyro) is used during the procedure. Since all these codes are still functional and didn't cost any issue, I keep them as I don't really have time to look through new Exchange Plan in higher version of Spark SQL.

Well, after a fast pass over the new Exchange plan, all the procedure I did is still there. They are just buried in more heuristics determining if it can use Tungsten Hash Shuffling strategy or not....

This is the particular Spark Plan I am referring to:
https://github.com/InitialDLab/Simba/blob/612053d6d159eb97ae935118fac09e784d41b8e6/engine/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

If you want to learn how it works, you should read and think through it.

They are legacy codes migrated from

It's true that they are come from the version you referred above.

As you mentioned Exchange.scala, I found the Spark Dev mail list the following:
http://apache-spark-developers-list.1001551.n3.nabble.com/question-about-catalyst-and-TreeNode-td16663.html#a16669

Trees are immutable, and TreeNode takes care of copying unchanged parts of the tree when you are doing transformations. As a result, even if you do construct a DAG with the Dataset API, the first transformation will turn it back into a tree.

The only exception to this rule is when we share the results of plans after an Exchange operator. This is the last step before execution and sometimes turns the query into a DAG to avoid redundant computation.

From the above , I know Exchange operation happens between query to a DAG.(does it means tree to DAG(query to rdd?)?)
And I look into the execution/SparkStrategies.scala, it seems say the call site of Exchange in BasicOperation only. So why other operation e.g. SpatialJoinExtractor(your implementation) doesn't need Exchange to transform a query to DAG?

btw, your are so amazing that have published 3 paper in recent one year.:-).

Reading the following Rule of Spark Planner will answer your question. To understand the whole thing, you should stand on a higher position before diving into individual implementation. It will show the elegance of this architecture and it is the correct way to read it.

private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {

https://github.com/InitialDLab/Simba/blob/master/engine/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L931