Livy is an open source REST interface for interacting with Spark from anywhere. It supports executing snippets of code or programs in a Spark context that runs locally or in YARN.
- Interactive Scala, Python and R shells
- Batch submissions in Scala, Java, Python
- Multiple users can share the same server (impersonation support)
- Can be used for submitting jobs from anywhere with REST
- Does not require any code change to your programs
Pull requests are welcome! Before you begin, please check out the Wiki.
Livy powers the Spark snippets of the Hadoop Notebook in Hue 3.8; its implementation can be seen here.
See the API documentation below and some curl examples:
To build/run Livy, you will need:
- Debian/Ubuntu:
  - mvn (from the maven package or maven3 tarball)
  - openjdk-7-jdk (or Oracle Java7 jdk)
  - spark 1.4+ (from Apache Spark tarball)
  - Python 2.6+
  - R 3.x
- Redhat/CentOS:
  - mvn (from the maven package or maven3 tarball)
  - java-1.7.0-openjdk (or Oracle Java7 jdk)
  - spark 1.4+ (from Apache Spark tarball)
  - Python 2.6+
  - R 3.x
- MacOS:
  - Xcode command line tools
  - Oracle's JDK 1.7+
  - Maven (Homebrew)
  - apache-spark 1.5 (Homebrew)
  - Python 2.6+
  - R 3.x
Livy is built using Apache Maven. To check out and build Livy, run:
% git clone git@github.com:cloudera/livy.git
% cd livy
% mvn -DskipTests clean package
By default Livy is built with the Cloudera distribution of Spark (currently based off Spark 1.5.0), but it is simple to support other versions, such as Spark 1.4.1, by compiling Livy with:
% mvn -DskipTests -Dspark.version=1.4.1 clean package
To run the Livy tests, first follow the instructions in Building Livy. Then run:
% export SPARK_HOME=/usr/lib/spark
% export HADOOP_CONF_DIR=/etc/hadoop/conf
% mvn test
In order to run Livy with local sessions, first export these variables:
% export SPARK_HOME=/usr/lib/spark
% export HADOOP_CONF_DIR=/etc/hadoop/conf
Then start the server with:
% ./bin/livy-server
Or with YARN sessions by running:
% env \
LIVY_SERVER_JAVA_OPTS="-Dlivy.server.session.factory=yarn" \
CLASSPATH=`hadoop classpath` \
$LIVY_HOME/bin/livy-server
The properties of the server can be modified by copying livy-defaults.conf.template and renaming it conf/livy-defaults.conf. The Livy configuration directory can be placed in an alternative directory by defining LIVY_CONF_DIR.
In particular, the YARN mode (the default is a local process, for development) can be set with:
livy.server.session.factory = yarn
Now let's see it in action by interacting with it from Python with the Requests library. By default Livy runs on port 8998 (which can be changed with the livy_server_port config option). We'll start off with a Spark session that takes Scala code:
>>> import json, pprint, requests, textwrap
>>> host = 'http://localhost:8998'
>>> data = {'kind': 'spark'}
>>> headers = {'Content-Type': 'application/json'}
>>> r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
>>> r.json()
{u'state': u'starting', u'id': 0, u'kind': u'spark'}
Once the session has completed starting up, it transitions to the idle state:
>>> session_url = host + r.headers['location']
>>> r = requests.get(session_url, headers=headers)
>>> r.json()
{u'state': u'idle', u'id': 0, u'kind': u'spark'}
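The wait for the idle state can be automated with a small polling helper. The following is a minimal Python 3 sketch, not part of Livy: `wait_for_state` and its parameters are hypothetical names, and treating `error` and `dead` as failure states is an assumption. The state lookup is passed in as a callable so the loop stays independent of the HTTP client.

```python
import time


def wait_for_state(fetch_state, target="idle", failed=("error", "dead"),
                   interval=1.0, timeout=60.0, sleep=time.sleep):
    """Poll fetch_state() until it returns `target`.

    Raises RuntimeError if a state listed in `failed` is seen (an assumption
    about which states are unrecoverable) and TimeoutError once roughly
    `timeout` seconds have been spent sleeping.
    """
    waited = 0.0
    while True:
        state = fetch_state()
        if state == target:
            return state
        if state in failed:
            raise RuntimeError("session entered state %r" % (state,))
        if waited >= timeout:
            raise TimeoutError("still in state %r after %.0f seconds" % (state, timeout))
        sleep(interval)
        waited += interval
```

With the variables from the session above, it could be called as `wait_for_state(lambda: requests.get(session_url, headers=headers).json()['state'])`.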
Now we can execute Scala by passing in a simple JSON command:
>>> statements_url = session_url + '/statements'
>>> data = {'code': '1 + 1'}
>>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> r.json()
{u'output': None, u'state': u'running', u'id': 0}
If a statement takes longer than a few milliseconds to execute, Livy returns early and provides a URL that can be polled until it is complete:
>>> statement_url = host + r.headers['location']
>>> r = requests.get(statement_url, headers=headers)
>>> pprint.pprint(r.json())
[{u'id': 0,
  u'output': {u'data': {u'text/plain': u'res0: Int = 2'},
              u'execution_count': 0,
              u'status': u'ok'},
  u'state': u'available'}]
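Once a statement has been polled, its result still has to be pulled out of the nested response. A minimal Python 3 sketch follows; `statement_text` is a hypothetical helper name, it expects a single statement object (a dict) shaped like the ones shown in this document, and raising on any status other than `ok` is an assumption.

```python
def statement_text(statement):
    """Return the text/plain output of a statement, or None if it is not ready.

    `statement` is one decoded statement object with the `state` and
    `output` fields shown in the examples.
    """
    if statement.get("state") != "available":
        return None  # still running: the caller should poll again
    output = statement.get("output") or {}
    if output.get("status") != "ok":
        # Assumption: anything other than 'ok' is treated as a failure.
        raise RuntimeError("statement failed: %r" % (output,))
    return output.get("data", {}).get("text/plain")
```

For the `1 + 1` statement above, this would return `'res0: Int = 2'` once the state is `available`, and `None` while it is still `running`.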
That was a pretty simple example. A more interesting one uses Spark to estimate Pi. This is taken from the Spark examples:
>>> data = {
...   'code': textwrap.dedent("""\
...     val NUM_SAMPLES = 100000;
...     val count = sc.parallelize(1 to NUM_SAMPLES).map { i =>
...       val x = Math.random();
...       val y = Math.random();
...       if (x*x + y*y < 1) 1 else 0
...     }.reduce(_ + _);
...     println(\"Pi is roughly \" + 4.0 * count / NUM_SAMPLES)
...     """)
...   }
>>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> pprint.pprint(r.json())
{u'id': 1,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.14004\nNUM_SAMPLES: Int = 100000\ncount: Int = 78501'},
             u'execution_count': 1,
             u'status': u'ok'},
 u'state': u'available'}
Finally, let's close our session:
>>> session_url = 'http://localhost:8998/sessions/0'
>>> requests.delete(session_url, headers=headers)
<Response [204]>
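Because a session holds cluster resources until it is deleted, it can help to guarantee the cleanup even when a statement raises. The following is a minimal Python 3 sketch using a context manager; `livy_session`, `create` and `delete` are hypothetical names, and the two callables stand in for the POST and DELETE requests shown in this walkthrough.

```python
from contextlib import contextmanager


@contextmanager
def livy_session(create, delete):
    """Yield the session URL returned by create(); always call delete(url) on exit."""
    session_url = create()
    try:
        yield session_url
    finally:
        # Runs on normal exit and when the body raises.
        delete(session_url)
```

With Requests, `create` could wrap the POST to `/sessions` and return `host + r.headers['location']`, and `delete` could be `lambda url: requests.delete(url, headers=headers)`.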
PySpark sessions use the exact same API, just with a different initial command:
>>> data = {'kind': 'pyspark'}
>>> r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
>>> r.json()
{u'id': 1, u'state': u'idle'}
>>> statements_url = host + r.headers['location'] + '/statements'
The Pi example from before can then be run as:
>>> data = {
...   'code': textwrap.dedent("""\
...     import random
...     NUM_SAMPLES = 100000
...     def sample(p):
...       x, y = random.random(), random.random()
...       return 1 if x*x + y*y < 1 else 0
...
...     count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
...       .reduce(lambda a, b: a + b)
...     print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
...     """)
...   }
>>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> pprint.pprint(r.json())
{u'id': 12,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
             u'execution_count': 12,
             u'status': u'ok'},
 u'state': u'running'}
SparkR also has the same API:
>>> data = {'kind': 'sparkr'}
>>> r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
>>> r.json()
{u'id': 1, u'state': u'idle'}
>>> statements_url = host + r.headers['location'] + '/statements'
The Pi example from before can then be run as:
>>> data = {
...   'code': textwrap.dedent("""\
...     n <- 100000
...     slices <- 2
...     piFunc <- function(elem) {
...       rands <- runif(n = 2, min = -1, max = 1)
...       val <- ifelse((rands[1]^2 + rands[2]^2) < 1, 1.0, 0.0)
...       val
...     }
...     piFuncVec <- function(elems) {
...       message(length(elems))
...       rands1 <- runif(n = length(elems), min = -1, max = 1)
...       rands2 <- runif(n = length(elems), min = -1, max = 1)
...       val <- ifelse((rands1^2 + rands2^2) < 1, 1.0, 0.0)
...       sum(val)
...     }
...     rdd <- parallelize(sc, 1:n, slices)
...     count <- reduce(lapplyPartition(rdd, piFuncVec), sum)
...     cat("Pi is roughly", 4.0 * count / n, "\\n")
...     """)
...   }
>>> r = requests.post(statements_url, data=json.dumps(data), headers=headers)
>>> pprint.pprint(r.json())
{u'id': 12,
 u'output': {u'data': {u'text/plain': u'Pi is roughly 3.136000'},
             u'execution_count': 12,
             u'status': u'ok'},
 u'state': u'running'}
- User group: http://groups.google.com/a/cloudera.org/group/livy-user
- Dev group: http://groups.google.com/a/cloudera.org/group/livy-dev
- Jira: https://issues.cloudera.org/browse/LIVY
- Pull requests: https://github.com/cloudera/livy/pulls
Returns all the active interactive sessions.
name | description | type |
---|---|---|
sessions | session list | list |
Creates a new interactive Scala, Python or R shell in the cluster.
name | description | type |
---|---|---|
kind | The session kind (required) | session kind |
proxyUser | The user to impersonate that will run this session (e.g. bob) | string |
jars | Files to be placed on the java classpath | list of paths |
pyFiles | Files to be placed on the PYTHONPATH | list of paths |
files | Files to be placed in executor working directory | list of paths |
driverMemory | Memory for driver (e.g. 1000M, 2G) | string |
driverCores | Number of cores used by driver (YARN mode only) | int |
executorMemory | Memory for executor (e.g. 1000M, 2G) | string |
executorCores | Number of cores used by executor | int |
numExecutors | Number of executors (YARN mode only) | int |
archives | Archives to be uncompressed in the executor working directory (YARN mode only) | list of paths |
queue | The YARN queue to submit to (YARN mode only) | string |
name | Name of the application | string |
conf | Spark configuration property | Map of key=val |
The created Session.
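The request body for creating a session can be assembled and checked before it is sent. Below is a minimal Python 3 sketch; `session_request`, `SESSION_KINDS` and `SESSION_OPTIONS` are hypothetical names, the allowed fields are copied from the table above, and rejecting unknown fields on the client side is an assumption about what is useful rather than something the server is known to require.

```python
import json

# Session kinds and optional fields as listed in this document.
SESSION_KINDS = ("spark", "pyspark", "sparkr")
SESSION_OPTIONS = {
    "proxyUser", "jars", "pyFiles", "files", "driverMemory", "driverCores",
    "executorMemory", "executorCores", "numExecutors", "archives", "queue",
    "name", "conf",
}


def session_request(kind, **options):
    """Return the JSON body for creating a session of the given kind."""
    if kind not in SESSION_KINDS:
        raise ValueError("unknown session kind: %r" % (kind,))
    unknown = set(options) - SESSION_OPTIONS
    if unknown:
        raise ValueError("unknown session options: %s" % ", ".join(sorted(unknown)))
    body = {"kind": kind}
    body.update(options)
    return json.dumps(body, sort_keys=True)
```

The resulting string could be passed as `data=` to `requests.post(host + '/sessions', ...)`, for example `session_request('pyspark', proxyUser='bob', executorMemory='2G')`.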
Returns the session information.
The Session.
Kill the Session job.
Get the log lines from this session.
name | description | type |
---|---|---|
from | offset | int |
size | number of log lines to return | int |
name | description | type |
---|---|---|
id | The session id | int |
from | offset | int |
size | total amount of lines | int |
log | The log lines | list of strings |
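The `from` and `size` parameters allow a long log to be read in pages. A minimal Python 3 sketch of such a loop follows; `iter_log_lines` and `fetch_page` are hypothetical names, and `fetch_page(offset, size)` is assumed to perform the request and return the decoded response body. The loop stops when a page comes back shorter than requested, so it does not depend on how the response `size` field is interpreted.

```python
def iter_log_lines(fetch_page, page_size=100):
    """Yield log lines page by page until a short (or empty) page is returned."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        lines = page.get("log", [])
        for line in lines:
            yield line
        offset += len(lines)
        if len(lines) < page_size:
            break  # fewer lines than requested: nothing more to read
```

With Requests, `fetch_page` could be a small function that issues the GET with `params={'from': offset, 'size': size}` and returns `r.json()`.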
Return all the statements in a session.
name | description | type |
---|---|---|
statements | statement list | list |
Execute a statement in a session.
name | description | type |
---|---|---|
code | The code to execute | string |
The statement object.
Return all the active batch jobs.
name | description | type |
---|---|---|
batches | batch list | list |
name | description | type |
---|---|---|
proxyUser | The user to impersonate that will execute the job | string |
file | Archive holding the file | path (required) |
args | Command line arguments | list of strings |
className | Application's java/spark main class | string |
jars | Files to be placed on the java classpath | list of paths |
pyFiles | Files to be placed on the PYTHONPATH | list of paths |
files | Files to be placed in executor working directory | list of paths |
driverMemory | Memory for driver (e.g. 1000M, 2G) | string |
driverCores | Number of cores used by driver | int |
executorMemory | Memory for executor (e.g. 1000M, 2G) | string |
executorCores | Number of cores used by executor | int |
numExecutors | Number of executors | int |
archives | Archives to be uncompressed (YARN mode only) | list of paths |
queue | The YARN queue to submit to (YARN mode only) | string |
name | Name of the application | string |
conf | Spark configuration property | Map of key=val |
The created Batch object.
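Since `file` is the only required field, a small typed wrapper can make a missing value fail early. The following is a minimal sketch for Python 3.7+ using a dataclass; `BatchRequest` is a hypothetical name, only a few of the fields from the table are modelled, and the path and class name in the usage note are made-up placeholders.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional


@dataclass
class BatchRequest:
    """A subset of the batch submission fields listed above."""
    file: str  # required, so it has no default
    className: Optional[str] = None
    args: List[str] = field(default_factory=list)
    proxyUser: Optional[str] = None
    conf: Dict[str, str] = field(default_factory=dict)

    def to_json(self):
        # Leave out fields that were not set so the body stays minimal.
        body = {k: v for k, v in asdict(self).items() if v not in (None, [], {})}
        return json.dumps(body, sort_keys=True)
```

For example, `BatchRequest(file='/user/bob/pi.jar', className='org.example.Pi', args=['10']).to_json()` produces a body containing only those three fields, while `BatchRequest()` raises `TypeError` because `file` is missing.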
name | description | type |
---|---|---|
from | offset | int |
size | amount of batches to return | int |
name | description | type |
---|---|---|
id | The batch id | int |
state | The state of the batch | batch state |
log | The output of the batch job | list of strings |
Kill the Batch job.
Get the log lines from this batch.
name | description | type |
---|---|---|
from | offset | int |
size | number of log lines to return | int |
name | description | type |
---|---|---|
id | The batch id | int |
from | offset | int |
size | total amount of lines | int |
log | The log lines | list of strings |
Sessions represent an interactive shell.
name | description | type |
---|---|---|
id | The session id | int |
kind | session kind (spark, pyspark, or sparkr) | session kind (required) |
log | The log lines | list of strings |
state | The session state | string |
name | description |
---|---|
not_started | session has not been started |
starting | session is starting |
idle | session is waiting for input |
busy | session is executing a statement |
error | session errored out |
dead | session has exited |
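Client code often needs to branch on these states. A minimal Python 3 sketch follows that mirrors the table as an enum; `SessionState`, `can_accept_statement` and `is_finished` are hypothetical names, and grouping `error` and `dead` as final states is an interpretation of the descriptions above.

```python
from enum import Enum


class SessionState(Enum):
    """Session states as listed in the table above."""
    NOT_STARTED = "not_started"
    STARTING = "starting"
    IDLE = "idle"
    BUSY = "busy"
    ERROR = "error"
    DEAD = "dead"


def can_accept_statement(state):
    """True only when the session is waiting for input."""
    return SessionState(state) is SessionState.IDLE


def is_finished(state):
    """True for states assumed to be final (error, dead)."""
    return SessionState(state) in (SessionState.ERROR, SessionState.DEAD)
```

Passing a string that is not in the table, such as `SessionState('paused')`, raises `ValueError`, which surfaces unexpected server responses early.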
name | description |
---|---|
spark | interactive scala/spark session |
pyspark | interactive python/spark session |
sparkr | interactive R/spark session |
Statements represent the result of an execution statement.
name | description | type |
---|---|---|
id | The statement id | integer |
state | The execution state | statement state |
output | The execution output | statement output |
name | description |
---|---|
running | Statement is currently executing |
available | Statement has a ready response |
error | Statement failed |
name | description | type |
---|---|---|
status | execution status | string |
execution_count | a monotonically increasing number | integer |
data | statement output | an object mapping a mime type to the result. If the mime type is application/json, the value will be a JSON value |
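Because `data` maps mime types to results, a client has to decide which representation to use. A minimal Python 3 sketch follows; `pick_output` is a hypothetical helper, and preferring `application/json` over `text/plain` is an arbitrary choice made for illustration.

```python
def pick_output(output, preferred=("application/json", "text/plain")):
    """Return (mime_type, value) for the first preferred mime type present.

    `output` is a decoded statement output object; (None, None) is returned
    when none of the preferred mime types is available.
    """
    data = (output or {}).get("data") or {}
    for mime in preferred:
        if mime in data:
            return mime, data[mime]
    return None, None
```

For the outputs shown earlier in this document, which only carry `text/plain`, this returns that text; when an `application/json` entry is present, the already-decoded JSON value is returned instead.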
name | description | type |
---|---|---|
id | The session id | int |
kind | session kind (spark, pyspark, or sparkr) | session kind (required) |
log | The log lines | list of strings |
state | The session state | string |
Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0