spring-projects/spring-data-redis

Redis Stream Issue: Caused by: io.lettuce.core.RedisException: Connection closed

Closed this issue · 2 comments

I'm using Redis Streams in Spring Boot.
Whenever I restart the service, I intermittently receive the following error:

org.springframework.data.redis.RedisSystemException: Redis exception
Caused by: io.lettuce.core.RedisException: Connection closed

`@Slf4j
@configuration
public class RedisStreamConfig {

private static final String START = "start";
private static final String STOP = "stop";
private static final String CONSUMER = "consumer";
private StreamMessageListenerContainer<String, MapRecord<String, String, String>> container;
private final AtomicInteger recoveryAttempts = new AtomicInteger(0);
private final List subscriptions = new CopyOnWriteArrayList<>();
private static final ScheduledExecutorService singleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

@value("${platform.redis.host}")
private String redisHost;

@value("${platform.redis.port}")
private int redisPort;

@value("${platform.redis.use.ssl}")
private boolean useSsl;

@value("${platform.registry.redis.stream.consumer.count}")
private int consumerCount;

@value("${platform.registry.redis.stream.consumer.polling.interval.ms}")
private int redisStreamConsumerPollingIntervalInMs;

@value("${platform.registry.redis.stream.consumer.messages.per.interval.ms}")
private int redisStreamConsumerMessagesPerPoll;

@value("${platform.registry.redis.stream.consumer.threads}")
private int redisStreamConsumerThreads;

@Autowired
private ApplicationContext applicationContext;

@bean
public RedisConnectionFactory redisConnectionFactory() {
val config = new RedisStandaloneConfiguration(this.redisHost, this.redisPort);

val clientOptions = io.lettuce.core.ClientOptions.builder()
        .autoReconnect(true) // ensures lettuce will reconnect if connection is dropped
        .disconnectedBehavior(io.lettuce.core.ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
        .pingBeforeActivateConnection(true)
        .timeoutOptions(TimeoutOptions.enabled())
        .socketOptions(SocketOptions.builder()
                .keepAlive(true)
                .tcpNoDelay(true)
                .build())
        .build();

val clientConfigBuilder = LettuceClientConfiguration.builder()
        .commandTimeout(Duration.ofMinutes(5)) // > this.redisStreamConsumerPollingIntervalInMs
        .clientOptions(clientOptions);

if (useSsl) {
    clientConfigBuilder.useSsl();
}

val clientConfig = clientConfigBuilder.build();

val factory = new LettuceConnectionFactory(config, clientConfig);
factory.setValidateConnection(true); // ensures connections are alive before use
return factory;

}

@bean
public RedisTemplate<String, Object> redisTemplate(final RedisConnectionFactory factory) {
val template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
return template;
}

@bean(initMethod = START, destroyMethod = STOP)
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(final RedisConnectionFactory connectionFactory,
final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
var options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofMillis(this.redisStreamConsumerPollingIntervalInMs))
.batchSize(this.redisStreamConsumerMessagesPerPoll)
.executor(Executors.newFixedThreadPool(this.redisStreamConsumerThreads))
.errorHandler(error -> handleRedisStreamError(error, redisTemplate, redisStreamConsumer))
.build();

this.container = StreamMessageListenerContainer.create(connectionFactory, options);

createConsumerGroupSafely(redisTemplate);
registerSubscriptions(redisStreamConsumer);
return this.container;

}

private void createConsumerGroupSafely(final RedisTemplate<String, Object> redisTemplate) {
try {
redisTemplate.opsForStream().createGroup(VOICE_CALL_INBOUND_STREAM, VOICE_CALL_INBOUND_GROUP);
log.info("Created consumer group: {}", VOICE_CALL_INBOUND_GROUP);
} catch (Exception e) {
log.warn("Error creating consumer group for redis stream", e);
}
}

private void registerSubscriptions(final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
subscriptions.clear(); // ensure clean slate
for (var i = 1; i <= consumerCount; i++) {
val consumerName = CONSUMER + Symbols.UNDERSCORE + i;
log.info("Registering Redis consumer: {}", consumerName);

    val subscription = container.receive(
            Consumer.from(VOICE_CALL_INBOUND_GROUP, consumerName),
            StreamOffset.create(VOICE_CALL_INBOUND_STREAM, ReadOffset.lastConsumed()),
            redisStreamConsumer
    );
    subscriptions.add(subscription);
}

}

@PreDestroy
public void shutdownContainer() {
if (Objects.nonNull(container)) {
log.info("Stopping StreamMessageListenerContainer before shutdown");

    while (container.isRunning()) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
    container.stop();

    // Wait for subscriptions to be inactive to avoid connection closed errors
    subscriptions.forEach(subscription -> {
        while (subscription.isActive()) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}

}

private void handleRedisStreamError(final Throwable error,
final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
log.error("Redis Stream error occurred: {}", error.getMessage(), error);

// Short-circuit any recovery if context is closing/closed
if (applicationContext instanceof ConfigurableApplicationContext ctx && !ctx.isActive()) {
    log.warn("Context is closing/closed, skipping recovery.");
    return;
}

int attempt = recoveryAttempts.incrementAndGet();
long delay = Math.min(1000L * (1L << (attempt - 1)), 30000L); // exponential backoff

if (attempt > 5) {
    log.warn("Recovery attempts are exceeded.");
    return;
}
log.warn("Scheduling recovery attempt {} in {} ms", attempt, delay);
singleThreadScheduledExecutor.schedule(() -> {
    try {
        if (!(applicationContext instanceof ConfigurableApplicationContext execCtx) || !execCtx.isActive()) {
            log.warn("ApplicationContext became inactive before recovery execution; aborting attempt {}.", attempt);
            return;
        }

        log.info("Redis stream consumer recovery task starting (attempt {})...", attempt);
        recoverContainer(redisTemplate, redisStreamConsumer);
        log.info("Redis stream consumer recovery task completed (attempt {}).", attempt);
        recoveryAttempts.set(0);
    } catch (Throwable ex) {
        log.error("Exception in redis stream consumer recovery task (attempt {}): {}", attempt, ex.getMessage(), ex);
    }
}, delay, TimeUnit.MILLISECONDS);

}

private void recoverContainer(final RedisTemplate<String, Object> redisTemplate,
final InboundVoiceCallRedisStreamConsumer redisStreamConsumer) {
try {
log.info("Attempting Redis container recovery...");

    if (Objects.nonNull(container) && container.isRunning()) {
        container.stop();
        log.info("Stopped container");
    }

    subscriptions.forEach(sub -> {
        if (sub.isActive()) sub.cancel();
    });
    subscriptions.clear();

    Thread.sleep(1000);

    if (Objects.nonNull(container)) {
        container.start();
        log.info("Restarted Redis container");
        createConsumerGroupSafely(redisTemplate);
        registerSubscriptions(redisStreamConsumer); // re-register after restart
        log.info("Re-registered Redis subscriptions after recovery");
    }

    recoveryAttempts.set(0);
} catch (Exception e) {
    log.error("Recovery failed: {}", e.getMessage(), e);
}

}
}`

The properties:

platform.registry.redis.stream.messages.alive.time.ms=0
platform.registry.redis.stream.failed.message.retry.interval.time.ms=10000
platform.registry.redis.stream.failed.message.max.retry.count=0
platform.registry.redis.stream.consumer.polling.interval.ms=500
platform.registry.redis.stream.consumer.messages.per.interval.ms=4
platform.registry.redis.stream.consumer.threads=4
platform.registry.redis.stream.consumer.count=4

`io.lettuce.core.RedisException: Connection closed` is an infrastructure issue, and there is not much we can do about it on our side. You might want to include the full stack trace and post it in the Lettuce issue tracker.

Is the rest of the config correct?