Support Stream in `Oban.Pro.Workers.Batch.new_batch/2`
Closed this issue · 3 comments
Currently, `Oban.Pro.Workers.Batch.new_batch/2` accepts a list of args/jobs, so it is not possible to prepare a batch in a streaming fashion like this:
```elixir
csv
|> download()
|> stream_csv_records()
|> MyBatchJob.new_batch()
|> Oban.insert_all(timeout: 60_000)
```
I made a workaround by adding `batch_id` manually:
```elixir
csv
|> download()
|> stream_csv_records()
|> Stream.map(fn row ->
  company_id
  |> create_job(row, operation)
  |> put_batch_id(batch_id)
end)
|> Oban.insert_all(timeout: 60_000)
```
This seems to work, but I have mixed feelings about it, since the approach relies on Oban's internals (`meta`, `batch_id`, etc.). I know that `new_batch/1` can be overridden, but I believe streams should be handled natively.
The current implementations of `Batch.new` can't gracefully handle streams. For this to be effective, `Batch.new` would need to return a stream rather than a list.
There are some other big changes coming for Batches, and we'll keep this in mind. In the meantime, you can safely keep injecting a batch_id into meta (it's private, but won't be changing).
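To illustrate the list-versus-stream point, here is a minimal sketch that uses only the Elixir standard library. It is not Oban Pro's implementation: plain maps stand in for job changesets, and the `LazyBatchSketch` module, its function names, and the string key `"batch_id"` inside `meta` are assumptions made for illustration.

```elixir
defmodule LazyBatchSketch do
  # Hypothetical helper: tags a job-like map with a batch id under :meta.
  # Plain maps stand in for Oban job changesets; the "batch_id" key is assumed.
  def put_batch_id(job, batch_id) do
    Map.update(job, :meta, %{"batch_id" => batch_id}, fn meta ->
      Map.put(meta, "batch_id", batch_id)
    end)
  end

  # Eager variant: Enum.map forces the whole input into a list in memory.
  def tag_eagerly(rows, batch_id) do
    Enum.map(rows, fn row -> put_batch_id(%{args: row, meta: %{}}, batch_id) end)
  end

  # Lazy variant: Stream.map returns a stream, so nothing is built
  # until a consumer enumerates it.
  def tag_lazily(rows, batch_id) do
    Stream.map(rows, fn row -> put_batch_id(%{args: row, meta: %{}}, batch_id) end)
  end
end

# A large lazy source; only the elements actually consumed are materialized.
rows = Stream.map(1..1_000_000, fn n -> %{"row" => n} end)

first_two =
  rows
  |> LazyBatchSketch.tag_lazily("batch-1")
  |> Enum.take(2)

IO.inspect(first_two)
```

With the lazy variant, the consumer decides how much of the input is realized at once, which is what a stream-returning batch constructor would offer to a downstream insert.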
Thanks for the explanation 👍🏻
This is on main for oban_pro and will be out soon (🤞) with v1.5