FiloDB
High-performance distributed analytical database + Spark SQL queries + built for streaming.
filodb-announce google group and filodb-discuss google group
    _______ __      ____  ____
   / ____(_) /___  / __ \/ __ )
  / /_  / / / __ \/ / / / __  |
 / __/ / / / /_/ / /_/ / /_/ /
/_/   /_/_/\____/_____/_____/
Columnar, versioned layers of data wrapped in a yummy high-performance analytical database engine.
See architecture and datasets and reading for more information. Also see the Spark Notebooks under doc: there is one for time-series/geo analysis of the NYC Taxi dataset, and one for interactive charting of the GDELT dataset!
Table of Contents
- Overview
- Pre-requisites
- Getting Started
- Introduction to FiloDB Data Modelling
- Using FiloDB Data Source with Spark
- Using the CLI
- Current Status
- Deploying
- Monitoring and Metrics
- Code Walkthrough
- Building and Testing
- You can help!
Overview
FiloDB is a new open-source distributed, versioned, and columnar analytical database designed for modern streaming workloads.
- High performance - competitive with Parquet scan speeds, plus filtering along two or more dimensions
- Very flexible filtering: filter on only part of a partition key, much more flexible than allowed in Cassandra
- Much faster bulk ingestion than raw Cassandra tables
- Compact storage - within 35% of Parquet for CassandraColumnStore
- Up to 27x more data stored per GB, compared to Cassandra 2.x, in real world fact table storage
- See the blog post on Apache Cassandra for analytics: a performance and storage analysis
- Idempotent writes - primary-key based appends and updates; easy exactly-once ingestion from streaming sources
- Distributed - pluggable storage engine includes Apache Cassandra and in-memory
- Low-latency - minimal SQL query latency of 15ms on one node; sub-second easily achievable with filtering and easy to use concurrency control
- SQL queries - plug in Tableau or any tool using JDBC/ODBC drivers
- Ingest from Spark/Spark Streaming from any supported Spark data source
Overview presentation -- see the docs folder for design docs.
To compile the .mermaid source files to .png's, install the Mermaid CLI.
Use Cases
- Storage and analysis of streaming event / time series data
- Data warehousing
- In-memory database for Spark Streaming analytics
- Low-latency in-memory SQL database engine
Anti-use-cases
- Heavily transactional, update-oriented workflows
Roadmap
Your input is appreciated!
- Productionization and automated stress testing
- Kafka input API / connector (without needing Spark)
- In-memory caching for significant query speedup
- True columnar querying and execution, using late materialization and vectorization techniques. GPU/SIMD.
- Projections. Often-repeated queries can be sped up significantly with projections.
Pre-requisites
- Java 8
- SBT to build
- Apache Cassandra (we prefer using CCM for local testing); optional if you are using the in-memory column store
- Apache Spark (1.6.x)
Getting Started
- Clone the project and cd into the project directory:
  $ git clone https://github.com/tuplejump/FiloDB.git
  $ cd FiloDB
- Choose either the Cassandra column store (default) or the in-memory column store.
  - Start a Cassandra cluster.
  - Copy core/src/main/resources/filodb-defaults.conf and modify the Cassandra settings for your cluster. This step, and passing in a custom config, may be skipped for a localhost Cassandra cluster with no auth.
  - Or, use FiloDB's in-memory column store with Spark (does not work with the CLI). Pass --conf spark.filodb.store=in-memory to spark-submit / spark-shell. This is a great option to test things out, and is really fast!
- For Cassandra, update the keyspace-replication-options config, then run filo-cli -Dconfig.file=/path/to/my/filo.conf --command init to initialize the default filodb_admin keyspace. In addition, you should use CQLSH to create any additional keyspaces you want to store FiloDB datasets in.
- Dataset creation can be done using filo-cli or using the Spark Shell / Scala / Java API.
- Inserting data can be done using filo-cli (CSV only), using Spark SQL/JDBC (INSERT INTO), or the Spark Shell / Scala / Java API.
- Querying is done using Spark SQL/JDBC or the Scala/Java API.
- Listing/deleting/maintenance can be done using filo-cli. If using Cassandra, cqlsh can also be used to inspect metadata.
Note: There is at least one release out now, tagged via Git and also located in the "Releases" tab on Github.
Introduction to FiloDB Data Modelling
Perhaps it's easiest to start with a diagram of how FiloDB stores data.
 | Column A | | Column B | |
---|---|---|---|---|
Partition key 1 | Segment 1 | Segment 2 | Segment 1 | Segment 2 |
Partition key 2 | Segment 1 | Segment 2 | Segment 1 | Segment 2 |
Three types of key define the data model of a FiloDB table.
- partition key - decides how data is going to be distributed across the cluster. All data within one partition key is guaranteed to fit on one node. May consist of multiple columns.
- segment key - groups row values into efficient chunks. Segments within a partition are sorted by segment key and range scans can be done over segment keys.
- row key - acts as a primary key within each partition and decides how data will be sorted within each segment. May consist of multiple columns.
The PRIMARY KEY for FiloDB consists of (partition key, segment key, row key). When choosing the above values you must make sure the combination of the three is unique. No component of a primary key may be null - see the :getOrElse function for a way of dealing with null inputs.
Specifying the partitioning column is optional. If a partitioning column is not specified, FiloDB will create a default one with a fixed value, which means everything will be placed on one node; this is only suitable for small amounts of data. If you don't specify a partitioning column, then you have to make sure the combination of segment key and row key values is unique.
For examples of data modeling and choosing keys, see the examples below as well as datasets.
For additional information refer to Data Modeling and Performance Considerations.
Computed Columns
You may specify a function, or computed column, for use with any key column. This is especially useful for working around the non-null requirement for keys, or for computing a good segment key.
Name | Description | Example |
---|---|---|
string | returns a constant string value | :string /0 |
getOrElse | returns default value if column value is null. NOTE: do not use the default value null for strings. | :getOrElse columnA --- |
round | rounds down a numeric column. Useful for bucketing by time or bucketing numeric IDs. | :round timestamp 10000 |
stringPrefix | takes the first N chars of a string; good for partitioning | :stringPrefix token 4 |
timeslice | bucketizes a Long (millisecond) or Timestamp column using duration strings - 500ms, 5s, 10m, 3h, etc. | :timeslice arrivalTime 30s |
monthOfYear | return 1 to 12 (IntColumn) for the month number of a Long (millisecond) or Timestamp column | :monthOfYear pickup_datetime |
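To make the semantics of a few of these functions concrete, here is a minimal pure-Scala sketch. It is not FiloDB's implementation: the object and method names are hypothetical, and it assumes that rounding floors a value to a multiple of the bucket size and that the month is computed in UTC.

```scala
import java.time.{Instant, ZoneOffset}

// Hypothetical helper object; none of these names come from FiloDB.
object ComputedColumnSketch {
  // Like `:round col N`: round a numeric value down to a multiple of N.
  def roundDown(value: Long, bucket: Long): Long =
    Math.floorDiv(value, bucket) * bucket

  // Like `:stringPrefix col N`: take the first N characters.
  def stringPrefix(s: String, n: Int): String = s.take(n)

  // Like `:getOrElse col default`: substitute a default for a missing value.
  def getOrElse[A](value: Option[A], default: A): A = value.getOrElse(default)

  // Parse duration strings such as 500ms, 5s, 10m, 3h into milliseconds.
  def durationMillis(duration: String): Long = {
    val Pattern = """(\d+)(ms|s|m|h)""".r
    duration match {
      case Pattern(n, "ms") => n.toLong
      case Pattern(n, "s")  => n.toLong * 1000L
      case Pattern(n, "m")  => n.toLong * 60000L
      case Pattern(n, "h")  => n.toLong * 3600000L
      case _ => throw new IllegalArgumentException(s"Unsupported duration: $duration")
    }
  }

  // Like `:timeslice col 30s`: bucket a millisecond timestamp by a duration.
  def timeslice(millis: Long, duration: String): Long =
    roundDown(millis, durationMillis(duration))

  // Like `:monthOfYear col`: month number 1 to 12 (UTC is an assumption here).
  def monthOfYear(millis: Long): Int =
    Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC).getMonthValue
}
```

For instance, ComputedColumnSketch.timeslice(arrivalMillis, "30s") puts every row that arrived within the same 30-second window into the same bucket, which is the property a segment key needs.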
FiloDB vs Cassandra Data Modelling
- Like Cassandra, partitions (physical rows) distribute data and clustering keys act as a primary key within a partition
- Like Cassandra, a single partition is the smallest unit of parallelism when querying from Spark
- Wider rows work better for FiloDB (bigger chunk/segment size)
- FiloDB does not have Cassandra's restrictions for partition key filtering. You can filter by any partition key with most operators. This means fewer tables in FiloDB can match more query patterns.
- Cassandra-style range scans over clustering keys are available over the segment key
Data Modelling and Performance Considerations
Choosing Partition Keys.
- Partition keys are the most efficient way to filter data. Remember that, unlike Cassandra, FiloDB is able to efficiently filter any column in a partition key -- even string contains, IN on only one column. It can do this because FiloDB pre-scans a much smaller table ahead of scanning the main columnar chunk table. This flexibility means that there is no need to populate different tables with different orders of partition keys just to optimize for different queries.
- If there are too few partitions, then FiloDB will not be able to distribute and parallelize reads.
- If the number of rows in each partition is too small, then the storage will not be efficient.
- If the partition key is time based, there may be a hotspot in the cluster as recent data is all written into the same set of replicas, and likewise for reads.
- Consider picking a column or group of columns with low cardinality and good distribution, so that data is spread evenly across the cluster.
- Consider only those columns that do not get updated. Since partition key is part of primary key, partition key columns cannot get updated.
Segment Key and Chunk Size.
Within each partition, data is delineated by the segment key into segments. Within each segment, successive flushes of the MemTable write data in chunks. The segmentation is key to sorting and filtering data within partitions, and the chunk size (which depends on segmentation) also affects performance. The smaller the chunk size, the higher the overhead of scanning data becomes.
Segmentation and chunk size distribution may be checked by the CLI analyze command. In addition, the following configuration affects segmentation and chunk size:
- memtable.max-rows-per-table and memtable.flush-trigger-rows affect how many rows are kept in the MemTable at a time, and this along with how many partitions are in the MemTable directly leads to the chunk size upon flushing.
- The segment size is directly controlled by the segment key. Choosing a segment key that groups data into big enough chunks (at least 1000 values is a good guide) is highly recommended. Experimentation along with running filo-cli analyze is recommended to come up with a good segment key. See the Spark ingestion of GDELT below for an example: choosing an inappropriate segment key leads to MUCH slower ingest and read performance.
- The chunk_size option when creating a dataset caps the size of a single chunk.
- Avoid picking any column that has the possibility of getting updated as a segment key column.
- Consider a low-cardinality column within the partition for the segment key. An ideal segment key will hold at least 1000 values in a chunk, as explained above. Use a computed column like :string /0 as the segment key if there are no good candidates available, or if your chosen segment key is likely to hold very few values in the chunks under each segment key.
- Consider moving one of the partition keys to the segment key if your partition is not too wide. Use the filo-cli analyze command to discover partition size.
- Consider creating computed columns to make a good segment key. Example: rounding a date to the month.
- An ideal segment key would have chunks filled with thousands of values and would frequently be used as a filter in queries.
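Before ingesting, it can help to estimate how many rows a candidate segment key would put into each segment. The following is a rough pure-Scala sketch over an in-memory sample; it is not a substitute for filo-cli analyze, and the Row type and function names are hypothetical. It assumes a :round-style segment key that floors a numeric ID to a multiple of roundTo.

```scala
// Hypothetical estimator; names are illustrative, not part of FiloDB.
object SegmentSizeSketch {
  // A sample row: its partition key value plus the numeric ID to bucket on.
  final case class Row(partition: String, id: Long)

  // Count rows per (partition, segment), bucketing IDs the way
  // a `:round id roundTo` segment key is assumed to.
  def rowsPerSegment(rows: Seq[Row], roundTo: Long): Map[(String, Long), Int] =
    rows
      .groupBy(r => (r.partition, Math.floorDiv(r.id, roundTo) * roundTo))
      .map { case (segment, members) => segment -> members.size }

  // Average number of rows per segment; 0.0 for an empty sample.
  def averageSegmentSize(rows: Seq[Row], roundTo: Long): Double = {
    val sizes = rowsPerSegment(rows, roundTo).values
    if (sizes.isEmpty) 0.0 else sizes.sum.toDouble / sizes.size
  }
}
```

If the average comes out far below the 1000-value guideline, a larger rounding factor or a single segment per partition is probably worth trying.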
Predicate Pushdowns
To help with planning, here is an exact list of the predicate pushdowns (in Spark) that help with reducing I/O and query times:
- Partition key column(s): =, IN on any partition key column
  - = on every partition key results in a single-partition query
  - = or IN on every partition key results in a multi-partition query, still faster than a full table scan
  - Otherwise, a filtered full table scan results - partitions not matching predicates will not be scanned
- Segment key: must be of the form segmentKey >/>= value AND segmentKey </<= value or segmentKey = value
Segment key predicates will be pushed down to Cassandra if your storage engine is Cassandra. FiloDB segment keys map to the clustering key of the underlying Cassandra storage.
Note: You can see predicate pushdown filters in application logs by setting logging level to INFO.
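The partition key rules above can be restated as a small decision function. This is only an illustrative sketch of the rules as described in this section, not FiloDB's planner code; every type and name in it is hypothetical.

```scala
// Hypothetical model of the partition key pushdown rules listed above.
object PushdownSketch {
  sealed trait Predicate { def column: String }
  final case class Equals(column: String) extends Predicate
  final case class In(column: String) extends Predicate
  final case class Other(column: String) extends Predicate // any other operator

  sealed trait ScanKind
  case object SinglePartition extends ScanKind
  case object MultiPartition extends ScanKind
  case object FilteredFullScan extends ScanKind

  def classify(partitionKeys: Seq[String], predicates: Seq[Predicate]): ScanKind = {
    val equalsCols = predicates.collect { case Equals(c) => c }.toSet
    val inCols = predicates.collect { case In(c) => c }.toSet
    if (partitionKeys.nonEmpty && partitionKeys.forall(equalsCols.contains))
      SinglePartition   // = on every partition key
    else if (partitionKeys.nonEmpty && partitionKeys.forall(c => equalsCols(c) || inCols(c)))
      MultiPartition    // = or IN on every partition key
    else
      FilteredFullScan  // anything else: scan, skipping non-matching partitions
  }
}
```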
Example FiloDB Schema for machine metrics
This is one way I would recommend setting things up to take advantage of FiloDB.
The metric names are the column names. This lets you select on just one metric and effectively take advantage of columnar layout.
- Partition key = hostname
- Row key = timestamp (say millis)
- Segment key = :round timestamp 10000 (timestamp rounded down to a multiple of 10000 millis, i.e. 10 seconds)
- Columns: hostname, timestamp, CPU, load_avg, disk_usage, etc.
You can add more metrics/columns over time, but storing each metric in its own column is FAR FAR more efficient, at least in FiloDB. For example, disk usage metrics are likely to have very different numbers than load_avg, and so Filo can optimize the storage of each one independently. Right now I would store them as ints and longs if possible.
With the above layout, as long as there aren’t too many hostnames and you set the memtable max size and flush trigger both to high numbers, you should get good read performance. Queries that would work well for the above layout:
- SELECT avg(load_avg), min(load_avg), max(load_avg) FROM metrics WHERE timestamp > t1 AND timestamp < t2 etc.
Queries that would work well once we expose a local Cassandra query interface:
- Select metrics from one individual host
Another possible layout is something like this:
- Partition key = hostname % 1024 (or pick your # of shards)
- Row key = hostname, timestamp
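The expression hostname % 1024 presumably means hashing the hostname into one of 1024 shards. A minimal sketch of that idea follows; using String.hashCode is an assumption made here for illustration, and the names are hypothetical.

```scala
// Hypothetical shard function for the alternative layout above.
object ShardSketch {
  // Map a hostname to a shard number in [0, numShards).
  // floorMod keeps the result non-negative even when hashCode is negative.
  def shardFor(hostname: String, numShards: Int = 1024): Int =
    Math.floorMod(hostname.hashCode, numShards)
}
```

The resulting shard number would be stored as its own column (or derived in Spark before writing) and used as the partition key.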
Distributed Partitioning
Currently, FiloDB is a library in Spark and requires the user to distribute data such that no two nodes have rows with the same partition key.
- The easiest strategy to accomplish this is to have data partitioned via a queue such as Kafka. That way, when the data comes into Spark Streaming, it is already partitioned correctly.
- Another way of accomplishing this is to use a DataFrame's sort method before using the DataFrame write API.
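One way to sanity-check the requirement that no two nodes hold rows with the same partition key is to collect the partition keys seen on each node and look for overlaps. The sketch below does this on plain collections; how you gather the per-node keys is left open, and all names are hypothetical.

```scala
// Hypothetical check: which partition keys appear on more than one node?
object PartitionPlacementSketch {
  def keysOnMultipleNodes(keysByNode: Map[String, Seq[String]]): Set[String] = {
    // One (key, node) pair per distinct key on each node.
    val keyNodePairs = for {
      (node, keys) <- keysByNode.toSeq
      key <- keys.distinct
    } yield (key, node)
    // A key that shows up with more than one node violates the requirement.
    keyNodePairs
      .groupBy(_._1)
      .collect { case (key, pairs) if pairs.size > 1 => key }
      .toSet
  }
}
```

An empty result means the data is distributed the way FiloDB expects.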
Using FiloDB Data Source with Spark
FiloDB has a Spark data-source module, filodb.spark, so you can use the Spark DataFrame read and write APIs with FiloDB. To use it, follow the steps below:
- Start Cassandra and update project configuration if required.
- From the FiloDB project directory, execute:
  $ sbt clean
  $ ./filo-cli --command init
  $ sbt spark/assembly
- Use the jar FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.3.jar with Spark 1.6.x.
The options to use with the data-source api are:
option | value | command | optional |
---|---|---|---|
dataset | name of the dataset | read/write | No |
database | name of the database to use for the dataset. For Cassandra, defaults to filodb.cassandra.keyspace config. | read/write | Yes |
row_keys | comma-separated list of column name(s) or computed column functions to use for the row primary key within each partition. Cannot be null. Use :getOrElse function if null values might be encountered. | write | No if mode is OverWrite or creating dataset for first time |
segment_key | name of the column (could be computed) to use to group rows into segments in a partition. Cannot be null. Use :getOrElse function if null values might be encountered. | write | yes - defaults to :string /0 |
partition_keys | comma-separated list of column name(s) or computed column functions to use for the partition key. Cannot be null. Use :getOrElse function if null values might be encountered. If not specified, defaults to :string /0 (a single partition). | write | Yes |
splits_per_node | number of read threads per node, defaults to 4 | read | Yes |
reset_schema | If true, allows dataset schema (eg partition keys) to be redefined for an existing dataset when SaveMode.Overwrite is used. Defaults to false. | write | Yes |
chunk_size | Max number of rows to put into one chunk. Note that this only has an effect if the dataset is created for the first time. | write | Yes |
flush_after_write | initiates a memtable flush after Spark INSERT / DataFrame.write; this ensures all the rows are flushed to ColumnStore. Might want to be turned off for streaming | write | yes - default true |
version | numeric version of data to write, defaults to 0 | read/write | Yes |
Partitioning columns could be created using an expression on the original column in Spark:
val newDF = df.withColumn("partition", df("someCol") % 100)
or even UDFs:
val idHash = sqlContext.udf.register("hashCode", (s: String) => s.hashCode())
val newDF = df.withColumn("partition", idHash(df("id")) % 100)
However, note that the above methods will lead to a physical column being created, so use of computed columns is probably preferable.
Configuring FiloDB
Some options must be configured before starting the Spark Shell or Spark application. FiloDB is invoked from within the Spark application, so these settings can be tuned to the needs of each application. There are three methods:
- Modify the application.conf and rebuild, or repackage a new configuration.
- Override the built-in defaults by setting SparkConf configuration values, preceding the filo settings with spark.filodb. For example, to change the default keyspace, pass --conf spark.filodb.cassandra.keyspace=mykeyspace to Spark Shell/Submit. To use the fast in-memory column store instead of Cassandra, pass --conf spark.filodb.store=in-memory.
- It might be easier to pass in an entire configuration file to FiloDB. Pass the Java option -Dfilodb.config.file=/path/to/my-filodb.conf, for example using --driver-java-options.
Note that if Cassandra is kept as the default column store, the keyspace can be changed on each transaction by specifying the database option in the data source API, or the database parameter in the Scala API.
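The examples in this document suggest that a setting such as filodb.cassandra.keyspace becomes spark.filodb.cassandra.keyspace when passed through SparkConf. Assuming that mapping holds for every filodb.* key, a small helper for building the --conf arguments might look like this; the helper and its names are hypothetical.

```scala
// Hypothetical helper: turn filodb.* settings into spark-submit --conf arguments.
object SparkConfSketch {
  // filodb.cassandra.keyspace -> spark.filodb.cassandra.keyspace
  def toSparkKey(filoKey: String): String = {
    require(filoKey.startsWith("filodb."), s"Expected a filodb.* key, got: $filoKey")
    "spark." + filoKey
  }

  // One "--conf key=value" pair per setting, in the order given.
  def toConfArgs(settings: Seq[(String, String)]): Seq[String] =
    settings.flatMap { case (key, value) => Seq("--conf", s"${toSparkKey(key)}=$value") }
}
```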
For metrics system configuration, see the metrics section below.
Passing Cassandra Authentication Settings
Typically, you have per-environment configuration files, and you do not want to check in username and password information. Here are ways to pass in authentication settings:
- Pass in the credentials on the command line.
  - For Spark, --conf spark.filodb.cassandra.username=XYZ etc.
  - For CLI, other apps, pass in JVM args: -Dfilodb.cassandra.username=XYZ
- Put the credentials in a local file on the host, and refer to it from your config file. In your config file, do include "/usr/local/filodb-cass-auth.properties". The properties file would look like:
  filodb.cassandra.username=XYZ
  filodb.cassandra.password=AABBCC
Spark Data Source API Example (spark-shell)
You can follow along using the Spark Notebook. Launch the notebook using EXTRA_CLASSPATH=$FILO_JAR ADD_JARS=$FILO_JAR ./bin/spark-notebook & where FILO_JAR is the path to the filodb-spark-assembly jar. See the FiloDB_GDELT notebook to follow the GDELT examples below, or the NYC Taxi notebook for some really neat time series/geo analysis!
Or you can start a spark-shell locally,
bin/spark-shell --jars ../FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.3.jar --packages com.databricks:spark-csv_2.10:1.2.0 --driver-memory 3G --executor-memory 3G
Loading CSV file from Spark:
scala> val csvDF = sqlContext.read.format("com.databricks.spark.csv").
option("header", "true").option("inferSchema", "true").
load("../FiloDB/GDELT-1979-1984-100000.csv")
Creating a dataset from a Spark DataFrame,
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> csvDF.write.format("filodb.spark").
option("dataset", "gdelt").
option("row_keys", "GLOBALEVENTID").
option("segment_key", ":round GLOBALEVENTID 10000").
option("partition_keys", ":getOrElse MonthYear -1").
mode(SaveMode.Overwrite).save()
Above, we partition the GDELT dataset by MonthYear, creating roughly 72 partitions for 1979-1984, with the unique GLOBALEVENTID used as a row key. We group every 10000 eventIDs into a segment using the convenient :round computed column (GLOBALEVENTID is correlated with time, so in this case we could pack segments with consecutive EVENTIDs). You could use multiple columns for the partition or row keys, of course. For example, to partition by country code and year instead:
scala> csvDF.write.format("filodb.spark").
option("dataset", "gdelt_by_country_year").
option("row_keys", "GLOBALEVENTID").
option("segment_key", ":string 0").
option("partition_keys", ":getOrElse Actor2CountryCode NONE,:getOrElse Year -1").
mode(SaveMode.Overwrite).save()
Note that in the above case, since events are spread over a much larger number of partitions, it no longer makes sense to use GLOBALEVENTID as a segment key - at least with the original 10000 as a rounding factor. There are very few events for a given country and year within the space of 10000 event IDs, leading to inefficient storage. Instead, we use a single segment for each partition. We probably could have used :round GLOBALEVENTID 500000 or some other bigger factor as well. Using :round GLOBALEVENTID 10000 led to 3x slower ingest and at least 5x slower reads.
The key definitions can be left out for appends:
sourceDataFrame.write.format("filodb.spark").
option("dataset", "gdelt").
mode(SaveMode.Append).save()
Note that for efficient columnar encoding, wide rows with fewer partition keys are better for performance.
Reading the dataset,
val df = sqlContext.read.format("filodb.spark").option("dataset", "gdelt").load()
The dataset can be queried using the DataFrame DSL. See the section Querying Datasets for examples.
Note: For production data loads, sort the DataFrame before saving to FiloDB when the data source is not Cassandra. This ensures that segment chunks are loaded efficiently. Refer to Distributed Partitioning for additional info.
Spark/Scala/Java API
There is a more typesafe API than the Spark Data Source API.
import filodb.spark._
sqlContext.saveAsFilo(df, "gdelt",
rowKeys = Seq("GLOBALEVENTID"),
segmentKey = ":round GLOBALEVENTID 10000",
partitionKeys = Seq(":getOrElse MonthYear -1"))
The above creates the gdelt table based on the keys above, and also inserts data from the dataframe df.
NOTE: If you are running Spark Shell in DSE, you might need to do import _root_.filodb.spark._.
Please see the ScalaDoc for the method for more details -- there is a database option for specifying the Cassandra keyspace, and a mode option for specifying the Spark SQL SaveMode.
There is also an API purely for inserting data... after all, specifying the keys is not needed when inserting into an existing table.
import filodb.spark._
sqlContext.insertIntoFilo(df, "gdelt")
The API for creating a DataFrame is also much more concise:
val df = sqlContext.filoDataset("gdelt")
val df2 = sqlContext.filoDataset("gdelt", database = Some("keyspace2"))
The above method calls rely on an implicit conversion. From Java, you would need to create a new FiloContext first:
FiloContext fc = new filodb.spark.FiloContext(sqlContext);
fc.insertIntoFilo(df, "gdelt");
Spark Streaming Example
It's not difficult to ingest data into FiloDB using Spark Streaming. Simply use foreachRDD on your DStream and then transform each RDD into a DataFrame.
For an example, see the StreamingTest.
SQL/Hive Example
Start Spark-SQL:
bin/spark-sql --jars path/to/FiloDB/spark/target/scala-2.10/filodb-spark-assembly-0.3.jar
(NOTE: if you want to connect with a real Hive Metastore, you should probably instead start the thrift server, also adding the --jars above, and then start the spark-beeline client)
Create a temporary table using an existing dataset,
create temporary table gdelt
using filodb.spark
options (
dataset "gdelt"
);
Then, start running SQL queries!
You probably want to create a permanent Hive Metastore entry so you don't have to run create temporary table every single time at startup:
CREATE TABLE gdelt using filodb.spark options (dataset "gdelt");
Once this is done, you could insert data using SQL syntax:
INSERT INTO TABLE gdelt SELECT * FROM othertable;
Of course, this assumes othertable has a similar schema.
Querying Datasets
Now do some queries, using the DataFrame DSL:
scala> df.select(count(df("MonthYear"))).show()
...<skipping lots of logging>...
COUNT(MonthYear)
4037998
or SQL, to find the top 15 events with the highest tone:
scala> df.registerTempTable("gdelt")
scala> sqlContext.sql("SELECT Actor1Name, Actor2Name, AvgTone FROM gdelt ORDER BY AvgTone DESC LIMIT 15").collect()
res13: Array[org.apache.spark.sql.Row] = Array([208077.29634561483])
Now, how about something uniquely Spark .. feed SQL query results to MLLib to compute a correlation:
scala> import org.apache.spark.mllib.stat.Statistics
scala> val numMentions = df.select("NumMentions").map(row => row.getInt(0).toDouble)
numMentions: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[100] at map at DataFrame.scala:848
scala> val numArticles = df.select("NumArticles").map(row => row.getInt(0).toDouble)
numArticles: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[104] at map at DataFrame.scala:848
scala> val correlation = Statistics.corr(numMentions, numArticles, "pearson")
Notes: You can also query FiloDB tables using the Spark thrift server. Refer to SQL/Hive Example for additional information regarding the thrift server.
FiloDB logs can be viewed in the corresponding Spark application logs by configuring log4j.properties, or logback.xml for DSE.
Using the CLI
The filo-cli accepts arguments as key-value pairs. The following keys are supported:
key | purpose |
---|---|
dataset | It is required for all the operations. Its value should be the name of the dataset |
database | Specifies the "database" the dataset should operate in. For Cassandra, this is the keyspace. If not specified, uses config value. |
limit | This is optional key to be used with select . Its value should be the number of rows required. |
columns | This is required for defining the schema of a dataset. Its value should be a comma-separated string of the format, column1:typeOfColumn1,column2:typeOfColumn2 where column1 and column2 are the names of the columns and typeOfColumn1 and typeOfColumn2 are one of int ,long ,double ,string ,bool |
rowKeys | This is required for defining the row keys. Its value should be comma-separated list of column names or computed column functions to make up the row key |
segmentKey | The column name or computed column for the segment key |
partitionKeys | Comma-separated list of column names or computed columns to make up the partition key |
command | Its value can be one of init, create, importcsv, analyze, delete, truncate or list. The init command is used to create the FiloDB schema. The create command is used to define a new dataset, for example: ./filo-cli --command create --dataset playlist --columns id:int,album:string,artist:string,title:string --rowKeys id --segmentKey ':string /0' (note: the sort column is not optional). The list command can be used to view the schema of a dataset, for example: ./filo-cli --command list --dataset playlist. The importcsv command can be used to load data from a CSV file into a dataset, for example: ./filo-cli --command importcsv --dataset playlist --filename playlist.csv (note: the CSV file should be comma-delimited and have a header row, and the column names must match those specified when creating the schema for that dataset). The delete command is used to delete datasets, like a drop. The truncate command truncates data for an existing dataset to 0. |
select | Its value should be a comma-separated string of the columns to be selected,./filo-cli --dataset playlist --select album,title The result from select is printed in the console by default. An output file can be specified with the key --outfile . For example,./filo-cli --dataset playlist --select album,title --outfile playlist.csv |
delimiter | This is optional key to be used with importcsv command. Its value should be the field delimiter character. Default value is comma. |
numSegments | The maximum number of segments to analyze for the analyze command. Prevents analyze of large tables from taking too long. Defaults to 10000. |
timeoutSeconds | The number of seconds for timeout for initialization, table creation, other quick things |
Running the CLI
You may want to customize a configuration to point at your Cassandra cluster, or change other configuration parameters. The easiest is to pass in a customized config file:
./filo-cli -Dfilodb.config.file=/path/to/myfilo.conf --command init
You may also set the FILO_CONFIG_FILE environment variable instead, but any -Dfilodb.config.file args passed in take precedence.
Individual configuration params may also be changed by passing them on the command line. For example:
./filo-cli -Dfilodb.columnstore.segment-cache-size=10000 --command importcsv ....
All -D config options must be passed before any other arguments.
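If you generate filo-cli invocations from a script, the two formatting rules above (the name:type list for --columns, and -D options before everything else) are easy to encode. The following is a hedged sketch with hypothetical helper names; it only assembles argument strings and does not run the CLI.

```scala
// Hypothetical helper for assembling filo-cli arguments.
object FiloCliArgsSketch {
  // Column types listed in the table above.
  private val ValidTypes = Set("int", "long", "double", "string", "bool")

  // Build the value for --columns, e.g. id:int,album:string
  def columnsSpec(columns: Seq[(String, String)]): String = {
    columns.foreach { case (name, tpe) =>
      require(ValidTypes(tpe), s"Unsupported column type for $name: $tpe")
    }
    columns.map { case (name, tpe) => s"$name:$tpe" }.mkString(",")
  }

  // All -D options first, then the --key value pairs in the order given.
  def args(systemProps: Seq[(String, String)], options: Seq[(String, String)]): Seq[String] =
    systemProps.map { case (key, value) => s"-D$key=$value" } ++
      options.flatMap { case (key, value) => Seq(s"--$key", value) }
}
```

For example, FiloCliArgsSketch.args(Seq("filodb.config.file" -> "/path/to/myfilo.conf"), Seq("command" -> "init")) yields the arguments for the init invocation shown earlier.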
You may also configure CLI logging by copying cli/src/main/resources/logback.xml to your deploy folder, customizing it, and passing on the command line -Dlogback.configurationFile=/path/to/filo-cli-logback.xml.
You can also change the logging directory by setting the FILO_LOG_DIR environment variable before calling the CLI.
NOTE: The CLI currently only operates on the Cassandra column store. The --database option may be used to specify which keyspace to operate on. If the keyspace is not initialized, then FiloDB code will automatically create one for you, but you may want to create it yourself to control the options that you want.
CLI Example
The following examples use the GDELT public dataset and can be run from the project directory.
Create a dataset with all the columns :
./filo-cli --command create --dataset gdelt --columns GLOBALEVENTID:int,SQLDATE:string,MonthYear:int,Year:int,FractionDate:double,Actor1Code:string,Actor1Name:string,Actor1CountryCode:string,Actor1KnownGroupCode:string,Actor1EthnicCode:string,Actor1Religion1Code:string,Actor1Religion2Code:string,Actor1Type1Code:string,Actor1Type2Code:string,Actor1Type3Code:string,Actor2Code:string,Actor2Name:string,Actor2CountryCode:string,Actor2KnownGroupCode:string,Actor2EthnicCode:string,Actor2Religion1Code:string,Actor2Religion2Code:string,Actor2Type1Code:string,Actor2Type2Code:string,Actor2Type3Code:string,IsRootEvent:int,EventCode:string,EventBaseCode:string,EventRootCode:string,QuadClass:int,GoldsteinScale:double,NumMentions:int,NumSources:int,NumArticles:int,AvgTone:double,Actor1Geo_Type:int,Actor1Geo_FullName:string,Actor1Geo_CountryCode:string,Actor1Geo_ADM1Code:string,Actor1Geo_Lat:double,Actor1Geo_Long:double,Actor1Geo_FeatureID:int,Actor2Geo_Type:int,Actor2Geo_FullName:string,Actor2Geo_CountryCode:string,Actor2Geo_ADM1Code:string,Actor2Geo_Lat:double,Actor2Geo_Long:double,Actor2Geo_FeatureID:int,ActionGeo_Type:int,ActionGeo_FullName:string,ActionGeo_CountryCode:string,ActionGeo_ADM1Code:string,ActionGeo_Lat:double,ActionGeo_Long:double,ActionGeo_FeatureID:int,DATEADDED:string,Actor1Geo_FullLocation:string,Actor2Geo_FullLocation:string,ActionGeo_FullLocation:string --rowKeys GLOBALEVENTID --segmentKey ':string 0'
Verify the dataset metadata:
./filo-cli --command list --dataset gdelt
Import data from a CSV file:
./filo-cli --command importcsv --dataset gdelt --filename GDELT-1979-1984-100000.csv
Query/export some columns:
./filo-cli --dataset gdelt --select MonthYear,Actor2Code --limit 5 --outfile out.csv
Current Status
Version 0.3 is the latest stable release. It has been tested on a cluster with a variety of different schemas, has a stable data model and ingestion, and features a huge number of improvements over the previous version.
Upcoming version 0.4 change list:
- Defaults to Spark 1.6.1
- New metrics and monitoring framework based on Kamon.io, with built in stats logging and statsd output, and tracing of write path
- Replaced Phantom with direct usage of Java C* driver. Bonus: use prepared statements, should result in better performance all around especially on ingest; plus should support C* 3.0+
- WHERE clauses specifying multiple partition keys now get pushed down. Should result in much better read performance in those cases.
- New config
filodb.cassandra.keyspace-replication-options
allows any CQL replication option to be set when FiloDB keyspaces are created with CLI --command init - CLI log directory can be easily changed with FILO_LOG_DIR env var
- CLI analyze command can now analyze segments from multiple partitions up to a configurable maximum # of segments
- Allow comma-separated list of hosts for filodb.cassandra.hosts
- Fix missing data on read issue with wrapping token ranges in C*
- Fix actor path uniqueness issue on ingestion
Version 0.3 change list:
- Read from / write to multiple keyspaces in C*, using the database option
- All dataset/column metadata is stored in a central keyspace which defaults to filodb_admin
- Stress tests
- Productionization and robustification all around, especially with filo-cli
- A new clustering mode which ensures all data is flushed at the end of writes (when flush_after_write is true)
- Issue #77: flush_after_write and controlling memtable flushes at end of ETL
- More efficient reads from DSE and when vnodes are enabled
- More efficient streaming ingestion: flushes no longer automatically done at end of each partition of data
- Issue #84: bug with ingesting into FiloDB after doing sort or shuffles using Tungsten Spark 1.5
- filo-cli delete command now deletes both metadata and data tables
- new filo-cli truncate command
- New Akka Cluster-based communication channel between driver and FiloDB coordinators helps ensure that all data is flushed at end of ingestions
Migrating Metadata from 0.2 Cassandra
The methods for columns and datasets are different, due to special characters and other requirements of the datasets table.
First, run ./filo-cli --command init to create empty datasets and columns tables in the new admin keyspace.
Column metadata:
- Assuming the source keyspace is filodb, issue these commands in CQLSH:
copy filodb.columns to '/tmp/filodb_columns.csv';
- Add the name of the keyspace to the end of each line.
cat /tmp/filodb_columns.csv | tr -d '\r' | awk '{ $0=$0",filodb" } 1' >/tmp/new_columns.csv
- In CQLSH:
COPY filodb_admin.columns (dataset, version, name, columntype, id, isdeleted, database) FROM '/tmp/new_columns.csv';
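To see what the tr/awk step above does before running it on a real export, the same pipeline can be tried on a small sample. This is only an illustrative sketch: the two rows and their values are made up, and the sample is written with carriage returns on the assumption that the exported file contains them (which is what the tr -d '\r' step is there to remove).

```shell
# Hypothetical sample of two exported rows, written with CRLF line endings
sample=$(mktemp)
printf 'gdelt,0,Actor2Code,StringColumn,1,False\r\ngdelt,0,MonthYear,IntColumn,2,False\r\n' > "$sample"

# Same pipeline as the migration step: drop carriage returns,
# then append ",filodb" (the source keyspace name) to every line
converted=$(tr -d '\r' < "$sample" | awk '{ $0=$0",filodb" } 1')
printf '%s\n' "$converted"
rm -f "$sample"
```

Each printed line should end with ,filodb, which lines up with the extra database column named in the COPY ... FROM statement.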
Datasets metadata: use Spark (note: this example uses DataStax Enterprise; with regular Spark, load the spark-cassandra-connector and use sqlContext instead of csc):
import org.apache.spark.sql.{Column, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Literal
val oldData = csc.read.format("org.apache.spark.sql.cassandra").option("table", "datasets").option("keyspace", "filodb").load
oldData.withColumn("database", new Column(Literal("filodb"))).write.format("org.apache.spark.sql.cassandra").option("table", "datasets").option("keyspace", "filodb_admin").mode(SaveMode.Append).save
Deploying
The current version assumes Spark 1.6.x and Cassandra 2.1.x or 2.2.x, though it seems to work on Spark 1.5.x as well.
- sbt spark/assembly
- sbt cli/assembly
- Copy core/src/main/resources/application.conf and modify as needed for your own config file
- Set FILO_CONFIG_FILE to the path to your custom config
- Run the cli jar as the filo CLI command line tool and initialize keyspaces if using Cassandra:
filo-cli-*.jar --command init
There is a branch for Datastax Enterprise 4.8 / Spark 1.4. Note that if you are using DSE or have vnodes enabled, a lower number of vnodes (16 or less) is STRONGLY recommended, as higher numbers of vnodes slow down queries substantially and effectively prevent subsecond queries.
By default, FiloDB nodes (basically all the Spark executors) talk to each other using a random port and locally assigned hostname. You may wish to set filodb.spark.driver.port and filodb.spark.executor.port to assign specific ports (for AWS, for example), or possibly use a different config file on each host and set akka.remote.netty.tcp.hostname in each host's config file.
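As a rough sketch of pinning the ports, the overrides could be appended to your own copy of application.conf and the file exported through FILO_CONFIG_FILE. The port numbers, the example hostname, and the use of a temp file as the config path are all assumptions chosen for illustration; only the key names come from the text above. Appending relies on the HOCON rule that a later definition of a key overrides an earlier one in the same file.

```shell
# Hypothetical config path: defaults to a temp file if FILO_CONFIG_FILE is unset.
# In practice this would be your modified copy of application.conf.
FILO_CONFIG_FILE="${FILO_CONFIG_FILE:-$(mktemp)}"

# Append overrides (port numbers are placeholders, not recommended values)
cat >> "$FILO_CONFIG_FILE" <<'EOF'
filodb.spark.driver.port = 7100
filodb.spark.executor.port = 7101
# Only needed when each host uses its own config file (example address):
# akka.remote.netty.tcp.hostname = "10.0.0.12"
EOF

export FILO_CONFIG_FILE
echo "FiloDB config written to $FILO_CONFIG_FILE"
```

Whichever ports you pick need to be reachable between the driver and the executors, for example through the relevant security group rules on AWS.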
Monitoring and Metrics
FiloDB uses Kamon for metrics and Akka/Futures/async tracing. Not only does this give us summary statistics, but this also gives us Zipkin-style tracing of the ingestion write path in production, which can give a much richer picture than just stats.
Metrics Sinks
- statsd sink - this is packaged into FiloDB's spark module (but not the CLI module) by default. All you need to do is stand up statsd, Statsite, or equivalent daemon. See the Kamon Statsd module guide for configuration.
- Kamon metrics logger - this is part of the coordinator module and will log all metrics (including segment trace metrics) at every Kamon tick interval, which defaults to 10 seconds. It is disabled by default but can be enabled with --conf spark.filodb.metrics-logger.enabled=true or by changing filodb.metrics-logger.enabled in your conf file. Which metrics to log, including pattern matching on names, can also be configured.
- Kamon trace logger - this logs detailed trace information on segment appends and is always on if detailed tracing is on.
Metrics Configuration
Kamon has many configurable options. For example, to get more detailed traces on the write / segment append path, you might pass the following options to spark-submit or spark-shell to turn detailed tracing on and trace 3% of all segment appends:
--driver-java-options '-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -Dkamon.trace.level-of-detail=simple-trace -Dkamon.trace.random-sampler.chance=3'
Methods of configuring Kamon (except for the metrics logger):
- The best way to configure Kamon is to pass this Java property: -Dkamon.config-provider=filodb.coordinator.KamonConfigProvider. This lets you configure Kamon through the same mechanisms as the rest of FiloDB: -Dfilo.config.file for example, and the configuration is automatically passed to each executor/worker. Otherwise:
- Passing Java options on the command line with -D, or for Spark, --driver-java-options and --executor-java-options
- Passing options in a config file and using -Dconfig.file. NOTE: -Dfilo.config.file will not work because Kamon uses a different initialization stack. This needs to be done for both drivers and executors.
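The -D route can be sketched as a small shell snippet that assembles the tracing options and also switches on the metrics logger. This is an illustration only: the variable names are made up, the option names are the ones quoted in this section, and the command is printed rather than run.

```shell
# Percentage of segment appends to trace (3 matches the example above)
trace_chance=3

# Assemble the Kamon tracing properties for the driver JVM
kamon_opts="-Dkamon.trace.level-of-detail=simple-trace"
kamon_opts="$kamon_opts -Dkamon.trace.random-sampler.chance=$trace_chance"

# Print the invocation for review; the metrics logger is enabled separately via --conf
shell_cmd="spark-shell --driver-java-options '$kamon_opts' --conf spark.filodb.metrics-logger.enabled=true"
echo "$shell_cmd"
```

Keeping the sampling rate in one variable makes it easy to raise it temporarily while investigating a slow ingestion and lower it again afterwards.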
Code Walkthrough
Please go to the architecture doc.
Building and Testing
Run the tests with sbt test, or for continuous development, sbt ~test. Noisy cassandra logs can be seen in filodb-test.log.
To run benchmarks, from within SBT:
cd jmh
jmh:run -i 5 -wi 5 -f3
You can get the huge variety of JMH options by running jmh:run -help.
You can help!
- Send your use cases for OLAP on Cassandra and Spark
- Especially IoT and Geospatial
- Email if you want to contribute
Your feedback will help decide the next batch of features, such as:
- which data types to add support for
- what architecture is best supported