Create an Arrow Coder for Beam that allows us to create Ray Datasets
This coder is the first step to allow us to create Ray Datasets based on Beam PCollections.
fyi @TheNeuralBit
Thanks, I think it's worth tracking this in a Beam issue as well. Could you provide some references for Ray Datasets that would inform how an Arrow encoded PCollection can integrate with it?
There's some very superficial stuff I wrote here: https://docs.google.com/document/d/1DcuKhCPnZezIvu9vFMsM4BRdBv0kgAWewOJqRbS42GI/edit#
Specifically, I would say read_datasource may be a good place to look at - it spins up several Ray Tasks that read individual blocks, and each block is usually stored as a block of Arrow records.
I suppose an integration we could have is something like:
class RayPCollection(beam.PCollection):
    def to_dataset(self):
        # Run the pipeline, look up this PCollection's materialized blocks,
        # and wrap them in a Ray Dataset.
        pipeline_result = self.pipeline.run()
        pcoll = pipeline_result.get_pcoll(self)
        return ray.data.Dataset(BlockList(pcoll.block_list))
or something like that
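Another, less scalable option (assuming the pipeline's output RecordBatches can be collected on the driver) would be to combine them into a single pyarrow Table and hand it to ray.data.from_arrow, skipping the block-level plumbing entirely. Just a sketch, and the helper name below is made up:

```python
import pyarrow as pa
import ray


def dataset_from_record_batches(batches):
    """Hypothetical helper: turn collected RecordBatches into a Ray Dataset."""
    # `batches` is an iterable of pyarrow.RecordBatch gathered from the
    # pipeline's output on the driver.
    table = pa.Table.from_batches(list(batches))
    return ray.data.from_arrow(table)
```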
Beam has a few utilities to convert between Beam and Arrow schemas (see here).
A first step would be to write an ArrowRecordBatchCoder, which can be constructed with a Beam Schema or an Arrow Schema and where each individual element is an Arrow RecordBatch (see the sketch after these references):
- Coder interfaces for Beam - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/coders.py
- An example of a Row Coder (complex) and a VarIntCoder (simple)
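Something like the following could be a starting point. This is only a sketch, not an existing Beam coder: it leans on pyarrow's IPC stream format for the byte encoding and takes an Arrow schema directly (deriving it from a Beam schema with the utilities above is left out).

```python
import io

import pyarrow as pa
from apache_beam.coders import Coder


class ArrowRecordBatchCoder(Coder):
    """Sketch of a coder whose elements are pyarrow.RecordBatch values."""

    def __init__(self, schema: pa.Schema):
        self._schema = schema

    def encode(self, batch: pa.RecordBatch) -> bytes:
        # Serialize a single RecordBatch using the Arrow IPC stream format.
        sink = io.BytesIO()
        with pa.ipc.new_stream(sink, self._schema) as writer:
            writer.write_batch(batch)
        return sink.getvalue()

    def decode(self, encoded: bytes) -> pa.RecordBatch:
        with pa.ipc.open_stream(encoded) as reader:
            return reader.read_next_batch()

    def is_deterministic(self) -> bool:
        # IPC framing and metadata may vary, so be conservative here.
        return False

    def to_type_hint(self):
        return pa.RecordBatch
```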
And then we can write a simple PTransform that converts a Beam PCollection of rows with a schema into a Beam PCollection where each element is an Arrow RecordBatch (and vice versa).
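For the rows-to-batches direction, a rough sketch (the transform name is invented, and it assumes the input elements are Beam Rows with attribute access and that an Arrow schema has already been derived from the Beam schema):

```python
import pyarrow as pa

import apache_beam as beam
from apache_beam.transforms.util import BatchElements


class RowsToRecordBatches(beam.PTransform):
    """Sketch: batch schema'd rows and convert each batch to a RecordBatch."""

    def __init__(self, arrow_schema: pa.Schema):
        self._arrow_schema = arrow_schema

    def expand(self, rows):
        def to_record_batch(row_batch):
            # Pivot a list of row objects into column lists matching the schema.
            columns = {
                name: [getattr(row, name) for row in row_batch]
                for name in self._arrow_schema.names
            }
            return pa.RecordBatch.from_pydict(columns, schema=self._arrow_schema)

        return (
            rows
            | "BatchRows" >> BatchElements()
            | "ToRecordBatch" >> beam.Map(to_record_batch)
        )
```

The reverse direction would roughly be a FlatMap that expands each RecordBatch back into rows.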
Then it becomes easier to pass this to Ray's Datasets (and also into Beam from Datasets).
A second step could be to encode Beam Rows as batches of Arrow Records, but we can think about that once we do the first step.