Refactor the parallelization utils
percevalw opened this issue · 1 comment
Feature type
Following a brainstorming with @Thomzoy, we'd like to refactor the parallelization utilities to decouple the type of collection (iterators, lists, dataframe pandas, dataframe spark, hive table, etc.) from the type of parallelization (no parallelization, multi cpu, gpu, distributed computing for spark).
Description
Collection types
Most of the processing with edsnlp is done on lists and on pandas / spark dataframes (to the best of our knowledge), so we feel it's necessary to handle these cases natively.
The following changes will be made:
- the nlp.pipe method (cf refacto) will be able to receive a classic iterable as input, as is already the case, or a dataframe (pandas / spark at least)
- to manage the conversion of table rows / dictionaries into spacy.tokens.Doc objects, we add two parameters to nlp.__call__ and nlp.pipe to replace (eventually) the parameters (additional_spans, extensions, context, results_extractor); see the sketch after this list:
  - parameter to_doc: (Any -> spacy.tokens.Doc)
  - parameter from_doc: (spacy.tokens.Doc -> Any)
- in the case of a pandas or spark dataframe, we pre-convert the rows of these tables into dictionaries, before calling the to_doc method, and convert a dictionary produced by from_doc into a table row.
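For illustration, here is a minimal sketch of what such conversion functions could look like; the column names (note_id, note_text) and the output fields are purely hypothetical:

```python
import spacy
from spacy.tokens import Doc

nlp = spacy.blank("fr")  # stand-in for an edsnlp pipeline

def to_doc(row: dict) -> Doc:
    # A dataframe row, pre-converted to a dict, becomes a Doc.
    doc = nlp.make_doc(row["note_text"])
    doc.user_data["note_id"] = row["note_id"]
    return doc

def from_doc(doc: Doc) -> dict:
    # An annotated Doc becomes a flat dict, i.e. a future table row.
    return {
        "note_id": doc.user_data.get("note_id"),
        "entities": [(e.start_char, e.end_char, e.label_) for e in doc.ents],
    }
```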
How do we plan to support other formats?
It's up to the user to convert the input into an accepted format. For example, polars to pandas, or polars to an iterator of dictionaries.
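For instance, with polars (assuming it is installed; to_pandas may additionally require pyarrow):

```python
import polars as pl

pl_df = pl.DataFrame({"note_id": [1], "note_text": ["Douleur thoracique."]})

pd_df = pl_df.to_pandas()   # polars -> pandas dataframe
records = pl_df.to_dicts()  # polars -> list of dicts, usable as a plain iterable
```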
Why add two parameters, to_doc / from_doc, and not simply ask the user to convert everything into an iterator?
- Depending on the distribution method, data may not always be accessible locally. For instance, in the case of a spark dataframe, a function is sent to each executor to apply the nlp object to each row. It is therefore necessary to ship these conversion functions along with the nlp object to each executor, hence these two parameters.
- This also allows us to optimize the common but tricky collections (pandas, spark), while giving users some leeway in the choice of columns and outputs.
Acceleration / parallelization mode
We plan to support acceleration across several local processes, on one or more gpus, or in a distributed fashion via spark:
- the nlp.pipe method will be able to take n_process as input (parameter originally used in the spacy Language object) to distribute the work over several local processes
- a new "method" parameter will receive an acceleration object (dict / custom object?) containing the acceleration type (see the sketch below):
  - if gpu method: devices, num_cpu_per_gpus, batch_size
  - if spark method (only compatible with spark dataframes): number of partitions if repartitioning, number of executors, etc.
  - ...
This will in turn call a specific function depending on the method.
We can probably infer the parallelization method automatically, depending on the type of input collection and the computational resources available.
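As a sketch of what this could look like, the method argument might simply be a dict; the keys below only mirror the options listed above and are not settled:

```python
# Hypothetical acceleration descriptors, one per acceleration type
gpu_method = {
    "type": "gpu",
    "devices": [0, 1],        # GPU ids to run the deep-learning components on
    "num_cpu_per_gpus": 4,    # CPU workers feeding each GPU
    "batch_size": 64,
}

spark_method = {
    "type": "spark",
    "num_partitions": 200,    # repartition the spark dataframe before applying nlp
    "num_executors": 10,
}
```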
Pseudo implementation
This is open to discussion:
```python
def pipe(self, data, from_doc, to_doc, method):
    # Dispatch on the collection type, then on the acceleration method.
    if is_simple_iterable(data):
        accelerator = get_iterable_accelerator(method)  # various accelerators
        return accelerator(data, from_doc, to_doc)
    elif is_pandas(data):
        # Convert rows to dicts, process them, then rebuild a dataframe.
        iterable = pandas_to_iterable(data)
        accelerator = get_iterable_accelerator(method)  # various accelerators
        iterable = accelerator(iterable, from_doc, to_doc)
        return iterable_to_pandas(iterable)
    elif is_polars(data):
        # we haven't decided yet which formats are common enough
        # to be supported natively
        ...
    elif is_spark(data):
        # will check that the accelerator is compatible with spark
        accelerator = get_spark_accelerator(method)
        return accelerator(data, from_doc, to_doc)
    else:
        raise NotImplementedError(f"Unsupported collection type: {type(data)}")
```
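With such a dispatcher, a call on a pandas dataframe could look as follows (all parameter values are hypothetical and reuse the to_doc / from_doc sketch above):

```python
import pandas as pd

df = pd.DataFrame({
    "note_id": [1, 2],
    "note_text": ["Patient fébrile.", "Pas de fièvre."],
})

result = nlp.pipe(
    df,
    from_doc=from_doc,  # Doc -> dict
    to_doc=to_doc,      # dict -> Doc
    method={"type": "multiprocessing", "n_process": 4},
)
# result would again be a pandas dataframe, per the dispatcher above
```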
Change of mind!
The above proposition did not take into account:
- reading from a given source type and writing to a different source type
- parallelizing the IO ops (like decoding a json line or writing a parquet file in a distributed fashion)
- parallelizing conversion ops (like tokenization and entities pre-annotation)
- decoupling the data format from the processing method: e.g., applying the multiprocessing backend to data read from / written to spark dataframes...
The new solution (shipped with v0.10.0, #202) fixes this by introducing the LazyDocsCollection, which records ops lazily and handles document conversion (i.e., tokenization and more) at the same level as pipeline components.
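For readers unfamiliar with the idea, here is a conceptual sketch of "recording ops lazily"; it is an illustration only, not the actual edsnlp API:

```python
class LazyCollectionSketch:
    # Illustration only: a lazy collection records operations and replays them later.
    def __init__(self, reader):
        self.reader = reader  # e.g. a function yielding rows read from some source
        self.ops = []         # recorded operations, nothing is executed yet

    def map(self, fn, **kwargs):
        self.ops.append((fn, kwargs))
        return self

    def execute(self, backend="simple"):
        # A real backend (multiprocessing, spark, ...) would parallelize this loop,
        # including the IO and conversion steps recorded as ordinary ops.
        for item in self.reader():
            for fn, kwargs in self.ops:
                item = fn(item, **kwargs)
            yield item
```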