/elx

A lightweight Python interface for extracting and loading using the Singer.io spec.

Primary LanguagePython

ELX

🚧 Under construction

A lightweight Python interface for extracting and loading using the Singer.io spec.

âš¡ Lazy install of Singer.io taps and targets
âš¡ Stream parallelism for high performance
âš¡ Remote state management
âš¡ Tap catalog is available in Python for metadata purposes

🔗 Native integration with Dagster
🔗 Native integration with Airflow (todo)
🔗 Native integration with Prefect (todo)

Installation

pip install elx --pre

Usage

The most basic usage is as follows. Simply define the Tap and the Target and elx will take care of the rest.

from elx import Runner, Tap, Target

runner = Runner(
  Tap("git+https://gitlab.com/meltano/tap-carbon-intensity.git"),
  Target("target-jsonl")
)

runner.run()

Configuration

You can configure the tap and target by passing a config dictionary to the Tap and Target constructors. The config will be injected into the tap and target at runtime.

from elx import Tap, Target

tap = Tap(
  "tap-foo",
  config={
    "api_key": "1234567890",
    "start_date": "2020-01-01"
  }
)

target = Target(
  "target-bar",
  config={
    "file_path": "/tmp"
  }
)

De-selecting streams and properties

You can modify the selected streams and properties of the tap by passing a deselected list to the Tap constructor. To deselect an entire stream, you specifiy the <stream_name>. To just deselect a stream property, specify the <stream_name.property_name>.

from elx import Tap

tap = Tap(
  "tap-foo",
  config={...},
  deselected=[
    "users", # deselects the entire `users` stream
    "customers.name", # deselects the `name` property from the `customers` stream
  ]
)

Replication keys

To facilitate incremental loading, the Tap constructor allows you to include a replication_keys dictionary. This dictionary should contain key-value pairs representing the stream names and their respective replication keys.

from elx import Tap

tap = Tap(
  "tap-foo",
  config={...},
  replication_keys={
    "users": "updated_at", # sets `updated_at` column as replication key for `users` stream
  }
)

State

By default, elx will store the state in the same directory as the script that is running. You can override this by passing a StateManager to the Runner constructor. Behind the scenes, elx uses smart-open to be able to store the state in a variety of locations.

from elx import Runner, StateManager

runner = Runner(
  tap,
  target,
  state_manager=StateManager("s3://my-bucket/my-folder")
)

Supported paths include:

Path Required Environment Variables Elx Extra
s3://my-bucket/my-folder AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY elx[s3]
gs://my-bucket/my-folder GOOGLE_APPLICATION_CREDENTIALS or GOOGLE_API_TOKEN elx[gs]
azure://my-bucket/my-folder AZURE_STORAGE_CONNECTION_STRING elx[azure]
~/my-folder None None
/tmp/my-folder None None
(ssh|scp|sftp)://username@host//my-folder None None
(ssh|scp|sftp)://username:password@host//my-folder None None

Config injection

There are certain variables that are available to inject in the tap or target config. One common use case is to place the tap data in a schema with the tap name.

target = Target(
  "target-bar",
  config={
    "default_target_schema": "{TAP_NAME}"
  }
)

Supported variables:

Variable Example
NOW 2023-08-17T11:06:54.233086
YESTERDAY 2023-08-16T11:06:54.233086
LAST_WEEK 2023-08-10T11:06:54.233086
TAP_EXECUTABLE tap-smoke-test
TAP_NAME tap_smoke_test
TARGET_EXECUTABLE target-postgres
TARGET_NAME target_postgres