Null returned value with function composition throws an NPE in between functions
mborgraeve opened this issue · 6 comments
Hi,
So, I'm not sure if this is an actual issue or if it is designed that way, feel free to let me know of course !
Describe the issue
We are using Spring Cloud Stream with function composition.
In some cases we want to continue processing a message, and in other cases we don't.
When we don't, we can return null from the last function in the composition, and no message is emitted by the chain of functions.
The issue appears when a function returns null but isn't the last one in the chain.
To Reproduce
I've got a minimal project demonstrating this in two integration tests at https://github.com/mborgraeve/demo-scs-bean-loop/tree/npe
For example:
- Some value -> PassThroughFunction|NullReturningFunction -> no issue, no message output
- Some value -> NullReturningFunction|PassThroughFunction -> an NPE stack trace (and still no message output)
Version of the framework
This happens in 2022.0.3, 2022.0.4, 2023.0.0.
Expected behavior
We wouldn't expect an NPE stack trace, and would expect the message to be silently dropped.
Additional context
For reference, the stack trace is copied below. It looks like the framework tries to convert the input of the next function, but fails because that input is null.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2921) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2862) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2826) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2744) ~[spring-kafka-3.1.1.jar:3.1.1]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.2.jar:1.12.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2742) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2595) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2481) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2123) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1478) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1442) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:82) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2848) ~[spring-kafka-3.1.1.jar:3.1.1]
... 12 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3b9906f1]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.12.2.jar:1.12.2]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:391) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:460) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:77) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335) ~[spring-retry-2.0.5.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227) ~[spring-retry-2.0.5.jar:na]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
... 15 common frames omitted
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "input" is null
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertNonMessageInputIfNecessary(SimpleFunctionRegistry.java:1283) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1121) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:722) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:646) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:725) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:88) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:789) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:624) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.2.1.jar:6.2.1]
... 36 common frames omitted
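The final cause above points at an input-conversion step that calls getClass() on a null input. The plain-Java sketch below is only an analogy of that order dependence, not the actual Spring Cloud Function code: withTypeInspection is a hypothetical stand-in for a wrapper that inspects its input before invoking the user function, and the two constants mimic the pass-through and null-returning functions from the report.

```java
import java.util.function.Function;

public class NullInCompositionDemo {

    // Hypothetical stand-in for a wrapper that inspects the input's type
    // before invoking the user function (an assumption for illustration).
    static <I, O> Function<I, O> withTypeInspection(Function<I, O> target) {
        return input -> {
            input.getClass(); // throws NullPointerException when input is null
            return target.apply(input);
        };
    }

    static final Function<String, String> PASS_THROUGH = withTypeInspection(s -> s);
    static final Function<String, String> NULL_RETURNING = withTypeInspection(s -> null);

    public static void main(String[] args) {
        // Null produced by the last function: nothing downstream inspects it.
        String result = PASS_THROUGH.andThen(NULL_RETURNING).apply("value");
        System.out.println("last function returned null: " + (result == null));

        // Null produced mid-chain: the next wrapper dereferences it.
        try {
            NULL_RETURNING.andThen(PASS_THROUGH).apply("value");
        } catch (NullPointerException e) {
            System.out.println("NPE raised before the second function ran");
        }
    }
}
```

In this sketch the null only causes trouble when something downstream of it dereferences the value, which matches the two orderings described in the report.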
@mborgraeve To handle situations like these, you can rely on the KafkaNull object, which gives you the same result without that NPE. Can you try your NulledReturnProcessor as below?
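import org.springframework.kafka.support.KafkaNull;
import org.springframework.stereotype.Service;

// Processor is the interface defined in the demo project.
@Service
public class NulledReturnProcessor implements Processor {
    public Object process(Object input) {
        return KafkaNull.INSTANCE;
    }
}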
Frameworks are aware of the KafkaNull type and handle it appropriately without causing the NPE.
Thanks for your answer !
Ah, that's interesting. This trick works in my demo project.
BUT.
I oversimplified the demo project. We don't actually return Object in the signature; we return a domain object, so we are unable to return this type. There are also cases where we use Rabbit rather than Kafka, and we would like to avoid pulling in a Kafka dependency.
We can tweak our framework to handle Optionals all the way through every function and composition, but that feels like a hack.
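For illustration, this is roughly what threading Optional through a composition looks like in plain Java. It is a sketch only: chain, PARSE and KEEP_EVEN are hypothetical names, not part of the demo project or of any Spring API.

```java
import java.util.Optional;
import java.util.function.Function;

public class OptionalChainDemo {

    // Composes two Optional-returning steps; an empty result from the
    // first step means the second one is never invoked.
    static <A, B, C> Function<A, Optional<C>> chain(
            Function<A, Optional<B>> first,
            Function<B, Optional<C>> second) {
        return input -> first.apply(input).flatMap(second);
    }

    // Hypothetical steps: drop blank strings, then drop odd numbers.
    static final Function<String, Optional<Integer>> PARSE =
            s -> Optional.of(s.trim())
                    .filter(t -> !t.isEmpty())
                    .map(t -> Integer.parseInt(t));

    static final Function<Integer, Optional<Integer>> KEEP_EVEN =
            n -> Optional.of(n).filter(x -> x % 2 == 0);

    public static void main(String[] args) {
        Function<String, Optional<Integer>> pipeline = chain(PARSE, KEEP_EVEN);
        System.out.println(pipeline.apply("4").isPresent());   // kept by both steps
        System.out.println(pipeline.apply("  ").isPresent());  // dropped by PARSE
        System.out.println(pipeline.apply("3").isPresent());   // dropped by KEEP_EVEN
    }
}
```

The cost is visible in the signatures: every function in the chain has to change its return type, which is the invasiveness objected to above.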
I dug a bit into the Spring Function composition (as in, using the compose or andThen methods on Functions), and it seems like it should be able to handle null return values.
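As a point of comparison, java.util.function.Function.andThen simply forwards a null to the next function, so whether it fails depends on what that function does with it. The sketch below adds a guarded variant that ends the chain on null. andThenSkippingNull is a hypothetical helper written for this example, not an existing Spring Cloud Function API.

```java
import java.util.function.Function;

public class NullSafeCompose {

    // Hypothetical helper: like Function.andThen, but a null result from
    // `first` ends the chain and `second` is never called.
    static <A, B, C> Function<A, C> andThenSkippingNull(
            Function<A, B> first, Function<B, C> second) {
        return input -> {
            B intermediate = first.apply(input);
            return intermediate == null ? null : second.apply(intermediate);
        };
    }

    public static void main(String[] args) {
        Function<String, String> nullReturning = s -> null;
        Function<String, Integer> length = String::length;

        // Plain andThen forwards the null, so String::length throws.
        try {
            nullReturning.andThen(length).apply("value");
        } catch (NullPointerException e) {
            System.out.println("plain andThen: NPE in the second function");
        }

        // The guarded variant returns null without calling `length`.
        Integer guarded = andThenSkippingNull(nullReturning, length).apply("value");
        System.out.println("guarded result is null: " + (guarded == null));
    }
}
```

Treating a mid-chain null the same way as a null from the last function, that is as "drop the message", is the behavior requested in this issue.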
@mborgraeve In that case, I think this needs to be addressed in Spring Cloud Function, since that is the framework SCSt is built on. I will transfer the issue there.
@olegz What do you think about this request? I think this is doable and a reasonable one to address.
I would be glad to implement it, given a bit of direction.
I don't see an issue with a function returning null, and the fix appears to be quite simple.
Now, keep in mind that if you are returning a Message, you won't be able to have a null payload.