/stream

Spring Cloud Stream Pub/Sub

Primary LanguageJavaApache License 2.0Apache-2.0

ref: https://www.baeldung.com/spring-cloud-stream

# visit localhost:15672
docker run -it --hostname rabbit-server \
 --name my-rabbit \
 -v /opt/rabbitmq/var/lib/rabbitmq:/var/lib/rabbitmq \
 -p 5672:5672 -p 15671:15671 -p 15672:15672 \
 rabbitmq:3-management

get start

  1. add @EnableMyStreamListener annotation;
@SpringBootApplication
@EnableMyStreamListener({Processor.class, CustomListener.class, CustomProducer.class})
public static class StreamApplication {

}
  1. create custom source and sink;
public interface CustomListener {
    String ENTITIES_COMPANY = "entities-company";

    @Input(ENTITIES_COMPANY)
    SubscribableChannel listenOnEntitiesCompany();
}

public interface CustomProducer {
    String EVENTS_BOOKING_START = "events-booking-start";

    @Output(EVENTS_BOOKING_START)
    MessageChannel produceEventsBookingStart();
}
  1. add listeners
@MyStreamListener(value = Sink.INPUT, group = "group-a")
public void listen2(String msg) {
    logger.info("listen2 : {}", msg);
    Assert.assertTrue("hello world".equals(msg));
}

@StreamListener(CustomListener.ENTITIES_COMPANY)
public void listen1(String msg) {
    logger.info("listen1 : {}", msg);
    Assert.assertTrue("company created".equals(msg));
}

By default, you won't config destination/group/binder for individual listener. You can use @MyStreamListener annotation to custom destination/group/binder.

  1. application.yml
spring:
  application:
    name: stream-service
  cloud:
    stream:
      bindings:
        default:
          destination: staging
          group-prefix: golden-unicorn
          binder: rabbit
        output:
          destination: staging
          binder: rabbit
        events-booking-start:
          destination: staging
          binder: rabbit
      rabbit:
        bindings:
          input:
            consumer:
              binding-routing-key: entities.user
          output:
            producer:
              routing-key-expression: '"entities.user"'