About
A collection of random transforms for the Apache beam python SDK . Many are simple transforms. The most useful ones are those for reading/writing from/to relational databases.
Installation
- Using pip
pip install beam-nuggets
- From source
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
pip install .
Supported transforms
IO
- relational_db.ReadFromDB for reading from relational database tables.
- relational_db.Write
for writing to relational database tables.
Above transforms uses SqlAlchemy to communicate with the database, and hence they can read from and write to all relational databases supported by SqlAlchemy. The transforms are tested against PostgreSQL, MySQL and SQLite. - kafkaio.KafkaProduce for writing to Kafka topics.
- kafkaio.KafkaConsume for consuming from kafka topics.
- csvio.Read for reading CSV files.
Others
- SelectFromNestedDict Selects a subset from records formed of nested dictionaries.
- ParseJson
- AssignUniqueId
Documentation
See here.
Usage
Write data to an SQLite table using beam-nugget's relational_db.Write transform.
# write_sqlite.py contents
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
records = [
{'name': 'Jan', 'num': 1},
{'name': 'Feb', 'num': 2}
]
source_config = relational_db.SourceConfiguration(
drivername='sqlite',
database='/tmp/months_db.sqlite',
create_if_missing=True # create the database if not there
)
table_config = relational_db.TableConfiguration(
name='months',
create_if_missing=True, # automatically create the table if not there
primary_key_columns=['num'] # and use 'num' column as primary key
)
with beam.Pipeline(options=PipelineOptions()) as p: # Will use local runner
months = p | "Reading month records" >> beam.Create(records)
months | 'Writing to DB' >> relational_db.Write(
source_config=source_config,
table_config=table_config
)
Execute the pipeline
python write_sqlite.py
Examine the contents
sqlite3 /tmp/months_db.sqlite 'select * from months'
# output:
# 1.0|Jan
# 2.0|Feb
To write the same data to a PostgreSQL table instead, just create a suitable relational_db.SourceConfiguration as follows.
source_config = relational_db.SourceConfiguration(
drivername='postgresql+pg8000',
host='localhost',
port=5432,
username='postgres',
password='password',
database='calendar',
create_if_missing=True # create the database if not there
)
Click here
for more examples, including writing to PostgreSQL in Google Cloud Platform
using the DataFlowRunner.
An example showing how you can use beam-nugget's relational_db.ReadFromDB
transform to read from a PostgreSQL database table.
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db
with beam.Pipeline(options=PipelineOptions()) as p:
source_config = relational_db.SourceConfiguration(
drivername='postgresql+pg8000',
host='localhost',
port=5432,
username='postgres',
password='password',
database='calendar',
)
records = p | "Reading records from db" >> relational_db.ReadFromDB(
source_config=source_config,
table_name='months',
query='select num, name from months' # optional. When omitted, all table records are returned.
)
records | 'Writing to stdout' >> beam.Map(print)
See here for more examples.
Development
- Install
git clone git@github.com:mohaseeb/beam-nuggets.git
cd beam-nuggets
export BEAM_NUGGETS_ROOT=`pwd`
pip install -e .[dev]
- Make changes on dedicated dev branches
- Run tests
cd $BEAM_NUGGETS_ROOT
python -m unittest discover -v
- Generate docs
cd $BEAM_NUGGETS_ROOT
docs/generate_docs.sh
- Create a PR against master.
- After merging the accepted PR and updating the local master, upload a new build to pypi.
cd $BEAM_NUGGETS_ROOT
scripts/build_test_deploy.sh
Backlog
- versioned docs?
- Summarize the investigation of using Source/Sink Vs ParDo(and GroupBy) for IO
- more nuggets: WriteToCsv
- Investigate readiness of SDF ParDo, and possibility to use for relational_db.ReadFromDB
- integration tests
- DB transforms failures handling on IO transforms
- more nuggets: Elasticsearch, Mongo
- WriteToRelationalDB, logging
Contributions by
mohaseeb, astrocox, 2514millerj, alfredo, shivangkumar
Licence
MIT