event loop thread gets blocked during disconnection/reconnection
leisurelyrcxf opened this issue · 4 comments
Bug Report
Current Behavior
event loop thread gets blocked during disconnection/reconnection
Input Code
public class MultiThreadSyncGet {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);
    private static final int LOOP_NUM = 10_000_000;
    private static final int BATCH_SIZE = 1000;
    private static final int DIGIT_NUM = 9;
    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test() {
        RedisURI uri = RedisURI.create("redis.dev-d-okex.svc.dev.local", 6379);
        uri.setCredentialsProvider(new StaticCredentialsProvider(null, "123qweasd!@#".toCharArray()));
        try (RedisClient redisClient = RedisClient.create(uri)) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(60)).build());
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);
            connection.setAutoFlushCommands(false);
            new Thread(() -> {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    logger.error("interrupted", e);
                }
                connection.close();
            }).start();
            for (int j = 0; j < LOOP_NUM && connection.isOpen(); j++) {
                for (int i = 0; i < BATCH_SIZE; i++) {
                    connection.async().get(genKey(j));
                }
                connection.flushCommands();
            }
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        new MultiThreadSyncGet().test();
        logger.info("=====================================");
    }
}
I manipulated the network using macOS's Network Link Conditioner, set the Downlink Delay to 1000 ms, and then set a debug breakpoint at stack.remove(command):
public void operationComplete(Future<Void> future) {
    try {
        if (!future.isSuccess()) {
            stack.remove(command);
        }
    } finally {
        recycle();
    }
}
Then, evaluating new ArrayList<>(stack).indexOf(command) returned 3971, and evaluating stack.size() returned 85000.
Since every removal has to scan past the roughly 3971 successfully flushed commands still sitting at the head of the deque, removing all the failed-to-flush commands costs on the order of O(3971 * (85000 - 3971)) element visits, which is huge. If even more successfully flushed commands precede the first failed-to-flush command, the cost is higher still.
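For context, ArrayDeque#remove(Object) scans from the head of the deque until it finds the element, so removing a long tail of failed commands that sits behind a run of surviving commands degenerates into quadratic work. Below is a hypothetical stand-alone micro-benchmark (my own illustration, not part of the issue or of Lettuce) that mimics the numbers above:

import java.util.ArrayDeque;

public class ArrayDequeRemoveCost {

    public static void main(String[] args) {
        int flushed = 3_971;   // commands already flushed successfully; they stay at the head
        int failed = 81_029;   // commands that failed to flush and must be removed one by one

        ArrayDeque<Integer> stack = new ArrayDeque<>(flushed + failed);
        for (int i = 0; i < flushed + failed; i++) {
            stack.add(i);
        }

        long start = System.nanoTime();
        // Each remove(Object) scans past the `flushed` head elements before finding its target,
        // so the total work is roughly flushed * failed element visits.
        for (int i = flushed; i < flushed + failed; i++) {
            stack.remove(Integer.valueOf(i));
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("removing " + failed + " elements took " + elapsedMs + " ms");
    }
}

On the event loop this cost is paid inline, which is why the thread appears blocked.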
Expected behavior/code
Thread not blocked
Environment
- Lettuce version(s): git_sha_dee8020d92e6bff562cef723dcacdea714b89982
- Redis version: 7.0.0
Possible Solution
Use a HashIndexedQueue instead, which provides O(1) removal of an element from the queue (see the sketch below).
The root cause is that ArrayDeque#remove is O(n); if lots of commands fail in AddToStack#operationComplete, the event loop thread may stay blocked for too long.
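As a minimal sketch of that idea (my own illustration, assuming distinct non-null elements and single-threaded access, not Lettuce's actual implementation), a FIFO queue backed by a doubly linked list plus a HashMap from element to node can unlink an arbitrary element in O(1):

import java.util.HashMap;
import java.util.Map;

public class HashIndexedQueue<E> {

    private static final class Node<E> {
        final E value;
        Node<E> prev, next;
        Node(E value) { this.value = value; }
    }

    // Maps each element to its node so remove(element) needs no linear scan.
    private final Map<E, Node<E>> index = new HashMap<>();
    private Node<E> head, tail;

    public void add(E value) {
        Node<E> node = new Node<>(value);
        if (tail == null) {
            head = tail = node;
        } else {
            tail.next = node;
            node.prev = tail;
            tail = node;
        }
        index.put(value, node);
    }

    public E poll() {
        if (head == null) {
            return null;
        }
        E value = head.value;
        remove(value);
        return value;
    }

    public boolean remove(E value) {
        Node<E> node = index.remove(value);
        if (node == null) {
            return false;
        }
        // Unlink the node in O(1) instead of scanning like ArrayDeque#remove does.
        if (node.prev != null) node.prev.next = node.next; else head = node.next;
        if (node.next != null) node.next.prev = node.prev; else tail = node.prev;
        return true;
    }

    public int size() {
        return index.size();
    }
}

With such a structure, AddToStack#operationComplete could drop a failed command in constant time regardless of how many flushed commands precede it.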
Additional context
problematic code:
static class AddToStack implements GenericFutureListener<Future<Void>> {

    ...

    AddToStack(Recycler.Handle<AddToStack> handle) {
        this.handle = handle;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void operationComplete(Future<Void> future) {
        try {
            if (!future.isSuccess()) {
                stack.remove(command);
            }
        } finally {
            recycle();
        }
    }
}
@leisurelyrcxf could you help me understand the scenario you are addressing a bit better?
If you could provide a stack trace that explains it, that would be perfect.
Hi @tishun, thanks for the reply!
I updated the description (see the Details section) with a way to reproduce this stably. Here is the call stack:
operationComplete:1065, CommandHandler$AddToStack (io.lettuce.core.protocol)
notifyListener0:590, DefaultPromise (io.netty.util.concurrent)
notifyListeners0:583, DefaultPromise (io.netty.util.concurrent)
notifyListenersNow:559, DefaultPromise (io.netty.util.concurrent)
notifyListeners:492, DefaultPromise (io.netty.util.concurrent)
setValue0:636, DefaultPromise (io.netty.util.concurrent)
setFailure0:629, DefaultPromise (io.netty.util.concurrent)
tryFailure:118, DefaultPromise (io.netty.util.concurrent)
tryFailure:64, PromiseNotificationUtil (io.netty.util.internal)
safeFail:754, ChannelOutboundBuffer (io.netty.channel)
remove0:339, ChannelOutboundBuffer (io.netty.channel)
failFlushed:691, ChannelOutboundBuffer (io.netty.channel)
close:735, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:620, AbstractChannel$AbstractUnsafe (io.netty.channel)
close:1352, DefaultChannelPipeline$HeadContext (io.netty.channel)
invokeClose:755, AbstractChannelHandlerContext (io.netty.channel)
access$1200:61, AbstractChannelHandlerContext (io.netty.channel)
run:738, AbstractChannelHandlerContext$11 (io.netty.channel)
runTask$$$capture:173, AbstractEventExecutor (io.netty.util.concurrent)
runTask:-1, AbstractEventExecutor (io.netty.util.concurrent)
- Async stack trace
addTask:-1, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:836, SingleThreadEventExecutor (io.netty.util.concurrent)
execute0:827, SingleThreadEventExecutor (io.netty.util.concurrent)
execute:817, SingleThreadEventExecutor (io.netty.util.concurrent)
safeExecute:1181, AbstractChannelHandlerContext (io.netty.channel)
close:735, AbstractChannelHandlerContext (io.netty.channel)
close:560, AbstractChannelHandlerContext (io.netty.channel)
close:957, DefaultChannelPipeline (io.netty.channel)
close:244, AbstractChannel (io.netty.channel)
closeAsync:606, DefaultEndpoint (io.lettuce.core.protocol)
closeAsync:152, CommandExpiryWriter (io.lettuce.core.protocol)
closeAsync:164, RedisChannelHandler (io.lettuce.core)
close:142, RedisChannelHandler (io.lettuce.core)
lambda$test$0:52, MultiThreadSyncGet (bench)
run:840, Thread (java.lang)
Awesome, thanks for clarifying. Let me get back to you after I spend some time thinking about this.
Specific implementation aside, this seems to be a meaningful thing to fix.