G-Research/spark-dgraph-connector

Question about subgraphs/filtering


Let's say I had a very large graph of IDs->committed->CommitNodes->modified->FileNodes where my edges are reversible.

CommitNodes have an author_date that is a datetime type in the Dgraph schema (IDs and FileNodes do not).

If I want to select from my graph a subgraph of all the CommitNodes for a particular month, and all the FileNodes and IDs that they link to, I'm not sure if I can with the current API. Or can I?

Thanks in advance!
-dave

Partially. The best you can do is load all committed and modified edges and all node ids with an author_date in your date range, and then filter the edges by joining with those node ids. You can't get better than that with the current connector implementation.

Assuming only CommitNodes have author_date, you could do

import java.sql.Timestamp

import org.apache.spark.sql.DataFrame
import uk.co.gresearch.spark.dgraph.connector._
import spark.implicits._

def read(first: Timestamp, last: Timestamp): DataFrame = {
  val edges = spark.read.dgraph.edges("localhost:9080")
  val nodes = spark.read.dgraph.nodes("localhost:9080")

  val committedEdges = edges.where($"predicate" === "committed")
  val modifiedEdges = edges.where($"predicate" === "modified")
  // uids of all CommitNodes whose author_date falls into the given range
  val commitNodeIds = nodes.where($"predicate" === "author_date" && $"objectTimestamp".between(first, last))

  Seq(
    committedEdges.join(commitNodeIds, committedEdges("objectUid") === commitNodeIds("subject"), "leftsemi"),
    modifiedEdges.join(commitNodeIds, modifiedEdges("subject") === commitNodeIds("subject"), "leftsemi")
  ).reduce(_.union(_))
}

This assumes that the Left Semi join pushes the "subject" projection down to the nodes DataFrame.
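To see whether that actually happens in your setup, you can inspect the physical plan. A minimal sketch, assuming the read function above and its imports are in scope:

val january = read(
  Timestamp.valueOf("2020-01-01 00:00:00"),
  Timestamp.valueOf("2020-01-31 23:59:59")
)
// If the pushdown works, the scan of the nodes source should only project
// the "subject" column for the join side.
january.explain(true)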

If you are calling that read for many different months, you could also read the entire committed-modified subgraph once, enrich it with the author date, and store the data in partitioned Parquet files:

import org.apache.spark.sql.functions.date_format
import uk.co.gresearch.spark.dgraph.connector._
import spark.implicits._

val edges = spark.read.dgraph.edges("localhost:9080")
val nodes = spark.read.dgraph.nodes("localhost:9080")

val committedEdges = edges.where($"predicate" === "committed")
val modifiedEdges = edges.where($"predicate" === "modified")
val authorDates = nodes.where($"predicate" === "author_date").select($"subject".as("node_id"), $"objectTimestamp".as("author_date"))
val authorMonths = authorDates.select($"node_id", date_format($"author_date", "yyyy-MM").as("author_month"))

// join each edge with the author month of its CommitNode end
val edgesWithAuthorDate = Seq(
  committedEdges.join(authorMonths.withColumnRenamed("node_id", "objectUid"), "objectUid"),
  modifiedEdges.join(authorMonths.withColumnRenamed("node_id", "subject"), "subject")
).map(_.select($"subject", $"predicate", $"objectUid", $"author_month"))
 .reduce(_.union(_))

edgesWithAuthorDate.write.partitionBy("author_month").parquet("commit-modified-edges")

Then you can read your sub-graph by calling:

spark.read.parquet("commit-modified-edges").where($"author_month" === "2020-01") 

This will read only relevant edges.
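If you also need the properties of the IDs and FileNodes those edges link to (as in your original question), you can semi-join the nodes against the uids occurring in the filtered edges. A rough sketch, assuming you re-read the nodes from Dgraph:

val monthEdges = spark.read.parquet("commit-modified-edges").where($"author_month" === "2020-01")
val nodes = spark.read.dgraph.nodes("localhost:9080")

// all uids touched by that month's edges: CommitNodes, IDs and FileNodes
val linkedUids = monthEdges.select($"subject".as("uid"))
  .union(monthEdges.select($"objectUid".as("uid")))
  .distinct()

// keep only the node properties of those uids
val linkedNodes = nodes.join(linkedUids, nodes("subject") === linkedUids("uid"), "leftsemi")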

I recommend using DataFrame.writePartitionedBy provided by our spark-extension package to write sensibly partitioned files.
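For illustration, usage would look roughly like this (a sketch based on the spark-extension documentation; check its README for the exact signature of writePartitionedBy):

import uk.co.gresearch.spark._

edgesWithAuthorDate
  .writePartitionedBy(Seq($"author_month"))
  .parquet("commit-modified-edges")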

Some thoughts on how to make the connector support your use case (#144). Your request would translate into the following DQL query:

query {
  commitNodes as var(func: between(<author_date>, "2020-01-01T00:00:00Z", "2020-01-31T23:59:59Z"))

  result (func: uid(commitNodes), first: 100, offset: 0) {
    <uid>
    <author_date>
    <~committed> { <uid> }
    <modified> { <uid> }
  }
}

This would be straightforward to turn into triples. As soon as you want to retrieve data from the IDs or FileNodes, things become tricky, as this can introduce duplicate triples. Also, the first: 100, offset: 0 arguments would need to be injected to add partitioning. It looks like only a very limited subset of DQL can potentially be supported by this connector.

The io.grpc.StatusRuntimeException: UNAVAILABLE is 100% Dgraph, so up to that point your PySpark is correct. It looks like the Dgraph instance is down. You should check the output of the alpha and zero nodes, and check that it still works with Ratel / https://play.dgraph.io.

Sorry for the late reply. Hope this background info is still relevant.

Smaller chunk sizes mean more partitions. With a large Spark cluster, this means more concurrent reads hitting the Dgraph cluster, which might cause the unresponsiveness.
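If you want to experiment with that, a larger chunk size (fewer partitions) or coalescing right after the read (fewer concurrent tasks) reduces the load on Dgraph. A rough sketch; the option name dgraph.chunkSize is taken from the connector documentation, so please verify it for your version:

val edges = spark.read
  .option("dgraph.chunkSize", "100000")   // larger chunks => fewer partitions
  .dgraph.edges("localhost:9080")
  .coalesce(8)                            // at most 8 tasks read from Dgraph concurrently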