This is an experiment to run a Apache Spark worker as part of a Neo4j instance at startup.
It works with Apache Spark versions 2.0 and later.
Build target/spark-on-neo4j-full-2.0.0-SNAPSHOT.jar
mvn clean package -DskipTests
Copy target/spark-on-neo4j-full-2.0.0-SNAPSHOT.jar
to $NEO4J_HOME/plugins
.
First start your $SPARK_HOME/sbin/start-master.sh
Then note the master-url and add it to your neo4j config
16/11/06 11:35:10 INFO Master: Starting Spark master at spark://host:port
You have to add your spark-master URL to your $NEO4J_HOME/con/neo4j.conf
spark.master=spark://host:port ## eg. #spark.master=spark://localhost:7077
So that the workers are started as part of the Neo4j lifecycle and can connect to the spark-master. Start Neo4j.
bin/neo4j start or bin/neo4j console
You should see output like this:
2016-11-06 21:18:28.457+0000 INFO Starting... log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2016-11-06 21:18:29.592+0000 INFO Bolt enabled on localhost:7687. 2016-11-06 21:18:29.609+0000 INFO Initiating metrics... Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/11/06 22:18:30 INFO Worker: Started daemon with process name: 42589@Ich.local ... 16/11/06 22:18:31 INFO Utils: Successfully started service 'sparkWorker' on port 62461. 16/11/06 22:18:32 INFO Worker: Starting Spark worker 192.168.111.125:62461 with 4 cores, 15.0 GB RAM 16/11/06 22:18:32 INFO Worker: Running Spark version 2.0.0 16/11/06 22:18:32 INFO Worker: Spark home: /Users/mh/v/neo4j-enterprise-3.0.7/data/databases/graph.db/spark 16/11/06 22:18:32 INFO Utils: Successfully started service 'WorkerUI' on port 8081. 16/11/06 22:18:32 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.111.125:8081 16/11/06 22:18:32 INFO Worker: Connecting to master Ich.local:7077... 16/11/06 22:18:32 INFO TransportClientFactory: Successfully created connection to Ich.local/192.168.111.125:7077 after 35 ms (0 ms spent in bootstraps) 16/11/06 22:18:32 INFO Worker: Successfully registered with master spark://Ich.local:7077 16/11/06 22:18:32 INFO Worker: Asked to launch executor app-20161106135845-0002/2478 for Spark shell 16/11/06 22:18:32 INFO ExecutorRunner: Launch command: --driver-url spark://CoarseGrainedScheduler@192.168.111.125:65390 --executor-id 2478 --hostname 192.168.111.125 --cores 4 --app-id app-20161106135845-0002 --worker-url spark://Worker@192.168.111.125:62461 16/11/06 22:18:32 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 42589@Ich.local ... 16/11/06 22:18:32 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:65390 after 5 ms (0 ms spent in bootstraps) 16/11/06 22:18:33 INFO DiskBlockManager: Created local directory at /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/blockmgr-fbd2ea8e-b01e-4450-9c54-a9f339753e4b 16/11/06 22:18:33 INFO MemoryStore: MemoryStore started with capacity 2.4 GB 16/11/06 22:18:33 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@192.168.111.125:65390 16/11/06 22:18:33 INFO WorkerWatcher: Connecting to worker spark://Worker@192.168.111.125:62461 16/11/06 22:18:33 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:62461 after 3 ms (0 ms spent in bootstraps) 16/11/06 22:18:33 INFO WorkerWatcher: Successfully connected to spark://Worker@192.168.111.125:62461 16/11/06 22:18:33 INFO CoarseGrainedExecutorBackend: Successfully registered with driver 16/11/06 22:18:33 INFO Executor: Starting executor ID 2478 on host 192.168.111.125 16/11/06 22:18:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 62467. 16/11/06 22:18:33 INFO NettyBlockTransferService: Server created on 192.168.111.125:62467 16/11/06 22:18:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(2478, 192.168.111.125, 62467) 16/11/06 22:18:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(2478, 192.168.111.125, 62467) 16/11/06 22:18:33 INFO Executor: Using REPL class URI: spark://192.168.111.125:65390/classes 16/11/06 22:18:45 INFO CoarseGrainedExecutorBackend: Got assigned task 4 16/11/06 22:18:45 INFO Executor: Running task 0.0 in stage 1.0 (TID 4) 16/11/06 22:18:45 INFO Executor: Fetching spark://192.168.111.125:65390/jars/spark-on-neo4j-2.0.0-SNAPSHOT.jar with timestamp 1478437124959 16/11/06 22:18:45 INFO TransportClientFactory: Successfully created connection to /192.168.111.125:65390 after 7 ms (0 ms spent in bootstraps) 16/11/06 22:18:45 INFO Utils: Fetching spark://192.168.111.125:65390/jars/spark-on-neo4j-2.0.0-SNAPSHOT.jar to /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/spark-028e6f9a-9da5-4fe2-855d-5b7557e48055/fetchFileTemp3247636418378142168.tmp
Then this should work:
$SPARK_HOME/bin/spark-shell --master spark://master-host:7077 --jars .../spark-on-neo4j/target/spark-on-neo4j-2.0.0-SNAPSHOT.jar:
import org.neo4j.spark._
scala> Neo4j(sc).cypher("MATCH (n) RETURN id(n)").loadRowRdd.count()
res8: Long = 169
import org.neo4j.spark._
conf = new SparkConf().setAppName("NeoTest") // .setMaster(masterUrl)
sc = SparkContext.getOrCreate(conf)
val people = Neo4j(sc).cypher("MATCH (n:Person) RETURN id(n)").loadRowRdd.count()
val count = Neo4j.db.execute("MATCH (n:Person) RETURN count(*) as c").columnAs[Long]("c").next()
You can use a fluent builder API to declare the queries or patterns you want to use, but also partitions, total-rows and batch-sizes and then select which Apache Spark data type to load.
This library supports:
-
RDD[Row], RDD[T]
(loadRowR) -
DataFrame
-
GraphX
Graph
-
GraphFrame
The general usage is
-
create
org.neo4j.spark.Neo4j(sc)
-
set
cypher(query,[params])
,nodes(query,[params])
,rels(query,[params])
as direct query, or </br>pattern("Label1",Seq("REL"),"Label2")
orpattern"Label1","prop1",("REL","prop"),("Label2","prop2
-
optionally define
partitions(n)
,batch(size)
,rows(count)
for parallelism -
choose which datatype to return
-
loadRowRdd
,loadNodeRdds
,loadRelRdd
,loadRdd[T]
-
loadDataFrame
,loadDataFrame(schema)
-
loadGraph[VD,ED]
-
loadGraphFrame[VD,ED]
-
For Example:
org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
-
Package it to be used out of the box
-
Also expose Neo4j’s Core API to Spark
./spark-shell --master spark://Ich.local:7077 --jars .../spark-on-neo4j-2.0.0-SNAPSHOT.jar,$NEO4J_HOME/lib/neo4j-kernel-3.0.7.jar import org.neo4j.spark._ val people = Neo4j(sc).inTx(db => db.findNodes(Label.label("Person")).asScala.size)
TREE: <db: error>.findNodes(Label.label("Person")) TRANSFORM: null TREE: <db: error>.findNodes(Label.label("Person")).<asScala: error> TRANSFORM: null TREE: <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error> TRANSFORM: <console> TREE: ((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>) TRANSFORM: null TREE: org.neo4j.spark.Neo4j.apply($iw.this.$line3$read.$iw.$iw.sc).<inTx: error>[Nothing](((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>)) TRANSFORM: <console> TREE: private[this] val res5: Nothing = org.neo4j.spark.Neo4j.apply($iw.this.$line3$read.$iw.$iw.sc).<inTx: error>[Nothing](((<db: error>: org.neo4j.graphdb.GraphDatabaseService) => <db: error>.findNodes(Label.label("Person")).<asScala: error>.<size: error>)) ... 1000 lines ... java.lang.AssertionError: assertion failed: <db: error>.findNodes(Label.label("Person")) while compiling: <console> during phase: superaccessors library version: version 2.11.8 compiler version: version 2.11.8 reconstructed args: -classpath file:/Users/mh/java/neo/spark-on-neo4j/target/spark-on-neo4j-2.0.0-SNAPSHOT.jar:file:/Users/mh/v/neo4j-enterprise-3.0.7/lib/neo4j-kernel-3.0.7.jar -Yrepl-outdir /private/var/folders/3q/d9xqh3fx7lj5nf5y4z3r5zs80000gn/T/spark-572a8d8f-ce69-4d27-bdd8-522a7bb66c44/repl-feafbc17-b645-4d19-89a6-d416ff3afe6b -Yrepl-class-based last tree to typer: Select(This(package $line24), $read) tree position: line 54 of <console> tree tpe: $line24.$read.type symbol: object $read in package $line24 symbol definition: object $read (a ModuleSymbol) symbol package: $line24 symbol owners: object $read call site: class $iw in package $line24 ... <Cannot read source file> at scala.tools.nsc.typechecker.SuperAccessors$SuperAccTransformer.transform(SuperAccessors.scala:358) at scala.tools.nsc.typechecker.SuperAccessors$SuperAccTransformer.transform(SuperAccessors.scala:71) at scala.reflect.internal.Trees$class.itransform(Trees.scala:1345) at scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:16) at scala.reflect.internal.SymbolTable.itransform(SymbolTable.scala:16)