streamnative/pulsar-io-cloud-storage

[BUG] Sink flushes only when batchTimeMs is exceeded for large batchSize

alpreu opened this issue · 0 comments

Describe the bug
BlobStoreAbstractSink contains a flushing bug.
Every time the framework calls write() with records from the consumer, the sink does a blocking add to an internal queue, pendingFlushQueue, whose maximum capacity is the user-configured pendingQueueSize (100,000 in our setup).
After the add succeeds, it runs flushIfNeeded, which checks whether the pending bytes exceed the maximum batch bytes (default 10 KB) or the pending record count exceeds batchSize (configured to 10,000 records). A simplified sketch of this flow follows.
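
A minimal sketch of the flow described above, with simplified types; pendingFlushQueue, pendingQueueSize, batchSize, and flushIfNeeded follow the issue, everything else is illustrative and not the connector's actual code:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

class SketchSink {
    // Bounded queue: put() blocks once pendingQueueSize (100,000) is reached.
    private final LinkedBlockingQueue<byte[]> pendingFlushQueue =
            new LinkedBlockingQueue<>(100_000);
    private final AtomicLong bytesPending = new AtomicLong();

    private static final long MAX_BATCH_BYTES = 10 * 1024; // 10 KB default
    private static final int BATCH_SIZE = 10_000;          // batchSize threshold

    void write(byte[] record) throws InterruptedException {
        pendingFlushQueue.put(record);          // blocks when the queue is full
        bytesPending.addAndGet(record.length);
        flushIfNeeded();                        // only ever called from write()
    }

    private void flushIfNeeded() {
        // Flushes are triggered only here: once write() is blocked on put(),
        // this method is never reached again and nothing drains the queue.
        if (bytesPending.get() >= MAX_BATCH_BYTES
                || pendingFlushQueue.size() >= BATCH_SIZE) {
            // asynchronously flush at most one batch to blob storage (omitted)
        }
    }
}
```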
So after the pod starts, the queue is empty and fills quickly. Once the pending bytes reach 10 KB, the sink asynchronously launches a flush and keeps writing to the queue until it is full (100,000 records).
The flush takes at most 10 KB or 10,000 records, whichever comes first (here, 10 KB), writes that batch to S3, and finishes.
Since write() is now blocked on the full queue, nothing triggers another flush.
Only after batchTimeMs (currently configured to 5 minutes) elapses does the sink flush again: it takes 10 KB, writes it, waits another 5 minutes, and repeats.

Expected behavior
The sink should repeatedly flush pending records, independently of insertions by the write() method.
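
One way to achieve this (an illustrative sketch under the assumptions above, not the connector's actual implementation) is a dedicated flusher thread that drains pendingFlushQueue in batches regardless of whether write() is blocked:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class IndependentFlusher implements Runnable {
    private final LinkedBlockingQueue<byte[]> pendingFlushQueue;
    private static final int BATCH_SIZE = 10_000;

    IndependentFlusher(LinkedBlockingQueue<byte[]> queue) {
        this.pendingFlushQueue = queue;
    }

    @Override
    public void run() {
        List<byte[]> batch = new ArrayList<>(BATCH_SIZE);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Block until at least one record is pending, then drain up
                // to a full batch. Because this loop consumes the queue on
                // its own, flushing continues even while write() is blocked
                // on a full queue.
                byte[] first = pendingFlushQueue.poll(5, TimeUnit.SECONDS);
                if (first == null) {
                    continue; // nothing pending; a time-based flush could also hook in here
                }
                batch.add(first);
                pendingFlushQueue.drainTo(batch, BATCH_SIZE - 1);
                flushToBlobStore(batch);
                batch.clear();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void flushToBlobStore(List<byte[]> batch) {
        // upload the batch to S3 / blob storage (omitted)
    }
}
```

With a consumer like this, write() unblocks as soon as the flusher frees capacity, so throughput is no longer gated on batchTimeMs.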