spring-cloud/stream-applications

Exception: Dispatcher has no subscribers for channel 'application-1.input'.

akbar-deloitte opened this issue · 1 comments

Hello SCDF team,
I wanted to check on an exception thrown by a filter application during processing. One of the Kafka instances was down while the other instances were up and running, yet the filter application threw the exception below. After some time the failed Kafka instance came back up, but the filter application kept throwing the same exception until we restarted the stream. After the restart, it started functioning correctly. So I wanted to check a couple of things: should SCDF applications be able to re-establish the connection with Kafka, or switch to another Kafka node on failure? Also, what does this exception mean: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.input'.

2023-01-25 06:51:15.465 ERROR [filter-processor,a5f03c95af0873fe5256c0364520571a,47fd717869b5efa9] 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[282], headers={content-length=349, http_requestMethod=POST, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=[SOURCE-TOPIC-NAME], accept=*/*, target-protocol=kafka, authorization=Bearer [TOKEN], b3=a5f03c95af0873fe5256c0364520571a-9706bd89dd573696-0, nativeHeaders={}, skip-input-type-conversion=false, kafka_offset=31854652, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4fb49de5, host=[HOSTNAME], http_requestUrl=http://[HOSTNAME]/api/dataloader, id=7c392e68-0c98-3c68-a5bc-a8a83634f51a, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1674629475460, accept-encoding=gzip, deflate, br, kafka_groupId=[KAFKA-GROUP], user-agent=PostmanRuntime/7.30.0, timestamp=1674629475464}], failedMessage=GenericMessage [payload=byte[282], headers={content-length=349, http_requestMethod=POST, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=[SOURCE-TOPIC-NAME], accept=*/*, target-protocol=kafka, authorization=Bearer [TOKEN], b3=a5f03c95af0873fe5256c0364520571a-9706bd89dd573696-0, nativeHeaders={}, skip-input-type-conversion=false, kafka_offset=31854652, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4fb49de5, host=[HOSTNAME], http_requestUrl=http://[HOSTNAME]/api/dataloader, id=7c392e68-0c98-3c68-a5bc-a8a83634f51a, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1674629475460, accept-encoding=gzip, deflate, br, kafka_groupId=[KAFKA-GROUP], user-agent=PostmanRuntime/7.30.0, 
timestamp=1674629475464}]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[282], headers={content-length=349, http_requestMethod=POST, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=[SOURCE-TOPIC], accept=*/*, target-protocol=kafka, authorization=Bearer [TOKEN], b3=a5f03c95af0873fe5256c0364520571a-9706bd89dd573696-0, nativeHeaders={}, skip-input-type-conversion=false, kafka_offset=31854652, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4fb49de5, host=[HOSTNAME], http_requestUrl=http://[HOSTNAME]/api/dataloader, id=7c392e68-0c98-3c68-a5bc-a8a83634f51a, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1674629475460, accept-encoding=gzip, deflate, br, kafka_groupId=[KAFKA-GROUP], user-agent=PostmanRuntime/7.30.0, timestamp=1674629475464}]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	... 23 more

We have just changed the filter application from reactive to imperative: with the reactive variant there is no easy way to handle interim errors, and the reactive subscriber on that channel is cancelled anyway.
With an imperative function we have a way to retry at the binder level.
Please give the 4.0.0-SNAPSHOT version of this filter application a try.
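
For reference, binder-level retry for an imperative function is normally tuned through the standard Spring Cloud Stream consumer properties. The fragment below is only a sketch: it assumes the filter's input binding is named `input`, and the values are illustrative, not recommendations from this thread. When deploying through SCDF, each key would typically be prefixed with `app.<app-name>.` (the app name depends on your stream definition).

```properties
# Assumption: the filter's input binding is named "input"; adjust to your app.
# Total delivery attempts per record, including the first one (1 disables retry).
spring.cloud.stream.bindings.input.consumer.max-attempts=5
# Delay before the first retry, in milliseconds.
spring.cloud.stream.bindings.input.consumer.back-off-initial-interval=1000
# Factor applied to the delay after each failed attempt.
spring.cloud.stream.bindings.input.consumer.back-off-multiplier=2.0
# Upper bound for the delay between attempts, in milliseconds.
spring.cloud.stream.bindings.input.consumer.back-off-max-interval=10000
```

These settings only control how often a failed record is redelivered to the function; whether they are sufficient for the broker outage described above would need to be verified against the 4.0.0-SNAPSHOT build.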