merantix-momentum/squirrel-core

[FEATURE] Make `get_iter` method documentation about `max_workers` more explicit

kai-tub opened this issue · 3 comments

Hey, I've stumbled across a potentially easy-to-misunderstand part of the MapDriver.get_iter documentation:

```python
def get_iter(
    self,
    keys_iterable: Optional[Iterable] = None,
    shuffle_key_buffer: int = 1,
    key_hooks: Optional[Iterable[Union[Callable, Type[Composable], functools.partial]]] = None,
    max_workers: Optional[int] = None,
    prefetch_buffer: int = 10,
    shuffle_item_buffer: int = 1,
    flatten: bool = False,
    keys_kwargs: Optional[Dict] = None,
    get_kwargs: Optional[Dict] = None,
    key_shuffle_kwargs: Optional[Dict] = None,
    item_shuffle_kwargs: Optional[Dict] = None,
) -> Composable:
    """Returns an iterable of items in the form of a :py:class:`squirrel.iterstream.Composable`, which allows
    various stream manipulation functionalities.
    Items are fetched using the :py:meth:`get` method. The returned :py:class:`Composable` iterates over the items
    in the order of the keys returned by the :py:meth:`keys` method.
    Args:
        keys_iterable (Iterable, optional): If provided, only the keys in `keys_iterable` will be used to fetch
            items. If not provided, all keys in the store are used.
        shuffle_key_buffer (int): Size of the buffer used to shuffle keys.
        key_hooks (Iterable[Iterable[Union[Callable, Type[Composable], functools.partial]]], optional): Hooks
            to apply to keys before fetching the items. It is an Iterable any of these objects:
            1) subclass of :py:meth:`~squirrel.iterstream.Composable`: in this case, `.compose(hook, **kw)`
               will be applied to the stream
            2) A Callable: `.to(hook, **kw)` will be applied to the stream
            3) A partial function: the three attributes `args`, `keywords` and `func` will be retrieved, and
               depending on whether `func` is a subclass of :py:meth:`~squirrel.iterstream.Composable` or a
               `Callable`, one of the above cases will happen, with the only difference that arguments are passed
               too. This is useful for passing arguments.
        max_workers (int, Optional): If larger than 1 or None, :py:meth:`~squirrel.iterstream.Composable.async_map`
            is called to fetch multiple items simultaneously and `max_workers` refers to the maximum number of
            workers in the ThreadPoolExecutor used by `async_map`.
            Otherwise, :py:meth:`~squirrel.iterstream.Composable.map` is called and `max_workers` is not used.
            Defaults to None.
        prefetch_buffer (int): Size of the buffer used for prefetching items if `async_map` is used. See
            `max_workers` for more details. Please be aware of the memory footprint when setting this parameter.
        shuffle_item_buffer (int): Size of the buffer used to shuffle items after being fetched. Please be aware of
            the memory footprint when setting this parameter.
        flatten (bool): Whether to flatten the returned iterable. Defaults to False.
        keys_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`keys` when getting the keys in the
            store. Not used if `keys_iterable` is provided. Defaults to None.
        get_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`get` when fetching items. Defaults to
            None.
        key_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling keys.
            Defaults to None. Can be useful to e.g. set the seed etc.
        item_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling items.
            Defaults to None. Can be useful to e.g. set the seed etc.
    Returns:
        (squirrel.iterstream.Composable) Iterable over the items in the store.
    """
    keys_kwargs = {} if keys_kwargs is None else keys_kwargs
    get_kwargs = {} if get_kwargs is None else get_kwargs
    key_shuffle_kwargs = {} if key_shuffle_kwargs is None else key_shuffle_kwargs
    item_shuffle_kwargs = {} if item_shuffle_kwargs is None else item_shuffle_kwargs
    keys_it = keys_iterable if keys_iterable is not None else self.keys(**keys_kwargs)
    it = IterableSource(keys_it).shuffle(size=shuffle_key_buffer, **key_shuffle_kwargs)
    if key_hooks:
        for hook in key_hooks:
            arg = []
            kwarg = {}
            f = hook
            if isinstance(hook, partial):
                arg = hook.args
                kwarg = hook.keywords
                f = hook.func
            if isclass(f) and issubclass(f, Composable):
                it = it.compose(f, *arg, **kwarg)
            elif isinstance(f, Callable):
                it = it.to(f, *arg, **kwarg)
            else:
                raise ValueError(
                    f"wrong argument for hook {hook}, it should be a Callable, partial function, or a subclass "
                    f"of Composable"
                )
    map_fn = partial(self.get, **get_kwargs)
    _map = (
        it.map(map_fn)
        if max_workers is not None and max_workers <= 1
        else it.async_map(map_fn, prefetch_buffer, max_workers)
    )
```
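To make the branch at the end of the excerpt concrete, here is a minimal standard-library sketch of the same dispatch rule. `fetch_items` is a hypothetical helper, not part of squirrel's API: it stands in for `Composable.map` with a list comprehension and for `Composable.async_map` with `ThreadPoolExecutor.map`, and it ignores `prefetch_buffer` entirely.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


def fetch_items(
    keys: Iterable[K],
    get: Callable[[K], V],
    max_workers: Optional[int] = None,
) -> List[V]:
    """Hypothetical helper mirroring the `max_workers` branch in `get_iter`.

    - `max_workers` <= 1 (e.g. 0 or 1): sequential map, no threads at all.
    - `max_workers` is None or > 1: a ThreadPoolExecutor is used; with None
      the executor picks its own, Python-version-dependent pool size.
    """
    if max_workers is not None and max_workers <= 1:
        # Same condition as in get_iter: every `get` runs on the calling thread.
        return [get(k) for k in keys]
    # None is passed straight through, so the executor chooses the pool size.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of the input keys.
        return list(pool.map(get, keys))
```

With `max_workers=0` or `1`, every `get` call stays on the calling thread; leaving the default `None` is what silently hands the pool size over to `ThreadPoolExecutor`.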

The documentation of `max_workers` states that it defaults to `None` and also mentions that this causes `async_map` to be called, but I missed these parts of the documentation and was surprised to see so many threads being allocated.

I am/was not too familiar with the `ThreadPoolExecutor` interface and find it somewhat surprising that `None` equals number of processors × 5 according to the `ThreadPoolExecutor` definition.
Maybe it would be helpful to state explicitly that, by default, a `ThreadPoolExecutor` with that many threads is used?
The docstring reads a bit unintuitively: it starts out by saying that `max_workers` defines how many items are fetched simultaneously and then continues to state that otherwise `map` is used.
From that perspective, `max_workers=None` doesn't sound like it should be using any threads at all.
For readers who don't know the default values of `ThreadPoolExecutor`, I would make it more explicit that to disable threading one has to set `max_workers=0` or `1`, and that by default many threads are used.

I am happy to add a PR with my suggested doc-string update if you agree! :)

Hi @kai-tub, thank you for bringing this up! Sounds like a good step to enhance the user experience. I'd be happy to review a PR.

We should probably also mention this more explicitly in our Read the Docs documentation, if it is not clear there yet. CC @AlirezaSohofi to keep you in the loop.

Hello @kai-tub. Thanks for reporting this issue. I agree it can be confusing. A PR is very welcome, of course. I would suggest mentioning that the default depends on the Python version, as explained in the link you provided.
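As the reply notes, the default pool size depends on the Python version. Per the `concurrent.futures` documentation, "number of processors × 5" is the Python 3.5 to 3.7 default; 3.8 changed it to `min(32, os.cpu_count() + 4)`, and 3.13 switched the CPU count to `os.process_cpu_count()`. The sketch below reproduces those documented rules so you can check what `max_workers=None` resolves to on a given machine. `default_thread_pool_workers` is a hypothetical helper, not part of squirrel or the standard library.

```python
import os
import sys


def default_thread_pool_workers() -> int:
    """Hypothetical helper: pool size ThreadPoolExecutor uses for max_workers=None.

    Follows the documented defaults of concurrent.futures.ThreadPoolExecutor.
    """
    if sys.version_info >= (3, 13):
        # 3.13+: CPUs usable by this process, plus 4, capped at 32.
        return min(32, (os.process_cpu_count() or 1) + 4)
    if sys.version_info >= (3, 8):
        # 3.8 to 3.12: all CPUs in the system, plus 4, capped at 32.
        return min(32, (os.cpu_count() or 1) + 4)
    # 3.5 to 3.7: five threads per processor, with no upper cap.
    return (os.cpu_count() or 1) * 5


print(default_thread_pool_workers())
```

On a many-core machine running Python 3.5 to 3.7 this can mean hundreds of threads, which is why spelling out the `max_workers=None` behavior in the docstring is worthwhile.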