[EPIC]: Morsel-Driven Scheduler IO
TLDR
- Migrate to an upstream object store implementation for S3 (battle-tested), Azure and GCP
- Add ability in arrow-rs to read parquet directly from an object store implementation
- A unified scheduler that follows best practice by segregating CPU-bound work and network IO onto separate thread pools
- I/O reduction when querying parquet files from object storage
Plan
Following on from #2199, the next piece of the puzzle is how to handle IO in the context of the new scheduler, in particular its interaction with object storage. Much of this work has already been started, but as @alamb rightly pointed out, how everything fits together is not fully articulated anywhere. This is my attempt to do just that.
As described in #2489, I intend to polish and release the `object_store` abstraction found in IOx to crates.io. This will in turn allow using it in arrow-rs and DataFusion. Much of the rationale for this is covered in #2489 and #2445, but specifically for the morsel-driven IO component, moving away from the `chunk_reader` notions of `Read` and `AsyncRead` is important, as these sequential, cursor-based interfaces don't map well to parquet files in object storage, where a reader needs to issue ranged requests for the footer and then for individual column chunks (apache/arrow-rs#1473).
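To make the mismatch concrete: a parquet reader first needs the footer, whose last 8 bytes are a 4-byte little-endian metadata length followed by the `PAR1` magic, and only then the byte ranges for the columns it wants. The sketch below assumes a hypothetical, synchronous `RangeStore` trait (not the real `object_store` API) with an in-memory stand-in for a remote store, and fetches the raw, still Thrift-encoded footer with two ranged requests rather than a sequential `Read` over the file.

```rust
use std::io;
use std::ops::Range;

/// Hypothetical range-oriented store interface (NOT the real `object_store` API).
trait RangeStore {
    /// Total size of the object in bytes (e.g. from a HEAD request).
    fn size(&self, path: &str) -> io::Result<usize>;
    /// Fetch one byte range of the object (e.g. an HTTP range GET).
    fn get_range(&self, path: &str, range: Range<usize>) -> io::Result<Vec<u8>>;
}

/// In-memory stand-in for a remote store holding a single object.
struct InMemoryStore {
    data: Vec<u8>,
}

impl RangeStore for InMemoryStore {
    fn size(&self, _path: &str) -> io::Result<usize> {
        Ok(self.data.len())
    }

    fn get_range(&self, _path: &str, range: Range<usize>) -> io::Result<Vec<u8>> {
        self.data
            .get(range)
            .map(|s| s.to_vec())
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "range out of bounds"))
    }
}

fn invalid(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg.to_string())
}

/// Fetch the raw parquet footer metadata with two ranged requests.
fn fetch_footer_metadata(store: &dyn RangeStore, path: &str) -> io::Result<Vec<u8>> {
    let len = store.size(path)?;
    // Smallest valid file: leading "PAR1" + 4-byte length + trailing "PAR1".
    if len < 12 {
        return Err(invalid("file too small"));
    }
    // Request 1: the last 8 bytes, i.e. metadata length (LE u32) + b"PAR1".
    let tail = store.get_range(path, len - 8..len)?;
    if tail.len() != 8 || &tail[4..] != b"PAR1" {
        return Err(invalid("missing PAR1 magic"));
    }
    let meta_len = u32::from_le_bytes([tail[0], tail[1], tail[2], tail[3]]) as usize;
    let meta_end = len - 8;
    let meta_start = meta_end
        .checked_sub(meta_len)
        .ok_or_else(|| invalid("bad metadata length"))?;
    // Request 2: exactly the metadata bytes, nothing else.
    store.get_range(path, meta_start..meta_end)
}
```

With the footer decoded, every later read is likewise a ranged request for specific column chunks, which a `Read` cursor cannot express without seeking back and forth.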
The next step will be to integrate `object_store` with `parquet` as part of apache/arrow-rs#1605. This will provide an interface to stream `RecordBatch` from parquet files located in object storage, with support for projection pushdown and row-group filtering. This will eventually integrate with predicate pushdown (apache/arrow-rs#1191), but one step at a time.
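Projection pushdown and row-group filtering are what let the reader avoid fetching most of a file. A minimal sketch of the range planning involved, assuming a hypothetical `RowGroupMeta` that records the byte range of each column chunk; coalescing nearby ranges into one request (`max_gap`) is an illustrative choice of this sketch, not something the arrow-rs#1605 design is confirmed to do.

```rust
use std::ops::Range;

/// Hypothetical, simplified parquet metadata: the byte range of each column
/// chunk in a row group, indexed by column position.
struct RowGroupMeta {
    column_chunks: Vec<Range<usize>>,
}

/// Byte ranges to fetch for the selected row groups and projected columns.
/// Ranges separated by at most `max_gap` bytes are coalesced into one request.
fn ranges_to_fetch(
    row_groups: &[RowGroupMeta],
    selected_row_groups: &[usize],
    projection: &[usize],
    max_gap: usize,
) -> Vec<Range<usize>> {
    // Row-group filtering + projection pushdown: only these chunks are needed.
    let mut ranges: Vec<Range<usize>> = selected_row_groups
        .iter()
        .flat_map(move |&rg| {
            projection
                .iter()
                .map(move |&col| row_groups[rg].column_chunks[col].clone())
        })
        .collect();
    ranges.sort_by_key(|r| r.start);

    // Merge ranges that overlap or sit within `max_gap` bytes of each other.
    let mut merged: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        if let Some(last) = merged.last_mut() {
            if r.start <= last.end + max_gap {
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}
```

Only the projected columns of the surviving row groups are requested. Raising `max_gap` trades a few wasted bytes for fewer round trips, which is usually worthwhile against high-latency object stores.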
Row-oriented formats, e.g. CSV, JSON, etc., will not require custom support in arrow-rs, as pushdown cannot be performed using standard object store interfaces. The query engine will need to fetch the raw data, potentially utilising things like S3 Select, and stream it through the sync arrow-rs decoders.
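For row-oriented formats, the main wrinkle when streaming raw bytes into a synchronous decoder is that fetched chunks rarely end on a record boundary. A minimal sketch, assuming newline-delimited records (as in newline-delimited JSON, or CSV without quoted newlines) and a hypothetical `LineSplitter` helper, of carrying partial records over between chunks before handing complete ones to a decoder:

```rust
/// Hypothetical helper: splits a stream of raw byte chunks (as fetched from
/// object storage) into complete newline-delimited records. Ignores quoted
/// newlines inside CSV fields.
struct LineSplitter {
    /// Bytes of a record that started in an earlier chunk but has not ended yet.
    remainder: Vec<u8>,
}

impl LineSplitter {
    fn new() -> Self {
        Self { remainder: Vec::new() }
    }

    /// Push one fetched chunk and return every record it completes.
    fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.remainder.extend_from_slice(chunk);
        let mut records = Vec::new();
        let mut start = 0;
        for (i, &b) in self.remainder.iter().enumerate() {
            if b == b'\n' {
                records.push(self.remainder[start..i].to_vec());
                start = i + 1;
            }
        }
        // Keep only the trailing partial record for the next chunk.
        self.remainder.drain(..start);
        records
    }

    /// At end of stream, return a final record that lacks a trailing newline.
    fn finish(self) -> Option<Vec<u8>> {
        if self.remainder.is_empty() {
            None
        } else {
            Some(self.remainder)
        }
    }
}
```

Each batch of complete records returned by `push` can then be fed to a synchronous decoder on the CPU pool, while the next chunk is being fetched.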
In order to integrate this with the new scheduler, an implementation of `ObjectStore` will be needed that takes a `tokio::runtime::Handle` and an existing `Arc<dyn ObjectStore>`, and spawns the async work on that runtime. This can then be used by `ParquetExec` and friends.
This is necessary for a few reasons:
- The scheduler uses rayon, not tokio, and many `ObjectStore` implementations will use primitives that need a tokio runtime
- Scheduling CPU-bound work on the same threads as IO is likely to result in instability, as the CPU-bound work only yields sporadically
- We want the CPU-bound parquet decoding to occur on the rayon threadpool where it can't starve IO tasks
- We want the IO-bound network fetch to occur on the tokio threadpool where it can be efficiently multiplexed
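A minimal sketch of the wrapper described above. To stay dependency-free it uses a dedicated std thread and channels as stand-ins for a tokio runtime, `Handle::spawn` and the returned `JoinHandle`; `Store`, `IoHandle` and `SpawnedStore` are hypothetical names, and the real `ObjectStore` trait is async rather than blocking. The point is only the shape: the wrapper holds an `Arc` to the inner store plus a handle to the IO runtime, and each request is moved onto that runtime so the calling thread never performs network IO.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;

/// Hypothetical blocking store trait, standing in for the async `ObjectStore`.
trait Store: Send + Sync {
    fn get(&self, path: &str) -> Result<Vec<u8>, String>;
}

/// A unit of work to run on the IO thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Stand-in for `tokio::runtime::Handle`: submits jobs to a dedicated IO thread.
struct IoHandle {
    jobs: Sender<Job>,
}

impl IoHandle {
    /// Spawn one named IO thread that runs jobs until every sender is dropped.
    fn start() -> Self {
        let (tx, rx) = mpsc::channel::<Job>();
        thread::Builder::new()
            .name("io-runtime".to_string())
            .spawn(move || {
                for job in rx {
                    job();
                }
            })
            .expect("failed to spawn IO thread");
        IoHandle { jobs: tx }
    }

    /// Run `f` on the IO thread; the returned receiver plays the role of a `JoinHandle`.
    fn spawn<T, F>(&self, f: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::sync_channel(1);
        let job: Job = Box::new(move || {
            // Ignore the error if the caller has dropped the receiver.
            let _ = tx.send(f());
        });
        self.jobs.send(job).expect("IO thread has shut down");
        rx
    }
}

/// Wraps an existing store so that every request runs on the IO thread.
struct SpawnedStore {
    inner: Arc<dyn Store>,
    io: IoHandle,
}

impl SpawnedStore {
    fn get(&self, path: &str) -> Receiver<Result<Vec<u8>, String>> {
        let inner = Arc::clone(&self.inner);
        let path = path.to_string();
        self.io.spawn(move || inner.get(&path))
    }
}
```

In a tokio-based version, the wrapper would presumably implement `ObjectStore` itself, with each method awaiting the `JoinHandle` returned by `Handle::spawn`, so a rayon worker polling it never drives the network IO directly.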
The end result is a clear separation between IO-bound work and CPU-bound work, in particular:
- Tokio is solely used to multiplex IO-bound work, ensuring stable tail latencies
- Rayon is used to perform synchronous, CPU-bound computations as part of the morsel-driven execution described in #2199
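The resulting split can be sketched as a two-stage pipeline: an IO stage that fetches data and a CPU stage that decodes it. This sketch uses plain std threads as stand-ins for the tokio and rayon pools and a hypothetical `decode` placeholder for parquet decoding; connecting the stages with a bounded channel for backpressure is an assumption of the sketch rather than part of the plan above.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical pipeline: one IO thread "fetches" chunks and a small pool of
/// CPU threads "decodes" them. The bounded channel caps how much fetched but
/// undecoded data can pile up.
fn run_pipeline(chunks: Vec<Vec<u8>>, cpu_workers: usize, buffer: usize) -> u64 {
    // With no consumers the IO thread would block forever once the buffer fills.
    assert!(cpu_workers > 0, "need at least one CPU worker");
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(buffer);

    // IO stage: stands in for network fetches multiplexed on tokio.
    let io = thread::spawn(move || {
        for chunk in chunks {
            // Blocks while `buffer` chunks are waiting, i.e. the CPU stage is behind.
            if tx.send(chunk).is_err() {
                break;
            }
        }
        // `tx` is dropped here, closing the channel so workers can exit.
    });

    // CPU stage: stands in for rayon workers doing synchronous decoding.
    let rx = Arc::new(Mutex::new(rx));
    let workers: Vec<_> = (0..cpu_workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut local = 0u64;
                loop {
                    // The lock guard is dropped at the end of this statement,
                    // so decoding happens without holding the lock.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(chunk) => local += decode(&chunk),
                        Err(_) => break, // channel closed and fully drained
                    }
                }
                local
            })
        })
        .collect();

    io.join().unwrap();
    workers.into_iter().map(|w| w.join().unwrap()).sum()
}

/// Placeholder for CPU-bound decoding: here it just sums the bytes.
fn decode(chunk: &[u8]) -> u64 {
    chunk.iter().map(|&b| b as u64).sum()
}
```

Neither stage ever runs the other's work: the IO thread only moves bytes, and the CPU threads only decode, which is the separation the two bullets above describe.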
Thoughts, concerns, feedback, etc. are most welcome; things are definitely not set in stone, but this is my current plan of action. Let me know what you think 😄