diamondio/better-queue

Task queue memory leak

Closed this issue · 0 comments

This script that is trying to do a scan in an api, it uses a task queue with sqlite to save the queue however if it is filling the memory to the point of crash

import { promises as fs } from "fs";
import axios from "axios"
import Queue from "better-queue";
const SqliteStore = require("better-queue-sqlite");

// Constants
const QUEUE_STORE_CONFIG = {
  type: "sql",
  dialect: "sqlite",
  path: "./queue2.db.sqlite",
};
const QUEUE_CONFIG = {
  concurrent: 16,
  store: new SqliteStore(QUEUE_STORE_CONFIG),
  maxRetries: 3,
  retryDelay: 1000,
};
const API_URL = "https://brasilapi.com.br/api/cep/v2/";
const MAX_CEP = 99999999;
const INITIAL_BATCH_SIZE = 100;
const QUEUE_THRESHOLD = 1000;
const CEP_FILE = "./ceps.bin";

// Function to read the ceps.bin file and get the biggest zip code
async function getStartingZip(): Promise<number> {
  try {
    const data = await fs.readFile(CEP_FILE);
    let maxCep = 0;

    for (let i = 0; i < data.length; i += 4) {
      const cep = data.readUInt32LE(i);
      if (cep > maxCep) {
        maxCep = cep;
      }
    }

    return maxCep;
  } catch (error: any) {
    if (error.code === "ENOENT") {
      // File not found, return 0
      return 0;
    } else {
      throw error;
    }
  }
}

// Function to wait for the queue size to decrease below the threshold
async function waitForQueueToDecrease(q: Queue, threshold: number): Promise<void> {
  return new Promise((resolve) => {
    const checkQueueSize = () => {
      const stats = q.getStats();
      const succeeded = stats.total * stats.successRate;
      const failed = stats.total - succeeded;
      const queueSize = stats.total - succeeded - failed;
      if (queueSize < threshold) {
        resolve();
      } else {
        setTimeout(checkQueueSize, 500);
      }
    };
    checkQueueSize();
  });
}

// Worker function for processing individual tasks
async function asyncWorker(cep: number, cb: any) {
  const paddedCep = cep.toString().padStart(8, "0");

  try {
    const response = await axios(`${API_URL}${paddedCep}`);

    // Successful request
    if (response.status !== 404) {
      const buffer = Buffer.alloc(4);
      buffer.writeUInt32LE(cep, 0);
      await fs.appendFile(CEP_FILE, buffer);
      console.log(paddedCep);
    }

    cb(null, null);
  } catch (error: any) {
    // Handle errors
    if (error.response && error.response.status === 404) {
      cb(null, null);
    } else {
      console.error("Error fetching CEP:", error.code, cep);
      cb(error);
    }
  }
}

// Main function to start the script
async function main() {
  let cep = await getStartingZip();
  console.log("Starting", cep);

  let succeeded = 0;
  let failed = 0;

  // Create a new queue with the asyncWorker function
  const q: Queue = new Queue(asyncWorker, QUEUE_CONFIG);

  // Listen to the queue events for task completion
  q.on("task_finish", async function () {
    succeeded++;

    const shouldEnqueueMore = (q.getStats().total - failed - succeeded) < INITIAL_BATCH_SIZE;
    if (shouldEnqueueMore && !await enqueueNextBatch(q, INITIAL_BATCH_SIZE)) {
      if (succeeded % INITIAL_BATCH_SIZE === 0) {
        console.log("task_finish| succeeded,cep,failed", succeeded, cep, failed);
        if (global.gc) {
          global.gc()
        }
      }
    }
  });

  // Listen to the queue events for task failure
  q.on("task_failed", function () {
    console.error("task_failed");
    failed++;
  });

  // Function to enqueue a batch of tasks to the queue
  async function enqueueNextBatch(q: Queue, batchSize: number): Promise<boolean> {
    // Wait for the queue to decrease below threshold
    await waitForQueueToDecrease(q, QUEUE_THRESHOLD);

    const limit = Math.min(cep + batchSize, MAX_CEP + 1);
    for (; cep < limit; cep++) {
      q.push(cep);
    }

    return cep === MAX_CEP + 1;
  }

  // Enqueue the first batch of tasks
  await enqueueNextBatch(q, INITIAL_BATCH_SIZE);
}

main().catch(console.error);