Foreword - if you need help: please ask your question with the kiba-etl tag on StackOverflow, so that others can benefit from your contribution! I monitor this specific tag and will reply to you.
Writing reliable, concise, well-tested & maintainable data-processing code is tricky.
Kiba lets you define and run such high-quality ETL (Extract-Transform-Load) jobs, using Ruby (see supported versions).
Learn more on the Kiba blog and on StackOverflow:
- Live Coding Session - Processing data with Kiba ETL
- Rubyists - are you doing ETL unknowingly?
- How to write solid data processing code
- How to reformat CSV files with Kiba (in-depth, hands-on tutorial)
- How to explode multivalued attributes with Kiba ETL?
- Common techniques to compute aggregates with Kiba
- How to run Kiba in a Rails environment?
Consulting services: if your organization needs to leverage data processing to solve a given business problem, I'm available to help you out via consulting sessions. More information.
Kiba Pro: for more features & goodies, check out Kiba Pro (Changelog & contact info).
Kiba provides you with a DSL to define ETL jobs:
# declare a ruby method here, for quick reusable logic
def parse_french_date(date)
Date.strptime(date, '%d/%m/%Y')
end
# or better, include a ruby file which loads reusable assets
# eg: commonly used sources / destinations / transforms, under unit-test
require_relative 'common'
# declare a pre-processor: a block called before the first row is read
pre_process do
# do something
end
# declare a source where to take data from (you implement it - see notes below)
source MyCsvSource, 'input.csv'
# declare a row transform to process a given field
transform do |row|
row[:birth_date] = parse_french_date(row.fetch(:birth_date))
# return to keep in the pipeline
row
end
# declare another row transform, dismissing rows conditionally by returning nil
transform do |row|
row[:birth_date].year < 2000 ? row : nil
end
# declare a row transform as a class, which can be tested properly
transform ComplianceCheckTransform, eula: 2015
# before declaring a definition, maybe you'll want to retrieve credentials
config = YAML.load(IO.read('config.yml'))
# declare a destination - like source, you implement it (see below)
destination MyDatabaseDestination, config['my_database']
# declare a post-processor: a block called after all rows are successfully processed
post_process do
# do something
end
The combination of pre-processors, sources, transforms, destinations and post-processors defines the data processing pipeline.
Note: you are advised to store your ETL definitions as files with the extension .etl (rather than .rb). This will make sure you do not end up loading them by mistake from another component (eg: a Rails app).
You can use the provided command-line:
bundle exec kiba my-data-processing-script.etl
This command essentially starts a two-step process:
script_content = IO.read(filename)
# pass the filename to get line numbers in error messages
job_definition = Kiba.parse(script_content, filename)
Kiba.run(job_definition)
Kiba.parse evaluates your ETL Ruby code to register sources, transforms, destinations and post-processors in a job definition. It is important to understand that you can use Ruby logic at DSL parsing time. This means that code like the following is possible, provided the CSV files are available at parsing time:
Dir['to_be_processed/*.csv'].each do |file|
source MyCsvSource, file
end
Once the job definition is loaded, Kiba.run will use that information to do the actual row-by-row processing. It currently uses a simple, single-threaded implementation that stops at the first error encountered.
In Kiba, you are responsible for implementing the sources that do the extraction of data.
Sources are classes implementing:
- a constructor (to which Kiba will pass the provided arguments in the DSL)
- the each method (which should yield rows one by one)
Rows are usually Hash instances, but they can be other structures as long as the rest of your pipeline expects them.
Since sources are classes, you can (and are encouraged to) unit test them and reuse them.
Here is a simple CSV source:
require 'csv'
class MyCsvSource
def initialize(input_file)
@csv = CSV.open(input_file, headers: true, header_converters: :symbol)
end
def each
@csv.each do |row|
yield(row.to_hash)
end
@csv.close
end
end
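Since sources are plain Ruby classes, they are easy to unit test in isolation. Here is a minimal sketch using Minitest, assuming the MyCsvSource class above lives in its own file (file and test names are illustrative):
require 'minitest/autorun'
require_relative 'my_csv_source' # assuming the class above lives in this file
class TestMyCsvSource < Minitest::Test
  def test_yields_each_row_as_a_symbol_keyed_hash
    File.write('test_input.csv', "first_name,birth_date\nJohn,02/01/1986\n")
    rows = []
    MyCsvSource.new('test_input.csv').each { |row| rows << row }
    assert_equal [{ first_name: 'John', birth_date: '02/01/1986' }], rows
  ensure
    File.delete('test_input.csv') if File.exist?('test_input.csv')
  end
end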
Row transforms can be implemented in two ways: as blocks, or as classes.
When writing a row transform as a block, it will be passed the row as parameter:
transform do |row|
row[:this_field] = row.fetch(:that_field) * 10
# make sure to return the row to keep it in the pipeline
row
end
NOTE: using Hash#fetch is a good practice when working with Hash rows. It makes sure you'll get an exception if a key your program expects goes missing.
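For instance, with a row missing an expected key:
row = { first_name: 'John' }
row[:birth_date]       # => nil - the error will only surface later, far from its cause
row.fetch(:birth_date) # => raises KeyError right away, pointing at the actual problem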
To dismiss a row from the pipeline, simply return nil from a transform:
transform { |row| row.fetch(:index) % 2 == 0 ? row : nil }
While you cannot call return from a Ruby block, you can return early from a block by using next:
transform do |row|
# remove a row with `next`
if row.fetch(:index) % 2 == 0
next # the row will be removed from the pipeline
end
# return a modified row
if row.fetch(:index) % 3 == 0
next {great_index: row.fetch(:index) * 10}
end
# otherwise return the row as is
row
end
This is very useful to avoid nested if statements inside a single block.
If you implement the transform as a class, it must respond to process(row):
class SamplingTransform
def initialize(modulo_value)
@modulo_value = modulo_value
end
def process(row)
row[:index] % @modulo_value == 0 ? row : nil
end
end
You'll use it this way in your ETL declaration (the parameters will be passed to initialize):
# only keep 1 row over 10
transform SamplingTransform, 10
Like the block form, it can return nil to dismiss the row. The class form allows better testability and reusability across your ETL scripts.
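As with sources, a transform class can be covered by a plain unit test. Here is a minimal Minitest sketch for the SamplingTransform above (file and test names are illustrative):
require 'minitest/autorun'
require_relative 'sampling_transform' # assuming the class above lives in this file
class TestSamplingTransform < Minitest::Test
  def test_keeps_only_rows_whose_index_matches_the_modulo
    transform = SamplingTransform.new(10)
    assert_equal({ index: 20 }, transform.process(index: 20))
    assert_nil transform.process(index: 21)
  end
end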
Like sources, destinations are classes that you provide. Destinations must implement:
- a constructor (to which Kiba will pass the provided arguments in the DSL)
- a write(row) method that will be called for each non-dismissed row
- an optional close method that will be called, if present, at the end of the processing (useful to tear down resources such as connections)
Here is an example destination:
require 'csv'
# simple destination assuming all rows have the same fields
class MyCsvDestination
def initialize(output_file)
@csv = CSV.open(output_file, 'w')
end
def write(row)
unless @headers_written
@headers_written = true
@csv << row.keys
end
@csv << row.values
end
def close
@csv.close
end
end
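Because destinations are plain classes too, they can be exercised (and unit tested) outside of any Kiba run. A quick sketch using the class above, with an illustrative file name:
destination = MyCsvDestination.new('output.csv')
destination.write(first_name: 'John', birth_date: '02/01/1986')
destination.write(first_name: 'Mary', birth_date: '26/03/1988')
destination.close
# output.csv now contains a header line followed by the two rows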
Pre-processors and post-processors are currently blocks, which get called only once per ETL run:
- Pre-processors get called before the ETL starts reading rows from the sources.
- Post-processors get invoked after the ETL has successfully processed all the rows.
Note that post-processors won't get called if an error occurred earlier.
count = 0
def system!(cmd)
fail "Command #{cmd} failed" unless system(cmd)
end
file = 'my_file.csv'
sample_file = 'my_file.sample.csv'
pre_process do
# it's handy to work with a reduced data set. you can
# e.g. just keep one line of the CSV files + the headers
system! "sed -n \"1p;25706p\" #{file} > #{sample_file}"
end
source MyCsv, file: sample_file
transform do |row|
count += 1
row
end
post_process do
Email.send(supervisor_address, "#{count} rows successfully processed")
end
The way Kiba works makes it easy to create reusable, well-tested ETL components and jobs.
The main reason for this is that a Kiba ETL script can require shared Ruby code, which allows you to:
- create well-tested, reusable sources & destinations
- create macro-transforms as methods, to be reused across sister scripts (as sketched after this list)
- substitute one component for another (e.g.: try a variant of a destination)
- use a centralized place for configuration (credentials, IP addresses, etc.)
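For instance, the require_relative 'common' line in the DSL example at the top could point to a file along these lines (a minimal sketch; the helper and file names are illustrative):
# common.rb - shared, unit-tested assets required by sister ETL scripts
require 'date'
require_relative 'my_csv_source'
require_relative 'my_database_destination'
def parse_french_date(date)
  Date.strptime(date, '%d/%m/%Y')
end
# a "macro-transform": a plain method declaring a transform reused across scripts
def keep_rows_born_before(year)
  transform do |row|
    row.fetch(:birth_date).year < year ? row : nil
  end
end
A sister script can then simply call keep_rows_born_before(2000) next to its own source and destination declarations.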
The fact that the DSL evaluation "runs" the script also allows for simple meta-programming techniques, like pre-reading a source file to extract field names, to be used in transform definitions.
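As an illustration, a script could read the field names of a CSV file while the DSL is being parsed, then use them to declare transforms (a sketch, assuming a symbol-keyed source like MyCsvSource above):
require 'csv'
# read the header line at DSL parsing time (not at run time)
field_names = CSV.open('input.csv', headers: true, header_converters: :symbol) do |csv|
  csv.first.headers
end
# declare one transform per field, stripping surrounding whitespace
field_names.each do |field|
  transform do |row|
    row[field] = row.fetch(field).to_s.strip
    row
  end
end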
The ability to support this DSL, while also checking command-line arguments and environment variables, tweaking behaviour as needed, or calling other, faster specialized tools, makes Ruby an asset for implementing ETL jobs.
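For instance, a job definition can pick up part of its configuration from environment variables at parsing time (a hypothetical sketch; the variable names are illustrative):
# read settings from the environment when the DSL is parsed
input_file = ENV.fetch('INPUT_FILE', 'input.csv')
row_limit  = Integer(ENV.fetch('ROW_LIMIT', '0'))
source MyCsvSource, input_file
# optionally keep only the first `row_limit` rows (handy for quick test runs)
if row_limit > 0
  seen = 0
  transform do |row|
    seen += 1
    seen <= row_limit ? row : nil
  end
end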
Make sure to subscribe to my Ruby ETL blog where I'll demonstrate such techniques over time!
Kiba currently supports Ruby 2.0+ and JRuby (with its default 1.9 syntax). See test matrix.
Wow, you're still there? Nice to meet you. I'm Thibaut, author of Kiba.
I first came across the idea of a row-based syntax when I started using Anthony Eden's Activewarehouse-ETL, first published around 2006 (I think), in which Anthony applied the core principles defined by Ralph Kimball in The Data Warehouse ETL Toolkit.
I've been writing and maintaining a number of production ETL systems, first with Activewarehouse-ETL, then later with an ancestor of Kiba named TinyTL.
I took over the maintenance of Activewarehouse-ETL circa 2009/2010, but over time, I could not properly update & document it, given the gradual failure of a large number of dependencies and components. Ultimately in 2014 I had to stop maintaining it, after an already long hiatus.
That said, using Activewarehouse-ETL for so long made me realize that the row-based processing syntax was great and a real asset for maintainability over long time spans.
Kiba is a completely fresh & minimalistic-on-purpose implementation of that row-based processing pattern.
It is minimalistic to make it more likely that I will be able to maintain it over time.
It makes strong simplicity assumptions (like letting you define the sources, transforms & destinations). MiniTest is an inspiration.
As I developed Kiba, I realized how much this simplicity opens the road to interesting developments such as multi-threaded & multi-process processing.
Last word: Kiba is 100% sponsored by my company LoGeek SARL (also provider of WiseCash, a lightweight cash-flow forecasting app).
Copyright (c) LoGeek SARL.
Kiba is an Open Source project licensed under the terms of the LGPLv3 license. Please see http://www.gnu.org/licenses/lgpl-3.0.html for license text.
Until the API is more stable, I can only accept documentation Pull Requests.
(agreement below borrowed from Sidekiq Legal)
By submitting a Pull Request, you disavow any rights or claims to any changes submitted to the Kiba project and assign the copyright of those changes to LoGeek SARL.
If you cannot or do not want to reassign those rights (your employment contract for your employer may not allow this), you should not submit a PR. Open an issue and someone else can do the work.
This is a legal way of saying "If you submit a PR to us, that code becomes ours". 99.9% of the time that's what you intend anyways; we hope it doesn't scare you away from contributing.