pescadores/pescador

add pytorch example

faroit opened this issue · 17 comments

As more and more people use pytorch now, I wonder if we can have a pytorch example to use with pescador?

In fact, I already tried a few things...

Let's assume the infamous (among audio researchers) scenario of randomly sampling small excerpts from longer audio tracks, where we want to see all the data once per epoch, in random order:

import numpy as np
import pescador

# example parameters
nb_tracks = 100
track_length = 1000
excerpt_length = 100
excerpt_hop = 100
batch_size = 16

# define audio tracks as (nb_samples, nb_features)
tracks = [np.random.random((track_length, 1)) for i in range(nb_tracks)]

# yield excerpts from audio tracks
def excerpt_gen(data):
    for i in range(0, data.shape[0] - excerpt_length, excerpt_hop):
        yield dict(X=data[i:i+excerpt_length, :])

# set up track streamers
streams = [pescador.Streamer(excerpt_gen, track) for track in tracks]
# randomly sample from streamers
mux = pescador.StochasticMux(streams, nb_tracks, rate=None, mode='exhaustive')
buffered_sample_gen = pescador.buffer_stream(mux, batch_size)

# iterate over data
for batch in buffered_sample_gen:
    print(batch['X'].mean())

While this would obviously work with pytorch directly by feeding in batches of data, I wonder if we could leverage the pytorch Dataset and DataLoader classes to simplify the code and maybe make use of PyTorch's internal parallelisation within the dataloader.

It turns out pytorch does allow you to override the Sampler and BatchSampler (see here) classes for their dataloader. But since they are all based on indices into your dataset class, using pescador for this wouldn't be exactly elegant (or I'm just missing the point).
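
For reference, this is the index-based contract I mean: a custom Sampler only decides which indices the dataloader asks for. A minimal sketch (the dataset and the shuffling logic are just placeholders):

import torch
import torch.utils.data

# placeholder map-style dataset, only here to illustrate the Sampler interface
class DummyDataset(torch.utils.data.Dataset):
    def __len__(self):
        return 1000

    def __getitem__(self, idx):
        return torch.zeros(1)

class ShuffledSampler(torch.utils.data.Sampler):
    def __init__(self, data_source):
        self.data_source = data_source

    def __iter__(self):
        # the DataLoader consumes *indices*, not samples
        return iter(torch.randperm(len(self.data_source)).tolist())

    def __len__(self):
        return len(self.data_source)

dataset = DummyDataset()
loader = torch.utils.data.DataLoader(
    dataset, batch_size=16, sampler=ShuffledSampler(dataset)
)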

For now, I came up with the following, which works and yields the same batches as the vanilla example above. It works by extending the Dataset class so that the dataloader simply draws more samples and ignores the index.

import numpy as np
import pescador
import torch.utils.data

# example parameters
nb_tracks = 100
track_length = 1000
excerpt_length = 100
excerpt_hop = 100
batch_size = 16

# define audio tracks as (nb_samples, nb_features)
tracks = [np.random.random((track_length, 1)) for i in range(nb_tracks)]

class TrackData(torch.utils.data.Dataset):
    def __init__(self, tracks):
        self.tracks = tracks
        self.streams = [pescador.Streamer(excerpt_gen, fn) for fn in tracks]
        self.mux = pescador.tuples(
            pescador.StochasticMux(
                self.streams, nb_tracks, rate=None, mode='exhaustive'
            ),
            'X', 'X'
        )

    def __len__(self):
        # total number of excerpts across all tracks (matches excerpt_gen)
        return len(self.tracks) * len(range(0, track_length - excerpt_length, excerpt_hop))

    def __iter__(self):
        # self.mux is a plain generator (from pescador.tuples), so just return it
        return iter(self.mux)

    def __getitem__(self, idx):
        return next(self.mux)

def excerpt_gen(data):
    for i in range(0, data.shape[0] - excerpt_length, excerpt_hop):
        yield dict(X=data[i:i+excerpt_length, :])

dataset = TrackData(tracks)

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size
)
for batch, (X, y) in enumerate(train_loader):
    print(X.mean())

I would love to hear your feedback on this, and of course I would be happy to make a PR once we've agreed on an elegant solution.

First, thanks for implementing this! I haven't spent much time with pytorch yet, but the above all makes sense from what I've read in the api docs.

The index thing is a real problem, and a point on which I suspect I'd disagree with the pytorch devs. (TL;DR: I prefer i.i.d. sampling with replacement to epoch/index-based training.)

I don't see a great solution beyond ignoring the index right now. One potential solution down the road would be to use coroutines for generation, so that the index for the next sample can be passed back into the streamer. This would require a pretty severe rewrite of pesc, but it might be worth considering as part of #30.
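
To sketch the coroutine idea (nothing like this exists in pescador today; it only illustrates passing the requested index back into the generator):

import numpy as np

# hypothetical index-driven generation via a coroutine
def indexed_excerpt_gen(data, excerpt_length):
    idx = yield  # receive the first requested index
    while True:
        # yield the requested excerpt and wait for the next index
        idx = yield dict(X=data[idx:idx + excerpt_length, :])

gen = indexed_excerpt_gen(np.random.random((1000, 1)), 100)
next(gen)               # prime the coroutine
sample = gen.send(200)  # request the excerpt starting at sample 200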

Maybe @cjacoby has thoughts?

Okay, several thoughts on this, after looking around pytorch a bit more and thinking about this some.

  • I think this is probably a good solution for an example, at least one that matches pytorch style; I generally agree with what @bmcfee said.

  • A thought on DataLoaders from pytorch: their general function seems to be to provide easy separate-thread / multi-process data loading. With the current pescador, if you decide you don't care too much about multiprocessing, you could use a ZMQStreamer to push the streaming to another thread in the same way, and then just use pescador directly. pytorch doesn't (if I understand correctly, which maybe I don't) seem to require you to use their data loaders; you still ultimately pass the batches directly to the training function. Therefore, I think what I would do here is:

# randomly sample from streamers
mux = pescador.StochasticMux(streams, nb_tracks, rate=None, mode='exhaustive')
buffered_sample_gen = pescador.buffer_stream(mux, batch_size)
buffered_zmq = pescador.ZMQStreamer(buffered_sample_gen)

# iterate over data
for batch in buffered_zmq:
    print(batch['X'].mean())
    # you can still train with this batch
    model(batch['X'])  # not sure exactly what goes here but you get the idea

  • I think this is a +1 for #30 though, as that is currently the only (or at least the best) path forward we have for implementing this sort of feature (multiprocessing/threading, specifically) in pescador, and personally, I'd rather use pescador for this than the pytorch dataloaders ;). Keras has more or less the same behavior built into the fit_generator and fit functions (see the sketch below).
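
For reference, a rough sketch of the Keras pattern I mean (assuming a compiled model and the mux / batch_size from the first example; the step counts are arbitrary):

# adapt the buffered pescador stream to the (inputs, targets) tuples Keras expects
def keras_gen(stream):
    for batch in stream:
        yield batch['X'], batch['X']

model.fit_generator(
    keras_gen(pescador.buffer_stream(mux, batch_size)),
    steps_per_epoch=100,
    epochs=10
)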

@cjacoby thanks for your input.

I thought about this a bit more and tried many things. In the end I agree: you lose a lot of flexibility by using the pytorch Dataset class, and it makes more sense to just train in batches. On the downside, I found that using the pescador buffered_zmq is a lot slower than random sampling using the pytorch samplers. So I guess #30 could make some difference here. In any case, I will add some comparisons/benchmarks here later and keep this open for now.

@bmcfee The remaining question is: do you want to advertise pescador + pytorch = ❤️ now, or wait until one of the many (1, 2, 3, 4, 5) PyTorch high-level API packages is ready for prime time?

On the downside, I found that using the pescador buffered_zmq is a lot slower than random sampling using the pytorch samplers.

This I suspect is due more to the inherent slowness of buffering (invoking many data copies), though zmq overhead can also hurt a bit. Is it much worse to use the unbuffered zmq stream and let pytorch handle buffering? This is how I typically do it with keras, and it seems to work pretty well.
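
Roughly what I have in mind (a sketch only, reusing mux and batch_size from the examples above; the model call is a placeholder):

import torch
import pescador

# ship single samples over zmq, do the batching on the consumer side
zmq_stream = pescador.ZMQStreamer(mux)

batch = []
for sample in zmq_stream:
    batch.append(torch.from_numpy(sample['X']))
    if len(batch) == batch_size:
        X = torch.stack(batch)
        # model(X) would go here
        batch = []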

The remaining question is: do you want to advertise pescador + pytorch = ❤️ now

It seems a bit premature, eh? I haven't looked at any of the other packages you mentioned -- do they all manage data streaming, or are they more keras-like in functionality?

I now had some more time to dig into pytorch and got nice results together with pescador.

@bmcfee:

haven't looked at any of the other packages you mentioned -- do they all manage data streaming, or are they more keras-like in functionality?

The most promising one (also developed under the pytorch umbrella) is ignite, but it doesn't come with any convenient way to consume generators, though it supports pytorch dataloaders.

I'd say we wait a few more months till 1.0 appears on the horizon and evaluate again.

For now, I essentially followed @cjacoby's advice to stick with for-loop style training and did not use pytorch's dataloader classes, for obvious reasons. That seems like the best option until some of the high-level packages become more popular. I think the issue can be closed till then.

Actually, an IterableDataset is currently being developed for PyTorch. That would finally allow pescador to be hooked in more elegantly, so it would make sense to wait for this to be merged and added to PyTorch. I will leave this issue open till then and provide an example once that has happened.

Back with some tests on pytorch 1.2 (it's available as a nightly build). The IterableDataset works as expected and allows pytorch's dataloader to be used for batching.

import numpy as np
import pescador
import torch

def excerpt_gen(track, excerpt_length, excerpt_hop):
    for i in range(0, track.shape[0] - excerpt_length, excerpt_hop):
        yield track[i:i+excerpt_length, :], track[i:i+excerpt_length, :]


class TrackChunksDataset(torch.utils.data.IterableDataset):
    def __init__(
        self,
        nb_tracks=100,
        track_length=1000,
        excerpt_length=100,
        excerpt_hop=100
    ):
        tracks = (np.random.random((track_length, 1)) for i in range(nb_tracks))
        streams = [
            pescador.Streamer(
                excerpt_gen, track, excerpt_length, excerpt_hop
            ) for track in tracks
        ]
        self.mux = pescador.StochasticMux(
            streams, nb_tracks, rate=None, mode='exhaustive'
        )

    def __iter__(self):
        return self.mux.iterate()


dataset = TrackChunksDataset()

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=16,
    num_workers=0
)
for x, y in train_loader:
    x.mean()
    y.mean()

Increasing the number of workers obviously does not speed up the sampling, since pytorch copies the dataset to each of the workers. They provide a way to retrieve the worker_id, which can then be used to split the data across workers.
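
A rough sketch of what that per-worker splitting could look like for the example above (untested; it assumes the dataset also keeps self.streams around from __init__):

# inside TrackChunksDataset, assuming __init__ also stores self.streams
def __iter__(self):
    worker_info = torch.utils.data.get_worker_info()
    if worker_info is None:
        # single-process loading: use the full mux
        return self.mux.iterate()
    # multi-process loading: give each worker an interleaved subset of the streams
    streams = self.streams[worker_info.id::worker_info.num_workers]
    mux = pescador.StochasticMux(
        streams, len(streams), rate=None, mode='exhaustive'
    )
    return mux.iterate()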

@bmcfee you have an idea what would be a good example for the pescador gallery for such a splitting?

@bmcfee you have an idea what would be a good example for the pescador gallery for such a splitting?

What if excerpt_gen took a filename instead of the decoded audio? The workers would then load the data from a file and yield it out. Then it's at least only copying data in one direction instead of both. This is one of the things that our ZMQStreamer is supposed to make easier, by sharing views of numpy arrays between colocated processes rather than serializing the whole thing. I guess pytorch's splitter doesn't work like that yet?

What if excerpt_gen took a filename instead of the decoded audio? The workers would then load the data from a file and yield it out. Then it's at least only copying data in one direction instead of both.

Yes, that's what I would do in practice. I can modify the example.
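
Something along these lines (a sketch only; np.load stands in for actual audio decoding, and track_filenames is a hypothetical list of paths):

# load the track inside the streamer instead of shipping decoded audio around
def excerpt_gen(filename, excerpt_length, excerpt_hop):
    track = np.load(filename)  # shape: (nb_samples, nb_features)
    for i in range(0, track.shape[0] - excerpt_length, excerpt_hop):
        yield track[i:i + excerpt_length, :], track[i:i + excerpt_length, :]

streams = [
    pescador.Streamer(excerpt_gen, fn, 100, 100)
    for fn in track_filenames
]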

This is one of the things that our ZMQStreamer is supposed to make easier, by sharing views of numpy arrays between colocated processes rather than serializing the whole thing. I guess pytorch's splitter doesn't work like that yet?

Sorry, I didn't get this part. To parallelize the loading I would just split the tracks list by worker_id. How does ZMQStreamer make this easier (or faster)?

To parallelize the loading I would just split the tracks list by worker_id. How does ZMQStreamer make this easier (or faster)?

I don't think these would necessarily be compatible -- they're doing similar kinds of things. All I was trying to say is that it's possible to share data by reference across processes (on one machine) using current pesc stuff, and that pytorch doesn't seem to support that (yet).

All I was trying to say is that it's possible to share data by reference across processes (on one machine) using current pesc stuff, and that pytorch doesn't seem to support that (yet).

Yes, that is true. Despite its name, DataParallel is only concerned with sharing already-batched tensors across GPUs.

Nonetheless, should I prepare a PR for a simple pytorch 1.2 example based on the Keras example?

I have been playing with pescador and pytorch DataLoader lately and ended up doing the following:

import pescador
from torch.utils.data import DataLoader, IterableDataset

class _PyTorchDataset(IterableDataset):
    def __init__(self, stream):
        super().__init__()
        self.stream = stream

    def __iter__(self):
        return self.stream.iterate()

pescador.maps.pytorch_dataset = _PyTorchDataset

... that can then be used more or less in the same way as pescador.maps.keras_tuples:

data_loader = DataLoader(pescador.maps.pytorch_dataset(my_pescador_stream),
                         batch_size=32, pin_memory=True)

It does the job because all that DataLoader cares about is the IterableDataset inheritance.

DataLoader will take care of creating mini-batches and memory pinning.
This obviously lets pescador deal with parallelism.
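
For example, a ZMQStreamer could provide that parallelism (just a sketch; it assumes my_pescador_stream is a Streamer yielding dicts of numpy arrays, which is what ZMQStreamer can serialize):

# run the pescador stream in a background process, let DataLoader do the batching
zmq_stream = pescador.ZMQStreamer(my_pescador_stream)
data_loader = DataLoader(pescador.maps.pytorch_dataset(zmq_stream),
                         batch_size=32, pin_memory=True)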

@hbredin indeed, that seems to be a good combination. Did you evaluate the performance for this?

@faroit Not yet, no. This is on my TODO list with no ETA :-)

I checked: using DataLoader as I described above and using pescador.maps.buffer_stream are more or less equivalent in terms of performance.

For some reason, I thought DataLoader would be smarter than that and do some kind of prefetching (which I really need), but it does not. I guess I'll fall back to this solution I have been using for a while, but this is out of the scope of this issue.