IMPORTANT: Quix Streams v2.0 is currently in alpha and will likely experience rapid and significant interface changes, feature additions, and bugfixes. Use with discretion!

The documentation for Quix Streams <2.0 can be found here.
Quix Streams 2.0 is a cloud-native library for processing data in Kafka using pure Python. It's designed to give you the power of a distributed system in a lightweight library by combining the low-level scalability and resiliency features of Kafka with an easy-to-use Python interface.
Quix Streams has the following benefits:
- No JVM, no orchestrator, no server-side engine.
- Easily integrates with the entire Python ecosystem (pandas, scikit-learn, TensorFlow, PyTorch, etc.).
- Support for many serialization formats, including JSON (and Quix-specific).
- Support for stateful operations using RocksDB.
- A simple framework with Pandas-like interface to ease newcomers to streaming.
- "At-least-once" Kafka processing guarantees.
- Designed to run and scale resiliently via container orchestration (like Kubernetes).
- Easily runs locally and in Jupyter Notebook for convenient development and debugging.
- Seamless integration with the Quix platform.
Use Quix Streams to build event-driven, machine learning/AI, or physics-based applications that depend on real-time data from Kafka.
To install the latest alpha version of Quix Streams 2.0:
python -m pip install --pre quixstreams
or
python -m pip install "quixstreams>=2.0a"
Python 3.8+, Apache Kafka 0.10+.
See requirements.txt for the full list of requirements.
Here's an example of how to process data from a Kafka Topic with Quix Streams:
from quixstreams import Application, State

# Define an application
app = Application(
    broker_address="localhost:9092",  # Kafka broker address
    consumer_group="consumer-group-name",  # Kafka consumer group
)

# Define the input and output topics. By default, "json" serialization will be used
input_topic = app.topic("my_input_topic")
output_topic = app.topic("my_output_topic")


def count(data: dict, state: State):
    # Get a value from state for the current Kafka message key
    total = state.get('total', default=0)
    total += 1
    # Set a value back to the state
    state.set('total', total)
    # Update your message data with a value from the state
    data['total'] = total


# Create a StreamingDataFrame instance
# StreamingDataFrame is a primary interface to define the message processing pipeline
sdf = app.dataframe(topic=input_topic)

# Print the incoming messages
sdf = sdf.update(lambda value: print('Received a message:', value))

# Select fields from incoming messages
sdf = sdf[["field_1", "field_2", "field_3"]]

# Filter only messages with "field_1" > 10 and "field_2" != "test"
sdf = sdf[(sdf["field_1"] > 10) & (sdf["field_2"] != "test")]

# Filter messages using custom functions
sdf = sdf[sdf.apply(lambda value: 0 < (value['field_1'] + value['field_3']) < 1000)]

# Generate a new value based on the current one
sdf = sdf.apply(lambda value: {**value, 'new_field': 'new_value'})

# Update a value based on the entire message content
sdf['field_4'] = sdf.apply(lambda value: value['field_1'] + value['field_3'])

# Use a stateful function to persist data to the state store and update the value in place
sdf = sdf.update(count, stateful=True)

# Print the result before producing it
sdf = sdf.update(lambda value: print('Producing a message:', value))

# Produce the result to the output topic
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    # Run the streaming application
    app.run(sdf)
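To try the example locally, you need some JSON messages in "my_input_topic". Here is a minimal sketch that produces a few test messages with the confluent-kafka Python client; the broker address and topic name mirror the example above, and the field values are arbitrary test data:

import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

for i in range(5):
    # Arbitrary test values matching the fields used in the pipeline above
    message = {"field_1": 11 + i, "field_2": "hello", "field_3": 100}
    producer.produce(
        topic="my_input_topic",
        key="device-1",  # messages with the same key share the same state
        value=json.dumps(message).encode("utf-8"),
    )

producer.flush()  # block until all messages are delivered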
There are two primary components:
- StreamingDataFrame - a predefined declarative pipeline to process and transform incoming messages.
- Application - manages the Kafka-related setup & teardown and the message lifecycle (consuming, committing). It processes each message with the dataframe you provide it.

Under the hood, the Application will (see the simplified sketch after this list):
- Consume a message.
- Deserialize it.
- Process it with your StreamingDataFrame.
- Produce it to the output topic.
- Automatically commit the topic offset and state updates after the message is processed.
- React to Kafka rebalancing updates and manage the topic partitions.
- Manage the State store.
- Handle OS signals and gracefully exit the application.
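To make that lifecycle concrete, here is a deliberately simplified sketch of the consume-process-produce loop in plain Python. It is illustrative only, not the library's actual implementation: the in-memory lists and dict below are hypothetical stand-ins for the Kafka consumer, the output topic, and the RocksDB state store.

import json

# Hypothetical in-memory stand-ins for Kafka topics and the RocksDB state store
incoming_messages = [b'{"field_1": 12}', b'{"field_1": 99}']
output_messages = []
state_store = {}

def pipeline(value: dict, state: dict) -> dict:
    # Stand-in for the user-defined StreamingDataFrame pipeline
    state["total"] = state.get("total", 0) + 1
    return {**value, "total": state["total"]}

committed_offset = 0
for raw in incoming_messages:                   # 1. Consume a message
    value = json.loads(raw)                     # 2. Deserialize it
    result = pipeline(value, state_store)       # 3. Process it with the pipeline
    output_messages.append(json.dumps(result))  # 4. Produce it to the output topic
    committed_offset += 1                       # 5. Commit the offset and state updates

print(output_messages)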
You may find more examples in the examples folder here.
For a more in-depth description of Quix Streams components, please follow these links:
- StreamingDataFrame.
- Serialization (a small configuration sketch follows this list).
- Stateful Processing.
- Usage with Quix SaaS Platform.
- Upgrading from Quix Streams <2.0.
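For example, serialization is configured per topic when you declare it. Here is a minimal sketch making the default explicit; the value_deserializer/value_serializer parameter names are assumptions based on the default-"json" behavior shown in the example above:

from quixstreams import Application

app = Application(
    broker_address="localhost:9092",
    consumer_group="consumer-group-name",
)

# Assumed parameter names; "json" is the default serialization format
input_topic = app.topic("my_input_topic", value_deserializer="json")
output_topic = app.topic("my_output_topic", value_serializer="json")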
Using the Quix Platform - Application.Quix()
This library doesn't depend on any commercial products, but if you use it together with the Quix SaaS Platform, you get several advantages out of the box during development, such as:
- Auto-configuration.
- Monitoring.
- Data explorer.
- Data persistence.
- Pipeline visualization.
- Metrics.
and more.
Quix Streams provides a seamless integration with the Quix Platform via the Application.Quix() class. This class will automatically configure the Application using the Quix SDK Token. If you are running this within the Quix platform, it will be configured automatically. Otherwise, please see Quix Platform Configuration.
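As a sketch of what that looks like in code (the consumer group and topic names below are assumptions, and the broker address and credentials come from the Quix SDK Token rather than from your code):

from quixstreams import Application

# Application.Quix() picks up the Quix SDK Token (set automatically when
# running inside the Quix platform) instead of taking a broker address
app = Application.Quix(consumer_group="consumer-group-name")

input_topic = app.topic("my_input_topic")
output_topic = app.topic("my_output_topic")

# Echo messages from the input topic to the output topic
sdf = app.dataframe(topic=input_topic)
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)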
This library is being actively developed.
Here are some of the planned improvements:
- State recovery based on Kafka changelog topics
- Stateful Windowing: tumbling, hopping, and sliding windows
- Group-bys and joins (for merging topics/keys)
- Support for "exactly-once" Kafka processing (aka transactions)
- Other serialization support like Avro and Protobuf
- Schema Registry support
To find out when the next version is ready, make sure you watch this repo and join our Quix Community on Slack!
Contributing is a great way to learn and we especially welcome those who haven't contributed to an OSS project before.
We're very open to any feedback or code contributions to this OSS project ❤️.
Before contributing, please read our Contributing file for how you can best give feedback and contribute.
If you run into any problems, please create an issue or ask in #quix-help in our Quix Community on Slack.
Join other software engineers in The Stream, an online community of people interested in all things data streaming. This is a space to both listen to and share learnings.
Join our Slack community!
Quix Streams is licensed under the Apache 2.0 license. View a copy of the License file here.
You can follow us on Twitter and LinkedIn, where we share our latest tutorials, forthcoming community events, and the occasional meme.
If you have any questions or feedback, write to us at support@quix.io!