Detect and alert if an active device is not sending heartbeats

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

kafka-streams-event-absence-controller

  • deviceTopic: activated devices are sent to this topic. Each message is <deviceId, "active">; a tombstone is sent when the device is deactivated.
  • heartbeatTopic: each device sends its heartbeats to this topic. Message format: <deviceId, time in milliseconds>.
  • alertTopic: topic where alerts about disconnected active devices are sent. The alert is resent until the device is reconnected or deactivated.
  • mainHeartbeatTopic: technical topic where the heartbeat control message is sent. Each message sent here is broadcast to all partitions of broadcastMainHeartbeatTopic.
  • broadcastMainHeartbeatTopic: has the same number of partitions as deviceTopic (needed to trigger the join).
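The fan-out from mainHeartbeatTopic to broadcastMainHeartbeatTopic can be pictured as copying one control message to every partition. The sketch below is plain Java with no Kafka dependency. The class `BroadcastSketch`, its nested `PartitionedMessage` type and the `fanOut` method are hypothetical names used only for illustration and are not taken from this repository. In a real producer, each entry would roughly correspond to a record sent with an explicit partition number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the repository's code.
final class BroadcastSketch {

    // One copy of the control message, addressed to a single partition.
    static final class PartitionedMessage {
        final int partition;
        final String value;

        PartitionedMessage(int partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    // Produces one copy of the control message per partition (0 .. partitionCount - 1).
    static List<PartitionedMessage> fanOut(String controlMessage, int partitionCount) {
        List<PartitionedMessage> copies = new ArrayList<>(partitionCount);
        for (int p = 0; p < partitionCount; p++) {
            copies.add(new PartitionedMessage(p, controlMessage));
        }
        return copies;
    }
}
```

With the 3 partitions created in the setup below, `BroadcastSketch.fanOut("tick", 3)` yields three copies, one each for partitions 0, 1 and 2, so every partition of the device data sees the control message.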

DeviceAlert class: a device that is in the deviceTopic should send 1 heartbeat every 10 seconds. If it sends fewer than 3 heartbeats in 30 seconds, an alert is sent.
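The rule can be sketched in plain Java, independent of Kafka Streams. `HeartbeatWindowCheck` and its methods are hypothetical names, and the half-open window `[windowStart, windowStart + 30 s)` is an assumption; the actual DeviceAlert class may define its windows differently.

```java
import java.util.List;

// Hypothetical illustration of the rule, not the repository's DeviceAlert class.
final class HeartbeatWindowCheck {
    static final long WINDOW_MS = 30_000L;     // 30-second window from the description
    static final int EXPECTED_HEARTBEATS = 3;  // one heartbeat every 10 seconds

    // Counts heartbeats with a timestamp in [windowStart, windowStart + WINDOW_MS).
    static long countInWindow(List<Long> heartbeatMillis, long windowStart) {
        long windowEnd = windowStart + WINDOW_MS;
        return heartbeatMillis.stream()
                .filter(t -> t >= windowStart && t < windowEnd)
                .count();
    }

    // An active device triggers an alert when the window holds fewer than 3 heartbeats.
    static boolean shouldAlert(List<Long> heartbeatMillis, long windowStart) {
        return countInWindow(heartbeatMillis, windowStart) < EXPECTED_HEARTBEATS;
    }
}
```

For example, heartbeats at 0, 10000 and 20000 ms in a window starting at 0 give a count of 3 and no alert, while heartbeats at 0 and 10000 ms only give a count of 2 and an alert.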

DeviceAlert2 class: a device should send 1 heartbeat every 10 seconds. If it sends fewer than 3 heartbeats in 30 seconds, an alert is sent. A device that has never produced a goodWindow (3 heartbeats) is ignored.
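The extra condition amounts to remembering which devices have already had a good window. A minimal plain-Java sketch, assuming the heartbeat count of each closed 30-second window is already available, could look like this. `GoodWindowTracker` and `onWindowClosed` are hypothetical names, and the in-memory set merely stands in for whatever state the real DeviceAlert2 class keeps.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the rule, not the repository's DeviceAlert2 class.
final class GoodWindowTracker {
    static final int EXPECTED_HEARTBEATS = 3;  // a goodWindow holds at least 3 heartbeats

    // Devices that have produced at least one good window so far.
    private final Set<String> devicesWithGoodWindow = new HashSet<>();

    // Called once per closed window with that window's heartbeat count.
    // Returns true when an alert should be emitted for the device.
    boolean onWindowClosed(String deviceId, int heartbeatCount) {
        if (heartbeatCount >= EXPECTED_HEARTBEATS) {
            devicesWithGoodWindow.add(deviceId);
            return false;
        }
        // Fewer than 3 heartbeats: alert only if the device was ever healthy.
        return devicesWithGoodWindow.contains(deviceId);
    }
}
```

A device whose first window holds a single heartbeat is ignored. Once it has delivered a window with 3 heartbeats, any later window with fewer than 3 produces an alert.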

start

    docker-compose down -v
    docker-compose up -d
    mvn clean package
    kafka-topics --bootstrap-server localhost:29092 --topic deviceTopic --create --partitions 3
    kafka-topics --bootstrap-server localhost:29092 --topic heartbeatTopic --create --partitions 3
    kafka-topics --bootstrap-server localhost:29092 --topic alertTopic --create --partitions 3
    kafka-topics --bootstrap-server localhost:29092 --topic broadcastMainHeartbeatTopic --create --partitions 3
    kafka-topics --bootstrap-server localhost:29092 --topic mainHeartbeatTopic --create --partitions 1
    kafka-topics --bootstrap-server localhost:29092 --list  

device alert (1st shell)

    java -classpath target/event-absence-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.demo.streams.DeviceAlert
    
    or
    
    java -classpath target/event-absence-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.demo.streams.DeviceAlert2

heartbeat producer (2nd shell)

    java -classpath target/event-absence-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.demo.streams.HeartBeatProducer 

device producer (3rd shell)

    java -classpath target/event-absence-1.0.0-SNAPSHOT-jar-with-dependencies.jar com.demo.streams.DeviceProducer 

check the data (4th shell)

  1. Add devices
  2. Add devices to heartbeat producer
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic broadcastMainHeartbeatTopic --partition 0
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic broadcastMainHeartbeatTopic --partition 1
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic broadcastMainHeartbeatTopic --partition 2
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic deviceTopic 
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic heartbeatTopic 
    kafka-console-consumer --bootstrap-server localhost:29092 --from-beginning --property print.key=true --topic alertTopic
