diamondio/better-queue

Memory leak with priorities

Opened this issue · 4 comments

Hi there,

It appears that the priorities implementation is leaking.

Whenever a taskid (autogenerated or custom) is added to queue._store._priorities, it never gets deleted. So as long as tasks are added to a prioritised queue, the queue is leaking few bytes at a time. Depending on the tasks frequency you might start leaking MBs in few minutes to hours.

So practically the problem is that the priorities are not cleared for finished or failed tasks.

Currently the workaround we have applied is related with manually deleting the task ids, whenever task_finish or task_failed are triggered with the respective id.

         this.mainQueue.on('task_finish', taskId => {
            delete this.mainQueue._store._priorities[taskId];
        });

        this.mainQueue.on('task_failed', (taskId, err) => {
            if (err) {
                console.error(err);
            } else {
                delete this.mainQueue._store._priorities[taskId];
            }
        });

@leanderlee I have verified as well that this memory leak is still there after 2 years 😿
@nmargaritis thanks for the workaround!

@m-shojaei There isn't any active development in this project apparently. However using the workaround above should resolve the problem. I personally switched the library we are using as our usecase could be served from async/priorityQueue

@nmargaritis Won't it be more correct to delete the priority in the task_failed event handler even when there's an error?

this.mainQueue.on('task_failed', (taskId, err) => {
    console.error(err);
    delete this.mainQueue._store._priorities[taskId];
});

Hi all,

Seem's having another memroy leak on _retries. Like store _priorities, "taskId" are never removed on _retries.

A quick workaround could be:

worker.on('end', function () {
    self.length -= Object.keys(batch).length;
    if (timeout) {
      clearTimeout(timeout);
    }
    var finishAndGetNext = function () {
      if (!self._connected) return;
      self._store.releaseLock(lockId, function (err) {
        if (err) {
          // If we cannot release the lock then retry
          return setTimeout(function () {
            finishAndGetNext();
          }, 1)
        }
        self._running--;
        taskIds.forEach(function (taskId) {
          if (self._workers[taskId] && !self._workers[taskId].active) {
            delete self._workers[taskId];
             // FIX MEMORY LEAK
            delete self._retries[taskId];
            delete self._store._priorities[taskId];
            // FIX MEMORY LEAK
          }
        });
        self._processNextAfterTimeout();
      })
    }
    if (self.afterProcessDelay) {
      setTimeout(function () {
        finishAndGetNext()
      }, self.afterProcessDelay);
    } else {
      self.setImmediate(function () {
        finishAndGetNext()
      })
    }
  })