pytorch/data

Race condition when prefetching forks

Opened this issue · 1 comment

falckt commented

๐Ÿ› Describe the bug

Running prefetch on multiple branches of a forked datapipe can trigger a race condition.

import time

import torchdata.datapipes as dp
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService


def expensive_op(item):
    # slow operation, so the prefetch worker threads overlap in time
    time.sleep(1)
    return item


# fork the pipeline into two branches, then prefetch each branch separately
fork1, fork2 = (
    dp.iter.IterableWrapper(range(20))
    .map(expensive_op)
    .fork(2)
)
fork1 = fork1.prefetch(2)
fork2 = fork2.prefetch(2)

dl = DataLoader2(
    fork1.zip(fork2),
    reading_service=MultiProcessingReadingService(4),
)

for _ in dl:
    pass

Traceback

Traceback (most recent call last):
  File "/mwes/prefetch.py", line 22, in <module>
    for _ in dl:
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/dataloader2.py", line 53, in __next__
    next_val = next(self.dataloader._datapipe_iter)  # type: ignore[arg-type]
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 345, in __iter__
    response.exc.reraise()
  File "/opt/conda/lib/python3.10/site-packages/torchdata/_utils.py", line 52, in reraise
    raise exception
ValueError: Caught ValueError in worker process 0.
Original Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 211, in DataPipeBehindQueues
    value = source_datapipe.nonblocking_next()
  File "/opt/conda/lib/python3.10/site-packages/torchdata/dataloader2/communication/iter.py", line 103, in nonblocking_next
    return next(self._as_iterator)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/combining.py", line 624, in __iter__
    yield from zip(*iterators)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 195, in wrap_generator
    response = gen.send(request)
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 102, in __iter__
    raise data
  File "/opt/conda/lib/python3.10/site-packages/torchdata/datapipes/iter/util/prefetcher.py", line 72, in thread_worker
    item = next(itr)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 154, in __next__
    return self._get_next()
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/_hook_iterator.py", line 142, in _get_next
    result = next(self.iterator)
  File "/opt/conda/lib/python3.10/site-packages/torch/utils/data/datapipes/iter/combining.py", line 196, in get_next_element_by_instance
    return_val = next(self._datapipe_iterator)  # type: ignore[arg-type]
ValueError: generator already executing
This exception is thrown by __iter__ of PrefetcherIterDataPipe(buffer_size=2, source_datapipe=_ChildDataPipe)

Versions

Tested in the pytorch-nightly Docker image. (It also fails on PyTorch 2.0 with torchdata 0.6.)

PyTorch version: 2.1.0.dev20230514
Is debug build: False
CUDA used to build PyTorch: 11.7
ROCM used to build PyTorch: N/A

OS: Ubuntu 20.04.6 LTS (x86_64)
GCC version: Could not collect
Clang version: Could not collect
CMake version: version 3.22.1
Libc version: glibc-2.31

Python version: 3.10.11 (main, Apr 20 2023, 19:02:41) [GCC 11.2.0] (64-bit runtime)
Python platform: Linux-5.19.0-1024-aws-x86_64-with-glibc2.31
Is CUDA available: False
CUDA runtime version: No CUDA
CUDA_MODULE_LOADING set to: N/A
GPU models and configuration: No CUDA
Nvidia driver version: No CUDA
cuDNN version: No CUDA
HIP runtime version: N/A
MIOpen runtime version: N/A
Is XNNPACK available: True

CPU:
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
Address sizes: 48 bits physical, 48 bits virtual
CPU(s): 64
On-line CPU(s) list: 0-63
Thread(s) per core: 2
Core(s) per socket: 32
Socket(s): 1
NUMA node(s): 1
Vendor ID: AuthenticAMD
CPU family: 23
Model: 49
Model name: AMD EPYC 7R32
Stepping: 0
CPU MHz: 2799.972
BogoMIPS: 5599.94
Hypervisor vendor: KVM
Virtualization type: full
L1d cache: 1 MiB
L1i cache: 1 MiB
L2 cache: 16 MiB
L3 cache: 128 MiB
NUMA node0 CPU(s): 0-63
Vulnerability Itlb multihit: Not affected
Vulnerability L1tf: Not affected
Vulnerability Mds: Not affected
Vulnerability Meltdown: Not affected
Vulnerability Mmio stale data: Not affected
Vulnerability Retbleed: Mitigation; untrained return thunk; SMT enabled with STIBP protection
Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2: Mitigation; Retpolines, IBPB conditional, STIBP always-on, RSB filling, PBRSB-eIBRS Not affected
Vulnerability Srbds: Not affected
Vulnerability Tsx async abort: Not affected
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc cpuid extd_apicid aperfmperf tsc_known_freq pni pclmulqdq ssse3 fma cx16 sse4_1 sse4_2 movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm cmp_legacy cr8_legacy abm sse4a misalignsse 3dnowprefetch topoext ssbd ibrs ibpb stibp vmmcall fsgsbase bmi1 avx2 smep bmi2 rdseed adx smap clflushopt clwb sha_ni xsaveopt xsavec xgetbv1 clzero xsaveerptr rdpru wbnoinvd arat npt nrip_save rdpid

Versions of relevant libraries:
[pip3] numpy==1.24.3
[pip3] torch==2.1.0.dev20230514
[pip3] torchaudio==2.1.0.dev20230514
[pip3] torchdata==0.7.0.dev20230514
[pip3] torchelastic==0.2.2
[pip3] torchtext==0.16.0.dev20230514
[pip3] torchvision==0.16.0.dev20230514
[pip3] triton==2.1.0
[conda] blas 1.0 mkl
[conda] mkl 2023.1.0 h6d00ec8_46342
[conda] mkl-service 2.4.0 py310h5eee18b_1
[conda] mkl_fft 1.3.6 py310h1128e8f_1
[conda] mkl_random 1.2.2 py310h1128e8f_1
[conda] numpy 1.24.3 py310h5f9d8c6_1
[conda] numpy-base 1.24.3 py310hb5e798b_1
[conda] pytorch 2.1.0.dev20230514 py3.10_cuda11.7_cudnn8.5.0_0 pytorch-nightly
[conda] pytorch-cuda 11.7 h778d358_5 pytorch-nightly
[conda] pytorch-mutex 1.0 cuda pytorch-nightly
[conda] torchaudio 2.1.0.dev20230514 py310_cu117 pytorch-nightly
[conda] torchdata 0.7.0.dev20230514 py310 pytorch-nightly
[conda] torchelastic 0.2.2 pypi_0 pypi
[conda] torchtext 0.16.0.dev20230514 py310 pytorch-nightly
[conda] torchtriton 2.1.0+7d1a95b046 py310 pytorch-nightly
[conda] torchvision 0.16.0.dev20230514 py310_cu117 pytorch-nightly

ejguan commented

Thanks for reporting it. I can confirm this race condition from fork. To unblock your use case, can you please do:

dp = fork1.zip(fork2)
dp = dp.prefetch(2)
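
Applied to the reproduction script above, the workaround looks roughly like this (a sketch; only the placement of prefetch changes, everything else is as in the MWE):

import time

import torchdata.datapipes as dp
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService


def expensive_op(item):
    time.sleep(1)
    return item


fork1, fork2 = (
    dp.iter.IterableWrapper(range(20))
    .map(expensive_op)
    .fork(2)
)

# prefetch once on the zipped pipe instead of once per forked branch
pipe = fork1.zip(fork2).prefetch(2)

dl = DataLoader2(
    pipe,
    reading_service=MultiProcessingReadingService(4),
)

for _ in dl:
    pass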

IMHO, it's caused by fork taking a single reference to the generator from the prior DataPipe, so the two prefetch threads end up driving the same generator. It can be resolved by making _ForkerIterDataPipe.get_next_element_by_instance a regular function that returns the next value rather than a generator function, and by making _ChildDataPipe.__iter__ run a while loop that calls get_next_element_by_instance once per iteration.
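
For illustration, here is a minimal standalone sketch (plain Python, no torchdata) of why driving one generator from two threads raises this exact error; the two prefetch threads do essentially the same thing to the shared fork generator:

import threading
import time


def gen():
    for i in range(100):
        # stay inside the generator frame for a while, so the other
        # thread gets a chance to call next() on it concurrently
        time.sleep(0.001)
        yield i


g = gen()


def consume():
    try:
        for _ in range(50):
            next(g)
    except ValueError as e:
        print("hit:", e)  # typically "generator already executing"


t1 = threading.Thread(target=consume)
t2 = threading.Thread(target=consume)
t1.start()
t2.start()
t1.join()
t2.join()

Serializing access to the shared generator, or replacing the generator function with a regular function that returns one value per call as suggested above, avoids this concurrent re-entry.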