MagicStack/asyncpg

backpressure on LISTEN?

jrozentur opened this issue · 5 comments

  • asyncpg version: 0.18.3
  • PostgreSQL version: 9.5
  • Do you use a PostgreSQL SaaS? If so, which? Can you reproduce
    the issue with a local PostgreSQL install?:
  • Python version: 3.6
  • Platform:
  • Do you use pgbouncer?:
  • Did you install asyncpg with pip?:
  • If you built asyncpg locally, which version of Cython did you use?:
  • Can the issue be reproduced under both asyncio and uvloop?:

My application subscribes to Postgres notifications and fans them out to subscribers over streaming HTTP.

I see that notifications emerge in connection._process_notification, where they are dispatched via call_soon(), each time calling _call_listener(), which synchronously invokes the callback I registered with conn.add_listener(). In that callback, I insert a task into a queue for asynchronous processing.

I see a lot of tasks inserted this way, e.g. ~5000 before my application has a chance to process them. As a result, application memory shows large, unpredictable spikes, like +300k.

Question: how do you apply backpressure in this setup? Is there a way to limit the number of Postgres notifications being converted to tasks, to control the overall number of in-flight tasks? Thanks!
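For illustration, a minimal sketch of the setup described above (the channel name, DSN, and queue bound are assumptions, not the actual application code):

import asyncio

import asyncpg


async def main():
    queue = asyncio.Queue(maxsize=1000)  # bound chosen purely for illustration

    def on_notify(conn, pid, channel, payload):
        # add_listener callbacks must be synchronous, so the only options
        # here are put_nowait (which raises QueueFull once the bound is hit)
        # or dropping -- there is no way to await, hence no backpressure.
        queue.put_nowait(payload)

    conn = await asyncpg.connect('postgresql://localhost/test')  # assumed DSN
    await conn.add_listener('events', on_notify)
    while True:
        payload = await queue.get()
        ...  # fan the payload out to HTTP subscribers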

In short, there's no way to apply backpressure on notifications sent by the server, since these are completely asynchronous at the protocol level. That said, it may be possible to optimize the high-volume case somewhat by buffering the notifications and using call_later on a wrapper that loops over the buffer instead of call_soon on every notification.
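A rough sketch of that batching idea (this is not asyncpg's internals; the class name and the 50 ms window are illustrative choices):

import asyncio


class BufferedDispatcher:
    """Buffer notifications and drain them in one scheduled call
    instead of scheduling one callback per notification."""

    def __init__(self, callback, delay=0.05):
        self._callback = callback
        self._delay = delay
        self._buffer = []
        self._scheduled = False

    def on_notification(self, conn, pid, channel, payload):
        # Cheap append; schedule a single drain for the whole batch.
        self._buffer.append((conn, pid, channel, payload))
        if not self._scheduled:
            self._scheduled = True
            asyncio.get_running_loop().call_later(self._delay, self._drain)

    def _drain(self):
        batch, self._buffer = self._buffer, []
        self._scheduled = False
        for item in batch:
            self._callback(*item)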

@elprans Is it possible to apply backpressure at the socket level somehow? For example, through the .pause_reading() and .resume_reading() methods? Are they available through the asyncpg API?

For context, I'm trying to come up with a good API for LISTEN functionality in triopg library. The backpressure issue is the main problem. Since triopg is a thin wrapper around asyncpg, we are limited by its functionality.

For comparison, psycopg2 does support applying LISTEN backpressure when used in asynchronous mode. It exposes the socket file descriptor via conn.fileno(), which can then be used with "is there data on this socket?" APIs such as Trio's wait_readable function:

import psycopg2
import psycopg2.extensions
import trio


async def get_events(channel, **kwargs):
    conn = psycopg2.connect(**kwargs)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    try:
        with conn.cursor() as cur:
            cur.execute(f"LISTEN {channel};")

        while True:
            # Backpressure happens here: we only read from the socket
            # when we are ready for more events.  wait_readable accepts
            # any object with a fileno() method, so the connection can
            # be passed directly.
            await trio.lowlevel.wait_readable(conn)
            conn.poll()
            while conn.notifies:
                event = conn.notifies.pop(0)
                yield event
    finally:
        conn.close()

via https://gitter.im/python-trio/general?at=5edf845b7c64f31f114cf02b

There's no public API, but you can get to the socket like this: conn._transport.get_extra_info('socket')
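For what it's worth, a hedged sketch of what pausing at the transport level via that private attribute might look like (untested, and _transport may change between versions):

import asyncpg


async def with_reads_paused(conn: asyncpg.Connection):
    # Sketch only: _transport is a private asyncpg attribute.
    transport = conn._transport
    transport.pause_reading()   # standard asyncio Transport API
    try:
        # Drain application-level work here.  Note that pausing reads
        # stalls *everything* on this connection, including query
        # results, so a dedicated listener connection is advisable.
        ...
    finally:
        transport.resume_reading()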

Hello, I am developing a queue system for PostgreSQL (with a customizable job model, job history, sync/async support, bulk get/insert, optional LISTEN/NOTIFY support, and more) that should hopefully go open-source soon, and I think I ran into the same problem.

I use LISTEN/NOTIFY to learn as soon as possible when something is available. I then need a complex query to dequeue jobs (the notification payload serves no purpose here); the query uses FOR UPDATE and SKIP LOCKED, and it does not delete jobs but just marks them, similarly to the pq library, so I can yield them to the user as soon as possible (a query in that spirit is sketched below).
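For illustration, a hypothetical dequeue query along those lines (the table and column names are invented, and $1 is an asyncpg-style parameter):

DEQUEUE_SQL = """
UPDATE jobs
SET status = 'processing'
WHERE id IN (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
    ORDER BY id
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
"""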

In my case I don't think I could use trio.lowlevel.wait_readable. It has no timeout, and when I tried to wrap it with asyncio.wait_for, e.g. await asyncio.wait_for(trio.lowlevel.wait_readable(socket), timeout=1), Python raised the following error: RuntimeError: must be called from async context.
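A pure-asyncio alternative for "wait until readable, with a timeout" might look like the sketch below (note that loop.add_reader requires a selector-based event loop, so this does not work with the Windows ProactorEventLoop):

import asyncio


async def wait_readable(sock, timeout=None) -> bool:
    """Return True if the socket became readable, False on timeout."""
    loop = asyncio.get_running_loop()
    ready = asyncio.Event()
    loop.add_reader(sock.fileno(), ready.set)
    try:
        await asyncio.wait_for(ready.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False
    finally:
        loop.remove_reader(sock.fileno())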

Below is what I came up with. In my case I also have to deal with historical jobs, i.e. things that happened before we started listening! It looks a little stupid but it works well even with many listeners, and it does not use any additional library such as trio. I would be very thankful for any feedback or suggestions. I am a bit in over my head here 🥴.

import asyncio

import asyncpg


class PostgresQueue:
    # many methods that I did not copy here
    # (`channel_name` is defined elsewhere in the module)
    # ...

    # ASYNCHRONOUS LISTEN
    # callback for asyncpg.connection.Connection.add_listener(channel, callback)
    # since callbacks cannot be async we will have to wrap that in a task (see `_on_notify` below)
    async def _async_on_notify(self, *args, **kwargs):
        # the value we put does not matter, we will use `get` to block until we get something
        await self._async_listen_queue.put(1)  # asyncio.Queue.put

    def _on_notify(self, *args, **kwargs):
        asyncio.create_task(self._async_on_notify())

    async def _await_for_more_jobs(self, driver_connection, timeout=1) -> bool:
        await driver_connection.add_listener(channel_name, callback=self._on_notify)
        try:
            await asyncio.wait_for(self._async_listen_queue.get(), timeout=timeout)
            self._async_listen_queue.task_done()
            return True
        # the time lost by catching this should be negligible compared to the rest (0.4s slower for 100,000
        # rounds where a timeout exception systematically occurred vs no exceptions - 9.1s vs 9.5s)
        except asyncio.TimeoutError:
            return False
        finally:
            await driver_connection.remove_listener(channel_name, self._on_notify)

    async def aiterqueue(self, timeout=1):
        # I actually use sqlalchemy in my library so I am going to have to get the
        # lowest-level connection for listening
        raw_connection = await self.connection.get_raw_connection()
        driver_connection: asyncpg.connection.Connection = raw_connection.connection.driver_connection
        while True:
            # FETCH HISTORICAL DATA (before we started to listen using method `_await_for_more_jobs`)
            # Postgres won't notify us about previous events
            jobs = await self.aget()
            if len(jobs) == 0:
                # if there are no more jobs, wait for the queue to notify us
                has_notifications = await self._await_for_more_jobs(driver_connection=driver_connection, timeout=timeout)
                if not has_notifications:
                    # this is a sort of heartbeat to the user so he can make decisions when no more jobs are available
                    yield []
                    # control is returned to the user so we will have to use `aget`
                    # to fetch historical data again since we are not listening anymore
            else:
                yield jobs
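A hypothetical consumer of the async iterator above (`process` is an assumed application coroutine, not part of the class):

async def worker(queue: PostgresQueue):
    async for jobs in queue.aiterqueue(timeout=1):
        if not jobs:
            continue  # heartbeat: nothing arrived within `timeout` seconds
        for job in jobs:
            await process(job)  # `process` is hypothetical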

For reference, here is how I achieved the same functionality in synchronous mode.

Synchronous listen
import select

from sqlalchemy import text


class PostgresQueue:

    # SYNCHRONOUS LISTEN
    def _wait_for_more_jobs(self, timeout=1) -> bool:
        with self.connection.begin():
            self.connection.execute(text(f'LISTEN {channel_name};'))
            # wait for the underlying socket to receive some I/O
            select.select([self.connection.connection], [], [], timeout)
            self.connection.connection.poll()
            # we don't care about the notifications and their payloads,
            # just that jobs are available
            has_notifications = len(self.connection.connection.notifies) > 0
            self.connection.connection.notifies = []
            return has_notifications

    def iterqueue(self, timeout=1):
        while True:
            # FETCH HISTORICAL DATA (before we started to listen using method `_wait_for_more_jobs`)
            # Postgres won't notify us about previous events
            jobs = self.get()
            if len(jobs) == 0:
                # if there are no more jobs, wait for the queue to notify us
                has_notifications = self._wait_for_more_jobs(timeout=timeout)
                # case of no notifications after n seconds
                if not has_notifications:
                    # this is a sort of heartbeat to the user so he can make decisions when no more jobs are available
                    yield []
                    # control is returned to the user so we will have to use `get`
                    # to fetch historical data again since we are not listening anymore
            else:
                yield jobs

EDIT: I worked with a mock and did not test this well enough, I updated the code... Sorry for that.