TGSAI/mdio-python

SEGY import spawns multiprocessing recursion

Closed this issue · 7 comments

The following code triggers a (near) endless recursion of subprocesses; all the conversions are attempted in parallel, and all of them fail:

(I am not using Dask)

import os
from mdio import segy_to_mdio

input_segy = 'C:\\Demo\\Data\\ST10010ZC11_PZ_PSDM_KIRCH_FAR_D.MIG_FIN.POST_STACK.3D.JS-017536.segy'

for level in [0, 0.01, 0.02, 1.0, 10.0]:
    compressed_mdio_file = os.path.join(os.path.splitext(input_segy)[0] + '_' + str(level) + '.mdio')
    segy_to_mdio(segy_path=input_segy,
                 mdio_path_or_buffer=compressed_mdio_file,
                 index_bytes=(189, 193),
                 index_names=('inline', 'crossline'),
                 compression_tolerance=level,
                 lossless=(level == 0))
    print(f'Converted {input_segy} @ {level} tolerance, to {compressed_mdio_file}')

print('Complete')

What I see is the following output, repeated over and over:

C:\sources\mdio_test (main)
λ python mdio_test.py
Scanning SEG-Y for geometry attributes:   0%|                        | 0/6 [00:00<?, ?block/s]
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\sources\nDBenchmark\mdio_test.py", line 9, in <module>
    segy_to_mdio(segy_path=input_segy,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\mdio\converters\segy.py", line 177, in segy_to_mdio
    dimensions, index_headers = get_grid_plan(
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\mdio\segy\utilities.py", line 53, in get_grid_plan
    index_headers = parse_trace_headers(
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\mdio\segy\parsers.py", line 120, in parse_trace_headers
    with Pool(num_workers) as pool:
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\pool.py", line 212, in __init__
    self._repopulate_pool()
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\pool.py", line 303, in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\pool.py", line 326, in _repopulate_pool_static
    w.start()
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
srib commented

Thanks for reporting this issue, @mstormo!

It looks like you are on Windows.

This SO issue recommends running the snippet inside an if __name__ == "__main__" guard. Can you please run it within that guard and see if it works?
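
For reference, the script from the report wrapped in the guard would look like this (a sketch reusing the exact paths and parameters from above):

import os
from mdio import segy_to_mdio

def main():
    input_segy = 'C:\\Demo\\Data\\ST10010ZC11_PZ_PSDM_KIRCH_FAR_D.MIG_FIN.POST_STACK.3D.JS-017536.segy'
    for level in [0, 0.01, 0.02, 1.0, 10.0]:
        compressed_mdio_file = os.path.splitext(input_segy)[0] + '_' + str(level) + '.mdio'
        segy_to_mdio(segy_path=input_segy,
                     mdio_path_or_buffer=compressed_mdio_file,
                     index_bytes=(189, 193),
                     index_names=('inline', 'crossline'),
                     compression_tolerance=level,
                     lossless=(level == 0))
        print(f'Converted {input_segy} @ {level} tolerance, to {compressed_mdio_file}')
    print('Complete')

# On Windows, multiprocessing's "spawn" start method re-imports this module
# in every worker; the guard keeps the conversion loop from re-running there.
if __name__ == '__main__':
    main()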

This does work around the issue.

However, it's a highly unexpected requirement, given that my own code does not use multiprocessing anywhere. The fact that an imported module forces this requirement can have significant implications in non-trivial systems, for example when the call happens inside another module where you cannot guarantee that execution is isolated like this.

The Python process was spawned over 30 times for this simple example.

Please note that Ctrl+C will not abort the process, since the other spawned processes continue in the background. So the suggested work-around only hides a larger issue.

  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\segyio\trace.py", line 50, in wrapindex                                                                                                            
Traceback (most recent call last):                                                                                                                                                                                           
                                                                                                                                                                                                                             
    if not 0 <= i < len(self):                                                                                                                                                                                               
    self.run()rs\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\process.py", line 315, in _bootstrap                                                                                                             
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\process.py", line 108, in run                                                                                                                    
                                                                                                                                                                                                                             
    self._target(*self._args, **self._kwargs)                                                                                                                                                                                
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\pool.py", line 125, in worker                                                                                                                    
    result = (True, func(*args, **kwds))                                                                                                                                                                                     
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\multiprocessing\pool.py", line 48, in mapstar                                                                                                                    
    return list(map(*args))                                                                                                                                                                                                  
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\mdio\segy\_workers.py", line 271, in trace_worker_map                                                                                              
    return trace_worker(*args)                                                                                                                                                                                               
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\mdio\segy\_workers.py", line 182, in trace_worker                                                                                                  
    data_array.set_basic_selection(                                                                                                                                                                                          
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 1448, in set_basic_selection                                                                                                   
    return self._set_basic_selection_nd(selection, value, fields=fields)                                                                                                                                                     
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 1748, in _set_basic_selection_nd                                                                                               
    self._set_selection(indexer, value, fields=fields)                                                                                                                                                                       
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 1820, in _set_selection                                                                                                        
    self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,                                                                                                                                                      
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 2018, in _chunk_setitems                                                                                                       
    to_store = {k: self._encode_chunk(v) for k, v in cdatas.items()}                                                                                                                                                         
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 2018, in <dictcomp>                                                                                                            
    to_store = {k: self._encode_chunk(v) for k, v in cdatas.items()}                                                                                                                                                         
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\zarr\core.py", line 2194, in _encode_chunk                                                                                                         
    cdata = self._compressor.encode(chunk)                                                                                                                                                                                   
  File "C:\Users\mstormo\.pyenv\pyenv-win\versions\3.8.9\lib\site-packages\numcodecs\zfpy.py", line 70, in encode                                                                                                             
    return _zfpy.compress_numpy(                                                                                                                                                                                             
KeyboardInterrupt                                                                                                                                                                                                            
Ingesting SEG-Y in 24 chunks:  38%|โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ                           | 9/24 [00:23<00:11,  1.28block/s]

Correction: Ctrl+C kills the current batch of Python processes, but a new set is spawned, which then just hangs at 0% CPU usage.

srib commented

Thank you for your comments, @mstormo.

I don't think the if __name__ == "__main__" guard is unexpected. The multiprocessing documentation recommends it as part of its programming guidelines.

Importing segy_to_mdio from mdio by default uses the workers imported at https://github.com/TGSAI/mdio-python/blob/main/src/mdio/converters/segy.py#L19, which in turn import multiprocessing.
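
For illustration, here is a minimal standalone sketch (not MDIO code) of why the guard matters under the spawn start method that Windows uses:

import multiprocessing as mp

def square(x):
    return x * x

# Under "spawn" (the only start method on Windows), each worker process
# re-imports the main module. Any top-level Pool creation therefore
# re-executes in every child, which is exactly the recursion seen above.
if __name__ == '__main__':
    with mp.Pool(2) as pool:
        print(pool.map(square, range(4)))  # [0, 1, 4, 9]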

The alternatives are as follows:

  1. Run as a single process on Windows, which is less desirable.
  2. Make multiprocessing opt-in on Windows.
  3. Document the required guard.

What do you suggest as a fix? Happy to accept a PR.

The "unexpected" part was about MDIO being a black box for the end-user: neither the documentation nor the examples mention the required guard statement, while Dask is explicitly mentioned for parallel distributed processing.

Note that my example here was a trivial reproducible case just to illustrate the problem. In my own situation the implementation lives in a plugin in a larger system, where the main system ended up starting several executions as a result. That was rather convoluted to diagnose, given that I do not use subprocesses or the multiprocessing library anywhere in my own code.

As such, it's unexpected for the end-user, while of course not a surprise to you as the implementor.

Given that the process spawning makes the end-user lose control of the terminal (it hangs with six idle Python sub-processes), I think the only option is to run as a single process by default on Windows, with an option to enable multiprocessing if desired (options 1 & 2 combined).
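
A hypothetical sketch of what that combined behavior could look like (parse_block, parse_headers, and the windows_multiprocessing flag are illustrative names, not the actual MDIO API):

import platform
from multiprocessing import Pool

def parse_block(block):
    # Placeholder for the per-block header parsing work.
    return block

def parse_headers(blocks, num_workers, windows_multiprocessing=False):
    """Parse blocks in parallel, but default to serial on Windows."""
    # Only spin up a pool off Windows, or when the caller opted in.
    parallel = num_workers > 1 and (
        platform.system() != 'Windows' or windows_multiprocessing
    )
    if parallel:
        with Pool(num_workers) as pool:
            return pool.map(parse_block, blocks)
    return [parse_block(block) for block in blocks]  # single-process fallback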

I expect the main guard to be required on Linux too, so the examples would need to be updated accordingly if you keep multiprocessing enabled by default.

Hi @mstormo, thanks for letting us know!

We will make updates to the documentation based on your feedback.

As @srib mentioned, multiprocessing in Python almost always needs a main guard. By default, the ingestion uses multiprocessing (not Dask); reading, writing, and export can use Dask if needed, but that is off by default. We have plans to Daskify the ingestion as well. We should clarify this for sure.