/is-using-kop-a-good-idea

Is using KoP (Kafka-On-Pulsar) a good idea? Use the scenarios implemented in this repository to check whether Pulsar with KoP enabled is a good fit for your most common Kafka workloads.

Primary LanguageJavaApache License 2.0Apache-2.0

Is Using KoP (Kafka-On-Pulsar) a Good Idea?

Building microservices around Apache Kafka is your job, and life is great. One day, you hear community members talking about some neat Apache Pulsar features, and get you intrigued. I mean, we all love Kafka, but you can't avoid wondering if migrating one of your projects to Pulsar is a good idea. Then it happens. You find Pulsar supports Kafka clients natively via a protocol handler called KoP: Kafka-On-Pulsar.

This gets you pumped. Is that it? Can I go ahead and simply point my microservices to Pulsar and be a hero with this migration? But you must be responsible; and history says you shouldn't believe migrations like this are refactoring free. This is why you may get interested in what this repository offers. To prove that KoP is worth your time, use it in scenarios that would really put the technology to stress, going way beyond the bread-and-buttter of producing and consuming messages. Follow the instructions below to test KoP in the following scenarios:

  • Microservice built for Apache Kafka: microservice written in Spring Boot, that connects to a "Kafka Cluster" using two endpoints exposed by Apache Pulsar with KoP enabled.

  • CDC using Debezium for MySQL: Kafka Connect with a connector from Debezium to capture in near real-time changes made to a MySQL database. Kafka Connect uses an endpoint exposed by Apache Pulsar with KoP enabled.

  • Stream processing using ksqlDB: Stream processing pipeline using ksqlDB to flat out events produced to Kafka. ksqlDB uses an endpoint exposed by Apache Pulsar with KoP enabled.

💡 This repository was created as part of the session entitled Is Using KoP (Kafka-On-Pulsar) a Good Idea? delivered in the Pulsar Summit 2022 conference, in San Francisco, CA. Click in the image below to access slides and recording of this session.

Is Using KoP (Kafka-On-Pulsar) a Good Idea?

Requirements

🏢 Shared Apache Pulsar Infrastructure

Before jumping into any of the scenarios, you must start the shared infrastructure all of them will use. This includes one Zookeeper instance, two Bookkeepers, and two Pulsar brokers.

1️⃣ Start the persistence layer

sh start-persistence.sh

👀 You must wait until the containers zookeeper, persistence-bookie-1, and persistence-bookie-2 are healthy to proceed with the next step.

2️⃣ Start the Pulsar brokers with KoP enabled

sh start-brokers.sh

👀 You must wait until the containers kafka-1 and kafka-2 are healthy to proceed with any next step.

✅ Scenario: Microservice built for Apache Kafka

This scenario checks two things. First, if KoP provides a truly Kafka-compatible API where third-party frameworks such as Spring Boot can connect with without deployment problems. Second, to check whether KoP is capable of mimic the distributed protocol from Kafka. Kafka is not just a one-directional typical client-server protocol. Instead, it is a bi-directional protocol where messages are exchanged from both parties. A good example is when a producer connects to the Kafka cluster using one bootstrap server endpoint, and the cluster keeps periodically updating that list back to the producer with metadata about the new cluster formation. Same for the consumer, which after joining a group, may be eventually removed by the cluster for the absence of valid heartbeats.

To validate this scenario, two Apache Pulsar brokers with KoP enabled will be executed, and the microservice will use the endpoint of only one broker to bootstrap the cluster. When everything is up-and-running and working as expected, the broker being used by the microservice will be killed, and the assumption is that the microservice should fallback to the other available broker, and continue its execution. If that ever happens, it means that the bootstrap worked as expected, giving the specifications of how Kafka manages clusters and sends this information to its clients.

1️⃣ Run the Spring Boot microservice

sh microservice-with-kafka/run-microservice.sh

👀 You must wait until the microservice connects with the brokers and start producing and consuming messages like this:

org.summit.pulsar.demo.FiveSecondsTom : Hi, I'm Tom 😄

2️⃣ Find out which broker is the leader of the partition

$PULSAR_HOME/bin/pulsar-admin --admin-url http://localhost:8081 topics lookup persistent://public/default/fiveSecondsTomTopic

👀 Take a note of which broker shows up in this lookup.

3️⃣ Forcing a fail-over situation by killing the leader

