PHOEBUS
=======

Contents
--------

* Introduction
* Computational Model
* Distributed Processing
* Getting it to work
* Pluggable Storage and True Multi-Node Distribution with HDFS and Thrift
* Next Steps
* References

Introduction
------------

Phoebus is a system written in Erlang for distributed processing of very large graphs that span billions of vertices and edges. It is essentially an implementation of the system described in Google's Pregel[1] paper. It supports a distributed model of computation similar to MapReduce[2], but more tuned to graph processing.

Computational Model
-------------------

* A Graph is partitioned into groups of Records.
* A Record consists of a Vertex and its outgoing Edges (an Edge is a Tuple consisting of the edge weight and the target vertex name).
* A User specifies a 'Compute' function that is applied to each Record.
* Computation on the graph happens in a sequence of incremental Super Steps.
* At each Super Step, the Compute function is applied to all 'active' vertices of the graph.
* Vertices communicate with each other via Message Passing.
* The Compute function is provided with the Vertex record and all Messages sent to the Vertex in the previous Super Step.
* A Compute function can
  - Mutate the value associated with a vertex.
  - Add/Remove outgoing edges.
  - Mutate Edge weight.
  - Send a Message to any other vertex in the graph.
  - Change state of the vertex from 'active' to 'hold'.
* At the beginning of each Super Step, if there are no more active vertices -and- if there are no messages to be sent to any vertex, the algorithm terminates.
* A User may additionally specify a 'MaxSteps' to stop the algorithm after a given number of Super Steps.
* A User may additionally specify a 'Combine' function that is applied to all the Messages targeted at a Vertex before the Compute function is applied to it.

Distributed Processing
----------------------

* The Computational model allows the algorithm to be performed in parallel by a cluster of Phoebus nodes.
* A 'Job' submitted to a Phoebus cluster is managed by a 'Master' process running on the node that receives the Job.
* The Master partitions the input graph and spawns a 'Worker' for each partition on one of the nodes of the cluster.
* The Master then asks each Worker to perform a Super Step on its partition and awaits notification from the Worker of Step completion.
* The Step number is incremented until all Workers report that they have no more 'active' Vertices and no more outstanding messages to be delivered.

Getting it to work (Tested on Mac OS X Snow Leopard)
------------------

Requirements:

* rebar (Download from http://hg.basho.com/rebar/downloads/rebar)
* git
* erlang (tested on R14B)

1) Clone the repository from GitHub..

    $ git clone git://github.com/xslogic/phoebus.git
    $ cd phoebus

2) Compile and create release..

    $ ./generate
    ==> rel (compile)
    ==> phoebus (compile)
    ....
    ==> rel (generate)
    Usage: phoebus {start|stop|restart|reboot|ping|console|attach}

3) Create a sample output directory

    $ mkdir /tmp/output

4) Start a two node Phoebus cluster...

Terminal 1:

    $ ./run_phoebus 1
    .....
    Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]
    Eshell V5.7.5 (abort with ^G)
    (phoebusr1@my-machine)1>

Terminal 2:

    $ ./run_phoebus 2
    .....
    Erlang R13B04 (erts-5.7.5) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:5] [hipe] [kernel-poll:true]
    Eshell V5.7.5 (abort with ^G)
    (phoebusr2@my-machine)1>

5) Creating input data set:

Currently, Phoebus requires each input record to occupy one line. It must be of the form

    "<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>\t<EdgeWeight2>\t<TargetVertexName2>...\n"

The module "algos" that comes with Phoebus has a utility function that can generate a binary tree as a sample input data set.

    (phoebusr1@my-machine)1> algos:create_binary_tree("/tmp/input", 4, 1000).
    ok

The create_binary_tree function has created an input data set in the directory "/tmp/input".
It has created a 1000-node binary tree with root "1" and split the input into 4 files.

    $ head -5 infile1
    1 1 1 2 1 3
    2 2 1 4 1 5
    3 3 1 6 1 7
    4 4 1 8 1 9
    5 5 1 10 1 11

6) Running a sample algo:

The module "algos" has a sample Compute function that calculates the shortest path to a node.

    (phoebusr1@my-machine)1> AFun = fun algos:shortest_path/2.
    #Fun<algos.shortest_path.2>
    (phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "file:///tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
    ok

Since the input has 4 files, Phoebus spawns 4 workers, 2 on each node.

7) Wait for the algorithm to end. Once it finishes, the output will be written to "/tmp/output".

Listing all vertices with names starting with "20"...

    $ cat /tmp/output/* | grep '^20'
    200 100:50:25:12:6:3:1 1 400 1 401
    204 102:51:25:12:6:3:1 1 408 1 409
    20 10:5:2:1 1 40 1 41
    201 100:50:25:12:6:3:1 1 402 1 403
    203 101:50:25:12:6:3:1 1 406 1 407

The second column (the value of the Vertex) gives the shortest path to the vertex from the root of the binary tree.

Pluggable Storage and True Multi-Node Distribution with HDFS[3] and Thrift[4]
-----------------------------------------------------------------------------

* A true Multi-Node setup requires a distributed storage layer.
* Phoebus defines an 'external_store' behaviour, an implementation of which is 'external_store_file'.
* 'external_store_file' was used in the above section to read/store vertices from the local file system.
* Phoebus can be made to read from and store to HDFS using the 'external_store_hdfs' module.
* The store layer is decided by the URI scheme of the "input_dir" and "output_dir" parameters passed to 'phoebus_master' when submitting a job. For example:

    ....
    (phoebusr1@my-machine)1> phoebus_master:start_link([{name, "first_ever"}, {max_steps, 100}, {algo_fun, AFun}, {input_dir, "hdfs://localhost:9000/tmp/input/"}, {output_dir, "file:///tmp/output/"}]).
    ....

* HDFS exposes only Java APIs.
  Thus, 'external_store_hdfs' is implemented as a wrapper over an Erlang Thrift client that talks to an external Thrift server.
* Phoebus has been tested with the HDFS Thrift server that is bundled with the Cloudera CDH3 Hadoop distribution[5].

Next Steps
----------

* Fix fault tolerance and error handling: if a Worker dies, the Master must ask another Worker on another node to take up its work.
* Implement the 'Aggregate' function described in the Pregel paper.
* Support Jobs written in Python.
* Add support for Disco DFS[6].

References
----------

1. Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski, Pregel: A System for Large-Scale Graph Processing (http://portal.acm.org/citation.cfm?id=1582716.1582723)
2. MapReduce (http://en.wikipedia.org/wiki/MapReduce)
3. Hadoop Distributed File System (http://hadoop.apache.org/hdfs/)
4. Thrift (http://incubator.apache.org/thrift/)
5. Cloudera's CDH3 Hadoop tarball (http://archive.cloudera.com/cdh/3/hadoop-0.20.2+320.tar.gz)
6. Disco Distributed File System (http://discoproject.org/doc/start/ddfs.html)
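
The input record format described in step 5 of "Getting it to work" can be illustrated with a small parser. This is only a sketch under stated assumptions: the module name `record_sketch` and the function `parse_line/1` are hypothetical and are not part of Phoebus, and Phoebus's own reader may represent records differently. It turns one tab-delimited line into a `{VertexName, VertexValue, Edges}` tuple, where each edge is an `{EdgeWeight, TargetVertexName}` pair, matching the description of a Record in the Computational Model section.

```erlang
%% Hypothetical sketch, NOT part of Phoebus: parses one input line of the
%% form "<VertexName>\t<VertexValue>\t<EdgeWeight1>\t<TargetVertexName1>...\n".
-module(record_sketch).
-export([parse_line/1]).

%% Returns {VertexName, VertexValue, [{EdgeWeight, TargetVertexName}]}.
%% All fields are kept as strings; no numeric conversion is attempted.
%% Note: string:tokens/2 skips empty fields, so a line containing an
%% empty value between two tabs is not handled by this sketch.
parse_line(Line) when is_list(Line) ->
    case string:tokens(Line, "\t\n") of
        [Name, Value | EdgeFields] ->
            {Name, Value, pair_edges(EdgeFields)};
        _ ->
            erlang:error({bad_record, Line})
    end.

%% Groups the remaining fields two at a time: weight first, then target.
pair_edges([Weight, Target | Rest]) ->
    [{Weight, Target} | pair_edges(Rest)];
pair_edges([]) ->
    [];
pair_edges([Dangling]) ->
    %% An odd number of edge fields means a weight without a target.
    erlang:error({dangling_edge_field, Dangling}).
```

After compiling the module in an Erlang shell with `c(record_sketch).`, calling `record_sketch:parse_line("1\t1\t1\t2\t1\t3\n").` on the first sample record returns `{"1","1",[{"1","2"},{"1","3"}]}`: vertex "1" with value "1" and two outgoing edges of weight "1" to vertices "2" and "3".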