
Pig-on-Spark

Hello there!

This repository holds a proof-of-concept implementation of Pig-on-Spark integrated at the logical node level. The current (August 2014) plan for Pig-on-Spark is to integrate Sigmoid's solution (integrated at the physical node level and based on the Spork project from Twitter) into Pig now, with this version to be integrated later if the performance benefit turns out to be sufficiently large.

The ASF JIRA ticket is located here.

The Sigmoid Pig-on-Spark implementation is located here.

Building

This project combines Pig and Spark, so you’ll need to build both to get it to work. Since it’s integrated into Spark, you’ll also need to configure Spark to know where Pig is.

  1. Build Pig
  2. Install Pig locally: $ ant mvn-install

Note: This causes the following error on my system:

    BUILD FAILED
    /Users/Greg/Pig/pig/build.xml:1175: Error installing artifact 'org.apache.pig:pig:jar': Error installing artifact: File /Users/Greg/Pig/pig/build/pig-0.14.0-SNAPSHOT-SPARK-h2.jar does not exist

However, the important files are still copied and the next steps in the build will succeed. You can verify that things will still work by running

    $ ls ~/.m2/repository/org/apache/pig/pig/0.14.0-SNAPSHOT/

(substitute the version of Pig you are using in place of 0.14.0-SNAPSHOT). If you see a collection of jars and a pom file, you should be good to go.

  3. Modify Spark SQL’s pom file, /sql/core/pom.xml, to add Pig as a dependency:

     +    <dependency>
     +      <groupId>org.apache.pig</groupId>
     +      <artifactId>pig</artifactId>
     +      <version>0.14.0-SNAPSHOT-SPARK</version>
     +    </dependency>
    
  4. Build Spark: $ $SPARK_HOME/sbt/sbt assembly

Testing

I have built a test harness to compare this implementation with vanilla Pig (or with another implementation of Pig-on-Spark) for correctness. The harness uses test queries from Pig’s nightly.conf, with data generated by Pig’s generate_data.pl script. To run the harness:

  1. $ cp java_opts.sh.template java_opts.sh
  2. Configure the desired settings in java_opts.sh
  3. $ ./java_opts.sh
  4. Run the tests: $ sbt/sbt "test-only org.apache.spark.sql.PigCompatibilitySuite"

The code for the harness is in sql/core/src/test/scala/org/apache/spark/sql/PigComparisonTest.scala and the tests that it runs can be configured in sql/core/src/test/scala/org/apache/spark/sql/PigCompatibilitySuite.scala. Add the tests that you want to run to the whitelist and those that you don’t want to run to the blacklist. The file is currently configured so that all tests on the whitelist should pass and all those on the blacklist should fail.
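
For a sense of the shape this configuration takes, a hypothetical whitelist/blacklist in PigCompatibilitySuite.scala might look like the following (the field names and test names here are illustrative, not taken from the suite or from nightly.conf):

    // Illustrative only: actual test names come from Pig's nightly.conf.
    val whitelist = Seq(
      "FilterBoolean_1",   // expected to pass
      "SortSimple_3"
    )
    val blacklist = Seq(
      "Union_1"            // expected to fail: Union is unsupported (see below)
    )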

Running

Run as you would any Spark job. The interface is highly analogous to Spark SQL’s: to create a SchemaRDD that will hold the result of a Pig query, use SQLContext.pql(query).

NOTE: Every query you run must begin with a LOAD and end with a STORE, or it will just be a no-op.

For example:

    val sqlContext = new SQLContext(sc)
    val pigRdd = sqlContext.pql(
      "a = LOAD '/path/to/input' USING PigStorage('\t') AS (f1: int, f2: int); " +
      "b = FILTER a BY f1 > 0; " +
      "STORE b INTO '/path/to/output' USING PigStorage('\t');")
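
Since pql returns a SchemaRDD, the result can be consumed like any other RDD once the query has run, for example:

    // Inspect the query result locally (small outputs only).
    pigRdd.collect().foreach(println)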

Code Walkthrough

A note on Catalyst

Catalyst is Spark’s query optimizer. This implementation of Pig-on-Spark translates Pig’s logical plan nodes into Catalyst logical plan nodes, so I’ll give you a brief overview of how Catalyst works.

When a user enters a query into Spark SQL, the following steps are performed:

  1. A query parser parses the query text into a logical plan
  2. An analyzer resolves references in the logical plan, and an optimizer then rewrites it into an optimized logical plan
  3. A series of strategies (heuristics) is applied to the optimized logical plan to convert it into a physical plan (the strategies provide further optimizations in some cases, such as joins)
  4. Each node in the physical plan has an execute() method that runs that node as one or more RDD operations; these methods are called to execute the query

In short: Query text => Logical plan => Optimized logical plan => Physical plan => RDD operations
(query parser) (analyzer + optimizer) (heuristic strategies) (node-specific execute methods)
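
The following stand-in sketch condenses those four stages into code. Every name here is illustrative; Catalyst's real classes are considerably more involved:

    // Stand-in types only; none of these are Catalyst's actual classes.
    object CatalystPipelineSketch {
      case class LogicalPlan(description: String)
      case class PhysicalPlan(description: String) {
        // Catalyst's execute() produces RDDs; here we fake a row list.
        def execute(): Seq[String] = Seq("rows for: " + description)
      }

      def parse(query: String): LogicalPlan = LogicalPlan(query)
      def analyzeAndOptimize(plan: LogicalPlan): LogicalPlan =
        plan.copy(description = plan.description + " [optimized]")
      def toPhysical(plan: LogicalPlan): PhysicalPlan = PhysicalPlan(plan.description)

      // Query text => logical => optimized => physical => rows
      def run(query: String): Seq[String] =
        toPhysical(analyzeAndOptimize(parse(query))).execute()
    }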

How Pig fits in

This implementation follows these steps:

  1. Grunt translates the input Pig query text into a Pig logical plan
  2. A series of TranslationVisitor classes translates the Pig logical plan into a Catalyst logical plan
  3. Catalyst finishes the optimization, translation to physical nodes, and execution
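
Conceptually, step 2 slots the visitors between Pig's parser and Catalyst's machinery. A hypothetical sketch of what SQLContext.pql might do internally (parseWithGrunt and getRoot are assumed helper names, not the repo's actual API):

    // Sketch only: helper names are assumptions.
    def pql(query: String): SchemaRDD = {
      val pigPlan = parseWithGrunt(query)              // 1. Grunt: text -> Pig logical plan
      val visitor = new LogicalPlanTranslationVisitor(pigPlan)
      visitor.visit()                                  // 2. walk the plan, translating each node
      val catalystPlan = visitor.getRoot()             // head of sparkNodes (see below)
      new SchemaRDD(this, catalystPlan)                // 3. hand off to Catalyst
    }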

The most important step here is step 2, so let’s look a little more closely at the TranslationVisitors. There are currently 3 such visitor classes: LogicalPlanTranslationVisitor, ExpressionPlanTranslationVisitor, and NestedPlanTranslationVisitor, all of which mix in the trait PigTranslationVisitor. All of the classes covered in this document can be found in the folder sql/core/src/main/scala/org/apache/spark/sql/.

PigTranslationVisitor

This is the parent trait for the 3 Visitor classes. It holds the data structures that help with the translation process and has some helper methods for translating Pig types and expressions into Catalyst.

The data structures are as follows:

  • sparkNodes: a list of all of the translated nodes that have been produced by this visitor so far. Once the visitor is done walking the graph, the head of this list will be the root of the graph
  • pigToSparkMap: maps a Pig node to its Spark translation
  • pigToSparkChildrenMap: maps a Pig node to the Spark translations of all of its inputs (children in Catalyst terminology, predecessors for Pig LogicalRelationalOperators, successors for Pig LogicalExpressions).
  • pigToOutputMap: maps a Pig node to its set of outputs (which may not be a legal object in Catalyst). See the section on Bags for more details.
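
A sketch of how these structures might be declared (the types follow the bullets above; the exact declarations in the repo may differ). T stands for the kind of Catalyst node being produced: LogicalPlan for the plan visitor, Expression for the expression visitors.

    import scala.collection.mutable
    import org.apache.pig.newplan.Operator

    trait PigTranslationVisitor[T] {
      // Stand-in for the repo's Bag class (see the Bags section below).
      type Bag = Seq[Any]
      // Translated nodes so far; the head ends up as the root of the graph.
      protected val sparkNodes = mutable.ListBuffer.empty[T]
      // Each Pig node's finished Spark translation.
      protected val pigToSparkMap = mutable.Map.empty[Operator, T]
      // The Spark translations of each Pig node's inputs.
      protected val pigToSparkChildrenMap = mutable.Map.empty[Operator, Seq[T]]
      // Possibly Catalyst-illegal intermediate outputs of each Pig node.
      protected val pigToOutputMap = mutable.Map.empty[Operator, Bag]
    }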

LogicalPlanTranslationVisitor

This class is, for the most part, fairly straightforward. It extends Pig’s LogicalRelationalNodesVisitor class and provides visit() methods that translate a Pig logical plan node (org.apache.pig.newplan.LogicalRelationalOperator) into the appropriate type of Spark logical plan node (org.apache.spark.sql.catalyst.plans.logical.LogicalPlan).

The visit() methods for the different types of LogicalRelationalOperators all follow roughly the same pattern:

  1. Fetch the translated versions of the Pig node’s inputs. We use a DependencyOrderWalker to ensure that every node is visited after its inputs.
  2. Construct the equivalent type of Spark logical node by pulling the necessary information out of the input Pig node.
  3. Add the Pig node and the newly-created Spark node to our data structures so that they will be available for the Pig node’s successors.
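
As a concrete illustration, a visit() method for LOFilter might follow this pattern as sketched below. LOFilter and Catalyst's Filter are real classes; getTranslatedInput, translateExpression, and record are assumed helper names:

    import org.apache.pig.newplan.logical.relational.LOFilter
    import org.apache.spark.sql.catalyst.plans.logical.Filter

    override def visit(pigFilter: LOFilter): Unit = {
      val child     = getTranslatedInput(pigFilter)                  // step 1
      val condition = translateExpression(pigFilter.getFilterPlan)   // step 2
      val sparkFilter = Filter(condition, child)
      record(pigFilter, sparkFilter)                                 // step 3
    }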

For the most part, there tends to be a 1-1 correspondence between Pig logical nodes and Catalyst logical nodes. Pig has LOFilter, Spark has Filter; Pig has LODistinct, Spark has Distinct; etc. These nodes are pretty easy to translate.

However, there are two notable cases in which there is no easy mapping: nested ForEach plans and the CoGroup operator (there may be others, such as Split and Stream, but I have not attempted to translate those yet). The Pig operators LOForEach, LOGenerate, and LOInnerLoad all map to project operations in Catalyst. The LOCoGroup operator maps to an Aggregate node in Catalyst (in the case where we are grouping one table), or a Join followed by an Aggregate (in the case where we are grouping multiple tables).

Furthermore, LOGenerate and LOInnerLoad only appear in nested plans within an LOForEach. In Catalyst these are properly translated into Spark expressions, not logical plan nodes, so they are not handled by the LogicalPlanTranslationVisitor. For simplicity, I have decided not to implement LOCoGroup operators that group multiple tables.
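
To make the CoGroup case concrete, here is how a single-table group followed by an aggregating FOREACH might collapse into one Catalyst Aggregate node. This is a conceptual sketch: it assumes `a` has already been translated and that f1 and f2 are its resolved attributes.

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}

    // Pig:  b = GROUP a BY f1;
    //       c = FOREACH b GENERATE group, SUM(a.f2);
    // Both statements become a single Aggregate over the translated `a`.
    def groupThenSum(a: LogicalPlan, f1: Attribute, f2: Attribute): LogicalPlan =
      Aggregate(Seq(f1), Seq(f1, Alias(Sum(f2), "sum_f2")()), a)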

ExpressionPlanTranslationVisitor

This class is even more straightforward. It extends Pig’s LogicalExpressionVisitor class and provides visit() methods that translate a Pig expression (org.apache.pig.newplan.logical.expression.LogicalExpression) into the corresponding Spark expression (org.apache.spark.sql.catalyst.expressions.Expression). There aren’t any messy cases as there were in the LogicalPlanTranslationVisitor, and almost all of the expressions are self-explanatory.

The only expression that is mildly complicated is the ProjectExpression, which is less straightforward because of nested plans and because Catalyst does not yet support bags. We need a way to get the output schema of the inputs to the ProjectExpression’s attachedRelationalOp, which means that we need a pointer to the translated versions of those inputs. If the attachedRelationalOp is in a nested plan, then we need to go one level further. For details on how bags make the ProjectExpression more complicated, see the section on Bags.

NestedPlanTranslationVisitor

This class is similar in structure to the other two visitors, but is a little more complicated because it covers operations that have fundamentally different representations in Pig vs. Catalyst. The purpose of this class is to translate nested plans (inner plans of a ForEach statement), which Pig treats as logical nodes and Catalyst treats (mostly) as expressions.

We currently only support the LOGenerate and LOInnerLoad operators (the bare minimum necessary to demonstrate ForEach). Neither of these operators exists in Catalyst (LOInnerLoad translates to a project expression and LOGenerate to a list of project expressions), so translating the inner plan requires some complicated bookkeeping to keep track of where inputs are coming from.
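
For instance, translating a simple nested plan might come out as sketched below, assuming `a` has already been translated and f1/f2 are its resolved attributes:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Pig:  b = FOREACH a GENERATE f1, f2 + 1;
    // The inner plan's LOInnerLoads become attribute references, and
    // LOGenerate becomes the projection list of a single Project node.
    def translateForEach(a: LogicalPlan, f1: Attribute, f2: Attribute): LogicalPlan =
      Project(Seq(f1, Alias(Add(f2, Literal(1)), "f2_plus_1")()), a)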

Bags

One fundamental difference between Pig and Catalyst is that Pig’s object model allows bags to be arbitrarily nested, while Catalyst only supports atomic values. There are plans to expand Catalyst’s object model in the future, but this will require figuring out how to spill nested bags to disk when they exceed the size of the available memory. Currently, Catalyst can spill rows to disk when a table is too large to fit in memory, but it cannot spill parts of an individual row to disk.

One approach to handle this discrepancy (at least for a proof of concept demonstration) would be to just not support operations that create nested bags. However, all LOCoGroup operations produce nested bags as part of their operation (even if they are not returned as part of the final result). Since we really wanted to run aggregate queries, we would have to deal with bags somehow.

The workaround that I chose was to have a separate data structure to keep track of the (possibly not Catalyst-legal) intermediate outputs of each Pig node (whether a logical node or an expression). The pigToOutputMap in each PigTranslationVisitor serves this function, mapping Pig nodes to a Bag of their outputs. A Bag in this sense is a sequence in which each element is either a NamedExpression (an attribute reference or an aliased expression) or a Bag. The Bag class also includes somewhat hackish methods for preventing the nesting from getting too deep (i.e. we would rather have Bag(1) than Bag(Bag(Bag(1)))).
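
A sketch of the Bag structure as just described; this mirrors the description above, not necessarily the repo's exact API:

    import org.apache.spark.sql.catalyst.expressions.NamedExpression

    // Each element is either a NamedExpression or a nested Bag.
    sealed trait BagElem
    case class Atom(expr: NamedExpression) extends BagElem
    case class Bag(elems: Seq[BagElem]) extends BagElem {
      // Collapse pointless nesting: Bag(Bag(Bag(x))) => Bag(x).
      def simplify: BagElem = elems.map {
        case b: Bag => b.simplify
        case atom   => atom
      } match {
        case Seq(single: Bag) => single
        case simplified       => Bag(simplified)
      }
    }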

Unsupported features

  • Cube
  • Native
  • Rank
  • Split/SplitOutput
  • Stream
  • Union
  • Scalar (expression)
  • UDFs (extremely limited support for a few builtin functions, see section on broken/brittle)
  • Describe/Dump/Explain/Illustrate

Things that are known to be broken/brittle

  • CoGroup: Doesn't support grouping on multiple tables or without an aggregator. Assumes that there is exactly 1 aggregating ForEach for each CoGroup and that this ForEach comes immediately after the CoGroup.
  • ForEach: When used with CoGroup, we manually cut the ForEach node out of the graph (since the whole CoGroup+ForEach construction just becomes a single Aggregate node in Catalyst).
  • Load: Does not support custom load functions. Basically just loads from character-delimited files. Does not support loading without a schema. Does not support loading from a file that was written to within the same query.
  • Sort: Does not support UDF sort functions.
  • UserFunc: Currently only supports SUM, MIN, MAX, AVG (see the sketch below)
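
Given that last limitation, the builtin-function translation is presumably a small mapping into Catalyst's aggregate expressions. A hedged sketch of what it might look like (the function name is an assumption; Sum, Min, Max, and Average are real Catalyst expressions):

    import org.apache.spark.sql.catalyst.expressions._

    // Assumed mapping from Pig builtin names to Catalyst aggregates;
    // anything else is unsupported (see UDFs above).
    def translateUserFunc(name: String, arg: Expression): Expression = name match {
      case "SUM" => Sum(arg)
      case "MIN" => Min(arg)
      case "MAX" => Max(arg)
      case "AVG" => Average(arg)
      case other => sys.error(s"Unsupported UserFunc: $other")
    }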