- Docker.
- Spring boot 2.5.4 (Stable version at the time of this tuitorial).
- Java 8. (原例是java-11)
- Apache Kafka and zookeeper (Embedded from docker).
Go to root directory of project
$ docker-compose up -d
Start main class as spring boot application, the application will be successful. Here is the console looks like as the producer will run for every 5 seconds will generate a random number and publishes to final topic that is intended for consumption.
/Library/Java/JavaVirtualMachines/jdk-11.0.9.jdk/Contents/Home/bin/java -XX:TieredStopAtLevel=1 -noverify -Dspring.output.ansi.enabled=always -javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.......jar:/Users/ereshgorantla/Documents/Dev/gradle-6.5.1/caches/modules-2/files-2.1/com.google.code.findbugs/jsr305/3.0.2/25ea2e8b0c338a877313bd4672d3fe056ea78f0d/jsr305-3.0.2.jar com.cloud.stream.SpringCloudStreamFunctionalProgrammingApplication
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.4)
2021-08-26 12:40:26.830 INFO 2917 --- [ main] udStreamFunctionalProgrammingApplication : Starting SpringCloudStreamFunctionalProgrammingApplication using Java 11.0.9 on Ereshs-MacBook-Pro.local with PID 2917 (/Users/ereshgorantla/Documents/Dev/My Work/medium/spring-cloud/spring-cloud-stream-examples/spring-cloud-stream-functional-programming/build/classes/java/main started by ereshgorantla in /Users/ereshgorantla/Documents/Dev/My Work/medium/spring-cloud/spring-cloud-stream-examples)
2021-08-26 12:40:26.832 INFO 2917 --- [ main] udStreamFunctionalProgrammingApplication : No active profile set, falling back to default profiles: default
2021-08-26 12:40:27.362 INFO 2917 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-08-26 12:40:27.372 INFO 2917 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-08-26 12:40:27.420 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.426 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration' of type [org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationJmxConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.433 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration' of type [org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.436 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'mbeanServer' of type [com.sun.jmx.mbeanserver.JmxMBeanServer] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.447 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.448 INFO 2917 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-08-26 12:40:27.651 INFO 2917 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9001 (http)
2021-08-26 12:40:27.658 INFO 2917 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2021-08-26 12:40:27.659 INFO 2917 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.52]
2021-08-26 12:40:27.725 INFO 2917 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2021-08-26 12:40:27.725 INFO 2917 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 866 ms
2021-08-26 12:40:28.188 INFO 2917 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
2021-08-26 12:40:28.277 INFO 2917 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: kafka
2021-08-26 12:40:28.277 INFO 2917 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: kafka
2021-08-26 12:40:28.304 INFO 2917 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.fizzBuzzProcessor-in-0' has 1 subscriber(s).
2021-08-26 12:40:28.305 INFO 2917 --- [ main] reactor.Flux.Map.1 : onSubscribe(FluxMap.MapSubscriber)
2021-08-26 12:40:28.306 INFO 2917 --- [ main] reactor.Flux.Map.1 : request(unbounded)
2021-08-26 12:40:28.311 INFO 2917 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.fizzBuzzConsumer-in-0' has 1 subscriber(s).
2021-08-26 12:40:28.474 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel errorChannel
2021-08-26 12:40:28.527 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel fizzBuzzProducer_integrationflow.channel#0
2021-08-26 12:40:28.539 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel fizzBuzzProcessor-out-0
2021-08-26 12:40:28.558 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel fizzBuzzProcessor-in-0
2021-08-26 12:40:28.562 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel nullChannel
2021-08-26 12:40:28.567 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel fizzBuzzProducer-out-0
2021-08-26 12:40:28.571 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageChannel fizzBuzzConsumer-in-0
2021-08-26 12:40:28.577 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler fizzBuzzProducer_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2021-08-26 12:40:28.615 INFO 2917 --- [ main] o.s.i.monitor.IntegrationMBeanExporter : Registering MessageHandler _org.springframework.integration.errorLogger
2021-08-26 12:40:28.627 INFO 2917 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-08-26 12:40:28.627 INFO 2917 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2021-08-26 12:40:28.628 INFO 2917 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2021-08-26 12:40:28.644 INFO 2917 --- [ main] o.s.i.endpoint.ReactiveStreamsConsumer : started bean 'fizzBuzzProducer_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-08-26 12:40:28.645 INFO 2917 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: kafka
2021-08-26 12:40:28.709 INFO 2917 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: numbers
2021-08-26 12:40:28.712 INFO 2917 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
--- [ main] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-08-26 12:40:29.186 INFO 2917 --- [ main] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2021-08-26 12:40:29.187 INFO 2917 --- [ main] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0-1 unregistered
2021-08-26 12:40:29.215 INFO 2917 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'numbers.anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0.errors' has 1 subscriber(s).
2021-08-26 12:40:29.215 INFO 2917 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'numbers.anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0.errors' has 0 subscriber(s).
2021-08-26 12:40:29.215 INFO 2917 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'numbers.anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0.errors' has 1 subscriber(s).
2021-08-26 12:40:29.215 INFO 2917 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'numbers.anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0.errors' has 2 subscriber(s).
2021-08-26 12:40:29.229 INFO 2917 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
org.apache.kafka.common.serialization.ByteArrayDeserializer
2021-08-26 12:40:29.389 INFO 2917 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0-2, groupId=anonymous.ac088754-30d4-45e9-87c3-071bc7a661d0] (Re-)joining group
2021-08-26 12:40:29.390 INFO 2917 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2021-08-26 12:40:29.390 INFO 2917 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2021-08-26 12:40:29.390 INFO 2917 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1629961829390
2021-08-26 12:40:29.390 INFO 2917 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847-4, groupId=anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847] Subscribed to topic(s): fizz-buzz
.........
2021-08-26 12:40:32.422 INFO 2917 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847-4, groupId=anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847] Finished assignment for group at generation 1: {consumer-anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847-4-70428fe6-70b3-41d9-8280-b8105e0ce762=Assignment(partitions=[fizz-buzz-0])}
2021-08-26 12:40:32.480 INFO 2917 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847-4, groupId=anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847] Successfully synced group in generation Generation{generationId=1, memberId='consumer-anonymous.4673ffb3-55d5-49fe-8cbb-9b8934ba3847-4-70428fe6-70b3-41d9-8280-b8105e0ce762', protocol='range'}
....
2021-08-26 12:40:34.165 INFO 2917 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2021-08-26 12:40:34.165 INFO 2917 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1629961834165
2021-08-26 12:40:34.171 INFO 2917 --- [ad | producer-4] org.apache.kafka.clients.Metadata : [Producer clientId=producer-4] Cluster ID: jGpZSsM1QYyHKuc0Eu2XLA
2021-08-26 12:40:34.181 INFO 2917 --- [container-0-C-1] c.cloud.stream.kafka.KafkaConfiguration : Consumer Received : Fizz
2021-08-26 12:40:39.060 INFO 2917 --- [ parallel-1] reactor.Flux.Map.2 : onNext(7061)
2021-08-26 12:40:39.070 INFO 2917 --- [container-0-C-1] reactor.Flux.Map.1 : onNext(7061)
2021-08-26 12:40:39.079 INFO 2917 --- [container-0-C-1] c.cloud.stream.kafka.KafkaConfiguration : Consumer Received : 7061
2021-08-26 12:40:44.054 INFO 2917 --- [ parallel-1] reactor.Flux.Map.2 : onNext(1952)
2021-08-26 12:40:44.061 INFO 2917 --- [container-0-C-1] reactor.Flux.Map.1 : onNext(1952)
2021-08-26 12:40:44.067 INFO 2917 --- [container-0-C-1] c.cloud.stream.kafka.KafkaConfiguration : Consumer Received : 1952
2021-08-26 12:40:49.053 INFO 2917 --- [ parallel-1] reactor.Flux.Map.2 : onNext(4424)
2021-08-26 12:40:49.061 INFO 2917 --- [container-0-C-1] reactor.Flux.Map.1 : onNext(4424)
2021-08-26 12:40:49.067 INFO 2917 --- [container-0-C-1] c.cloud.stream.kafka.KafkaConfiguration : Consumer Received : 4424
-------------