Allow more than 1 dispatching process in MultiProcessReadingService
ben-davidson-6 opened this issue · 1 comment
### 🚀 The feature
I would like to easily be able to create `m` dispatching processes feeding `n` worker processes. Currently you can only have a single dispatching process. In the example below, I would like to have `pipe_0` and `pipe_1` running in their own processes feeding a shared queue.
```python
import os
from typing import Iterator

from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import (
    IterableWrapper,
    IterDataPipe,
    MultiplexerLongest,
    ShardingRoundRobinDispatcher,
)


class Loader:
    def __init__(self) -> None:
        self.n_to_load = 4

    def __iter__(self):
        print("loader", os.getpid())
        for k in range(self.n_to_load):
            yield k


@functional_datapipe("times_ten")
class MyStuff(IterDataPipe):
    def __init__(self, dp: IterDataPipe) -> None:
        super().__init__()
        self.dp = dp

    def __iter__(self) -> Iterator:
        print("doer", os.getpid())
        for x in self.dp:
            yield 10 * x


if __name__ == "__main__":
    print("main", os.getpid())
    pipe_0 = IterableWrapper(Loader())
    pipe_0 = ShardingRoundRobinDispatcher(
        pipe_0, sharding_group_filter=SHARDING_PRIORITIES.MULTIPROCESSING
    )
    pipe_1 = IterableWrapper(Loader())
    pipe_1 = ShardingRoundRobinDispatcher(
        pipe_1, sharding_group_filter=SHARDING_PRIORITIES.MULTIPROCESSING
    )
    pipe = MultiplexerLongest(pipe_0, pipe_1)
    pipe = pipe.times_ten()
    rs = MultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(pipe, reading_service=rs)
    for k in dl:
        print(k)
```
### Motivation, pitch
This pattern is very useful when you need to load a large shard of data and do some preprocessing on it before applying more batch-level augmentations/normalisations, e.g. you have

```python
for big_unprocessed_blob in data:
    processed_blob = processed(big_unprocessed_blob)
    for single_example in processed_blob:
        yield do_stuff_to_example(single_example)
```
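To make the pattern above concrete, here is a minimal runnable sketch with toy stand-ins for the loading and preprocessing steps (all names and bodies here are illustrative, not part of torchdata):

```python
def load_blobs():
    # Stand-in for reading large shards from disk: each blob is a
    # chunk of raw integers.
    yield [1, 2, 3]
    yield [4, 5]

def processed(blob):
    # Stand-in for the expensive per-blob preprocessing; here it just
    # doubles every value.
    return [2 * x for x in blob]

def do_stuff_to_example(x):
    # Stand-in for cheap per-example work.
    return x + 1

def examples():
    # The nested loop from the pattern above: preprocess each blob once,
    # then yield its examples one at a time.
    for big_unprocessed_blob in load_blobs():
        processed_blob = processed(big_unprocessed_blob)
        for single_example in processed_blob:
            yield do_stuff_to_example(single_example)

print(list(examples()))  # -> [3, 5, 7, 9, 11]
```

The expensive `processed` call happens once per blob rather than once per example, which is why it pays to run it in its own dispatching process.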
I would like to have an easy way of doing

```python
queue = start_loading_and_processing_blob(big_unprocessed_blob, num_workers=m)
pipe = DoStuffToExample(queue)
rs = MultiProcessingReadingService(num_workers=n)
dl = DataLoader2(pipe, reading_service=rs)
```
The main benefit is a performance win: there are cases where you need multiple dispatching processes to keep the GPU fed with data. Of course I can write, and have written, a version of this myself, but it is easy to get wrong, there is a lot of boilerplate for handling the processes and queues, and it would be great if it were all handled for you.
### Alternatives
**Option one: normal dataloader parallelism.** If you try to do this within a normal dataloader worker, the data will not be able to keep up. Eventually you hit an iteration where that worker needs to read and process the big blob, so it cannot return a batch and the GPU sits idle.
**Option two: a thread within normal dataloader workers.** This is better, but you ultimately hit the same problem: it is tricky to control exactly when the thread is working versus when the worker needs to return a batch, so you can end up processing the blob when you should be yielding a batch, again leaving the GPU idle.
**Option three: writing your own multiprocessing logic that outputs processed items to a queue consumed by an iterable.** This works, but you have to write the code yourself. It is almost exactly what the existing `ShardingRoundRobinDispatcher` provides, except that by writing it myself I can add as many dispatcher workers as I want.
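For reference, a minimal hand-rolled sketch of this option using `multiprocessing` directly, with m dispatcher processes pushing preprocessed items onto a shared queue drained by an iterable on the consumer side (all names and the toy workload are illustrative, not the issue author's actual code):

```python
import multiprocessing as mp
from typing import Iterator, Tuple

_SENTINEL = None  # each dispatcher puts this once when it is done

def _dispatcher(worker_id: int, n_items: int, queue: "mp.Queue") -> None:
    # Stand-in for "load a big blob and preprocess it": emit a few
    # preprocessed items, then signal completion.
    for k in range(n_items):
        queue.put((worker_id, 10 * k))
    queue.put(_SENTINEL)

def iterate_from_queue(num_dispatchers: int, n_items: int) -> Iterator[Tuple[int, int]]:
    # Spin up the dispatcher processes, then yield items off the shared
    # queue until every dispatcher has sent its sentinel.
    queue: "mp.Queue" = mp.Queue()
    procs = [
        mp.Process(target=_dispatcher, args=(i, n_items, queue))
        for i in range(num_dispatchers)
    ]
    for p in procs:
        p.start()
    finished = 0
    try:
        while finished < num_dispatchers:
            item = queue.get()
            if item is _SENTINEL:
                finished += 1
            else:
                yield item
    finally:
        for p in procs:
            p.join()

if __name__ == "__main__":
    print(sorted(iterate_from_queue(num_dispatchers=2, n_items=4)))
```

Even this stripped-down version needs sentinel handling, process lifecycle management, and error handling (omitted here), which is the boilerplate the feature request would eliminate.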
**Option four: the existing `ShardingRoundRobinDispatcher`.** Using `ShardingRoundRobinDispatcher` works, but there are cases where a single dispatching process will not be able to keep up with the worker processes.
### Additional context
No response
+1, this would be very useful for my use-cases also.