|
def get_iter( |
|
self, |
|
keys_iterable: Optional[Iterable] = None, |
|
shuffle_key_buffer: int = 1, |
|
key_hooks: Optional[Iterable[Union[Callable, Type[Composable], functools.partial]]] = None, |
|
max_workers: Optional[int] = None, |
|
prefetch_buffer: int = 10, |
|
shuffle_item_buffer: int = 1, |
|
flatten: bool = False, |
|
keys_kwargs: Optional[Dict] = None, |
|
get_kwargs: Optional[Dict] = None, |
|
key_shuffle_kwargs: Optional[Dict] = None, |
|
item_shuffle_kwargs: Optional[Dict] = None, |
|
) -> Composable: |
|
"""Returns an iterable of items in the form of a :py:class:`squirrel.iterstream.Composable`, which allows |
|
various stream manipulation functionalities. |
|
|
|
Items are fetched using the :py:meth:`get` method. The returned :py:class:`Composable` iterates over the items |
|
in the order of the keys returned by the :py:meth:`keys` method. |
|
|
|
Args: |
|
keys_iterable (Iterable, optional): If provided, only the keys in `keys_iterable` will be used to fetch |
|
items. If not provided, all keys in the store are used. |
|
shuffle_key_buffer (int): Size of the buffer used to shuffle keys. |
|
key_hooks (Iterable[Union[Callable, Type[Composable], functools.partial]], optional): Hooks |
|
to apply to keys before fetching the items. It is an Iterable of any of these objects: |
|
1) A subclass of :py:class:`~squirrel.iterstream.Composable`: in this case, `.compose(hook, **kw)` |
|
will be applied to the stream |
|
2) A Callable: `.to(hook, **kw)` will be applied to the stream |
|
3) A partial function: the three attributes `args`, `keywords` and `func` will be retrieved, and |
|
depending on whether `func` is a subclass of :py:class:`~squirrel.iterstream.Composable` or a |
|
`Callable`, one of the above cases will happen, with the only difference that arguments are passed |
|
too. This is useful for passing arguments. |
|
max_workers (int, optional): If None or larger than 1, :py:meth:`~squirrel.iterstream.Composable.async_map` |
|
is called to fetch multiple items simultaneously and `max_workers` refers to the maximum number of |
|
workers in the ThreadPoolExecutor used by `async_map`. |
|
Otherwise, :py:meth:`~squirrel.iterstream.Composable.map` is called and `max_workers` is not used. |
|
Defaults to None. |
|
prefetch_buffer (int): Size of the buffer used for prefetching items if `async_map` is used. See |
|
`max_workers` for more details. Please be aware of the memory footprint when setting this parameter. |
|
shuffle_item_buffer (int): Size of the buffer used to shuffle items after being fetched. Please be aware of |
|
the memory footprint when setting this parameter. |
|
flatten (bool): Whether to flatten the returned iterable. Defaults to False. |
|
keys_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`keys` when getting the keys in the |
|
store. Not used if `keys_iterable` is provided. Defaults to None. |
|
get_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`get` when fetching items. Defaults to |
|
None. |
|
key_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling keys. |
|
Defaults to None. Can be useful to e.g. set the seed etc. |
|
item_shuffle_kwargs (Dict, optional): Keyword arguments passed to :py:meth:`shuffle` when shuffling items. |
|
Defaults to None. Can be useful to e.g. set the seed etc. |
|
|
|
Returns: |
|
(squirrel.iterstream.Composable) Iterable over the items in the store. |
|
""" |
|
keys_kwargs = {} if keys_kwargs is None else keys_kwargs |
|
get_kwargs = {} if get_kwargs is None else get_kwargs |
|
key_shuffle_kwargs = {} if key_shuffle_kwargs is None else key_shuffle_kwargs |
|
item_shuffle_kwargs = {} if item_shuffle_kwargs is None else item_shuffle_kwargs |
|
keys_it = keys_iterable if keys_iterable is not None else self.keys(**keys_kwargs) |
|
it = IterableSource(keys_it).shuffle(size=shuffle_key_buffer, **key_shuffle_kwargs) |
|
|
|
if key_hooks: |
|
for hook in key_hooks: |
|
arg = [] |
|
kwarg = {} |
|
f = hook |
|
if isinstance(hook, partial): |
|
arg = hook.args |
|
kwarg = hook.keywords |
|
f = hook.func |
|
|
|
if isclass(f) and issubclass(f, Composable): |
|
it = it.compose(f, *arg, **kwarg) |
|
elif isinstance(f, Callable): |
|
it = it.to(f, *arg, **kwarg) |
|
else: |
|
raise ValueError( |
|
f"wrong argument for hook {hook}, it should be a Callable, partial function, or a subclass " |
|
f"of Composable" |
|
) |
|
|
|
map_fn = partial(self.get, **get_kwargs) |
|
_map = ( |
|
it.map(map_fn) |
|
if max_workers is not None and max_workers <= 1 |
|
else it.async_map(map_fn, prefetch_buffer, max_workers) |
|
) |