googleapis/python-ndb

"Might indicate a bad software design"

Opened this issue · 13 comments

There is a bit of code that reads:

        future = self.futures.get(key)
        if future:
            if self.todo[key] != value:
                # I don't think this is likely to happen. I'd like to know about it if
                # it does because that might indicate a bad software design.
                future = tasklets.Future()
                future.set_exception(
                    RuntimeError(
                        "Key has already been set in this batch: {}".format(key)
                    )
                )

Ok, we're letting you know. We have Python 3.9 code running in Appengine and are seeing "Key has already been set in this batch" often enough that I felt compelled to check this issue.
The expression in our code that triggers this is
ndb.get_multi(event.guestlists) and the stack trace below that is

File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/_options.py", line 89, in wrapper return wrapped(*pass_args, **kwargs)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/utils.py", line 153, in positional_wrapper return wrapped(*args, **kwds) File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/model.py", line 6427, in get_multi return [future.result() for future in futures]
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/model.py", line 6427, in <listcomp> return [future.result() for future in futures]
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 214, in result self.check_success()
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 161, in check_success raise self._exception
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 334, in _advance_tasklet yielded = self.generator.throw(type(error), error, traceback)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/key.py", line 894, in get entity_pb = yield _datastore_api.lookup(self._key, _options)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 334, in _advance_tasklet yielded = self.generator.throw(type(error), error, traceback)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/_datastore_api.py", line 150, in lookup lock = yield _cache.global_lock_for_read(cache_key, result)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 334, in _advance_tasklet yielded = self.generator.throw(type(error), error, traceback)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/_cache.py", line 607, in global_lock_for_read lock_acquired = yield global_compare_and_swap(key, lock, expires=_LOCK_TIME)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/tasklets.py", line 334, in _advance_tasklet yielded = self.generator.throw(type(error), error, traceback)
File "/layers/google.python.pip/pip/lib/python3.9/site-packages/google/cloud/ndb/_cache.py", line 185, in wrapper result = yield function(key, *args, **kwargs) RuntimeError: Key has already been set in this batch: b'NDB30\n\x15\x12\x13evite-services-prod\x12,\n\nEventModel\x1a\x1e0175E4IF2RRZTMEEIEPMHZZKQTIFYA\x12\x18\n\rGuestListItem\x10\x80\x80\xe4\xe0\x88\xa6\xb4\x0b'

I tried calling ndb.get_multi(), explicitly passing in duplicate keys, and it didn't trigger this issue.

@mecheverri This is interesting. Is there a put happening during the same request that might hit the same key? Could it happen before the get?

I added some logging to better understand what is happening. It appears that within a request, there aren't any calls to put() the item whose key is mentioned in the RuntimeError.

Fortunately, on the chance that this exception might result from a race condition, I tried catching the exception and retrying the tasklet, and that seems to work.

I added some logging to better understand what is happening. It appears that within a request, there aren't any calls to put() the item whose key is mentioned in the RuntimeError.

Fortunately, on the chance that this exception might result from a race condition, I tried catching the exception and retrying the tasklet, and that seems to work.

@mecheverri

Okay, that's helpful. Does your logging give you any insight into what calls are happening in a particular request context? I'm not entirely sure how to reproduce this. Any pointers you could give would be very helpful.

We have a high volume of requests and this appears to occur at random, which suggested the race condition theory. It occurs infrequently enough that I'm not able to identify a pattern regarding its cause. I have seen this so far with two different stack traces.

Race condition is certainly a possibility. The resource that would have that condition is attached to the context, which shouldn't be shared across threads, and should generally be created per request. Would your code that creates the context be something you could share?

I have run into this while trying to finish porting a fairly large python 2.7 appengine project to python 3.x. The following was with python 2.7.18 and python-ndb 1.11.2.

I've been able to reproduce this consistently in a test case, using the _InProcessGlobalCache.

I haven't tested yet with newer versions yet.

from google.cloud import ndb

client = ndb.Client("test")


class Foo(ndb.Model):
    a = ndb.StringProperty()

