spring-cloud-stream-aggregate-sample
This repo is a collection of following individual maven modules.
- Netty TCP Source
- Payload Deserializer
- Spring Cloud Stream Aggregate Application that combines the above 2 modules to form a protocol server
- A demo app to send data to the Netty TCP source
Netty TCP Source
Read more about the Netty TCP souce in the netty project.
Netty is a good choice for writing protocol servers. The Spring Cloud Stream Netty TCP source supports both binary and textual data using the various decoders provided by Netty out of the box.
Payload Deserializer
This is a Spring Cloud Stream Processor using the Spring Integration's PayloadDeserializingTransformer. It receives data through an incoming channel, process the data and put the deserialized data into the output channel. One important thing about this deseriallizer is that the end users are expected to provide a deserializer that implements Spring's Deserializer. Otherwise, by default it provides a no-op echo deserializer that produces the same byte array passed to it.
Aggregate Application
The aggregate application combines the apps produced from the above 2 components into a single application. In the payload-deserializer app, it customizes to provide two deserializers each one to be activated based on the following properties.
- payload.deserializer.text
- payload.deserializer.binary
one for deserializing textual data and other for binary.
The binary deserializer deserializes a contrived protocol that has the following structure.
Length field (4 bytes) | count for text fields | text field 1 length | text field 1 | ....| text field n length | text field n | Numeric field (4 bytes)
Steps to run the Aggregate application
-
Clone this repo
-
cd netty
-
./mvnw clean install (-PgenerateApps to produce the standalone middleware based apps)
-
cd ../payload-deserializer
-
./mvnw clean install (-PgenerateApps to produce the standalone middleware based apps)
-
cd ../netty-ingester-aggregate-kafka-10
-
./mvnw clean package
-
java -jar target/netty-ingester-aggregate-kafka-10-1.0.0.BUILD-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=fox --netty.tcp.port=29001 --netty.tcp.decoder=LENGTH --netty.tcp.lengthFieldOffset=0 --netty.tcp.lengthFieldLength=4 --netty.tcp.maxFrameLength=1024 --payload.deserializer.binary=true
Quick explanation of the properties passed:
spring.cloud.stream.bindings.output.destination=fox - Bind the output from the source to a destination named fox on the broker.
netty.tcp.port=29001 - The port that Netty TCP server will listen for incoming data
netty.tcp.decoder=LENGTH - Netty decoder to user
netty.tcp.lengthFieldOffset=0 - Where doesn length field begins in the frame
netty.tcp.lengthFieldLength=4 - Length of the length field
netty.tcp.maxFrameLength=1024 - Maximum length of the incoming TCP frame that Netty consumes before passing it downstream
payload.deserializer.binary=true - Tell the aggregate application to choose the binary deserializer
Now, run a standalone log sink application as below:
java -jar target/log-sink-kafka-10-1.2.0.BUILD-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=fox
Push data to the Netty TCP server
cd push-data-tcp
This is a small spring boot app that sends data to the Netty TCP server.
./mvnw clean package
java -jar target/push-data-tcp-1.0.0.BUILD-SNAPSHOT.jar 100
This sends 100 binary messages to the Netty TCP server based on the fictional binary protocol above.
You should see all those messages sent in the log sink console.
Running Line based data to the Netty TCP Server
java -jar target/netty-ingester-aggregate-kafka-10-1.0.0.BUILD-SNAPSHOT.jar --netty.tcp.port=29001 --netty.tcp.decoder=LINE --netty.tcp.maxLineLength=256 --spring.cloud.stream.bindings.output.destination=fox --payload.deserializer.text=true
On another terminal do the following:
echo "hello netty tcp source" | nc localhost 29001
You should see this message coming through the log sink console