PySpark Cassandra brings back the fun in working with Cassandra data in PySpark.
This module provides python support for Apache Spark's Resillient Distributed Datasets from Apache Cassandra CQL rows using Cassandra Spark Connector within PySpark, both in the interactive shell and in python programmes submitted with spark-submit.
This project was initially forked from https://github.com/Parsely/pyspark-cassandra, but in order to submit it to http://spark-packages.org/, a plain old repository was created.
Contents:
- Compatibility
- Using with PySpark
- Using with PySpark shell
- Building
- API
- Examples
- Problems / ideas?
- Contributing
Currently PySpark Cassandra has been succesfully used with Spark version 1.2.0, 1.2.1, 1.2.2, 1.3.0 and 1.3.1 and DataStax Spark Cassandra Connector version 1.2. Feedback on (in-)compatibility with other versions is much appreciated.
Pyspark Cassandra is published at Spark Packages. This allows easy usage with Spark 1.3 and beyond through:
spark-submit \
--packages TargetHolding/pyspark-cassandra:<version> \
--conf spark.cassandra.connection.host=your,cassandra,node,namesspark-submit \
--jars /path/to/pyspark_cassandra-<version>.jar \
--driver-class-path /path/to/pyspark_cassandra-<version>.jar \
--py-files target/pyspark_cassandra_<version>-<python version>.egg \
--conf spark.cassandra.connection.host=your,cassandra,node,names \
--master spark://spark-master:7077 \
yourscript.py(note that the the --driver-class-path due to SPARK-5185)
Replace spark-submit with pyspark to start the interactive shell and don't provide a script as argument and then import PySpark Cassandra. Note that when performing this import the sc variable in pyspark is augmented with the cassandraTable(...) method.
import pyspark_cassandraFor Spark 1.3 with Spark Packages
Pyspark Cassandra can be compiled using:
sbt compileThe package can be published locally with:
sbt spPublishLocalThe package can be published to Spark Packages with (requires authentication and authorization):
sbt spPublishA Java / JVM library as well as a python library is required to use PySpark Cassandra. They can be built with:
make distThis creates 1) a fat jar with the Spark Cassandra Connector and additional classes for bridging Spark and PySpark for Cassandra data and 2) a python source distribution at:
target/pyspark_cassandra-<version>.jartarget/pyspark_cassandra_<version>-<python version>.egg.
The PySpark Cassandra API aims to stay close to the Cassandra Spark Connector API. Reading its documentation is a good place to start.
The primary representation of CQL rows in PySpark Cassandra is the ROW format. However sc.cassandraTable(...) supports the row_format argument which can be any of the constants from RowFormat:
DICT: The default layout, a CQL row is represented as a python dict with the CQL row columns as keys.TUPLE: A CQL row is represented as a python tuple with the values in CQL table column order / the order of the selected columns.KV_DICTS: A tuple of two python dicts represents the primary key columns and remaining (value) columns respectively.KV_TUPLES: A tuple of two python tuples represents the primary key columns and remaining (value) columns respectively. The values in the tuple are in the CQL table column order / the order of the selected columns.ROW: A pyspark_cassandra.Row object representing a CQL row.
Column values are related between CQL and python as follows:
| CQL | python |
|---|---|
| ascii | unicode string |
| bigint | long |
| blob | bytearray |
| boolean | boolean |
| counter | int, long |
| decimal | decimal |
| double | float |
| float | float |
| inet | str |
| int | int |
| map | dict |
| set | set |
| list | list |
| text | unicode string |
| timestamp | datetime.datetime |
| timeuuid | uuid.UUID |
| varchar | unicode string |
| varint | long |
| uuid | uuid.UUID |
| UDT | pyspark_cassandra.UDT |
This is the default type to which CQL rows are mapped. It is directly compatible with pyspark.sql.Row but is (correctly) mutable and provides some other improvements.
This type is structurally identical to pyspark_cassandra.Row but serves user defined types. Mapping to custom python types (e.g. via CQLEngine) is not yet supported.
A CassandraSparkContext is very similar to a regular SparkContext. It is created in the same way, can be used to read files, parallelize local data, broadcast a variable, etc. See the Spark Programming Guide for more details. But it exposes one additional method:
-
cassandraTable(keyspace, table, ...): Returns a CassandraRDD for the given keyspace and table. Additional arguments which can be provided:row_formatcan be set to any of thepyspark_cassandra.RowFormatvalues (defaults toROW)split_sizesets the size in the number of CQL rows in each partition (defaults to100000)fetch_sizesets the number of rows to fetch per request from Cassandra (defaults to1000)consistency_levelsets with which consistency level to read the data (defaults toLOCAL_ONE)
PySpark Cassandra supports saving arbitrary RDD's to Cassandra using:
-
rdd.saveToCassandra(keyspace, table, ...): Saves an RDD to Cassandra. The RDD is expected to contain dicts with keys mapping to CQL columns. Additional arguments which can be supplied are:columns(iterable): The columns to save, i.e. which keys to take from the dicts in the RDD.batch_size(int): The size in bytes to batch up in an unlogged batch of CQL inserts.batch_buffer_size(int): The maximum number of batches which are 'pending'.batch_level(string): The way batches are formed (defaults to "partition"):all: any row can be added to any batchreplicaset: rows are batched for replica setspartition: rows are batched by their partition key
consistency_level(cassandra.ConsistencyLevel): The consistency level used in writing to Cassandra.parallelism_level(int): The maximum number of batches written in parallel.ttl(int or timedelta): The time to live as milliseconds or timedelta to use for the values.timestamp(int, date or datetime): The timestamp in milliseconds, date or datetime to use for the values.
A CassandraRDD is very similar to a regular RDD in pyspark. It is extended with the following methods:
select(*columns): Creates a CassandraRDD with the select clause applied.where(clause, *args): Creates a CassandraRDD with a CQL where clause applied. The clause can contain ? markers with the arguments supplied as *args.saveToCassandra(...): As above, but the keyspace and/or table may be omitted to save to the same keyspace and/or table.spanBy(*columns): Groups rows by the given columns without shuffling.
When importing pyspark_cassandra.streaming the method ``saveToCassandra(...)``` is made available on DStreams.
Creating a SparkContext with Cassandra support
import pyspark_cassandra
conf = SparkConf() \
.setAppName("PySpark Cassandra Test") \
.setMaster("spark://spark-master:7077") \
.set("spark.cassandra.connection.host", "cas-1")
sc = CassandraSparkContext(conf=conf)Using select and where to narrow the data in an RDD and then filter, map, reduce and collect it::
sc \
.cassandraTable("keyspace", "table") \
.select("col-a", "col-b") \
.where("key=?", "x") \
.filter(lambda r: r["col-b"].contains("foo")) \
.map(lambda r: (r["col-a"], 1)
.reduceByKey(lamba a, b: a + b)
.collect()Storing data in Cassandra::
rdd = sc.parallelize([{
"key": k,
"stamp": datetime.now(),
"val": random() * 10,
"tags": ["a", "b", "c"],
"options": {
"foo": "bar",
"baz": "qux",
}
} for k in ["x", "y", "z"]])
rdd.saveToCassandra(
"keyspace",
"table",
ttl=timedelta(hours=1),
)Create a streaming context, convert every line to a generater of words which are saved to cassandra. Through this example all unique words are stored in Cassandra.
The words are wrapped as a tuple so that they are in a format which can be stored. A dict or a pyspark_cassandra.Row object would have worked as well.
from pyspark.streaming import StreamingContext
from pyspark_cassandra import streaming
ssc = StreamingContext(sc, 2)
ssc \
.socketTextStream("localhost", 9999) \
.flatMap(lambda l: ((w,) for w in (l,))) \
.saveToCassandra('keyspace', 'words')
ssc.start()Feel free to use the issue tracker propose new functionality and / or report bugs.
- Fork it
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create new Pull Request