malthe/pq

connection pool exhausted

lukas1994 opened this issue · 3 comments

Hi,
I want to run multiple threads that work on jobs in a queue. If a job fails I want it to retry. This is my code:

pool = ThreadedConnectionPool(4, 4, connection_string)
pq = PQ(pool=pool)

try:
    pq.create()
    print("created queue table")
except:
    print("queue table already exists")

job_queue = pq["queue"]

def worker():
  while True:
    try:
      with job_queue:
        for job in job_queue:
          if job is None:
            print("job is None")
            break
          # do stuff

    except Exception as e:
        print(f"worker {current_thread()} died because: {str(e)} -- restarting...")


for i in range(4):
    Thread(target=worker).start()

However, I'm getting connection pool exhausted exceptions. My code probably throws exceptions but this ideally it would be able to retry without any issues.

I already looked at the tests but couldn't find the issue.

This does seem rather weird. Try using psql to introspect the active connections using e.g. select * from pg_stat_activity.

I ran \COPY (SELECT * from pg_stat_activity) TO 'pg_stat_activity.csv' CSV HEADER while my application was running (and the exceptions came in). Attached the output file.

pg_stat_activity.txt

Are the connections returned properly to the pool if my code throws an exception?