riverqueue/river

Slow processing of large batches of jobs

Closed this issue · 1 comments

We're looking at River as a replacement for an existing Redis based worker system. We often enqueue a large batch of jobs (using InsertMany) that we want processed quickly. In testing it looks like River always waits FetchPollInterval between fetches even if there are jobs available in the queue.

The test we've set up is we have a single worker process with 32 workers pulling from the default queue. Connections are direct to Postgres 17. We are logging all of the jobs that are run and they each take ~25ms. We then batch produce 5k records and it prints batches of completions every second. If we increase the FetchPollInterval to 5 seconds the batches start to come in every 5 seconds instead. When we drop FetchPollInterval to 100ms then everything processes quickly but the batches happen far enough apart (small every minute larger ones every hour) that it using a small interval would cause a lot of unnecessary load on the database.

What knobs do we have here other than FetchPollInterval? I'm assuming we should (roughly) be matching worker counts to available core.

Would it make sense for River to ignore the interval and do an immediate fetch when the previous fetch had a full set of records? That is only apply FetchPollInterval when fewer records were returned than available workers.

I came up with a basic proof of concept that has greatly increased job throughput in local testing even when increasing FetchPollInterval to 5 seconds. If I understand how the fetch limiter works this should still respect FetchCooldown.

diff --git a/vendor/github.com/riverqueue/river/producer.go b/vendor/github.com/riverqueue/river/producer.go
index 6d6392ad..66b35b81 100644
--- a/vendor/github.com/riverqueue/river/producer.go
+++ b/vendor/github.com/riverqueue/river/producer.go
@@ -447,7 +447,11 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
      if p.paused {
        continue
      }
-     p.innerFetchLoop(workCtx, fetchResultCh)
+     more := p.innerFetchLoop(workCtx, fetchResultCh)
+     if more {
+       fetchLimiter.Call()
+     }
+
      // Ensure we can't start another fetch when fetchCtx is done, even if
      // the fetchLimiter is also ready to fire:
      select {
@@ -461,7 +465,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
  }
 }

-func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
+func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) bool {
  limit := p.maxJobsToFetch()
  go p.dispatchWork(workCtx, limit, fetchResultCh)

@@ -473,7 +477,7 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
      } else if len(result.jobs) > 0 {
        p.startNewExecutors(workCtx, result.jobs)
      }
-     return
+     return len(result.jobs) == limit
    case result := <-p.jobResultCh:
      p.removeActiveJob(result.ID)
    case jobID := <-p.cancelCh: