/Flink2Kafka

A Flink applcation that demonstrates reading and writing to/from Apache Kafka with Apache Flink

Primary LanguageJava

There And Back Again

A Story of Apache Kafka & Apache Flink.

ProjectRA

Project Details

This project has heavily inspired by two existing efforts from Data In Motion's FLaNK Stack and Data Artisan's blog on stateful streaming applications. The goal of this project is to provide insight into connecting an Apache Flink applications to Apache Kafka.

Church of the FLaNK Stack

Flank

Data Artisan's

NYC Taxi Ride Data Set

Project Scope

This project includes the Apache Flink application code and NiFi flow required to get the data into and out Apache Kafka. It doesn't include installation steps NiFi, Kafka, or Flink, but links to installation documentations have been provided below.

Project Prerequisites

  1. Apache NiFi local server
  2. Apache Kafka with an empty topic called "rawinput" and "enriched"
  3. IntelliJ IDE installed with Scala plug-in installed
  4. A cloned copy of this Git repository

Project Set Up and Run

Apache NiFi to Apache Kafka Setup

With Apache NiFi, the records from the source CSV file will be converted into individual JSON records. These records will be written to an Apache Kafka topic called "rawInput".

  • In the NiFi UI, import the NiFi Flow template (XML file in this Git repo). For help, please review the following documentation. Cloudera Documemnetation Link.

  • Upload NiFi Flow template using the UI icons.

UploadTemp

  • To add NiFi Flow Template the to canvas, click on the "Add Template icon" in the NiFI UI.

AddTemp2UI

  • Select the NiFi FLow Template to add.

SelectTempToAdd

  • Once the NiFi template is loaded, the left side of the NiFi flow will look like this.

nifiFlow

  • Right click on the GetFileCSV processor, open Properties tab, and set the path to the source CSV file in the Input Directory option. Please note, the CSV file is located in the data directory of this Git repo.

readCSVFile

  • Right click on the SplitRecord processor, open Properties tab, and click on the CSVReader.

CP

  • Before the NiFi Flow will work, all of these services need to be enabled.

ESC0

ESC1

ESC2

  • Right click on the PublishKafkaRecord processor, open Properties tab, and verify the location of your Kafka broker and topic name.

prodKConf

  • Verify the JSON records are being written to rawInput Kafka topic. This can be accomplished with right side the NiFi flow. Once this has been verified please turn off Kafka Consumer processor.

conKRaw

  • Validate the JSON record in the Flow File

FFJson

Flink Application Development In IntelliJ

For Development purposes, a running Flink cluster isn't required for application development. This application was built inside of the IntelliJ IDE because it will stand up Flink when your application is running, and the shut it down. This of course isn't required, but it will does make your life easier.

Flink Application - Connect to Kafka Topic

Once JSON files are being written to the Kafka topic, Flink can create a connection to the topic and create a Flink table on top of it, which can later be queried with SQL. This Github repository contains a Flink application that demonstrates this capability.

Java Libraries Required

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

Define Flink Streaming Environment

  • In Flink, the following java code defines the Flink Stream Execution and Stream Table Environments
 //Class Member Static Variables
    static StreamExecutionEnvironment fsEnv;
    static StreamTableEnvironment fsTableEnv;
    static EnvironmentSettings fsSettings;

 // create execution environment
    fsSettings = EnvironmentSettings.newInstance()
       .useBlinkPlanner()
       .inStreamingMode()
       .withBuiltInCatalogName("default_catalog")
       .withBuiltInDatabaseName("default_database")
       .build();

    fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
  // configure event-time and watermarks
    fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    fsEnv.getConfig().enableForceAvro();
    fsEnv.getConfig().setAutoWatermarkInterval(1000L);

  //Create Streaming Table Environment
     fsTableEnv  = StreamTableEnvironment.create(fsEnv, fsSettings);

Establish Flink Table Connection

  • In Flink, the following java code establishes a Flink Table connection with a Kafka topic. Please note, the schema has been set as JSON and the schema has been provided.
// create table environment
       fsTableEnv.connect(
               new Kafka()
                       .version("universal")
                       .topic("rawInput")
                       .startFromLatest()
                       .property("zookeeper.connect", "localhost:2181")
                       .property("bootstrap.servers", "localhost:9092")
                       .property("group.id", "test")
       )
       // declare a format for this system
       .withFormat(
          new Json()
       )
       // declare the schema of the table
       .withSchema(
          new Schema()
               .field("medallion", DataTypes.STRING())
               .field("licenseId", DataTypes.STRING())
               .field("pickUpTime", DataTypes.STRING())
               .field("dropOffTime", DataTypes.STRING())
               .field("trip_time_in_secs", DataTypes.BIGINT())
               .field("trip_distance", DataTypes.FLOAT())
               .field("pickUpLon", DataTypes.FLOAT())
               .field("pickUpLat", DataTypes.FLOAT())
               .field("dropOffLon", DataTypes.FLOAT())
               .field("dropOffLat", DataTypes.FLOAT())
               .field("payment_type", DataTypes.STRING())
               .field("fare_amount", DataTypes.FLOAT())
               .field("surcharge", DataTypes.FLOAT())
               .field("mta_tax", DataTypes.FLOAT())
               .field("tip_amount", DataTypes.FLOAT())
               .field("tolls_amount", DataTypes.FLOAT())
               .field("total", DataTypes.FLOAT())
       )
       .inAppendMode()
       // create a table with given name
       .createTemporaryTable("TaxiRides");
  • The Flink application will display the following if everything is working as expected.

KafkaRead

Query Flink Table Built On Kafka Topic

  • In Flink, the following Java code will query the newly established Flink Table and print to the screen
// define SQL query to compute average total per area and hour
    Table result = fsTableEnv.sqlQuery(
            "SELECT " +
                    " * " +
                    "FROM TaxiRides"
    );

    // convert result table into a stream and print it
    fsTableEnv.toAppendStream(result, Row.class).print();

Establish a Connection to Destination Kafka Topic

  • In Flink, the following java code will create a connection to a Kafka topic "enriched". Please note, the schema has been set as JSON and the schema has been provided.
 // create table environment
        fsTableEnv.connect(
            new Kafka()
            .version("universal")
            .topic("enriched")
            .startFromLatest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test")
        )
        // declare a format for this system
        .withFormat(
            new Json()
        )
        // declare the schema of the table
        .withSchema(
            new Schema()
                .field("medallion", DataTypes.STRING())
                .field("TimeStamp", DataTypes.TIMESTAMP(3) )
        )
        .inAppendMode()
        // create a table with given name
        .createTemporaryTable("KafkaSink");

Write to Kafka Topic From Flink Query

  • In Flink, the following code will write the query results to a Kafka topic that was established in the previous step.
// define SQL query to compute average total per area and hour
        Table result = fsTableEnv.sqlQuery(
                "SELECT " +
                " medallion, CURRENT_TIMESTAMP, " +
                " FROM  TaxiRides"
        );

        result.insertInto("KafkaSink");
  • The following output is expected in the application. Pleas note the last value in this images was removed from the code example.

KafkaRead

Apache Kafka to NiFi

Read from Kafka Topic "enriched"

  • In the NiFi UI, find the following section of the flow.

conFromKafka1

  • Validate the Kafka settings are correct.

conKConfig

  • Active the Consumer Kafka Processor and validate results.

Helpful Installation Links

Apache NiFi

Apache Kafka

Apache Flink

IntelliJ

Additional Helpful Links

Apache Flink

Apache Kafka + Apache Flink

Apache Flink and Apache Kafka Code Examples

Apache Kafka + Apache Druid

Additional Apache Project Install Links

Additional Apache Projects On Docker