DeNA/HandyRL

Multiple Ctrl+C presses are needed to stop every process and thread in the early phase of training


This has bothered me for a long time: most of the time, when I try to stop training with Ctrl+C early in the process, it does not stop on the first press.

It seems to be blocked in the following Queue.get():

^CTraceback (most recent call last):
  File "/Users/ohto/myrepo/HandyRL/handyrl/train.py", line 644, in run
    self.server()
  File "/Users/ohto/myrepo/HandyRL/handyrl/train.py", line 568, in server
    conn, (req, data) = self.worker.recv()
  File "/Users/ohto/myrepo/HandyRL/handyrl/connection.py", line 221, in recv
    return self.input_queue.get()
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/queue.py", line 171, in get
    self.not_empty.wait()
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 312, in wait
    waiter.acquire()
KeyboardInterrupt

However, setting a timeout on the queue did not change this behavior (see develop...YuriCat:experiment/ctrl_c_queue_timeout).

I had the same problem: the subprocesses do not stop even after the master process has finished.
The main reason is that conn.recv() waits forever, and the flag the subprocess checks never changes. Each subprocess gets its own copy of the object's state from the master process, so an unshared attribute such as self.shutdown_flag that the master process updates later has no effect inside the subprocess.
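The state-copying behavior described above can be reproduced with a small, self-contained sketch. The names Flags, watch and run_demo are hypothetical, and it assumes the "fork" start method (available on Linux and macOS, not Windows). A child process watches two flags that the parent sets only after the child has started: a plain attribute and a multiprocessing.Value.

```python
import multiprocessing
import time

# Assumption: the "fork" start method (POSIX only) so the sketch runs as-is;
# with "spawn" the child would receive a pickled copy instead, with the same effect.
ctx = multiprocessing.get_context("fork")


class Flags:
    """Hypothetical holder for two shutdown flags."""

    def __init__(self):
        self.plain_flag = False                   # ordinary attribute: the child gets its own copy
        self.shared_flag = ctx.Value("b", False)  # backed by shared memory


def watch(flags, result_queue, duration=1.0):
    """Child process: poll both flags for `duration` seconds, then report what it saw."""
    saw_plain = saw_shared = False
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        saw_plain = saw_plain or flags.plain_flag
        saw_shared = saw_shared or bool(flags.shared_flag.value)
        time.sleep(0.01)
    result_queue.put((saw_plain, saw_shared))


def run_demo():
    flags = Flags()
    result_queue = ctx.Queue()
    child = ctx.Process(target=watch, args=(flags, result_queue))
    child.start()
    # Both flags are updated in the parent only after the child has started.
    flags.plain_flag = True
    flags.shared_flag.value = True
    result = result_queue.get()  # read before join() so the child can flush its queue
    child.join()
    return result


if __name__ == "__main__":
    print(run_demo())  # (False, True): only the shared Value change reaches the child
```

Only the change made through multiprocessing.Value is visible in the child, which is why a plain boolean shutdown flag never stops the subprocess.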

Adding a timeout check and using multiprocessing.Value worked for me:

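import multiprocessing


class Batcher:
    def __init__(self):  # other constructor arguments and setup omitted
        # original version
        # self.shutdown_flag = False
        # fixed: a flag in shared memory, visible to the batcher subprocesses
        self.shutdown_flag = multiprocessing.Value("b", False)

    def _worker(self, conn, bid):
        print('started batcher %d' % bid)
        while not self.shutdown_flag.value:
            # original version
            # episodes = conn.recv()  # blocks forever when no request arrives
            # batch = make_batch(episodes)
            # conn.send(batch)
            # fixed: wait at most 0.8 s for a request, then re-check the flag
            while conn.poll(timeout=0.8):
                episodes = conn.recv()
                batch = make_batch(episodes)  # make_batch() is defined elsewhere in the same module
                conn.send(batch)
            if self.shutdown_flag.value:
                break
        print('finished batcher %d' % bid)

    def shutdown(self):
        # original version
        # self.shutdown_flag = True
        # fixed: this assignment is seen by every batcher subprocess
        self.shutdown_flag.value = True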
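The same poll-with-timeout pattern can be run on its own, outside HandyRL, as a minimal sketch. Here make_batch is a hypothetical stand-in (it just sums a list), batcher_worker and run_demo are hypothetical names, the poll timeout is shortened to 0.1 s, and the "fork" start method (Linux and macOS) is assumed.

```python
import multiprocessing

# Assumption: the "fork" start method (POSIX only) so the sketch runs as-is.
ctx = multiprocessing.get_context("fork")


def make_batch(episodes):
    """Hypothetical stand-in for HandyRL's make_batch(): just sums the inputs."""
    return sum(episodes)


def batcher_worker(conn, shutdown_flag, poll_timeout=0.1):
    """Child loop: serve requests until the shared shutdown flag is set."""
    while not shutdown_flag.value:
        # poll() returns False after poll_timeout seconds without data, so the
        # flag is re-checked regularly instead of blocking forever in recv().
        while conn.poll(timeout=poll_timeout):
            episodes = conn.recv()
            conn.send(make_batch(episodes))
    conn.close()


def run_demo():
    shutdown_flag = ctx.Value("b", False)
    parent_conn, child_conn = ctx.Pipe()
    worker = ctx.Process(target=batcher_worker, args=(child_conn, shutdown_flag))
    worker.start()
    parent_conn.send([1, 2, 3])
    batch = parent_conn.recv()
    shutdown_flag.value = True  # a single assignment is enough to stop the child
    worker.join(timeout=5)
    return batch, worker.is_alive(), worker.exitcode


if __name__ == "__main__":
    print(run_demo())  # (6, False, 0)
```

The worker exits within one poll timeout of the flag being set, so the timeout trades shutdown latency against how often an idle worker wakes up.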

@baichii Thank you for your report and great idea!

We are currently trying this in PR #307.
In this PR, all threads are started with daemon=True, so they are shut down after the first Ctrl+C.
In addition, every process now also finishes once training reaches the number of epochs configured in config.yaml.
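The general Python behavior that daemon=True relies on can be checked with a small sketch; this is not the code from PR #307, and CHILD_TEMPLATE and exits_promptly are hypothetical names. A child interpreter starts one thread that blocks forever on an empty queue.Queue.get(), similar to the blocking call in the traceback, and then its main thread returns. Interpreter exit waits for non-daemon threads but not for daemon ones.

```python
import subprocess
import sys
import textwrap

# Child script: a worker thread blocks forever on an empty queue,
# then the main thread reaches the end of the script.
CHILD_TEMPLATE = textwrap.dedent("""
    import queue
    import threading

    q = queue.Queue()
    t = threading.Thread(target=q.get, daemon={daemon})
    t.start()
    print("main thread finished")
""")


def exits_promptly(daemon, timeout=3.0):
    """Return True if the child interpreter exits within `timeout` seconds."""
    code = CHILD_TEMPLATE.format(daemon=daemon)
    try:
        subprocess.run([sys.executable, "-c", code], timeout=timeout,
                       capture_output=True, check=True)
        return True
    except subprocess.TimeoutExpired:
        # run() kills the child on timeout; it was still waiting for the thread.
        return False


if __name__ == "__main__":
    print(exits_promptly(daemon=True), exits_promptly(daemon=False))
```

With daemon=True the interpreter exits as soon as the main thread is done; with daemon=False it waits on the blocked thread until the timeout kills it.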

As you pointed out, stopping the processes inside the Batcher is also an important issue.
We have set daemon=True in this PR, but using multiprocessing.Value is also a great idea worth considering.

Merged #307.