🚧 Under construction
A lightweight Python interface for extracting and loading using the Singer.io spec.
âš¡ Lazy install of Singer.io taps and targets
âš¡ Stream parallelism for high performance
âš¡ Remote state management
âš¡ Tap catalog is available in Python for metadata purposes
🔗 Native integration with Dagster
🔗 Native integration with Airflow (todo)
🔗 Native integration with Prefect (todo)
pip install elx --pre
The most basic usage is as follows. Simply define the Tap and the Target and elx will take care of the rest.
from elx import Runner, Tap, Target
runner = Runner(
Tap("git+https://gitlab.com/meltano/tap-carbon-intensity.git"),
Target("target-jsonl")
)
runner.run()
You can configure the tap and target by passing a config
dictionary to the Tap
and Target
constructors. The config will be injected into the tap and target at runtime.
from elx import Tap, Target
tap = Tap(
"tap-foo",
config={
"api_key": "1234567890",
"start_date": "2020-01-01"
}
)
target = Target(
"target-bar",
config={
"file_path": "/tmp"
}
)
You can modify the selected streams and properties of the tap by passing a deselected
list to the Tap constructor. To deselect an entire stream, you specifiy the <stream_name>
. To just deselect a stream property, specify the <stream_name.property_name>
.
from elx import Tap
tap = Tap(
"tap-foo",
config={...},
deselected=[
"users", # deselects the entire `users` stream
"customers.name", # deselects the `name` property from the `customers` stream
]
)
To facilitate incremental loading, the Tap constructor allows you to include a replication_keys
dictionary. This dictionary should contain key-value pairs representing the stream names and their respective replication keys.
from elx import Tap
tap = Tap(
"tap-foo",
config={...},
replication_keys={
"users": "updated_at", # sets `updated_at` column as replication key for `users` stream
}
)
By default, elx will store the state in the same directory as the script that is running. You can override this by passing a StateManager
to the Runner
constructor. Behind the scenes, elx uses smart-open to be able to store the state in a variety of locations.
from elx import Runner, StateManager
runner = Runner(
tap,
target,
state_manager=StateManager("s3://my-bucket/my-folder")
)
Supported paths include:
Path | Required Environment Variables | Elx Extra |
---|---|---|
s3://my-bucket/my-folder |
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY |
elx[s3] |
gs://my-bucket/my-folder |
GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_API_TOKEN |
elx[gs] |
azure://my-bucket/my-folder |
AZURE_STORAGE_CONNECTION_STRING |
elx[azure] |
~/my-folder |
None |
None |
/tmp/my-folder |
None |
None |
(ssh|scp|sftp)://username@host//my-folder |
None |
None |
(ssh|scp|sftp)://username:password@host//my-folder |
None |
None |
There are certain variables that are available to inject in the tap or target config. One common use case is to place the tap data in a schema with the tap name.
target = Target(
"target-bar",
config={
"default_target_schema": "{TAP_NAME}"
}
)
Supported variables:
Variable | Example |
---|---|
NOW |
2023-08-17T11:06:54.233086 |
YESTERDAY |
2023-08-16T11:06:54.233086 |
LAST_WEEK |
2023-08-10T11:06:54.233086 |
TAP_EXECUTABLE |
tap-smoke-test |
TAP_NAME |
tap_smoke_test |
TARGET_EXECUTABLE |
target-postgres |
TARGET_NAME |
target_postgres |