/streams-mini-workshop

A mini version of a workshop I did during #Livestreams

👨🏼‍💻 Workshop — Developing Event-driven Microservices with Spring Boot 🍃, Confluent Cloud ☁️, and Java ☕️.

Workshop prerequisites and setup

Prerequisites

Tip
Keep the login and password provided by the workshop organizers handy.

🏁 Implementing a Stream Processor with Kafka Streams

We will implement a Kafka Streams topology that atomically processes every request submitted to the transaction-request topic.

Within the workshop project folder, you will find a transactions-processor subfolder representing a Kafka Streams application. Spring Boot and the spring-kafka project handle the boilerplate code required to connect to Kafka. This workshop focuses on writing a Kafka Streams topology with the processing logic for our use case.

Atomic transaction processing with Kafka Streams

Our business requirement states that we must check whether the funds are sufficient for every request received before updating the balance of the account being processed. We should never process two transactions for the same account at the same time. Doing so would create a race condition in which we could not guarantee that the balance check runs before the funds are withdrawn.

The Data Generator writes transaction requests to the Kafka topic with a key equal to the transaction’s account number. Because records with the same key land in the same partition, and each partition is processed by one stream thread at a time, we can be sure all messages for an account are processed by a single thread of our Transaction Service, no matter how many instances are running concurrently.
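
To illustrate why keying by account number serializes the work per account, here is a minimal plain-Java sketch. It does not use Kafka's partitioner (the default one hashes the serialized key bytes with murmur2); String.hashCode is a stand-in assumption, and the class name, method name, and partition count are hypothetical. It only shows that a deterministic hash of the key always yields the same partition.

```java
// Plain-Java illustration only; this is NOT Kafka's real partitioner.
final class KeyPartitioningSketch {

    // Hypothetical stand-in: map a record key to a partition deterministically.
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count
        for (String account : new String[] {"1", "2", "1", "2", "1"}) {
            System.out.println("account " + account + " -> partition "
                + partitionFor(account, numPartitions));
        }
    }
}
```

Every request for account 1 resolves to the same partition, so the requests are read in order by whichever single task owns that partition.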

Note
Kafka Streams won’t commit any message offset until it completes our business logic of managing a transaction request.
Transaction Service

Implement the Transaction Transformer

Because of the transactional nature of our stream processor, we need a specific Kafka Streams component called a Transformer. It lets us process events one by one while interacting with a State Store, another Kafka Streams component that helps us persist our account balance in a local instance of an embedded database, RocksDB.

Open the io.confluent.developer.ccloud.demo.kstream.TransactionTransformer Java class and implement the transform function to return a TransactionResult based on the validity of the transaction request. The TransactionResult contains a success flag set to true if the funds were successfully updated.

The transform method also updates the funds State Store. The class already has utility functions to help you execute our business logic.

TransactionTransformer.transform()
  @Override
  public TransactionResult transform(Transaction transaction) {

    if (transaction.getType().equals(Transaction.Type.DEPOSIT)) {
      return new TransactionResult(transaction,
                                   depositFunds(transaction),
                                   true,
                                   null);
    }

    if (hasEnoughFunds(transaction)) {
      return new TransactionResult(transaction, withdrawFunds(transaction), true, null);
    }

    log.info("Not enough funds for account {}.", transaction.getAccount());

    return new TransactionResult(transaction,
                                 getFunds(transaction.getAccount()),
                                 false,
                                 TransactionResult.ErrorType.INSUFFICIENT_FUNDS);
  }
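
The helper methods called above (hasEnoughFunds, depositFunds, withdrawFunds, getFunds) live in the workshop class and are not shown here. As a rough sketch of the check-then-update logic they stand for, the following plain-Java example uses a HashMap as a stand-in for the State Store. The class name, method signatures, and the use of BigDecimal are assumptions made for illustration and may differ from the workshop code.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of the check-then-update logic; no Kafka Streams types involved.
final class FundsLogicSketch {

    // Assumption: a HashMap stands in for the RocksDB-backed State Store.
    private final Map<String, BigDecimal> store = new HashMap<>();

    BigDecimal getFunds(String account) {
        // Unknown accounts start with a zero balance in this sketch.
        return store.getOrDefault(account, BigDecimal.ZERO);
    }

    BigDecimal deposit(String account, BigDecimal amount) {
        BigDecimal updated = getFunds(account).add(amount);
        store.put(account, updated);
        return updated;
    }

    boolean hasEnoughFunds(String account, BigDecimal amount) {
        return getFunds(account).compareTo(amount) >= 0;
    }

    // Returns true and updates the balance only when the funds are sufficient.
    boolean withdraw(String account, BigDecimal amount) {
        if (!hasEnoughFunds(account, amount)) {
            return false; // balance left untouched
        }
        store.put(account, getFunds(account).subtract(amount));
        return true;
    }

    public static void main(String[] args) {
        FundsLogicSketch funds = new FundsLogicSketch();
        funds.deposit("1", new BigDecimal("100"));
        System.out.println(funds.withdraw("1", new BigDecimal("300"))); // false
        System.out.println(funds.getFunds("1"));                        // 100
    }
}
```

The check and the update are only safe here because a single thread touches a given account at a time, which is the guarantee the keyed topic provides.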
Validate implementation by running TopologyTestDriver test

Run io.confluent.developer.ccloud.demo.kstream.TransactionTransformerTest to validate that the implementation is correct.

Implement the Streaming Topology

In Kafka Streams, a Topology is the definition of your data flow. It’s a manifest for all operations and transformations to be applied to your data.

To start a stream processor, Kafka Streams only requires you to build a Topology and hand it over. Kafka Streams will take care of managing the underlying consumers and producers.

The io.confluent.developer.ccloud.demo.kstream.KStreamConfig Java class already contains all the boilerplate code required by Kafka Streams to start our processor. In this exercise, we will leverage a StreamsBuilder to define and instantiate a Topology that will handle our transaction processing.

Open the io.confluent.developer.ccloud.demo.kstream.KStreamConfig.defineStreams method and get ready to write your first Kafka Streams Topology.

Create a KStream from the source topic.

Use the stream method of streamsBuilder to turn a topic into a KStream.

KStream<String, Transaction> transactionStream = streamsBuilder.stream("transaction-request");

Leverage the Transformer to process our requests

To inform Kafka Streams that we want to update the funds State Store for all incoming requests atomically, we can leverage the transformValues operator to plug in our TransactionTransformer. This operator requires us to specify the funds State Store that the Transformer will use. This also instructs Kafka Streams to keep track of events from our transaction-request topic, since they will result in a change of state for our store.

KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);

Redirect the transaction result to the appropriate topic.

With a new derived stream containing TransactionResult, we can now use the information contained in the payload to feed a success or failure topic.

We will achieve this by deriving two streams from our resultStream. Each stream will be built by applying a filter and filterNot operator with a predicate on the success flag from our TransactionResult payload. With the two derived streams, we can explicitly call the to operator to instruct Kafka Streams to write the mutated events to their respective topics.

resultStream
  .filter(this::success)
  .to("transaction-success");

resultStream
  .filterNot(this::success)
  .to("transaction-failed");
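
Because filter and filterNot use the same predicate, every result ends up in exactly one of the two derived streams. The following plain-Java sketch shows the same idea with an in-memory list instead of a KStream. The Result class is a hypothetical, simplified stand-in for TransactionResult, and the code does not use the Kafka Streams API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy for filter / filterNot; it does not use the Kafka Streams API.
final class ResultRoutingSketch {

    // Hypothetical, simplified stand-in for the workshop's TransactionResult.
    static final class Result {
        final String account;
        final boolean success;

        Result(String account, boolean success) {
            this.account = account;
            this.success = success;
        }
    }

    // true  -> results bound for the success topic
    // false -> results bound for the failed topic
    static Map<Boolean, List<Result>> route(List<Result> results) {
        return results.stream().collect(Collectors.partitioningBy(r -> r.success));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Result>> routed = route(Arrays.asList(
            new Result("1", true), new Result("1", false), new Result("2", true)));
        System.out.println("success: " + routed.get(true).size()); // success: 2
        System.out.println("failed: " + routed.get(false).size()); // failed: 1
    }
}
```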

The implemented defineStreams method

Use this reference implementation to validate you have the right stream definition.

protected void defineStreams(StreamsBuilder streamsBuilder) {

    KStream<String, Transaction> transactionStream = streamsBuilder.stream(transactionRequestConfiguration.getName());

    final String storeName = fundsStoreConfig.getName();
    KStream<String, TransactionResult> resultStream = transactionStream.transformValues(() -> new TransactionTransformer(storeName), storeName);

    resultStream
        .filter(this::success)
        .to(transactionSuccessConfiguration.getName());

    resultStream
        .filterNot(this::success)
        .to(transactionFailedConfiguration.getName());
  }
Validate implementation by running TopologyTestDriver test

Run io.confluent.developer.ccloud.demo.kstream.KStreamConfigTest to validate that the implementation is correct.

Running the Kafka Streams application

Note
If you are running the application from your IDE, launch the main method from io.confluent.developer.ccloud.demo.kstream.KStreamDemoApplication.

If you want to run with the CLI, you must build the application before launching it.

To build the application, run the following command:
./gradlew :transactions-processor:build
To run the application, run the following command:
java -jar transactions-processor/build/libs/kstreams-demo-0.0.1-SNAPSHOT.jar

Generate some transactions using the Data Generator endpoint

Ensure your Data Generator application is still running from the previous section.

The utility script scripts/generate-transaction.sh will let you generate transactions. Generate a few transactions using the following commands:

scripts/generate-transaction.sh 1 DEPOSIT 100 CAD
scripts/generate-transaction.sh 1 DEPOSIT 200 CAD
scripts/generate-transaction.sh 1 DEPOSIT 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 300 CAD
scripts/generate-transaction.sh 1 WITHDRAW 10000 CAD

scripts/generate-transaction.sh 2 DEPOSIT 100 CAD
scripts/generate-transaction.sh 2 DEPOSIT 50 CAD
scripts/generate-transaction.sh 2 DEPOSIT 300 CAD
scripts/generate-transaction.sh 2 WITHDRAW 300 CAD

The script will pass in the following arguments:

  • The account number.

  • The type of operation (DEPOSIT or WITHDRAW).

  • The amount.

  • The currency.

Monitor the successful transaction results

  1. Access the Confluent Cloud user interface at https://confluent.cloud.

  2. From the main screen, navigate to the environment that looks like demo-env-<some-number>.

  3. Inside of the environment, you should see a cluster that looks like demo-kafka-cluster-<some-number>. On the left side, click on Topics.

  4. Click on the transaction-success topic and access the messages tab.

  5. Click the offset textbox, type 0, and press Enter to load all messages from partition 0, starting at offset 0.

You should see transaction-success events in the user interface. If you don’t see any messages, try your luck with partition 1 starting from offset 0.

Monitor the failed transaction results from Control Center

  1. Click on Topics in the cluster navigation menu.

  2. Select the transaction-failed topic and access the messages tab.

  3. Click the offset textbox, type 0, and press Enter to load all messages from partition 0, starting at offset 0.

You should see transaction-failed events in the user interface. If you don’t see any messages, try your luck with partition 1 starting from offset 0.

In the next section, we will explore how writing a Stream Processor can be simplified with ksqlDB.

🎁 DEMO: Enrich transaction results with ksqlDB

I will show how to write a second Stream Processor to generate a detailed transaction statement enriched with account details.

We will leverage ksqlDB to declare a stream processor that enriches our transaction data in real time with the reference data coming from the account topic. The objective of this section is to show how you can use an SQL-like query language to create stream processors like those built with Kafka Streams, without having to compile and run any custom software.

Transaction Statements

Create the account table

ksqlDB is built on top of Kafka Streams. As such, the KStream and KTable are both key constructs for defining stream processors.

The first step requires us to instruct ksqlDB that we wish to turn the account topic into a Table. This table will allow us to join each transaction-success event with the latest account event of the underlying topic. Run the following command in your ksqlDB CLI terminal:

CREATE TABLE ACCOUNT (
  numkey string PRIMARY KEY,
  number INT,
  cityAddress STRING,
  countryAddress STRING,
  creationDate BIGINT,
  firstName STRING,
  lastName STRING,
  numberAddress STRING,
  streetAddress STRING,
  updateDate BIGINT
) WITH (
  KAFKA_TOPIC = 'account',
  VALUE_FORMAT='JSON'
);
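
To make the table idea concrete, here is a small plain-Java analogy, not ksqlDB code. Replaying a keyed topic into a map keeps only the latest value per key, which is what lets a join see the current account details. The class name and the sample events are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java analogy for table semantics; ksqlDB itself is not involved.
final class TableSemanticsSketch {

    // Replays {key, value} events in order; later events overwrite earlier ones.
    static Map<String, String> materialize(String[][] events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] event : events) {
            table.put(event[0], event[1]); // upsert by key
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical account events: {numkey, lastName}.
        String[][] events = {
            {"1", "Smith"},
            {"2", "Tremblay"},
            {"1", "Smith-Jones"} // update for account 1
        };
        System.out.println(materialize(events)); // {1=Smith-Jones, 2=Tremblay}
    }
}
```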

Create the transaction-success stream

Before we create the Transaction Statement stream processor, we must also inform ksqlDB that we wish to turn the transaction-success topic into a Stream. Run the following command in your ksqlDB CLI terminal:

CREATE STREAM TRANSACTION_SUCCESS (
  numkey string KEY,
  transaction STRUCT<guid STRING, account STRING, amount DOUBLE, type STRING, currency STRING, country STRING>,
  funds STRUCT<account STRING, balance DOUBLE>,
  success boolean,
  errorType STRING
) WITH (
  kafka_topic='transaction-success',
  value_format='json'
);

Create the transaction statement stream

Now that we have all the ingredients of our Transaction Statement stream processor, we can create a new stream derived from our transaction-success events paired with the latest data from the account topic. We will instruct ksqlDB to create the new stream from a query. By default, ksqlDB publishes the output to a new TRANSACTION_STATEMENT topic.

The select query specifies which events to subscribe to and which table to join each event with. The output of this new stream processor combines the transaction details with all the matching account details. The key shared by transaction-success and account is used as the matching criterion for the LEFT JOIN.

EMIT CHANGES tells ksqlDB that the query is long-running and should be kept alive, much like a Kafka Streams application that must stay available to process every event. Run the following command in your ksqlDB CLI prompt:

CREATE STREAM TRANSACTION_STATEMENT AS
  SELECT *
  FROM TRANSACTION_SUCCESS
  LEFT JOIN ACCOUNT ON TRANSACTION_SUCCESS.numkey = ACCOUNT.numkey
  EMIT CHANGES;
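
As a rough plain-Java analogy for what the stream-table LEFT JOIN does per event (this is not ksqlDB code), each incoming stream event is looked up by key in the table's current state. With a LEFT JOIN the event is still emitted when no account matches, with the account side left null. The class name, method, and sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java analogy for a stream-table LEFT JOIN; ksqlDB itself is not involved.
final class LeftJoinSketch {

    // Joins one stream event against the table's current state.
    // A missing account leaves the account side null; the event is still emitted.
    static String join(String numkey, String transaction, Map<String, String> accounts) {
        String account = accounts.get(numkey); // null when no row matches the key
        return numkey + ": " + transaction + " | " + account;
    }

    public static void main(String[] args) {
        Map<String, String> accounts = new HashMap<>();
        accounts.put("1", "Smith");
        System.out.println(join("1", "DEPOSIT 100", accounts)); // 1: DEPOSIT 100 | Smith
        System.out.println(join("9", "DEPOSIT 5", accounts));   // 9: DEPOSIT 5 | null
    }
}
```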

✅ It’s a wrap!

Congratulations! Now you know how to build event-driven microservices using Spring Boot, Kafka Streams, and ksqlDB.