dotnetcore/CAP

Scheduling freezes on large number of delayed messages

eubelov opened this issue · 1 comments

Setup: CAP 8.1, RabbitMQ + PostgreSQL
Given: A large number of delayed messages (>20000 in my case)

When IDataStorage.ScheduleMessagesOfDelayedAsync fetches all delayed messages from the DB and schedules them for processing, the entire operation runs inside a single DB transaction. The query uses FOR UPDATE SKIP LOCKED, which locks every selected row for the duration of that transaction.

Next, the following happens:

  1. Transaction (T1) starts in ScheduleMessagesOfDelayedAsync
  2. Pending messages are read and enqueued via _dispatcher.EnqueueToScheduler() (IProcessor.Delayed.cs)
  3. Messages are added to the _schedulerQueue priority queue for async processing by a background task. (Dispatcher.cs)
  4. The BG task picks the next available message from the priority queue: _schedulerQueue.TryPeek(out _, out _nextSendTime)
  5. The message is sent to transport (await _sender.SendAsync(_schedulerQueue.Dequeue());) and SetSuccessfulState is called (IMessageSender.Default.cs)
  6. An update statement is executed in ChangeMessageStateAsync() (IDataStorage.PostgreSql.cs) against the DB while T1 has still not been committed. The update times out and fails because the affected row is still locked by T1.
  7. The while (_schedulerQueue.TryPeek(out _, out _nextSendTime)) loop freezes forever. (IDispatcher.Default.cs)
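
The lock contention in step 6 can be modeled in isolation. The following is a minimal sketch, not CAP's code: rowLock is a hypothetical stand-in for a row lock taken by FOR UPDATE SKIP LOCKED, TryChangeState is a hypothetical stand-in for the state update, and the 50 ms timeout is arbitrary. It only illustrates why an update issued while T1 is open cannot succeed; it does not reproduce the frozen loop from step 7.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for one row lock taken by SELECT ... FOR UPDATE SKIP LOCKED.
var rowLock = new SemaphoreSlim(1, 1);

// Same shape as the scheduler queue in the report: messages ordered by send time.
var schedulerQueue = new PriorityQueue<string, long>();

// Hypothetical stand-in for the state UPDATE: it needs the row lock
// and gives up once the timeout elapses.
bool TryChangeState(string message, TimeSpan timeout)
{
    if (!rowLock.Wait(timeout))
    {
        return false; // row is still locked by the open transaction
    }
    rowLock.Release();
    return true;
}

// T1 begins: the row is selected and locked, then handed to the scheduler.
rowLock.Wait();
schedulerQueue.Enqueue("msg-1", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

// The background task runs while T1 is still open.
bool updatedDuringT1 = false;
if (schedulerQueue.TryPeek(out _, out _))
{
    var message = schedulerQueue.Dequeue();
    // "Send" would happen here; the state update that follows is blocked by T1.
    updatedDuringT1 = TryChangeState(message, TimeSpan.FromMilliseconds(50));
}

// T1 commits: the lock is released and the same update now goes through.
rowLock.Release();
bool updatedAfterCommit = TryChangeState("msg-1", TimeSpan.FromMilliseconds(50));

Console.WriteLine($"during T1: {updatedDuringT1}, after commit: {updatedAfterCommit}");
// prints "during T1: False, after commit: True"
```

In this model the update only succeeds once the enclosing transaction has released its locks. The longer T1 stays open (for example while enqueuing more than 20000 messages), the longer every state update for those rows is blocked.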

This looks similar to #1429: it is caused by ScheduleMessagesOfDelayedAsync being triggered again while transaction T1 is still executing. Please refer to the solution in #1429. If a large number of delayed messages become due at the same time, you may need to override the QueuedMessageFetchTime method.