Custom CursorIterator
letrec opened this issue · 2 comments
I'd like to implement my own CursorIterator, but I can't find a straightforward way to do it through asyncpg's public interface.
Currently, CursorIterator is created by CursorFactory, which in turn is created by PreparedStatement.cursor.
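For reference, this is the chain that runs when a prepared statement's cursor is iterated through the public API. The sketch below is minimal: `sum_first_column` is a hypothetical helper, and `conn` is assumed to be an `asyncpg.Connection`. It uses `Connection.prepare()`, `Connection.transaction()` and `PreparedStatement.cursor(*args, prefetch=...)`.

```python
from typing import Any


async def sum_first_column(conn: Any, query: str, *args: Any) -> int:
    """Hypothetical helper: sum column 0 of `query` via a server-side cursor."""
    stmt = await conn.prepare(query)  # asyncpg PreparedStatement
    total = 0
    # asyncpg cursors can only be used inside a transaction.
    async with conn.transaction():
        # stmt.cursor() returns a CursorFactory; `async for` on it creates the
        # CursorIterator, which fetches `prefetch` rows per round trip.
        async for record in stmt.cursor(*args, prefetch=1_000):
            total += record[0]
    return total
```

Every step here is fixed by asyncpg. The only knob the caller controls is `prefetch`, which is why a custom iterator has no public entry point.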
I can see two options here:
- Introduce an additional parameter of type CursorFactory in PreparedStatement.cursor that defaults to None, with None meaning "use the standard CursorFactory"
- Do the same in PreparedStatement.__init__
Which behaviors do you intend to implement in the custom class? Is it anything that would make sense to integrate directly?
I implemented an iterator that kicks off the next fetch before yielding the current result (see my concerns related to it here):
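```python
from asyncio import CancelledError, Task, create_task
from collections.abc import AsyncIterable, AsyncIterator

async def iter_fast[T](iterable: AsyncIterable[T]) -> AsyncIterator[T]:
    """Yield items from `iterable`, requesting the next one before yielding."""
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    try:
        while True:
            try:
                if pending is None:
                    item = await anext(iterator)
                else:
                    item = await pending
            except StopAsyncIteration:
                break
            # Start fetching the next item while the consumer handles this one.
            pending = create_task(anext(iterator))
            yield item
    finally:
        # If the consumer stops early, don't leave the prefetch task running.
        if pending is not None and not pending.done():
            pending.cancel()
            try:
                await pending
            except (CancelledError, StopAsyncIteration):
                pass
```

It can't be used as-is, because the current iterator fetches rows in batches but yields individual rows. Prefetching one row ahead only triggers the next batch fetch once the current batch is almost exhausted.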
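To see why per-row prefetching gains little, here is a toy row iterator. `BufferedRows` is hypothetical, not asyncpg's CursorIterator. It buffers whole batches, so only the `anext()` calls that find the buffer empty do any I/O. Wrapped in `iter_fast`, it would spawn a task per row, and the one task that does trigger a fetch would overlap with the processing of a single row only.

```python
import asyncio


class BufferedRows:
    """Hypothetical toy, not asyncpg's CursorIterator: fetches `batch` rows
    per simulated round trip but yields them one at a time."""

    def __init__(self, total: int, batch: int) -> None:
        self._next, self._total, self._batch = 0, total, batch
        self._buffer: list[int] = []
        self.fetches = 0  # how many __anext__ calls actually did "I/O"

    def __aiter__(self) -> "BufferedRows":
        return self

    async def __anext__(self) -> int:
        if not self._buffer:
            if self._next >= self._total:
                raise StopAsyncIteration
            self.fetches += 1
            await asyncio.sleep(0.01)  # simulated network round trip
            end = min(self._next + self._batch, self._total)
            self._buffer = list(range(self._next, end))
            self._next = end
        # All other calls just pop from the in-memory buffer.
        return self._buffer.pop(0)


async def main() -> tuple[list[int], int]:
    rows = BufferedRows(total=10, batch=4)
    items = [row async for row in rows]
    return items, rows.fetches
```

With 10 rows in batches of 4, only 3 of the row-level calls touch the "network". That is why prefetching has to happen at the batch level.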
So what I ended up doing was implementing an iterator that yields whole batches (it's basically a copy of CursorIterator), wrapping it in iter_fast, and then wrapping the result once more in an iterator that flattens the batches back into individual rows:
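```python
from collections.abc import AsyncIterable

async def flatten[T](iterable: AsyncIterable[list[T]]) -> AsyncIterable[T]:
    # Re-emit the rows of each batch one at a time.
    async for batch in iterable:
        for item in batch:
            yield item
```

So the end result looks somewhat like this: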
```python
iterator = flatten(iter_fast(BatchIterator(
    conn, query, stmt._state, args, stmt._state.record_class,
    prefetch=15_000, timeout=timeout,
)))
```

It yields a performance improvement, but not as impressive as I had hoped.