Showing off the AI model capabilities available in Flink SQL on Confluent Cloud.
Quickly connect to, model, and query AI models in 3 easy steps with Flink AI:
```sh
confluent flink connection create openai-connection \
  --cloud gcp \
  --region us-central1 \
  --type openai \
  --endpoint https://api.openai.com/v1/chat/completions \
  --api-key <your-api-key>
```
```sql
CREATE MODEL my_model INPUT (text STRING) OUTPUT (response STRING)
WITH ('provider' = 'openai', 'task' = 'text_generation', 'openai.connection' = 'openai-connection');
```

```sql
SELECT text, response FROM my_stream, LATERAL TABLE(ML_PREDICT('my_model', text));
```
Sorry, this README will be written from a macOS perspective.
You can use your existing Confluent Cloud account if you know you have access to Flink, or you can create a trial account with free credits.
If given the option to name your environment, we used `flink_ai_sandbox`. Choosing the same name will make it easier to follow along.
To run these demos you'll need the Confluent CLI for the very first step, or be good friends with an admin who can run the one command needed to establish the connection to the AI APIs you plan to use.
You can install it with Homebrew, which should work well enough for this demo.
If you plan on doing more with the Kafka CLI, I recommend installing Confluent Platform, which comes with all of the Kafka CLI commands in addition to the Confluent CLI.
⚠️ Caveat: Confluent Platform ships with an old version of the Confluent CLI. You will need to download the latest version of the Confluent CLI and replace the version Confluent Platform comes with.
Run `confluent --version` to check if you have version 4.7+. If not, follow their Quick Start guide to download the CLI to a new directory; copying it to an isolated directory will just make life easier.
After you have downloaded and extracted the Confluent CLI, you can copy the `confluent` binary from your new `/bin` to the Confluent Platform `/bin`, which should already be in your PATH. You have now upgraded your version of `confluent`. Check the version to verify.
Follow the Install Confluent CLI guides.
Flink AI allows you to interact with many different AI platforms.
For this demo we'll be using OpenAI with their GPT chat APIs.
The demo also assumes Flink is running on GCP in a specific region; you'll need to adjust these values to match your Flink deployment.
The very first thing we'll need to do is to create the Flink Connection.
This is what allows Flink to communicate with your AI model hosted on another platform.
You'll need to log in to your Confluent Cloud account. This command also saves your login details.
```sh
confluent login --save
```
The `confluent flink connection create` command (docs) is what we'll use to create this connection. It supplies Flink with the API key and service endpoint that it will interact with. Note that the `cloud` and `region` flags refer to where your Flink cluster is running, not where the model is hosted.
```sh
confluent flink connection create \
  openai-chat-completions-connection \
  --cloud gcp \
  --region us-central1 \
  --type openai \
  --endpoint https://api.openai.com/v1/chat/completions \
  --api-key <YOUR API KEY HERE>
```
Note that each connection has a single `endpoint`, so if there are multiple endpoints you want to interact with, you'll need to create multiple connections. As such, we chose to name this connection `openai-chat-completions-connection` to reflect the endpoint it targets.
The `cloud` and `region` will change depending on where your Confluent Cloud is deployed. The `type` and `endpoint` will change depending on your model. AWS connections such as Bedrock and SageMaker will also need to supply an `aws-access-key` and `aws-secret-key`.
Valid `cloud` values:
- `aws`
- `azure`
- `gcp`

Valid `type` values:
- `openai`
- `azureml`
- `azureopenai`
- `bedrock`
- `sagemaker`
- `googleai`
- `vertexai`
- `mongodb`
- `elastic`
- `pinecone`
The Confluent Run an AI Model documentation provides some additional examples.
Log in to Confluent Cloud, then navigate to "Stream Processing" on the left side.
For this demo, we selected "Try example" so Confluent Cloud will generate some initial data for us to work with. We'll use this later in our advanced example.
The Basics - Create a Model (Docs)
```sql
CREATE MODEL sentiment_bot
INPUT (text STRING)
OUTPUT (sentiment STRING)
COMMENT 'Determines the sentiment as one of 3 values'
WITH (
  'provider' = 'openai',
  'task' = 'classification',
  'openai.connection' = 'openai-chat-completions-connection',
  'openai.client_timeout' = '120',
  'openai.model_version' = 'gpt-3.5-turbo',
  'openai.system_prompt' = 'Analyze the sentiment of the text and return only POSITIVE, NEGATIVE, or NEUTRAL.'
);
```
The first thing you'll want to do in order to make use of your AI endpoint connection is to wrap it in a Flink Model. This defines the inputs and outputs of your interaction, the type of interaction you'll be performing, and the prompt or other params your model needs. This is what enables the real-time prediction and inference in your Flink environment.
This example comes almost directly from the Confluent docs.
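Once it's created, you can confirm the model is registered from the same Flink SQL workspace; `SHOW MODELS` is part of the model DDL described in the Confluent docs:

```sql
-- List the models registered in the current catalog and database.
SHOW MODELS;
```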
Task Types (docs)
- classification
- clustering
- embedding
- regression
- text_generation
The `WITH` options are where you will find a lot of functionality.
Read the docs.
Running `CREATE MODEL my-model` several times will just end up creating a new version each time. Unless you alter the model, the latest version will not be the default version. You can use a specific version by referring to the version number, e.g. `my-model$2` for version 2.
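For example, a minimal sketch of pinning a version in a query, assuming the version-reference syntax above works inside `ML_PREDICT` (the `my_stream` table, its `text` column, and the `response` output name are hypothetical placeholders):

```sql
-- Pin version 2 of the model rather than the default version.
-- `my_stream`, `text`, and `response` are hypothetical placeholders.
SELECT text, response
FROM my_stream,
  LATERAL TABLE(ML_PREDICT('my-model$2', text));
```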
Alternatively, you can update the model's default version:

```sql
ALTER MODEL `my-model` SET ('default_version' = '2');
```
See more details, including how to delete your model or specific versions, in the docs.
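A sketch of the cleanup syntax covered there (deleting a model entirely; per the docs, specific versions can also be dropped):

```sql
-- Delete the model and all of its versions.
DROP MODEL IF EXISTS `my-model`;
```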
Now that we have a Flink model available to us, we want to utilize it for things like model inference in our streaming queries.
```sql
SELECT id, text, translation
FROM text_stream,
  LATERAL TABLE(ML_PREDICT('translation-model', text));
```
`ML_PREDICT` is a new function added to Flink that runs against remote AI/ML models for prediction, classification, and text-generation tasks.
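As a concrete sketch, the `sentiment_bot` model defined earlier could be applied to a stream like this (the `product_reviews` table and its `id`/`text` columns are hypothetical placeholders):

```sql
-- Classify each review as it arrives; `sentiment` comes from the model's OUTPUT clause.
-- `product_reviews` and its `id`/`text` columns are hypothetical placeholders.
SELECT id, text, sentiment
FROM product_reviews,
  LATERAL TABLE(ML_PREDICT('sentiment_bot', text));
```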
```sql
-- Model evaluation with all versions
SELECT ML_EVALUATE(`my_model$all`, f1, f2) FROM `eval_data`;
```
`ML_EVALUATE` is a new function that enables benchmarking of your models to see how they compare against your streaming datasets.
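For instance, a hedged sketch of benchmarking `sentiment_bot` against labeled data, assuming the arguments follow the same (model, expected value, input) pattern as the example above (the `labeled_reviews` table and its columns are hypothetical):

```sql
-- Compare the model's predictions for `text` against the known `expected_sentiment`.
-- `labeled_reviews` and its columns are hypothetical placeholders.
SELECT ML_EVALUATE(`sentiment_bot`, expected_sentiment, text)
FROM labeled_reviews;
```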
This diagram from the Confluent blog shows how `federated_search()` will be used to retrieve content from a vector store.
https://www.confluent.io/blog/flinkai-realtime-ml-and-genai-confluent-cloud/
Invoking remote models is an important first step in enabling shift-left principles within the data streaming platform on Confluent Cloud. Flink on Confluent Cloud can easily rank, score, and rate event properties within data streams, resulting in clean data at the source. This is a fundamental principle when building modern data platforms and will greatly increase the value of information extracted from such platforms.
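As a hedged illustration of that shift-left idea, reusing the `sentiment_bot` model from above (the `raw_feedback` stream and `scored_feedback` sink are hypothetical placeholders):

```sql
-- Score events in-stream so only enriched, rated records land at the source.
-- `raw_feedback` and `scored_feedback` are hypothetical placeholders.
INSERT INTO scored_feedback
SELECT id, text, sentiment
FROM raw_feedback,
  LATERAL TABLE(ML_PREDICT('sentiment_bot', text));
```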
https://www.confluent.io/blog/mastering-real-time-retrieval-augmented-generation-rag-with-flink/
With AI model inference in Flink SQL, Confluent allows you to simplify the development and deployment of RAG-enabled GenAI applications by providing a unified platform for both data processing and AI tasks. By tapping into real-time, high-quality, and trustworthy data streams, you can augment the LLM with proprietary and domain-specific data using the RAG pattern and enable your LLM to deliver the most reliable, accurate responses.
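To make that concrete, here's a minimal sketch of the first step of the RAG pattern, computing embeddings in-stream. Since each connection targets a single endpoint, this assumes a separate `openai-embeddings-connection` was created against OpenAI's embeddings endpoint; the model name, `docs_stream`, and the column names are also hypothetical:

```sql
-- Wrap an embeddings endpoint in a model, then embed documents as they stream in.
-- `openai-embeddings-connection`, `docs_stream`, and all column names are hypothetical.
CREATE MODEL docs_embedding_model
INPUT (text STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH (
  'provider' = 'openai',
  'task' = 'embedding',
  'openai.connection' = 'openai-embeddings-connection'
);

SELECT doc_id, text, embedding
FROM docs_stream,
  LATERAL TABLE(ML_PREDICT('docs_embedding_model', text));
```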