Parquet2

This is a re-write of the official parquet crate with performance, parallelism and safety in mind.

Check out the guide for details on how to use this crate to read parquet.

The main differentiators in comparison with parquet are:

  • it uses #![forbid(unsafe_code)]
  • it delegates parallelism downstream
  • it decouples reading (IO intensive) from computing (CPU intensive)
  • it is faster (10-20x when reading to arrow format)
  • it supports async read and write
  • it is integration-tested against pyarrow and (py)spark 3

The overall idea is to offer the ability to read compressed parquet pages and a toolkit to decompress them into the consumer's preferred in-memory format.

This allows this crate's iterators to perform minimal CPU work, thereby maximizing throughput. It is up to the consumers to decide whether they want to take advantage of this through parallelism at the expense of memory usage (e.g. decompress and deserialize pages in threads) or not.

This crate cannot be used directly to read parquet (except metadata). To read data from parquet, check out arrow2.

Functionality implemented

  • Read dictionary pages
  • Read and write V1 pages
  • Read and write V2 pages
  • Compression and de-compression (all)

Functionality not (yet) implemented

The parquet format has multiple encoding strategies for the different physical types. This crate currently reads from almost all of them, and supports encoding to a subset of them. They are:

Supported decoding

Delta-encodings are still experimental, as I have been unable to generate large pages encoded with them from spark, thereby hindering robust integration tests.

Encoding

Organization

  • read: read metadata and pages
  • write: write metadata and pages
  • encoding: encoders and decoders of the different parquet encodings
  • page: page declarations
  • metadata: parquet files metadata (e.g. FileMetaData)
  • schema: types metadata declaration (e.g. ConvertedType)
  • types.rs: physical type declaration (i.e. how things are represented in memory).
  • statistics: deserialized representation of a parquet page
  • compression: compressors and decompressors (e.g. Gzip)
  • error: errors declaration

Run integration tests

There are integration tests against parquet files generated by pyarrow. To run them, you will need to run

python3 -m venv venv
venv/bin/pip install pip --upgrade
venv/bin/pip install pyarrow==7
venv/bin/python tests/write_pyarrow.py
cargo test

beforehand. This is only needed once per change to tests/write_pyarrow.py.

How to implement page readers

The in-memory format used to consume parquet pages strongly influences how the pages should be deserialized. As such, this crate does not commit to a particular in-memory format. Consumers are responsible for converting pages to their target in-memory format.

This git repository contains a serialization to a simple in-memory format in integration, which is used to validate integration with other implementations.

There is also an implementation for the arrow format here.
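To illustrate why the target format shapes deserialization, here is a minimal sketch that builds two different in-memory formats from the same buffer. It assumes a decompressed page of a required, non-nested INT32 column in PLAIN encoding (4 little-endian bytes per value), so the buffer holds values only. The trait `FromPlainInt32` and the helper `plain_int32` are hypothetical names for this example and are not part of this crate.

```rust
/// Hypothetical trait (not part of this crate): a target in-memory format that
/// can be built from the value bytes of a PLAIN-encoded INT32 page.
trait FromPlainInt32: Sized {
    fn from_plain(values: &[u8]) -> Result<Self, String>;
}

/// Iterates over the 4-byte little-endian values of a PLAIN INT32 buffer.
fn plain_int32(values: &[u8]) -> Result<impl Iterator<Item = i32> + '_, String> {
    if values.len() % 4 != 0 {
        return Err(format!("{} bytes is not a multiple of 4", values.len()));
    }
    Ok(values
        .chunks_exact(4)
        .map(|chunk| i32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]])))
}

/// One consumer may want the values as they are stored...
impl FromPlainInt32 for Vec<i32> {
    fn from_plain(values: &[u8]) -> Result<Self, String> {
        Ok(plain_int32(values)?.collect())
    }
}

/// ...while another may want them widened to 64 bits.
impl FromPlainInt32 for Vec<i64> {
    fn from_plain(values: &[u8]) -> Result<Self, String> {
        Ok(plain_int32(values)?.map(i64::from).collect())
    }
}
```

A consumer would pick the format at the call site, e.g. `Vec::<i64>::from_plain(&buffer)`. A real implementation also has to handle definition and repetition levels, dictionaries and the other encodings, which this sketch leaves out.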

Higher Parallelism

Typically, converting a page into its in-memory representation is expensive, so consider how to distribute that work across threads. For example:

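use std::thread;

let mut handles = vec![];
for column in columns {
    let column_meta = metadata.row_groups[row_group].column(column);
    // read the compressed pages of this column on the current thread (IO-bound)
    let compressed_pages = get_page_iterator(column_meta, &mut file, file)?
        .collect::<Result<Vec<_>, _>>()?;
    // each compressed_page has a buffer; cloning is expensive(!). We move it so that the memory
    // is released at the end of the processing.
    handles.push(thread::spawn(move || {
        page_iter_to_array(compressed_pages.into_iter())
    }));
}
// wait for all threads; `join` only errors if a thread panicked
let columns_from_all_groups = handles
    .into_iter()
    .map(|handle| handle.join().unwrap())
    .collect::<Vec<_>>();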

This reads the file as quickly as possible in the main thread and sends CPU-intensive work to other threads, thereby maximizing IO reads (at the cost of holding multiple compressed pages in memory; buffering is also an option here).
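The buffering option can be sketched with a bounded channel from the standard library: the reading thread blocks once the channel is full, which caps how many compressed pages are held in memory. This is only an illustration under stated assumptions; `CompressedPage`, `deserialize` and `process_buffered` are hypothetical stand-ins, not this crate's types or functions.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a compressed page: only its raw buffer.
struct CompressedPage {
    buffer: Vec<u8>,
}

/// Stand-in for the CPU-intensive decompress + decode + deserialize step.
/// Here it just sums the bytes of the page.
fn deserialize(page: CompressedPage) -> usize {
    page.buffer.iter().map(|&b| b as usize).sum()
}

/// Pulls pages from `pages` on the calling thread and processes them on a
/// worker thread. The channel queues at most `capacity` pages.
fn process_buffered<I>(pages: I, capacity: usize) -> Vec<usize>
where
    I: Iterator<Item = CompressedPage>,
{
    let (sender, receiver) = sync_channel::<CompressedPage>(capacity);
    let worker =
        thread::spawn(move || receiver.into_iter().map(deserialize).collect::<Vec<_>>());
    for page in pages {
        // blocks while the channel is full, which bounds memory usage
        sender.send(page).expect("worker thread hung up");
    }
    // closing the channel ends the worker's iterator
    drop(sender);
    worker.join().expect("worker thread panicked")
}
```

A small `capacity` keeps memory low but makes the reader wait on the worker more often; a large one approaches the behaviour of collecting all pages first.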

Decoding flow

Generally, a parquet file is read as follows:

  1. Read metadata
  2. Seek to a row group and column
  3. Iterate over (compressed) pages within that (group, column)

This is IO-intensive: it requires parsing thrift and seeking within a file.

Once a compressed page is loaded into memory, it can be decompressed, decoded and deserialized into a specific in-memory format. All of these operations are CPU-intensive and are thus left to consumers to perform, as they may want to send this work to threads.

read -> compressed page -> decompressed page -> decoded bytes -> deserialized
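The stages of this flow can be modelled as one type per stage, so that each CPU-intensive step is an explicit function the consumer calls. The sketch below assumes pages of a required INT64 column in PLAIN encoding (8 little-endian bytes per value) and only handles the uncompressed codec. All type and function names are hypothetical and chosen for this example; they are not this crate's API.

```rust
/// Codec of a page. Only `Uncompressed` is handled in this sketch.
#[derive(Debug, Clone, Copy)]
enum Compression {
    Uncompressed,
    Snappy,
}

/// A page as read from the file: its bytes are still compressed.
struct CompressedPage {
    compression: Compression,
    buffer: Vec<u8>,
}

/// A page whose bytes are decompressed but still encoded.
struct DecompressedPage {
    buffer: Vec<u8>,
}

/// compressed page -> decompressed page
fn decompress(page: CompressedPage) -> Result<DecompressedPage, String> {
    match page.compression {
        Compression::Uncompressed => Ok(DecompressedPage { buffer: page.buffer }),
        other => Err(format!("{:?} is not handled in this sketch", other)),
    }
}

/// decompressed page -> decoded bytes (one 8-byte item per value)
fn decode(page: &DecompressedPage) -> Result<impl Iterator<Item = [u8; 8]> + '_, String> {
    if page.buffer.len() % 8 != 0 {
        return Err("buffer length is not a multiple of 8".to_string());
    }
    Ok(page.buffer.chunks_exact(8).map(|chunk| {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(chunk);
        bytes
    }))
}

/// decoded bytes -> deserialized values in the consumer's format (here `Vec<i64>`)
fn deserialize(pages: impl Iterator<Item = CompressedPage>) -> Result<Vec<i64>, String> {
    let mut values = Vec::new();
    for page in pages {
        let page = decompress(page)?;
        values.extend(decode(&page)?.map(i64::from_le_bytes));
    }
    Ok(values)
}
```

Because `deserialize` only needs an iterator of compressed pages, it can run on whichever thread the consumer chooses, independently of the thread that read the pages.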

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.