spark-on-neo4j

Run an Apache Spark Worker as part of a Neo4j instance (in the same JVM process)



This is an experiment to run an Apache Spark worker as part of a Neo4j instance at startup.

It works with Apache Spark versions 2.0 and later.

License

The spark-on-neo4j library is licensed under the Apache License 2.0.

Building

Build the plugin jar target/spark-on-neo4j-full-2.0.0-SNAPSHOT.jar:

mvn clean package -DskipTests

Copy target/spark-on-neo4j-full-2.0.0-SNAPSHOT.jar to $NEO4J_HOME/plugins.

Config

First, start your Spark master with $SPARK_HOME/sbin/start-master.sh.

Then note the master URL from its log output:

16/11/06 11:35:10 INFO Master: Starting Spark master at spark://host:port

Add this spark-master URL to your $NEO4J_HOME/conf/neo4j.conf:

spark.master=spark://host:port
## eg.
#spark.master=spark://localhost:7077

This way the worker is started as part of the Neo4j lifecycle and can connect to the Spark master.
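As a minimal sketch (not the actual plugin code), the worker could be launched inside the Neo4j JVM roughly like this, assuming the extension reads the spark.master value from neo4j.conf. The EmbeddedSparkWorker name and the lifecycle hook are hypothetical; only org.apache.spark.deploy.worker.Worker is the real Spark entry point:

import org.apache.spark.deploy.worker.Worker

// Hypothetical launcher: Worker.main blocks until the worker's RpcEnv shuts down,
// so it runs on its own daemon thread inside the Neo4j process.
object EmbeddedSparkWorker {
  def start(masterUrl: String): Unit = {
    val thread = new Thread(new Runnable {
      override def run(): Unit = Worker.main(Array(masterUrl))
    }, "spark-worker")
    thread.setDaemon(true)
    thread.start()
  }
}

// e.g. called from a Neo4j lifecycle/extension hook with the configured value:
// EmbeddedSparkWorker.start("spark://localhost:7077")

Start Neo4j: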

bin/neo4j start
or
bin/neo4j console

You should see output like this:

2016-11-06 21:18:28.457+0000 INFO  Starting...
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2016-11-06 21:18:29.592+0000 INFO  Bolt enabled on localhost:7687.
2016-11-06 21:18:29.609+0000 INFO  Initiating metrics...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/11/06 22:18:30 INFO Worker: Started daemon with process name: 42589@Ich.local
...
16/11/06 22:18:31 INFO Utils: Successfully started service 'sparkWorker' on port 62461.
16/11/06 22:18:32 INFO Worker: Starting Spark worker 192.168.111.125:62461 with 4 cores, 15.0 GB RAM
16/11/06 22:18:32 INFO Worker: Running Spark version 2.0.0
16/11/06 22:18:32 INFO Worker: Spark home: /Users/mh/v/neo4j-enterprise-3.0.7/data/databases/graph.db/spark
16/11/06 22:18:32 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
16/11/06 22:18:32 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.111.125:8081
16/11/06 22:18:32 INFO Worker: Connecting to master Ich.local:7077...
16/11/06 22:18:32 INFO TransportClientFactory: Successfully created connection to Ich.local/192.168.111.125:7077 after 35 ms (0 ms spent in bootstraps)
16/11/06 22:18:32 INFO Worker: Successfully registered with master spark://Ich.local:7077
16/11/06 22:18:32 INFO Worker: Asked to launch executor app-20161106135845-0002/2478 for Spark shell
16/11/06 22:18:32 INFO ExecutorRunner: Launch command: --driver-url spark://CoarseGrainedScheduler@192.168.111.125:65390 --executor-id 2478 --hostname 192.168.111.125 --cores 4 --app-id app-20161106135845-0002 --worker-url spark://Worker@192.168.111.125:62461
16/11/06 22:18:32 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 42589@Ich.local
...
16/11/06 22:18:32 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:65390 after 5 ms (0 ms spent in bootstraps)
16/11/06 22:18:33 INFO DiskBlockManager: Created local directory at /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/blockmgr-fbd2ea8e-b01e-4450-9c54-a9f339753e4b
16/11/06 22:18:33 INFO MemoryStore: MemoryStore started with capacity 2.4 GB
16/11/06 22:18:33 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.111.125:65390
16/11/06 22:18:33 INFO WorkerWatcher: Connecting to worker spark://Worker@192.168.111.125:62461
16/11/06 22:18:33 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:62461 after 3 ms (0 ms spent in bootstraps)
16/11/06 22:18:33 INFO WorkerWatcher: Successfully connected to spark://Worker@192.168.111.125:62461
16/11/06 22:18:33 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
16/11/06 22:18:33 INFO Executor: Starting executor ID 2478 on host 192.168.111.125
16/11/06 22:18:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62467.
16/11/06 22:18:33 INFO NettyBlockTransferService: Server created on 192.168.111.125:62467
16/11/06 22:18:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2478, 192.168.111.125, 62467)
16/11/06 22:18:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2478, 192.168.111.125, 62467)
16/11/06 22:18:33 INFO Executor: Using REPL class URI: spark://192.168.111.125:65390/classes
16/11/06 22:18:45 INFO CoarseGrainedExecutorBackend: Got assigned task 4
16/11/06 22:18:45 INFO Executor: Running task 0.0 in stage 1.0 (TID 4)
16/11/06 22:18:45 INFO Executor: Fetching spark://192.168.111.125:65390/jars/spark-on-neo4j-2.0.0-SNAPSHOT.jar with timestamp 1478437124959
16/11/06 22:18:45 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:65390 after 7 ms (0 ms spent in bootstraps)
16/11/06 22:18:45 INFO Utils: Fetching spark://192.168.111.125:65390/jars/spark-on-neo4j-2.0.0-SNAPSHOT.jar to /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/spark-028e6f9a-9da5-4fe2-855d-5b7557e48055/fetchFileTemp3247636418378142168.tmp

Then this should work:

spark-shell
$SPARK_HOME/bin/spark-shell --master spark://master-host:7077 --jars .../spark-on-neo4j/target/spark-on-neo4j-2.0.0-SNAPSHOT.jar

import org.neo4j.spark._

scala> Neo4j(sc).cypher("MATCH (n) RETURN id(n)").loadRowRdd.count()
res8: Long = 169

Spark Application

import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.spark._

val conf = new SparkConf().setAppName("NeoTest") // .setMaster(masterUrl)
val sc = SparkContext.getOrCreate(conf)

val people = Neo4j(sc).cypher("MATCH (n:Person) RETURN id(n)").loadRowRdd.count()

val count = Neo4j.db.execute("MATCH (n:Person) RETURN count(*) as c").columnAs[Long]("c").next()

Builder API

The fluent builder API lets you declare the queries or patterns to run, as well as partitions, total row counts and batch sizes, and then choose which Apache Spark datatype to load.

This library supports:

  • RDD[Row], RDD[T] (via loadRowRdd, loadRdd[T])

  • DataFrame

  • GraphX Graph

  • GraphFrame

The general usage is

  1. create org.neo4j.spark.Neo4j(sc)

  2. set cypher(query,[params]), nodes(query,[params]), rels(query,[params]) as a direct query, or pattern("Label1",Seq("REL"),"Label2"), or pattern(("Label1","prop1"),("REL","prop"),("Label2","prop2"))

  3. optionally define partitions(n), batch(size), rows(count) for parallelism

  4. choose which datatype to return

    • loadRowRdd, loadNodeRdds, loadRelRdd, loadRdd[T]

    • loadDataFrame, loadDataFrame(schema)

    • loadGraph[VD,ED]

    • loadGraphFrame[VD,ED]

For example:

org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
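
As a further illustration, here is a hedged sketch of the pattern builder loading a GraphX graph. The Person/KNOWS labels and the partition and batch values are placeholders for your own data, and the Graph[Long, String] result shape follows the connector's documented examples:

import org.neo4j.spark._
import org.apache.spark.graphx.Graph

// Load (:Person)-[:KNOWS]->(:Person) as a GraphX graph of node ids and relationship types.
val graph: Graph[Long, String] = Neo4j(sc)
  .pattern("Person", Seq("KNOWS"), "Person")
  .partitions(4)
  .batch(10000)
  .loadGraph

val knowsCount = graph.edges.count()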

Next Steps

  • Package it to be used out of the box

  • Also expose Neo4j’s Core API to Spark

This doesn’t work yet :(
./spark-shell --master spark://Ich.local:7077 --jars .../spark-on-neo4j-2.0.0-SNAPSHOT.jar,$NEO4J_HOME/lib/neo4j-kernel-3.0.7.jar

import org.neo4j.graphdb.Label
import org.neo4j.spark._
import scala.collection.JavaConverters._

val people = Neo4j(sc).inTx(db => db.findNodes(Label.label("Person")).asScala.size)
Stacktrace / Error
TREE: <db: error>.findNodes(Label.label("Person"))
TRANSFORM: null
TREE: <db: error>.findNodes(Label.label("Person")).<asScala: error>
TRANSFORM: null
TREE: <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>
TRANSFORM: <console>
TREE: ((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>)
TRANSFORM: null
TREE: org.neo4j.spark.Neo4j.apply($iw.this.$line3$read.$iw.$iw.sc).<inTx: error>[Nothing](((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>))
TRANSFORM: <console>
TREE: private[this] val res5: Nothing = org.neo4j.spark.Neo4j.apply($iw.this.$line3$read.$iw.$iw.sc).<inTx: error>[Nothing](((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>))
... 1000 lines ...
java.lang.AssertionError: assertion failed:
  <db: error>.findNodes(Label.label("Person"))
     while compiling: <console>
        during phase: superaccessors
     library version: version 2.11.8
    compiler version: version 2.11.8
  reconstructed args: -classpath file:/Users/mh/java/neo/spark-on-neo4j/target/spark-on-neo4j-2.0.0-SNAPSHOT.jar:file:/Users/mh/v/neo4j-enterprise-3.0.7/lib/neo4j-kernel-3.0.7.jar -Yrepl-outdir /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/spark-572a8d8f-ce69-4d27-bdd8-522a7bb66c44/repl-feafbc17-b645-4d19-89a6-d416ff3afe6b -Yrepl-class-based

  last tree to typer: Select(This(package $line24), $read)
       tree position: line 54 of <console>
            tree tpe: $line24.$read.type
              symbol: object $read in package $line24
   symbol definition: object $read (a ModuleSymbol)
      symbol package: $line24
       symbol owners: object $read
           call site: class $iw in package $line24
...
<Cannot read source file>
	at scala.tools.nsc.typechecker.SuperAccessors$SuperAccTransformer.transform(SuperAccessors.scala:358)
	at scala.tools.nsc.typechecker.SuperAccessors$SuperAccTransformer.transform(SuperAccessors.scala:71)
	at scala.reflect.internal.Trees$class.itransform(Trees.scala:1345)
	at scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:16)
	at scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:16)