Following the instructions to get set up on PWS with Spring Cloud Data Flow (SCDF), https://github.com/lshannon/spring-cloud-data-flow-setup, let's explore a more detailed example.
We can create a simple stream.
stream create --name m --definition "http | log"
We can test the stream.
http post --target https://clouqa-dataflow-server-ok8lm7b-words-http.cfapps.io/ --data "Hello World"
A Spring Boot component can be created to produce messages.
@Bean
public CommandLineRunner init(RestTemplate template) {
return (args) -> {
for (int i = 0; i < 10000; i++) {
HttpEntity<String> payload = new HttpEntity<String>("A letter for you sir! : " + i);
template.postForLocation("https://clouqa-dataflow-server-ok8lm7b-m2-http.cfapps.io/", payload);
}
};
}
Deploy the repository application with the transformer package. Then register the service.
app register --name simple-message-processor --type processor --uri https://processor-repository-roseolar-pauperage.cfapps.io/simple-mesage-processor-0.0.1-SNAPSHOT.jar
Create a new stream that uses the custom component.
stream create m2 --definition "http | simple-message-processor | log" --deploy
Lets create the streams for routing.
stream create --name l --definition ":log > log" --deploy
stream create --name d --definition ":db > jdbc --driver-class-name=org.postgresql.Driver --username=xxxxxx --password=xxxxxxx --url=jdbc:postgresql://stampy.db.elephantsql.com:5432/xxxxxx --jdbc.initialize=true --spring.datasource.maxActive=2 --spring.datasource.tomcat.max-active=2" --deploy
Now let's send the data to the appropriate stream using the routing key the processor added.
stream create m2 --definition "http | simple-message-processor | rabbit --routing-key-expression=#jsonPath(payload,'$.routingKey') --exchange-expression=#jsonPath(payload,'$.routingKey')" --deploy