A unified PubSub messaging module that supports Kafka and Google PubSub as backends.
Clone the repo and create a Python 3.7 virtual environment as follows.
$ git clone https://github.com/AliAbdelaal/pubsub-lib.git
$ cd pubsub-lib/
$ python3.7 -m venv venv
$ source venv/bin/activate
$ pip install .
Create a configs.json file for the backend you want to use, and set the PUBSUB_CONFIG_PATH environment variable to the path of that file:
$ export PUBSUB_CONFIG_PATH='path/to/configs.json'
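If you want to check the file before starting a producer or consumer, here is a minimal sketch of reading the path from PUBSUB_CONFIG_PATH and parsing the JSON. The function name load_pubsub_config is hypothetical and not part of this library; it only assumes the config file is plain JSON, as in the examples below.

```python
import json
import os


def load_pubsub_config(env_var: str = "PUBSUB_CONFIG_PATH") -> dict:
    """Hypothetical helper: read and parse the JSON file named by env_var."""
    path = os.environ.get(env_var)
    if not path:
        raise RuntimeError(f"{env_var} is not set; export it first")
    if not os.path.isfile(path):
        raise FileNotFoundError(f"{env_var} points to a missing file: {path}")
    with open(path, encoding="utf-8") as f:
        # Returns the parsed JSON object, e.g. {"kafka_servers": [...]}
        return json.load(f)
```

For example, `sorted(load_pubsub_config())` shows the top-level keys, which tells you at a glance which backend the file is meant for.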
To use Kafka as a backend service, you need to install Kafka; you can follow these blog posts to install it:
- HELLO WORLD IN KAFKA USING PYTHON
- Kafka-Python explained in 10 lines of code
- Getting started with Apache Kafka in Python
Then set up the Kafka configuration file as follows.
Kafka config file:
{ "kafka_servers": [ "localhost:9092" ] }
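If you would rather generate the Kafka file from Python than write it by hand, the sketch below writes the same JSON shape shown above after checking that each entry looks like host:port. write_kafka_config is a hypothetical helper, not part of this library, and the check is only a format check; it does not test that the broker is reachable.

```python
import json
from typing import List


def write_kafka_config(path: str, servers: List[str]) -> None:
    """Hypothetical helper: validate broker addresses and write {"kafka_servers": [...]}."""
    if not servers:
        raise ValueError("at least one Kafka server is required")
    for server in servers:
        host, sep, port = server.rpartition(":")
        # Each entry should be "host:port" with a numeric port, e.g. "localhost:9092"
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {server!r}")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"kafka_servers": servers}, f, indent=2)
```

For example, `write_kafka_config("configs.json", ["localhost:9092"])`, then point PUBSUB_CONFIG_PATH at configs.json.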
To use Google PubSub, you need to set up a GCP account and create a new PubSub topic and a subscription, with publishing and subscribing permissions.
You can follow this video from the creators of Google PubSub to create a topic and a subscription.
Then set up the GCP configuration file as follows.
Google PubSub config file:
{
  "type": "YOUR_VALUE_GOES_HERE",
  "project_id": "YOUR_VALUE_GOES_HERE",
  "private_key_id": "YOUR_VALUE_GOES_HERE",
  "private_key": "YOUR_VALUE_GOES_HERE",
  "client_email": "YOUR_VALUE_GOES_HERE",
  "client_id": "YOUR_VALUE_GOES_HERE",
  "auth_uri": "YOUR_VALUE_GOES_HERE",
  "token_uri": "YOUR_VALUE_GOES_HERE",
  "auth_provider_x509_cert_url": "YOUR_VALUE_GOES_HERE",
  "client_x509_cert_url": "YOUR_VALUE_GOES_HERE"
}
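The fields above are the ones found in a GCP service-account key file. As a quick sanity check before running anything, the sketch below reports keys that are missing or still hold the YOUR_VALUE_GOES_HERE placeholder. check_gcp_config is a hypothetical helper, not part of this library; pass it the dict you get from json.load on your config file.

```python
from typing import Dict, List

# Keys listed in the Google PubSub config example above
REQUIRED_GCP_KEYS = (
    "type", "project_id", "private_key_id", "private_key", "client_email",
    "client_id", "auth_uri", "token_uri",
    "auth_provider_x509_cert_url", "client_x509_cert_url",
)
PLACEHOLDER = "YOUR_VALUE_GOES_HERE"


def check_gcp_config(config: Dict[str, str]) -> List[str]:
    """Hypothetical helper: return a list of problems (empty if the file looks complete)."""
    problems = []
    for key in REQUIRED_GCP_KEYS:
        value = config.get(key)
        if not value:
            problems.append(f"missing {key}")
        elif value == PLACEHOLDER:
            problems.append(f"{key} still has the placeholder value")
    return problems
```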
Then export the GOOGLE_CLOUD_PROJECT environment variable, set to your project ID:
$ export GOOGLE_CLOUD_PROJECT='PROJECT_ID'
When working with GCP as the backend, you also need to pass an additional argument, gcp_subscription_id (the subscription ID), to PubSubFactory.create_consumer().
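Because only the gcp backend needs gcp_subscription_id, a small helper can pick the extra consumer arguments per backend and fail early if GOOGLE_CLOUD_PROJECT is not exported. This is a sketch: consumer_kwargs is a hypothetical name, and it assumes create_consumer() accepts gcp_subscription_id as a keyword argument.

```python
import os
from typing import Dict, Optional


def consumer_kwargs(backend: str, gcp_subscription_id: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical helper: extra keyword arguments for create_consumer() per backend."""
    if backend == "kafka":
        return {}
    if backend == "gcp":
        if not os.environ.get("GOOGLE_CLOUD_PROJECT"):
            raise RuntimeError("export GOOGLE_CLOUD_PROJECT before using the gcp backend")
        if not gcp_subscription_id:
            raise ValueError("the gcp backend needs a gcp_subscription_id")
        return {"gcp_subscription_id": gcp_subscription_id}
    raise ValueError(f"unknown backend {backend!r}; expected 'kafka' or 'gcp'")
```

Usage, with a placeholder subscription ID: `pubsub.create_consumer("images-topic", callback, **consumer_kwargs("gcp", "YOUR_SUBSCRIPTION_ID"))`.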
The library supports sending and receiving messages in bytes format with an optional string key. Here is an example of a producer that sends images to a topic and a consumer that saves these images.
To try this example, you will need to install Pillow:
$ pip install Pillow
# Producer: import the necessary libraries
import io
from PIL import Image
from PubSub import PubSubFactory
# initiate a pubsub object with the backend that you would like to use ['kafka', 'gcp']
pubsub = PubSubFactory('gcp')
# if you have a topic named: images-topic
producer = pubsub.create_producer("images-topic")
try:
    while True:
        # keep accepting "key, image path" pairs from the user
        line = input("enter key, image path:")
        # split on the first comma and trim the spaces around each part
        key, value = (part.strip() for part in line.split(",", 1))
        # read the image and encode it in bytes
        with Image.open(value) as image, io.BytesIO() as output:
            image.save(output, image.format)
            # send the image via the producer (before the buffer is closed)
            producer.push_msg(output.getvalue(), key=key)
except KeyboardInterrupt:
    print("Okay man, shutting down!")
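Re-encoding through Pillow is not strictly needed to get bytes: since push_msg() takes a bytes value, reading the file as-is also works and keeps the original file unchanged. The sketch below uses two hypothetical helpers, parse_line and read_payload, which are not part of this library; parse_line splits on the first comma and trims spaces so that input like "cat, images/cat.png" works.

```python
from pathlib import Path
from typing import Tuple


def parse_line(line: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'key, path' on the first comma and trim whitespace."""
    key, sep, path = line.partition(",")
    if not sep:
        raise ValueError("expected input like: my-key, path/to/image.png")
    return key.strip(), path.strip()


def read_payload(path: str) -> bytes:
    """Hypothetical helper: return the file's raw bytes, ready for push_msg()."""
    return Path(path).read_bytes()
```

Inside the producer loop this becomes `key, path = parse_line(input("enter key, image path:"))` followed by `producer.push_msg(read_payload(path), key=key)`. The consumer below still works, because Image.open() accepts the original file bytes.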
# Consumer: import the necessary libraries
import io
import os
from datetime import datetime
from PIL import Image
from PubSub import PubSubFactory
# Create a callback function that accepts a key and a value
def callback(key: str, value: bytes):
    # create a downloads directory to save the images in
    os.makedirs("downloads", exist_ok=True)
    image = Image.open(io.BytesIO(value))
    # build a timestamped file name, e.g. 2021-01-01T12-00-00.PNG
    file_name = f'{datetime.now().isoformat().replace(":", "-").split(".")[0]}.{image.format}'
    # save the image
    image.save(os.path.join("downloads", file_name))
    print(f"I got an Image !\n\tkey: {key}\n\tsaved to: {file_name}")
# initiate a pubsub object with the backend that you would like to use ['kafka', 'gcp']
pubsub = PubSubFactory('gcp')
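# if you have a topic named: images-topic
# the 'gcp' backend also needs your subscription ID (the value below is a placeholder)
pubsub.create_consumer("images-topic", callback, gcp_subscription_id="YOUR_SUBSCRIPTION_ID")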
# the consumer will run in a separate thread in the background.
print("A new consumer is running.")
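To try your callback without a running Kafka broker or GCP project, you can use a small in-memory stand-in that mirrors the create_producer(), push_msg() and create_consumer() calls used in the examples above. InMemoryPubSub and InMemoryProducer are hypothetical test doubles, not part of this library, and unlike the real consumer they deliver messages synchronously rather than on a background thread.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Optional

# Same shape as the example callback: callback(key, value)
Callback = Callable[[Optional[str], bytes], None]


class InMemoryProducer:
    """Hypothetical stand-in for a producer: push_msg() calls subscribers directly."""

    def __init__(self, bus: "InMemoryPubSub", topic: str) -> None:
        self._bus = bus
        self._topic = topic

    def push_msg(self, value: bytes, key: Optional[str] = None) -> None:
        for callback in self._bus.subscribers[self._topic]:
            callback(key, value)


class InMemoryPubSub:
    """Hypothetical test double for create_producer()/create_consumer()."""

    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def create_producer(self, topic: str) -> InMemoryProducer:
        return InMemoryProducer(self, topic)

    def create_consumer(self, topic: str, callback: Callback) -> None:
        # No background thread here: callbacks run inside push_msg()
        self.subscribers[topic].append(callback)
```

In a test, swap `pubsub = PubSubFactory('gcp')` for `pubsub = InMemoryPubSub()` to exercise the callback logic on its own.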