saghul/python-fibers

fibers._cfibers.error: cannot switch to a Fiber on a different thread

geertj opened this issue · 10 comments

I'm having trouble with multi-threading and fibers. I don't really have a standalone reproducer yet at the moment, but I'm hoping you can help me troubleshoot a bit.

What I'm doing is to run two concurrent threads via pyuv.Loop.queue_work, in addition to my main thread. Each of the additional threads has two fibers: one fibers runs an event loop, and the other does some potentially blocking or CPU intensive work.

I've done a bit printf() debugging, and what I've found out so far is the following: I'm experiencing that when one thread exits, the other thread will create a new main fiber in update_current(). This will allocate a new Fiber object with a new thread_h attribute. This prevents me from switching to a different fiber that is running in the same OS thread (confirmed via the gettid() system call), but according to Fiber_func_switch() is not running in the same OS thread.

So far I haven't been able to determine yet why the new main fiber is created. The Python side seems to be OK, because PyThreadState_GetDict returns the same dictionary for which previously a main fiber was created already.

Do you have any ideas or suggestions? Also I'm not completely following the logic how _global_state works, especially with regards to threading. Maybe you could elaborate on that a bit?

Update: the bug seems to be that stacklet_switch() in do_switch() can return in a different thread. If it does, then stacklet__post_switch() should not unconditionally set _global_state.current. It should first preserve the current fiber for the previous thread state.

Patch below. I hope it is correct but it requires some thorough review. @saghul, this code is quite complex and it took me hours to get my head around it :) Any possibility of simplifying it? For example, could you do away with _global_state and store everything in the thread state? Or could you not depend on "the next thread" to restore current_fiber_key?

diff --git a/src/fibers.c b/src/fibers.c
index e4bf96a..15cfad0 100644
--- a/src/fibers.c
+++ b/src/fibers.c
@@ -326,6 +326,7 @@ stacklet__post_switch(stacklet_handle h)
 {
     Fiber *origin = _global_state.origin;
     Fiber *self = _global_state.destination;
+    Fiber *current = _global_state.current;
     PyObject *result = _global_state.value;

     ASSERT(h);
@@ -336,6 +337,11 @@ stacklet__post_switch(stacklet_handle h)

     self->stacklet_h = origin->stacklet_h;
     origin->stacklet_h = h;
+
+    if (PyThreadState_GetDict() != current->ts_dict) {
+        Py_INCREF(current);
+        PyDict_SetItem(current->ts_dict, current_fiber_key, current);
+    }
     _global_state.current = self;

     return result;

Hi @geertj, I haven't been able to look at this yet. I hope to do that soon. Do you have a reproducible test case?

Thanks for the patch, I'll have look!

Hi saghul, in the next couple of days I can give you a reproducer. It's one test in the Gruvi test suite but it's part of a change that is not yet ready to be committed. I'm pretty sure about the mechanism though

Extra points if you can make it happen just with the stdlib so we can add it to the test suite ;-)

A reproducer below. However for some reason it gives me a segfault rather than "cannot switch" error. And I get the segfault both with and without my patch. So I'm not 100% sure that it's the same bug...

If you make the sleep in thread1/fiber1 2 seconds instead of 1, so that ends after thread2/fiber1, you do not get a segfault.

import time
import threading
import fibers

def thread1():
    def fiber1():
        print('T1/F1: sleeping for 1 second')
        time.sleep(1)
    f1 = fibers.Fiber(fiber1)
    f1.switch()

def thread2():
    time.sleep(0.5)
    def fiber1():
        print('T2/F1: sleeping for 1 second')
        time.sleep(1)
        print('T2/F1: switch to T2/F2')
        f2.switch()
    def fiber2():
        print('in T2/F2!!')
    f1 = fibers.Fiber(fiber1)
    f2 = fibers.Fiber(fiber2)
    f1.switch()

t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

t1.start()
t2.start()
t1.join()
t2.join()

@saghul any news on this? It's a pretty serious crash and multiple tests in Gruvi run into this.

Does the reproducer segfault for you as well?

Sorry, I had no time to look at it yet, hopefully I will in the next couple of days.

The fix for this landed in ebfaf72