with client.context(global_cache=ndb.global_cache._InProcessGlobalCache()):
    print(ndb.get_context().global_cache.cache)
    # => {}

    ent = Foo(a="Foo")
    ent.put()

    print(ndb.get_context().global_cache.cache)
    # => {}

    ent.put()

    print(ndb.get_context().global_cache.cache)
    # => {'NDB30\n\x06\x12\x04test\x12\x07\n\x03Foo\x10\x01': ('', 1675475589.391862)}
  
    #  ndb.get_context().global_cache.clear()
    ndb.get_context().cache.clear()

    try:
        ndb.get_multi([ent.key, ent.key])
        # => RuntimeError("Key has already been set in this batch: ...
    finally:
        print(ndb.get_context().global_cache.cache)
        # {'NDB30\n\x06\x12\x04test\x12\x07\n\x03Foo\x10\x01': ('\n\x11\n\x06\x12\x04test\x12\x07\n\x03Foo\x10\x01\x1a\x0b\n\x01a\x12\x06\x8a\x01\x03Foo', None)}

@chrisrossi any updates on this issue?

I don't work here anymore.

I think the issue here is we are creating a new lock on every get of a key. So for instance if I do something like this:

class SomeModel(ndb.Model):
    pass

@ndb.tasklet
def  temp():
    yield (
        SomeModel.get_by_id_async('temp'),
        SomeModel.get_by_id_async('temp'),
    )

ndb.synctasklet(temp)()

The first get_by_id will register a todo with value ID + uuid_1. The second will try to register a todo with value ID + uuid_2 and will fail on mismatch. I think the get should not use uuid append for acquiring lock. There is no issue with the above code since both will use a single future for getting the value. It makes sense to do this in writes but not in reads IMO.

We are having load of these error too, after upgrading from version 1.9.0 to 2.2.2
The error seems to occur only places where we rely heavily on tasklets. I really don't know how to do any debugging here.

We are having load of these error too, after upgrading from version 1.9.0 to 2.2.2 The error seems to occur only places where we rely heavily on tasklets. I really don't know how to do any debugging here.

@anoteboom what I ended up doing was to use two patches:
First for disabling this behavior for reads because honestly I don't see any need of this for the reads
Second to disable for writes but log a message so that flow can be fixed. Once all the flows were fixed, this patch was removed

We have these patches up on production for 3 months and we haven't found any bugs due to this. We were also able to fix all the cases where there were duplicate writes of the same entities using the error log
Following is the patch file I used:

from logging import error as log_error
from sys import modules

from google.cloud.datastore_v1.types.entity import Key
from google.cloud.ndb._cache import (
    _GlobalCacheSetBatch,
    global_compare_and_swap,
    global_set_if_not_exists,
    global_watch,
)
from google.cloud.ndb.tasklets import Future, Return, tasklet

GLOBAL_CACHE_KEY_PREFIX: bytes = modules["google.cloud.ndb._cache"]._PREFIX
LOCKED_FOR_READ: bytes = modules["google.cloud.ndb._cache"]._LOCKED_FOR_READ
LOCK_TIME: bytes = modules["google.cloud.ndb._cache"]._LOCK_TIME


@tasklet
def custom_global_lock_for_read(key: str, value: str):
    if value is not None:
        yield global_watch(key, value)
        lock_acquired = yield global_compare_and_swap(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )
    else:
        lock_acquired = yield global_set_if_not_exists(
            key, LOCKED_FOR_READ, expires=LOCK_TIME
        )

    if lock_acquired:
        raise Return(LOCKED_FOR_READ)


##############################################################################################################################################
# This is a temporary patch to disable the hard check of not allowing multiple puts for the same key in a batch with different values. We    #
# are only logging the error here to be able to identify the areas where this issue exists. Once we have fixed all those cases, we will      #
# remove this patch.                                                                                                                         #
##############################################################################################################################################


def custom_global_cache_set_batch_add(self: _GlobalCacheSetBatch, key: bytes, value):
    future: Future = self.futures.get(key)
    if future:
        if self.todo[key] != value:
            key_protobuf = Key()
            key_protobuf._pb.ParseFromString(key.removeprefix(GLOBAL_CACHE_KEY_PREFIX))
            deserialized_key = ", ".join(
                [
                    line.strip(", ")
                    for line in str(key_protobuf.path).strip("[]").splitlines()
                ]
            )
            log_error("Key has already been set in this batch: '%s'", deserialized_key)
        else:
            return future

    future = future or Future()
    future.info = self.future_info(key, value)
    self.todo[key] = value
    self.futures[key] = future
    return future


modules["google.cloud.ndb._cache"].global_lock_for_read = custom_global_lock_for_read
setattr(
    modules["google.cloud.ndb._cache"]._GlobalCacheSetBatch,
    "add",
    custom_global_cache_set_batch_add,
)

@usmanmani1122 Thank you for that patchfile. This triggers me to look into this again. I'll share my experience

Confirmed! @usmanmani1122 your patch fixes this for me too. Thank you!