Python utility code for building a data pipeline with AWS Kinesis.
Kinesis (http://aws.amazon.com/kinesis/) lets you define streams of records. You put records in one end, and the other end can consume them. The stream maintains the records for 24 hours. Each stream is divided into one or more shards (defined by the administrator).
The tooling provided here builds on top of the boto library to make real-world work with these streams and records easier. This is preferable to the Amazon-provided KCL (Kinesis Client Library), which is Java-based, or the Python bindings built on top of KCL, neither of which is very pythonic.
This tooling also provides built-in support for checkpointing, which allows a client to pick up processing records wherever it last stopped. The raw Kinesis libraries require the client to handle the checkpointing process itself.
Normal AWS credential environment variables or IAM roles for boto apply.
Clients will need to define a yaml file containing definitions for the streams they want to use. That yaml file will look like:
    my_stream:
        name: my_stream_v2
        partition_key: value
        region: us-west-1
Clients generating records will reference the my_stream stream, which will automatically know to use the real underlying stream my_stream_v2 in the us-west-1 region. Records put into this stream are assumed to have a key named value which is used for partitioning.
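For example, assuming the object returned by triton.load_config can be indexed by stream label (it is used that way in the S3 examples below), you could sanity-check the configuration like so:

    import triton

    c = triton.load_config("/etc/triton.yaml")
    # 'my_stream' is the label clients reference; the underlying Kinesis
    # stream name, partition key, and region come from the yaml definition.
    print c['my_stream']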
Triton comes with a command line script, triton, which can be used to demo some simple functionality:

    $ echo 'hi' | triton put -s test_stream
And then to consume:
    $ triton get -s test_stream
    <Record shardId-000000000001 49551479315998225220804774498660962603757541393499684882>
    {'msg': 'hi\n', 'ts': 1433969276.172019}
(Note that the order is actually important here: this consumer is set to 'latest', so if your producer produces first, you might miss the record.) You can set the config location with the environment variable TRITON_CONFIG; the default is /etc/triton.yaml.
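For example, to point the tools at a non-default config (the path here is just illustrative):

    $ TRITON_CONFIG=./triton.yaml triton get -s test_stream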
Adding records to the stream is easy:
    import time

    import triton

    c = triton.load_config("/etc/triton.yaml")
    s = triton.get_stream('my_stream', c)
    s.put(value='hi mom', ts=time.time())
For more advanced uses, you can record the shard and sequence number returned by the put operation.
    shard, seq_num = s.put(...)
In theory, you could communicate these values to some other process if you want to ensure it has received this record.
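As a sketch of that idea (assuming the shard value returned by put matches what build_iterator_from_seqnum, described below, expects; that correspondence is not confirmed here):

    # producer side
    shard, seq_num = s.put(value='hi mom', ts=time.time())

    # consumer side: start reading at the reported position
    # to confirm the record arrived
    i = s.build_iterator_from_seqnum(shard, seq_num)
    rec = i.next()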
CAVEAT UTILITOR: Triton currently only supports data types directly convertible into msgpack-formatted data. Unsupported types will raise a TypeError.
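To illustrate, here is a sketch using the msgpack library directly (triton's internal serialization path may differ, but the convertibility rule is the same):

    import time

    import msgpack

    msgpack.packb({'value': 'hi mom', 'ts': time.time()})  # fine: str and float

    class Widget(object):
        pass

    msgpack.packb({'value': Widget()})  # raises TypeError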
Using the producer syntax above, s.put(value='hi mom', ts=time.time()) will block until the operation to put the data into Kinesis is complete, which can take on the order of 100 ms. This guarantees that the write has succeeded before the flow of control continues.
To allow for writes that do not block, triton comes with tritond, a daemon that spools Kinesis messages in local memory and writes them to Kinesis asynchronously. Writes via this pathway block for approximately 0.1 ms.
The tritond daemon spools messages to memory and writes all received messages to Kinesis every 100 ms. It is important to note that using this non-blocking pathway eliminates the guarantee that data will be written to Kinesis.
An instance of tritond needs to be running to collect Kinesis writes. It is recommended to run an instance on each host that will be producing Kinesis writes.
By default, tritond will listen on 127.0.0.1:3515, or it will respect the environment variables TRITON_ZMQ_HOST and TRITON_ZMQ_PORT.
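For example, to run the daemon on a non-default port (the values here are arbitrary):

    export TRITON_ZMQ_HOST=127.0.0.1
    export TRITON_ZMQ_PORT=4242
    tritond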
The tritond daemon uses the same triton.yaml file to configure triton streams, and it will log errors and skip any data if the stream is not configured or the config file is not found.
tritond can be run by simply calling it from the command line. For testing and/or debugging, it can be run in verbose mode with its output directed to stdout or a file, e.g.
    tritond -v --skip-kinesis  # writes verbose logs and writes events to stdout
    tritond -cc --skip-kinesis --output_file test_output.txt
Once tritond is running, usage follows the basic write pattern:
    import time

    import triton

    c = triton.load_config("/etc/triton.yaml")
    s = triton.get_nonblocking_stream('my_stream', c)
    s.put(value='hi mom', ts=time.time())
Since the actual Kinesis write happens asynchronously, the shard and sequence number are not returned from this operation. Also, as mentioned above, Triton currently only supports data types directly convertible into msgpack-formatted data. For data put into a NonblockingStream object, unsupported types will log an error and continue.
Writing consumers is more complicated, as you must deal with sharding. Even in the lightest of workloads you'll likely want multiple shards. Triton makes this simple:
    import triton

    c = triton.load_config("/etc/triton.yaml")
    s = triton.get_stream('my_stream', c)
    i = s.build_iterator_from_latest()

    for rec in i:
        print rec.data
This will consume only new records from the stream. Since the stream in theory never ends, your own process management can tell the iterator when to stop:
    for rec in i:
        do_stuff(rec)
        if has_reason():
            i.stop()
This will cause the iterator to stop fetching new data, but will flush out data that's already been fetched.
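One way to hook this into process management is a signal handler. This is a sketch, not part of triton itself; do_stuff stands in for your own processing, and the choice of SIGTERM is an assumption:

    import signal

    def handle_sigterm(signum, frame):
        # stop fetching new data; records already fetched still flush
        i.stop()

    signal.signal(signal.SIGTERM, handle_sigterm)

    for rec in i:
        do_stuff(rec)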
Kinesis supports other types of iterators. For example, if you want to see all the data in the stream:
    i = s.build_iterator_for_all()
or if you know a specific shard and offset:
    i = s.build_iterator_from_seqnum(shard_num, seq_num)
For building distributed consumers, you'll want to divide up the work by shards. So if you have 4 shards, the first worker would do:

    i = s.build_iterator_from_latest([0, 1])

and the second worker would do:

    i = s.build_iterator_from_latest([2, 3])
Note that these are 'shard numbers', not shard ids. They are indexes into the actual shard list.
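A simple way to assign contiguous shard numbers to workers, matching the split above (worker_index and num_workers are hypothetical values your own deployment would supply):

    num_shards = 4
    num_workers = 2
    worker_index = 0  # this worker's slot: 0 or 1

    per_worker = num_shards // num_workers
    start = worker_index * per_worker
    my_shards = range(start, start + per_worker)  # [0, 1] for worker 0

    i = s.build_iterator_from_latest(my_shards)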
Triton supports checkpointing to a DB so that processing can start where previous processing left off. It requires an available PostgreSQL database.
To specify the DB location, set the ENV variable TRITON_DB to the DSN of the postgres DB, e.g.

    export TRITON_DB="dbname=db_name port=5432 host=www.dbhosting.com user=user_name password=password"
Attempting to checkpoint without this DB being configured will raise a TritonCheckpointError exception.
The DB also needs to have a specific table created; calling the following will initialize the table (this call is safe to repeat; it is a no-op if the table already exists):

    triton.checkpoint.init_db()
Triton checkpointing also requires a unique client name, since the basic assumption is that the checkpoint DB will be shared. The client name is specified by the ENV variable TRITON_CLIENT_NAME. Attempting to checkpoint without this ENV variable will also raise a TritonCheckpointError exception.
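For example (the name here is arbitrary; each distinct consumer should pick its own):

    export TRITON_CLIENT_NAME="my_consumer"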
Once configured, checkpointing can be used simply by calling the checkpoint method on a stream iterator. For example:
    s = triton.get_stream('my_stream', c)
    i = s.build_iterator_from_checkpoint()

    for ctr in range(1):
        rec = i.next()
        print rec.data

    i.checkpoint()
The next time this code is run, it will pick up from where the last run left off.
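A common pattern is to checkpoint every N records, so that a crash replays at most N records. This is a sketch; the batch size and do_stuff are illustrative assumptions:

    s = triton.get_stream('my_stream', c)
    i = s.build_iterator_from_checkpoint()

    for n, rec in enumerate(i, 1):
        do_stuff(rec)
        if n % 100 == 0:
            i.checkpoint()  # record progress every 100 records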
Triton data is typically archived to S3. Using the triton command, you can view that data:
    $ triton cat --bucket=triton-data --stream=my_stream --start-date=20150715 --end-date=20150715
Or using the API, something like:
    import triton

    c = triton.load_config("/etc/triton.yaml")
    b = triton.open_bucket("triton-data", "us-west-1")
    s = triton.stream_from_s3_store(b, c['my_stream'], start_dt, end_dt)

    for rec in s:
        ... do something ...
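The start_dt and end_dt values above are left undefined in the snippet; presumably they are datetime objects bracketing the archive range, mirroring the --start-date/--end-date CLI flags (an assumption, not confirmed by the snippet itself):

    from datetime import datetime

    start_dt = datetime(2015, 7, 15)
    end_dt = datetime(2015, 7, 15)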
You should be able to configure your development environment using make:
    ~/python-triton $ make dev
You will likely need to install system libraries as well:
    ~/python-triton $ sudo apt-get install libsnappy-dev libzmq-dev
The tests should all work:
    ~/python-triton $ make test
    .
    PASSED. 1 test / 1 case: 1 passed, 0 failed. (Total test time 0.00s)
If you need to debug your application with ipython:
    ~/python-triton $ make shell
    Python 2.7.3 (default, Apr 27 2012, 21:31:10)
    Type "copyright", "credits" or "license" for more information.

    IPython 0.12.1 -- An enhanced Interactive Python.
    ?         -> Introduction and overview of IPython's features.
    %quickref -> Quick reference.
    help      -> Python's own help system.
    object?   -> Details about 'object', use 'object??' for extra details.

    In [1]: from project.models import Project

    In [2]:
- It would probably be helpful to have some common code for building a worker process that just handles records.