We begin with an introduction to the Tech Stack that we used, then briefly describe the schema of the dataset that was ingested. Next, the Data Ingestion methodology is addressed, including the reasons for choosing different parts of the stack.
The second part deals with Indexing and Retrieval of data from TitanDB using Gremlin. We conclude with a short section on key learnings and an attached appendix, which includes detailed directions for Setup.
- Spark in conjunction with HDFS for pre-processing of data.
- TitanDB configured with Cassandra as the storage-backend and ElasticSearch as the indexing-backend serves as our graph database.
- Spark (internal to TitanDB) interacting with HDFS is responsible for Bulk Ingestion of data into TitanDB.
- MapReduce, leveraged to build ElasticSearch indexes over the graph database.
In contrast to most mainstream NoSQL technologies, TitanDB is not schemaless. Vertex and edge labels, as well as properties and their datatypes, need to be declared before starting to use the database. This is generally done through calls to the graph's ManagementSystem (e.g., see scripts/load-users.groovy); a minimal sketch is shown below.
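The sketch below is illustrative only: it declares a user vertex label, a couple of its property keys and the createdBy edge label. The repo's scripts/load-users.groovy is the authoritative version and declares the full set of attributes listed in the appendix.
// Open the management system, declare schema elements, then commit
mgmt = graph.openManagement()
mgmt.makeVertexLabel('user').make()
mgmt.makePropertyKey('DisplayName').dataType(String.class).make()
mgmt.makePropertyKey('Reputation').dataType(Integer.class).make()
mgmt.makeEdgeLabel('createdBy').make()
mgmt.commit()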
The StackOverflow dataset, as noted in the proposal, consists of 3 main elements:
- Users - The Stack Overflow users
- Posts - Questions or Answers
- Comments - User comments on posts
Let's run through the sample schema pictured above.
- John, Bill and Silly are 3 fictional users.
- John asks the question "How to compute sum of ..." to which Bill answers "(1 to N toList) ...". Note that both the question and answer are posts, but in addition, the answer_post has an answerTo relationship with the question_post.
- Silly is the author of 2 comments, each of which is related to one of the posts.
- Observations:
- John is connected to Bill via their QnA.
- John and Bill are both connected to Silly via comments on their respective posts.
We walk through the strategy in detail, explaining the reasons for our choices along with any nuances, with the help of text as well as code.
- Read XML files from HDFS using Spark, filter in the data of interest and write to HDFS again in CSV format.
- Use BulkLoaderVertexProgram (running over Spark) to digest the resulting CSV files and add nodes to the graph.
The dataset consists of XML files archived in 7z format. 7za is a lightweight archiving utility that can be used to unarchive the XML files. The usage goes like this:
7za x stackoverflow.com-Users.7z
spark-xml is a Spark library that reads XML files into Spark-SQL dataframes. Since the XML read happens over Spark, the gut-feel is that this should be much faster than other solutions like Python XML Iterators. While differences might not be noticeable when running on a single machine, this nevertheless serves as a PoC for distributed processing of large XML files.
The first step is to modify the XML in place using the command-line tool sed so that spark-xml can read it; spark-xml has a limitation in that it cannot read self-closing XML tags.
# Replace the self-closing '/>' with a dummy child element <foo>bar</foo> followed by a closing </row> tag
sed -i -e 's/\/>/><foo>bar<\/foo><\/row>/' Users.xml
We've tried to do all File I/O via HDFS, to imitate a distributed processing framework. In line with this, after stream-processing with sed, the data is moved to HDFS from where Spark picks it up and later writes the output to HDFS itself.
# Create input directory if it doesn't exist
hadoop fs -ls
hadoop fs -mkdir input
# Move Users.xml to input
# moveFromLocal instead of copyFromLocal if you're short on space
hadoop fs -copyFromLocal Users.xml input/
The following lines of code are most conveniently run on the Spark shell (bin/spark-shell).
// In order to read XML into Spark Dataframes, Spark SQL is required.
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
// Read the <row> tags of the XML file that is loaded from the HDFS location "input/Users.xml". The <row> tags could more appropriately have been <user> tags, but that's how the file is formatted.
// Play with the Dataframe using df.show() or df.filter functions to get a sense of what the data looks like.
val df = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "row").load("input/Users.xml")
// Select the attributes of interest and write them out in CSV format, with no header and NULLs formatted as empty strings, to the output/users directory in HDFS.
// The output directory must not already exist; Spark will create it.
df.select("@Id", "@DisplayName", "@Age", "@Location", "@UpVotes", "@DownVotes", "@Reputation").write.format("com.databricks.spark.csv").option("nullValue", "").save("output/users")
Loading the graph elements transactionally, i.e. one by one into the database, may be a reasonable strategy for small datasets; however, it quickly becomes infeasible for large datasets. Delving deeper into the literature around TitanDB, we discovered a utility program known as the BulkLoaderVertexProgram, which is built exactly for this use case. But first, some tech details.
TitanDB heavily uses TinkerPop, the popular graph computing framework, for a variety of purposes, including graph traversals and computations. It's easy to see that graph computation over large graphs is a non-trivial problem. To tackle it, TinkerPop provides Hadoop-Gremlin (formerly TinkerPop Furnace), a scalable graph engine for processing large graphs. Hadoop-Gremlin provides support for computing over Giraph, Spark and MapReduce.
Hadoop-Gremlin allows bulk ingestion of data using its BulkLoaderVertexProgram (blvp, for short), reading both standard and arbitrarily formatted files (the latter via Hadoop-Gremlin's ScriptInputFormat). CSV counts as a custom or arbitrary format because the standard formats for graph ingestion are GraphML and GraphSON, based on XML and JSON respectively. Both formats are fairly sophisticated, and such files are usually produced by exporting an existing graph in one of those formats, which doesn't work for our case.
An obvious implication of using a custom format is that we need a script that can parse it. Therefore, there are 3 files that we need to put in place for TitanDB to process the data:
- scripts/load_users.groovy : Creates the attributes and label of nodes (here, users) that we need.
- conf/hadoop-graph/users-hadoop-load.properties : Specifies location of input data and the parsing script.
- scripts/users-script-input.groovy : The parsing script itself; this needs to be in HDFS too (a sketch of such a parse function is shown right after this list).
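For illustration, a parse function for the user CSV lines produced earlier might look roughly like the following. This is a sketch only (the repo's scripts/users-script-input.groovy is the actual script); note that a naive split(',') breaks on quoted DisplayName or Location values containing commas, which is where the opencsv/groovycsv jars mentioned in the appendix come in.
// Sketch of a ScriptInputFormat parse function for lines of the form:
// Id,DisplayName,Age,Location,UpVotes,DownVotes,Reputation
def parse(line, factory) {
    def f = line.split(',', -1)                       // naive split; use a CSV parser for quoted fields
    def v = factory.vertex('user:' + f[0], 'user')    // prefix ids so they stay unique across vertex types
    if (f[1]) v.property('DisplayName', f[1])
    if (f[2]) v.property('Age', Float.valueOf(f[2]))
    if (f[3]) v.property('Location', f[3])
    if (f[4]) v.property('UpVotes', Integer.valueOf(f[4]))
    if (f[5]) v.property('DownVotes', Integer.valueOf(f[5]))
    if (f[6]) v.property('Reputation', Integer.valueOf(f[6]))
    return v
}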
Once these are in-place, just execute the load_users.groovy script on the Gremlin shell:
:load scripts/load_users.groovy
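For reference, the core of such a load script is typically just a blvp invocation of the following shape; this is a sketch with illustrative config paths, not a verbatim copy of the repo's load_users.groovy.
// Open the Hadoop graph that points at the CSV input and the parse script,
// then run BulkLoaderVertexProgram over Spark, writing into the Titan graph
readGraph = GraphFactory.open('conf/hadoop-graph/users-hadoop-load.properties')
blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra-es.properties').create(readGraph)
readGraph.compute(SparkGraphComputer).program(blvp).submit().get()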
The process for loading edges differs because of a long-standing bug in the version of TinkerPop that TitanDB uses. Instead of using blvp, edges are therefore loaded transactionally. The example below creates directed edges with the label createdBy from posts to users, signifying that a given post was created by the connected user.
// Get Dataframe as earlier, then continue using below
// Take a fraction of data points, if you so wish
val filtered_posts_users = df.filter(df.col("@OwnerUserId").isNotNull).select("@Id", "@OwnerUserId").limit(100000)
// write to CSV
filtered_posts_users.write.format("com.databricks.spark.csv").option("nullValue", "").save("graph/edges/posts_owners")
We copy the data from HDFS to the local filesystem and use just one script, scripts/load-posts-owners.groovy, to load the edges into the graph. The script iterates over all lines in all files in the specified directory, loading edges one by one (a sketch of this approach is shown below). This provides a convenient point to assess the efficacy of blvp by comparing the runtimes of the two.
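A rough sketch of such transactional loading, assuming the edge CSVs have been copied to a local posts_owners directory (the repo's scripts/load-posts-owners.groovy is the actual implementation):
// Open the Titan graph and add one createdBy edge per CSV line (post_id,user_id)
graph = TitanFactory.open('conf/titan-cassandra-es.properties')
g = graph.traversal()
new File('posts_owners').eachFileMatch(~/part-.*/) { f ->
    f.eachLine { line ->
        def (postId, userId) = line.split(',').toList()
        def post = g.V().has('bulkLoader.vertex.id', 'post:' + postId).next()
        def user = g.V().has('bulkLoader.vertex.id', 'user:' + userId).next()
        post.addEdge('createdBy', user)
    }
    graph.tx().commit()   // committing per file; smaller batches may be friendlier to Cassandra
}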
In our framework, edges are lightweight connections between nodes without any properties. Hence, it stands to reason that they should be loaded with little effort. Yet on my machine, where ingestion of 8 million users takes around 30 minutes, ingesting even 1 million edges easily surpasses that benchmark!
| vertex_label | # ingested |
|---|---|
| user | 7.6M |
| post | 3.4M |
| comments | 3.9M |

| relationship_type (edge_label) | # ingested |
|---|---|
| user <- post (createdBy) | 3.2M |
| post <- post (answerTo) | 2.4M |
| post <- comment (commentOn) | 250K |
| user <- comment (createdBy) | 250K |
Here are some basic queries to give the reader a flavor of Gremlin DSL:
// Get user with id 42
g.V().has('bulkLoader.vertex.id', 'user:42').properties()
// Get a couple of posts created by user with id 42
// Note how the function "in" is used to traverse an edge, and "textPrefix" used to get only posts (out of posts and comments)
g.V().has('bulkLoader.vertex.id', 'user:42').in('createdBy').has('bulkLoader.vertex.id', textPrefix('post')).limit(2).properties()
// Get user who created a specific post
g.V().has('bulkLoader.vertex.id', 'post:42').out('createdBy').properties()
// Get answer to a question post
g.V().has('bulkLoader.vertex.id', 'post:42').in('answerTo').properties()
Funkier things with Gremlin can involve connecting users via their questions and answers or their comments. For instance, look at the snippets below:
// Get users connected to a certain user via answers to his/her asked questions
g.V().has('bulkLoader.vertex.id', 'user:91').in('createdBy').has('PostTypeId', 1).in('answerTo').out('createdBy').values('DisplayName').dedup()
// Get users connected to a given user U via comments on questions asked by U
g.V().has('bulkLoader.vertex.id', 'user:91').in('createdBy').has("PostTypeId", 1).in('commentOn').out('createdBy').values("DisplayName").dedup()
However, consider a range scan or wildcard query like the following:
// Find users with murakami in their name
g.V().has("DisplayName", textContainsRegex("murakami"))
TitanDB gives a warning on issuing this query, complaining that it will have to scan the entire graph. While graph scans can be forbidden with a flag (force-index) on production environments, the real solution is to use appropriate indexes that are consistent with the query patterns.
This is where ElasticSearch comes in. TitanDB provides the option of creating 2 different types of graph indexes, described briefly:
- Composite Indexes - Defined on a pre-defined combination of properties and able to answer only equality checks, i.e. no prefix or regex checks for strings and no less-than/greater-than checks for numbers. Since these are rather elementary kinds of indexes, they don't need the presence of an indexing backend (a minimal sketch follows this list).
- Mixed Indexes - More powerful indexes which can be used in combination with one another to answer queries about arbitrary combinations of indexed keys. These require the presence of an indexing backend like Elastic/Lucene and allow more powerful search predicates.
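For contrast with the mixed index built in the next snippet, here is a minimal composite-index sketch; the index name and the choice of the Id key are illustrative.
// Equality-only composite index on the Id property; no indexing backend required
mgmt = graph.openManagement()
id_key = mgmt.getPropertyKey("Id")
mgmt.buildIndex("verticesById", Vertex.class).addKey(id_key).buildCompositeIndex()
mgmt.commit()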
Our aim is to enable fast lookups for queries of the sort:
// Get users with 'murakami' in their name (case-insensitive) with a reputation greater than 1000
g.V().has("DisplayName", textContainsRegex("murakami")).has("Reputation", gt(1000)).count()
It's obvious that this requires an index on the property 'DisplayName' and another one on 'Reputation' for all users. Here's how you can build an index for DisplayName (the analogous index for Reputation is sketched right after this snippet):
mgmt = graph.openManagement()
display_name = mgmt.getPropertyKey("DisplayName")
/*
Create an index using DisplayName
Observe the Mapping.TEXT argument; this is the default and indexes DisplayName
by tokenizing it into words, so that any query by default operates at the
word/token level, NOT on the entire string
*/
usersByNameIndex = mgmt.buildIndex("usersByName", Vertex.class).addKey(display_name, Mapping.TEXT.asParameter()).buildMixedIndex("search")
mgmt.commit()
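For completeness, the second mixed index, on the numeric Reputation key, can be sketched along the same lines; the index name is illustrative, and numeric keys need no Mapping parameter.
mgmt = graph.openManagement()
reputation = mgmt.getPropertyKey("Reputation")
// Mixed index backed by the "search" (ElasticSearch) indexing backend
mgmt.buildIndex("usersByReputation", Vertex.class).addKey(reputation).buildMixedIndex("search")
mgmt.commit()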
These few lines are generally all you need if you're creating the properties along with the indexes. When that is not the case, as it was for us, a reindexing procedure must be run to index the pre-existing graph data. For large graphs, this can be run as a MapReduce job (the simpler, non-distributed procedure gave no hint of stopping on my machine even after 2 hours).
A somewhat painful Hadoop/MapReduce setup is required in order to do so. Unlike the earlier blvp program, where Spark is run internally by TitanDB, executing the reindexing procedure requires running MapReduce job and task trackers. Make sure that the TitanDB jars are added to Hadoop's CLASSPATH variable. Here's a snippet that demonstrates how the procedure can be run:
// Run a Titan-Hadoop job to reindex
mgmt = graph.openManagement()
mr = new MapReduceIndexManagement(graph)
// The index is referred to by the name given to buildIndex(), i.e. "usersByName"
mr.updateIndex(mgmt.getGraphIndex("usersByName"), SchemaAction.REINDEX).get()
mgmt.commit()
After the index is registered and enabled, the magic becomes apparent as you execute the query discussed above: no warnings appear, and results are displayed quickly, without rendering the machine unusable.
To give a more complete use case, good indexes for the present dataset may efficiently answer questions of the sort: what are all the questions tagged 'C++' asked by the 5 most highly reputed users having 'Linus' in their name?
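A sketch of such a query is given below; it is illustrative only, and the text predicate used for matching the 'C++' tag may need tuning depending on how the Tags string is tokenized.
// Questions tagged 'c++' asked by the 5 most reputed users with 'linus' in their name
g.V().has("DisplayName", textContainsRegex("linus")).order().by("Reputation", decr).limit(5).in("createdBy").has("PostTypeId", 1).has("Tags", textContains("c++")).valueMap()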
Two of the biggest learnings that we had from the course may be summarized thus:
- Using outdated tech is a BAD idea. TitanDB is compatible with versions of Cassandra, ES, Hadoop and Spark that are found deep in git archives and their documentation is even more difficult to locate. Dev time is precious and should not be wasted endlessly on setups.
- Build using tiny samples of data. 1GB files are unsuitable for testing out stuff no matter how obvious things may seem before trying them out. Large execution times mean longer feedback loops and hence, slower progress.
NOTE: All versions of the following supporting software have been selected because of limitations of TitanDB 1.0.0. The last public release of TitanDB was over 2 years ago, when it only supported Hadoop 1.
- Get Elasticsearch 1.5.2 from the releases section of the Github repo.
- Use Maven to build Elasticsearch from the source code: mvn clean package -DskipTests.
- Unzip the built package and execute bin/elasticsearch.
- Verify Elasticsearch is up using: curl -X GET http://localhost:9200/
- Get Cassandra 2.1.19 from the Downloads section of their website.
- Execute bin/cassandra from the root directory of the un-archived package. The default settings worked for me, with storage set to PROJECT_ROOT/data/data. You might have to play with the JAVA_HOME variable to get this right, though.
- bin/cassandra starts in the background by default, so you can launch it and forget about it (use the -f flag if you'd rather keep it in the foreground).
Spark is required because we need to process large XML files. This might sound like a snazzy decision, but the mature spark-xml library, which creates Dataframes out of XML data, made it a no-brainer.
- Get Spark 1.6.1.
- Edit conf/spark-env.sh to set the HADOOP_CONF_DIR environment variable to point to the conf directory of your Hadoop installation, so that it integrates with HDFS. Now by default, File I/O happens with HDFS.
- Add the following line to conf/spark-defaults.conf, so that the necessary libraries are loaded each time Spark is started.
spark.jars.packages com.databricks:spark-xml_2.10:0.3.5,org.apache.hadoop:hadoop-client:1.2.1,com.databricks:spark-csv_2.10:1.5.0
Not essential per se, but the Big Data requirement (parsing XML files running into GBs with Spark) along with the way TitanDB's bulk loading program works (it only reads from HDFS) requires this.
- Get your preferred format from Apache Archives of Hadoop 1.2.1.
- Use the conf files from the conf/hadoop directory (in the repo). Specifically, dfs.data.dir and dfs.name.dir need to be set to some location in the local filesystem APART FROM /tmp, since /tmp is cleared every time the machine powers down.
- Get Titan 1.0.0 from the Downloads page @ Titan.
- Edit bin/gremlin.sh to ensure CLASSPATH includes the path to Hadoop's conf directory (refer to bin/gremlin.sh in the repo).
- Copy the opencsv and groovycsv jars from the lib directory of the repo to the lib directory of your TitanDB project dir.
- Run bin/gremlin.sh from inside the project directory, and you're good to go.
The following attributes, along with the data types mentioned alongside them, were ingested for each of the node types. Please note that graph edges are attribute-less.
- Users
- Id :: Integer
- DisplayName :: String
- Age :: Float
- Location :: String
- UpVotes :: Integer
- DownVotes :: Integer
- Reputation :: Integer
- Posts
- Id :: Integer
- PostTypeId :: Integer
- 1: Question
- 2: Answer
- CreationDate :: Date
- Score :: Integer
- Tags :: String
- AnswerCount :: Integer
- CommentCount :: Integer
- FavoriteCount :: Integer
- Comments
- Id :: Integer
- Score :: Integer
// Get users with 'John' or 'Doe' in their name, take the top 5 by reputation, and find the questions they have asked that are tagged 'Java'. This may include duplicates.
g.V().has("DisplayName", textContainsRegex("John|Doe")).order().by('Reputation',decr).limit(5).in("createdBy").has("PostTypeId",1).has("Tags", textContainsRegex("Java"))
// Get Users with name 'John' and find out the answers they have written containing the tag 'Java'
g.V().has("DisplayName", textContainsRegex("John")).in("createdBy").has("PostTypeId",2).has("Tags=", textContainsRegex("Java"))
// Get the user with Id 2 and find the number of UpMod votes (VoteTypeId 2) the user has received.
g.V().has('bulkLoader.vertex.id', 'user:2').in("votedFor").has("VoteTypeId",2).count()
// Sometimes you just need to aggregate values, say display names, into a single list; fold() does exactly that.
g.V().has("DisplayName", textContainsRegex("John")).values("DisplayName").fold()
// If we need to match multiple patterns in the graph at once, e.g. connecting users via posts, we can use match(). For instance, find users from Menlo Park who answered questions asked by a user named 'Sam':
g.V().match(
__.as("posts").out("createdBy").has("DisplayName", "Sam").as("creators"),
__.as("posts").in("answerTo").out("createdBy").has("Location", "Menlo Park").as("cocreators")).
select("creators","cocreators").by("DisplayName")