
Calling `multiprocessing.Queue.close()` too quickly causes intermittent failure (BrokenPipeError)


BPO 35844
Nosy @maggyero

Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


GitHub fields:

assignee = None
closed_at = None
created_at = <Date 2019-01-28.20:28:52.696>
labels = ['3.7', 'library', 'docs']
title = 'Calling `Multiprocessing.Queue.close()` too quickly causes intermittent failure (BrokenPipeError)'
updated_at = <Date 2022-03-14.23:00:30.288>
user = 'https://bugs.python.org/charmonium'

bugs.python.org fields:

activity = <Date 2022-03-14.23:00:30.288>
actor = 'maggyero'
assignee = 'docs@python'
closed = False
closed_date = None
closer = None
components = ['Documentation', 'Library (Lib)']
creation = <Date 2019-01-28.20:28:52.696>
creator = 'charmonium'
dependencies = []
files = []
hgrepos = []
issue_num = 35844
keywords = []
message_count = 1.0
messages = ['334490']
nosy_count = 4.0
nosy_names = ['docs@python', 'maggyero', 'charmonium', 'jdogzz-g5']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = None
url = 'https://bugs.python.org/issue35844'
versions = ['Python 3.7']

If all processes close the queue immediately after one of them has written to it, the write fails intermittently with a `BrokenPipeError` (see the traceback below). Uncommenting either of the `time.sleep` calls makes it work consistently again.

    import multiprocessing
    import time
    import logging
    import multiprocessing.util
    multiprocessing.util.log_to_stderr(level=logging.DEBUG)
    
    queue = multiprocessing.Queue(maxsize=10)
    
    def worker(queue):
        queue.put('abcdefghijklmnop')
    
        # "Indicate that no more data will be put on this queue by the
        # current process." --Documentation
        # time.sleep(0.01)
        queue.close()
    
    proc = multiprocessing.Process(target=worker, args=(queue,))
    proc.start()
    
    # "Indicate that no more data will be put on this queue by the current
    # process." --Documentation
    # time.sleep(0.01)
    queue.close()
    
    proc.join()

Perhaps I am misreading the documentation, but in that case I would contend that this is a documentation bug. Here is the traceback:

    Traceback (most recent call last):
      File "/usr/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
        send_bytes(obj)
      File "/usr/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
        self._send_bytes(m[offset:offset + size])
      File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
        self._send(header + buf)
      File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
        n = write(self._handle, buf)
    BrokenPipeError: [Errno 32] Broken pipe
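
For context, `Queue.put()` only hands the object to a background feeder thread, which writes it to the underlying pipe asynchronously, and `Queue.close()` closes the calling process's read end of that pipe. If every process has closed its read end before the worker's feeder thread flushes the buffered item, the feeder's write finds no remaining reader and fails with `EPIPE`. Below is a minimal sketch of one way to avoid the race, assuming the parent actually wants to consume the item: the parent drains the queue and joins the worker before closing, and the worker calls `join_thread()` so it does not exit before its feeder has flushed.

    import multiprocessing

    def worker(queue):
        queue.put('abcdefghijklmnop')
        queue.close()        # no more puts from this process
        queue.join_thread()  # block until the feeder thread has flushed the pipe

    if __name__ == '__main__':
        queue = multiprocessing.Queue(maxsize=10)
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()

        item = queue.get()   # consume the item while the read end is still open
        proc.join()          # worker (and its feeder thread) has finished
        queue.close()        # safe now: nothing is left in flight

With this ordering the parent's read end stays open until the worker has exited, so the feeder thread always has a live reader to write to, and no `time.sleep` is needed.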

@JelleZijlstra, could you also close this issue now that it has been fixed in PR #31913?