AutoMQ/automq

[BUG] StreamReader endless loop

```
2024-05-31 10:07:20.553 +0800 ERROR [com.automq.stream.s3.S3Storage]  [POTENTIAL_BUG] read from block cache timeout, stream=20948, [311613,325507), maxBytes: 1048576
2024-05-31 10:06:50.556 +0800 ERROR [com.automq.stream.s3.S3Storage]  [POTENTIAL_BUG] read from block cache timeout, stream=20948, [311613,325507), maxBytes: 1048576
```

Problem Cause: Two consumers consume from the same offset through a single shared StreamReader.

  1. Consumer1 acquires a StreamReader with readOffset=10 and tries to read [10, 40).
  2. Consumer2 acquires the same StreamReader as in step 1, with readOffset=10, and tries to read [10, 30).
  3. Consumer1 finishes reading first and clears the blocks corresponding to [10, 40) from the StreamReader.
  4. In Consumer2's #getBlocks0, floorKey can only return null, and #loadMoreBlocksWithoutData can only load blocks after offset 40, so Consumer2's read request keeps looping in #getBlocks0 forever (illustrated by the sketch below).
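
The stuck state can be shown in isolation. Below is a minimal, self-contained sketch (the class, map layout, and offsets are hypothetical stand-ins for StreamReader's blocksMap and loadedBlockIndexEndOffset, for illustration only) of why step 4 can never make progress:

```java
import java.util.TreeMap;

// Minimal sketch of the stuck state from step 4. The names mirror
// StreamReader's fields, but this class is hypothetical.
public class EndlessLoopSketch {
    public static void main(String[] args) {
        // startOffset -> [startOffset, endOffset) of a loaded block index
        TreeMap<Long, long[]> blocksMap = new TreeMap<>();
        long loadedBlockIndexEndOffset = 40L; // block indexes are loaded up to offset 40
        // Consumer1 cleared the blocks for [10, 40); only later blocks remain.
        blocksMap.put(40L, new long[] {40L, 50L});

        long startOffset = 10L; // Consumer2 still wants [10, 30)
        long endOffset = 30L;

        Long floorKey = blocksMap.floorKey(startOffset); // null: [10, 40) was removed
        // loadMoreBlocksWithoutData(30) loads nothing new because 30 <= 40, and the
        // non-readahead "[BUG]" check (endOffset > loadedBlockIndexEndOffset) is
        // false, so getBlocks0 recurses with the same startOffset forever.
        System.out.println("floorKey=" + floorKey + ", endOffset=" + endOffset
            + ", loadedBlockIndexEndOffset=" + loadedBlockIndexEndOffset);
    }
}
```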

```java
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
                                             long endOffset,
                                             int maxBytes) {
    CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
    eventLoop.execute(() -> {
        cleanupExpiredStreamReader();
        StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
        StreamReader streamReader = streamReaders.computeIfAbsent(key,
            k -> new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache));
        CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
            .whenComplete((rst, ex) -> {
                if (ex != null) {
                    LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
                }
                // When two streams' read progress is the same, only one stream reader can be retained.
                if (streamReaders.remove(key, streamReader) && ex == null) {
                    streamReaders.put(new StreamReaderKey(streamId, streamReader.nextReadOffset()), streamReader);
                }
            });
        FutureUtil.propagate(streamReadCf, cf);
    });
    return cf;
}
```
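
The sharing comes from the computeIfAbsent above: readers are keyed by (streamId, startOffset), so a second consumer arriving at the same offset reuses the cached StreamReader while the first consumer's read is still in flight. One conceivable mitigation, sketched below with a hypothetical hasInflightRead() guard (this is not the project's actual fix), is to give the concurrent consumer its own reader instead:

```java
// Hypothetical mitigation sketch (not the actual fix): only reuse a cached
// StreamReader when it has no read in flight; hasInflightRead() is an assumed
// guard that does not exist in the current code.
StreamReader cached = streamReaders.get(key);
StreamReader streamReader;
if (cached != null && !cached.hasInflightRead()) {
    streamReader = cached; // safe to reuse: no concurrent consumer on these blocks
} else {
    // A reader at this offset is already mid-read (or absent): give this consumer
    // its own reader so the two cannot clear each other's cached blocks.
    streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager,
        objectReaderFactory, dataBlockCache);
    streamReaders.putIfAbsent(key, streamReader);
}
```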

```java
private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset, int maxBytes) {
    Long floorKey = blocksMap.floorKey(startOffset);
    CompletableFuture<Boolean> loadMoreBlocksCf;
    int remainingSize = maxBytes;
    if (floorKey == null || startOffset >= loadedBlockIndexEndOffset) {
        loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
    } else {
        boolean firstBlock = true;
        boolean fulfill = false;
        for (Map.Entry<Long, Block> entry : blocksMap.tailMap(floorKey).entrySet()) {
            Block block = entry.getValue();
            long objectId = block.metadata.objectId();
            if (!objectManager.isObjectExist(objectId)) {
                // The cached block's object may have been deleted by compaction, so we need to check that the object exists.
                ctx.cf.completeExceptionally(new ObjectNotExistException(objectId));
                return;
            }
            DataBlockIndex index = block.index;
            if (!firstBlock || index.startOffset() == startOffset) {
                remainingSize -= index.size();
            }
            if (firstBlock) {
                firstBlock = false;
            }
            // After the read, the data will be returned to the cache, so we need to reload the data every time.
            block = block.newBlockWithData(ctx.readahead);
            ctx.blocks.add(block);
            if ((endOffset != -1L && index.endOffset() >= endOffset) || remainingSize <= 0) {
                fulfill = true;
                break;
            }
        }
        if (fulfill) {
            ctx.cf.complete(ctx.blocks);
            return;
        } else {
            loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
        }
    }
    int finalRemainingSize = remainingSize;
    // Run asynchronously to prevent recursive calls from causing a stack overflow.
    loadMoreBlocksCf.thenAcceptAsync(moreBlocks -> {
        if (ctx.readahead) {
            // If the #loadMoreBlocksWithoutData result is empty, the stream has already been loaded to the end.
            if (!moreBlocks) {
                ctx.cf.complete(ctx.blocks);
                return;
            }
        } else {
            if (!moreBlocks && endOffset > loadedBlockIndexEndOffset) {
                String errMsg = String.format("[BUG] streamId=%s expect load blocks to endOffset=%s, " +
                    "current loadedBlockIndexEndOffset=%s", streamId, endOffset, loadedBlockIndexEndOffset);
                ctx.cf.completeExceptionally(new AutoMQException(errMsg));
                return;
            }
        }
        long nextStartOffset = ctx.blocks.isEmpty() ? startOffset : ctx.blocks.get(ctx.blocks.size() - 1).index.endOffset();
        getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize);
    }, eventLoop).exceptionally(ex -> {
        ctx.cf.completeExceptionally(ex);
        return null;
    });
}
```
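
The loop condition itself is narrow: floorKey is null while startOffset < loadedBlockIndexEndOffset, so loadMoreBlocksWithoutData(endOffset) has nothing new to load and the existing [BUG] check (endOffset > loadedBlockIndexEndOffset) never fires. A fail-fast guard along these lines at the top of getBlocks0 (a sketch, not the actual patch) would at least surface the condition instead of spinning:

```java
// Sketch of a fail-fast guard (hypothetical, not the project's actual fix):
// if the requested offset falls inside the already-loaded index range but its
// blocks were cleared, no amount of index loading can make progress, so fail
// instead of recursing with identical arguments.
if (floorKey == null && startOffset < loadedBlockIndexEndOffset) {
    String errMsg = String.format("[BUG] streamId=%s blocks for startOffset=%s were cleared, " +
        "loadedBlockIndexEndOffset=%s", streamId, startOffset, loadedBlockIndexEndOffset);
    ctx.cf.completeExceptionally(new AutoMQException(errMsg));
    return;
}
```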