Distributed. Columnar. Versioned.
```
    _______ __      ____  ____
   / ____(_) /___  / __ \/ __ )
  / /_  / / / __ \/ / / / __  |
 / __/ / / / /_/ / /_/ / /_/ /
/_/   /_/_/\____/_____/_____/
```
FiloDB is a new open-source database based on Apache Cassandra and Spark SQL. FiloDB brings breakthrough performance levels to analytical queries by using a columnar storage layout with various space-saving techniques such as dictionary compression. At the same time, row-level and column-level operations and built-in versioning give FiloDB far more flexibility than can be achieved with file-based technologies like Parquet alone.
- FiloDB aims to deliver one to two orders of magnitude speedup over the OLAP performance of Cassandra 2.x CQL tables + Spark. For the POC performance comparison, please see the cassandra-gdelt repo.
- Enable easy exactly-once ingestion from Kafka for streaming geospatial applications.
- Incrementally computed columns and geospatial annotations
- MPP-like automatic caching of projections from popular queries for fast results
FiloDB is a great fit for bulk analytical workloads, or streaming / append-only event data. It is not optimized for heavily transactional, update-oriented workflows.
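To give a flavor of why the dictionary compression mentioned above helps so much for repetitive string columns, here is a minimal, self-contained Scala sketch. It is an illustration only, not FiloDB's actual encoding, and all names in it are made up:

```scala
// Illustration only: a toy dictionary encoding for a low-cardinality string column.
// Each distinct value is stored once in a dictionary; the column body becomes
// an array of small integer codes that compress and scan much better than strings.
object DictionaryEncodingSketch extends App {
  val countryCodes = Seq("USA", "USA", "CHN", "USA", "RUS", "CHN")

  // Dictionary: distinct value -> integer code
  val dictionary: Map[String, Int] = countryCodes.distinct.zipWithIndex.toMap

  // Encoded column: just the codes
  val encoded: Seq[Int] = countryCodes.map(dictionary)

  // Decoding is a reverse lookup
  val reverse: Map[Int, String] = dictionary.map(_.swap)
  val decoded: Seq[String] = encoded.map(reverse)

  println(s"dictionary = $dictionary")   // e.g. Map(USA -> 0, CHN -> 1, RUS -> 2)
  println(s"encoded    = $encoded")      // e.g. List(0, 0, 1, 0, 2, 1)
  assert(decoded == countryCodes)
}
```

Since each distinct string is stored once and the column body shrinks to small integer codes, low-cardinality columns (country codes, event codes, and the like) compress dramatically.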
Overview presentation -- see the docs folder for design docs.
To compile the .mermaid source files to .png files, install the Mermaid CLI.
Definitely alpha or pre-alpha. What is here is intended mainly to show what is possible with columnar storage on Cassandra combined with Spark, and to gather feedback.
- Append-only
- Ingestion and querying through Spark DataFrames
- Keyed by partition and row number only
- Only int, double, long, and string types
- Localhost only - no locality in Spark input source
Also, the design and architecture are heavily in flux. FiloDB is currently designed as a layer on top of Cassandra, but it may (and probably will) evolve toward something that can integrate with existing C* tables.
- Send me your use cases for OLAP on Cassandra and Spark
- Especially IoT and Geospatial
- Email if you want to contribute
Your feedback will help decide the next batch of features, such as:
- which data types to add support for
- what architecture is best supported
Run the tests with `sbt test`, or for continuous development, `sbt ~test`. Noisy Cassandra logs can be seen in `filodb-test.log`.
Build the Spark data source module with `sbt spark/assembly`. Then cd into a Spark 1.4.x distribution (1.4.0 and onwards should work) and start spark-shell with something like:

```
bin/spark-shell --jars ../FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.1-SNAPSHOT.jar
```
You can use the Spark DataFrames `read` and `write` APIs with FiloDB. This should also make it possible to create and ingest data using only JDBC or the SQL shell. To create a dataset:
```scala
dataDF.write.format("filodb.spark").
  option("dataset", "test1").
  option("create_dataset", "true").
  save()
```
Note the `create_dataset` option, which automatically creates columns and partitions (which you would otherwise need to do through the CLI).
To read it back:
```scala
val df = sql.read.format("filodb.spark").option("dataset", "test1").load()
```
Create a config first and import the implicit functions:
```scala
scala> val config = com.typesafe.config.ConfigFactory.parseString("max-outstanding-futures = 16")
config: com.typesafe.config.Config = Config(SimpleConfigObject({"max-outstanding-futures":16}))

scala> import filodb.spark._
import filodb.spark._
```
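The configuration uses the standard Typesafe Config library, so the same settings could also be read from a file instead of an inline string. A minimal sketch using only Typesafe Config APIs; the file name `filo.conf` is hypothetical:

```scala
scala> import com.typesafe.config.ConfigFactory

// Suppose filo.conf contains a line like:  max-outstanding-futures = 16
scala> val config = ConfigFactory.parseFile(new java.io.File("filo.conf"))

scala> config.getInt("max-outstanding-futures")
```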
The easiest way to create a table, its columns, and ingest data is to use the implicit method `saveAsFiloDataset`:

```scala
scala> sqlContext.saveAsFiloDataset(myDF, config, "table1", createDataset=true)
```
Currently this overwrites rather than appends; that will be fixed. Reading is just as easy:
```scala
scala> val df = sqlContext.filoDataset(config, "gdelt")
15/06/04 15:21:41 INFO DCAwareRoundRobinPolicy: Using data-center name 'datacenter1' for DCAwareRoundRobinPolicy (if this is incorrect, please provide the correct datacenter name with DCAwareRoundRobinPolicy constructor)
15/06/04 15:21:41 INFO Cluster: New Cassandra host localhost/127.0.0.1:9042 added
15/06/04 15:21:41 INFO FiloRelation: Read schema for dataset gdelt = Map(ActionGeo_CountryCode -> Column(ActionGeo_CountryCode,gdelt,0,StringColumn,FiloSerializer,false,false), Actor1Geo_FullName -> Column(Actor1Geo_FullName,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2Name -> Column(Actor2Name,gdelt,0,StringColumn,FiloSerializer,false,false), ActionGeo_ADM1Code -> Column(ActionGeo_ADM1Code,gdelt,0,StringColumn,FiloSerializer,false,false), Actor2CountryCode -> Column(Actor2CountryCode,gdelt,0,StringColumn,FiloSerializer,fals...
```
You could also verify the schema via `df.printSchema`.
Now do some queries, using the DataFrame DSL:
```scala
scala> df.select(count(df("MonthYear"))).show()
...<skipping lots of logging>...
COUNT(MonthYear)
4037998
```
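Other aggregations work the same way in the DSL; for example, a grouped average (a sketch only; it assumes the standard `org.apache.spark.sql.functions` import and column names from the GDELT schema below, and the output is not shown):

```scala
scala> import org.apache.spark.sql.functions._

scala> df.groupBy("Actor1Name").
         agg(avg("AvgTone").as("avgTone")).
         orderBy(col("avgTone").desc).
         show(10)
```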
Or use SQL to find the top 15 events with the highest tone:
```scala
scala> df.registerTempTable("gdelt")

scala> sqlContext.sql("SELECT Actor1Name, Actor2Name, AvgTone FROM gdelt ORDER BY AvgTone DESC LIMIT 15").collect()
res13: Array[org.apache.spark.sql.Row] = Array([208077.29634561483])
```
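The result comes back as an Array of Spark SQL `Row`s, which you can post-process in plain Scala. A sketch, assuming the same three-column query as above:

```scala
scala> val top = sqlContext.sql(
         "SELECT Actor1Name, Actor2Name, AvgTone FROM gdelt ORDER BY AvgTone DESC LIMIT 15").collect()

// Each Row exposes typed positional getters: (Actor1Name, Actor2Name, AvgTone)
scala> top.map(row => (row.getString(0), row.getString(1), row.getDouble(2))).foreach(println)
```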
Now, how about something uniquely Spark: feed SQL query results to MLlib to compute a correlation:
```scala
scala> import org.apache.spark.mllib.stat.Statistics

scala> val numMentions = df.select("NumMentions").map(row => row.getInt(0).toDouble)
numMentions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[100] at map at DataFrame.scala:848

scala> val numArticles = df.select("NumArticles").map(row => row.getInt(0).toDouble)
numArticles: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[104] at map at DataFrame.scala:848

scala> val correlation = Statistics.corr(numMentions, numArticles, "pearson")
```
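`Statistics.corr` also supports Spearman rank correlation, should a rank-based measure be more appropriate; a quick sketch reusing the RDDs above:

```scala
scala> println(f"Pearson correlation: $correlation%.4f")

// Spearman rank correlation over the same two RDDs
scala> val spearman = Statistics.corr(numMentions, numArticles, "spearman")
```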
First, build the CLI using `sbt cli/assembly`. This will create an executable in `cli/target/scala-2.10/filo-cli-*`. In the examples below, the `filo-cli` command is an alias to this executable.
Create a dataset with all the columns from the GDELT public dataset:
```
filo-cli --command create --dataset gdelt --columns GLOBALEVENTID:int,SQLDATE:string,MonthYear:int,Year:int,FractionDate:double,Actor1Code:string,Actor1Name:string,Actor1CountryCode:string,Actor1KnownGroupCode:string,Actor1EthnicCode:string,Actor1Religion1Code:string,Actor1Religion2Code:string,Actor1Type1Code:string,Actor1Type2Code:string,Actor1Type3Code:string,Actor2Code:string,Actor2Name:string,Actor2CountryCode:string,Actor2KnownGroupCode:string,Actor2EthnicCode:string,Actor2Religion1Code:string,Actor2Religion2Code:string,Actor2Type1Code:string,Actor2Type2Code:string,Actor2Type3Code:string,IsRootEvent:int,EventCode:string,EventBaseCode:string,EventRootCode:string,QuadClass:int,GoldsteinScale:double,NumMentions:int,NumSources:int,NumArticles:int,AvgTone:double,Actor1Geo_Type:int,Actor1Geo_FullName:string,Actor1Geo_CountryCode:string,Actor1Geo_ADM1Code:string,Actor1Geo_Lat:double,Actor1Geo_Long:double,Actor1Geo_FeatureID:int,Actor2Geo_Type:int,Actor2Geo_FullName:string,Actor2Geo_CountryCode:string,Actor2Geo_ADM1Code:string,Actor2Geo_Lat:double,Actor2Geo_Long:double,Actor2Geo_FeatureID:int,ActionGeo_Type:int,ActionGeo_FullName:string,ActionGeo_CountryCode:string,ActionGeo_ADM1Code:string,ActionGeo_Lat:double,ActionGeo_Long:double,ActionGeo_FeatureID:int,DATEADDED:string,Actor1Geo_FullLocation:string,Actor2Geo_FullLocation:string,ActionGeo_FullLocation:string
```
You could also add columns later with the same syntax.
Create a partition:
```
filo-cli --command create --dataset gdelt --partition first
```
Verify the dataset metadata:
```
filo-cli --command list --dataset gdelt
```
Import a CSV file (note: it must have a header row, the column names must match what was created before, and it must be comma-delimited):
```
filo-cli --command importcsv --dataset gdelt --partition first --filename GDELT_1979-1984.csv
```
Query/export some columns:
```
filo-cli --dataset gdelt --partition first --select MonthYear,Actor2Code
```