kiner

Python AWS Kinesis Producer with error handling and thread support.

Primary language: Python. License: MIT.

A simple Python AWS Kinesis Producer.

    


Features

  • Error handling and retrying with exponential backoff
  • Automatic batching and flush callbacks
  • Threaded execution

Inspired by the AWS blog post Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library.
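
To illustrate the retry feature listed above, here is a minimal sketch of retrying with capped exponential backoff and jitter. It is not Kiner's actual implementation: `backoff_delays`, `send_with_retries`, and the `send` callable (assumed to return the records that failed) are hypothetical names used only for this example.

```python
import random
import time


def backoff_delays(max_retries, base=0.1, cap=5.0):
    """Yield one capped exponential delay (in seconds) per retry attempt."""
    for attempt in range(max_retries):
        yield min(cap, base * (2 ** attempt))


def send_with_retries(send, records, max_retries=5, sleep=time.sleep):
    """Call send() until no records fail or the retries are used up.

    `send` is a hypothetical callable that takes a list of records and
    returns the records that failed to be delivered.
    """
    pending = send(records)
    for delay in backoff_delays(max_retries):
        if not pending:
            break
        # Jitter keeps parallel threads from retrying in lockstep.
        sleep(delay * random.uniform(0.5, 1.0))
        pending = send(pending)
    return pending  # records that still failed after every retry
```

Only the failed records are resent on each attempt, so a partially successful batch does not produce duplicates of the records that already went through.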

Installation

You can use pip to install Kiner.

pip install kiner

Usage

To use Kiner, you'll need AWS authentication credentials configured as described in the boto3 documentation.

from kiner.producer import KinesisProducer

p = KinesisProducer('stream-name', batch_size=500, max_retries=5, threads=10)

for i in range(10000):
    p.put_record(i)

p.close()
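
The batch_size=500 used above matches the Kinesis PutRecords limit of 500 records per request. As a rough picture of what automatic batching involves, the sketch below splits a stream of records into lists of at most batch_size items. The `batches` helper is a hypothetical name for illustration and is not part of Kiner's API.

```python
from itertools import islice


def batches(records, batch_size=500):
    """Yield successive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


sizes = [len(batch) for batch in batches(range(1200))]
print(sizes)  # [500, 500, 200]
```

With this scheme, the 10000 records in the example above would be sent as 20 full batches.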

To be notified when data is flushed to AWS Kinesis, provide a flush_callback:

from uuid import uuid4
from kiner.producer import KinesisProducer

def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
    print(f"""
        Flushed {count} messages at timestamp {last_flushed_at}
        Last message was {Metadata['id']} partitioned by {PartitionKey} ({len(Data)} bytes)
    """)

p = KinesisProducer('stream-name', flush_callback=on_flush)

for i in range(10000):
    p.put_record(i, metadata={'id': uuid4()}, partition_key=f"{i % 2}")

p.close()
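
If the callback updates shared state, it may be worth guarding that state with a lock, on the assumption that flushes can run on several worker threads at once. The sketch below is one way to do that. `FlushStats` is a hypothetical helper, and it assumes the callback receives the same arguments as on_flush above.

```python
import threading


class FlushStats:
    """Accumulate flush totals; safe to call from several threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self.flushes = 0
        self.records = 0

    def __call__(self, count, last_flushed_at, **last_record):
        # **last_record absorbs Data, PartitionKey and Metadata if provided.
        with self._lock:
            self.flushes += 1
            self.records += count
```

An instance could then be passed as flush_callback=FlushStats() when constructing the producer, and its flushes and records attributes read after p.close().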

Contributions