KafkaQuery

KafkaQuery lets you process JSON data stored in Kafka with the help of Flink SQL in a streaming fashion.

asciinema page

Quick intro

KafkaQuery lets you create a schema for JSON data in Kafka, perform queries, and output the processed results to the terminal, a local socket, or another Kafka topic.

Download the latest KafkaQuery release here and extract its content.

Create a schema for your topic

--infer-schema person

Check the schema of the topic

--schema person

Query on the topic

--query "SELECT * FROM person"

Apply a filter operation and view the results in the terminal

--query "SELECT name FROM person WHERE age > 17"

or stream the processed data into a new topic

--query "SELECT name FROM person WHERE age > 17" --output kafka:adults

These are the essential features, but there is more. The following sections cover getting started, creating a schema, querying, and user-defined functions.

To pause and inspect any of the following examples, follow the link to the respective asciinema page.

Getting started

The following example goes through setting up and using KafkaQuery. We will consume data from an example topic called person, which contains messages of the following format:

{
  "name":"John Smith",
  "age":32,
  "height":"172cm"
}

Download the latest KafkaQuery release here and extract its content.

(Alternatively, build KafkaQuery yourself.)

Clone the project and open it as an sbt project. Run sbt pack to create a package folder containing program launch scripts in the following directory: target/pack/bin/kafkaquery/bin
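
A minimal sketch of those steps (the repository URL is left as a placeholder):

git clone <repository_url>
cd kafkaquery
sbt pack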

For any usage of KafkaQuery, execute the kafkaquery script found in the bin folder.

Specify your ZooKeeper and Kafka addresses in one of two ways.

Either set environment variables for the addresses:

Property           Default value    Environment variable name (optional)
Kafka address      localhost:9092   KAFKA_ADDR
ZooKeeper address  localhost:2181   ZK_ADDR
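
For example, on a Unix-like shell the defaults could be set explicitly with:

export KAFKA_ADDR=localhost:9092
export ZK_ADDR=localhost:2181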

Or specify your ZooKeeper and Kafka addresses on every execution by appending the following options to your command:

--zookeeper <address> --kafka <address>
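
For example, a complete invocation of the quick-intro query with explicit addresses could look like this (assuming you run the script from the extracted bin folder):

./kafkaquery --query "SELECT * FROM person" --zookeeper localhost:2181 --kafka localhost:9092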


For information on the commands, check out the help option or visit the wiki.

Your first query

asciinema page (Click on the gif to access the recording and copy-paste the commands)

Creating a schema for your topic

An Avro schema is needed before a topic can be queried. KafkaQuery can either generate the schema for you or let you insert your own.

Inferring a schema

KafkaQuery can generate a schema for your topic based on the latest message:

--infer-schema <topic_name>

If the program terminates with Successfully generated schema for topic <topic_name>, you can start querying that topic.

*While inferring, KafkaQuery might ask for your input to decide whether a construct should be treated as a JSON object or as a map.

*Any numeric value will be mapped to type long.

An example:

A schema for topic color is needed.

The latest message in the topic is:

{
  "name":"Peter",
  "favoriteColors":{
                      "color1":"Red",
                      "color2":"Blue",
                      "color3":"Black"
                    }
}

Run the program with the following option:

--infer-schema color
Should this be a map (m) or an object (o)?
{
  "color1" : "Red",
  "color2" : "Blue",
  "color3" : "Black"
}
Please insert one of the following characters: m, o

m

Successfully generated schema for topic color

In this case, using a map (m) is appropriate: the keys are arbitrary and all values share the same type (string).

Verify the schema with the following option:

--schema color
{
  "type" : "record",
  "name" : "color",
  "namespace" : "infer",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "favoriteColors",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  } ]
}

The topic color can now be queried and has the fields name and favoriteColors.
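
In Flink SQL, map entries can be accessed by key, so a first (illustrative) query on this topic could look like:

--query "SELECT name, favoriteColors['color1'] FROM color"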

Manual schema insertion

Sometimes inferring a schema does not yield the expected result.

In that case, you can manually insert your own schema:

--update-schema:<topic_name>=<avro_schema_file>

Flink's data type mapping is helpful when choosing data types for your own schema.

An example:

Renaming the name field of topic person to surname.

Current schema
{
  "type" : "record",
  "name" : "person",
  "namespace" : "infer",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "long"
  }, {
    "name" : "height",
    "type" : "string"
  } ]
}

Create a file containing the updated schema (the changed line is marked with +):

schema.txt
{
  "type" : "record",
  "name" : "person",
  "namespace" : "infer",
  "fields" : [ {
+   "name" : "surname",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "long"
  }, {
    "name" : "height",
    "type" : "string"
  } ]
}

Run the program with the following option:

--update-schema:person=path/to/schema.txt

The schema for topic person is now updated.
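
As before, the result can be verified with the schema option:

--schema person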

Querying

To perform any query, make use of the --query <queryText> option.

Details on how to specify the actual query text can be found here (Flink SQL).

For querying, the following options are available:


-t, --timeout <seconds> - Terminates the program once no new message has arrived for the specified duration.

Start strategy:

--start earliest - messages are processed from the earliest available offset (default).

--start latest - messages are processed from the latest available offset.

Output destinations:

--output kafka:<topic_name> - Writes results to the specified Kafka topic.

--output socket:<port> - Writes results to a local socket on the specified port.

*When not specified, results are printed to the terminal by default.
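
These options can be combined. For example, the following (illustrative) invocation processes only new messages, streams the results to a local socket on port 9999, and stops after 10 seconds without new messages:

--query "SELECT name FROM person" --start latest --output socket:9999 --timeout 10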


Example Usage: Count license occurrences hourly using tumbling windows with a timeout of 5s
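
A sketch of what such a query could look like, assuming a topic named licenses whose messages contain a license field and whose record timestamps are exposed as a time attribute called rowtime (both names are illustrative):

--query "SELECT license, COUNT(*) FROM licenses GROUP BY license, TUMBLE(rowtime, INTERVAL '1' HOUR)" --timeout 5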

asciinema page

User-defined functions

KafkaQuery supports Flink's user-defined functions, UDFs for short.

Example usage of UDFs

Create your function according to the documentation:

FeetToCm.java

import org.apache.flink.table.functions.ScalarFunction;

public class FeetToCm extends ScalarFunction {

    /**
     * Evaluates the argument height. Converts heights ending with 'ft' to centimeters.
     * @param height a height string such as "5.6ft" or "172cm"
     * @return the height in centimeters
     */
    public String eval(String height) {
        if (height.endsWith("ft")) {
            // Strip the "ft" suffix and convert feet to centimeters (1 ft = 30.48 cm).
            return (Double.parseDouble(height.substring(0, height.length() - 2)) * 30.48) + "cm";
        }
        }
        return height;
    }
}
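
Assuming KafkaQuery picks the compiled function up and registers it under its class name (a sketch, not the verified registration mechanism), a query on the person topic from earlier could then call it directly:

--query "SELECT name, FeetToCm(height) FROM person"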

asciinema page (Click on the gif to access the recording and copy paste the commands)

Usage of external libraries in UDFs

If a UDF makes use of external libraries, make sure to add JARs containing all necessary dependencies to the udf_dependencies folder.

Authors

Developed by Abele Mălan, Jakub Nguyen, Daniel van den Akker, Christiaan Botha and Ayush Patandin.