Declarative Apache Flink Statefun over FastAPI
- Python ^3.9
- Poetry
- GNU Make
- Compose V2
- Run run_dev recipe on Makefile
make run_dev
- Create consumer to subcribe message on
cluster
topic. Example
from kafka import KafkaConsumer
if __name__ == "__main__":
topic_name = "cluster"
try:
consumer = KafkaConsumer(topic_name, bootstrap_servers=["localhost:9092"])
for message in consumer:
message = message.value
print(message)
except Exception as e:
print(str(e))
- Create producer to publish message on
message-topic
topic. Example
import json
from time import sleep
import pandas as pd
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic
if __name__ == "__main__":
data = pd.read_csv('file:///path-to-your-data-csv')
topic_name = "message-topic"
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
try:
# Create Kafka topic
topic = NewTopic(
name=topic_name,
num_partitions=1,
replication_factor=1,
)
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([topic])
except Exception:
print(f"Topic {topic_name} is already created")
datas = data["your-str-column"]
total_data = len(datas)
for index, content in enumerate(datas):
producer.send(topic_name, value=content.encode("utf-8"), key="test".encode("utf-8"))
sleep(0.1)