ray-project/ray_beam_runner

Create an Arrow Coder for Beam that allows us to create Ray Datasets

Opened this issue · 3 comments

This coder is the first step to allow us to create Ray Datasets based on Beam PCollections.

fyi @TheNeuralBit

Thanks, I think it's worth tracking this in a Beam issue as well. Could you provide some references for Ray Datasets that would inform how an Arrow encoded PCollection can integrate with it?

There's some very silly, superficial stuff I wrote here: https://docs.google.com/document/d/1DcuKhCPnZezIvu9vFMsM4BRdBv0kgAWewOJqRbS42GI/edit#

Specifically, I would say read_datasource may be a good point to look at - it spins up several Ray Tasks that read individual blocks. Each block is usually stored as a block of Arrow records.

I suppose an integration we could have is something like:

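import apache_beam as beam
import ray

class RayPCollection(beam.PCollection):

  def to_dataset(self):
    # Sketch only: get_pcoll, block_list and BlockList are hypothetical names.
    pipeline_result = self.pipeline.run()
    # Look up the materialized result for this PCollection.
    pcoll = pipeline_result.get_pcoll(self)
    return ray.data.Dataset(BlockList(pcoll.block_list))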

or something like that

Beam has a few utilities to convert between Beam and Arrow schemas (see here).

A first step would be to write an ArrowRecordBatchCoder, which can be constructed with a Beam Schema or an Arrow Schema, and where each individual element is an Arrow RecordBatch.

And then we can write a simple PTransform that converts a Beam PCollection of rows with a schema into a Beam PCollection where each element is an Arrow RecordBatch (and vice versa).

Then it becomes easier to pass this to Ray's Datasets (and also into Beam from Datasets).


A second step could be to encode Beam Rows as batches of Arrow Records, but we can think about that once we do the first step.