
Quix Streams - A library for data streaming and Python Stream Processing


Quix - React to data, fast


Quix Streams 2.0 (alpha)

IMPORTANT: Quix Streams v2.0 is currently in alpha and will likely experience rapid and significant interface changes, feature additions, and bugfixes.
Use with discretion!

The documentation for Quix Streams <2.0 can be found here

Quix Streams 2.0 is a cloud-native library for processing data in Kafka using pure Python. It's designed to give you the power of a distributed system in a lightweight library by combining the low-level scalability and resiliency features of Kafka with an easy-to-use Python interface.

Quix Streams has the following benefits:

  • No JVM, no orchestrator, no server-side engine.
  • Easily integrates with the entire Python ecosystem (pandas, scikit-learn, TensorFlow, PyTorch, etc.).
  • Support for many serialization formats, including JSON and Quix-specific formats.
  • Support for stateful operations using RocksDB.
  • A simple framework with a pandas-like interface to ease newcomers into streaming.
  • "At-least-once" Kafka processing guarantees.
  • Designed to run and scale resiliently via container orchestration (like Kubernetes).
  • Easily runs locally and in Jupyter Notebook for convenient development and debugging.
  • Seamless integration with the Quix platform.

Use Quix Streams to build event-driven, machine learning/AI or physics-based applications that depend on real-time data from Kafka.

Getting started 🏄

Install Quix Streams

Quix Streams 2.0 (currently in alpha)

To install the latest alpha version of Quix Streams 2.0:

python -m pip install --pre quixstreams

or

python -m pip install "quixstreams>=2.0a"

Requirements

Python 3.8+, Apache Kafka 0.10+

See requirements.txt for the full list of requirements
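
After installing, a quick check like the one below can confirm that the interpreter meets the Python 3.8+ requirement and that the package is visible to it. This is only a sketch using the standard library; the helper names `installed_version` and `python_is_supported` are made up for this example and are not part of Quix Streams.

```python
import sys
from importlib import metadata
from typing import Optional, Tuple


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def python_is_supported(minimum: Tuple[int, int] = (3, 8)) -> bool:
    """Check the running interpreter against the minimum version listed above."""
    return sys.version_info >= minimum


if __name__ == "__main__":
    print("Python supported:", python_is_supported())
    print("quixstreams version:", installed_version("quixstreams"))
```

If the second line prints `None`, the package was most likely installed into a different environment than the one running the script.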

Example Application

Here's an example of how to process data from a Kafka topic with Quix Streams:

from quixstreams import Application, State

# Define an application
app = Application(
    broker_address="localhost:9092",  # Kafka broker address
    consumer_group="consumer-group-name",  # Kafka consumer group
)

# Define the input and output topics. By default, "json" serialization will be used
input_topic = app.topic("my_input_topic")
output_topic = app.topic("my_output_topic")


def count(data: dict, state: State):
    # Get a value from state for the current Kafka message key
    total = state.get('total', default=0)
    total += 1
    # Set a value back to the state
    state.set('total', total)
    # Update your message data with a value from the state
    data['total'] = total


# Create a StreamingDataFrame instance
# StreamingDataFrame is the primary interface for defining the message processing pipeline
sdf = app.dataframe(topic=input_topic)

# Print the incoming messages
sdf = sdf.update(lambda value: print('Received a message:', value))

# Select fields from incoming messages
sdf = sdf[["field_1", "field_2", "field_3"]]

# Keep only messages with "field_1" > 10 and "field_2" != "test"
sdf = sdf[(sdf["field_1"] > 10) & (sdf["field_2"] != "test")]

# Filter messages using custom functions
sdf = sdf[sdf.apply(lambda value: 0 < (value['field_1'] + value['field_3']) < 1000)]

# Generate a new value based on the current one
sdf = sdf.apply(lambda value: {**value, 'new_field': 'new_value'})

# Update a value based on the entire message content
sdf['field_4'] = sdf.apply(lambda value: value['field_1'] + value['field_3'])

# Use a stateful function to persist data to the state store and update the value in place
sdf = sdf.update(count, stateful=True)

# Print the result before producing it
sdf = sdf.update(lambda value: print('Producing a message:', value))

# Produce the result to the output topic 
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    # Run the streaming application 
    app.run(sdf)
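
To see what the pipeline above does to a single message without running a Kafka broker, the same steps can be mirrored in plain Python. This is a rough sketch, not how Quix Streams is implemented: `process` and `state_by_key` are hypothetical names, the dictionary stands in for the RocksDB-backed state store, and the sketch assumes every message contains `field_1`, `field_2` and `field_3`.

```python
from typing import Dict, Optional

# Hypothetical in-memory stand-in for the state store: one small dict per message key
state_by_key: Dict[str, Dict[str, int]] = {}


def process(key: str, value: dict) -> Optional[dict]:
    """Apply the example pipeline to one message; return None if it is filtered out."""
    # Select fields from the incoming message
    value = {name: value[name] for name in ("field_1", "field_2", "field_3")}

    # Keep only messages with "field_1" > 10 and "field_2" != "test"
    if not (value["field_1"] > 10 and value["field_2"] != "test"):
        return None

    # Filter with a custom condition
    if not 0 < value["field_1"] + value["field_3"] < 1000:
        return None

    # Generate a new value based on the current one
    value = {**value, "new_field": "new_value"}

    # Add a field computed from the entire message content
    value["field_4"] = value["field_1"] + value["field_3"]

    # Stateful step: count the messages seen so far for this key
    state = state_by_key.setdefault(key, {})
    state["total"] = state.get("total", 0) + 1
    value["total"] = state["total"]
    return value


if __name__ == "__main__":
    message = {"field_0": 1, "field_1": 20, "field_2": "ok", "field_3": 5}
    print(process("sensor-a", message))  # passes both filters
    print(process("sensor-a", message))  # same key, so the count increases
    print(process("sensor-b", {**message, "field_2": "test"}))  # filtered out
```

Because the counter lives under the message key, two messages with different keys are counted independently, which matches the "state for the current Kafka message key" comment in the `count` function.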

How It Works

There are two primary components:

  • StreamingDataFrame - a predefined declarative pipeline to process and transform incoming messages.
  • Application - manages the Kafka-related setup and teardown and the message lifecycle (consuming, committing). It processes each message with the dataframe you provide.

Under the hood, the Application will:

  • Consume a message.
  • Deserialize it.
  • Process it with your StreamingDataFrame.
  • Produce it to the output topic.
  • Automatically commit the topic offset and state updates after the message is processed.
  • React to Kafka rebalancing updates and manage the topic partitions.
  • Manage the State store.
  • Handle OS signals and gracefully exit the application.
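
The first five steps above can be pictured as a simple loop. The sketch below is a simplified illustration only, with an in-memory list standing in for a Kafka partition; `InMemoryPartition` and `run` are hypothetical names, and the real Application additionally handles rebalancing, state and signals. It shows why committing the offset only after a message is processed gives "at-least-once" behaviour: if processing fails, the offset stays where it was and the message is consumed again on the next run.

```python
import json
from typing import Callable, Iterable, List, Optional


class InMemoryPartition:
    """Hypothetical stand-in for one Kafka topic partition."""

    def __init__(self, payloads: Iterable[bytes]):
        self.payloads = list(payloads)
        self.committed = 0  # offset of the next message to consume


def run(
    partition: InMemoryPartition,
    process: Callable[[dict], Optional[dict]],
    output: List[bytes],
) -> None:
    while partition.committed < len(partition.payloads):
        offset = partition.committed
        raw = partition.payloads[offset]  # consume a message
        value = json.loads(raw)  # deserialize it
        result = process(value)  # process it
        if result is not None:
            output.append(json.dumps(result).encode("utf-8"))  # produce it
        partition.committed = offset + 1  # commit only after processing succeeded


if __name__ == "__main__":
    partition = InMemoryPartition([b'{"n": 1}', b'{"n": 2}'])
    produced: List[bytes] = []
    run(partition, lambda value: {"n": value["n"] * 10}, produced)
    print(produced, partition.committed)
```

If `process` raises for some message, `run` stops before the commit line, so calling `run` again resumes from that same message rather than skipping it.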

More Examples

You may find more examples in the examples folder here.

Advanced Usage

For a more in-depth description of Quix Streams components, please follow these links:

Using the Quix Platform - Application.Quix()

This library doesn't depend on any commercial products, but if you use it together with the Quix SaaS Platform, you get some advantages out of the box during development, such as:

  • Auto-configuration.
  • Monitoring.
  • Data explorer.
  • Data persistence.
  • Pipeline visualization.
  • Metrics.

and more.

Quix Streams integrates with the Quix Platform via Application.Quix(), which automatically configures the Application using your Quix SDK token.
If you are running this within the Quix platform, the configuration is applied automatically.
Otherwise, please see Quix Platform Configuration.

What's Next

This library is being actively developed.

Here are some of the planned improvements:

  • State recovery based on Kafka changelog topics
  • Stateful Windowing: tumbling, hopping, and sliding windows
  • Group-bys and joins (for merging topics/keys)
  • Support for "exactly-once" Kafka processing (aka transactions)
  • Other serialization support like Avro and Protobuf
  • Schema Registry support

To find out when the next version is ready, make sure you watch this repo and join our Quix Community on Slack!

Contribution Guide

Contributing is a great way to learn and we especially welcome those who haven't contributed to an OSS project before.
We're very open to any feedback or code contributions to this OSS project ❤️.

Before contributing, please read our Contributing file for how you can best give feedback and contribute.

Need help?

If you run into any problems, please create an issue or ask in #quix-help in our Quix Community on Slack.

Community 👭

Join other software engineers in The Stream, an online community of people interested in all things data streaming. This is a space to both listen to and share learnings.

🙌 Join our Slack community!

License

Quix Streams is licensed under the Apache 2.0 license. View a copy of the License file here.

Stay in touch 👋

You can follow us on Twitter and LinkedIn, where we share our latest tutorials, forthcoming community events and the occasional meme.

If you have any questions or feedback, write to us at support@quix.io!