The datafun allows for loading existing datasets or files with different formats and treat them as streaming pipelines. It also allows one to add operations like filter and map in a functional way executed lazily while loading data.
With datafun you can:
- Load files locally from specific file formats, eg. CSV, JSON etc.
- Load files from remote sources, eg. a CSV or a JSON from a Google Cloud Storage bucket.
- Apply streaming transformations on the fly by applying functional operations (filter, map...) to it.
- Define data transformations to be stored and later used and extended by anyone. This will contribute to an internal hub of common datasets and pipelines used many times.
A datafun Dataset is both a lazy stream of data and lazy sequential pipeline of operations.
- It is a stream of data: when you iterate over the dataset, one element at a time is generated and returned to the caller.
- It is a lazy sequential pipeline: every time an element needs to be returned, the pipeline of operations is executed on the current element.
Node types. The DatasetSource class is the first node of the pipeline and is responsible for loading the data from a local or remote storage, then passing it to the next node in the pipeline. DatasetNode objects representing filter or map operations can be added to the pipeline.
Communication. The pipeline communicates in a message-passing fashion. Stream objects are passed between nodes, representing the state of the stream and containing the current element. A stream may be a StartOfStream, EndOfStream, PullStream, or just Stream (when containing data). To give an example of the communication, consider the following pipeline:
TextDataSource -> Map (lambda x: x.lower()) -> Filter (lambda x: "messiah" in x))
and the following stream of data:
"Dune" -> "Dune Messiah" -> "Children of Dune" -> ...
This is what happens when you begin iterating over the stream:
- StartOfStream triggers TextDataSource, which reads the first element
Dune
- Stream is forwarded to Map, which transforms
Dune
intodune
- Stream is forwarded to Filter, the lambda inside fails the check, so a PullStream is sent backward
- PullStream arrives to Map, which sends it back to TextDataSource
- TextDataSource reads the PullStream, generates another element,
Dune Messiah
, then issues a Stream forward. - ...
- This Stream arrives at the end, so
messiah
it is returned to the caller. - ...
Available transformation operations.
name | Arguments | Description |
---|---|---|
filter | f: Callable[T, bool] | Filters elements according to f. |
map | f: Callable[T, S] | Maps every element to another with function f. |
flat_map | f: Callable[T, Iterable[S]] = lambda x: x | A Flat Map takes a list and returns every element, one at a time. The given function f, given element of time T, must return an Iterable of type S. If not provided, f is the identity. |
sampling | p: float, seed: int | Samples elements according to the sampling ratio p: 0. < p <= 1. |
unique | by: Callable[T, U] = lambda x: x | Removes duplicates. by argument specifies a function that extracts the desired information |
aggregate | init: Callable[[], S] agg: Callable[[T, S], S] reduce: Callable[[S], R] |
Aggregates stream using function f. Hint: given initial value init of type S for aggregated stream, maps f to stream (value: T, aggregated: S) where value is an element of the stream, and aggregated is the current aggregate value. reduce is applied to the aggregated value, if provided |
zip | *iterables | Zips elements from multiple dataset, like python zip() |
join | other: Dataset, key: Callable, key_left: Callable, key_right: Callable | Joins two datasets based on provided key functions that specify the path in the dictionary. Either specify key for both, or key_left and key_right. |
limit | n: int | Limits the number of elements in the streams to the first n |
cache | / | Caches result of previous nodes into memory. Useful when the pipeline is executed multiple times. Be careful with memory intensive operations. |
repeat | / | Make the dataset infinite. Be careful that this does NOT cache the loading of data. Usa cache() beforehand if needed. |
Datasets overloads basic python operations: +
, -
, *
, /
.
You can see examples for every operation in the dedicated notebook.
Available info operations.
name | Arguments | Description |
---|---|---|
info | / | Prints dataset description |
summary | / | Prints list of transformations in the pipeline |
schema | / | Prints schema of dataset. Useful for csv, json, jsonl |
Available output operations.
name | Arguments | Return type | Description |
---|---|---|---|
show | n: int | str | Print first n elements as string |
take | n: int | List[T] | Returns the first n elements of the dataset |
take_while | f: Callable[..., bool] | List[T] | Returns all elements satisfying f |
collect | / | List[T] | Returns a list containing all elements. Also prints progress while collecting. |
Available other operations.
name | Arguments | Description |
---|---|---|
clone | / | Deep clone dataset |
pip install datafun
import datafun as dfn
ds: Dataset = dfn.load('text', path="path/to/dune_frank_herbert.txt")
for el in ds:
# do something with el
ds = dfn.load(range(N))
ds.take(5) # take first 5 integers
>> [0, 1, 2, 3, 4]
ds = dfn.load('json', path="path/to/file.json")
ds2 = ds.filter(lambda x: x['KEY'] > 10)
ds2 = ds2.map(lambda x: x**2)
print(ds2.summary()) # Shows pipeline ops
for el in ds2:
# do something with el.
# This will execute the defined ops one element at a time
# Streaming normalize on the fly
ds = (
dfn
.load('gcs-jsonl', path='gs://my_bucket/timeseries.jsonl')
.map(lambda x: x['value'])
)
mean = 3.14 # let's assume to have a mean
norm_ds = ds - mean
for value in norm_ds:
print(value)
You can see examples for every operation in the dedicated notebook.
ID | Class | Elem. type | Description |
---|---|---|---|
text | TextDataset | str | Streams the local text file(s) one line at a time. Glob supported |
csv | CSVDataset | dict | Streams the local CSV file(s) one row at a time. Glob supported |
json | JSONDataset | dict | Streams the local JSON file(s) one at a time. Glob supported |
jsonl | JSONLinesDataset | dict | Streams the local JSONLines file(s) one line at a time. Glob supported |
gcs | GCSDataset | str | Download files from Google Cloud Storage, then streams paths to be loaded locally. Glob supported |
gcs-text | GCSTextDataset | str | As gcs, but also opens the text file(s) and streams one line a time. Glob supported |
gcs-csv | GCSCSVDataset | dict | As gcs, but also opens the CSV file(s) and streams one row a time. Glob supported |
gcs-json | GCSJSONDataset | dict | As gcs, but also opens the JSON file(s) and streams it one at a time. Glob supported |
gcs-jsonl | GCSJSONLinesDataset | dict | As gcs, but also opens the JSONLines file(s) and streams one line at a time. Glob supported |
elk | ELKDataset | dict | Connects to ELK instance, runs json query specified in path , then returns the resulting docs one at a time |
rest | RESTDataset | dict | Streams the response from a REST API. |
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str]], str] | Yes | The local path to be loaded | |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | None | The supported extensions (others will be discarded). None means all extensions |
Returned element type: str
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The local path to be loaded | |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('csv') | The supported extensions (others will be discarded). None means all extensions |
delimiter | str | No | ',' | The column delimiter |
Returned element type: dict
. Each element represent a row from the CSV(s). The keys are the CSV column names.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The local path to be loaded | |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('json') | The supported extensions (others will be discarded). None means all extensions |
Returned element type: dict
. This dictionary directly maps the JSON file.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The local path to be loaded | |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('json', 'jsonl') | The supported extensions (others will be discarded). None means all extensions |
Returned element type: dict
. This dictionary maps a single JSON line from the file.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The remote path to be loaded | |
download_path | str | No | '$HOME/.cache/data-engineering' | The path where the file(s) will be downloaded |
service_account | str | No | None | The service account path to use. If missing will use the default user account |
project | str | No | None | The project id to use. If missing will use the user configured project it |
Returned element type: str
. Each element represent a local path where to find the file downloaded from GCS.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The remote path to be loaded | |
download_path | str | No | '$HOME/.cache/data-engineering' | The path where the file(s) will be downloaded |
service_account | str | No | None | The service account path to use. If missing will use the default user account |
project | str | No | None | The project id to use. If missing will use the user configured project it |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
Returned element type: str
. Each element represent a line from the remote file.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The remote path to be loaded | |
download_path | str | No | '$HOME/.cache/data-engineering' | The path where the file(s) will be downloaded |
service_account | str | No | None | The service account path to use. If missing will use the default user account |
project | str | No | None | The project id to use. If missing will use the user configured project it |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('csv') | The supported extensions (others will be discarded). None means all extensions |
delimiter | str | No | ',' | The column delimiter |
Returned element type: dict
. Each element represent a row from the CSV(s). The keys are the CSV column names.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The remote path to be loaded | |
download_path | str | No | '$HOME/.cache/data-engineering' | The path where the file(s) will be downloaded |
service_account | str | No | None | The service account path to use. If missing will use the default user account |
project | str | No | None | The project id to use. If missing will use the user configured project it |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('json') | The supported extensions (others will be discarded). None means all extensions |
Returned element type: dict
. Each element is a dict directly mapping a JSON file.
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The remote path to be loaded | |
download_path | str | No | '$HOME/.cache/data-engineering' | The path where the file(s) will be downloaded |
service_account | str | No | None | The service account path to use. If missing will use the default user account |
project | str | No | None | The project id to use. If missing will use the user configured project it |
encoding | str | No | 'utf-8' | The encoding to be used when opening the file |
allowed_extensions | Sequence[str] | No | ('json', 'jsonl') | The supported extensions (others will be discarded). None means all extensions |
Returned element type: dict
. Each element is a dict mapping a JSON line from the file(s).
Name | Type | Required | Default | Description |
---|---|---|---|---|
path | Union[List[str], str] | Yes | The local path to a query written in JSON | |
host | str | Yes | Elastic host URL | |
port | str | Yes | Elastic host port | |
username | str | Yes | Elastic authentication username | |
password | str | Yes | Elastic authentication password | |
index | str | Yes | Elastic index to query | |
start_isodate | str (ISO datetime) | Yes | Elastic start date range with format: "2021-09-15T10:00:00.000Z" | |
end_isodate | str (ISO datetime) | Yes | Elastic end date range with format: "2021-09-15T10:00:00.000Z" | |
date_field | str | No | @timestamp | Elastic date field. Can be nested into list, eg. "messages.date" |
date_field_separator | str | No | . | [DEPRECATED] (separator automatically inferred) Separator for date_field used to split the path. Use different ones to NOT split and consider date_field as single field |
Returned element type: dict
. Each element is a document matching the given query.
Name | Type | Required | Default | Description |
---|---|---|---|---|
download_path | str | No | '$HOME/.cache/data-engineering' | Where to download data |
headers | dict | No | {} | Call Headers |
auth_headers | dict | No | {} | Authentication headers |
params | Optional[dict] | No | None | URL key value params |
method | str | No | 'GET' | Call method, 'GET', 'POST'... |
next_url_f | Callable[[dict], Optional[str]] | No | lambda x: None | Function to extract the next url to call from the json response for paginated APIs. Should return None to mark the end or the URL string |
Returned element type: dict
representing the json response.
def my_load(path, **kwargs):
ds = (dfn
.load('json', path=path, **kwargs)
.filter(lambda x: x['KEY'] > 10)
.map(lambda x: x**2)
)
return ds
Write classes
@dataclass
class ELKDatasetConfig(dfn.Config):
# Additional arguments to base class
host: str
port: int
class ELKDataset(dfn.DatasetSource):
def __init__(self, config, **kwargs):
super().__init__(config=config, **kwargs)
self.elk_client = elklib(self.config.host, self.config.port)
def dataset_name() -> str:
return 'elk-dataset'
def info(self) -> dict:
return {
'description': 'My ELK dataset class.',
'author': 'foo@company.org',
'date': '2021-09-01',
}
def schema(self) -> dict:
return {
'column_1': 'int',
...
}
# this method may also load a row and infer it, like CSVDataset does
def _generate_examples(self) -> Generator:
for doc in self.elk_client.scroll_function():
# Apply some prep to the document
yield doc
Add new dataset to list of supported datasets
- Add a line to
DATASETS
indatafun/__init__.py
:
DATASETS = {
...
"elk-dataset": ELKDataset,
}
Use it!
ds = dfn.load('elk-dataset', config={
'host': 'localhost',
'port': '8888',
})