redis/lettuce

event loop thread gets blocked during disconnection/reconnection

leisurelyrcxf opened this issue · 4 comments

Bug Report

Current Behavior

event loop thread gets blocked during disconnection/reconnection

Input Code

public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    private static final int LOOP_NUM = 10_000_000;

    private static final int BATCH_SIZE = 1000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test() {
        RedisURI uri = RedisURI.create("redis.dev-d-okex.svc.dev.local", 6379);
        uri.setCredentialsProvider(new StaticCredentialsProvider(null, "123qweasd!@#".toCharArray()));
        try (RedisClient redisClient = RedisClient.create(uri)) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(60)).build());
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);
            connection.setAutoFlushCommands(false);

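            // Note: comment added for clarity; the thread below closes the connection after ~500 ms,
            // so the close races with the pipelined writes issued by the loop further down.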
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    logger.error("interrupted", e);
                }
                connection.close();
            }).start();

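            // Note: comment added for clarity; with auto-flush disabled, commands accumulate until
            // flushCommands() writes the whole batch to the channel in one go.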
            for (int j = 0; j < LOOP_NUM && connection.isOpen(); j++) {
                for (int i = 0; i < BATCH_SIZE; i++) {
                    connection.async().get(genKey(j));
                }
                connection.flushCommands();
            }
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        new MultiThreadSyncGet().test();
        logger.info("=====================================");
    }

}

I manipulated the network using macOS's Network Link Conditioner with the Downlink Delay set to 1000 ms, then set a debug breakpoint at stack.remove(command):

public void operationComplete(Future<Void> future) {

    try {
        if (!future.isSuccess()) {
            stack.remove(command);
        }
    } finally {
        recycle();
    }
}

Then I evaluated

new ArrayList<>(stack).indexOf(command)

which returned 3971, and

stack.size()

which returned 85000.

So roughly 3,971 successfully flushed commands sit at the head of the stack, and each of the remaining 85,000 - 3,971 failed-to-flush commands is removed by value, scanning past those surviving head entries on every call. The total work to remove all the failed-to-flush commands is therefore on the order of O(3971 * (85000 - 3971)) element comparisons, all performed on the event loop thread, which is enormous. If there were even more successfully flushed commands ahead of the first failed-to-flush command, the cost would be higher still.
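For illustration only (this snippet is not from Lettuce), here is a minimal sketch that reproduces the cost pattern with the numbers observed above: 3,971 surviving entries stay at the head of an ArrayDeque, and every later entry is removed by value, so each ArrayDeque#remove(Object) call re-scans from the head of the deque.

import java.util.ArrayDeque;

public class ArrayDequeRemoveCost {

    public static void main(String[] args) {
        ArrayDeque<Integer> stack = new ArrayDeque<>();
        for (int i = 0; i < 85_000; i++) {
            stack.add(i);
        }

        long start = System.nanoTime();
        // The first 3_971 entries stay put (they were flushed successfully);
        // each remaining entry is removed by value, and every call to
        // ArrayDeque#remove(Object) scans from the head of the deque.
        for (int i = 3_971; i < 85_000; i++) {
            stack.remove(Integer.valueOf(i));
        }
        System.out.printf("removed %d elements in %d ms%n",
                85_000 - 3_971, (System.nanoTime() - start) / 1_000_000);
    }
}

In the real scenario this scan runs once per failed write promise, which is where the event loop stall comes from.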

Expected behavior/code

Thread not blocked

Environment

  • Lettuce version(s): git_sha_dee8020d92e6bff562cef723dcacdea714b89982
  • Redis version: 7.0.0

Possible Solution

Use a HashIndexedQueue instead, which provides O(1) removal of an element from the queue.
The root cause is that ArrayDeque#remove is O(n); if a large number of commands fail in AddToStack#operationComplete, the event loop thread can stay blocked for far too long.
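
The referenced Lettuce version does not ship such a structure, so the following is only a minimal sketch of what a hash-indexed queue could look like (the name HashIndexedQueue and this layout are illustrative, not Lettuce's actual implementation): a doubly-linked list preserves FIFO order, while an identity-based map keeps a handle to each element's node, making add, poll, and remove all O(1). It assumes each command object is enqueued at most once.

import java.util.IdentityHashMap;
import java.util.Map;

// Illustrative sketch: FIFO queue with O(1) removal of an arbitrary element.
final class HashIndexedQueue<E> {

    private static final class Node<E> {
        final E value;
        Node<E> prev, next;

        Node(E value) {
            this.value = value;
        }
    }

    // Maps each queued element to its list node so remove() never scans the queue.
    private final Map<E, Node<E>> index = new IdentityHashMap<>();

    private Node<E> head, tail;

    // Appends to the tail in O(1).
    public void add(E e) {
        Node<E> node = new Node<>(e);
        index.put(e, node);
        if (tail == null) {
            head = tail = node;
        } else {
            tail.next = node;
            node.prev = tail;
            tail = node;
        }
    }

    // Removes and returns the head in O(1), or null if the queue is empty.
    public E poll() {
        if (head == null) {
            return null;
        }
        E value = head.value;
        unlink(head);
        return value;
    }

    // Removes an arbitrary element in O(1) instead of ArrayDeque's O(n).
    public boolean remove(E e) {
        Node<E> node = index.get(e);
        if (node == null) {
            return false;
        }
        unlink(node);
        return true;
    }

    private void unlink(Node<E> node) {
        index.remove(node.value);
        if (node.prev != null) {
            node.prev.next = node.next;
        } else {
            head = node.next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        } else {
            tail = node.prev;
        }
        node.prev = node.next = null;
    }
}

The trade-off is one extra map entry per queued command; in exchange, failing a large batch of unflushed commands no longer requires scanning the whole stack on the event loop thread.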

Additional context

problematic code:

    static class AddToStack implements GenericFutureListener<Future<Void>> {

        ...

        AddToStack(Recycler.Handle<AddToStack> handle) {
            this.handle = handle;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void operationComplete(Future<Void> future) {

            try {
                if (!future.isSuccess()) {
                    stack.remove(command);
                }
            } finally {
                recycle();
            }
        }
    }

@leisurelyrcxf could you help me understand the scenario you are addressing a bit better?
If you could provide a stack trace that explains it, that would be perfect.

Hi @tishun, thanks for the reply!

I have updated the description (see the Details section) with a way to reproduce the issue reliably. Here is the call stack:

operationComplete:1065, CommandHandler$AddToStack (io.lettuce.core.protocol)
notifyListener0:590, DefaultPromise (io.netty.util.concurrent)
notifyListeners0:583, DefaultPromise (io.netty.util.concurrent)
notifyListenersNow:559, DefaultPromise (io.netty.util.concurrent)
notifyListeners:492, DefaultPromise (io.netty.util.concurrent)
setValue0:636, DefaultPromise (io.netty.util.concurrent)
setFailure0:629, DefaultPromise (io.netty.util.concurrent)
tryFailure:118, DefaultPromise (io.netty.util.concurrent)
tryFailure:64, PromiseNotificationUtil (io.netty.util.internal)
safeFail:754, ChannelOutboundBuffer (io.netty.channel)
remove0:339, ChannelOutboundBuffer (io.netty.channel)
failFlushed:691, ChannelOutboundBuffer (io.netty.channel)
close:735, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:620, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:1352, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeClose:755, AbstractChannelHandlerContext (io.netty.channel)
access$1200:61, AbstractChannelHandlerContext (io.netty.channel)
run:738, AbstractChannelHandlerContext$11 (io.netty.channel)
runTask$$$capture:173, AbstractEventExecutor (io.netty.util.concurrent)
runTask:-1, AbstractEventExecutor (io.netty.util.concurrent)
 - Async stack trace
addTask:-1, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:836, SingleThreadEventExecutor (io.netty.util.concurrent)
execute0:827, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:817, SingleThreadEventExecutor (io.netty.util.concurrent)
safeExecute:1181, AbstractChannelHandlerContext (io.netty.channel)
close:735, AbstractChannelHandlerContext (io.netty.channel)
close:560, AbstractChannelHandlerContext (io.netty.channel)
close:957, DefaultChannelPipeline (io.netty.channel)
close:244, AbstractChannel (io.netty.channel)
closeAsync:606, DefaultEndpoint (io.lettuce.core.protocol)
closeAsync:152, CommandExpiryWriter (io.lettuce.core.protocol)
closeAsync:164, RedisChannelHandler (io.lettuce.core)
close:142, RedisChannelHandler (io.lettuce.core)
lambda$test$0:52, MultiThreadSyncGet (bench)
run:840, Thread (java.lang)

Awesome, thanks for clarifying. Let me get back to you after I spend some time thinking about this.
Specific implementation aside, this seems like a meaningful thing to fix.