nipreps/niworkflows

Handle file read/write race in parallel computing


What happened?

Originally posted in PennLINC/xcp_d#1064 (comment)

When different nodes try to access the same file simultaneously (especially when pipelines are run in parallel), the file can be inaccessible. This is what we see in the XCP-D use case.

What command did you use?

For XCP-D, AFAIK, the code involved belongs to the `DerivativesDataSink` class.

What version of the software are you running?

XCP-D 0.6.1

How are you running this software?

Singularity

Is your data BIDS valid?

Yes

Are you reusing any previously computed results?

No

Please copy and paste any relevant log output.

Node: xcpd_wf.single_subject_TJNU007N_wf.cifti_postprocess_0_wf.qc_report_wf.ds_qc_metadata
Working directory: /seastor/CAMP/tmp/xcpd_wf/single_subject_TJNU007N_wf/cifti_postprocess_0_wf/qc_report_wf/ds_qc_metadata

Node inputs:

acquisition = <undefined>
atlas = <undefined>
base_directory = /seastor/CAMP/derivatives/xcpd_no_gsr
ceagent = <undefined>
check_hdr = True
chunk = <undefined>
cohort = <undefined>
compress = <undefined>
data_dtype = <undefined>
datatype = <undefined>
den = <undefined>
desc = linc
direction = <undefined>
dismiss_entities = ['suffix', 'task', 'measure', 'mode', 'roi', 'atlas', 'tracksys', 'ceagent', 'modality', 'space', 'flip', 'fmap', 'staining', 'tracer', 'chunk', 'mt', 'den', 'hemi', 'inv', 'recording', 'res', 'cohort', 'sample', 'reconstruction', 'desc', 'session', 'part', 'echo', 'acquisition', 'from', 'datatype', 'direction', 'model', 'extension', 'scans', 'proc', 'subject', 'label', 'subset', 'to', 'run']
echo = <undefined>
extension = .json
flip = <undefined>
fmap = <undefined>
from = <undefined>
hemi = <undefined>
in_file = ['/seastor/CAMP/tmp/xcpd_wf/single_subject_TJNU007N_wf/cifti_postprocess_0_wf/qc_report_wf/qc_report/filtered_denoisedqc_bold.json']
inv = <undefined>
label = <undefined>
measure = <undefined>
meta_dict = <undefined>
modality = <undefined>
mode = <undefined>
model = <undefined>
mt = <undefined>
part = <undefined>
proc = <undefined>
reconstruction = <undefined>
recording = <undefined>
res = <undefined>
roi = <undefined>
run = <undefined>
sample = <undefined>
scans = <undefined>
session = <undefined>
source_file = ['/seastor/CAMP/derivatives/fmriprep/sub-TJNU007N/ses-1/func/sub-TJNU007N_ses-1_task-am_dir-PA_run-1_space-fsLR_den-91k_bold.dtseries.nii']
space = <undefined>
staining = <undefined>
subject = <undefined>
subset = <undefined>
suffix = qc
task = <undefined>
to = <undefined>
tracer = <undefined>
tracksys = <undefined>

Traceback (most recent call last):
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/plugins/multiproc.py", line 344, in _send_procs_to_workers
    self.procs[jobid].run(updatehash=updatehash)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 527, in run
    result = self._run_interface(execute=True)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 645, in _run_interface
    return self._run_command(execute)
  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/pipeline/engine/nodes.py", line 771, in _run_command
    raise NodeExecutionError(msg)
nipype.pipeline.engine.nodes.NodeExecutionError: Exception raised while executing Node ds_qc_metadata.

Traceback:
	Traceback (most recent call last):
	  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/interfaces/base/core.py", line 397, in run
	    runtime = self._run_interface(runtime)
	  File "/usr/local/miniconda/lib/python3.10/site-packages/niworkflows/interfaces/bids.py", line 732, in _run_interface
	    _copy_any(orig_file, str(out_file))
	  File "/usr/local/miniconda/lib/python3.10/site-packages/niworkflows/utils/misc.py", line 288, in _copy_any
	    copyfile(src, dst, copy=True, use_hardlink=True)
	  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/utils/filemanip.py", line 447, in copyfile
	    copyfile(
	  File "/usr/local/miniconda/lib/python3.10/site-packages/nipype/utils/filemanip.py", line 386, in copyfile
	    elif posixpath.samefile(newfile, originalfile):
	  File "/usr/local/miniconda/lib/python3.10/genericpath.py", line 100, in samefile
	    s1 = os.stat(f1)
	FileNotFoundError: [Errno 2] No such file or directory: '/seastor/CAMP/derivatives/xcpd_no_gsr/xcp_d/desc-linc_qc.json'

Additional information / screenshots

No response

I wonder if adding an overwrite option to DerivativesDataSink would be useful?
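
To make the idea concrete, here is a minimal sketch of what such an option could do. It is not the `DerivativesDataSink` or nipype `copyfile` implementation: `copy_atomic` and its `overwrite` parameter are hypothetical names used only for illustration. The copy is staged next to the destination and then renamed into place, so a concurrent reader should see either the old file or the complete new one. The rename is atomic on local POSIX filesystems; networked filesystems may give weaker guarantees.

```python
import os
import shutil
import uuid


def copy_atomic(src, dst, overwrite=True):
    """Copy src to dst without exposing a half-written dst (hypothetical helper)."""
    dst = os.path.abspath(dst)
    os.makedirs(os.path.dirname(dst), exist_ok=True)

    # With overwrite=False an existing file is left alone. Another process
    # may still create dst after this check; the rename below would then
    # replace it with a complete copy, never a partial one.
    if not overwrite and os.path.exists(dst):
        return dst

    # Stage the copy in the destination directory so the final rename
    # does not cross filesystems.
    tmp = f"{dst}.{uuid.uuid4().hex}.tmp"
    try:
        shutil.copyfile(src, tmp)
        os.replace(tmp, dst)  # replaces dst in a single step
    finally:
        # Only true if the copy or the rename failed.
        if os.path.exists(tmp):
            os.unlink(tmp)
    return dst
```

With this pattern, two jobs writing identical content to the same path simply overwrite each other's complete file, so neither has to stat a destination that another job is in the middle of replacing.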

Generally it seems like a bad idea to aim two parallel jobs at the same output directory. Especially when working on networked filesystems, synchronization is uncertain at best.

Happy to consider a patch, but my experience is that these are massive time sinks that are better handled by using separate output directories and merging in a single process, post-run.
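
For reference, the merge step I have in mind could look roughly like the sketch below. It assumes each parallel job wrote to its own output directory; `merge_outputs` and the directory layout are hypothetical, not part of niworkflows. Files that appear in more than one job directory are copied once, and any copies whose contents differ are reported rather than overwritten.

```python
import filecmp
import shutil
from pathlib import Path


def merge_outputs(job_dirs, merged_dir):
    """Merge per-job output trees into merged_dir from a single process.

    Returns the destination paths where two jobs produced different content.
    """
    merged = Path(merged_dir)
    conflicts = []
    for job_dir in map(Path, job_dirs):
        for src in sorted(job_dir.rglob("*")):
            if not src.is_file():
                continue
            dst = merged / src.relative_to(job_dir)
            if dst.exists():
                # Shared file already merged: keep the first copy and
                # flag it only if the contents differ byte for byte.
                if not filecmp.cmp(src, dst, shallow=False):
                    conflicts.append(dst)
                continue
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)
    return conflicts
```

Because only one process ever writes into the merged directory, there is nothing to synchronize, and an unexpected difference between jobs shows up as an explicit conflict instead of a silent overwrite.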

I might not be thinking about this the right way, but in my case (XCP-D), we warp atlases to the standard space and resolution used across runs (and potentially subjects), so we only want one copy of the warped and resampled atlas in the derivatives. This happens in the single-subject workflow, because that is where `collect_data` is called and I need the BOLD runs it selects to identify the space and resolution to warp the atlas to. Since we expect the space and resolution to be consistent within and across subjects, every subject's workflow writes the atlas files to the same location.

Does that approach make sense to you?