Task queue memory leak
Closed this issue · 0 comments
jadsongmatos commented
This script that is trying to do a scan in an api, it uses a task queue with sqlite to save the queue however if it is filling the memory to the point of crash
import { promises as fs } from "fs";
import axios from "axios"
import Queue from "better-queue";
const SqliteStore = require("better-queue-sqlite");
// Constants
const QUEUE_STORE_CONFIG = {
type: "sql",
dialect: "sqlite",
path: "./queue2.db.sqlite",
};
const QUEUE_CONFIG = {
concurrent: 16,
store: new SqliteStore(QUEUE_STORE_CONFIG),
maxRetries: 3,
retryDelay: 1000,
};
const API_URL = "https://brasilapi.com.br/api/cep/v2/";
const MAX_CEP = 99999999;
const INITIAL_BATCH_SIZE = 100;
const QUEUE_THRESHOLD = 1000;
const CEP_FILE = "./ceps.bin";
// Function to read the ceps.bin file and get the biggest zip code
async function getStartingZip(): Promise<number> {
try {
const data = await fs.readFile(CEP_FILE);
let maxCep = 0;
for (let i = 0; i < data.length; i += 4) {
const cep = data.readUInt32LE(i);
if (cep > maxCep) {
maxCep = cep;
}
}
return maxCep;
} catch (error: any) {
if (error.code === "ENOENT") {
// File not found, return 0
return 0;
} else {
throw error;
}
}
}
// Function to wait for the queue size to decrease below the threshold
async function waitForQueueToDecrease(q: Queue, threshold: number): Promise<void> {
return new Promise((resolve) => {
const checkQueueSize = () => {
const stats = q.getStats();
const succeeded = stats.total * stats.successRate;
const failed = stats.total - succeeded;
const queueSize = stats.total - succeeded - failed;
if (queueSize < threshold) {
resolve();
} else {
setTimeout(checkQueueSize, 500);
}
};
checkQueueSize();
});
}
// Worker function for processing individual tasks
async function asyncWorker(cep: number, cb: any) {
const paddedCep = cep.toString().padStart(8, "0");
try {
const response = await axios(`${API_URL}${paddedCep}`);
// Successful request
if (response.status !== 404) {
const buffer = Buffer.alloc(4);
buffer.writeUInt32LE(cep, 0);
await fs.appendFile(CEP_FILE, buffer);
console.log(paddedCep);
}
cb(null, null);
} catch (error: any) {
// Handle errors
if (error.response && error.response.status === 404) {
cb(null, null);
} else {
console.error("Error fetching CEP:", error.code, cep);
cb(error);
}
}
}
// Main function to start the script
async function main() {
let cep = await getStartingZip();
console.log("Starting", cep);
let succeeded = 0;
let failed = 0;
// Create a new queue with the asyncWorker function
const q: Queue = new Queue(asyncWorker, QUEUE_CONFIG);
// Listen to the queue events for task completion
q.on("task_finish", async function () {
succeeded++;
const shouldEnqueueMore = (q.getStats().total - failed - succeeded) < INITIAL_BATCH_SIZE;
if (shouldEnqueueMore && !await enqueueNextBatch(q, INITIAL_BATCH_SIZE)) {
if (succeeded % INITIAL_BATCH_SIZE === 0) {
console.log("task_finish| succeeded,cep,failed", succeeded, cep, failed);
if (global.gc) {
global.gc()
}
}
}
});
// Listen to the queue events for task failure
q.on("task_failed", function () {
console.error("task_failed");
failed++;
});
// Function to enqueue a batch of tasks to the queue
async function enqueueNextBatch(q: Queue, batchSize: number): Promise<boolean> {
// Wait for the queue to decrease below threshold
await waitForQueueToDecrease(q, QUEUE_THRESHOLD);
const limit = Math.min(cep + batchSize, MAX_CEP + 1);
for (; cep < limit; cep++) {
q.push(cep);
}
return cep === MAX_CEP + 1;
}
// Enqueue the first batch of tasks
await enqueueNextBatch(q, INITIAL_BATCH_SIZE);
}
main().catch(console.error);