/morpheus

Morpheus brings the leading graph query language, Cypher, onto the leading distributed processing platform, Spark.

Primary LanguageScalaApache License 2.0Apache-2.0

Maven Central

Morpheus: Cypher for Apache Spark

Morpheus extends Apache Spark™ with Cypher, the industry's most widely used property graph query language defined and maintained by the openCypher project. It allows for the integration of many data sources and supports multiple graph querying. It enables you to use your Spark cluster to run analytical graph queries. Queries can also return graphs to create processing pipelines.

Note This is the repo formerly known as opencypher/cypher-for-apache-spark

Intended audience

Morpheus allows you to develop complex processing pipelines orchestrated by a powerful and expressive high-level language. In addition to developers and big data integration specialists, Morpheus is also of practical use to data scientists, offering tools allowing for disparate data sources to be integrated into a single graph. From this graph, queries can extract subgraphs of interest into new result graphs, which can be conveniently exported for further processing.

Morpheus builds on the Spark SQL DataFrame API, offering integration with standard Spark SQL processing and also allows integration with GraphX. To learn more about this, please see our examples.

Current status: Pre-release

The functionality and APIs are stabilizing but surface changes (e.g. to the Cypher syntax and semantics for multiple graph processing and graph projections/construction) are still likely to occur. We invite you to try out the project, and we welcome feedback and contributions.

If you are interested in contributing to the project we would love to hear from you; email us at opencypher@neo4j.org or just raise a PR. Please note that this is an openCypher project and contributions can only be accepted if you’ve agreed to the openCypher Contributors Agreement (oCCA).

Morpheus Features

Morpheus is built on top of the Spark DataFrame API and uses features such as the Catalyst optimizer. The Spark representations are accessible and can be converted to representations that integrate with other Spark libraries.

Morpheus supports a subset of Cypher and is the first implementation of multiple graphs and graph query compositionality.

Morpheus currently supports importing graphs from Hive, Neo4j, relational database systems via JDBC and from files stored either locally, in HDFS or S3. Morpheus has a data source API that allows you to plug in custom data importers for external graphs.

Morpheus Roadmap

Morpheus is under rapid development and we are planning to offer support for:

  • a large subset of the Cypher language
  • new Cypher Multiple Graph features
  • injection of custom graph data sources

Spark Project Improvement Proposal

Currently Morpheus is a third-party add-on to the Spark ecosystem. We, however, believe that property graphs and graph processing has the potential to be come a vital part of data analytics. We are thus working, in cooperation with Databricks, on making Morpheus a core part of Spark. The first step on this road is the specification of a PropertyGraph API, similar to SQL and Dataframes, along with porting Cypher 9 features of Morpheus to the core Spark project in a so called Spark Project Improvement Proposal (SPIP).

We are currently in the second phase of this process, after having successfully passed the vote for inclusion into Apache Spark 3.0. The SPIP describing the motivation and goals is published here SPARK-25994. Additionally SPARK-26028 proposes an API design and implementation strategies.

Supported Spark and Scala versions

As of Morpheus 0.3.0, the project has migrated to Scala 2.12 and Spark 2.4 series. As of Spark 2.4.1 Scala 2.12 is officially supported for Spark. However, only Spark 2.4.2 uses Scala 2.12 for its prebuilt convenience binaries, which means that in order to use Morpheus with a later Spark version, one needs to build it manually.

Get started with Morpheus

Morpheus is currently easiest to use with Scala. Below we explain how you can import a simple graph and run a Cypher query on it.

Building Morpheus

Morpheus is built using Gradle

./gradlew build

Add the Morpheus dependency to your project

In order to use Morpheus add the following dependency:

Maven:

<dependency>
  <groupId>org.opencypher</groupId>
  <artifactId>morpheus-spark-cypher</artifactId>
  <version>0.4.2</version>
</dependency>

sbt:

libraryDependencies += "org.opencypher" % "morpheus-spark-cypher" % "0.4.2"

Remember to add fork in run := true in your build.sbt for scala projects; this is not Morpheus specific, but a quirk of spark execution that will help prevent problems.

Hello Morpheus

Cypher is based on the property graph data model, comprising labelled nodes and typed relationships, with a relationship either connecting two nodes, or forming a self-loop on a single node. Both nodes and relationships are uniquely identified by an ID (Morpheus internally uses Array[Byte] to represent identifiers and auto-casts Long, String and Integer values), and contain a set of properties.

The following example shows how to convert a social network represented by two DataFrames to a PropertyGraph. Once the property graph is constructed, it supports Cypher queries via its cypher method.

import org.apache.spark.sql.DataFrame
import org.opencypher.morpheus.api.MorpheusSession
import org.opencypher.morpheus.api.io.{MorpheusNodeTable, MorpheusRelationshipTable}
import org.opencypher.morpheus.util.App

/**
  * Demonstrates basic usage of the Morpheus API by loading an example graph from [[DataFrame]]s.
  */
object DataFrameInputExample extends App {
  // 1) Create Morpheus session and retrieve Spark session
  implicit val morpheus: MorpheusSession = MorpheusSession.local()
  val spark = morpheus.sparkSession

  import spark.sqlContext.implicits._

  // 2) Generate some DataFrames that we'd like to interpret as a property graph.
  val nodesDF = spark.createDataset(Seq(
    (0L, "Alice", 42L),
    (1L, "Bob", 23L),
    (2L, "Eve", 84L)
  )).toDF("id", "name", "age")
  val relsDF = spark.createDataset(Seq(
    (0L, 0L, 1L, "23/01/1987"),
    (1L, 1L, 2L, "12/12/2009")
  )).toDF("id", "source", "target", "since")

  // 3) Generate node- and relationship tables that wrap the DataFrames. The mapping between graph elements and columns
  //    is derived using naming conventions for identifier columns.
  val personTable = MorpheusNodeTable(Set("Person"), nodesDF)
  val friendsTable = MorpheusRelationshipTable("KNOWS", relsDF)

  // 4) Create property graph from graph scans
  val graph = morpheus.readFrom(personTable, friendsTable)

  // 5) Execute Cypher query and print results
  val result = graph.cypher("MATCH (n:Person) RETURN n.name")

  // 6) Collect results into string by selecting a specific column.
  //    This operation may be very expensive as it materializes results locally.
  val names: Set[String] = result.records.table.df.collect().map(_.getAs[String]("n_name")).toSet

  println(names)
}

The above program prints:

Set(Alice, Bob, Eve)

More examples, including multiple graph features, can be found in the examples module.

Run example Scala apps via command line

You can use Gradle to run a specific Scala application from command line. For example, to run the DataFrameInputExample within the morpheus-examples module, we just call:

./gradlew morpheus-examples:runApp -PmainClass=org.opencypher.morpheus.examples.DataFrameInputExample

Next steps

How to contribute

We would love to find out about any issues you encounter and are happy to accept contributions following a Contributors License Agreement (CLA) signature as per the process outlined in our contribution guidelines.

License

The project is licensed under the Apache Software License, Version 2.0, with an extended attribution notice as described in the license header.

Copyright

© Copyright 2016-2019 Neo4j, Inc.

Apache Spark™, Spark, and Apache are registered trademarks of the Apache Software Foundation.