/PypeLineETL

A Python ETL extension for PygramETL

Primary LanguagePython

PypeLineETL

Introduction

This repository contains the code for populating the Flashnotes Data Warehouse. This system is comprised of lightweight ETL process management code that uses the open source pygrametl package for some of the core dimensional modeling infrastructure. Otherwise the codebase is a relatively pure pythonic ETL framework. A few key design considerations

  • Celery offers us a powerful means of executing ETLs asynchronously, but simple synchronous execution options are available from run.py

  • Although our implementation uses MySql, it would be rather trivial to use Postgres or some other relational database management system.

  • The system architecture maintains intentional decoupling between ETLs. This allows for many powerful production configurations that exploit parallelization to its fullest. For example, the ETLs could be spread out across several queues and task "workers" and indeed even accross multiple VMs.

Some general references on dimensional modeling and ETL management

Dimensional Modeling Techniques

The Data Warehouse Toolkit (book)

System Prerequisites

The repo deploy subfolder contains two files packages.txt and requirements.txt which list the system package (obtained via brew or apt-get) and pip install packages respectively.

Basic ETL Workflow

Overall System Organization

PypeLineETL is currently designed for maximum configuration flexibility. There are really six files with which to be familiar in order to properly configure and deploy the system.

  • config.py - describes which dimensions and facts are known to the system
  • dimensions.py - pygram dimension object factory definitions
  • facts.py - pygram fact object factory definitions
  • sources.py - low level sql definitions for source tables
  • creates.py - low level sql definitions for data warehouse tables
  • tasks.py - asynchronous celery task definitions.

The additional file run.py is really meant for experimentation and testing.

Synchronous Commandline Execution(using run.py)

For bootstrapping, test, or demonstration purposes it may be useful to simple run one or more ETLs from a command line. Although in production a parallelized task management model is more suitable. If you simply type run.py from the command line from within the main project folder Pypeline will execute any and all configured ETLs. You can edit config.py to turn on and off any ETLs you don't wish to run. In addition you can run tests.py which will validate that the source connections and queries are working for any configured ETLs.

Using run.py

This simple command line tool supports very basic arguments. The organization of PypeLine encourages the user to use the various configuration files to govern which ETLs get run and what the various data sources are for each. So in run.py there ar not complicated data connection parameters. In addition there is no way to specify specific ETLs.

Usage

Enter python run.py --help

This will show you all of the available options

Usage: run.py [options]

Options:
  -h, --help
  --test_dimension_source  Test the dimensions source SQL queries (read-only)
  --run_dimensions         Run all active dimension ETLs
  --run_facts              Run all active fact ETLs
  --delete_existing_tables Delete all existing dimension and fact tables

NOTE: Only --test_dimensions_source defaults to True, so simply invoking run.py will run the dimension source tests. This can be a good way to do basic environment validation.

Asynchronous Execution (using Celery)

The asynchronous task management is handled through Celery. In tasks.py there are currently two celery tasks functions. One is a lower level function which can execute a single ETL and the other is a higher level function which can run all configured ETLs as a group. Furthermore the config.py ETL definitions can be used to turn an ETL on or off

System Prerequistes

As mentioned previously, it is necessary to have celery and redis configured to run the ETL tasks in tasks.py. A simple development setup can be run with the following invocations. If you are running an AWS machine you may need to invoke these using sudo.

/usr/local/bin/redis-server &
/usr/local/bin/celeryd &
Task Running a Single ETL
@celery.task
def process_etl(conf_name, conf_type)}}}
...

Run a single ETL (in a python shell)

In [2]: import tasks
In [3]: task = tasks.process_etl.apply_async(args=["school", "dimension"])

Check on the task status

In [4]: task.status
Out[4]: 'SUCCESS'

In [5]: task.result
Out[5]: ('dimension', 'school', 1687)

Check the content of the school_dim Data Warehouse Table

mysql> select school_id, name, state from school_dim limit 3;
+-----------+------------------------------+-------+
| school_id | name                         | state |
+-----------+------------------------------+-------+
|       993 | Abilene Christian University | TX    |
|       995 | Academy of Art University    | CA    |
|       996 | Adams State College          | CO    |
+-----------+------------------------------+-------+
3 rows in set (0.00 sec)

Task Running a configured set of ETLs
@celery.task
def refresh_data_warehouse(run_dimensions=True,
                           run_facts=True,
                           delete_existing_tables=True):
...

In this test configuration we have 4 dimension ETLs configured. Let's run all of them (in a python shell) but let's skip running facts for now.

In [1]: import tasks

In [2]: task = tasks.refresh_data_warehouse.apply_async(args=[True, False, True])

Later we can check the task status. Note, that we have have status (updated row counts) for each of the 4 ETLs.

In [3]: task.status
Out[3]: 'SUCCESS'

In [4]: task.result
Out[4]: [[('dimension', 'product', 850),
  ('dimension', 'user', 279),
  ('dimension', 'course', 30418),
  ('dimension', 'date', 2191),
  ('dimension', 'school', 1687),
  ('dimension', 'payment_source', 3),
  ('dimension', 'disability_program', 1),
  ('dimension', 'affiliate', 4),
  ('dimension', 'site', 1)],
 []]

Now to get additional confirmation about what actually ran we can check run.log in etl-logs (output destination is configurable in config.py

INFO:__main__:Dropping table product_dim
INFO:__main__:Dropped table product_dim
INFO:__main__:Creating table product_dim.
INFO:__main__:Table product_dim created.
INFO:__main__:Executing source query for dimension product_dim
INFO:__main__:Loading data
INFO:__main__:Loaded 850 rows into table product_dim (elapsed time: 0:00:00.124727)
INFO:__main__:Dropping table user_dim
INFO:__main__:Dropped table user_dim
INFO:__main__:Creating table user_dim.
INFO:__main__:Table user_dim created.
INFO:__main__:Executing source query for dimension user_dim
INFO:__main__:Loading data
INFO:__main__:Loaded 279 rows into table user_dim (elapsed time: 0:00:00.046210)
INFO:__main__:Dropping table course_dim
INFO:__main__:Dropped table course_dim
INFO:__main__:Creating table course_dim.
INFO:__main__:Table course_dim created.
INFO:__main__:Executing source query for dimension course_dim
INFO:__main__:Loading data
INFO:__main__:Loaded 30418 rows into table course_dim (elapsed time: 0:00:03.141393)
INFO:__main__:Dropping table school_dim
INFO:__main__:Dropped table school_dim
INFO:__main__:Creating table school_dim.
INFO:__main__:Table school_dim created.
INFO:__main__:Executing source query for dimension school_dim
INFO:__main__:Loading data
INFO:__main__:Loaded 1687 rows into table school_dim (elapsed time: 0:00:00.167087)