sh kill-broker.sh <BROKER_CONTAINER_NAME_FROM_STEP_TWO>

👀 Observe the microservice for a couple minutes. It must continue its processing.

#️⃣ stop all containers if you're done for the day.

sh stop-everything.sh

✅ Scenario: CDC using Debezium for MySQL

One of the most popular use cases for Apache Kafka is using it to replicate data from relational databases to purpose-built systems, while Kafka acts as a persistent layer for the data streams. To enable this use case, Kafka Connect, which is an integration technology from the Kafka ecosystem, is usually used with connectors capable of pulling data from relational databases either using a polling query or via CDC (Change-Data Capture).

This scenario check if an Apache Pulsar broker with KoP enabled can be used as the data stream layer for Kafka Connect, which uses a connector from Debezium to stream data changes made in near real-time in a MySQL database. The validation process is very simple: you just need to set this up as you would using Apache Kafka — but use Apache Pulsar with KoP enabled instead. Everything must work as advertised.

1️⃣ Start the containers for this scenario

sh start-cdc-with-debezium.sh

👀 You must wait until the containers mysql and connect are healthy to proceed with the next step.

2️⃣ Connect with the MySQL database and check the data

docker compose -f cdc-with-debezium/docker-compose.yml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD cdc_with_debezium'

Then, in the MySQL shell, execute the following command:

select * from customers;

You should see an output like this:

+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

3️⃣ Deploy the Debezium connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @cdc-with-debezium/mysql-connector.json

To verify if the connector has been sucessfully deployed, execute the following command:

curl -X GET http://localhost:8083/connectors/cdc-with-debezium

You should see an output like this:

{"name":"cdc-with-debezium","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","database.history.kafka.bootstrap.servers":"kafka-1:9092,kafka-2:9093","database.history.kafka.topic":"dbhistory.cdc_with_debezium","name":"cdc-with-debezium","database.server.name":"mysql","database.port":"3306","database.include.list":"cdc_with_debezium"},"tasks":[{"connector":"cdc-with-debezium","task":0}],"type":"source"}

4️⃣ Use the kafka-console-consumer to monitor data streams

$KAFKA_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql.cdc_with_debezium.customers

👀 Leave this console open so you can see the data coming in.

5️⃣ Insert a new record into the customers table.

docker compose -f cdc-with-debezium/docker-compose.yml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD cdc_with_debezium'

Then, in the MySQL shell, execute the following command:

insert into customers values (1006, "Ricardo", "Ferreira", "riferrei@riferrei.com");

6️⃣ Look to the kafka-console-consumer output. You should see an output like this:

Struct{after=Struct{id=1006,first_name=Ricardo,last_name=Ferreira,email=riferrei@riferrei.com},source=Struct{version=1.9.3.Final,connector=mysql,name=mysql,ts_ms=1660957053000,db=cdc_with_debezium,table=customers,server_id=1,file=binlog.000002,pos=406,row=0,thread=36},op=c,ts_ms=1660957053478}

#️⃣ Stop the containers from this scenario.

sh stop-cdc-with-debezium.sh

Alternatively, you can also stop all containers if you're done for the day.

sh stop-everything.sh

This scenario was created based on the following tutorial:

🧑🏻‍💻 https://github.com/debezium/debezium-examples/tree/main/tutorial#using-mysql

✅ Scenario: Stream processing using ksqlDB

Another popular use case for Apache Kafka is using it as a persistent layer for stream processing pipelines. The idea is to have events being stored into Kafka, and stream processing technologies such as Kafka Streams, ksqlDB, Apache Flink be able to execute event processing pipelines that compute near real-time results as new events arrive.

This scenario check if an Apache Pulsar broker with KoP enabled can be used as the data stream layer for ksqlDB, which implements a stream processing pipeline that flattens out events with a complex nested layout and also changes the data format from JSON to Protobuf. The validation process is very simple: you just need to set this up as you would using Apache Kafka — but use Apache Pulsar with KoP enabled instead. Everything must work as advertised.

1️⃣ Start the containers for this scenario

sh start-stream-processing.sh

👀 You must wait until the containers schema-registry and ksqldb-server are healthy to proceed with the next step.

2️⃣ Connect with the ksqlDB server via CLI:

$KAFKA_HOME/bin/ksql http://localhost:8088

Then, in the ksqlDB shell, execute the following command:

RUN SCRIPT 'stream-processing/statements.sql';

