AutoMQ/automq

[Enhancement] BlockCache enhancement

Closed this issue · 3 comments

TODO: design doc

Who is this for and what problem do they have today?

Why is solving this problem impactful?

Additional notes


[2024-04-12 16:43:08,773] WARN Unexpected exception from an event executor:  (io.netty.util.concurrent.SingleThreadEventExecutor)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.netty.buffer.AbstractDerivedByteBuf.release0(AbstractDerivedByteBuf.java:98)
	at io.netty.buffer.AbstractDerivedByteBuf.release(AbstractDerivedByteBuf.java:94)
	at com.automq.stream.s3.ObjectReader$DataBlockGroup.release(ObjectReader.java:465)
	at com.automq.stream.s3.cache.blockcache.DataBlock.free(DataBlock.java:62)
	at com.automq.stream.s3.cache.blockcache.DataBlockCache$Cache.evict0(DataBlockCache.java:193)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)

[2024-04-12 21:58:27,908] ERROR Fetch stream[3374] [5,8) fail (kafka.log.streamaspect.AlwaysSuccessClient)
java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
	at com.automq.stream.s3.cache.blockcache.StreamReaders$Cache.read(StreamReaders.java:124)
	at com.automq.stream.s3.cache.blockcache.StreamReaders.read(StreamReaders.java:75)
	at com.automq.stream.s3.S3Storage.read0(S3Storage.java:464)
	at com.automq.stream.s3.S3Storage.read(S3Storage.java:438)
	at com.automq.stream.s3.S3Stream.fetch0(S3Stream.java:253)
	at com.automq.stream.s3.S3Stream.lambda$fetch$4(S3Stream.java:190)
	at com.automq.stream.utils.FutureUtil.exec(FutureUtil.java:65)
	at com.automq.stream.s3.S3Stream.fetch(S3Stream.java:190)
	at com.automq.stream.s3.S3StreamClient$StreamWrapper.fetch(S3StreamClient.java:261)
	at kafka.log.streamaspect.AlwaysSuccessClient$StreamImpl.fetch(AlwaysSuccessClient.java:247)
	at kafka.log.streamaspect.MetaStream.fetch(MetaStream.java:152)
	at com.automq.stream.api.Stream.fetch(Stream.java:76)

error.log

  1. StreamReader is holding BlockA.
  2. BlockA is freed (e.g. evicted under memory pressure).
  3. A new instance with the same key, termed BlockA', is loaded into the DataBlockCache.
  4. Upon closing, StreamReader marks the stale BlockA as read and adds it to the inactive list.
  5. The DataBlockCache then evicts the duplicate, already-freed BlockA from the inactive list, releasing it a second time and triggering the IllegalReferenceCountException above.
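The sequence above can be sketched as a minimal, single-threaded model. All names here (Block, Cache, run) are hypothetical simplifications and do not match AutoMQ's real classes; the point is only that tracking inactive blocks by key rather than by live instance lets a stale, already-freed block be released twice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class StaleBlockRace {
    static class Block {
        final String key;
        int refCnt = 1;
        Block(String key) { this.key = key; }
        void free() {
            // mirrors Netty's refCnt check: releasing at 0 is an error
            if (refCnt == 0) throw new IllegalStateException("refCnt: 0, decrement: 1");
            refCnt--;
        }
    }

    static class Cache {
        final Map<String, Block> blocks = new HashMap<>();
        final Deque<Block> inactive = new ArrayDeque<>();
        void markInactive(Block b) { inactive.add(b); }          // no identity/generation check
        void evictInactive() { while (!inactive.isEmpty()) inactive.poll().free(); }
    }

    static String run() {
        Cache cache = new Cache();
        Block blockA = new Block("A");            // 1. StreamReader is holding BlockA
        cache.blocks.put("A", blockA);
        blockA.free();                            // 2. BlockA is freed
        cache.blocks.put("A", new Block("A"));    // 3. BlockA' takes its place in the cache
        cache.markInactive(blockA);               // 4. closing StreamReader re-registers stale BlockA
        try {
            cache.evictInactive();                // 5. cache frees the already-freed BlockA
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();                // analogous to IllegalReferenceCountException
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A fix along these lines would compare instance identity (or a generation number) before freeing, so the stale BlockA in the inactive list is ignored once BlockA' has replaced it.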
[2024-04-13 05:51:54,186] ERROR [stream-reader-0]Error running task (com.automq.stream.utils.threads.EventLoop)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101)
	at io.netty.buffer.AbstractDerivedByteBuf.release0(AbstractDerivedByteBuf.java:98)
	at io.netty.buffer.AbstractDerivedByteBuf.release(AbstractDerivedByteBuf.java:94)
	at com.automq.stream.s3.ObjectReader$DataBlockGroup.release(ObjectReader.java:465)
	at com.automq.stream.s3.cache.blockcache.DataBlock.free(DataBlock.java:62)
	at com.automq.stream.s3.cache.blockcache.DataBlockCache$Cache.evict0(DataBlockCache.java:193)
	at com.automq.stream.utils.threads.EventLoop.run(EventLoop.java:48)
[2024-04-13 06:49:49,908] INFO [QuorumController id=0] [CommitStreamSetObject]: successfully commit stream set object. streamSetObjectId=37816, nodeId=7, nodeEpoch=1712979070826, compacted objects: [36992, 37761, 35841, 37633, 37762, 35843, 35459, 37763, 37635, 35845, 36998, 36999, 34952, 36363, 37387, 37004, 36878,

[2024-04-13 06:59:53,190] ERROR GetObject for object e0090000/_kafka_ChaosInjectionClusterw/36878 [371469653, 372495902) fail (com.automq.stream.s3.operator.DefaultS3Operator)
java.util.concurrent.CompletionException: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The specified key does not exist. (Service: S3, Status Code: 404, Request ID: DN7AKVZRJDD2KHXQ, Extended Request ID: QsHmJ/VZK66O95/+w+fnPqgeaXhDEroIkTJp+MngNLvQZG2dP72guFHcIC3zTxZOqa/FS5S1PdI=)
	at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
[2024-04-13 07:17:26,442] ERROR [BUG] nextStartOffset:1380608 is not in the range of index:1380348-1380608 (com.automq.stream.s3.cache.blockcache.StreamReader)
[2024-04-13 07:17:59,305] ERROR [BUG] nextStartOffset:1808128 is not in the range of index:1808256-1808881 (com.automq.stream.s3.cache.blockcache.StreamReader)
[2024-04-13 07:18:27,392] ERROR [BUG] nextStartOffset:1439104 is not in the range of index:1438912-1439104 (com.automq.stream.s3.cache.blockcache.StreamReader)
[2024-04-13 07:19:32,258] ERROR [BUG] nextStartOffset:1468416 is not in the range of index:1468544-1469294 (com.automq.stream.s3.cache.blockcache.StreamReader)

The BlockCache has no queued tasks, and the upper layer holds no unreleased tokens, yet fetches still time out. An issue with the AsyncSemaphore notification mechanism is suspected.

[2024-04-13 08:29:11,360] WARN fetch timeout, stream[2239] [1644904, 1748992) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2295] [1644568, 1749248) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[1517] [1644660, 1748992) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[751] [1644773, 1748992) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2117] [2036986, 2164480) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2365] [1645077, 1749760) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2069] [1645868, 1750144) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[243] [1645234, 1749760) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[207] [1646188, 1750784) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2373] [1645942, 1750784) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[675] [2036313, 2163840) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,361] WARN fetch timeout, stream[2501] [2036608, 2164608) (kafka.log.streamaspect.AlwaysSuccessClient)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2239, 1644904, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2295, 1644568, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=1517, 1644660, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=751, 1644773, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2117, 2036986, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2365, 1645077, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2069, 1645868, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=243, 1645234, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=207, 1646188, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2373, 1645942, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=675, 2036313, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:29:11,541] WARN read from block cache timeout, stream=2501, 2036608, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:49:02,541] WARN read from block cache timeout, stream=2308, 1915327, maxBytes: 131072 (com.automq.stream.s3.S3Storage)
[2024-04-13 08:49:03,360] WARN fetch timeout, stream[2308] [1915327, 2023525) (kafka.log.streamaspect.AlwaysSuccessClient)

[arthas@4448]$ vmtool -x 3 --action getInstances --className com.automq.stream.s3.cache.blockcache.AsyncSemaphore  --express 'instances[0].tasks'

[arthas@4448]$ vmtool -x 1 --action getInstances --className kafka.server.FairLimiter  --express 'instances[0].permits.availablePermits()'
@Integer[197132288]
[arthas@4448]$ vmtool -x 1 --action getInstances --className kafka.server.FairLimiter  --express 'instances[1].permits.availablePermits()'
@Integer[209715200]
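One way such a hang could arise is a lost wakeup: a release path that returns permits without re-checking the waiter queue, so a parked read never resumes even though capacity is back. The sketch below is purely illustrative (BuggySemaphore is not AutoMQ's AsyncSemaphore) and is deliberately single-threaded to make the interleaving deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LostWakeup {
    static class BuggySemaphore {
        long permits;
        final Deque<Runnable> waiters = new ArrayDeque<>();
        BuggySemaphore(long permits) { this.permits = permits; }

        void acquire(long n, Runnable task) {
            if (permits >= n) {
                permits -= n;
                task.run();
            } else {
                waiters.add(() -> acquire(n, task)); // park until permits return
            }
        }

        void release(long n) {
            permits += n;
            // BUG: permits are returned but `waiters` is never drained here,
            // so the parked task is never notified and stalls forever.
        }
    }

    static boolean run() {
        boolean[] ran = { false };
        BuggySemaphore sem = new BuggySemaphore(1);
        sem.acquire(2, () -> ran[0] = true); // not enough permits: task parks
        sem.release(1);                      // permits available again, but no wakeup
        return ran[0];                       // false: the fetch "hangs" until timeout
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this state the semaphore shows available permits and the cache shows no queued work in flight, matching the arthas readings above, while the parked fetch times out.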

When memory is tight, the blocks prefetched by readahead are almost all evicted before they are read, wasting S3 bandwidth.
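The waste can be quantified with a toy model (illustrative only, not AutoMQ code): when the LRU cache capacity is smaller than the readahead window, the sequential reader finds most prefetched blocks already evicted, so the S3 GETs that fetched them were pure overhead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReadaheadWaste {
    // Counts prefetched blocks that are evicted before the sequential read reaches them.
    static int wastedFetches(int cacheCapacity, int readaheadWindow) {
        // access-ordered LRU with a fixed capacity
        Map<Integer, Boolean> cache = new LinkedHashMap<Integer, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
                return size() > cacheCapacity;
            }
        };
        for (int block = 0; block < readaheadWindow; block++) {
            cache.put(block, Boolean.FALSE);     // readahead: one S3 GET per block
        }
        int wasted = 0;
        for (int block = 0; block < readaheadWindow; block++) {
            if (!cache.containsKey(block)) {
                wasted++;                        // evicted before the reader arrived
            }
        }
        return wasted;
    }

    public static void main(String[] args) {
        // a 64-block window against a cache that only holds 8 blocks
        System.out.println(wastedFetches(8, 64));
    }
}
```

With a window of 64 and capacity of 8, 56 of the 64 prefetches are wasted; sizing the readahead window against available cache memory (or pinning in-flight readahead blocks) would avoid this.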