spring-projects/spring-data-redis

Switch `StreamMessageListenerContainer` default to not unsubscribe on simple exception

hein-hp opened this issue · 10 comments

While using Redis Streams, I ran into an issue where my consumer threw an exception during consumption, which caused StreamMessageListenerContainer to cancel the subscription. I found that this happens because cancelSubscriptionOnError is evaluated whenever an exception is encountered. However, StreamReadRequest is package-private, so I have no way to modify cancelSubscriptionOnError. How can I handle exceptions thrown during consumption while keeping the subscription intact?

Have you noticed that StreamReadRequest is enclosed within an interface? That makes StreamReadRequest a public class allowing you to create a custom StreamReadRequest:

StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset).errorHandler(…).cancelOnError(…).build();

Let me know whether this helps.

Thank you for your answer, which helped me solve the problem. But I would like to ask, why is it designed this way, with the default being to cancel subscriptions when an exception occurs?

Errors caused by stream reads indicate a stream setup error, such as absent group assignments. With that assumption in mind, there is nothing we can recover from, so we decided to cancel the subscription.

What type of error are you running into?

I have the same question. The method deserializeAndEmitRecords catches RuntimeException; isn't that range of exceptions too broad? When consuming a message throws an unexpected exception (such as an NPE or some other business exception; bugs introduced by developers cannot be completely avoided), the loop also breaks.
I wonder whether catching only the framework's own exceptions would be more suitable?

Care to provide the full stack trace?

Hey @mp911de

In our case it happened when an SQL exception occurred in the message loop

@RequiredArgsConstructor
public class OurStreamListener
    implements StreamListener<String, MapRecord<String, String, String>> {

  private final StreamMessageListenerContainer<String, MapRecord<String, String, String>>
      listenerContainer;

  @PostConstruct
  void subscribeToStream() {
    log.info("Subscribing to the status topic");

    // Start receiving messages
    final Subscription queueSubscription =
        listenerContainer.register(
            StreamMessageListenerContainer.StreamReadRequest.builder(
                    StreamOffset.create(
                        "queueName",
                        ReadOffset.lastConsumed()))
                .consumer(
                    Consumer.from(
                        "consumergroupname", "consumerID"))
                .autoAcknowledge(true)
                .build(),
            this);
  }

  @Override
  public void onMessage(MapRecord<String, String, String> message) {
    log.info("Received a message on the status topic");
    
    // ....
    // (SQL Exception somewhere here)

  }
}

After the first SQL Exception the loop stops running for the rest of the application lifetime.

I fixed it by following your earlier advice and added

                // See https://github.com/spring-projects/spring-data-redis/issues/2919
                .cancelOnError(ignore -> false)
                .errorHandler(
                    t ->
                        log.error(
                            "An error occurred in the status polling loop", t))

After this, an SQL Exception doesn't stop the whole loop anymore, and it moves to the next message.

The behaviour is a bit unintuitive, but I understand why it's made like this.

I suppose making errorHandler print a default ERROR log message on exception (referring to #cancelOnError) is the "solution" here (currently, errorHandler is null by default)

Hi @mp911de,
The error stack trace isn't printed because the source code catches the exception, and jord1e provided the demo code above. I'll try to make it clearer. In the listenerContainer.receive API, cancelOnError defaults to true.

//   org.springframework.data.redis.stream.StreamPollTask

    private void doLoop() {
        do {
            try {
                Thread.sleep(0L);
                List<ByteRecord> raw = this.readRecords();
                // the core code 
                this.deserializeAndEmitRecords(raw);
            } catch (InterruptedException var2) {
                this.cancel();
                Thread.currentThread().interrupt();
            } catch (RuntimeException var3) {
                RuntimeException ex = var3;
                if (this.cancelSubscriptionOnError.test(ex)) {
                    this.cancel();
                }

                this.errorHandler.handleError(ex);
            }
        } while(this.pollState.isSubscriptionActive());

    }
    
    
    private void deserializeAndEmitRecords(List<ByteRecord> records) {
        Iterator var2 = records.iterator();

        while(var2.hasNext()) {
            ByteRecord raw = (ByteRecord)var2.next();

            try {
                this.pollState.updateReadOffset(raw.getId().getValue());
                V record = this.convertRecord(raw);
                // The problem occurs here:
                // onMessage is implemented by the user's StreamListener, so it can throw any exception,
                // and catching user-code exceptions here also cancels the subscription.
                this.listener.onMessage(record);
            } catch (RuntimeException var5) {
                RuntimeException ex = var5;
                if (this.cancelSubscriptionOnError.test(ex)) {
                    this.cancel();
                    this.errorHandler.handleError(ex);
                    return;
                }

                this.errorHandler.handleError(ex);
            }
        }

    }
    

Business code can do anything inside the onMessage method of a StreamListener implementation, so any exception may occur there. It would be more intuitive for the subscription to keep running when a single unexpected exception occurs. I wonder whether it would be better not to catch exceptions from onMessage, or to define a dedicated exception type for errors raised by the framework code itself? Just my two cents. Thanks.
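
To make the effect of the predicate concrete, here is a minimal JDK-only sketch that models the per-record error handling shown above. It is not the Spring Data Redis source: the class PollLoopSketch, the methods emit and run, and the sample records are hypothetical names chosen for illustration, and plain java.util.function types stand in for StreamListener and ErrorHandler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Simplified model of the poll loop's error handling; all names here are hypothetical. */
class PollLoopSketch {

    /**
     * Delivers records to the listener in order. When the listener throws, the predicate
     * decides whether delivery stops (subscription cancelled) or continues with the next record.
     * Returns true if the simulated subscription is still active afterwards.
     */
    static boolean emit(List<String> records,
                        Consumer<String> listener,
                        Predicate<Throwable> cancelOnError,
                        Consumer<Throwable> errorHandler) {
        for (String record : records) {
            try {
                listener.accept(record);
            } catch (RuntimeException ex) {
                boolean cancel = cancelOnError.test(ex);
                errorHandler.accept(ex);
                if (cancel) {
                    return false; // cancelled: the remaining records are not delivered
                }
            }
        }
        return true;
    }

    /** Runs the simulation with a listener that fails on the record "bad". */
    static List<String> run(Predicate<Throwable> cancelOnError) {
        List<String> events = new ArrayList<>();
        boolean active = emit(
                List.of("a", "bad", "c"),
                record -> {
                    if (record.equals("bad")) {
                        throw new IllegalStateException("simulated failure");
                    }
                    events.add("processed:" + record);
                },
                cancelOnError,
                ex -> events.add("error:" + ex.getMessage()));
        events.add(active ? "active" : "cancelled");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(t -> true));  // predicate always cancels
        System.out.println(run(t -> false)); // predicate never cancels
    }
}
```

With a predicate that always returns true, the run ends at the failing record and "c" is never delivered. With t -> false, the error is handed to the handler and "c" is still processed, which matches the behaviour reported earlier in this thread after adding cancelOnError(ignore -> false).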

@jord1e a stream read task defaults to DefaultStreamMessageListenerContainer.LoggingErrorHandler logging exceptions using the error level.

StreamMessageListenerContainer cannot know how to handle user-code exceptions yet we have to react to exceptions with a safe default.

In contrast to Pub/Sub, where messages are lost if there is no subscriber, Stream messages have a persistent aspect. Continuing by ignoring the exception would be possible but not great for the business code, however, the subscription would not cancel.

Looking at JMS polling, stopping the subscription is supposedly not a great experience, so we might want to flip the default.

We should also leverage ErrorHandler.handleError(…) to rethrow exceptions to align with how other parts of Spring handle exceptions.

@jord1e a stream read task defaults to DefaultStreamMessageListenerContainer.LoggingErrorHandler logging exceptions using the error level.

Very cool, I didn't notice #errorHandler on StreamMessageListenerContainerOptionsBuilder

Can't cancelOnError be implemented the same way, such that we have one place to define this behaviour (on the StreamMessageListenerContainer)?

In contrast to Pub/Sub, where messages are lost if there is no subscriber, Stream messages have a persistent aspect. Continuing by ignoring the exception would be possible but not great for the business code, however, the subscription would not cancel.

This was our main problem in my example. We were manually ACK'ing the messages, but didn't account for the exception happening before we could ACK it.
This is our fault of course, but because I used the same Consumer-ID (we want these specifics), we would just get the same message again and again after restarting the application.
When the message is not ACK'ed and the consumer moves on to the next message, the old message will just go dormant until it is reclaimed, or the consumer is restarted (if I understand correctly).
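
The ordering problem described here (an exception thrown before the manual ACK) can be sketched without Redis. The following JDK-only example assumes a hypothetical wrapper, AckAfterSuccessSketch.handle, in which adding an ID to the acknowledged list stands in for the real acknowledge call and the pending list stands in for entries that stay unacknowledged. The message IDs and the failing handler are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: acknowledge a message only after the handler completed without throwing. */
class AckAfterSuccessSketch {

    final List<String> acknowledged = new ArrayList<>();
    final List<String> pending = new ArrayList<>();

    /** Hypothetical wrapper around a business handler; not a Spring Data Redis API. */
    void handle(String messageId, Consumer<String> handler) {
        try {
            handler.accept(messageId);
            acknowledged.add(messageId); // reached only when the handler succeeded
        } catch (RuntimeException ex) {
            pending.add(messageId);      // left unacknowledged for a later retry or reclaim
        }
    }

    /** Processes three messages; the handler fails on the second one. */
    static AckAfterSuccessSketch demo() {
        AckAfterSuccessSketch sketch = new AckAfterSuccessSketch();
        Consumer<String> handler = id -> {
            if (id.equals("2-0")) {
                throw new IllegalStateException("simulated SQL failure");
            }
        };
        for (String id : List.of("1-0", "2-0", "3-0")) {
            sketch.handle(id, handler);
        }
        return sketch;
    }

    public static void main(String[] args) {
        AckAfterSuccessSketch result = demo();
        System.out.println("acknowledged=" + result.acknowledged + " pending=" + result.pending);
    }
}
```

In this model the failing message ends up in pending while the consumer moves on. Whether such a message should be retried, reclaimed by another consumer, or routed elsewhere is a business decision that the sketch leaves open.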

We should also leverage ErrorHandler.handleError(…) to rethrow exceptions to align with how other parts of Spring handle exceptions.

👍 I think this is the right way to go. The loop will continue to poll, but users will see (and log) the exception and have a chance to handle them in the future.

Thank you for picking it up for future users. Great abstraction all around.