Once this command finishes, you should have two streams created. Before moving further, it is a good idea to check if they were really created by executing the following command:

SHOW STREAMS;

You should see an output like this:

 Stream Name      | Kafka Topic      | Key Format | Value Format | Windowed 
----------------------------------------------------------------------------
 FLATTENED_ORDERS | FLATTENED_ORDERS | KAFKA      | PROTOBUF     | false    
 ORDERS           | ORDERS           | KAFKA      | JSON         | false    
----------------------------------------------------------------------------

3️⃣ Execute a continuous query

SELECT
    ORDER_ID,
    ORDER_TS,
    ORDER_AMOUNT,
    CUST_FIRST_NAME,
    CUST_LAST_NAME,
    CUST_PHONE_NUMBER,
    CUST_ADDR_STREET,
    CUST_ADDR_NUMBER,
    CUST_ADDR_ZIPCODE,
    CUST_ADDR_CITY,
    CUST_ADDR_STATE,
    PROD_SKU,
    PROD_NAME,
    PROD_VENDOR_NAME,
    PROD_VENDOR_COUNTRY
FROM FLATTENED_ORDERS
EMIT CHANGES;

👀 This query never stops, unless you press Cmd+C to interrupt its execution. Leave this console open so you can see the data coming in.

4️⃣ Ingest data into the input topic

$KAFKA_HOME/bin/kafka-console-producer --bootstrap-server localhost:9092 --topic ORDERS

Copy-and-paste the events below to the kafka-console-producer CLI:

{"id": "1", "timestamp": "2020-01-18 01:12:05", "amount": 84.02, "customer": {"firstName": "Ricardo", "lastName": "Ferreira", "phoneNumber": "1234567899", "address": {"street": "Street SDF", "number": "8602", "zipcode": "27640", "city": "Raleigh", "state": "NC"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "2", "timestamp": "2020-01-18 01:35:12", "amount": 84.02, "customer": {"firstName": "Tim", "lastName": "Berglund", "phoneNumber": "9987654321", "address": {"street": "Street UOI", "number": "1124", "zipcode": "85756", "city": "Littletown", "state": "CO"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "3", "timestamp": "2020-01-18 01:58:55", "amount": 84.02, "customer": {"firstName": "Robin", "lastName": "Moffatt", "phoneNumber": "4412356789", "address": {"street": "Street YUP", "number": "9066", "zipcode": "BD111NE", "city": "Leeds", "state": "YS"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}
{"id": "4", "timestamp": "2020-01-18 02:31:43", "amount": 84.02, "customer": {"firstName": "Viktor", "lastName": "Gamov", "phoneNumber": "9874563210", "address": {"street": "Street SHT", "number": "12450", "zipcode": "07003", "city": "New Jersey", "state": "NJ"}}, "product": {"sku": "P12345", "name": "Highly Durable Glue", "vendor": {"vendorName": "Acme Corp", "country": "US"}}}

5️⃣ Verify if the pipeline executed correctly

Go back to the continuous query that you started on step 3️⃣. With new events arriving, you should see that the query generated an output similar to this:

+-----------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------+----------------------+
|ORDER_ID   |ORDER_AMOUNT  |CUST_FIRST_NAME  |CUST_LAST_NAME   |CUST_PHONE_NUMBER|CUST_ADDR_STREET |CUST_ADDR_NUMBER |PROD_SKU    |PROD_NAME             |
+-----------+--------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------+----------------------+
|1          |84.02         |Ricardo          |Ferreira         |1234567899       |Street SDF       |8602             |P12345      |Highly Durable Glue   |
|2          |84.02         |Tim              |Berglund         |9987654321       |Street UOI       |1124             |P12345      |Highly Durable Glue   |
|3          |84.02         |Robin            |Moffatt          |4412356789       |Street YUP       |9066             |P12345      |Highly Durable Glue   |
|4          |84.02         |Viktor           |Gamov            |9874563210       |Street SHT       |12450            |P12345      |Highly Durable Glue   |

💡 The actual output includes more columns that what is shown above.

#️⃣ Stop the containers from this scenario

sh stop-stream-processing.sh

Alternatively, you can also stop all containers if you're done for the day.

sh stop-everything.sh

This scenario was created based on the following tutorial:

🧑🏻‍💻 https://developer.confluent.io/tutorials/flatten-nested-data/ksql.html

License

This project is licensed under the Apache 2.0 License.