Data streaming for LMs. WIP

Primary language: Python. License: Apache License 2.0 (Apache-2.0).

Sprucfluo: Streaming Datasets for Large Language Models

This library supports streaming datasets for large language models. It is built around TorchData, a library of composable combinators for PyTorch datasets. It uses HuggingFace for tokenization and fsspec for reading files from local filesystems, HTTP, and cloud storage.

Here's a quick example of how to use it. This script streams the Pile and a PubMed corpus, tokenizes them, and mixes samples from the two in equal proportions.

from torchdata.datapipes.iter.util.samplemultiplexer import SampleMultiplexerDataPipe
from transformers import AutoTokenizer

import sprucfluo as sf

# Brace ranges such as {00..29} expand to one URL per shard.
pile_data = sf.corpus.load_corpus("https://mystic.the-eye.eu/public/AI/pile/train/{00..29}.jsonl.zst")
pubmed_data = sf.corpus.load_corpus("gcs://pubmed-mosaic/pubmed-sharded/pubmedAbs_train.{1..128}-of-128.jsonl.gz")

tokenizer = AutoTokenizer.from_pretrained("gpt2")

# Tokenize each corpus and group the tokens into sequences of seq_len.
pubmed_data = pubmed_data.then(sf.tokenize_and_group_texts, tokenizer=tokenizer, seq_len=1024)
pile_data = pile_data.then(sf.tokenize_and_group_texts, tokenizer=tokenizer, seq_len=1024)

# Equal weights, so both corpora are sampled with equal probability.
data = SampleMultiplexerDataPipe({pile_data: 5, pubmed_data: 5}, seed=0)

for encoded in data:  # each item is a BatchEncoding
    print(encoded)
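
To make "mix samples in equal proportions" concrete, here is a minimal pure-Python sketch of weighted sample mixing. It is not TorchData's implementation: the function name sample_mix and the rule of dropping a source once it runs out are assumptions made for illustration.

```python
import random
from typing import Dict, Iterable, Iterator, TypeVar

T = TypeVar("T")


def sample_mix(
    sources: Dict[str, Iterable[T]],
    weights: Dict[str, float],
    seed: int = 0,
) -> Iterator[T]:
    """Yield items by picking a source at random, proportionally to its weight.

    Hypothetical helper: an exhausted source is dropped, and iteration ends
    once every source is exhausted.
    """
    rng = random.Random(seed)
    active = {name: iter(src) for name, src in sources.items()}
    while active:
        names = list(active)
        # Pick one of the remaining sources according to the relative weights.
        name = rng.choices(names, weights=[weights[n] for n in names], k=1)[0]
        try:
            yield next(active[name])
        except StopIteration:
            # This source has run out; keep sampling from the others.
            del active[name]


toy_sources = {"pile": ["p0", "p1", "p2"], "pubmed": ["m0", "m1", "m2"]}
toy_weights = {"pile": 5, "pubmed": 5}  # equal weights, as in the script above
print(list(sample_mix(toy_sources, toy_weights, seed=0)))
```

Only the ratio of the weights matters here, so 5 and 5 behave the same as 1 and 1. Items from each source keep their original relative order.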

One somewhat sneaky thing sprucfluo does is register a then method on IterDataPipe using TorchData's existing registration mechanism. This means you can call then on any IterDataPipe, passing a function that maps an IterDataPipe to an IterDataPipe, along with any additional arguments to forward to that function, à la functools.partial.
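
The calling convention of then can be sketched without TorchData. In the snippet below, Pipe is a hypothetical stand-in for IterDataPipe, and upper and batch are made-up transforms; the real method is attached through TorchData's registration mechanism rather than defined on the class, so treat this only as an illustration of how the arguments flow.

```python
from typing import Callable, Iterable, Iterator


class Pipe:
    """Hypothetical stand-in for IterDataPipe: wraps any iterable."""

    def __init__(self, source: Iterable):
        self.source = source

    def __iter__(self) -> Iterator:
        return iter(self.source)

    def then(self, fn: Callable[..., "Pipe"], *args, **kwargs) -> "Pipe":
        # Call fn with this pipe as the first argument and forward the rest.
        return fn(self, *args, **kwargs)


def upper(pipe: Pipe) -> Pipe:
    """Made-up transform: upper-case every string lazily."""
    return Pipe(s.upper() for s in pipe)


def batch(pipe: Pipe, size: int) -> Pipe:
    """Made-up transform: group items into lists of at most `size`."""

    def gen():
        buf = []
        for item in pipe:
            buf.append(item)
            if len(buf) == size:
                yield buf
                buf = []
        if buf:  # emit the final, possibly shorter, batch
            yield buf

    return Pipe(gen())


result = Pipe(["a", "b", "c"]).then(upper).then(batch, size=2)
print(list(result))  # [['A', 'B'], ['C']]
```

Because then returns whatever the function returns, calls chain naturally, and each step stays an ordinary function that can be tested on its own.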

It also registers a few additional methods:

  • shard_by_rank: splits the data into shards based on the rank of the process.
  • load_from_fsspec_fancy: similar to the built-in load_from_fsspec, but accepts additional arguments to pass to fsspec.open, including compression and compression_opts and any authentication information. It also automatically decompresses using the built-in decompression mechanisms in fsspec.
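
As a rough illustration of rank-based sharding, the sketch below assigns items round-robin: item i goes to rank i mod world_size. The function name shard_round_robin and the round-robin rule are assumptions for illustration; sprucfluo's shard_by_rank may split the data differently.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def shard_round_robin(items: Iterable[T], rank: int, world_size: int) -> Iterator[T]:
    """Hypothetical helper: lazily keep every world_size-th item, starting at `rank`."""
    if not 0 <= rank < world_size:
        raise ValueError(f"rank must be in [0, {world_size}), got {rank}")
    # islice skips the first `rank` items, then takes one item per `world_size`.
    return islice(items, rank, None, world_size)


# Simulate 4 processes splitting the same stream of 10 items.
for rank in range(4):
    print(rank, list(shard_round_robin(range(10), rank, world_size=4)))
```

Every process still iterates over the whole stream and discards the items it does not own, so this kind of sharding saves downstream work such as tokenization but not the reading itself.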

At the moment it doesn't support caching, though that's in progress.

Open Tasks

  • Add support for caching
  • Think about loading from HF datasets?
  • Support for the following datasets:
    • openwebtext (weird archive format)
    • wikitext-103 (need to detokenize)
    • lambada (need to detokenize)