🤖 An Apache Beam Sink Library for Databases and other Sinks
🐘 Supports MySQL (Postgres, Elasticsearch coming soon...)
pip install beam_sink
import apache_beam as beam
from beam_sink import ReadMySQL, WriteToMySQL, MySQLConfig
import json
# First, initialise a DB config object, which can validate that we're
# providing the right connection information.
config = MySQLConfig(host="localhost", username="lenny", password="karl", database="springfield")

# Then write the query to run against the database.
query = "select * from thrillhouse"

# Initialise your Beam pipeline the way you normally would.
with beam.Pipeline() as p:
    (
        p
        | 'ReadTable' >> ReadMySQL(config, query)
        # Print every element produced by the read step.
        | 'PrintResult' >> beam.Map(print)
    )
# We can also insert data into a table: name the target table and the
# columns to populate.
table = "thrillhouse"
columns = ["id", "description", "amount"]

with beam.Pipeline() as p:
    (
        p
        | 'ReadJson' >> beam.io.ReadFromText("tests/.data/test.jsonl")
        # Each input line is parsed as one JSON document and wrapped in a
        # single-element list before being handed to the writer.
        | 'Parse' >> beam.Map(lambda line: [json.loads(line)])
        | 'WriteData' >> WriteToMySQL(config, table, columns)
    )
Other operations, such as upserts and DB commands, are coming soon...