etlhelper

etlhelper is a Python library to simplify data transfer between databases.

etlhelper provides a unified way to connect to different database types (currently Oracle, PostgreSQL, SQLite and MS SQL Server). It is a thin wrapper around Python's DBAPI2 specification. The get_rows function returns the result of a SQL query and can be used to create simple HTTP APIs. The copy_rows function transfers data from one database to another, and a transform function can be applied to manipulate data in flight. These tools make it simple to create easy-to-understand, lightweight, versionable and testable Extract-Transform-Load (ETL) workflows.

etlhelper is not a tool for coordinating ETL jobs (use Apache Airflow), for converting GIS data formats (use ogr2ogr or fiona), or an Object Relational Mapper (use SQLAlchemy). However, it can be used in conjunction with each of these.

For an introduction to etlhelper, see the FOSS4GUK 2019 presentation Open Source Spatial ETL with Python and Apache Airflow: video (20 mins), slides.

Installation

pip install etlhelper[oracle]

Required database drivers are specified in the square brackets. Options are:

[oracle]
[mssql]
[postgres]

Multiple options can be separated by commas, e.g. [oracle,mssql] installs both sets of drivers. The sqlite3 driver is included within Python's Standard Library.
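As a rough check of which drivers are importable in the current environment, the sketch below uses the standard library's importlib. It assumes that the [oracle], [mssql] and [postgres] options provide the cx_Oracle, pyodbc and psycopg2 modules respectively; check the package metadata if in doubt. The function name available_drivers is hypothetical and not part of etlhelper.

```python
import importlib.util

# Assumed mapping from install option to the driver module it provides.
# sqlite3 ships with Python, so it needs no extra install option.
DRIVER_MODULES = {
    "oracle": "cx_Oracle",
    "mssql": "pyodbc",
    "postgres": "psycopg2",
    "sqlite": "sqlite3",
}


def available_drivers():
    """Return a dict mapping each option to True if its module can be imported."""
    return {
        option: importlib.util.find_spec(module) is not None
        for option, module in DRIVER_MODULES.items()
    }


if __name__ == "__main__":
    for option, found in available_drivers().items():
        print(f"{option}: {'installed' if found else 'missing'}")
```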

Dependencies

Linux systems require additional packages to be installed on the system.

Debian / Ubuntu:

  • sudo apt install libaio1 for cx_Oracle.
  • sudo apt install build-essential unixodbc-dev for pyodbc.

Centos / Fedora:

  • sudo yum install libaio for Oracle
  • sudo yum install gcc gcc-c++ make python36-devel pyodbc unixODBC-devel for pyodbc

Oracle Instant Client

Oracle Instant Client libraries are required to connect to Oracle databases. etlhelper provides a script to install these on Linux systems from a zip file downloaded from the Oracle website and made available locally.

setup_oracle_client /path/or/url/for/instantclient-basic-linux.x64-12.2.0.1.0.zip
export "$(oracle_lib_path_export)"

If you are outside a virtual environment, the export command may be different. See terminal output for details. Run setup_oracle_client again to confirm setup has worked.

pyodbc for Microsoft SQL Server

The setup_mssql_driver tool checks that appropriate drivers are installed.

setup_mssql_driver

It provides links to installation instructions for drivers. The Dockerfile contains an example for Debian systems.

Quick Start

Password Definition

Passwords (e.g. Oracle password) must be specified via an environment variable. This can be done on the command line via:

  • export ORACLE_PASSWORD=some-secret-password on Linux
  • set ORACLE_PASSWORD=some-secret-password on Windows

Or in a Python terminal via:

import os
os.environ['ORACLE_PASSWORD'] = 'some-secret-password'

No password is required for SQLite databases.
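The pattern here is that code refers to a password by the name of the environment variable that holds it, never by the secret itself. A minimal sketch of such a lookup, using a hypothetical helper called get_password (not an etlhelper function), might look like this:

```python
import os


def get_password(password_variable):
    """Return the password stored in the named environment variable.

    Raises a KeyError with a helpful message if the variable is not set,
    rather than silently connecting with an empty password.
    """
    try:
        return os.environ[password_variable]
    except KeyError:
        raise KeyError(
            f"Password environment variable {password_variable!r} is not set"
        ) from None
```

Keeping secrets out of the source code means that scripts can be committed to version control without exposing credentials.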

DbParams

Database connection information is defined by DbParams objects.

from etlhelper import DbParams

ORACLEDB = DbParams(dbtype='ORACLE', host="localhost", port=1521,
                    dbname="mydata", user="oracle_user")

POSTGRESDB = DbParams(dbtype='PG', host="localhost", port=5432,
                      dbname="mydata", user="postgres_user")

SQLITEDB = DbParams(dbtype='SQLITE', filename='/path/to/file.db')

MSSQLDB = DbParams(dbtype='MSSQL', host="localhost", port=1433,
                   dbname="mydata", user="mssql_user",
                   odbc_driver="ODBC Driver 17 for SQL Server")

DbParams objects can also be created from environment variables using the from_environment() function.

Get rows

Connections are created by the connect function, whose second argument is the name of the environment variable that holds the password. The get_rows function returns a list of named tuples containing data as native Python objects.

from my_databases import ORACLEDB
from etlhelper import connect, get_rows

sql = "SELECT * FROM src"

with connect(ORACLEDB, "ORACLE_PASSWORD") as conn:
    result = get_rows(sql, conn)

returns

[Row(id=1, value=1.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 7),
     date_time=datetime.datetime(2018, 12, 7, 13, 1, 59)),
 Row(id=2, value=2.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 8),
     date_time=datetime.datetime(2018, 12, 8, 13, 1, 59)),
 Row(id=3, value=2.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 9),
     date_time=datetime.datetime(2018, 12, 9, 13, 1, 59))]

Data are accessible via index (row[4]) or name (row.day).

dump_rows passes each row to a function, while iter_rows returns a generator for looping over results.
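To illustrate the difference between yielding rows and passing rows to a function, here is a standard-library sketch using sqlite3 and collections.namedtuple. The functions iter_rows_sketch and dump_rows_sketch are hypothetical stand-ins written for this example, not etlhelper's implementation, which also handles chunking and other row factories.

```python
import sqlite3
from collections import namedtuple


def iter_rows_sketch(sql, conn):
    """Yield query results one at a time as named tuples."""
    cursor = conn.cursor()
    cursor.execute(sql)
    # Build a namedtuple class from the column names in cursor.description
    Row = namedtuple("Row", [col[0] for col in cursor.description])
    for values in cursor:
        yield Row(*values)


def dump_rows_sketch(sql, conn, output_func=print):
    """Pass each row to output_func instead of returning it."""
    for row in iter_rows_sketch(sql, conn):
        output_func(row)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE src (id INTEGER, day TEXT)")
conn.executemany("INSERT INTO src VALUES (?, ?)",
                 [(1, "2018-12-07"), (2, "2018-12-08")])

rows = list(iter_rows_sketch("SELECT * FROM src ORDER BY id", conn))
print(rows[0][1], rows[0].day)  # prints: 2018-12-07 2018-12-07
dump_rows_sketch("SELECT id FROM src ORDER BY id", conn)
```

Because iter_rows_sketch is a generator, only one row is held by the loop at a time; wrapping it in list() reproduces the all-at-once behaviour of get_rows.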

Copy rows

copy_rows takes the results from a SELECT query and applies them as parameters to an INSERT query. The source and destination tables must already exist.

from my_databases import POSTGRESDB, ORACLEDB
from etlhelper import connect, copy_rows

select_sql = "SELECT id, name FROM src"
insert_sql = """INSERT INTO dest (id, name)
              VALUES (%s, %s)"""

src_conn = connect(ORACLEDB, "ORACLE_PASSWORD")
dest_conn = connect(POSTGRESDB, "PG_PASSWORD")

copy_rows(select_sql, src_conn, insert_sql, dest_conn)
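Conceptually, copying rows amounts to fetching chunks of rows from the source cursor and passing each chunk to executemany on the destination. The sketch below imitates that flow with two in-memory SQLite databases and the standard DBAPI2 fetchmany and executemany methods; copy_rows_sketch and the chunk size of 2 are illustrative assumptions, not etlhelper internals. Note that sqlite3 uses ? placeholders, whereas the PostgreSQL example above uses %s.

```python
import sqlite3


def copy_rows_sketch(select_sql, src_conn, insert_sql, dest_conn, chunk_size=2):
    """Copy query results from src_conn into dest_conn, one chunk at a time."""
    src_cursor = src_conn.cursor()
    src_cursor.execute(select_sql)
    dest_cursor = dest_conn.cursor()
    total = 0
    while True:
        chunk = src_cursor.fetchmany(chunk_size)
        if not chunk:
            break
        dest_cursor.executemany(insert_sql, chunk)
        total += len(chunk)
    dest_conn.commit()
    return total


src_conn = sqlite3.connect(":memory:")
src_conn.execute("CREATE TABLE src (id INTEGER, name TEXT)")
src_conn.executemany("INSERT INTO src VALUES (?, ?)",
                     [(1, "a"), (2, "b"), (3, "c")])

dest_conn = sqlite3.connect(":memory:")
# As with copy_rows, the destination table must already exist
dest_conn.execute("CREATE TABLE dest (id INTEGER, name TEXT)")

copied = copy_rows_sketch("SELECT id, name FROM src", src_conn,
                          "INSERT INTO dest (id, name) VALUES (?, ?)", dest_conn)
print(copied)  # prints 3
```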

Transform

Data can be transformed in-flight by applying a transform function. This is any Python callable (function) that takes an iterable of rows (e.g. a list of tuples) and returns another iterable. It is applied to each chunk of rows before they are inserted.

import random

def my_transform(chunk):
    # Append a random integer (1-10) to each row; drop rows where it is < 5.

    new_chunk = []
    for row in chunk:
        external_value = random.randint(1, 10)
        if external_value >= 5:
            new_chunk.append((*row, external_value))

    return new_chunk

# Rows now have an extra column, so insert_sql needs an extra placeholder
copy_rows(select_sql, src_conn, insert_sql, dest_conn,
          transform=my_transform)

The above code demonstrates that the returned chunk can contain a different number of rows from the input chunk, and that its rows can have a different length. In practice, the external data could come from a call to a web service or another database.

The iter_chunks and iter_rows functions return generators. Each chunk or row of data is only accessed when it is required. Using yield instead of return in the transform function makes it a generator, too. Data transformation can then be performed via memory-efficient iterator-chains.
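For example, a transform can be split into small generator functions that are chained together. The sketch below is plain Python; the function names and the in-memory list standing in for a chunk of query results are illustrative assumptions, and it relies on the statement above that a transform may return a generator.

```python
def add_double(chunk):
    """Yield each row with an extra column holding twice its first value."""
    for row in chunk:
        yield (*row, row[0] * 2)


def keep_even_ids(chunk):
    """Yield only rows whose first value is even."""
    for row in chunk:
        if row[0] % 2 == 0:
            yield row


def chained_transform(chunk):
    # Chain the generators: no intermediate lists are built, and each
    # row passes through both steps only when the consumer asks for it.
    return add_double(keep_even_ids(chunk))


chunk = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
print(list(chained_transform(chunk)))  # prints: [(2, 'b', 4), (4, 'd', 8)]
```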

Spatial ETL

No specific drivers are required for spatial data if they are transferred as Well Known Text.

select_sql_oracle = """
    SELECT
      id,
      SDO_UTIL.TO_WKTGEOMETRY(geom)
    FROM src
    """

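# ST_GeomFromText needs the source SRID so that ST_Transform can reproject
# to 27700; 4326 is an assumed example, use the SRID of the source data.
insert_sql_postgis = """
    INSERT INTO dest (id, geom) VALUES (
      %s,
      ST_Transform(ST_GeomFromText(%s, 4326), 27700)
    )
    """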

Other spatial operations, e.g. coordinate transforms, intersections and buffering, can be carried out in the SQL. Transform functions can also manipulate geometries using the Shapely library.

ETL script example

The following is an example ETL script.

from my_databases import ORACLEDB, POSTGRESDB
from etlhelper import connect, copy_rows, execute

DELETE_SQL = "..."
SELECT_SQL = "..."
INSERT_SQL = "..."

def copy_src_to_dest():
    with connect(ORACLEDB, "ORACLE_PASSWORD") as src_conn:
        with connect(POSTGRESDB, "PG_PASSWORD") as dest_conn:
            execute(DELETE_SQL, dest_conn)
            copy_rows(SELECT_SQL, src_conn,
                      INSERT_SQL, dest_conn)

if __name__ == "__main__":
    copy_src_to_dest()

The DELETE_SQL command clears existing data prior to insertion. This makes the script idempotent.
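To see why, the sketch below runs the same delete-then-insert load twice against an in-memory SQLite database and checks that the destination ends up identical. It uses only the standard library's sqlite3 module; load_dest is a hypothetical function standing in for copy_src_to_dest.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dest (id INTEGER, name TEXT)")

SOURCE_ROWS = [(1, "a"), (2, "b")]


def load_dest(conn):
    """Clear the destination table, then insert the source rows."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM dest")
        conn.executemany("INSERT INTO dest (id, name) VALUES (?, ?)", SOURCE_ROWS)
    return conn.execute("SELECT COUNT(*) FROM dest").fetchone()[0]


print(load_dest(conn))  # prints 2
print(load_dest(conn))  # prints 2 again: re-running does not duplicate rows
```

Without the DELETE statement, the second run would leave four rows in the table.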

Recipes

etlhelper has other useful functions.

Logging progress

etlhelper does not emit log messages by default. Time-stamped messages indicating the number of rows processed can be enabled by setting the log level to INFO. Setting the level to DEBUG also provides information on the query that was run, example data and the database connection.

import logging
from etlhelper import logger

logger.setLevel(logging.INFO)

Output from a call to copy_rows will look like:

2019-10-07 15:06:22,411 iter_chunks: Fetching rows
2019-10-07 15:06:22,413 executemany: 1 rows processed
2019-10-07 15:06:22,416 executemany: 2 rows processed
2019-10-07 15:06:22,419 executemany: 3 rows processed
2019-10-07 15:06:22,420 iter_chunks: 3 rows returned
2019-10-07 15:06:22,420 executemany: 3 rows processed in total
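Output in this style can be produced with the standard logging module by attaching a handler whose format includes the timestamp and the name of the calling function. The sketch below configures a stand-in logger named etlhelper_demo (a hypothetical name, not etlhelper's own logger); the format string is an assumption chosen to resemble the output above.

```python
import io
import logging

stream = io.StringIO()  # capture output in memory; use sys.stderr in practice
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(asctime)s %(funcName)s: %(message)s"))

demo_logger = logging.getLogger("etlhelper_demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)


def process_rows():
    # %(funcName)s in the format above is filled with this function's name
    demo_logger.info("%s rows processed", 3)
    demo_logger.debug("Not shown: DEBUG is below the INFO threshold")


process_rows()
print(stream.getvalue())
```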

Getting a SQLAlchemy engine

A SQLAlchemy engine lets Pandas read data from and write data to a database. SQLAlchemy can be installed separately with pip install sqlalchemy (Pandas must also be installed). For example, to export a CSV file of data:

import pandas as pd
from my_databases import ORACLEDB
from etlhelper import get_sqlalchemy_connection_string
from sqlalchemy import create_engine

sqla_conn_str = get_sqlalchemy_connection_string(ORACLEDB, "ORACLE_PASSWORD")
engine = create_engine(sqla_conn_str)

sql = "SELECT * FROM my_table"
df = pd.read_sql(sql, engine)
df.to_csv('my_data.csv', header=True, index=False, float_format='%.3f')

Row factories

A row factory can be specified to change the output style. For example, to return each row as a dictionary, use the following:

from my_databases import ORACLEDB
from etlhelper import connect, iter_rows
from etlhelper.row_factories import dict_rowfactory

conn = connect(ORACLEDB, 'ORACLE_PASSWORD')
sql = "SELECT * FROM my_table"
for row in iter_rows(sql, conn, row_factory=dict_rowfactory):
    print(row['id'])

The dict_rowfactory is useful when getting data to be serialised into JSON. When combined with Hug, an HTTP API can be created in fewer than 20 lines of code.
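The idea can be illustrated with the standard library alone: sqlite3 connections accept a row_factory callable that receives the cursor and the raw row tuple. The sketch below uses that sqlite3 hook, which is separate from etlhelper's row_factory argument, to produce dictionaries that json.dumps can serialise directly. The function name dict_row is hypothetical.

```python
import json
import sqlite3


def dict_row(cursor, row):
    """sqlite3 row factory: map column names to values."""
    return {col[0]: value for col, value in zip(cursor.description, row)}


conn = sqlite3.connect(":memory:")
conn.row_factory = dict_row
conn.execute("CREATE TABLE my_table (id INTEGER, name TEXT)")
conn.execute("INSERT INTO my_table VALUES (1, 'first')")

rows = conn.execute("SELECT * FROM my_table").fetchall()
print(json.dumps(rows))  # prints: [{"id": 1, "name": "first"}]
```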

Insert rows

The executemany function can be used to insert data into the database. Large datasets are broken into chunks and inserted in batches to reduce the number of queries required.

from my_databases import some_db  # hypothetical DbParams object
from etlhelper import connect, executemany

rows = [(1, 'value'), (2, 'another value')]
insert_sql = "INSERT INTO some_table (col1, col2) VALUES (%s, %s)"

with connect(some_db, 'SOME_DB_PASSWORD') as conn:
    executemany(insert_sql, rows, conn)
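The batching idea can be sketched with itertools.islice: rows are consumed from any iterable in fixed-size chunks and each chunk is sent with a single DBAPI2 executemany call. The chunk size of 2, the helper names and the use of SQLite are assumptions for illustration; etlhelper's own chunk size and implementation may differ.

```python
import sqlite3
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` rows from any iterable."""
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


def executemany_sketch(insert_sql, rows, conn, chunk_size=2):
    """Insert rows in batches; return the number of batches sent."""
    batches = 0
    cursor = conn.cursor()
    for chunk in chunked(rows, chunk_size):
        cursor.executemany(insert_sql, chunk)
        batches += 1
    conn.commit()
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE some_table (col1 INTEGER, col2 TEXT)")
rows = [(1, "value"), (2, "another value"), (3, "third value")]
print(executemany_sketch("INSERT INTO some_table (col1, col2) VALUES (?, ?)",
                         rows, conn))  # prints 2 (a batch of 2 rows, then 1)
```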

Development

Maintainers

ETL Helper was created by and is maintained by British Geological Survey Informatics.

See CONTRIBUTING.md for details on how to contribute to the software.

Licence

ETL Helper is distributed under the LGPL v3.0 licence. Copyright: © BGS / UKRI 2019
