spring-cloud/spring-cloud-dataflow

Consuming from multiple topics using single SCDF stream

Closed this issue · 6 comments

When creating a stream, we are allowed to specify only a single topic. To override the destination, we use the spring.cloud.stream.bindings.input.destination property to set multiple Kafka topics. However, when we attempt to do so, the following error message appears, even though the stream still deploys successfully.

Error Message:
115E:(pos 5): unexpected data in stream definition ',' :trip,duty > trip-office --management.endpoints.web.exposure.include=health,info,bindings,prometheus,metrics --management.metrics.export.prometheus.enabled=true --management.prometheus.metrics.export.rsocket.port=7001 --management.metrics.tags.instance.index=${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --spring.cloud.stream.kafka.binder.configuration.max.poll.records=22 --wavefront.application.service=${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown}-${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --spring.cloud.stream.instanceCount=1 --management.metrics.tags.application.guid=${spring.cloud.application.guid:unknown} --management.metrics.tags.application.name=${vcap.application.application_name:${spring.cloud.dataflow.stream.app.label:unknown}} --management.metrics.export.prometheus.rsocket.host=spring-cloud-dataflow-prometheus-proxy --management.prometheus.metrics.export.rsocket.enabled=true --wavefront.application.name=${spring.cloud.dataflow.stream.name:unknown} --management.prometheus.metrics.export.enabled=true --management.metrics.tags.application.type=${spring.cloud.dataflow.stream.app.type:unknown} --management.metrics.tags.stream.name=${spring.cloud.dataflow.stream.name:unknown} --management.prometheus.metrics.export.rsocket.host=spring-cloud-dataflow-prometheus-proxy --management.metrics.tags.application=${spring.cloud.dataflow.stream.name:unknown}-${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown} --management.metrics.export.prometheus.rsocket.port=7001 --management.metrics.export.prometheus.rsocket.enabled=true ^

Steps to Reproduce:
Create a stream using Spring Cloud Data Flow with a single topic.
Override the destination with the spring.cloud.stream.bindings.input.destination property to specify multiple Kafka topics, e.g., trip,duty.
Deploy the stream.
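As a concrete sketch of the steps above (the stream name is illustrative; the app name trip-office and the comma-separated destination :trip,duty are taken from the error message, and the comma in the named destination is what the DSL parser rejects):

```shell
dataflow:> stream create --name trip-stream --definition ":trip,duty > trip-office"
```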

Spring Cloud Data Flow Version: 3.2.5

Create a stream for each topic using the kafka source and send it to a tap:
kafka > :MY_TAP
Then create a stream that uses the tap as its source.
My later comment provides a solution.
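A sketch of this tap-based workaround for two topics, trip and duty (stream names, the kafka.supplier.topics property, and the downstream apps x | y are illustrative assumptions):

```shell
# One stream per topic, each publishing to the same named destination
dataflow:> stream create --name trip-tap --definition "kafka --kafka.supplier.topics=trip > :MY_TAP" --deploy
dataflow:> stream create --name duty-tap --definition "kafka --kafka.supplier.topics=duty > :MY_TAP" --deploy

# A single downstream stream that consumes from the destination
dataflow:> stream create --name processing --definition ":MY_TAP > x | y" --deploy
```

This is the drawback raised in the next comment: two topics already require three separate deployments.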

But by doing so, I would need three deployments for two topics.
I want to do this in a single deployment.

Hello @ramit0407 ,
This would be a feature addition to SCDF. I'll add it to our backlog.

@ramit0407

If you use kafka-source-kafka, you will be able to specify multiple existing topics using the kafka.supplier.topics property.

You will need to register it using Applications -> Add Applications -> from properties file

source.kafka.uri=springcloudstream/kafka-source-kafka:5.0.0
source.kafka.bootVersion=3
source.kafka.metadata=maven://org.springframework.cloud.stream.app:kafka-source-kafka:jar:metadata:5.0.0
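Alternatively (the dashboard path described above works too), the same properties file can be imported from the SCDF shell; the file path here is a placeholder:

```shell
dataflow:> app import --uri file:///path/to/kafka-source.properties
```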

Assuming your DSL was:

:trip,duty > x | y

It will now be:
kafka --kafka.supplier.topics=trip,duty | x | y
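Putting it together, the single-deployment version could be created like this (the stream name is illustrative, and x | y stand in for your actual processor and sink from the example above):

```shell
dataflow:> stream create --name multi-topic --definition "kafka --kafka.supplier.topics=trip,duty | x | y" --deploy
```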

Closing this issue due to inactivity. If this has been closed in error please leave a comment letting us know to reopen it. Thank you.