oban-bg/oban

Support Stream in `Oban.Pro.Workers.Batch.new_batch/2`

Closed this issue · 3 comments

Currently, Oban.Pro.Workers.Batch.new_batch/2 accepts only a list of args or jobs, so it is not possible to prepare a batch in a streaming fashion like this:

csv
|> download()
|> stream_csv_records()
|> MyBatchJob.new_batch()
|> Oban.insert_all(timeout: 60_000)

I worked around this by adding the batch_id manually:

csv
|> download()
|> stream_csv_records()
|> Stream.map(fn row ->
  company_id
  |> create_job(row, operation)
  |> put_batch_id(batch_id)
end)
|> Oban.insert_all(timeout: 60_000)

and it seems to work, but I have mixed feelings about it because this approach relies on Oban's internals (meta, batch_id, etc.). I know that new_batch/1 can be overridden, but I believe streams should be handled natively.

Neither of the current Batch.new implementations can gracefully handle streams. For this to be effective, Batch.new would need to return a stream rather than a list.

There are some other big changes coming for Batches, and we'll keep this in mind. In the meantime, you can safely keep injecting a batch_id into meta (it's private, but won't be changing).
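
For readers following along, here is a minimal sketch of the idea discussed above: tagging jobs with a batch_id lazily so that the result stays a stream instead of a list. It is not Oban Pro's implementation. The module `BatchSketch` and its functions are hypothetical, jobs are plain maps standing in for job changesets, and the `"batch_id"` meta key is assumed from the discussion in this thread.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for a job changeset: a plain map with :args and :meta.
  def build_job(args), do: %{args: args, meta: %{}}

  # Tags a single job by writing the batch id under the "batch_id" meta key
  # (key name assumed from this thread).
  def put_batch_id(job, batch_id) do
    Map.update!(job, :meta, &Map.put(&1, "batch_id", batch_id))
  end

  # Accepts any enumerable of args and returns a lazy Stream of tagged jobs.
  # Nothing is built until a consumer enumerates the result.
  def new_batch(args_enum, batch_id) do
    Stream.map(args_enum, fn args ->
      args |> build_job() |> put_batch_id(batch_id)
    end)
  end
end

# Usage: the source is infinite, so this only terminates because
# new_batch/2 is lazy and Enum.take/2 pulls just two elements.
jobs =
  Stream.iterate(1, fn n -> n + 1 end)
  |> BatchSketch.new_batch("batch-1")
  |> Enum.take(2)

IO.inspect(jobs)
```

In a real application the stream would presumably be handed to Oban.insert_all, as in the workaround above, rather than materialized with Enum.take/2.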

Thanks for the explanation 👍🏻

This is on main for oban_pro and will be out soon (🤞) with v1.5