- Java 11
- Maven
- Docker ( or Kafka )
First start up the Dockerized Kafka instance. This will start up Kafka and make it available on port 29092.
docker-compose up
Start up the Consumer application.
cd spring-kafka-consumer
mvn clean install
mvn spring-boot:run
This will build and start the Consumer application. The Consumer will wait for a maximum of 10 minutes for a message before shutting down.
Producer
cd spring-kafka-producer
mvn clean install
mvn spring-boot:run
This will build and start the producer application. The producer will send a message to the kafka queue then shutdown.
For this vulnerability to succeed either or both of the following flags need to be enabled on the consumer.
CheckDeserExWhenValueNull
CheckDeserExWhenKeyNull
This is done in the Consumer application under KafkaConsumerConfig.greetingKafkaListenerContainerFactory()
There are two payloads that can be triggered, by default the RCE payload is used.
To enable the DOS payload, modify the method KafkaApplication.sendGreetingMessage()
Change the payload that is added as a header to dosPayload
and rebuild and run the producer.
NOTE : As the DOS happens as the message is read, and the read never finishes the message remains in the queue, until either deleted manually or the
message retention time expires. This increases the potency of the DOS as it renders that queue useless until either manual intervention occurs, or the retention time expires.
Potentially leading to data loss for messages sent immediately after the DOS message.
To enable the DOS payload, modify the method KafkaApplication.sendGreetingMessage()
Change the payload that is added as a header to rcePayload
and rebuild and run the producer. By default the command that is run is
touch /tmp/newfile
to see if the attack was successful look for a file named newfile under /tmp.
If running on Windows, you can modify the command string to something more suitable.
This Gadget is very much just a POC. For a RCE to occur in the real world, there would need to be a gadget class available on the Consumers class path.
The Denial of Service does not require there to be any specific gadget class available on the Consumers class path. It relies on generating a modified version of the class
org.springframework.kafka.support.serializer.DeserializationException
that contains a Object.
This makes it easy to then add whichever payload we like into the serialized object.
This modified class is called xrg.springframework.kafka.support.serializer.DeserializationException
( Note the x at the start of the package name).
Once the payload is injected, in this case a billion laughs style attack using java.util.Set
and java.lang.Object
it is serialized.
then the binary data is modified to change the x to o matching what the consumer expects.
This serialized exception class is then added as a message header in both the springDeserializerExceptionValue
header and the springDeserializerExceptionKey
header.
These are then read by the consumer if either the key or message is null. After that just ensure the key or message is null and the consumer will read it.
There is some deserialization protection within Spring-Kafka. In ListenerUtils.
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(value)) {
boolean first = true;
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
if (this.first) {
this.first = false;
Assert.state(desc.getName().equals(DeserializationException.class.getName()),
"Header does not contain a DeserializationException");
}
return super.resolveClass(desc);
}
};
return (DeserializationException) ois.readObject();
}
catch (IOException | ClassNotFoundException | ClassCastException e) {
logger.error(e, "Failed to deserialize a deserialization exception");
return null;
}
}
You can see a check is made to ensure that the top level class is org.springframework.kafka.support.serializer.DeserializationException
. But note only the top level class is checked, and then only the class name ( which is within the attackers ability to modify ). So any payload below that level is deserialized.