MagicStack/asyncpg

Custom CursorIterator

letrec opened this issue · 2 comments

letrec commented

I'd like to implement my own CursorIterator, but I can't find a straightforward way to do it via asyncpg's public interface.
Currently the iterator is created by CursorFactory, which in turn is created by PreparedStatement.cursor.
I can see two options here:

  • introduce an additional parameter of type CursorFactory in PreparedStatement.cursor, defaulting to None, where None means "use the standard CursorFactory" (see the sketch after this list)
  • do the same in PreparedStatement.__init__
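
To make the first option concrete, here is a rough sketch of what it could enable. This is only a sketch under assumptions: cursor_factory is not an existing asyncpg parameter, and the subclass and wrapper below are purely illustrative.

from collections.abc import AsyncIterable, AsyncIterator

from asyncpg.cursor import CursorFactory

async def _count_rows[T](rows: AsyncIterable[T]) -> AsyncIterator[T]:
    # Illustrative behaviour only: count rows as they stream past.
    count = 0
    async for row in rows:
        count += 1
        yield row
    print(f"iterated {count} rows")

class CustomCursorFactory(CursorFactory):
    def __aiter__(self):
        # Delegate to the stock CursorIterator and wrap it.
        return _count_rows(super().__aiter__())

# Hypothetical call site, if PreparedStatement.cursor grew the new parameter:
# stmt.cursor(*args, prefetch=100, cursor_factory=CustomCursorFactory)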

Which behaviors do you intend to implement in the custom class? Is there anything that would make sense to integrate into asyncpg directly?

letrec commented

I implemented an iterator that kicks off the next fetch before yielding the current result (see my concerns related to it here):

from asyncio import CancelledError, Task, create_task
from collections.abc import AsyncIterable, AsyncIterator

async def iter_fast[T](iterable: AsyncIterable[T]) -> AsyncIterator[T]:
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    try:
        while True:
            try:
                if pending is None:
                    # First iteration: nothing prefetched yet.
                    item = await anext(iterator)
                else:
                    # Wait for the fetch started on the previous iteration.
                    item = await pending
            except StopAsyncIteration:
                break
            else:
                # Kick off the next fetch before handing out the current item.
                pending = create_task(anext(iterator))
                yield item
    finally:
        # If the consumer stops early, don't leak the in-flight prefetch.
        if pending is not None and not pending.done():
            pending.cancel()
            try:
                await pending
            except (CancelledError, StopAsyncIteration):
                pass
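
As a standalone illustration (not asyncpg-specific; slow_source and the timings are made up), this overlaps the wait for the next item with the consumer's own processing:

import asyncio

async def slow_source(n: int):
    for i in range(n):
        await asyncio.sleep(0.05)  # simulate a network round trip per item
        yield i

async def main():
    # With iter_fast the two sleeps overlap, so this loop finishes in roughly
    # half the time of a plain `async for item in slow_source(10)`.
    async for item in iter_fast(slow_source(10)):
        await asyncio.sleep(0.05)  # simulate per-item processing
        print(item)

asyncio.run(main())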

It's not possible to use it as is, given that the current iterator fetches rows in batches but yields individual rows.
So what I ended up doing was implementing an iterator that yields batches (it's basically a copy of CursorIterator), wrapping it in iter_fast, and then wrapping it one more time in an iterator that flattens batches into individual rows:

async def flatten[T](iterable: AsyncIterable[list[T]]) -> AsyncIterable[T]:
    async for batch in iterable:
        for item in batch:
            yield item
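
For comparison, a batch-yielding iterator built purely on asyncpg's public cursor API could look roughly like this; the BatchIterator mentioned above copies CursorIterator's internals instead, and iter_batches with its parameters is just an illustrative sketch:

async def iter_batches(conn, query, *args, batch_size=15_000, timeout=None):
    # asyncpg's server-side cursors have to live inside a transaction.
    async with conn.transaction():
        cur = await conn.cursor(query, *args)
        # Cursor.fetch() returns an empty list once the cursor is exhausted.
        while batch := await cur.fetch(batch_size, timeout=timeout):
            yield batch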

So the end result looks something like this:

iterator = flatten(
    iter_fast(
        BatchIterator(
            conn, query, stmt._state, args, stmt._state.record_class,
            prefetch=15_000, timeout=timeout,
        )
    )
)
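
The composed iterator is then consumed like asyncpg's built-in one, inside a transaction (a minimal sketch, assuming the copied BatchIterator needs an open transaction just like CursorIterator; process is a placeholder):

async with conn.transaction():
    async for row in iterator:
        process(row)  # placeholder for whatever happens to each row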

It yields a performance improvement, but not as large a one as I had hoped.