spring-cloud/spring-cloud-function

Null returned value with function composition throws an NPE in between functions

mborgraeve opened this issue · 6 comments

Hi,

So, I'm not sure if this is an actual issue or if it is designed that way, feel free to let me know of course !

Describe the issue
We are using Spring Cloud Stream with function composition.
In some cases, we want to continue processing some message, in some cases we don't.

When we don't, we are able to return a null value from the last function in the composition, and no message outputs the chain of function.

The issue appear when a function returns a null but isn't the last one in the chain.

To Reproduce
I've got a minimal project demonstrating this in two integration tests at https://github.com/mborgraeve/demo-scs-bean-loop/tree/npe

For example:

  • Some value -> PassThroughFunction|NullReturningFunction-> No issue, no message output
  • Some value -> NullReturningFunction|PassThroughFunction -> we have an NPE stack trace (and still no message output).

Version of the framework
This happens in 2022.0.3, 2022.0.4, 2023.0.0.

Expected behavior
We wouldn't expect a NPE stack trace, and would expect the message to be silently dropped.
Screenshots

Additional context
For reference, the stacktrace is copied below. It looks like it tries to convert the message, but fail because it's null

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2921) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2862) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2826) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2744) ~[spring-kafka-3.1.1.jar:3.1.1]
	at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.2.jar:1.12.2]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2742) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2595) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2481) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2123) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1478) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1442) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:82) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2848) ~[spring-kafka-3.1.1.jar:3.1.1]
	... 12 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3b9906f1]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.3.jar:6.1.3]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.3.jar:6.1.3]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.3.jar:6.1.3]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.3.jar:6.1.3]
	at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.12.2.jar:1.12.2]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:391) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:460) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:77) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335) ~[spring-retry-2.0.5.jar:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227) ~[spring-retry-2.0.5.jar:na]
	at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
	... 15 common frames omitted
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "input" is null
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertNonMessageInputIfNecessary(SimpleFunctionRegistry.java:1283) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1121) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:722) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:646) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:725) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:88) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:789) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:624) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.2.1.jar:6.2.1]
	... 36 common frames omitted

@mborgraeve To handle situations like these, you can rely on the KafkaNull object, which gives you the same result without that NPE. Can you try your NulledReturnProcessor as below?

@Service
public class NulledReturnProcessor implements Processor {

    public Object process(Object input) {
        return KafkaNull.INSTANCE;
    }
}

Frameworks are aware of the KafkaNull type and handle it appropriately without causing the NPE.

Thanks for your answer !
Ah, that's interesting. This trick works in my demo project.

BUT.

I oversimplified the demo project, we don't actually return Object in the signature, we have a domain object, so are unable to return this type, and some cases where we don't use Kafka but Rabbit, and would like to avoid pulling a Kafka dependency.

We can tweak our framework to handle Optionals all the way in every function and composition, but that feels like a hack.

I digged a bit into the Spring Function composition (as in use the compose or andThen methods on Functions), and it seems like it should be able to handle null return values.

@mborgraeve In that case, I think this needs to be addressed from Spring Cloud Function as that is the framework that SCSt uses to build its foundations. I will transfer the issue there.

@olegz What do you think about this request? I think this is doable and a reasonable one to address.

I would be glad to execute it if directed a bit.

I don't see an issue with function returning null and the fix appears to be quite simple.
Now, keep in mind that if you are returning Message you won't be able to have payload as null.