malthe/pq

DuplicatePreparedStatement error

oz123 opened this issue · 10 comments

oz123 commented

My application is based on peewee and bottle, and I am getting the following error:

Traceback (most recent call last):
  File "/usr/src/app/venv/src/bottle/bottle.py", line 1005, in _handle
    out = route.call(**args)
  File "/usr/src/app/venv/src/bottle/bottle.py", line 2017, in wrapper
    rv = callback(*a, **ka)
  File "/run/app/apps/backend/views.py", line 33, in _enable_cors
    return fn(*args, **kwargs)
  File "/run/app/apps/backend/views.py", line 60, in register
    User.create(**request.json)
  File "/usr/src/app/venv/lib/python3.6/site-packages/peewee.py", line 6235, in create
    inst.save(force_insert=True)
  File "/usr/src/app/venv/lib/python3.6/site-packages/playhouse/signals.py", line 72, in save
    post_save.send(self, created=created)
  File "/usr/src/app/venv/lib/python3.6/site-packages/playhouse/signals.py", line 51, in send
    responses.append((r, r(sender, instance, *args, **kwargs)))
  File "/run/app/apps/backend/models.py", line 57, in on_entry_save
    queue.put(model_as_dict)
  File "/usr/src/app/venv/lib/python3.6/site-packages/pq/__init__.py", line 232, in put
    utc_format(expected_at) if expected_at is not None else None,
  File "/usr/src/app/venv/lib/python3.6/site-packages/pq/utils.py", line 69, in wrapper
    cursor.execute("PREPARE %s AS\n%s" % (name, query), d)
psycopg2.errors.DuplicatePreparedStatement: prepared statement "_put_item_queue_vat" already exists

Can you maybe explain how to avoid this?

It means that you're using the same connection in multiple threads. This is not supported even by the database driver.

Is something wrong in your connection pool setup?

oz123 commented

Thanks for replying. I am using a web application deployed with uwsgi. There are 4 worker threads.

The queue is created with the following code:

from playhouse.pool import PooledPostgresqlExtDatabase as PostgresqlDatabase
from psycopg2 import connect, errors
from pq import PQ

....

db = PostgresqlDatabase(CONFIG.postgres_db,
                        user=CONFIG.postgres_user,
                        password=CONFIG.postgres_password,
                        host=CONFIG.postgres_host, port=5432,
                        max_connections=32,
                        stale_timeout=300  # 5 minutes.
                       )


def get_queue(connection, name):
    """
    a PQ connection wrapper

    connection = connect(dbname=CONFIG.postgres_db,
                         user=CONFIG.postgres_user,
                         password=CONFIG.postgres_password,
                         host=CONFIG.postgres_host)

    using peewee we simply get the connection from the database instance.
    """

    pq = PQ(connection)

    # Creating the queue table here is needed, because peewee evolve
    # does not take care of this model
    try:
        pq.create()
    except errors.DuplicateTable:
        pass

    queue = pq[name]
    return queue

my_queue = get_queue(db.connection(), "default")

I am guessing that every time I call queue.put I need to pass a new connection, thus avoiding
a global shared connection.
This will take some rethinking of how I combine pq with peewee. I would be happy to hear a suggestion.

If you have a module-global my_queue that you're using from multiple threads, then you have a problem because you're using just a single connection.

Instead, what you can do is to use a connection pool (passing it to the PQ constructor using keyword-argument pool). This will play nicely with your setup.
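A minimal sketch of that suggestion, assuming psycopg2's ThreadedConnectionPool is an acceptable pool and that the queue table already exists (otherwise call pq.create() first, as in the code above). The helper name make_queue, the dsn argument, and the pool sizes are hypothetical and not part of pq:

```python
def make_queue(dsn, name="default", minconn=1, maxconn=4):
    """Return a pq queue backed by a thread-safe psycopg2 pool.

    `dsn` is a libpq connection string such as
    "dbname=example user=postgres" (an assumed value).
    """
    # Imported lazily so that merely importing this module does not
    # require a database driver to be installed.
    from psycopg2.pool import ThreadedConnectionPool
    from pq import PQ

    # The pool opens `minconn` connections right away, so the database
    # must be reachable when this function is called.
    pool = ThreadedConnectionPool(minconn, maxconn, dsn)

    # Pass the pool through the `pool` keyword instead of a single
    # connection, as suggested above.
    pq = PQ(pool=pool)
    return pq[name]
```

With this, a module-global queue no longer holds one connection that every thread shares; each operation takes a connection from the pool.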

oz123 commented

Actually, I am using a connection pool; I just didn't pass it. I'm not sure peewee's connection pool does the job, though.
I still need to check this.

It does not, but you should be able to write a simple wrapper for it.
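One possible shape for such a wrapper, as a sketch only. It assumes that pq only needs the pool object to provide getconn() and putconn(), mirroring psycopg2's pool classes; that is an assumption about pq's internals, not something confirmed in this thread. The class name PerThreadPool and the connect callable are hypothetical, and the sketch is not tied to peewee's pool:

```python
import threading


class PerThreadPool:
    """Give each thread its own connection via getconn()/putconn()."""

    def __init__(self, connect):
        # `connect` is any zero-argument callable that opens a new
        # connection, e.g. lambda: psycopg2.connect(dsn) (assumed).
        self._connect = connect
        self._local = threading.local()

    def getconn(self):
        # Reuse this thread's connection, opening it on first use.
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = self._local.conn = self._connect()
        return conn

    def putconn(self, conn):
        # Nothing to return: the connection stays bound to its thread.
        pass
```

It would then be passed as PQ(pool=PerThreadPool(...)). The sketch does not detect closed or stale connections, which a real wrapper would need to handle.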

oz123 commented

Sigh... More yak shaving 😃

Yep. Alternatively, use psycopg2 directly. You might not need to run everything through Peewee.

oz123 commented

Using psycopg2 directly seems tricky with uwsgi. I am reluctant to do this.

It seems the solution is simply to use lazy-apps when running multiple workers with uWSGI.
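With lazy-apps, uWSGI loads the application in each worker rather than once in the master, so module-level objects such as my_queue are built per worker instead of being inherited across a fork. A rough application-side equivalent, sketched under the assumption that the shared object is created at import time; the PerProcess helper is hypothetical:

```python
import os


class PerProcess:
    """Lazily build one resource per process, rebuilding after a fork."""

    def __init__(self, factory):
        self._factory = factory  # zero-argument callable, e.g. one that builds the queue
        self._pid = None
        self._value = None

    def get(self):
        pid = os.getpid()
        # A different pid means we are in a forked worker: build a fresh
        # resource instead of reusing the parent's connection.
        if self._pid != pid:
            self._value = self._factory()
            self._pid = pid
        return self._value
```

Calling get() inside a request handler, rather than creating the queue at import time, keeps each worker process on its own connection.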

Also, the application will eventually be deployed to k8s. This means that, at the cost of more RAM, I can give each replica a single worker, thus avoiding the connection sharing.

Yeah, that's not a bad idea. And to be honest, the memory overhead probably isn't all that big.

oz123 commented

Thanks for replying. I made some changes to my code and Docker images, and it seems that the issue is gone.