timgit/pg-boss

feature request: worker's job filter

Eomm opened this issue · 8 comments

Eomm commented

To build a QoS system, it would be great if the work() method supported a filter option.

This new parameter lets the user write to a single queue/table while 2 different workers process the same queue at a different pace:

  • eg: queue icecream
  • worker1 consumes icecream jobs with taste: cioco, polling with newJobCheckInterval: 200
  • worker2 consumes icecream jobs with taste: bubble, polling with newJobCheckInterval: 2000

❗️ It is up to the user to write filters that cover all the jobs; otherwise a job such as taste: lemon is never matched and will be archived automatically.

Example:

const PgBoss = require('pg-boss');

(async function () {
  try {
    await buildConsumer();
  } catch (error) {
    console.log({ globalErr: error.message });
  }
})();

async function buildConsumer () {
  const boss = new PgBoss({
    user: 'postgres',
    password: 'postgres',
    noScheduling: true,
  });

  await boss.work(
    queueName,
    {
      teamSize: 2,
      newJobCheckInterval: 1000,
      filter: { // 🚀
        jobFilter: `data ->> 'body' = $1`,
        jobParams: ['body'],
      }
    },
    executeJob
  );

  console.log('Waiting for jobs');
}

This should generate a query like this in the fetch function:

function fetchNextJob (schema) {

    WITH nextJob as (
      SELECT id
      FROM pgboss.job j
      WHERE state < 'active'
        AND name LIKE $1
        AND startAfter < now()
+        AND data ->> 'body' = $3
      ORDER BY priority desc, createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE pgboss.job j SET
      state = 'active',
      startedOn = now(),
      retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
    FROM nextJob
    WHERE j.id = nextJob.id
    RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds

Note:

  • this query should also reach the executeSql function when the user supplies a custom db
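The example passes `$1` in jobFilter, while the generated query shows `$3`, so the user's placeholders would have to be renumbered after the parameters the fetch query already uses. A minimal sketch of that step, assuming two hypothetical helpers (`shiftPlaceholders` and `buildFetchFilter` are illustrative names, not part of pg-boss):

```javascript
// Hypothetical helper: shifts every $n placeholder in a SQL fragment by `offset`.
// Naive on purpose: it would also rewrite a "$1" inside a quoted string literal.
function shiftPlaceholders (fragment, offset) {
  return fragment.replace(/\$(\d+)/g, (_, n) => '$' + (Number(n) + offset));
}

// Hypothetical helper: builds the extra WHERE clause and the merged value list.
// `baseValues` are the values the fetch query already binds (name pattern, limit).
function buildFetchFilter (filter, baseValues) {
  if (!filter) return { clause: '', values: baseValues };
  return {
    clause: 'AND ' + shiftPlaceholders(filter.jobFilter, baseValues.length),
    values: [...baseValues, ...filter.jobParams]
  };
}

const result = buildFetchFilter(
  { jobFilter: "data ->> 'body' = $1", jobParams: ['body'] },
  ['icecream', 2]
);
console.log(result.clause); // AND data ->> 'body' = $3
console.log(result.values); // [ 'icecream', 2, 'body' ]
```

Since jobFilter is raw SQL, it would need to come from the developer and never from user input; only the jobParams values are bound as parameters.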

What do you think?

timgit commented

You can already do this using wildcards. Set all workers to a wildcard pattern by default, such as icecream.*, then replace the wildcard with a specific flavor where needed.

Eomm commented

Sorry, I don't get it.
Must the producer that sends the message know the consumer's queue in this case?

My goal is to have a dumb producer that does not know how many consumers the backend has.

timgit commented

Queue patterns use the * character to match 0 or more characters. For example, a job from queue status-report-12345 would be fetched with pattern status-report-* or even stat*5.

For example, a producer would use the flavor as part of the queue name, such as icecream.vanilla and icecream.chocolate. A consumer using work('icecream.*') would get both flavors, but another consumer using work('icecream.vanilla') would not get chocolate.
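To make the matching rule concrete, here is a small sketch of how a pattern with `*` could be tested against a queue name. `matchesQueuePattern` is a hypothetical helper written for illustration; it is not how pg-boss implements matching (the fetch query quoted earlier uses `name LIKE $1`).

```javascript
// Hypothetical helper, not part of pg-boss: tests a queue name against a
// pattern where `*` stands for zero or more characters.
function matchesQueuePattern (pattern, queueName) {
  // Escape regex metacharacters so that "." in a queue name stays literal.
  const escaped = pattern.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // Turn each escaped "*" back into "match anything".
  const regex = new RegExp('^' + escaped.replace(/\\\*/g, '.*') + '$');
  return regex.test(queueName);
}

console.log(matchesQueuePattern('icecream.*', 'icecream.vanilla'));         // true
console.log(matchesQueuePattern('icecream.*', 'icecream.chocolate'));       // true
console.log(matchesQueuePattern('icecream.vanilla', 'icecream.chocolate')); // false
console.log(matchesQueuePattern('stat*5', 'status-report-12345'));          // true
```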

Eomm commented

The proposed solution assumes that I have control over the producer, which is not the case 😞

Eomm commented

Would you be willing to accept a PR with such a feature?

timgit commented

Yes, sounds good

This would be useful. I didn't see a test case for work('icecream.*.toppings.none') sort of queue layouts.