Unified Kafka Module
This module aims to simplify the usage of Apache Kafka via Python code.
Consumer
def handle_json_message_data(msg_value):
# this is the function to process the message value
print(msg_value)
KafkaConsumerThread(
topic="myTopic",
broker_address='127.0.1.1',
handle_json_message_data=True # if json messages have been passed,
run_as_separate_thread=False # True if you want to run the consumer in a separate thread
).start_consumer(handle_json_message_data)
KafkaConsumerRestThread(
topic="test",
server_address="myhost:8080/kafka-rest-proxy",
run_as_separate_thread=True
).start_consumer(print)
Producer
KafkaProducer(
broker_address='127.0.1.1' # kafka broker,
).publish_message('topic', <a JSON serializable object>)
data = [dict(element="1", ee=3) for _ in range(1000)]
KafkaRestProducer(
rest_proxy_address="http://yourhost:8080/kafka-rest-proxy")\
.publish_messages("test", data, parallel_processes=2, message_list_size=2)