CSV parsing in async context

Primary language: Rust. License: MIT.

csv-async

This is a CSV library for use in asynchronous environments. Its API is similar to that of the existing csv crate, with a few exceptions: for example, builders expose create_ functions instead of the from_ functions found in csv.

Some code is borrowed from the csv crate (synchronized with version 1.1.6, March 2021). This package shares its CSV parsing routines with csv by using the csv-core crate. The major version of this crate will be kept in sync with the major version of csv with which it is API compatible.

CSV files are read and written either by AsyncReader / AsyncWriter, which work with generic text-based records, or by AsyncDeserializer / AsyncSerializer, which work with data-specific structures through generated serde interfaces.

The library does not contain a synchronous reader or writer. If you need one, please use the csv crate.

Cargo Features

Features that can be enabled or disabled when building the library:

Feature      Default   Description
with_serde   on        Enables crate to use serde derive macros
tokio        off       Enables crate to be used with tokio runtime and libraries

Enabling the tokio feature allows the user to use tokio::fs::File and bases AsyncReader (AsyncWriter) on tokio::io::AsyncRead (tokio::io::AsyncWrite). Currently this crate depends on tokio version 0.2.

Without the tokio feature, this crate depends only on the futures crate, and the reader (writer) is based on the futures::io::AsyncRead (futures::io::AsyncWrite) trait, which allows the user to use async_std::fs::File.
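
As a sketch of how this choice might look in a dependent project's Cargo.toml: the feature names come from the table above, while the version requirement "1" is an assumption based on the major-version note earlier in this README, so check crates.io for the current release.

```toml
[dependencies]
# Default build: futures-based reader/writer, with_serde enabled by default.
csv-async = "1"

# Alternative (use instead of the line above): reader/writer based on tokio.
# csv-async = { version = "1", features = ["tokio"] }

# Alternative: no serde support (with_serde is a default feature, so defaults are switched off).
# csv-async = { version = "1", default-features = false }
```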

Example usage:

Sample input file:

city,region,country,population
Southborough,MA,United States,9686
Northbridge,MA,United States,14061
Marlborough,MA,United States,38334
Springfield,MA,United States,152227
Springfield,MO,United States,150443
Springfield,NJ,United States,14976
Concord,NH,United States,42605

use std::error::Error;
use std::process;
use futures::stream::StreamExt;
use async_std::fs::File;

async fn filter_by_region(region:&str, file_in:&str, file_out:&str) -> Result<(), Box<dyn Error>> {
    // Function reads CSV file that has column named "region" at second position (index = 1).
    // It writes to new file only rows with region equal to passed argument
    // and removes region column.
    let mut rdr = csv_async::AsyncReader::from_reader(
        File::open(file_in).await?
    );
    let mut wri = csv_async::AsyncWriter::from_writer(
        File::create(file_out).await?
    );
    wri.write_record(rdr
        .headers()
        .await?.into_iter()
        .filter(|h| *h != "region")
    ).await?;
    let mut records = rdr.records();
    while let Some(record) = records.next().await {
        let record = record?;
        match record.get(1) {
            Some(reg) if reg == region => 
                wri.write_record(record
                    .iter()
                    .enumerate()
                    .filter(|(i, _)| *i != 1)
                    .map(|(_, s)| s)
                ).await?,
            _ => {},
        }
    }
    // Flush buffered rows explicitly so that any write error is reported here.
    wri.flush().await?;
    Ok(())
}

fn main() {
    async_std::task::block_on(async {
        if let Err(err) = filter_by_region(
            "MA",
            "/tmp/all_regions.csv",
            "/tmp/MA_only.csv"
        ).await {
            eprintln!("error running filter_by_region: {}", err);
            process::exit(1);
        }
    });
}
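
To make the intended result easier to check, the row-filtering logic of the example can be sketched on in-memory rows using only the standard library. This is an illustration, not part of the crate: filter_rows and drop_column are hypothetical names, and unlike the example above the header is handled by column index rather than by the name "region".

```rust
/// Returns a copy of `row` without the column at position `idx`.
fn drop_column(row: &[&str], idx: usize) -> Vec<String> {
    row.iter()
        .enumerate()
        .filter(|(i, _)| *i != idx)
        .map(|(_, s)| s.to_string())
        .collect()
}

/// Treats the first row as the header and keeps it (minus the region column).
/// Of the remaining rows, keeps only those whose `region_idx` column equals
/// `region`, again without that column.
fn filter_rows(rows: &[Vec<&str>], region_idx: usize, region: &str) -> Vec<Vec<String>> {
    let mut out = Vec::new();
    let mut iter = rows.iter();
    if let Some(header) = iter.next() {
        out.push(drop_column(header, region_idx));
    }
    for row in iter {
        // Rows that are too short to have a region column are skipped.
        if row.get(region_idx) == Some(&region) {
            out.push(drop_column(row, region_idx));
        }
    }
    out
}
```

Called with the rows of the sample file, region_idx = 1 and region = "MA", this keeps the header (city, country, population) and the four MA rows, each without the region column.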

For a serde example, please see the documentation root page.
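
As a rough picture of what a data-specific structure means for the sample file above, here is a std-only sketch. CityRecord and city_from_fields are hypothetical names, and the hand-written conversion only stands in for the kind of mapping that serde derive macros would generate; it does not use this crate's AsyncDeserializer API.

```rust
/// Hypothetical typed view of one row of the sample file.
#[derive(Debug, PartialEq)]
struct CityRecord {
    city: String,
    region: String,
    country: String,
    population: u64,
}

/// Maps positional text fields onto named, typed struct fields.
/// Fails if the field count is not 4 or the population is not a number.
fn city_from_fields(fields: &[&str]) -> Result<CityRecord, String> {
    match fields {
        [city, region, country, population] => Ok(CityRecord {
            city: city.to_string(),
            region: region.to_string(),
            country: country.to_string(),
            population: population
                .parse::<u64>()
                .map_err(|e| format!("bad population {:?}: {}", population, e))?,
        }),
        _ => Err(format!("expected 4 fields, got {}", fields.len())),
    }
}
```

The difference from a generic text-based record is that field access is by name and type (for example, population as u64), and malformed rows are rejected at conversion time.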

Plans

Some ideas for future development:

  • Create benchmarks and possibly make some performance improvements.
  • Address items marked as TODO in the code.
  • Add support for the smol asynchronous runtime.
  • Create more examples and a tutorial.