Race condition in `AsyncResult.wait` and `Connection.serve`
notEvil opened this issue · 6 comments
This issue was previously discussed in #492 and recently brought up in #527.
Hi!
I believe there is a race condition which can potentially create a deadlock. Consider threads A and B, and the following sequence of operations (rpyc 5.3.1):
- A is serving the connection at line 438 in ba07bae
- B is sending a request and reaches line 47 in ba07bae
- B sees that the result didn't arrive yet and reaches line 428 in ba07bae
- A receives the result for B and releases the lock at lines 445 and 408 in ba07bae
- B successfully acquires the lock and reaches line 438 in ba07bae
If there is no third party, B might wait indefinitely for something that A already received but didn't process in time (B lost the race). I hope this is easy to follow and self-evident. Obviously, the probability of hitting this should be low, but I did hit it and was able to reproduce the issue reliably in the past.
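To make the interleaving concrete, here is a self-contained sketch using plain threading primitives instead of rpyc internals. recvlock, is_ready and channel_recv are simplified stand-ins for Connection._recvlock, AsyncResult._is_ready and the blocking read in Connection.serve; the two ordering events exist only to force the losing schedule deterministically.
import threading

recvlock = threading.Lock()
is_ready = False                       # stand-in for AsyncResult._is_ready
reply_received = threading.Event()     # A has read B's reply off the wire
b_blocked_in_recv = threading.Event()  # B has committed to waiting for new data
data_available = threading.Event()     # simulated socket: nothing else will arrive

def channel_recv(timeout):
    # stand-in for the blocking read inside Connection.serve
    return data_available.wait(timeout)

def thread_a():
    global is_ready
    with recvlock:                     # A is serving and receives the reply meant for B
        reply_received.set()
    # recvlock is released here, but the dispatch that sets is_ready happens
    # only after B has already decided to block in channel_recv below
    b_blocked_in_recv.wait()
    is_ready = True                    # too late: B is stuck until its timeout expires

def thread_b():
    got = None
    if not is_ready:                   # AsyncResult.wait: result not there yet
        reply_received.wait()          # ordering knob: lose the race on purpose
        with recvlock:                 # B wins the race for the lock ...
            b_blocked_in_recv.set()
            got = channel_recv(timeout=2)  # ... and waits for data that never comes
    print(f"is_ready={is_ready}, new data arrived={got}")

a = threading.Thread(target=thread_a)
b = threading.Thread(target=thread_b)
a.start(); b.start(); a.join(); b.join()
With an infinite ttl, B would never return; the patch below avoids this by re-checking the waiting condition after the recvlock is acquired.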
I found a more concise solution:
diff --git a/rpyc/core/async_.py b/rpyc/core/async_.py
index 0af147d..cf6aef1 100644
--- a/rpyc/core/async_.py
+++ b/rpyc/core/async_.py
@@ -44,16 +44,19 @@ class AsyncResult(object):
"""Waits for the result to arrive. If the AsyncResult object has an
expiry set, and the result did not arrive within that timeout,
an :class:`AsyncResultTimeout` exception is raised"""
- while not (self._is_ready or self.expired):
+ while self._waiting():
# Serve the connection since we are not ready. Suppose
# the reply for our seq is served. The callback is this class
# so __call__ sets our obj and _is_ready to true.
- self._conn.serve(self._ttl)
+ self._conn.serve(self._ttl, condition=self._waiting)
# Check if we timed out before result was ready
if not self._is_ready:
raise AsyncResultTimeout("result expired")
+ def _waiting(self):
+ return not (self._is_ready or self.expired)
+
def add_callback(self, func):
"""Adds a callback to be invoked when the result arrives. The callback
function takes a single argument, which is the current AsyncResult
diff --git a/rpyc/core/protocol.py b/rpyc/core/protocol.py
index 69643c7..96e549a 100644
--- a/rpyc/core/protocol.py
+++ b/rpyc/core/protocol.py
@@ -260,7 +260,7 @@ class Connection(object):
return next(self._seqcounter)
def _send(self, msg, seq, args): # IO
- data = brine.dump((msg, seq, args))
+ data = brine.I1.pack(msg) + brine.dump((seq, args)) # see _dispatch
if self._bind_threads:
this_thread = self._get_thread()
data = brine.I8I8.pack(this_thread.id, this_thread._remote_thread_id) + data
@@ -392,8 +392,10 @@ class Connection(object):
self._config["logger"].debug(debug_msg.format(msg, seq))
def _dispatch(self, data): # serving---dispatch?
- msg, seq, args = brine.load(data)
+ msg, = brine.I1.unpack(data[:1]) # unpack just msg to reduce time to release
if msg == consts.MSG_REQUEST:
+ self._recvlock.release()
+ seq, args = brine.load(data[1:])
if self._bind_threads:
self._get_thread()._occupation_count += 1
self._dispatch_request(seq, args)
@@ -404,15 +406,19 @@ class Connection(object):
if this_thread._occupation_count == 0:
this_thread._remote_thread_id = UNBOUND_THREAD_ID
if msg == consts.MSG_REPLY:
+ seq, args = brine.load(data[1:])
obj = self._unbox(args)
self._seq_request_callback(msg, seq, False, obj)
+ self._recvlock.release() # late release
elif msg == consts.MSG_EXCEPTION:
+ self._recvlock.release()
+ seq, args = brine.load(data[1:])
obj = self._unbox_exc(args)
self._seq_request_callback(msg, seq, True, obj)
else:
raise ValueError(f"invalid message type: {msg!r}")
- def serve(self, timeout=1, wait_for_lock=True): # serving
+ def serve(self, timeout=1, wait_for_lock=True, condition=lambda: True): # serving
"""Serves a single request or reply that arrives within the given
time frame (default is 1 sec). Note that the dispatching of a request
might trigger multiple (nested) requests, thus this function may be
@@ -427,10 +433,17 @@ class Connection(object):
# Exit early if we cannot acquire the recvlock
if not self._recvlock.acquire(False):
if wait_for_lock:
+ if not condition(): # unlikely, but the result could've arrived and another thread could've won the race to acquire
+ return False
# Wait condition for recvlock release; recvlock is not underlying lock for condition
return self._recv_event.wait(timeout.timeleft())
else:
return False
+ if not condition(): # the result arrived and we won the race to acquire, unlucky
+ self._recvlock.release()
+ with self._recv_event:
+ self._recv_event.notify_all()
+ return False
# Assume the receive rlock is acquired and incremented
# We must release once BEFORE dispatch, dispatch any data, and THEN notify all (see issue #527 and #449)
try:
@@ -442,7 +455,6 @@ class Connection(object):
self.close() # sends close async request
raise
else:
- self._recvlock.release()
if data:
self._dispatch(data) # Dispatch will unbox, invoke callbacks, etc.
return True
A unit test may monkey patch `brine.load` to hold the thread between the release and `_seq_request_callback` while sending a second thread to win the race.
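A hedged sketch of that monkeypatch, assuming protocol.py resolves brine.load as a module attribute at call time (the diff above calls it the same way); driving the second, racing thread and restoring the original function belong in the actual test case.
import threading
from rpyc.core import brine

_real_load = brine.load
let_race_finish = threading.Event()  # the test sets this once the racing thread had its chance

def stalling_load(data):
    # Deserialize as usual, then hold the serving thread in the window between
    # _recvlock.release() and _seq_request_callback(), so a second thread can
    # attempt to acquire the lock and win the race.
    result = _real_load(data)
    let_race_finish.wait(timeout=0.5)
    return result

brine.load = stalling_load  # remember to restore _real_load in tearDown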
I found a way to reproduce some pesky issues (prevalent in 99c5abe as well).
[rpyc]$ RPYC_BIND_THREADS=true taskset -c 0 pyenv exec python -m unittest discover -v -k test_race -k test_refcount
True
test_asyncresult_race (tests.test_race.TestRace) ... ok
test_refcount (tests.test_refcount.TestRefcount) ... ok
----------------------------------------------------------------------
Ran 2 tests in 0.532s
OK
[rpyc]$ RPYC_BIND_THREADS=false taskset -c 0 pyenv exec python -m unittest discover -v -k test_race -k test_refcount
False
test_asyncresult_race (tests.test_race.TestRace) ... ok
test_refcount (tests.test_refcount.TestRefcount) ... ok
----------------------------------------------------------------------
Ran 2 tests in 0.479s
OK
I fixed the busy loop and corrected a logic error in caf8c1c. The idea is that if a thread is able to acquire the lock, even for a short period of time, then we know that at that point in time no other thread is receiving data. The boolean we used before cannot provide such guarantees as easily; we get more reuse out of the lock than we would from introducing another variable that at best mirrors the recvlock state.
Thanks again for the busy loop catch. Lmk if you notice any other improvements to be made.
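For illustration, the guarantee being described boils down to a probe like the following, with a plain threading.Lock standing in for Connection._recvlock (this is not rpyc code):
import threading

def nobody_receiving(recvlock: threading.Lock) -> bool:
    # A successful non-blocking acquire proves that, at this instant, no other
    # thread holds the receive lock, i.e. nobody is inside recv(). A separate
    # boolean flag could only mirror this state with extra synchronization.
    if recvlock.acquire(blocking=False):
        recvlock.release()  # the observation is what we wanted, not the lock
        return True
    return False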
Now I'm confused, what issues?
The idea is that if a thread is able to acquire the lock, even for a short period of time, then we know that at that point in time no other thread is receiving data. The boolean we used before cannot provide such guarantees as easily; we get more reuse out of the lock than we would from introducing another variable that at best mirrors the recvlock state.
Can you clarify? I don't see the connection between the duration of lock ownership and whether another thread is currently receiving. All access to the boolean is done while holding `self._lock`, so its state is consistent with everything else under that lock's umbrella. Also, recvlock is not used at all when thread binding is enabled; `self._lock` is used for all purposes instead. Their purposes are very different, especially now that `AsyncResult.wait` contains logic previously found in `Connection.serve`.
Anyway, I went down a rabbit hole with too many changes at once. So, I reverted back to where your code was merged and started resolving bugs that impacted my ability to refactor.
RPYC_BIND_THREADS=false taskset -c 0 pyenv exec python -m unittest discover -v -k test_race -k test_refcount -k test_affinity -k test_deploy
RPYC_BIND_THREADS=true taskset -c 0 pyenv exec python -m unittest discover -v -k test_race -k test_refcount -k test_affinity -k test_deploy
I'd still like to refactor bind threads a bit to make things easier to follow and less brittle.
I added a bind threads test to the CI/CD, and your branch merged in is now master plus some fixes.
I was rushed after pushing fixes yesterday, so I'll try to recap the chaos of my commits/learning, @notEvil.
Refactor rabbit hole and breaking changes
Passing the waiting function into serve seemed to introduce some accidental complexity that resulted in checking the state of `AsyncResult` more than once. This got me started down the refactoring rabbit hole. When testing my refactor, I set `BIND_THREADS` instead of `RPYC_BIND_THREADS`, meaning the tests never ran using bind threads. Not to mention, I should've refactored in smaller chunks. I did not realize I broke bind threads until your comment. Oops!
Reverting back and finding more issues
Before trying to debug my attempted refactor, I checked out 99c5abe to validate that all the tests passed, but they did not. I decided to revert the changes since 99c5abe at commit 0e9f95a. After undoing the refactor/breaking changes, I made some follow-up commits to improve guard rails for bind threads and fix existing bugs.
- Fixed test_deploy to handle RPYC_BIND_THREADS from the environment: `SshMachine` was running with thread binding disabled while the tests had it enabled, because environment variables are not passed by default.
- e473682 and 5f41958 are improvements around connection close timing for locks. While the bug addressed in those commits causes tests to fail, it can also keep the interpreter from closing, hanging indefinitely on a blocking acquire. I can no longer reproduce the issue after those commits. However, improvements/refactoring are needed for closing connections and lock handling... being unable to reproduce a race condition does not mean it is fixed.
- I added another test step that runs all of the unit tests with RPYC_BIND_THREADS=true until we make it the default behavior.
TODOs/Take-aways
- improvements/refactoring are needed for closing connections and lock handling
- consider a better design pattern for AsyncResult wait (e.g., if bind threads behavior is assumed, does it make more sense to have a worker thread dedicated to send/recv that uses events, compared to the current implementation of polling in AsyncResult? How much accidental complexity is there due to not using dedicated threads for channel communication? Previously this was not done because we did not know which thread a message was intended for). A rough sketch of that idea follows this list.
- Diagrams explaining RPyC data flow architecture as it relates to bind threads
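To make the second TODO concrete, here is a rough, purely hypothetical sketch of the dedicated-receiver idea; ReceiverThread and PendingReply are made-up names, not rpyc code.
import threading

class ReceiverThread:
    # A single background thread owns the channel; waiters never call recv() themselves.
    def __init__(self, channel, dispatch):
        self._channel = channel    # object with a blocking recv() returning bytes or None
        self._dispatch = dispatch  # callable(data) that routes replies/requests to waiters
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            data = self._channel.recv()  # only this thread ever receives
            if data is None:             # connection closed
                break
            self._dispatch(data)         # e.g. sets the PendingReply for that seq

class PendingReply:
    # What AsyncResult.wait could reduce to under this design: block on an event
    # instead of taking turns serving the connection.
    def __init__(self):
        self._event = threading.Event()
        self.value = None

    def set(self, value):  # called by dispatch on the receiver thread
        self.value = value
        self._event.set()

    def wait(self, timeout=None):
        if not self._event.wait(timeout):
            raise TimeoutError("result expired")
        return self.value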
Thanks for your time/contributions. I aim to make bind threads the default behavior eventually. Before it becomes the default behavior, I'd like to familiarize myself with it, optimize it, make it easier to follow, and document why/how it works.
Refactor rabbit hole and breaking changes
Very unfortunate
Before trying to debug my attempted refactor, I checked out 99c5abe to validate that all the tests passed, but they did not.
When I run the tests, I usually see some failures due to the hardened SSH config on my system, so I might have accidentally missed an issue or two (your first point, for instance). And sometimes the background threads don't shut down gracefully, especially in the gevent test, which leaves the test environment tainted for subsequent tests. That area definitely needs more thought!
consider a better design pattern for AsyncResult wait (e.g., if bind threads behavior is assumed, does it make more sense to have a worker thread dedicated to send/recv that uses events, compared to the current implementation of polling in AsyncResult? How much accidental complexity is there due to not using dedicated threads for channel communication? Previously this was not done because we did not know which thread a message was intended for).
I don't think that's an option. A dedicated thread for communication would introduce thread switches, which hurt performance. Also, there isn't that much added complexity, I would say.
I aim to make bind threads the default behavior eventually. Before it becomes the default behavior, I'd like to familiarize myself with it, optimize it, make it easier to follow, and document why/how it works.
Great, if you need any information or find optimizations, let me know :)