natcap/taskgraph

Intermittent EOFError

emlys opened this issue · 8 comments

emlys commented

A simple taskgraph example intermittently raises an EOFError.

example.py:

import taskgraph

workspace_dir = '/Users/emily/Documents/task-workspace'

def func():
    return 1

if __name__ == '__main__':
    task_graph = taskgraph.TaskGraph(workspace_dir, 1)  # 1 worker process
    task = task_graph.add_task(func=func)
    task_graph.close()  # omitting this line avoids the error
    task_graph.join()

Running python example.py, the error appears in roughly half of runs. It doesn't happen if you omit the task_graph.close() line.
Observed on 0.10.2, 0.10.3, and main; Python 3.7.9, 3.8.6, and 3.9.1; n_workers > 0 only.

Full log:

DEBUG:taskgraph.Task:file:///Users/emily/Documents/task-workspace/taskgraph_data.db?mode=ro exists: True
DEBUG:taskgraph.Task:Starting logging worker
DEBUG:taskgraph.Task:multithreaded: UNNAMED TASK (0) sending to new task queue.
DEBUG:taskgraph.Task:sending task UNNAMED TASK (0) right away
DEBUG:taskgraph.Task:Closing taskgraph.
DEBUG:taskgraph.Task:taskgraph closed
DEBUG:taskgraph.Task:joining taskgraph
DEBUG:taskgraph.Task:attempting to join threads
DEBUG:taskgraph.Task:attempting to join task UNNAMED TASK (0)
DEBUG:taskgraph.Task:joining UNNAMED TASK (0) done executing: <threading.Event object at 0x7fce181e29d0>
DEBUG:taskgraph.Task:_call check if precalculated UNNAMED TASK (0)
DEBUG:taskgraph.Task:file_stat_list: []
DEBUG:taskgraph.Task:other_arguments: [[], {}]
DEBUG:taskgraph.Task:file:///Users/emily/Documents/task-workspace/taskgraph_data.db?mode=ro exists: True
DEBUG:taskgraph.Task:not precalculated, Task hash does not exist (UNNAMED TASK (0))
DEBUG:taskgraph.Task:is_precalculated full task info: Task object 140523144620688:

{'exception_object': None,
 'ignore_directories': True,
 'ignore_path_list': [],
 'priority': 0,
 'self._reexecution_info': {'args_clean': [],
                            'file_stat_list': [],
                            'func_name': 'func',
                            'kwargs_clean': {},
                            'other_arguments': [[], {}],
                            'source_code_hash': 'addc3cbb28a58e3d4031ffa1739e3ce9cef5db71'},
 'self._result': None,
 'target_path_list': [],
 'task_id_hash': 'c71e53c90dbd998d085586266ac3dd3455e5ee50',
 'task_name': 'UNNAMED TASK (0)',
 'task_reexecution_hash': '82159a969de011463785501915a1dc6506cd7ef6'}
DEBUG:taskgraph.Task:not precalculated UNNAMED TASK (0)
DEBUG:taskgraph.Task:apply_async for task UNNAMED TASK (0)
DEBUG:taskgraph.Task:successful run on task UNNAMED TASK (0)
DEBUG:taskgraph.Task:task UNNAMED TASK (0) is complete, checking to see if any dependent tasks can be executed now
DEBUG:taskgraph.Task:task UNNAMED TASK (0) done processing
DEBUG:taskgraph.Task:task UNNAMED TASK (0) was joined
DEBUG:taskgraph.Task:Invoking terminate. already terminated? False
DEBUG:taskgraph.Task:setting task done for UNNAMED TASK (0)
DEBUG:taskgraph.Task:_handle_logs_from_processes shutting down
DEBUG:taskgraph.Task:no tasks are pending and taskgraph closed, normally terminating executor <Thread(task_executor_0, started daemon 123145432936448)>.
DEBUG:taskgraph.Task:task executor shutting down
DEBUG:taskgraph.Task:shutting down workers
ERROR:taskgraph.Task:Expected an empty logging queue, but if the TaskGraph were being terminated it's possible this object is corrupt and we'd get a different kind of exception like EOF. In any case we should always stop trying to drain the queue in the case of an Exception.
Traceback (most recent call last):
  File "/Users/emily/taskgraph/taskgraph/Task.py", line 413, in __del__
    x = self._logging_queue.get_nowait()
  File "/Users/emily/opt/miniconda3/lib/python3.8/multiprocessing/queues.py", line 129, in get_nowait
    return self.get(False)
  File "/Users/emily/opt/miniconda3/lib/python3.8/multiprocessing/queues.py", line 111, in get
    res = self._recv_bytes()
  File "/Users/emily/opt/miniconda3/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/emily/opt/miniconda3/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Users/emily/opt/miniconda3/lib/python3.8/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
DEBUG:taskgraph.Task:taskgraph terminated

I think the best sequence is join and then close, but I know from firsthand experience it's very easy to get that backwards. And maybe it's supposed to be forgiving by design?
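Concretely, here's the repro with the two calls swapped the way I mean (the ImportError guard is only so this sketch runs standalone without taskgraph installed; it's not part of the workaround):

```python
def func():
    return 1

if __name__ == '__main__':
    try:
        import taskgraph
    except ImportError:
        taskgraph = None  # guard only so the sketch runs anywhere

    if taskgraph is not None:
        task_graph = taskgraph.TaskGraph('task-workspace', 1)
        task = task_graph.add_task(func=func)
        # Swapped relative to the README example: wait for every added
        # task to finish first, then mark the graph as accepting no
        # more tasks.
        task_graph.join()
        task_graph.close()
```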

emlys commented

I got the close-then-join order from the taskgraph README example, but there are other things in there that could use an update too! My understanding was that close means "no more tasks can be added to this task graph," so I put it right after adding the tasks. I could definitely be confused about the intent there.

join does check whether self._closed is true and then calls self._terminate(). That's not really conclusive, but there it is!

Judging by the error message right before the traceback, Rich is aware that an EOF could be thrown from a corrupt object and is purposefully passing on it.
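For anyone following along, the defensive drain that error message describes could look roughly like this stdlib sketch (not the actual Task.py code; the function name is illustrative):

```python
import queue

def drain_logging_queue(logging_queue):
    """Empty a queue without blocking, giving up on any failure.

    During interpreter teardown the pipe under a multiprocessing queue
    may already be half-closed, so besides queue.Empty we can see
    e.g. EOFError; per the error message above, any exception means we
    should simply stop draining.
    """
    while True:
        try:
            logging_queue.get_nowait()
        except queue.Empty:
            break  # queue fully drained
        except Exception:
            break  # queue/pipe may be corrupt at shutdown; stop trying
```

The same loop works for a plain queue.Queue, since both raise queue.Empty from get_nowait().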

emlys commented

If close does need to go after join, should there be a check for that? I wouldn't expect it to corrupt the queue. @richpsharp, is this expected?
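A check like that could be as simple as this toy sketch (class and attribute names are illustrative, not taskgraph's actual API):

```python
import warnings

class GraphSketch:
    """Toy model of the close()/join() ordering check proposed above."""

    def __init__(self):
        self._closed = False
        self._joined = False
        self._terminated = False

    def close(self):
        # Warn when close() arrives before join(), the ordering this
        # thread suspects of leaving the logging queue in a bad state.
        if not self._joined:
            warnings.warn(
                'close() called before join(); this ordering can leave '
                'the logging queue in a bad state at shutdown.')
        self._closed = True

    def join(self):
        self._joined = True
        if self._closed:
            # Mirrors the earlier observation: join() checks
            # self._closed and then terminates.
            self._terminate()

    def _terminate(self):
        self._terminated = True
```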

Sorry for the confusion! I think you're right about the sequence, Emily.

mcejp commented

FYI, if close is called before join (as per the README) on Windows, the exception looks a bit different:

DEBUG:root:Calling self.task_graph.close()
DEBUG:taskgraph.Task:Closing taskgraph.
DEBUG:taskgraph.Task:taskgraph closed
DEBUG:root:Done self.task_graph.close()
DEBUG:root:Calling self.task_graph.join()
DEBUG:taskgraph.Task:joining taskgraph
DEBUG:taskgraph.Task:attempting to join threads
DEBUG:taskgraph.Task:attempting to join task UNNAMED TASK (0)
DEBUG:taskgraph.Task:joining UNNAMED TASK (0) done executing: <threading.Event object at 0x0C087E20>
DEBUG:taskgraph.Task:task UNNAMED TASK (0) was joined
DEBUG:taskgraph.Task:attempting to join task UNNAMED TASK (1)
DEBUG:taskgraph.Task:joining UNNAMED TASK (1) done executing: <threading.Event object at 0x0C0D9E98>
DEBUG:taskgraph.Task:successful run on task UNNAMED TASK (1)
DEBUG:taskgraph.Task:task UNNAMED TASK (1) is complete, checking to see if any dependent tasks can be executed now
DEBUG:taskgraph.Task:task UNNAMED TASK (1) done processing
DEBUG:taskgraph.Task:no tasks are pending and taskgraph closed, normally terminating executor <Thread(task_executor_3, started daemon 16888)>.
DEBUG:taskgraph.Task:task executor shutting down
DEBUG:taskgraph.Task:no tasks are pending and taskgraph closed, normally terminating executor <Thread(task_executor_1, started daemon 21220)>.
DEBUG:taskgraph.Task:task executor shutting down
DEBUG:taskgraph.Task:no tasks are pending and taskgraph closed, normally terminating executor <Thread(task_executor_2, started daemon 18424)>.
DEBUG:taskgraph.Task:task executor shutting down
DEBUG:taskgraph.Task:task UNNAMED TASK (1) was joined
DEBUG:taskgraph.Task:Invoking terminate. already terminated? False
DEBUG:taskgraph.Task:taskgraph is terminated, ending <Thread(task_executor_0, started daemon 27316)>
DEBUG:taskgraph.Task:task executor shutting down
DEBUG:taskgraph.Task:setting task done for UNNAMED TASK (0)
DEBUG:taskgraph.Task:setting task done for UNNAMED TASK (1)
DEBUG:taskgraph.Task:_handle_logs_from_processes shutting down
DEBUG:root:Done self.task_graph.join()
DEBUG:taskgraph.Task:shutting down workers
ERROR:taskgraph.Task:Expected an empty logging queue, but if the TaskGraph were being terminated it's possible this object is corrupt and we'd get a different kind of exception like EOF. In any case we should always stop trying to drain the queue in the case of an Exception.
Traceback (most recent call last):
  File "C:\usr\Python38-32\lib\site-packages\taskgraph\Task.py", line 413, in __del__
    x = self._logging_queue.get_nowait()
  File "C:\usr\Python38-32\lib\multiprocessing\queues.py", line 129, in get_nowait
    return self.get(False)
  File "C:\usr\Python38-32\lib\multiprocessing\queues.py", line 109, in get
    elif not self._poll():
  File "C:\usr\Python38-32\lib\multiprocessing\connection.py", line 257, in poll
    return self._poll(timeout)
  File "C:\usr\Python38-32\lib\multiprocessing\connection.py", line 328, in _poll
    _winapi.PeekNamedPipe(self._handle)[0] != 0):
BrokenPipeError: [WinError 109] The pipe has been ended
DEBUG:taskgraph.Task:taskgraph terminated

Hi all, @emlys, a BrokenPipeError is unexpected and sloppy. I'll fix this!