feature request: worker's job filter
Eomm opened this issue ยท 8 comments
In order to create a QOS system it would be great to support a filter
option by the work()
method.
This new parameter lets the user to write to a single queue/table and 2 different workers will be able to process the same queue at different pace:
- eg: queue
icecream
- worker1 consumes
icecream
with jobtaste: cioco
has a worker that process it withnewJobCheckInterval: 200
- worker1 consumes
icecream
with jobtaste: bubble
has a worker that process it withnewJobCheckInterval: 2000
โ๏ธ It is up to the user writing a filter that process all the jobs, otherwise the job with taste: lemon
will be archived automatically.
Example:
const PgBoss = require('pg-boss');
(async function () {
try {
await buildConsumer();
} catch (error) {
console.log({ globalErr: error.message });
}
})();
async function buildConsumer () {
const boss = new PgBoss({
user: 'postgres',
password: 'postgres',
noScheduling: true,
});
await boss.work(
queueName,
{
teamSize: 2,
newJobCheckInterval: 1000,
filter: { // ๐
jobFilter: `data ->> 'body' = $1`,
jobParams: ['body'],
}
},
executeJob
);
console.log('Waiting for jobs');
}
This should generate a query like this in the fetch function:
Line 353 in 523c36b
WITH nextJob as (
SELECT id
FROM pgboss.job j
WHERE state < 'active'
AND name LIKE $1
AND startAfter < now()
+ AND data ->> 'body' = $3
ORDER BY priority desc, createdOn, id
LIMIT $2
FOR UPDATE SKIP LOCKED
)
UPDATE pgboss.job j SET
state = 'active',
startedOn = now(),
retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE j.id = nextJob.id
RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds
Note:
- this query should land to the
executeSql
function when the user customizes thedb
too
What do you think?
You can already do this using wildcards. Set all workers to a wildcard by default, such as icecream.*
, then, you can replace the wildcard with a specific flavor
Sorry, I don't get it.
Does the producer that send the message must know the consumer's queue in di case?
My target would be having a silly producer that does not know how many consumers the BE has
Queue patterns use the * character to match 0 or more characters. For example, a job from queue status-report-12345 would be fetched with pattern status-report-* or even stat*5.
For example, a producer would use the flavor as part of the queue name, such as icecream.vanilla
and icecream.chocolate
. A consumer using work('icecream.*')
would get both flavors, but another consumer using work('icecream.vanilla')
would not get chocolate.
The proposed solution assumes that I have control over the producer - it is not the case ๐
Would you mind to accept a PR with such a feature in case?
Yes, sounds good
This would be useful. I didn't see a test case for work('icecream.*.toppings.none')
sort of queue layouts.