stellar/django-polaris

poll_pending_deposits concurrency

Closed this issue · 4 comments

I'd like to poll deposits in parallel but it seems Polaris doesn't allow it.
I imagine that's the reason I get so many database is locked errors, but I have no concrete evidence yet.
Looking at this code section from polaris/management/commands/process_pending_deposits.py:

        pending_deposits = (
            Transaction.objects.filter(
                status__in=[
                    Transaction.STATUS.pending_user_transfer_start,
                    Transaction.STATUS.pending_external,
                ],
                kind=Transaction.KIND.deposit,
                pending_execution_attempt=False,
            )
            .select_related("asset")
            .select_for_update()
        )
        with django.db.transaction.atomic():
            ready_transactions = rri.poll_pending_deposits(pending_deposits)
            Transaction.objects.filter(
                id__in=[t.id for t in ready_transactions]
            ).update(pending_execution_attempt=True)

I can see that Polaris selects the rows with select_for_update(), then passes the queryset to the integration function inside an atomic block.
But if I try, for example, to create multiple threads inside rri.poll_pending_deposits, one to poll each deposit, and call transaction.save() inside those threads, I get some kind of deadlock: the code just freezes in save():

from multiprocessing.pool import ThreadPool
from typing import Dict, List

from django.db.models import QuerySet
from polaris.models import Transaction


def poll_pending_deposits(
        self, pending_deposits: QuerySet, *args: List, **kwargs: Dict
    ) -> List[Transaction]:
        if pending_deposits.count() == 0:
            return []

        # One worker thread per pending deposit; each worker calls transaction.save().
        with ThreadPool(pending_deposits.count()) as pool:
            rets = pool.map(poll_pending_deposit, pending_deposits)  # freezes here
            pool.close()
            pool.join()

I might be doing it wrong; maybe I shouldn't be using threads at all. But since this runs in a management command, not inside a web worker process, spawning threads shouldn't affect the web server.

How should I poll deposits in parallel?
Thanks

I tested with both SQLite and PostgreSQL; both deadlock at save():

def poll_pending_deposit(transaction: Transaction):
    # Minimal repro: the worker thread just writes the row back.
    transaction.save()
    return None

Maybe process_pending_deposits shouldn't lock the entire queryset and shouldn't pass a queryset to the integration function.
Each transaction should be handled individually. We could maybe call the integration function asynchronously for each transaction.
Not being able to poll in parallel feels very limiting: something that could take 1s ends up taking 60s (60 transactions at 1 second each).
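Just to make the idea concrete, here is a rough, untested sketch of what per-transaction claiming could look like, reusing the filters from the current code plus Django's select_for_update(skip_locked=True) (I haven't tried this against Polaris):

from django.db import transaction as db_transaction
from polaris.models import Transaction


def claim_one_pending_deposit():
    """Atomically claim a single pending deposit, or return None if none are free."""
    with db_transaction.atomic():
        pending = (
            Transaction.objects.filter(
                status__in=[
                    Transaction.STATUS.pending_user_transfer_start,
                    Transaction.STATUS.pending_external,
                ],
                kind=Transaction.KIND.deposit,
                pending_execution_attempt=False,
            )
            # skip_locked needs a backend that supports it (e.g. PostgreSQL, not SQLite)
            .select_for_update(skip_locked=True)
            .first()
        )
        if pending is None:
            return None
        pending.pending_execution_attempt = True
        pending.save()
    # The row lock is released here, so each worker can poll its own deposit
    # without holding up the others.
    return pending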

Hi @yuriescl I didn't find the exact reason why you're getting deadlock errors, but I suspect it has to do with using rows locked by one connection from another connection. Django's docs mention that it creates a separate DB connection per thread, and the connection that collects pending_deposits is locking those rows (or, in SQLite's case, the entire database), so other connections may not be able to write to them. Depending on your database's isolation level, they may not be able to read them either.
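To make that concrete, here's a minimal sketch of the pattern I think is biting you. This isn't Polaris code, just an illustration assuming a Django project with Polaris installed and a few deposit rows in the database:

import threading

from django.db import connections
from django.db import transaction as db_transaction
from polaris.models import Transaction


def save_in_thread(t: Transaction):
    # This runs on a different DB connection (Django opens one per thread), so
    # the UPDATE issued by save() waits on the row lock still held by the main
    # connection's open SELECT ... FOR UPDATE transaction.
    try:
        t.save()
    finally:
        connections.close_all()  # worker threads must close their own connections


with db_transaction.atomic():
    locked = list(
        Transaction.objects.filter(kind=Transaction.KIND.deposit).select_for_update()
    )  # row locks are acquired here, on the main thread's connection
    workers = [threading.Thread(target=save_in_thread, args=(t,)) for t in locked]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # hangs: the locks aren't released until the atomic() block exits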

Maybe process_pending_deposits shouldn't lock the entire queryset and shouldn't pass a queryset to the integration function. Each transaction should be handled individually. We could maybe call the integration function asynchronously for each transaction.

I've considered doing this, but felt that changing existing integration functions to be async def's could potentially require significant refactoring of an anchor's implementation. For example, all DB queries would need to be wrapped in sync_to_async().

That doesn't mean you can't use async, though. Polaris calls poll_pending_deposits() from a synchronous context by wrapping the function it's called from in an async_to_sync() call. You could still run each of your transaction polling jobs in the event loop for concurrent processing by doing something like this:

import asyncio
from typing import List

from asgiref.sync import async_to_sync
from django.db.models import QuerySet
from polaris.models import Transaction


def poll_pending_deposits(pending_deposits: QuerySet) -> List[Transaction]:
    pending_deposits = list(pending_deposits)  # execute the query
    poll_results = async_to_sync(poll_transactions)(pending_deposits)
    return [
        deposit
        for idx, deposit in enumerate(pending_deposits)
        if poll_results[idx] is True
    ]


async def poll_transactions(transactions: List[Transaction]) -> List[bool]:
    # return_exceptions=True keeps one failed poll from cancelling the rest
    return await asyncio.gather(
        *[poll_transaction(t) for t in transactions], return_exceptions=True
    )


async def poll_transaction(transaction: Transaction) -> bool:
    # <polling code>
    return is_ready
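Inside poll_transaction(), any ORM access still needs to go through sync_to_async(), per my note above. A rough, hypothetical fill-in for <polling code> (assuming an async HTTP client like aiohttp and a made-up status endpoint) could look like:

import aiohttp  # assumption: any async HTTP client would do
from asgiref.sync import sync_to_async
from polaris.models import Transaction


async def poll_transaction(transaction: Transaction) -> bool:
    # Hypothetical anchor-side endpoint that reports whether funds have arrived
    url = f"https://example.com/api/deposits/{transaction.id}/status"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            body = await response.json()
    is_ready = body.get("status") == "complete"
    if is_ready:
        # save() touches the database, so wrap it in sync_to_async()
        await sync_to_async(transaction.save)()
    return is_ready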

Ok, I appreciate the suggestion, will fiddle around with async and see if I can get more speed on the polling. Thanks!