
Refactor the parallelization utils

percevalw opened this issue · 1 comment

Feature type

Following a brainstorming session with @Thomzoy, we'd like to refactor the parallelization utilities to decouple the type of collection (iterators, lists, pandas DataFrames, Spark DataFrames, Hive tables, etc.) from the type of parallelization (no parallelization, multiple CPUs, GPUs, distributed computing via Spark).

Description

Collection types

Most of the processing with edsnlp is done on lists and pandas / Spark DataFrames (to the best of our knowledge), so we feel it's necessary to handle these cases natively.

The following changes will be made:

  • the nlp.pipe method (cf. refacto) will be able to receive either a classic iterable as input, as is already the case, or a DataFrame (pandas / Spark at least)
  • to manage the conversion of table rows / dictionaries into spacy.tokens.Doc objects, we add two parameters to nlp.__call__ and nlp.pipe to eventually replace the additional_spans, extensions, context and results_extractor parameters (a rough sketch follows this list):
    • parameter to_doc: (Any -> spacy.tokens.Doc)
    • parameter from_doc: (spacy.tokens.Doc -> Any)
  • in the case of a pandas or Spark DataFrame, we pre-convert the rows of these tables into dictionaries before calling the to_doc method, and convert each dictionary produced by from_doc back into a table row.
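
As a rough illustration of these two hooks (assuming an existing nlp pipeline; the note_text / note_id columns and the note_id extension are placeholders, not a fixed schema):

from spacy.tokens import Doc

Doc.set_extension("note_id", default=None)  # keep track of the source row

def to_doc(row: dict) -> Doc:
    # one table row / dictionary -> spacy.tokens.Doc
    doc = nlp.make_doc(row["note_text"])
    doc._.note_id = row["note_id"]
    return doc

def from_doc(doc: Doc) -> dict:
    # one processed Doc -> dictionary / table row
    return {
        "note_id": doc._.note_id,
        "ents": [(e.start_char, e.end_char, e.label_) for e in doc.ents],
    }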

How do we plan to support other formats?

It's up to the user to convert the input into an accepted format, for example polars to pandas, or polars to an iterator of dictionaries.
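
For instance, assuming a polars DataFrame pl_df, either conversion is a one-liner:

rows = pl_df.to_pandas()   # polars -> pandas DataFrame, handled natively
rows = pl_df.to_dicts()    # polars -> list of dictionaries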

Why add two parameters to_doc / from_doc, and not simply ask the user to convert everything into an iterator?

  1. Depending on the distribution method, data may not always be accessible locally. For instance, in the case of a Spark DataFrame, a function is sent to each executor to apply the nlp object to each row. It is therefore necessary to send these conversion functions along with the nlp object to each executor, hence these two parameters (see the sketch after this list).
  2. This also lets us optimize the common but tricky collection types (pandas, Spark), while giving users some leeway in the choice of columns and outputs.
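
A minimal sketch of point 1, assuming pyspark and a picklable nlp object; the helper name, broadcast approach and column handling are illustrative only, not the planned implementation:

def make_partition_processor(nlp_bc, to_doc, from_doc):
    # runs on each Spark executor: the nlp object and both conversion hooks
    # must therefore be shipped to the executors together
    def process_partition(rows):
        nlp = nlp_bc.value
        for row in rows:
            doc = to_doc(row.asDict())   # dict -> spacy.tokens.Doc
            yield from_doc(nlp(doc))     # apply the pipeline, then Doc -> dict
    return process_partition

# on the driver, assuming a SparkSession `spark` and a Spark DataFrame `df`:
# nlp_bc = spark.sparkContext.broadcast(nlp)
# results = df.rdd.mapPartitions(make_partition_processor(nlp_bc, to_doc, from_doc))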

Acceleration / parallelization mode

We plan to support acceleration over several processes, on one or more GPUs, or in a distributed fashion via Spark.

  • the nlp.pipe method will be able to take n_process as input (a parameter originally used by the spacy Language object) to distribute the work over several local processes

  • a new "method" parameter will receive an acceleration object (dict / custom object?) containing the acceleration type:

    • for the GPU method: devices, num_cpu_per_gpus, batch_size
    • for the Spark method (only compatible with Spark DataFrames): number of partitions if repartitioning, number of executors, etc.
    • ...

    This will in turn call a specific function depending on the method.
    We can probably infer the parallelization method automatically from the type of the input collection and the computational resources available. A possible shape for such an acceleration object is sketched below.
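
A minimal strawman, where every key (kind, devices, ...) is only illustrative and open to discussion:

gpu_method = {
    "kind": "gpu",
    "devices": [0, 1],        # GPUs to use
    "num_cpu_per_gpus": 2,    # CPU workers feeding each GPU
    "batch_size": 64,
}

spark_method = {
    "kind": "spark",          # only valid for Spark DataFrames
    "num_partitions": 200,    # repartition before processing, if needed
    "num_executors": 10,
}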

Pseudo implementation

This is open to discussion:

def pipe(self, data, from_doc, to_doc, method):
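    # is_*, *_to_iterable and get_*_accelerator are placeholder helpers, still to be written;
    # the accelerators also need access to the nlp object (self), which is elided here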
    if is_simple_iterable(data):
        accelerator = get_iterable_accelerator(method)  # various accelerators 
        return accelerator(data, from_doc, to_doc)
    elif is_pandas(data):
        iterable = pandas_to_iterable(data)
        accelerator = get_iterable_accelerator(method)  # various accelerators 
        iterable = accelerator(iterable, from_doc, to_doc)
        return iterable_to_pandas(iterable)
    elif is_polars(data): # we haven't decided yet which formats are common enough to be supported natively
        ...
    elif is_spark(data):
        accelerator = get_spark_accelerator(method)  # will check that the accelerator is compatible
        return accelerator(data, from_doc, to_doc)
    else:
        raise NotImplementedError()
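
Assuming the signature above and reusing the to_doc / from_doc hooks and the strawman method objects sketched earlier (notes_df and notes_spark_df are placeholder input tables):

# pandas DataFrame in, pandas DataFrame out, GPU acceleration
note_nlp = nlp.pipe(notes_df, to_doc=to_doc, from_doc=from_doc, method=gpu_method)

# Spark DataFrame in, Spark DataFrame out, distributed via Spark
note_nlp = nlp.pipe(notes_spark_df, to_doc=to_doc, from_doc=from_doc, method=spark_method)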

Change of mind!

The above proposal did not take into account:

  • reading from a given source type and writing to a different source type
  • parallelizing the IO ops (like decoding a json line or writing a parquet file in a distributed fashion)
  • parallelizing conversion ops (like tokenization and entities pre-annotation)
  • decoupling the data format from the processing method: e.g. applying the multiprocessing backend to data read from / written to Spark DataFrames...

The new solution (shipped with v0.10.0, #202) fixes this by introducing the LazyDocsCollection, which records operations lazily and handles document conversion (i.e., tokenization and more) at the same level as pipeline components.
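
For reference, the same kind of pandas round-trip with the released solution looks roughly as follows (based on the v0.10 edsnlp.data API; exact converter names and arguments may differ):

import edsnlp

docs = edsnlp.data.from_pandas(notes_df, converter="omop")  # lazy collection, nothing runs yet
docs = docs.map_pipeline(nlp)                               # record the pipeline ops lazily
docs = docs.set_processing(backend="multiprocessing")       # choose the execution backend
note_nlp = docs.to_pandas(converter="omop")                 # triggers the actual execution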