googleapis/python-datastore

Cannot retry aborted transaction

c-ghigny opened this issue · 3 comments

Hello everyone,

I encounter the issue using the following versions:

  • python: 3.7.9
  • pip version: 21.0.1
  • google-cloud-datastore: 2.1.0

When multiple transactions target the same entity, one of them receives this error:

409 Aborted due to cross-transaction contention. This occurs when multiple transactions attempt to access the same data, requiring Firestore to abort at least one in order to enforce serializability.

When I retry that transaction, I expect it to try again and eventually succeed, but that is not the case. When the transaction is retried, I get this error:

400 The referenced transaction has expired or is no longer valid.

I found an easy way to reproduce it, and tried to simplify it as much as I could:

from threading import Thread
from time import sleep

from google.api_core.retry import Retry
from google.cloud import datastore
from google.cloud.datastore import Entity, Transaction

client = datastore.Client()
key = client.key('concurrent', 'transaction')

# always retry, but output the error
RETRY = Retry(predicate=lambda err: True, on_error=lambda err: print(err))


# Used as a context manager, Transaction doesn't expose a "retry" parameter, even though it extends "Batch", which does
# We need to override commit() to inject the "retry" parameter
class RetryableTransaction(Transaction):

    def commit(self, retry=None, timeout=None):
        super().commit(retry=RETRY, timeout=timeout)


# to simulate a conflict, we'll need threads that will be able to run concurrently on the same entity
class ConcurrentCreate(Thread):

    def run(self) -> None:
        with RetryableTransaction(client=client):
            # create the entity only if it doesn't exist: check first, then create
            entity = client.get(key=key)
            if entity:
                raise Exception('Already exist')
            else:
                sleep(3)  # so we are sure that the transactions are conflicting with each other
                entity = Entity(key=key)
                entity.update({'content': 'X'})
                client.put(entity)
        print("Created!")


if __name__ == '__main__':
    client.delete(key)  # so we can run that snippet as much as we want
    t1 = ConcurrentCreate()
    t2 = ConcurrentCreate()
    # launch thread concurrently so that they conflict with each other
    t1.start()
    t2.start()
    # wait for them to complete so that we can cleanup
    t1.join()
    t2.join()
    client.delete(key)

Expected result

Created! 
409 Aborted due to cross-transaction contention. This occurs when multiple transactions attempt to access the same data, requiring Firestore to abort at least one in order to enforce serializability.
Already exist Exception

Current result

409 Aborted due to cross-transaction contention. This occurs when multiple transactions attempt to access the same data, requiring Firestore to abort at least one in order to enforce serializability.
Created!
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
400 The referenced transaction has expired or is no longer valid.
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/cghigny/git/data-ingest-core/venv/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 73, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/home/cghigny/git/data-ingest-core/venv/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/home/cghigny/git/data-ingest-core/venv/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INVALID_ARGUMENT
	details = "The referenced transaction has expired or is no longer valid."
	debug_error_string = "{"created":"@1623938803.098371160","description":"Error received from peer ipv4:216.58.211.202:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"The referenced transaction has expired or is no longer valid.","grpc_status":3}"
>

(basically it retries until the 120s default timeout is reached, then a RetryError goes through)

I tried several things, such as cleaning up my Transaction by calling rollback(), resetting its status to INITIALIZED, and calling begin() on it again between tries, but nothing seems to work. And since the error is sent by the server, I cannot inspect the source code to see what causes that 400 error.

I also tried not using the transaction as a context, instead creating it and then working with begin() and commit() directly so that I don't have to "cheat" by overriding commit(), but that doesn't work either.

From my side, it looks like a bug: I would expect the transaction to be retried in its entirety, but maybe I am missing a key concept.

Kindly yours

On a side note, and in the same spirit as #3, it would be great if we could instantiate a Transaction with retry and timeout parameters, so that those parameters are used in __enter__ and __exit__, allowing us to use a retryable Transaction as a context.

I think this is working as expected. Once a transaction commits, any other transaction whose edits are based on data read before that commit can no longer be trusted.

For example, let's use a classical example of a bank account. Imagine we have a bank account where two purchases overlap.

Process 1           | Process 2           | Balance (Firestore)
Read Balance -> $50 |                     | $50
                    | Read Balance -> $50 |
Subtract $10        |                     |
                    | Subtract $5         |
Commit Transaction  |                     | $40
                    | Commit Transaction  | $45

Because the read itself could have been used to perform calculations, outside of a list of transactional changes, the transaction based on the second read can't really be used without re-reading the current data and retrying.
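The interleaving in the bank account example can be replayed with a small in-memory sketch. This is a toy model, not Datastore code: `balance`, `read_balance`, and `commit` are hypothetical names, and the point is only to show what would happen if the second commit were allowed through instead of being aborted.

```python
# Toy in-memory model of the interleaving above; not Datastore code.
balance = {"value": 50}


def read_balance():
    return balance["value"]


def commit(new_value):
    # No conflict detection at all: the last writer simply wins.
    balance["value"] = new_value


# Both processes read before either one commits.
p1_seen = read_balance()
p2_seen = read_balance()

commit(p1_seen - 10)  # Process 1 commits: balance becomes 40
commit(p2_seen - 5)   # Process 2 commits from its stale read: balance becomes 45

final_without_abort = balance["value"]

# Running the two purchases one after the other would give 50 - 10 - 5.
expected_serial = 50 - 10 - 5

print(final_without_abort, expected_serial)
```

The $10 purchase is silently lost (45 instead of 35), which is why the server aborts the second transaction rather than accepting a commit computed from a stale read.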

That said, it seems there may be a question in here that may be a bug: That we are, on the library side, retrying this at all.
400 The referenced transaction has expired or is no longer valid.
If we were to raise an exception in this case and allow you as the user to handle it sooner, would that make sense?

I see the issue now. I was expecting the Transaction context to work a little like the firestore.transactional() decorator, but code inside a context cannot be re-run, so the "calculations" cannot be tried again (as opposed to a decorator, which can retry the whole function).
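The difference between retrying only the commit and retrying the whole body can be illustrated with a toy optimistic-concurrency model. Everything here (`ToyStore`, `ToyTransaction`, `Aborted`, `Expired`, and the two retry helpers) is hypothetical and only mimics the error sequence reported in this thread; it is not how the Datastore server is implemented.

```python
class Aborted(Exception):
    """Stand-in for the 409 contention error."""


class Expired(Exception):
    """Stand-in for the 400 'expired or no longer valid' error."""


class ToyStore:
    """In-memory store with a version counter bumped on every commit."""

    def __init__(self):
        self.data = {}
        self.version = 0


class ToyTransaction:
    """Hypothetical optimistic transaction; not the Datastore implementation."""

    def __init__(self, store):
        self.store = store
        self.start_version = store.version
        self.writes = {}
        self.dead = False

    def put(self, key, value):
        self.writes[key] = value

    def commit(self):
        if self.dead:
            raise Expired("transaction has expired or is no longer valid")
        if self.store.version != self.start_version:
            self.dead = True  # an aborted transaction can never commit
            raise Aborted("cross-transaction contention")
        self.store.data.update(self.writes)
        self.store.version += 1


def retry_commit_only(txn, attempts=3):
    """Retry commit() on the same transaction, recording each error name."""
    errors = []
    for _ in range(attempts):
        try:
            txn.commit()
            break
        except (Aborted, Expired) as exc:
            errors.append(type(exc).__name__)
    return errors


def retry_whole_body(store, body, attempts=3):
    """Begin a fresh transaction and re-run the body on each attempt."""
    for _ in range(attempts):
        txn = ToyTransaction(store)
        try:
            body(txn)
            txn.commit()
            return True
        except Aborted:
            continue
    return False


store = ToyStore()
t1, t2 = ToyTransaction(store), ToyTransaction(store)
t1.put("k", "A")
t2.put("k", "B")
t1.commit()  # t1 wins
commit_only_errors = retry_commit_only(t2)  # t2 aborts, then stays dead

calls = {"n": 0}


def body(txn):
    calls["n"] += 1
    if calls["n"] == 1:
        # Simulate a competing writer committing during the first attempt.
        rival = ToyTransaction(store)
        rival.put("k", "C")
        rival.commit()
    txn.put("mine", "B")


whole_body_ok = retry_whole_body(store, body)
print(commit_only_errors, whole_body_ok, calls["n"])
```

In this model, retrying the commit alone yields one `Aborted` followed by `Expired` on every later attempt, while re-running the body in a fresh transaction succeeds on the second try.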

Thinking about it, I realized that a Retry can be used as a decorator, so I have now managed to make my use case work:

from threading import Thread
from time import sleep

from google.api_core.exceptions import AlreadyExists
from google.api_core.retry import Retry
from google.cloud import datastore
from google.cloud.datastore import Entity, Transaction

client = datastore.Client()
key = client.key('transaction', 'concurrent')

RETRY = Retry(predicate=lambda err: not isinstance(err, AlreadyExists), on_error=lambda err: print(err))


@RETRY
def create():
    with Transaction(client=client):
        entity = client.get(key)
        if entity:
            raise AlreadyExists('')
        else:
            sleep(3)  # so we are sure that the transactions conflict with each other
            entity = Entity(key=key)
            entity.update({'content': 'X'})
            client.put(entity)
    print("Created!")


if __name__ == '__main__':
    client.delete(key)
    Thread(target=create).start()
    Thread(target=create).start()

Concerning

"that may be a bug: That we are, on the library side, retrying this at all."

I don't think it is an issue, since it retried exactly as I configured my retry policy (I used a predicate that is always true). It just felt a little counter-intuitive to ask the Transaction to be retried because, in my mind, the Transaction encompassed every line of code in the with block.
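To make the effect of the predicate concrete, here is a minimal sketch of a predicate-driven retry loop. It is a simplified stand-in written for illustration, not google.api_core's Retry: the `retry` decorator, its `max_attempts` parameter, and the two exception classes are all hypothetical, and there is no backoff or deadline.

```python
import functools


class Aborted(Exception):
    """Stand-in for a transient 409 contention error."""


class InvalidArgument(Exception):
    """Stand-in for the permanent 400 'transaction has expired' error."""


def retry(predicate, max_attempts=5):
    """Hypothetical, simplified retry decorator (no backoff, no deadline)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Give up on the last attempt or when the predicate says no.
                    if attempt == max_attempts or not predicate(exc):
                        raise

        return wrapper

    return decorator


def count_attempts(predicate):
    """Return how many times a permanently failing call runs under predicate."""
    calls = {"n": 0}

    @retry(predicate)
    def commit_expired_transaction():
        calls["n"] += 1
        raise InvalidArgument("transaction has expired or is no longer valid")

    try:
        commit_expired_transaction()
    except InvalidArgument:
        pass
    return calls["n"]


always = count_attempts(lambda exc: True)
only_aborted = count_attempts(lambda exc: isinstance(exc, Aborted))
print(always, only_aborted)
```

With an always-true predicate the permanent error is retried until the attempt budget runs out, whereas a predicate limited to the contention error surfaces it on the first attempt.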

Since everything is working as expected, let's close the issue ☺