pytorch/data

Allow more than 1 dispatching process in MultiProcessReadingService

ben-davidson-6 opened this issue · 1 comment

🚀 The feature

I would like an easy way to create m dispatching processes feeding n worker processes. Currently only a single dispatching process is supported. In the example below, I would like pipe_0 and pipe_1 to run in their own processes, each feeding a shared queue.

import os
from typing import Iterator

from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import (
    IterableWrapper,
    IterDataPipe,
    MultiplexerLongest,
    ShardingRoundRobinDispatcher,
)

class Loader:
    def __init__(self) -> None:
        self.n_to_load = 4

    def __iter__(self):
        print("loader", os.getpid())
        for k in range(self.n_to_load):
            yield k

@functional_datapipe("times_ten")
class MyStuff(IterDataPipe):
    def __init__(self, dp: IterDataPipe) -> None:
        super().__init__()
        self.dp = dp

    def __iter__(self) -> Iterator:
        print("doer", os.getpid())
        for x in self.dp:
            yield 10 * x

if __name__ == "__main__":
    print("main", os.getpid())
    # Two independent loading branches; the request is for each to run in its
    # own dispatching process, feeding a shared queue.
    pipe_0 = IterableWrapper(Loader())
    pipe_0 = ShardingRoundRobinDispatcher(
        pipe_0, sharding_group_filter=SHARDING_PRIORITIES.MULTIPROCESSING
    )
    pipe_1 = IterableWrapper(Loader())
    pipe_1 = ShardingRoundRobinDispatcher(
        pipe_1, sharding_group_filter=SHARDING_PRIORITIES.MULTIPROCESSING
    )
    # Combine both branches before the per-worker processing.
    pipe = MultiplexerLongest(pipe_0, pipe_1)
    pipe = pipe.times_ten()
    rs = MultiProcessingReadingService(num_workers=2)
    dl = DataLoader2(pipe, reading_service=rs)
    for k in dl:
        print(k)

Motivation, pitch

This pattern is very useful when you need to load a large shard of data and do some preprocessing on it before applying further batch-level augmentations/normalisations, e.g. you have

for big_unprocessed_blob in data:
    processed_blob = process(big_unprocessed_blob)
    for single_example in processed_blob:
        yield do_stuff_to_example(single_example)

I would like an easy way of writing

queue = start_loading_and_processing_blob(big_unprocessed_blob, num_workers=m)
pipe = DoStuffToExample(queue)
rs = MultiProcessingReadingService(num_workers=n)
dl = DataLoader2(pipe, reading_service=rs)

The main benefit is a performance win: there are cases where you need multiple dispatching processes to keep the GPU fed with data. Of course I can, and have, written a version of this myself, but it is easy to get wrong, there is a lot of boilerplate for handling the processes and the queues, and it would be great if it were all handled for you.

Alternatives

Option one: normal DataLoader parallelism

If you try to do this within a normal DataLoader worker, the data will not be able to keep up. Eventually you hit an iteration where that worker needs to read and process the big blob, so it cannot give us a batch and the GPU sits idle.

Option two: a thread within normal DataLoader workers

This is better, but you ultimately hit the same problem as before: it is tricky to control exactly when the thread is working and when the worker needs to return a batch, so you can end up processing the blob at the moment you should be handing out a batch, again leaving the GPU idle. A rough sketch of this approach is shown below.
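For concreteness, here is a minimal sketch of what option two looks like inside a single worker: a background thread fills an in-process queue while __iter__ drains it. load_blobs and process_blob are hypothetical placeholders for the expensive shard loading and preprocessing; this is not an existing torchdata API.

import queue
import threading
from typing import Iterator

from torchdata.datapipes.iter import IterDataPipe

class ThreadedBlobLoader(IterDataPipe):
    _SENTINEL = object()

    def __init__(self, load_blobs, process_blob, maxsize: int = 8) -> None:
        super().__init__()
        self.load_blobs = load_blobs      # hypothetical: yields big unprocessed blobs
        self.process_blob = process_blob  # hypothetical: turns one blob into examples
        self.maxsize = maxsize

    def _producer(self, q: queue.Queue) -> None:
        # Runs in a background thread: preprocess blobs and buffer the examples.
        for blob in self.load_blobs():
            for example in self.process_blob(blob):
                q.put(example)
        q.put(self._SENTINEL)

    def __iter__(self) -> Iterator:
        q: queue.Queue = queue.Queue(maxsize=self.maxsize)
        thread = threading.Thread(target=self._producer, args=(q,), daemon=True)
        thread.start()
        while True:
            item = q.get()
            if item is self._SENTINEL:
                break
            yield item
        thread.join()

The queue gives some buffering, but the thread still competes with the worker for CPU time, which is exactly the timing problem described above.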

Option three: writing your own multiprocessing logic that outputs processed data to a queue consumed by an iterable

This works, but you have to write and maintain the code yourself. It is almost exactly what the existing ShardingRoundRobinDispatcher provides, except that writing it myself lets me add as many dispatcher workers as I want; see the sketch after this paragraph.
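As a rough sketch of the boilerplate option three requires (and that this feature request would fold into the reading service), the snippet below starts m producer processes that preprocess blobs into one shared multiprocessing queue, and drains that queue through a plain iterable that can be wrapped in a datapipe. make_source and process_blob are hypothetical placeholders.

import multiprocessing as mp

def producer(rank, num_producers, q, make_source, process_blob):
    # Each producer process handles its own share of the blobs.
    for i, blob in enumerate(make_source()):
        if i % num_producers != rank:
            continue
        for example in process_blob(blob):
            q.put(example)
    q.put(None)  # one end-of-stream sentinel per producer

class QueueConsumer:
    def __init__(self, num_producers, q):
        self.num_producers = num_producers
        self.q = q

    def __iter__(self):
        finished = 0
        while finished < self.num_producers:
            item = self.q.get()
            if item is None:
                finished += 1
                continue
            yield item

# Usage sketch (m dispatching processes feeding the datapipe graph):
#   q = mp.Queue(maxsize=64)
#   procs = [mp.Process(target=producer, args=(r, m, q, make_source, process_blob))
#            for r in range(m)]
#   for p in procs:
#       p.start()
#   pipe = IterableWrapper(QueueConsumer(m, q)).times_ten()

This is essentially the same producer/queue pattern the existing dispatching process implements, just generalised to num_producers > 1.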

Option four: the existing ShardingRoundRobinDispatcher

Use ShardingRoundRobinDispatcher. This works, but there are cases where a single dispatching process will not be able to keep up with the worker processes.

Additional context

No response

+1, this would be very useful for my use-cases also.