AutoMQ/automq

[BUG] StreamReader timeout

Closed · 1 comment

[2024-06-05 07:34:28,941] ERROR [POTENTIAL_BUG] read from block cache timeout, stream=161329, [292951,292957), maxBytes: 1048576 (com.automq.stream.s3.S3Storage)
[arthas@321881]$ watch com.automq.stream.s3.cache.blockcache.StreamReader getBlocks0 '{params[1],target.blocksMap.firstEntry()}'  -n 1  -x 2 
method=com.automq.stream.s3.cache.blockcache.StreamReader.getBlocks0 location=AtExit
ts=2024-06-05 11:36:26; [cost=0.279973ms] result=@ArrayList[
    @Long[292951],
    @SimpleImmutableEntry[
        serialVersionUID=@Long[7138329143949025153],
        key=@Long[293431],
        value=@Block[com.automq.stream.s3.cache.blockcache.StreamReader$Block@1b388394],
    ],
]
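The arthas output above shows the inconsistency: the read request needs offset 292951, but the first entry in `blocksMap` starts at 293431, so the range [292951, 293431) is not covered by any cached block and the read can never complete. A minimal sketch of that gap condition (hypothetical `hasGap` helper; the map values here stand in for the real `Block` objects):

```java
import java.util.TreeMap;

public class GapCheck {
    // Returns true when the first cached block starts strictly after the
    // requested offset, i.e. the cache has a hole the read cannot be served from.
    static boolean hasGap(TreeMap<Long, Long> blocksMap, long nextReadOffset) {
        return !blocksMap.isEmpty() && blocksMap.firstKey() > nextReadOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> blocksMap = new TreeMap<>();
        blocksMap.put(293431L, 293500L); // first cached block, as in the arthas dump
        long nextReadOffset = 292951L;   // offset the timed-out read asked for
        System.out.println("gap=" + hasGap(blocksMap, nextReadOffset));
    }
}
```

With the offsets from the dump this prints `gap=true`, matching the timeout in the log.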


Root cause: StreamReader#resetBlocks can run interleaved with StreamReader#loadMoreBlocksWithoutData0. The reset rewinds the reader to nextReadOffset, but the in-flight load still inserts blocks computed from the pre-reset state, leaving a gap between nextReadOffset and the first inserted block.
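A hypothetical replay of that interleaving (simplified single-threaded simulation; `replay` and the numeric offsets are illustrative, not the real StreamReader code):

```java
import java.util.TreeMap;

public class ResetRaceSketch {
    // Replays the race: the async load captured its start offset before
    // resetBlocks rewound the reader, so the inserted block no longer
    // touches nextReadOffset.
    static TreeMap<Long, Long> replay(long nextReadOffset, long staleLoadOffset) {
        TreeMap<Long, Long> blocksMap = new TreeMap<>();
        // 1. loadMoreBlocksWithoutData0 captures staleLoadOffset and goes async.
        // 2. resetBlocks interleaves: it clears blocksMap and rewinds the
        //    reader to nextReadOffset.
        blocksMap.clear();
        // 3. The async load completes and inserts blocks starting at the stale
        //    offset, leaving [nextReadOffset, staleLoadOffset) uncovered.
        blocksMap.put(staleLoadOffset, staleLoadOffset + 6);
        return blocksMap;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> map = replay(292951L, 293431L);
        System.out.println("first cached block starts at " + map.firstKey()
            + ", nextReadOffset is 292951 -> reads on the gap time out");
    }
}
```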

CompletableFuture<Void> prevCf = CompletableFuture.completedFuture(null);
for (S3ObjectMetadata objectMetadata : objects) {
    // The object reader will be released in whenComplete below.
    @SuppressWarnings("resource") ObjectReader objectReader = objectReaderFactory.apply(objectMetadata);
    // Invoke basicObjectInfo to warm up the objectReader.
    objectReader.basicObjectInfo();
    prevCf = prevCf.thenCompose(
        nil ->
            objectReader
                .find(streamId, nextFindStartOffset.get(), -1L, Integer.MAX_VALUE)
                .thenAcceptAsync(
                    findRst ->
                        findRst.streamDataBlocks().forEach(streamDataBlock -> {
                            DataBlockIndex index = streamDataBlock.dataBlockIndex();
                            Block block = new Block(objectMetadata, index);
                            if (!putBlock(block)) {
                                // After object compaction, the blocks returned by different
                                // objectManager#getObjects calls may not be contiguous.
                                throw new BlockNotContinuousException();
                            }
                            nextFindStartOffset.set(streamDataBlock.getEndOffset());
                        }),
                    eventLoop
                ).whenComplete((nil2, ex) -> objectReader.release())
    );
}
return prevCf;