malthe/pq

pq as a "jobs queue" - items can not be read concurrently when using transactional get

elawdio opened this issue · 19 comments

The use-case: running multiple threads that pull an item from the queue and handle it concurrently. In addition, in case of a failure (of any kind) in the handling logic, the item should be returned to the queue and be available in the next iteration (by opening a new transaction for each pull).

I realized that my jobs are running synchronously and that all threads are blocked when one thread works on an item; they are released only after it finishes handling the item and releases the transaction.

I believe it happens because there's an "UPDATE" SQL statement in the "_pull_item" method in "pq/__init__.py", which probably locks the table until the transaction is released.

Any ideas on how to make the instances run concurrently?

Code example:

import time
from datetime import datetime
from threading import Thread, get_ident
from typing import Callable
from pq import PQ, Queue
from psycopg2 import connect


def _create_queue_instance(queue_name: str):
    conn = connect('<CONN_STRING>')
    pq = PQ(conn)
    queue = pq[queue_name]
    return queue


def _job(queue: Queue, handler: Callable):
    while True:
        try:
            with queue:
                item = queue.get()
                if not item:
                    continue

                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Starting to handle an item')
                handler(item.data)
                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Done')
        except Exception as exc:
            print(f'An exception was thrown while handling an item for queue {queue.name}: {exc}')


def _simple_handler(item):
    print(item)
    print('sleeping for 10 seconds')
    time.sleep(10)


def main():
    # Inserting some items
    q = _create_queue_instance('test_queue')
    for i in range(5):
        q.put(f'item-{i}')

    # Starting the job instances
    threads = []
    for _ in range(5):
        q = _create_queue_instance('test_queue')
        t = Thread(target=_job,
                   args=(q, _simple_handler),
                   daemon=True)
        t.start()
        threads.append(t)

    # Blocking the app from ending.
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

The output I get:

[2019-03-31 12:45:26.840991] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-4
sleeping for 10 seconds
[2019-03-31 12:45:36.845939] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:36.854240] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-3
sleeping for 10 seconds
[2019-03-31 12:45:46.858404] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:46.866138] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-2
sleeping for 10 seconds
[2019-03-31 12:45:56.871011] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:56.878916] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-1
sleeping for 10 seconds
[2019-03-31 12:46:06.882754] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:46:06.891333] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-0
sleeping for 10 seconds
[2019-03-31 12:46:16.893778] 123145582637056 (thread) | test_queue (queue) | Done
stas commented

This looks more like a question. @elawdio please check the test case for an example of how to handle jobs in parallel:
https://github.com/malthe/pq/blob/master/pq/tests.py#L326

Also consider using a connection pooler if you're running jobs in parallel (examples also available in the tests).
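
For reference, a rough sketch of sharing a psycopg2 connection pool across the worker threads instead of opening a raw connection per thread (the dsn and pool sizes are placeholders; the exact wiring with PQ is shown in the tests):

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

# Hypothetical sketch: one shared pool, each worker borrows a connection.
pool = ThreadedConnectionPool(1, 10, '<CONN_STRING>')

def make_queue(queue_name):
    conn = pool.getconn()          # borrow a connection from the pool
    return PQ(conn)[queue_name]    # remember to pool.putconn(conn) when the worker exits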

Thanks for the reply :)
I went through the test case that you mentioned, but I don't see how it relates to my use case.
The test case iterates through the queue under a single transaction, so if there's a failure, it will affect all of the items that were pulled during that transaction.
In my use case, I want to open a separate transaction for each item so that in case of a failure, only that item will be returned to the queue.

Regarding the connection pooler, it will not make a difference, as I establish a fresh connection to Postgres for each thread (see the usage of '_create_queue_instance' in my code example).

Am I still missing something?

There is no table lock. In fact, the library uses advisory locks. In theory, one transaction finds the next queue item and holds an advisory lock on just that item until the transaction ends. But last time I checked, the library has some rough edges on transaction management:

  • Documentation could be better
  • There are some warts in the API
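
For illustration, the advisory-lock dequeue described above looks roughly like this (a hypothetical sketch with an assumed table and columns, not pq's actual query):

def pull_one_advisory(cursor, table='queue'):
    # Return the first row we manage to take an advisory lock on; the lock
    # is held until the surrounding transaction commits or rolls back.
    cursor.execute(f"""
        SELECT id, data
          FROM {table}
         WHERE pg_try_advisory_xact_lock(id)
         ORDER BY id
         LIMIT 1
    """)
    return cursor.fetchone()  # None if the table is empty or everything is locked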

Exactly. Before I started using this library, I was glad to see that it uses advisory locks, which is why I was surprised by this behavior.

Do you have an idea of what can be done in my implementation / pq implementation to make it work?
If it has something to do with pq, I'll be glad to contribute a new version based on your guidance.

Back when #19 was reported, I had a new look at how the whole thing is set up and tried to implement changes that would better support the presumably common scenario where you pull out an item from the queue, make other database queries in the same transaction and either commit or abort the transaction.

But I struggled to get the test cases to work out and ran out of time.

To be honest, I didn't completely understand from your last comment whether I can solve my problem with the current version of pq.
It seems like it cannot be done; am I right?

I understand why #19 can be complex, but do you think that my specific problem can be solved easily? If so, as I mentioned before, I'll be glad to help.

@stas, why did you add LIMIT 2 here: 362681e?

It looks like that would limit the possible concurrency to 2.

Thanks for helping me investigate this!

Why would that limit the concurrency to 2? Correct me if I am wrong, but the change to LIMIT 2 was made in the 'put' method and not in 'get'.

The problem that I am raising relates to being able to execute 'get' multiple times concurrently (in multiple threads), where each 'get' is executed inside a separate transaction (with queue: ... handle_item_logic(queue.get()) ...), without the threads blocking each other.

@elawdio – using SKIP LOCKED (available since PostgreSQL 9.5), this seems to work correctly (see https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/).
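
For reference, the pattern from that post looks roughly like this (again a hypothetical sketch with an assumed table, not the exact statement in this change):

def pull_one_skip_locked(cursor, table='queue'):
    # Rows that are row-locked by other transactions are skipped rather than
    # waited on, so concurrent workers don't block each other (PostgreSQL 9.5+).
    cursor.execute(f"""
        SELECT id, data
          FROM {table}
         ORDER BY id
         LIMIT 1
           FOR UPDATE SKIP LOCKED
    """)
    return cursor.fetchone()  # None if nothing is currently available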

Your test script now runs as expected. Can you confirm?

It works amazingly! Thank you very much!
When do you think it can be merged so I can use it as a package?

stas commented

Hmm,
@elawdio, I'm still not clear on what you're trying to achieve here. Please correct me if I'm missing something below...

First of all, I don't know why you'd want to place a blocking queue into a thread with a DB connection that is shared with a bunch of other threads. This is clearly not safe, and it seems to be the reason why your first thread is blocking the other threads (based on your logs, the get_ident() is the same).

Another question I have is how and why you're fetching the items from the queue. The Queue.get() method will block by default (and yes, your DB connection will be in use; this is how LISTEN works, and it doesn't seem to have anything to do with transactions). If you want to reuse your connection, consider Queue.get(block=False); you're in an infinite loop anyway.
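
For example, the worker loop from the original example could be rewritten roughly like this (a sketch, assuming get(block=False) returns None when the queue is empty):

import time

def _job_nonblocking(queue, handler):
    while True:
        with queue:
            # Returns immediately instead of waiting on LISTEN/NOTIFY.
            item = queue.get(block=False)
            if item is not None:
                handler(item.data)
        if item is None:
            time.sleep(1)  # back off briefly before polling again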

One more thing: when you do with queue: you're starting a transaction as well. Just removing that will give different results.


A bit of context: we've been running pq in production, scaling over multiple processes with dozens of threads, and have had no issues with transactions being blocked. My advice would be to strictly follow the best practices for parallelism when designing your queue (as in, use a connection pool, use a separate connection pool inside the job handlers, and so on). What worked for us is to wrap every queue into a separate process and let it manage a set of threads. Obviously you can use processes all over, but that will cost you an expensive connection for every process (plus more resources and stress on your DB).
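
A minimal sketch of that layout, one process per queue and each process managing its own threads and connections (names, thread count and the handler are placeholders):

from multiprocessing import Process
from threading import Thread
from psycopg2 import connect
from pq import PQ

def run_queue_process(queue_name, handler, num_threads=4):
    # Runs inside its own process; every thread opens its own connection,
    # so nothing is shared across process boundaries.
    def worker():
        queue = PQ(connect('<CONN_STRING>'))[queue_name]
        while True:
            with queue:
                item = queue.get()
                if item is not None:
                    handler(item.data)

    threads = [Thread(target=worker, daemon=True) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == '__main__':
    Process(target=run_queue_process, args=('test_queue', print)).start()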

stas commented

@stas, why did you add LIMIT 2 here: 362681e?

The two items are used to calculate the next_timeout; this way we can look ahead and say: don't bother running another query for this amount of time, since you won't get anything.

@stas – he's not sharing the connection, each thread opens its own connection.

Which means that there's something incorrect about our tests because they didn't pick this up.

Hi @stas, as @malthe just mentioned,

  1. I am not sharing one DB connection among all threads; I open a new connection to the DB for each thread (please refer to the '_create_queue_instance' function and its usage in my code example).
    So why should the first thread block the others?

  2. Although my code example doesn't use queue.get(block=False), it shouldn't block because LISTEN is not enabled in my dev environment. But anyway, as mentioned in the previous point, I use a different connection for each thread.

  3. I need the with queue: so that in case of a failure (of any kind) the item will be returned to the queue.
    e.g. if the process is shut down while handling an item, using with queue: means the item will not be lost.

@stas – re LIMIT 2 – gotcha. That's also supported by the code in this PR.

stas commented

Apologies, indeed, I missed the part where the connection is initiated.

I started looking at the locks in the running database, and it looks like there's a race condition or something. There are 28 locks with the query RELEASE SAVEPOINT pq. I still believe that with queue: is the issue here.

Take a look at this query while running your example:

SELECT 
  datname,
  relation,
  mode,
  granted,
  query,
  state,
  pg_locks.pid
FROM pg_stat_activity
JOIN pg_locks on pg_locks.pid = pg_stat_activity.pid
WHERE state != 'idle'
AND query NOT LIKE '% FROM pg_stat_activity %' 
;

It would be great if we could find a technical explanation of the current use-case. Though again, I doubt running pq in such a setup makes any sense (one connection per queue in a thread). Threads are just a pain to manage.

@stas, well truth be told, that's the whole point of pq – to be used in a concurrent scenario.

Can I kindly ask when you are going to release a new version of pq including the SKIP LOCKED change?

This has now been fixed and is available in release 1.7.0.