- Configurable filtering app for JSON objects: forwards or drops objects if a configured field in the payload matches a specific pattern
- Messages are String-encoded JSON; no schema registry is used
- As no further processing logic is involved, String SerDes are used and messages are forwarded unchanged
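The per-record filter decision can be sketched in Java as follows. This is a hypothetical illustration, not the app's actual code. It assumes the String value has already been parsed into nested `Map`s (e.g. by a JSON library), that `filterPattern` is a Java regular expression that must match the whole field value, that a missing field counts as a non-match, and that `filterActionDrop = false` means matching objects are forwarded and all others are dropped.

```java
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch of the per-record decision; not taken from the app's sources. */
public final class JsonFieldFilter {

    private JsonFieldFilter() {}

    /**
     * Resolves a dotted path such as "path.to.json.field" in a JSON object that has
     * already been parsed into nested Maps. Returns null if a segment is missing
     * or an intermediate value is not an object.
     */
    public static Object resolve(Map<String, ?> root, String dottedPath) {
        Object current = root;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    /**
     * Returns true if the object should be forwarded to the output topic.
     * Assumptions: the pattern must match the whole value, a missing field is a
     * non-match, and dropOnMatch mirrors filterActionDrop.
     */
    public static boolean shouldForward(Map<String, ?> json, String field,
                                        Pattern pattern, boolean dropOnMatch) {
        Object value = resolve(json, field);
        boolean matches = value != null && pattern.matcher(String.valueOf(value)).matches();
        return dropOnMatch ? !matches : matches;
    }
}
```

In a Kafka Streams topology, a predicate like this would typically sit inside `KStream#filter` after the String value has been parsed.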
- Build using Maven:
mvn compile
- Package into jar files:
mvn package
- Unit tests for components and TopologyTestDriver (TTD) tests are included
- Tests are run as part of the Maven build; to run them separately:
mvn test
- The Kafka Streams application can handle a configurable number of filtering pipelines
- The application requires the following properties (see also the test configurations in src/test/resources/*.properties):
streamsFilter.sources = Source1,Source2
# Source 1
streamsFilter.Source1.inputTopic = topic.Source1.unfiltered
streamsFilter.Source1.outputTopic = topic.Source1
streamsFilter.Source1.field = path.to.json.field
streamsFilter.Source1.filterPattern = 42
streamsFilter.Source1.filterActionDrop = false
# Source 2
streamsFilter.Source2.inputTopic = topic.Source2.unfiltered
streamsFilter.Source2.outputTopic = topic.Source2
streamsFilter.Source2.field = path.to.json.field
streamsFilter.Source2.filterPattern = dont_panic
streamsFilter.Source2.filterActionDrop = true
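Reading these keys into per-source pipeline definitions can be sketched with `java.util.Properties`. The class and field names are hypothetical; the sketch assumes `filterPattern` is a Java regex and that a missing `filterActionDrop` defaults to `false`. Note that `Properties` only treats `#` as a comment at the start of a line, so every key and comment needs its own line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;

/** Hypothetical sketch: turns streamsFilter.* properties into pipeline definitions. */
public final class FilterConfig {

    static final String PREFIX = "streamsFilter.";

    /** One filtering pipeline, i.e. one streamsFilter.<Source>.* group of keys. */
    public static final class Pipeline {
        public final String name;
        public final String inputTopic;
        public final String outputTopic;
        public final String field;
        public final Pattern filterPattern;
        public final boolean filterActionDrop;

        Pipeline(String name, String inputTopic, String outputTopic, String field,
                 Pattern filterPattern, boolean filterActionDrop) {
            this.name = name;
            this.inputTopic = inputTopic;
            this.outputTopic = outputTopic;
            this.field = field;
            this.filterPattern = filterPattern;
            this.filterActionDrop = filterActionDrop;
        }
    }

    private FilterConfig() {}

    /** Returns the trimmed value of a mandatory key or fails with a clear message. */
    static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }

    /** Builds one Pipeline per name listed in streamsFilter.sources. */
    public static List<Pipeline> parse(Properties props) {
        List<Pipeline> pipelines = new ArrayList<>();
        for (String raw : required(props, PREFIX + "sources").split(",")) {
            String source = raw.trim();
            if (source.isEmpty()) {
                continue;
            }
            String p = PREFIX + source + ".";
            pipelines.add(new Pipeline(
                source,
                required(props, p + "inputTopic"),
                required(props, p + "outputTopic"),
                required(props, p + "field"),
                Pattern.compile(required(props, p + "filterPattern")),
                // Assumption: filterActionDrop defaults to false when absent.
                Boolean.parseBoolean(props.getProperty(p + "filterActionDrop", "false").trim())));
        }
        return pipelines;
    }
}
```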
- The Kafka Streams application should be configured for durability (the defaults favor availability/performance) to avoid data loss in case of crashes; see also Configuring a Streams Application. The following properties should be set:
acks=all
replication.factor=3
num.standby.replicas=1
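If these settings should be applied programmatically without overriding values that come from the configuration file, a sketch could look like this. The class name is hypothetical and this is not necessarily how the app handles it. Note that `num.standby.replicas` only takes effect when more than one instance of the application is running, since a standby copy is placed on a different instance than the active task.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch: add durability-oriented settings unless already configured. */
public final class DurabilityDefaults {

    // Values from the README; they favor durability over availability/performance.
    static final Map<String, String> DURABILITY = Map.of(
        "acks", "all",                // producer waits for all in-sync replicas
        "replication.factor", "3",    // replication of Kafka Streams internal topics
        "num.standby.replicas", "1"); // standby copy of each task's state stores

    private DurabilityDefaults() {}

    /** Returns a copy of props with the durability settings filled in where absent. */
    public static Properties apply(Properties props) {
        Properties result = new Properties();
        result.putAll(props);
        DURABILITY.forEach(result::putIfAbsent);
        return result;
    }
}
```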
- In addition, a fat jar (containing all dependencies) is created; it avoids dependency issues and can be used for single-file deployments
- One Kafka Streams application needs to be configured and started per pipeline
- The application is started with the java command, e.g.:
java -jar target/KStreamsFilterApp-0.1-jar-with-dependencies.jar <command-line arguments>
- Help on arguments:
java -jar target/KStreamsFilterApp-0.1-jar-with-dependencies.jar -h
- Example using a configuration file:
java -jar KStreamsFilterApp-0.1-jar-with-dependencies.jar --config-file streams_other.properties
- Example run with a local Kafka broker and the Prometheus JMX agent:
java -javaagent:target/jmx_prometheus_javaagent-0.17.2.jar=1234:configs/jmx_exporter_kafka_streams.yml -jar target/KStreamsFilterApp-0.1-jar-with-dependencies.jar -c examples/streams_localhost.properties
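With the agent attached, the exporter serves metrics over HTTP on the configured port (1234 in the example above), typically at /metrics. A minimal Java sketch for a quick check fetches that endpoint and parses the Prometheus text format (requires Java 11+ for `java.net.http`). The class is hypothetical, and the metric names depend on the rules in jmx_exporter_kafka_streams.yml, so the `kafka_streams` prefix used below is an assumption.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper to inspect the JMX exporter's HTTP output. */
public final class PrometheusScrape {

    private PrometheusScrape() {}

    /** Fetches the raw exposition text, e.g. from http://localhost:1234/metrics. */
    public static String fetch(String url) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    /** Parses a sample value, including Prometheus' +Inf/-Inf spelling. */
    static double parseValue(String token) {
        switch (token) {
            case "+Inf": return Double.POSITIVE_INFINITY;
            case "-Inf": return Double.NEGATIVE_INFINITY;
            default: return Double.parseDouble(token); // also accepts "NaN"
        }
    }

    /**
     * Keeps samples ("name{labels} value [timestamp]") whose name starts with
     * namePrefix; # HELP / # TYPE comments and non-numeric values are skipped.
     */
    public static Map<String, Double> parse(String body, String namePrefix) {
        Map<String, Double> samples = new LinkedHashMap<>();
        for (String raw : body.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || !line.startsWith(namePrefix)) {
                continue;
            }
            String series;
            String valueToken;
            int close = line.lastIndexOf('}');
            if (close >= 0) {
                series = line.substring(0, close + 1);
                valueToken = line.substring(close + 1).trim().split("\\s+")[0];
            } else {
                String[] tokens = line.split("\\s+");
                if (tokens.length < 2) {
                    continue;
                }
                series = tokens[0];
                valueToken = tokens[1];
            }
            try {
                samples.put(series, parseValue(valueToken));
            } catch (NumberFormatException e) {
                // Skip samples whose value is not a number.
            }
        }
        return samples;
    }
}
```

Usage against a running instance: `PrometheusScrape.parse(PrometheusScrape.fetch("http://localhost:1234/metrics"), "kafka_streams").forEach((k, v) -> System.out.println(k + " = " + v));`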
- Docker-based:
- Docker examples and run scripts can be found in the docker folder.
- Nothing special: the fat jar is run in Docker with the configurations/certificates mounted.
- The default configuration is built into the jar but can be overridden on the command line by passing a configuration file via the -Dlog4j.configuration system property. JVM options must be placed before -jar, for example:
java -Dlog4j.configuration=file:/path/to/log4jconfig.properties -jar target/KStreamsFilterApp-0.1-jar-with-dependencies.jar
- By default, logging is done at INFO level to STDOUT using slf4j-simple
- By default, no additional metrics are exposed.
- JMX remote monitoring is possible by adding the corresponding properties to the java command line. Example for unencrypted JMX without authentication on port 8888:
java -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=8888 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -jar target/KStreamsFilterApp-0.1-jar-with-dependencies.jar -c examples/streams_combined_TLS.properties
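With the JVM started like this, the Kafka Streams MBeans (domain kafka.streams, e.g. type=stream-metrics) can be listed from another Java process through the standard JMX RMI connector. A minimal sketch; the class name is hypothetical and the host/port match the example above.

```java
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical probe that lists Kafka Streams MBeans over JMX. */
public final class StreamsJmxProbe {

    private StreamsJmxProbe() {}

    /** Returns the canonical names of all MBeans in the kafka.streams domain. */
    public static Set<String> streamsMBeans(MBeanServerConnection conn)
            throws IOException, MalformedObjectNameException {
        Set<String> names = new TreeSet<>();
        for (ObjectName name : conn.queryNames(new ObjectName("kafka.streams:*"), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    /** Connects to a JVM started with -Dcom.sun.management.jmxremote.port=<port>. */
    public static JMXConnector connect(String host, int port) throws IOException {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        return JMXConnectorFactory.connect(url);
    }

    public static void main(String[] args) throws Exception {
        try (JMXConnector connector = connect("localhost", 8888)) {
            streamsMBeans(connector.getMBeanServerConnection()).forEach(System.out::println);
        }
    }
}
```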
- Streams metrics, lag and end-to-end latency can be seen in Confluent Control Center once the monitoring interceptors are enabled using the --enable-monitoring-interceptor argument. The security settings for the interceptors need to be set explicitly; otherwise the interceptors assume the default bootstrap.servers without security (see also the example configurations).
- The Prometheus JMX agent can be added with:
-javaagent:<path-to-agent>/jmx_prometheus_javaagent-0.17.2.jar=1234:<path-to-config>/jmx_exporter_kafka_streams.yml
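Because the monitoring interceptors do not pick up the application's own connection settings, one option is to mirror bootstrap.servers and the security-related keys under the interceptor prefix `confluent.monitoring.interceptor.`. The following is a hypothetical sketch of that idea, not what --enable-monitoring-interceptor actually does; it assumes that copying `security.protocol`, `ssl.*` and `sasl.*` is sufficient for the given setup, and keeps any interceptor setting that is already configured explicitly. The example configurations remain the reference.

```java
import java.util.Properties;

/** Hypothetical helper: mirror client connection/security settings for the interceptors. */
public final class InterceptorSecurity {

    static final String PREFIX = "confluent.monitoring.interceptor.";

    private InterceptorSecurity() {}

    /** Keys assumed to be needed by the interceptors to reach a secured cluster. */
    static boolean isConnectionSetting(String key) {
        return key.equals("bootstrap.servers")
            || key.equals("security.protocol")
            || key.startsWith("ssl.")
            || key.startsWith("sasl.");
    }

    /** Returns a copy of props with prefixed interceptor settings added where absent. */
    public static Properties withInterceptorSecurity(Properties props) {
        Properties result = new Properties();
        result.putAll(props);
        for (String key : props.stringPropertyNames()) {
            if (isConnectionSetting(key)) {
                result.putIfAbsent(PREFIX + key, props.getProperty(key));
            }
        }
        return result;
    }
}
```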