/datafusion

DataFusion is an attempt at building a modern distributed compute platform in Rust, using Apache Arrow as the memory model

Primary LanguageRustApache License 2.0Apache-2.0

License Version Build Status Coverage Status Gitter chat

DataFusion: Modern Distributed Compute Platform implemented in Rust

DataFusion is an attempt at building a modern distributed compute platform in Rust, leveraging Apache Arrow as the memory model.

NOTE: DataFusion was donated to the Apache Arrow project in February 2019. Source is here.

See my article How To Build a Modern Distributed Compute Platform to learn about the design and my motivation for building this. The TL;DR is that this project is a great way to learn about building a query engine but this is quite early and not usable for any real world work just yet.

Status

The current code supports single-threaded execution of limited SQL queries (projection, selection, and aggregates) against CSV files. Parquet files will be supported shortly.

To use DataFusion as a crate dependency, add the following to your Cargo.toml:

[dependencies]
datafusion = "0.6.0"

Here is a brief example for running a SQL query against a CSV file. See the examples directory for full examples.

fn main() {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    // define schema for data source (csv file)
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lng", DataType::Float64, false),
    ]));

    // register csv file with the execution context
    let csv_datasource = CsvDataSource::new("test/data/uk_cities.csv", schema.clone(), 1024);
    ctx.register_datasource("cities", Rc::new(RefCell::new(csv_datasource)));

    // simple projection and selection
    let sql = "SELECT city, lat, lng FROM cities WHERE lat > 51.0 AND lat < 53";

    // execute the query
    let relation = ctx.sql(&sql).unwrap();

    // display the relation
    let mut results = relation.borrow_mut();

    while let Some(batch) = results.next().unwrap() {

        println!(
            "RecordBatch has {} rows and {} columns",
            batch.num_rows(),
            batch.num_columns()
        );

        let city = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        let lat = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        let lng = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let city_name: String = String::from_utf8(city.get_value(i).to_vec()).unwrap();

            println!(
                "City: {}, Latitude: {}, Longitude: {}",
                city_name,
                lat.value(i),
                lng.value(i),
            );
        }
    }
}

Roadmap

See ROADMAP.md for the full roadmap.

Prerequisites

  • Rust nightly (required by parquet-rs crate)

Building DataFusion

See BUILDING.md.

Gitter

There is a Gitter channel where you can ask questions about the project or make feature suggestions too.

Contributing

Contributors are welcome! Please see CONTRIBUTING.md for details.