This repo contains two microservices written in .NET 8 that produce and consume messages from an Apache Kafka® cluster.
It follows this Confluent guide for the code, along with a few other resources for the lessons.
Apache Kafka is an event streaming platform used to collect, store and process real time data streams at scale.
It has numerous use cases, including distributed logging, stream processing and Pub-Sub Messaging.
![image](https://private-user-images.githubusercontent.com/30603497/304163909-19978d3c-3f9c-4474-9a3b-995b5ea437ed.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQxNjM5MDktMTk5NzhkM2MtM2Y5Yy00NDc0LTlhM2ItOTk1YjVlYTQzN2VkLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTcyZTU4NWI3N2JjNDBjODc4NzVhMTIyMTYxOTNkODRkYWE3NGFkZGRkNTJiMTIyYTQ1YTBmZjQ2ZmM5ZDIyMGQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.xXodONAsShmz4lpJOUGVb6zqJC9ccPnuh1NTyv6cShs)
- Data streaming with Apache Kafka
- Kafka 101
- How Kafka works (great!)
- confluent-kafka-dotnet examples
- Apache Kafka for .NET developers (great!)
- Kafka Visualization (great!)
An event is any type of action, incident, or change that's identified or recorded by software or applications. For example, a payment, a website click, or a temperature reading, along with a description of what happened.
Kafka encourages you to see the world as sequences of events, which it models as key-value pairs.
Events are immutable, as it is (sometimes tragically) impossible to change the past.
Because the world is filled with so many events, Kafka gives us a means to organize them and keep them in order: topics.
A topic is an ordered log of events.
Topics are properly logs, not queues; they are durable, replicated, fault-tolerant records of the events stored in them.
The simplicity of the log as a data structure and the immutability of the contents in it are keys to Kafka's success as a critical component in modern data infrastructure.
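The log described above can be sketched as a tiny in-memory structure. This is purely illustrative: `PartitionLog` is our own toy name, not a Kafka API.

```csharp
using System;
using System.Collections.Generic;

// Toy sketch of a topic partition as an append-only, immutable log.
// Illustrative only: PartitionLog is not a Kafka API.
class PartitionLog
{
    private readonly List<(string Key, string Value)> _records = new();

    // Appending returns the record's offset: its permanent position in the log.
    public long Append(string key, string value)
    {
        _records.Add((key, value));
        return _records.Count - 1;
    }

    // Records are read by offset; existing entries are never modified.
    public (string Key, string Value) Read(long offset) => _records[(int)offset];
}
```

The two properties that matter are visible in the shape of the API: you can only append (never overwrite), and every record keeps a fixed, ordered position.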
Partitioning takes the single topic log (remember, a topic is just a log) and breaks it into multiple logs, each of which can live on a separate node in the Kafka cluster.
![image](https://private-user-images.githubusercontent.com/30603497/304197223-d54600ad-49c8-4a4b-9983-33b4f366600c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQxOTcyMjMtZDU0NjAwYWQtNDljOC00YTRiLTk5ODMtMzNiNGYzNjY2MDBjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTMxYWFkYmU5YzVkNTJlYzcxZTQ2MjNmODgwM2IxNWIyOWM0YTQwZDY4MDA5ZTcwOTZiOThmNjM5MjYzOGM4MzYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.JuePIsRVIha2iU5Zm_pG8XmWxrWCcDMr7oCt9SfXmg8)
Deciding which messages to write to which partition:
- If the message has no key (remember, an event is a key-value pair), messages are distributed round-robin among the topic's partitions.
- If the message has a key, that key determines the partition: Kafka runs the key through a hash function, takes the output mod the number of partitions, and the result is the partition number to write to. This guarantees that messages with the same key always land in the same partition and are therefore always in order.
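The hash-mod rule can be sketched in a few lines of C#. Note this only illustrates the idea: Kafka's default partitioner actually uses murmur2 hashing, whereas this sketch uses .NET's `GetHashCode` (which is deterministic only within a single process).

```csharp
using System;

// Illustrative name, not a Kafka API: shows "hash the key, mod the
// partition count" as described above.
static class TopicPartitioner
{
    public static int SelectPartition(string key, int partitionCount)
    {
        int hash = key.GetHashCode();           // hash the key
        return Math.Abs(hash % partitionCount); // mod by number of partitions
    }
}
```

Because the mapping is a pure function of the key, the same key always lands in the same partition, which is exactly what preserves per-key ordering.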
Kafka is distributed data infrastructure, which implies that there is some kind of node that can be duplicated across a network such that the collection of all of those nodes functions together as a single Kafka cluster.
That node is called a broker.
A broker can run on bare metal hardware, a cloud instance, in a container managed by Kubernetes, in Docker on your laptop, or wherever JVM processes can run.
Kafka brokers are intentionally kept very simple, maintaining as little state as possible. They are responsible for writing new events to partitions, serving reads on existing partitions, and replicating partitions among themselves.
They don’t do any computation over messages or routing of messages between topics.
Kafka brokers are servers with special jobs to do: managing the load balancing, replication, and stream decoupling within the Kafka cluster.
In Kafka, data (individual pieces of data are events) are stored within logical groupings called topics. Those topics are split into partitions, the underlying structure of which is a log.
![image](https://private-user-images.githubusercontent.com/30603497/304195204-23ac25ca-e062-4c98-bad3-ef3d50f1a1bf.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQxOTUyMDQtMjNhYzI1Y2EtZTA2Mi00Yzk4LWJhZDMtZWYzZDUwZjFhMWJmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTYwYjI4NjUxZGRkY2VmZDE5YWQyNjE2MjI0ODhkZmFlMTYzM2YwZTFlMDNiODJkNjI0ZTU3NWI0YTU0NDBmMDgmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.erx7LkKbgP8-p67om42m4IZED9wCg16iK5I6BMkDHuo)
Imagine running brokers 1, 2, and 3 as pods inside a Kubernetes cluster. Now you have a Kafka cluster inside a Kubernetes cluster!
An offset represents the position of a record within a partition of a topic, similar to an index in an array. Any particular message in a topic partition is identified by its unique offset.
![image](https://private-user-images.githubusercontent.com/30603497/306134224-5c2a7c9a-2374-4c07-ab33-fc8468fecf03.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYxMzQyMjQtNWMyYTdjOWEtMjM3NC00YzA3LWFiMzMtZmM4NDY4ZmVjZjAzLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWYwY2E4N2RjZmYzY2EwOGVlMWU0YzA2YmRkYWU0NmE2YWM0Zjc2NzlhNzliZmYyZjhkZGIxZWQ4OGNmN2I4NWEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.tZtpTdFdbCASoOQslEkT_zxwjZXK254Hqrd3Ixgq91I)
The concept of a committed offset only applies when there is a consumer group.
The committed offset points to the next message that will be processed in the future.
For example: when you commit offset 3, you're telling Kafka that your consumer has successfully processed records up to offset 2 and is ready to consume the record at offset 3.
Each partition of a Kafka topic has its own set of offsets, which indicate the last message that was successfully processed by the consumer group for that partition.
So in essence, "committing an offset" is indicating that we've successfully processed all records up to that point.
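In code form, the bookkeeping described above amounts to "commit last-processed + 1". This is a sketch under our own naming, not the Confluent.Kafka API:

```csharp
// Illustrative sketch of commit semantics, not the Confluent.Kafka API.
static class OffsetBookkeeping
{
    // After successfully processing the record at `lastProcessed`,
    // the offset to commit is the NEXT record to be consumed.
    public static long OffsetToCommit(long lastProcessed) => lastProcessed + 1;

    // On restart, a consumer in the group resumes at the committed offset.
    public static long ResumePosition(long committedOffset) => committedOffset;
}
```

So committing offset 3 after processing record 2 means a restarted consumer picks up exactly at record 3, with no gap and no reprocessing.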
A cluster in Kafka is a group of servers (nodes) working together for three reasons:
- Speed (low latency): several data streams can be processed by separate servers, which decreases the latency of data delivery.
- Durability: data is replicated across multiple servers, so if one fails, another server has the data backed up.
- Scalability: Kafka balances the load across multiple servers to provide scalability.
Kafka provides replicated storage of topic partitions.
A single node (also known as a broker) can be both a leader for some partitions and a follower for others.
Each node in a cluster has one of two roles when it comes to replication: leader or follower. Followers stay in sync with the leader in order to keep up to date on the newest data.
The diagram below shows what might happen if you had Topic A with a replication factor of N = 2. The leader node is responsible for the first instance of each partition, and the follower node keeps a replica of it on another server.
![image](https://private-user-images.githubusercontent.com/30603497/304219597-dc4ae999-e0a0-4f07-ba60-e78f5618e117.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyMTk1OTctZGM0YWU5OTktZTBhMC00ZjA3LWJhNjAtZTc4ZjU2MThlMTE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWI4M2RiNGYwMWUwODhjYjEzMjlhMjA5ZmI2YjQ2NmVhY2U1ZTFiMTQ3YTg0Yzc0MTM3MDc4Y2Y5ZjE1ZjQwYjImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.cx3cu52OswOU9LhhHyod5Us0oIAPHg9nYAH_uw7b2Z8)
Broker 1 is the leader of Partition 1, and broker 2 is a follower of Partition 1.
Each partition has one leader and N-1 followers, where N is the replication factor.
For example, I created this solution for the hands-on exercises:

```shell
Ashishs-MacBook-Pro:dotnet-kafka akhanal$ dotnet new sln
```
![image](https://private-user-images.githubusercontent.com/30603497/304234430-e309d0df-7de6-4fce-a199-467cdcd5bf50.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyMzQ0MzAtZTMwOWQwZGYtN2RlNi00ZmNlLWExOTktNDY3Y2RjZDViZjUwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTdiZTBhYmJmOTM4YTMwYzc1MmVmM2E3YWQ2ZGRmNmFkYTYwYWI4MjAxYWU5NGMyYTIxZjFjZjc3N2U1N2E2NDEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.YrNLQ0h-g1Mp9cg44dxW2Bi083uC1CiKuzOgyhTEAaI)
![image](https://private-user-images.githubusercontent.com/30603497/304234878-25394bc5-984c-4eaf-a071-3cb20a8fec59.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyMzQ4NzgtMjUzOTRiYzUtOTg0Yy00ZWFmLWEwNzEtM2NiMjBhOGZlYzU5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPThiM2ZjY2IwYzQyNzM4NWI4OWUzOTZmMWZjMjkyMTdlZmE5ZGQ0YjY2M2JiNmNiMjlkNGI0ZGY4ZmE2YmM5ODQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0._Mv4tPSilpavGwy93idAYm-jAbsTEcfxD3qoGeOZuiw)
Install the Microsoft.Extensions.Hosting package.
Add appsettings.json and set these options:
- Build action: Content
- Copy to output directory: Copy if newer
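A minimal appsettings.json might look like the sketch below. The key names are assumptions in the spirit of the guide, not copied from this repo; match them to whatever your code actually reads, and leave the placeholders to be filled from your own cluster settings.

```json
{
  "Kafka": {
    "BootstrapServers": "<bootstrap-server-from-your-cluster>:9092",
    "Topic": "RawBiometricsImported"
  }
}
```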
And use it — see dotnet-kafka/Consumer/Program.cs, lines 4 to 10 at commit 7313b07.
```shell
dotnet user-secrets init
```

This command adds a UserSecretsId element, populated with a GUID, to the .csproj file.
If you want the Producer to also access the secrets referenced by this ID, copy the element into the Producer's project file as well.
Now you can store API keys and secrets there without checking them into source control.
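For example, secrets can also be set from the terminal. The `Kafka:ApiKey` / `Kafka:ApiSecret` key names below are illustrative; use whatever keys your code reads.

```shell
# Run from the project directory (the one whose .csproj has the UserSecretsId)
dotnet user-secrets set "Kafka:ApiKey" "<your-api-key>"
dotnet user-secrets set "Kafka:ApiSecret" "<your-api-secret>"

# Verify what's stored (values live outside the repo, in your user profile)
dotnet user-secrets list
```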
Right click the project -> Tools -> .NET User Secrets
![image](https://private-user-images.githubusercontent.com/30603497/304541579-507002f2-4517-45f6-b285-8b87a30e981f.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ1NDE1NzktNTA3MDAyZjItNDUxNy00NWY2LWIyODUtOGI4N2EzMGU5ODFmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTY1NWNmMTg5ODBiYzFjNjFhNTM4MDNiYTQ2NDk5MzgwMGM1ZGE2MzJjNTc5ODljYzhiYTUwNDMxMTUzNGFjOTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.NKvPXLp2uKdz_yDctbKdkUOc3WsLO3rc76OuoTAJQq0)
Put your secrets here
![image](https://private-user-images.githubusercontent.com/30603497/304892034-9168a6f6-5fc2-4a64-82ac-6db5ff2f3262.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTIwMzQtOTE2OGE2ZjYtNWZjMi00YTY0LTgyYWMtNmRiNWZmMmYzMjYyLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTM2NTZmYmNmYTQ0Y2NjYjI3NjA1NzJiODE0N2FiNTQyZDkwMWFiZjgyOGQ1YmViYzZmYjBhYzcwM2M0YTcyMGYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.T_r-555L57Dhr-N0otJzKqsshr3WC6rQCty050uRnSY)
Manage Nuget Packages
![image](https://private-user-images.githubusercontent.com/30603497/304235202-7ba0251a-8099-4a36-aed4-b2de91b089d2.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyMzUyMDItN2JhMDI1MWEtODA5OS00YTM2LWFlZDQtYjJkZTkxYjA4OWQyLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTViMjEzOTdkNDAwYjY2ZmMzNTZlNzllNTRlODE5ODUxOGUwNDgyNWJlOTkyN2VhMzVmZWM2YjRhNjNhYWEyZDAmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.IYki6BlDaiAW6YRQ1Mm9d47u1uofLY6LsdUJzqxDEn8)
Install it in both projects
![image](https://private-user-images.githubusercontent.com/30603497/304235562-b85f2175-ac38-4a30-9588-190d444171ff.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyMzU1NjItYjg1ZjIxNzUtYWMzOC00YTMwLTk1ODgtMTkwZDQ0NDE3MWZmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTI0Y2VhZTYyMTQyOWJiODQzY2MwNzJkMWU1OWNkOWY2ODcxMmFiNjRiOGI1ZjdlYzZjOGU3ODNkNjFhZWNmYzEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.aomxAGaz7D-NzP4OoR-d6ABrLlEWGKd-Gxl6kv2nEEc)
Go to Java Downloads and install the latest JDK. (JDK 21 as of Feb 2024).
[Not required, so not doing it now]
```shell
brew install confluentinc/tap/cli
```

Check that it was installed:

```shell
Ashishs-MacBook-Pro:dotnet-kafka ashishkhanal$ confluent version
confluent - Confluent CLI

Version:     v3.48.1
Git Ref:     e86e352ee
Build Date:  2024-01-25T23:40:47Z
Go Version:  go1.21.5 X:boringcrypto (darwin/amd64)
Development: false
```
```shell
confluent local kafka start
```

You'll get this error:

```
Error: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
```

It went away after I checked the Docker Desktop option that allows the default Docker socket to be used.
![image](https://private-user-images.githubusercontent.com/30603497/304259983-1313b420-df4f-4981-a72d-88ef5cff3b61.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyNTk5ODMtMTMxM2I0MjAtZGY0Zi00OTgxLWE3MmQtODhlZjVjZmYzYjYxLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTEzNDAwMTEzNmU3YWZmY2YzOTc2M2E3ZjM0MDdhZDMzMjJjOTczY2ZhYzljNWU3ZDNjMWQ2NDUwZjBmZjA5NDQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.vLEtrWtIhm72qZnyv9KFvJ46Vabg7b3Wafv2oawG7ho)
Like this:
![image](https://private-user-images.githubusercontent.com/30603497/304263484-cea97d6a-30cd-4388-8b28-6bd11a223881.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQyNjM0ODQtY2VhOTdkNmEtMzBjZC00Mzg4LThiMjgtNmJkMTFhMjIzODgxLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTEzMjllMWY5OGJkN2NiMDIzNzA0YmZhZDJlNjU2NTNiNDczYTQxNDE4NzNjM2RjY2I1YmFmYTMwYTczNzZkYjUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.qtx60t8SSasjifyoxnjuLfZJ-BB8j_sM7ZdfsuwUxCY)
Now it doesn't print any ports; it just shows this:

```shell
Ashishs-MacBook-Pro:dotnet-kafka ashishkhanal$ confluent local kafka start
The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html
```
Set the CONFLUENT_HOME environment variable. First, check where the confluent CLI is installed:

```shell
Ashishs-MacBook-Pro:dotnet-kafka ashishkhanal$ brew info confluentinc/tap/cli
/usr/local/Cellar/cli/3.48.1 (4 files, 57.5MB) *
```
Open your shell profile:

```shell
vim ~/.bash_profile
```

Add this to the file:

```shell
# For Confluent
export CONFLUENT_HOME="/usr/local/Cellar/cli/3.48.1"
export PATH=$PATH:$CONFLUENT_HOME/bin
```

Hit Esc, type :wq, and hit Enter to save and quit.
Reload the bash profile using the source command:

```shell
source ~/.bash_profile
```
It works at this point. For more details, check out my StackOverflow question.
![image](https://private-user-images.githubusercontent.com/30603497/304865136-9a9b15c5-4a0f-4d5c-8249-3883810c1f7c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4NjUxMzYtOWE5YjE1YzUtNGEwZi00ZDVjLTgyNDktMzg4MzgxMGMxZjdjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPThiNjVmMGM5MmZlZGUzZTNlMzFjMjRiZjY1YzIxNTYyYjc0NTFmMTljYWUzMGE3Y2Q0MWQ0YTc5ZGU1YzVmM2QmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.cTiLB0Yi-iGk6WmUCmxVivI0e90xqZAVAyYP3fmbhjo)
I gave Confluent Cloud a try instead of a local cluster this time.
I signed up through my Azure account on the 'Pay As You Go' plan.
![image](https://private-user-images.githubusercontent.com/30603497/304444198-14b0a2ef-e70f-4b33-b879-50d08f882d86.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ0NDQxOTgtMTRiMGEyZWYtZTcwZi00YjMzLWI4NzktNTBkMDhmODgyZDg2LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTkzNzMxODgxYmM5NjBmM2QwYmJiZmNjMDExZDQ4YTBjNTM3NjgwYjk4NTI5N2JlY2QzZDc3NDNhOTYxOWNkN2QmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.4-HRHqrNVbQ2-YjCdfBzPp3tWCyQ54X4QFYud0Y3Kf0)
Created Confluent org in my Azure account
![image](https://private-user-images.githubusercontent.com/30603497/304300541-bb0f35be-f95f-461d-abd6-0c2ae5029b1c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQzMDA1NDEtYmIwZjM1YmUtZjk1Zi00NjFkLWFiZDYtMGMyYWU1MDI5YjFjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTYwODY5NmRmZWRiZjYzMjA2N2U5ZWJkYWE2OWIzMTRhZmZiNDJmYzc4ZTQxMGJmMjQ0MjAxMTZkOGEwOWIwYmMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.1G7vr-3amhHjUXf-8VysW6CPZsKIduHRWyQO9UKcO70)
It appears under the resource group (RG) I created it in
![image](https://private-user-images.githubusercontent.com/30603497/304444744-a3f31fd2-b305-4b0c-b2b8-c86da7c2aafc.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ0NDQ3NDQtYTNmMzFmZDItYjMwNS00YjBjLWIyYjgtYzg2ZGE3YzJhYWZjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWZiYmNlZTA1YzAzOTUzZWQ3M2Y1NjM5ZGYxYTEzNTBkYjkxMDAwMmMxMWFjZDNlNDhjNzA2ODRlZTFmMzEyMWQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.dyf27H6QCW4biYZohMijpWF79P_fO8dLCkCDJuuSBbI)
Now go to Confluent Cloud by clicking Launch
![image](https://private-user-images.githubusercontent.com/30603497/304445779-672d9b73-cd46-4a59-9f23-a5c613362b00.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ0NDU3NzktNjcyZDliNzMtY2Q0Ni00YTU5LTlmMjMtYTVjNjEzMzYyYjAwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTk0OWFjYjI5NTU1MjIzYWU4NDExYTUyMjhkOGY1ZDc3ZmZjMjUyNGU2OTlkMTEzNTc2ZDZiMjJhMWI1YzNmMWUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.mWMFMBM6UocWZDowgY6qI6I2Os9oNKLiXaLP1vO1erA)
You'll go to this url often, bookmark it if you'd like:
https://confluent.cloud/environments
Go to Home -> Environments -> Add cloud environment
![image](https://private-user-images.githubusercontent.com/30603497/304894562-a569d5b1-58e8-411a-8984-b281a69bcb6f.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTQ1NjItYTU2OWQ1YjEtNThlOC00MTFhLTg5ODQtYjI4MWE2OWJjYjZmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTkzZjA5NTM1YzRhZjRmNzI4YzUzYWQzMjFkYTcyNjE5YTNhMDNmMGI0M2JkZjhjNjQ0OTZmOWYxYTUwMThjZTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.GHL9geV6qU8gHHbX7p0WifRK1mCbfNbbHiyKSRCLNZ4)
Stream Governance Packages -> Essentials -> Begin configuration
Tell it where you will be storing the metadata.
![image](https://private-user-images.githubusercontent.com/30603497/304895411-2c57f793-801d-4f67-a07c-07a233ec53a0.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTU0MTEtMmM1N2Y3OTMtODAxZC00ZjY3LWEwN2MtMDdhMjMzZWM1M2EwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWZkYTlhYjEwZmI4ZmZkZjQyZjliYmExYTQxYTVhMjgzYjkyNTU0ZjU4OGMyMzg4MTRlNDg3NWM0OGU3Yjc4NTkmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.93qrcKNQ4K3Nm98ZSmVi3ieTo2qz6TMUGeyzdlnmKJ4)
```shell
confluent login
```
![image](https://private-user-images.githubusercontent.com/30603497/304895982-653fcab1-692d-4c27-b8ae-42ae9561d94f.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTU5ODItNjUzZmNhYjEtNjkyZC00YzI3LWI4YWUtNDJhZTk1NjFkOTRmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTg0MDQ4OGQ3MTFkNmVkMmJmOWYwZDYyZGFlNDQyMjBlMTc0NDlmMzFmNjY1MzZmMWUwMmFhYzhkZGU1YWVlM2YmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.TLmzq7NRnakIr1hU0BAPtP41YFynrpERhGxC8rYSSjc)
The CLI shows the successful login
![image](https://private-user-images.githubusercontent.com/30603497/304896409-6646fbe7-56c9-4e2c-a913-00c0e1e861b3.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTY0MDktNjY0NmZiZTctNTZjOS00ZTJjLWE5MTMtMDBjMGUxZTg2MWIzLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTJjYzdlMTE0NTg2MTg0YTNmZmVjMzg3NGZmNmM5ODg4Y2UwOGQwMGI2OGI1Y2EyYzg4NzYxNjVlZWQxOWU4MjAmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.xwowD3rSED_G-F2uDqX-JPYKHjxW9zwY0K4215IYH1I)
```shell
confluent environment list
```
![image](https://private-user-images.githubusercontent.com/30603497/304896673-8efab508-ae07-44d3-9d59-bba678c8630f.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTY2NzMtOGVmYWI1MDgtYWUwNy00NGQzLTlkNTktYmJhNjc4Yzg2MzBmLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTlkMWM3ZmJhMzdiMmJjOGE5N2U1MzAzNDIxNjViYTBiMjYzYTgzOWExZTEwYmI0OTg3Y2I2MTVjYzkxZjg5MmQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.MXg0pfpg_nZPnTcfpLRWXXpUp75NtFeD7refGVCMsnQ)
Set the new environment I just created as the active environment:

```shell
confluent environment use env-19vow5
```

Notice that the * (the active-environment marker) has changed
![image](https://private-user-images.githubusercontent.com/30603497/304897274-3f73b8fa-4c31-40dd-90aa-5260aec5400c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTcyNzQtM2Y3M2I4ZmEtNGMzMS00MGRkLTkwYWEtNTI2MGFlYzU0MDBjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTRhMWFhZmMxYWRiMWUzMjI4YWJiYTNlZTdkOWJhN2IwNzI4ZDg3ODM0NTdkZTk3NDc4YzA0MTU1MWZiNmI2MDYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.xdb6xtOTtbpVXvVW_VYZo1hKCfkfqHorlLJoOpTuZ3U)
![image](https://private-user-images.githubusercontent.com/30603497/304897376-f1070fff-b389-409f-baf9-27078918b2b2.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTczNzYtZjEwNzBmZmYtYjM4OS00MDlmLWJhZjktMjcwNzg5MThiMmIyLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWZiMTA3OGU4NDRmMDM0OGU0MzhmZDgwZTdiODZkZDdjNjU2MWRmNDZhNzQ2NzMxNjExOGY4NDM5ZjQwYzBiM2EmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Fdee_ZKemoeSvxKm4LEU7iE39BVF1msBCZ4u0d5LhHg)
-> Create cluster on my own
Create cluster -> Basic
![image](https://private-user-images.githubusercontent.com/30603497/304897913-de308359-313f-4346-a51b-0caf12e2470e.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ4OTc5MTMtZGUzMDgzNTktMzEzZi00MzQ2LWE1MWItMGNhZjEyZTI0NzBlLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWIzNzJlM2NjYjcxNzA3ZGQxMDc1MzhlMDhmNzFlNDM1ZWZiMzA1NDBhZWY4YmY0YzVhNjk0NWM1MzMyNWNhOGMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.BJgJC0hWbHPF1GqGUD-RDjr6PFgCFwiOXYmCHbczMRs)
-> Launch cluster
Payment details
Add coupon code: DOTNETKAFKA101
![image](https://private-user-images.githubusercontent.com/30603497/304900361-b47bd8f5-af4e-43a1-9a24-58a48f6fdc90.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MDAzNjEtYjQ3YmQ4ZjUtYWY0ZS00M2ExLTlhMjQtNThhNDhmNmZkYzkwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTUyNGEwMGNmZDk3YTJmMGE2MTdhM2Q3M2QyYzg0Y2ZkZWMzNzJiYTE3ZjQ0ZTY3M2UxYWY3ZGFmY2U4OTIyMzcmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.eQEqxUfGDLAm0M11rXLT_NEM9CsKYGI0BIImcTmkSpA)
We will need an API Key to allow applications to access our cluster.
![image](https://private-user-images.githubusercontent.com/30603497/304900756-55e85853-d27f-4e01-b5c2-c9c76995f0bc.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MDA3NTYtNTVlODU4NTMtZDI3Zi00ZTAxLWI1YzItYzljNzY5OTVmMGJjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWVlNDFhZjJjZjJmZWMzOWY4OWMzY2M3MDkzZTliYTRhZGYzMGM5MzM2NTRmYjNhMDEyNjA3M2Q0NjU3NmFlZmMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.tyx2FRHvtpxozYhYmur98IC_KQtfg0vLcQnX0-za_V4)
Global access -> Next
Download and save the key somewhere for future use.
- From the main menu (top right) or the breadcrumb navigation (top) select Environments.
- Select the kafka-with-dotnet environment.
- In the right-hand menu there should be an option to Add key. Select it and create a new API Key.
A topic is an immutable, append-only log of events. Usually, a topic contains events of the same kind.
![image](https://private-user-images.githubusercontent.com/30603497/304919102-8338369a-158d-4932-8d82-3b301a85a628.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MTkxMDItODMzODM2OWEtMTU4ZC00OTMyLThkODItM2IzMDFhODVhNjI4LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTMwZGNkZTRhYTRmMjdjOGY0MjFlNDIyMjE1OWExMzdjNDJkZmU0ZWU4NDY3NGJhYzdkNTAwNDBhM2M5YjI1NzMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.nx5kGKL92X6B7K8V8XGGRtBS8RQyc2fQJzeSxjKH4Fg)
Create a new topic, RawBiometricsImported, which you will use to produce and consume events.
![image](https://private-user-images.githubusercontent.com/30603497/304919265-324d25d8-a71f-4485-86b0-5470d45370d6.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MTkyNjUtMzI0ZDI1ZDgtYTcxZi00NDg1LTg2YjAtNTQ3MGQ0NTM3MGQ2LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTRhYmMwMWRiNzZkMjYyZGZlNTQ2YjRkYjU4OTJhYmVkYWY1NTQwZjc0ZjczZGIzN2JlMjk1MTNhN2JiNjg1OGEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.pdIirWyRRTuzoAztTJuvGzPiYvMWbw8IpISigOhCNN4)
-> Create with defaults
When asked to Define a data contract select Skip for now.
- The Kafka.BootstrapServers is the Bootstrap server in the file.
- The Kafka.SaslUsername is the key value in the file.
- The Kafka.SaslPassword is the secret value in the file.
- SchemaRegistry.URL is the Stream Governance API endpoint url.
- SchemaRegistry.BasicAuthUserInfo is <key>:<secret> from the API Key file you downloaded for the Schema Registry.
Store usernames and passwords inside secrets.json, and other details in appsettings.json.
![image](https://private-user-images.githubusercontent.com/30603497/304904037-2c56d72f-9788-437f-9715-2de12d2b3731.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MDQwMzctMmM1NmQ3MmYtOTc4OC00MzdmLTk3MTUtMmRlMTJkMmIzNzMxLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTc4NGQ1MzdjOGMxZDFlZjAyZTViZmUwYzE1MjE2OWZkYzg3Yzg4ZWViNmM4ZTM1MjI3Yzg4NDA2ZGUyMmU0OTEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.N_TbSTRMDSNEQX0b_olvR4J60tWxW_CPKA5tXsWXJmQ)
A domain event signals something that has happened in the outside world that is of interest to the application.
Events represent things that happened in the past, so they are immutable.
Use the past tense when naming events, e.g. UserCreated, UserAddressChanged, etc.
var message = new Message<string, Biometrics>
{
Key = metrics.DeviceId,
Value = metrics
};
If you care about message ordering, provide a key; otherwise it's optional.
In the example above, because we're using DeviceId as the key, all messages from that specific device are handled in order.
Value can be a primitive type such as string or some object that can be serialized into formats such as JSON, Avro or Protobuf.
![image](https://private-user-images.githubusercontent.com/30603497/304907095-1d0706c6-5b6e-465f-ba32-2aef3628dd8d.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MDcwOTUtMWQwNzA2YzYtNWI2ZS00NjVmLWJhMzItMmFlZjM2MjhkZDhkLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWU4ODVmNDY5N2Q2OWNkMDAxNTA2ODAzODhlYzVjMzIzNjRlOWI2MDM2ZGMxZTc4NTE4MGZjYjA5NWM2NmJhMzImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.lYrs5z1MzU8kYs3wUaPiyRsqa_BfpYNAiE9DKxAvpvU)
You can consider the messages being produced by your system to be just another type of API.
Some APIs will be consumed through HTTP while others might be consumed through Kafka.
"KafkaProducer": {
// One or more Kafka brokers, each specified by a host and, optionally, a port.
// It will be used to establish the initial connection to the Kafka cluster.
// Once connected, additional brokers may become available.
"BootstrapServers": "pkc-4rn2p.canadacentral.azure.confluent.cloud:9092",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "PLAIN",
"SaslUsername": "comes from user-secrets' secrets.json",
"SaslPassword": "comes from user-secrets' secrets.json",
// Used to identify the producer.
// In other words, to give it a name.
// Although it's not strictly required, providing a ClientId will make debugging a lot easier.
"ClientId": "ClientGateway"
}
Grab the config
builder.Services.Configure<ProducerConfig>(builder.Configuration.GetSection("KafkaProducer")); // OR var producerConfig = builder.Configuration.GetSection("KafkaProducer").Get<ProducerConfig>();
Go to the web api you created earlier.
It will work as a simple REST endpoint that accepts data from a fitness tracker in the form of strings and pushes it to Kafka with no intermediate processing.
In the long run, this may be dangerous because it could allow a malfunctioning device to push invalid data into our stream. We probably want to perform a minimal amount of validation, prior to pushing the data. We'll do that later.
Register an instance of IProducer<string, string>. We use a singleton because the producer maintains connections that we want to reuse.
builder.Services.AddSingleton<IProducer<string, string>>(sp =>
{
var config = sp.GetRequiredService<IOptions<ProducerConfig>>();
return new ProducerBuilder<string, string>(config.Value)
.Build();
});
Send the message
var result = await producer.ProduceAsync(biometricsImportedTopicName, message);
// Awaiting ProduceAsync means we wait for acknowledgement from the broker before continuing.
// It's often better to produce multiple messages as a batch prior to calling Flush.
producer.Flush(); // Blocks until all buffered messages have been delivered.
The messages aren't necessarily sent immediately.
They may be buffered in memory so that multiple messages can be sent as a batch.
Once we're sure we want the messages to be sent, it's a good idea to call the .Flush() method.
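The buffering behaviour described above can be sketched like this. This is not code from the repo: the topic name, deviceId, and readings variables are assumptions for illustration, and the fire-and-forget Produce overload with a delivery handler is used instead of awaiting each message.

```csharp
using Confluent.Kafka;

// A sketch of batching fire-and-forget produces, then flushing.
void ProduceBatch(IProducer<string, string> producer, string deviceId, IReadOnlyList<string> readings)
{
    foreach (var reading in readings)
    {
        // Produce is fire-and-forget: the message goes into an in-memory buffer.
        producer.Produce("RawBiometricsImported",
            new Message<string, string> { Key = deviceId, Value = reading },
            deliveryReport =>
            {
                if (deliveryReport.Error.Code != ErrorCode.NoError)
                    Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
            });
    }

    // Block until every buffered message has been delivered (or the timeout elapses).
    producer.Flush(TimeSpan.FromSeconds(10));
}
```

Compared to awaiting ProduceAsync per message, this lets the client group many messages into fewer network requests.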
![image](https://private-user-images.githubusercontent.com/30603497/304925164-f7250701-7d04-4477-bead-6c9c5f83db7e.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MjUxNjQtZjcyNTA3MDEtN2QwNC00NDc3LWJlYWQtNmM5YzVmODNkYjdlLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTY3YjFjOGEyM2NiMWJjZjdlYzNmMTA3NTgwZTgyOWIwMDA4ZjJlYzhkMWE4MTNjMTVkODNmNGIwZWFjMGJhNWQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.F0EWSrmETIfe7TiQd3mkryYotfhuS2j5m7_dq523eNE)
![image](https://private-user-images.githubusercontent.com/30603497/304925346-a83c9991-e5b9-4256-863a-a9461490aec6.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MjUzNDYtYTgzYzk5OTEtZTViOS00MjU2LTg2M2EtYTk0NjE0OTBhZWM2LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWQ4ZTQzMTc5ZTc3ODI1M2RjMzk0YjJkNmJjNzc4NjEyNTQ1YjI1MmVkOTQ2ZTcwOGQzOGJkMDVjZjFmNGIyY2EmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.h8OfyaK7jx6An1q54pDsRjxTltm2CvgmX2Xb3Xk6HqA)
Home -> Environments -> kafka-with-dotnet -> cluster_0 -> Topics
![image](https://private-user-images.githubusercontent.com/30603497/304925538-974b3b19-fa64-4402-8e40-8b2b3b48cfc1.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MjU1MzgtOTc0YjNiMTktZmE2NC00NDAyLThlNDAtOGIyYjNiNDhjZmMxLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTgxNTE5N2JkOGE5MGUxZWY4NjA4YmE0MzhlNjY4ZjVmNzg4YWRiZWNiMjBkMWRlZTA5YjI1MWJhMTlhN2E5MDImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.tkcCUz1nQVz72cqJO2C5VYRxx08Kt0rtHrfbh5Nm_N0)
The message producer is created by providing two types.
new ProducerBuilder<TypeofKey, TypeofValue>(config)
The first type represents the Key of the message while the second is for the Value.
These types can be simple types, such as strings or integers, but they can also be more complex types like a custom class or struct.
However, Kafka itself doesn't understand these types; it just operates on blobs of data in byte form. This means complex objects need to be converted into an array of bytes before being sent to Kafka.
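To make the byte-conversion point concrete, here is a minimal hand-rolled value serializer. It's a sketch only (it assumes System.Text.Json is acceptable); this repo uses the Schema Registry-aware serializers instead.

```csharp
using Confluent.Kafka;

// A minimal custom value serializer sketch; illustration only.
public class PlainJsonSerializer<T> : ISerializer<T>
{
    // ISerializer<T> is how the Confluent client turns a value into bytes.
    public byte[] Serialize(T data, SerializationContext context)
        => System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(data);
}

// Usage: new ProducerBuilder<string, Biometrics>(config)
//            .SetValueSerializer(new PlainJsonSerializer<Biometrics>())
//            .Build();
```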
![image](https://private-user-images.githubusercontent.com/30603497/304927293-528a11e9-a910-4ffa-b6ae-afe2e4997195.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDQ5MjcyOTMtNTI4YTExZTktYTkxMC00ZmZhLWI2YWUtYWZlMmU0OTk3MTk1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTc5MWNiYTM1ZGUwNDk2ZWE1MDg0NDY1NGVkZWM5ODVkYmViZGZlMTg4N2JlOWM3Y2I1NjhjNTA4YmIzZThhZGImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.beDUUhBDEU09j0PoQhVZCx8UlBcIaM4fuGjM6uiaDvA)
To register a serializer for the message value, we use the SetValueSerializer method on the ProducerBuilder. For example:
new ProducerBuilder<string, Biometrics>(producerConfig)
.SetValueSerializer(new JsonSerializer<Biometrics>(schemaRegistry))
.Build();
The Confluent Kafka client includes support for three major message formats: Protobuf, Avro, and JSON.
Each of these formats supports a message schema. A schema is essentially a set of rules that outlines the exact structure of a message.
These schemas can be stored in an external service known as a Schema Registry. The Confluent Cloud built-in Schema Registry is a good choice here.
To make use of the schemas, we can connect our serializers to the Schema Registry.
The serializer will have its own version of the schema that it will use to serialize each message.
The first time we try to serialize an object, the serializer looks for a matching schema in the registry.
If a matching schema is found, the current message and any future ones that use the same schema will be sent to Kafka.
![image](https://private-user-images.githubusercontent.com/30603497/305227250-0448c387-860b-4e1b-863a-5808c65ecd93.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyMjcyNTAtMDQ0OGMzODctODYwYi00ZTFiLTg2M2EtNTgwOGM2NWVjZDkzLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTk4OTJkMmJmOGQ0ODk2NjM3MzlkMDc3YzcwODlkN2UyODRiY2I3NmY3ODE2NWI1MTBmNjBjMmI4OTQ1ZmUzMGYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.5tn_e-3oXJbDFSwr92PNYlbJ3kzhW9oCNwZXYU7cwR0)
However, if no matching schema is found, then any messages that use that schema will be rejected; essentially, an exception is thrown. This ensures that each message going to Kafka matches the required format.
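In the .NET client, that rejection surfaces as a ProduceException. A hedged sketch of handling it (the topic name is an assumption for illustration):

```csharp
using Confluent.Kafka;

try
{
    var result = await producer.ProduceAsync("BiometricsImported", message);
}
catch (ProduceException<string, Biometrics> ex)
{
    // Serialization failures (including schema mismatches) surface here.
    Console.WriteLine($"Message rejected: {ex.Error.Reason}");
}
```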
Create a new topic named BiometricsImported (use the default partitions).
Define a data contract -> Create a schema for message values
![image](https://private-user-images.githubusercontent.com/30603497/305240245-bf734225-8385-4252-b79a-6c9334a41ebb.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNDAyNDUtYmY3MzQyMjUtODM4NS00MjUyLWI3OWEtNmM5MzM0YTQxZWJiLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWZkNmQxZDk3MWNlODdlZjc4ZTg0ODk4Yjg2YzdmODdjMTBiZDMyMDVhNjE4ZTY1M2E2Zjk5MTA5Mjc0Zjg1NjcmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Ep-lIyWXUalrIvJIDWbIG9RLnSlYK0YTCgffN_9Kgg8)
-> Create Schema
![image](https://private-user-images.githubusercontent.com/30603497/305240908-04a1e110-e193-44c0-b6da-fc3c4f8fae09.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNDA5MDgtMDRhMWUxMTAtZTE5My00NGMwLWI2ZGEtZmMzYzRmOGZhZTA5LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWQ1Mzg5MDY1M2MyNjYzNmM2ZTA2NDdjZDU5OWExNGUyMTRjZGNkZjBlOGQxNTEyM2QwZTEyMjEzN2QzZjgyOTUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.IxZ2LCEZPeATLhloPhxBOdnUXcVIg8f-kDclXykrWqA)
Copy and paste the JSON below into https://codebeautify.org/jsonviewer, add your fields there (to ensure you don't break the JSON), then paste it back into this page and hit Create.
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/myURI.schema.json",
"title": "Biometrics",
"description": "Biometrics collected from the fitness tracker.",
"type": "object",
"additionalProperties": false,
"definitions": {
"HeartRate": {
"type": "object",
"additionalProperties": false,
"properties": {
"DateTime": {
"format": "date-time",
"type": "string"
},
"Value": {
"format": "int32",
"type": "integer"
}
}
}
},
"properties": {
"DeviceId": {
"format": "guid",
"type": "string",
"description": "The string type is used for strings of text."
},
"HeartRates": {
"items": {
"$ref": "#/definitions/HeartRate"
},
"type": "array"
},
"MaxHeartRate": {
"format": "int32",
"type": "integer"
}
}
}
In the step above, you were essentially mapping these C# records to a JSON schema:
record Biometrics(Guid DeviceId, List<HeartRate> HeartRates, int MaxHeartRate);
record HeartRate(DateTime DateTime, int Value);
Add 2 packages:
dotnet-kafka/Producer/Producer.csproj
Lines 13 to 14 in df78503
Register an instance of ISchemaRegistryClient using a new CachedSchemaRegistryClient.
dotnet-kafka/Producer/Program.cs
Lines 29 to 34 in df78503
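For readers who can't follow the snippet link, the registration likely looks something like this. The section name and variable names are assumptions; check the linked Program.cs for the real code.

```csharp
using Confluent.SchemaRegistry;
using Microsoft.Extensions.Options;

// Bind the Schema Registry settings from configuration.
builder.Services.Configure<SchemaRegistryConfig>(
    builder.Configuration.GetSection("SchemaRegistry"));

// Register a single cached client; it memoizes schema lookups.
builder.Services.AddSingleton<ISchemaRegistryClient>(sp =>
{
    var config = sp.GetRequiredService<IOptions<SchemaRegistryConfig>>();
    return new CachedSchemaRegistryClient(config.Value);
});
```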
Navigate to Swagger page
![image](https://private-user-images.githubusercontent.com/30603497/305252355-dc04d7e6-0e43-4be8-a929-c570e29a2d77.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTIzNTUtZGMwNGQ3ZTYtMGU0My00YmU4LWE5MjktYzU3MGUyOWEyZDc3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTVkNmZlZjAyMGRiMDAyODkyODJmNWI2MjY3YzZjZWFlZTIxZDViNmZmZGEwMWUyNDU0MmNlZGM5MzliNGZmNGImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.LMzljsQkQ1OuRG8VNYBL30V7dGp840TZlRn33017oQ4)
Click on /biometrics and click "Try it out".
Send the request. It succeeds.
![image](https://private-user-images.githubusercontent.com/30603497/305253382-a842fba4-eaac-4ab2-acd1-a3155e0848a7.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTMzODItYTg0MmZiYTQtZWFhYy00YWIyLWFjZDEtYTMxNTVlMDg0OGE3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWYzZWFmYjNiZWRhMzE4ODI5MGU0ODcyYzgwNDM3N2MxNDc5YWZmMDgxZGE4MDU0MmU0NjgyOWM0MWRhYTk2ZWYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.XRyRyXCJOX6LiLbuctI75FX6kXASiK7AdBLN1i5622A)
Check it out in Confluent Cloud.
![image](https://private-user-images.githubusercontent.com/30603497/305253502-b546a032-9a6b-4997-a7ba-d06e786e1d03.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTM1MDItYjU0NmEwMzItOWE2Yi00OTk3LWE3YmEtZDA2ZTc4NmUxZDAzLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTY2OWYyZWViNzkxZmIxY2QwZTE4Zjc2MDJjOGEzZmJkNDFmZTg4NTU4OGE3YmFjYmEwZDdmMzM2YjgyMGI1NWImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.tdM9uq1SE_yajHmHkkHhHVNwc_EHk1e5NU4LHphZj4k)
The job of the consumer is to process each message in a topic. Usually, they're processed one at a time in the order they're received.
![image](https://private-user-images.githubusercontent.com/30603497/305254999-e41e8ed4-9ccb-4bf6-8f40-8003e57693eb.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTQ5OTktZTQxZThlZDQtOWNjYi00YmY2LThmNDAtODAwM2U1NzY5M2ViLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTE4NzA2MzUwNTE2OWM0YTE1YTFkNmUxMWJlYzg2ZDRkZmJjNDFmMmVkZjgwZTE2ZDgzMTFkYjVkNTgxNjBkNDUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.UBMBqvVi5g3fqwZXm1HzVvRHhAm7eNMGjbDxA3LfQv4)
It's like a reader (consumer) reading a book.
The reader gets to decide which books they read and how they read them.
Some people might read a book end-to-end while others might jump around, only looking at the sections that interest them.
In the same way, a Kafka consumer can decide what topics they will pay attention to.
Within each topic, they often look at every message, perform some processing, and potentially update a database or some other state.
However, they may decide that some messages aren't relevant and discard them rather than wasting time on processing.
![image](https://private-user-images.githubusercontent.com/30603497/305255637-a84a7f8f-2190-47e0-8147-238eefd8935c.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTU2MzctYTg0YTdmOGYtMjE5MC00N2UwLTgxNDctMjM4ZWVmZDg5MzVjLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWZhYjY5YzIzNDVmZjZkY2IyMmVhZGZmNzBkY2EzNmRkNGI4ZmFhMzI1NjEyZDRkYzYwMzg5NmMxOWM5MGNmMTkmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.ZY32mKyny11jWfTCRMrqN3Jt-VrpfpR7qX01Rz8tupo)
Remember that when messages are produced to a topic, they're assigned a key. The key determines which partition of the topic each message is written to.
Within each partition, the order of the messages is guaranteed. However, it's not guaranteed across multiple partitions.
![image](https://private-user-images.githubusercontent.com/30603497/305256035-10acb37d-0e46-42b2-8ae3-5ca67f311b02.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTYwMzUtMTBhY2IzN2QtMGU0Ni00MmIyLThhZTMtNWNhNjdmMzExYjAyLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTUxZmNhMDYyNTRmYTM4ODExYWViMThlMWZkZTAwYmE0Yzk1MDYwMWE3YzA4M2JmZDg3ZGUyNDMyZTNlOGU5ZjEmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.TRjaPtzMB4N0F2Kn6cx36wch-p3ecLH7BKv9QQtrr0s)
When we create a consumer, we assign it a group ID. This ID indicates that multiple instances of the consumer all belong to the same group. Kafka will distribute the partitions among the group members.
This means that in a topic with 30 partitions, you could have up to 30 instances of your consumer all working in parallel.
![image](https://private-user-images.githubusercontent.com/30603497/305256162-75b87e44-f7ce-45b4-92dc-adb07a975477.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDUyNTYxNjItNzViODdlNDQtZjdjZS00NWI0LTkyZGMtYWRiMDdhOTc1NDc3LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWIwYTA3MmQ3YjdiYWZiMzBkNTM4MjYwOTcwZmRhZjFjYTA2YzUyZTY2OWNiMTZlMzE1MGE2MDdjZjA5M2NlOTMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.5dF8rvUc-zePdOQxQXcfWQ8iQbde7y4pDRXtNDLkNC8)
Each member of the group processes separate partitions.
"KafkaConsumer": {
"BootstrapServers": "pkc-4rn2p.canadacentral.azure.confluent.cloud:9092",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "PLAIN",
"ClientId": "BiometricsService",
// Identifies multiple consumers that all belong to a single group (e.g. multiple instances of the same microservice).
"GroupId": "BiometricsService",
// Determines where the consumer should start processing messages if it has no record of a previously committed offset.
// In other words, if the consumer is unsure where to start processing messages, it will use this setting.
// Setting it to Earliest will cause the consumer to start from the earliest message it hasn't seen yet.
// This is a good value to use if you want to ensure that all messages get processed.
"AutoOffsetReset": "earliest",
// EnableAutoCommit determines whether or not the consumer will automatically commit its offsets.
// Committing an offset indicates that the consumer has finished processing a message and doesn't want to receive it again.
// If we choose to auto-commit, then the offsets are automatically managed by the consumer.
"EnableAutoCommit": true,
"SaslUsername": "comes from user-secrets' secrets.json",
"SaslPassword": "comes from user-secrets' secrets.json"
}
When we are ready to begin consuming messages, we need to subscribe to a topic. This is done using the Subscribe method on the consumer.
It takes either a single topic name or a collection of multiple topic names. This informs the client which topics we want to consume but it doesn't actually start processing messages.
consumer.Subscribe(BiometricsImportedTopicName);
To start processing messages, we use the Consume method. It grabs the next available message from the topic and returns it as the result.
consumer.Subscribe(BiometricsImportedTopicName);
while(!stoppingToken.IsCancellationRequested)
{
var result = consumer.Consume(stoppingToken);
// Do something with the message. DeviceId: result.Message.Key, Biometrics: result.Message.Value
}
// Close the consumer after you're done
consumer.Close();
How to register services in a console app
Add necessary packages
dotnet-kafka/Consumer/Consumer.csproj
Lines 14 to 15 in 54cda76
And the setup in Program.cs
dotnet-kafka/Consumer/Program.cs
Lines 21 to 30 in 54cda76
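A sketch of what that console-app setup likely looks like with the generic host. The worker type name (BiometricsWorker) is an assumption; see the linked Program.cs for the actual code.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

var builder = Host.CreateApplicationBuilder(args);

// Bind the consumer settings from appsettings.json / user-secrets.
builder.Services.Configure<ConsumerConfig>(
    builder.Configuration.GetSection("KafkaConsumer"));

// One consumer instance for the lifetime of the app.
builder.Services.AddSingleton<IConsumer<string, Biometrics>>(sp =>
{
    var config = sp.GetRequiredService<IOptions<ConsumerConfig>>();
    return new ConsumerBuilder<string, Biometrics>(config.Value).Build();
});

// The consume loop runs inside a BackgroundService (hypothetical name).
builder.Services.AddHostedService<BiometricsWorker>();

await builder.Build().RunAsync();
```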
We have 1 message in the Topic
![image](https://private-user-images.githubusercontent.com/30603497/305589350-14e5c215-a0b6-4e5f-a58a-74a2e569d6c8.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDU1ODkzNTAtMTRlNWMyMTUtYTBiNi00ZTVmLWE1OGEtNzRhMmU1NjlkNmM4LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTRmODZiN2YxYTcwMGQwZTRlMDRjOTk5Y2FjMTY5MzBlZDVmZTRiNDZhZDQ1OWVmMDE5ZjQyZTQyOTkxYjgzYTQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.-q4sYAwX3qPbNcsq1ku_4L5g3K5Vn5IwU27iozW-ClE)
The Consumer read it successfully
![image](https://private-user-images.githubusercontent.com/30603497/305589390-231b46ac-ab86-403d-b2ab-199f50b89a9b.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDU1ODkzOTAtMjMxYjQ2YWMtYWI4Ni00MDNkLWIyYWItMTk5ZjUwYjg5YTliLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTNlMzk5MjFjNjRiMDE1NTc4ZmMzMjM1YjBhNDY5NmIzNDVhN2YxN2NmNmFiZjI0NGQ4YmY4ZGI1N2Q4NjM5OGMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.W-G4sByOZr8t4LT6dGP3WPWwjtYIeBVAJZqnxBA33QQ)
The default behavior of a Kafka consumer is to automatically commit offsets at a given interval.
"KafkaConsumer": {
// Other settings here
// Means that the consumer will receive every message from a topic, but may see duplicates.
"EnableAutoCommit": true, // Default
"AutoCommitIntervalMs": 5000,
"EnableAutoOffsetStore": true
},
However, the way the .NET client handles auto-commits is a little different from other Kafka clients.
Let's take a look at a simple example.
Scenario 1
while (!stoppingToken.IsCancellationRequested)
{
// When we call consume, the offset for the current message is stored in memory.
// A background thread periodically commits the most recently stored offset to Kafka, according to the AutoCommitInterval.
var result = consumer.Consume(stoppingToken);
Process(result); // CRASH!!
// Background commit would have occurred here.
// 👆If our application crashes BEFORE we commit the offsets, then we'll see the messages again once we recover.
// In this scenario, we see all of the messages, but we get duplicates.
}
Scenario 2
while (!stoppingToken.IsCancellationRequested)
{
var result = consumer.Consume(stoppingToken);
// Background commit occurs here.
// 👆If our application crashes AFTER we commit the offsets, then the message is lost and won't be recovered.
Process(result); // CRASH!!
}
If we want to achieve at-least-once processing, we need to manually store the offsets.
To do this, we start by disabling automatic offset storage.
"KafkaConsumer": {
// Other settings here
"EnableAutoCommit": true, // The auto-commit thread will still commit the offsets to Kafka according to the specified interval
"AutoCommitIntervalMs": 5000,
"EnableAutoOffsetStore": false, // <-- This guy
},
Now the code looks like this
while (!stoppingToken.IsCancellationRequested)
{
var result = consumer.Consume(stoppingToken);
Process(result); // CRASH!!
// When the message is fully processed, we call store offset on the consumer to explicitly store the offset in memory.
consumer.StoreOffset(result);
}
In the event of a crash, the offset won't have been stored. When we recover, we'll see the message again, giving us the at-least-once guarantee we were looking for.
In order to achieve effectively-once processing, we need to enable some settings on our producer.
Normally, when the producer experiences a write failure, it will retry the message.
This can result in duplicates. However, if we enable idempotence then a unique sequence number is included with each write.
If the same sequence number is sent twice, then it will be de-duplicated.
// Consumer/appsettings.json
"KafkaProducer": {
// Other stuffs here
"ClientId": "BiometricsService",
"EnableIdempotence": "true" // <-- This guy
// Config added for Transactional commits
// Identify the instance of our application with a TransactionId.
"TransactionalId": "BiometricsService" // <-- This guy
},
"KafkaConsumer": {
// Other settings here
"ClientId": "BiometricsService",
"GroupId": "BiometricsService",
// Config changed for Transactional commits. Changed from default true to false.
"EnableAutoCommit": false, // <-- This guy
// Default is ReadCommitted. Ensures that the consumers are only reading fully committed message in a Transaction.
"IsolationLevel": "ReadCommitted" // <-- This guy
}
If we are consuming data from Kafka and producing new records to Kafka with no other external storage, then we can rely on Kafka transactions.
In the previous exercise, we implemented a basic consumer for our topic. One key aspect of that consumer is that it was set up to do automatic commits (after consuming the message). As a result, if something failed to process, it may be committed anyway, and the message may be lost.
We are going to switch from automatic commits to transactional commits ("EnableAutoCommit": false).
As we process the messages, we will produce multiple new messages to a separate topic. We want this to happen in a transactional fashion. Either all of the messages get published, or none of them do.
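The consume-transform-produce loop inside a transaction can be sketched like this. This is not the repo's implementation: the topic name, the ToZoneReached mapping helper, and stoppingToken are assumptions for illustration.

```csharp
using Confluent.Kafka;

// Must be called once before the first transaction.
producer.InitTransactions(TimeSpan.FromSeconds(10));

while (!stoppingToken.IsCancellationRequested)
{
    var result = consumer.Consume(stoppingToken);

    producer.BeginTransaction();
    try
    {
        foreach (var heartRate in result.Message.Value.HeartRates)
        {
            producer.Produce("HeartRateZoneReached",
                new Message<string, HeartRateZoneReached>
                {
                    Key = result.Message.Key,
                    Value = ToZoneReached(heartRate) // hypothetical mapping helper
                });
        }

        // Hand the consumed offset to the transaction instead of committing it
        // directly, so the offset commit and the produced messages succeed or
        // fail together.
        producer.SendOffsetsToTransaction(
            new[] { new TopicPartitionOffset(result.TopicPartition, result.Offset + 1) },
            consumer.ConsumerGroupMetadata,
            TimeSpan.FromSeconds(10));

        producer.CommitTransaction();
    }
    catch
    {
        producer.AbortTransaction();
        throw;
    }
}
```

If anything fails mid-loop, the abort discards both the produced messages and the offset commit, so the input message is reprocessed on recovery.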
In Confluent Cloud, create a new topic named HeartRateZoneReached.
The type of message it will use is
public record HeartRateZoneReached(Guid DeviceId, HeartRateZone Zone, DateTime DateTime, int HeartRate, int MaxHeartRate);
So use this Schema
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "http://example.com/myURI.schema.json",
  "title": "HeartRateZoneReached",
  "type": "object",
  "description": "Schema for HeartRateZoneReached.",
  "additionalProperties": false,
  "definitions": {
    "HeartRateZone": {
      "description": "An enum that represents the HeartRateZone",
      "type": "integer",
      "enum": [0, 1, 2, 3, 4, 5],
      "x-enumNames": ["None", "Zone1", "Zone2", "Zone3", "Zone4", "Zone5"]
    }
  },
  "properties": {
    "DateTime": {
      "format": "date-time",
      "type": "string"
    },
    "DeviceId": {
      "format": "guid",
      "type": "string"
    },
    "HeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "MaxHeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "Zone": {
      "$ref": "#/definitions/HeartRateZone"
    }
  }
}
```
Now our Consumer will also produce `HeartRateZoneReached` messages, so we need to add producer configuration to the Consumer project.
```jsonc
// Consumer/appsettings.json
"KafkaProducer": {
  // Other settings here
  "ClientId": "BiometricsService",
  "EnableIdempotence": "true", // <-- This guy
  // Config added for transactional commits.
  // Identifies this instance of our application with a TransactionalId.
  "TransactionalId": "BiometricsService" // <-- This guy
}
```
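For reference, these appsettings sections bind directly onto Confluent.Kafka's strongly typed config classes via the standard configuration binder. A sketch, assuming the section names used above (the rest of the host setup is omitted):

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public static class KafkaConfigLoader
{
    // Sketch: bind the "KafkaProducer" / "KafkaConsumer" sections from
    // appsettings.json onto Confluent.Kafka's config classes. Property
    // names like ClientId and EnableIdempotence map one-to-one.
    public static (ProducerConfig Producer, ConsumerConfig Consumer) Load(IConfiguration configuration)
    {
        var producerConfig = new ProducerConfig();
        configuration.GetSection("KafkaProducer").Bind(producerConfig);

        var consumerConfig = new ConsumerConfig();
        configuration.GetSection("KafkaConsumer").Bind(consumerConfig);

        return (producerConfig, consumerConfig);
    }
}
```

String values like `"IsolationLevel": "ReadCommitted"` are parsed into the corresponding enum members by the binder.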
See `dotnet-kafka/Consumer/Domain/HeartRateExtensions.cs`, lines 1 to 24 (commit 82c4b08).
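To make the zone mapping concrete, here is a hypothetical reconstruction of what such an extension could look like; the authoritative version lives in `HeartRateExtensions.cs`, and the cutoffs below assume the common 50/60/70/80/90%-of-max convention, which I have not verified against the repo:

```csharp
using System;

// Mirrors the enum from the HeartRateZoneReached schema above.
public enum HeartRateZone { None, Zone1, Zone2, Zone3, Zone4, Zone5 }

public static class HeartRateExtensionsSketch
{
    // Hypothetical sketch: map a reading to a zone by its ratio to max heart rate.
    public static HeartRateZone GetHeartRateZone(this int heartRate, int maxHeartRate)
    {
        var ratio = (double)heartRate / maxHeartRate;
        return ratio switch
        {
            < 0.5 => HeartRateZone.None,   // e.g. 90 bpm at max 200
            < 0.6 => HeartRateZone.Zone1,  // 110 bpm
            < 0.7 => HeartRateZone.Zone2,  // 130 bpm
            < 0.8 => HeartRateZone.Zone3,  // 150 bpm
            < 0.9 => HeartRateZone.Zone4,  // 170 bpm
            _     => HeartRateZone.Zone5   // 190 bpm
        };
    }
}
```

Under these assumed cutoffs, the six readings in the payload below (90–190 bpm against a max of 200) map to `None` through `Zone5`; if only readings that land in a real zone trigger a message, that is consistent with the 5 `HeartRateZoneReached` messages seen later in this exercise.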
Now send this data to the `/biometrics` endpoint in the Producer.
```json
{
  "deviceId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "heartRates": [
    { "dateTime": "2022-12-02T13:50:00.000Z", "value": 90 },
    { "dateTime": "2022-12-02T13:51:00.000Z", "value": 110 },
    { "dateTime": "2022-12-02T13:52:00.000Z", "value": 130 },
    { "dateTime": "2022-12-02T13:53:00.000Z", "value": 150 },
    { "dateTime": "2022-12-02T13:54:00.000Z", "value": 170 },
    { "dateTime": "2022-12-02T13:55:00.000Z", "value": 190 }
  ],
  "maxHeartRate": 200
}
```
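One way to send it is with curl against a local run of the Producer; the port here is illustrative (the real one is in the Producer's launchSettings.json), and Swagger UI works just as well:

```shell
# Save the payload above as biometrics.json first.
# Port 5000 is a placeholder — check the Producer's launchSettings.json.
curl -X POST "http://localhost:5000/biometrics" \
  -H "Content-Type: application/json" \
  --data @biometrics.json
```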
![image](https://private-user-images.githubusercontent.com/30603497/306075590-b8c70396-e15c-434e-86c0-fb77a36de2ae.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYwNzU1OTAtYjhjNzAzOTYtZTE1Yy00MzRlLTg2YzAtZmI3N2EzNmRlMmFlLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWU4ZDJiZTI4MjQyMTYxYWJlMDBjM2UzMzBmMTg2ZmM1ZDFjNzNmMjdjY2VjNjBjY2RjNTVhY2Q2NDgyZDA5YmUmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.KMnFmBgwWO6HMnFt2_HuCp1Pyn7tmwicYHSOSJkCd18)
You can also look at the topics in Confluent Cloud.
![image](https://private-user-images.githubusercontent.com/30603497/306075342-9a7c0e4f-539a-4b6f-a8a3-8b7317ea2e6d.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYwNzUzNDItOWE3YzBlNGYtNTM5YS00YjZmLWE4YTMtOGI3MzE3ZWEyZTZkLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTZkNjQxMmMzYjdkMDYwZjMzNDM0MTBkYmVjNTk5MTllYmEzNTc4ZDQ0YTk1MmFlZTRhYTMyZTgwMWYzMGY3YzcmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.LAH6mFVk-7stOK1h5ih9C1Grvq4bE5-CUK8wrTcHLcY)
As soon as the above call succeeds, you'll hit this point in the Consumer project:
![image](https://private-user-images.githubusercontent.com/30603497/306074907-66bc6d3f-b2d8-404f-acc5-dc551a943d2b.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYwNzQ5MDctNjZiYzZkM2YtYjJkOC00MDRmLWFjYzUtZGM1NTFhOTQzZDJiLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWIzOGU4ZDA1NWUwZmRhN2RhNTRjNTdhNzBiYTkxMWQzNGNhY2U0MzhhYmM1NjAzNDc5MDQwYjBkYWUzYjhiMWMmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.P83EnWm_oX839r40WvHs2rHX3QPQw4sqcBX0HRAIGy8)
After the `HandleMessage` method in `Consumer/Workers/HeartRateZoneWorker.cs` runs to completion, you'll see 5 messages in the `HeartRateZoneReached` topic.
![image](https://private-user-images.githubusercontent.com/30603497/306085934-3f670057-ad1f-476f-938f-c08692ee97d1.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYwODU5MzQtM2Y2NzAwNTctYWQxZi00NzZmLTkzOGYtYzA4NjkyZWU5N2QxLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWNiZjY1Mjc1MTk0YjQxMzgxMGRjZGZlZmNjMWJkYjhmMmYxMjliYjI0OGRiZmE0MzdmNjQwN2UxNTFjODRhMDYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.VNg0WuWZufTDtKY7VxLG1duFh0qmCT5MloICXDTzrIw)
So, in essence, this worker ensures that processing each `Biometrics` message and producing the corresponding `HeartRateZoneReached` messages is one atomic operation (a transaction): either everything happens or nothing happens.
Send this over:
```json
{
  "deviceId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "heartRates": [
    { "dateTime": "2022-12-02T13:50:00.000Z", "value": 90 },
    { "dateTime": "2022-12-02T13:51:00.000Z", "value": 110 }
  ],
  "maxHeartRate": 200
}
```
It'll go to the `BiometricsImported` topic.
![image](https://private-user-images.githubusercontent.com/30603497/306105553-f43846c5-0f0e-4f3c-8e8d-76481acd1f36.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYxMDU1NTMtZjQzODQ2YzUtMGYwZS00ZjNjLThlOGQtNzY0ODFhY2QxZjM2LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTQxYTk3YjA4NzExYzBhMjRiNDYzOTNjNjExY2ExYjU4ZjhhMTc1NWE1ODIzY2NhMDIwY2IwODc2M2Y2NDkzYmQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.gRX_te8qszYQsLUtLGJo7HQKR6myJ1Yuh3gQ-kZOdo0)
The offset shown is 2 (the offset of the stored record).

Now, when this message arrives at the Consumer, you'll also see that the offset says 2.
![image](https://private-user-images.githubusercontent.com/30603497/306106836-7a9b0684-800e-430d-a47a-15ead4a8a1db.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYxMDY4MzYtN2E5YjA2ODQtODAwZS00MzBkLWE0N2EtMTVlYWQ0YThhMWRiLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWMzMGFhMDI1YzQxMzBmMDE3YWQ3M2M2M2NhMTQ3YjNlNGI5MDhjZWUxOGY2MDA5ZTIxYWE1ODU2M2MxNjU4NDQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.7INkg-QS8GYWN_ri-zhPUXOVaZ2ew3VLXYxjaL2tMDA)
The `offsets` inside `HandleMessage` show 3 (the offset to be committed, i.e., the position of the next message to read).
![image](https://private-user-images.githubusercontent.com/30603497/306107034-7bc3a70a-0190-40d4-a989-068d56541d82.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMDYxMDcwMzQtN2JjM2E3MGEtMDE5MC00MGQ0LWE5ODktMDY4ZDU2NTQxZDgyLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPTJlNmFhMjIzZGNlM2FjYTIzYzhjNDE0NWQyM2FjYmUyYTYxZDYzNzgzZjBhZTNjZmQ1ZjRlYWFhYWY2NDhmYmImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.Vi_MVxCfY3NzlzTm5V5ZNjbzZW6GXoYAPVGnNfDMoSM)
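The off-by-one is deliberate: by Kafka convention, a committed offset means "resume reading here", not "last message processed". A sketch of the shape this takes with Confluent.Kafka, as a fragment from inside a consume loop (`result`, `producer`, and `consumer` stand for the consume result and the configured clients):

```csharp
// Commit consumed offset + 1: having processed offset 2, we commit 3,
// so a restarted consumer resumes at the next unread message.
var offsets = new List<TopicPartitionOffset>
{
    new TopicPartitionOffset(result.TopicPartition, result.Offset + 1)
};
producer.SendOffsetsToTransaction(offsets, consumer.ConsumerGroupMetadata, TimeSpan.FromSeconds(30));
```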
More details in the comments: see `dotnet-kafka/Consumer/Workers/HeartRateZoneWorker.cs`, lines 41 to 53 (commit 17203b0).
![image](https://private-user-images.githubusercontent.com/30603497/330902650-37eeb0e3-f3c9-4b38-aabb-9b3c79143720.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MjAwNDU0NTcsIm5iZiI6MTcyMDA0NTE1NywicGF0aCI6Ii8zMDYwMzQ5Ny8zMzA5MDI2NTAtMzdlZWIwZTMtZjNjOS00YjM4LWFhYmItOWIzYzc5MTQzNzIwLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA3MDMlMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNzAzVDIyMTkxN1omWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWJiZjliOThiMjE1NTcxNTI0N2E3YjlmYWQ1MzE5ODI5MmZhMTg5YjA2ZGRmM2I4NWE0ZWFlYzYzMTNiY2UzMzImWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.HYDrqaV1frRIPAdHcjhUArf8D_AF6i5xQvrjVLQR2CY)