`sendMessageBatch` edge case when individual batch entries are close to limit (not over the limit)
Opened this issue · 1 comment
There is an unhandled edge case in the current sendMessageBatch implementation.
According to the documentation here: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api-actions.html
The total size of all messages that you send in a single SendMessageBatch call can't exceed 262,144 bytes (256 KiB).
Currently we only measure the individual batch entries to verify that each is under the limit, but this is not enough. We should also verify that the entire request is under the threshold.
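To make the missing check concrete, here is a minimal, self-contained sketch of the batch-level accounting. The `Entry` type and `sizeOf` helper are hypothetical stand-ins (assuming the message size is its UTF-8 body bytes plus attribute name and value bytes); the point is that entries which individually pass the per-message check can still exceed the 262,144-byte batch limit together.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class BatchSizeCheck {
    static final long MAX_BATCH_SIZE_BYTES = 262_144L;

    // Hypothetical, simplified stand-in for an SQS batch entry:
    // a message body plus string attributes.
    record Entry(String body, Map<String, String> attributes) {}

    // Assumed size accounting: body bytes plus attribute name/value bytes.
    static long sizeOf(Entry entry) {
        long size = entry.body().getBytes(StandardCharsets.UTF_8).length;
        for (Map.Entry<String, String> attr : entry.attributes().entrySet()) {
            size += attr.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += attr.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    // The check that is currently missing: sum all entries and compare
    // the total against the batch limit, not just each entry alone.
    static boolean exceedsBatchLimit(List<Entry> entries) {
        long total = 0L;
        for (Entry entry : entries) {
            total += sizeOf(entry);
        }
        return total > MAX_BATCH_SIZE_BYTES;
    }

    public static void main(String[] args) {
        // Ten entries of ~27 KB each: every entry is well under 256 KiB,
        // but the batch as a whole is over the limit.
        String body = "x".repeat(27_000);
        List<Entry> entries = java.util.stream.IntStream.range(0, 10)
                .mapToObj(i -> new Entry(body, Map.of()))
                .toList();
        System.out.println(exceedsBatchLimit(entries)); // prints true
    }
}
```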
A few years ago (when there was no ExtendedSNSClient based on AWS SDK v2.x yet), I implemented my own ExtendedSNSClient, heavily inspired by this ExtendedSQSClient library, and I resolved the issue described here with the following approach:
@Override
public PublishBatchResponse publishBatch(PublishBatchRequest publishBatchRequest) throws AwsServiceException, SdkClientException {
    if (publishBatchRequest == null) {
        String errorMessage = "publishBatchRequest cannot be null.";
        log.error(errorMessage);
        throw SdkClientException.create(errorMessage);
    }
    PublishBatchRequest.Builder publishBatchRequestBuilder = publishBatchRequest.toBuilder();
    appendUserAgent(publishBatchRequestBuilder);
    publishBatchRequest = publishBatchRequestBuilder.build();
    if (!configuration.isPayloadSupportEnabled()) {
        log.trace("Payload support is disabled. Publishing full message to SNS. S3 is not used.");
        return super.publishBatch(publishBatchRequest);
    }
    List<PublishBatchRequestEntry> batchEntries = new ArrayList<>(publishBatchRequest.publishBatchRequestEntries());
    List<Integer> smallMessageIndexes = new LinkedList<>();
    // Move batch entries that individually exceed the SNS threshold to S3
    // and return the total size of the resulting request.
    long totalSize = moveIndividuallyLargeMessagesToS3(publishBatchRequest.publishBatchRequestEntries(), batchEntries, smallMessageIndexes);
    if (isLarge(totalSize) && !smallMessageIndexes.isEmpty()) {
        // Move all remaining messages of the batch to S3 in case the total
        // size of the request exceeds the threshold.
        moveRemainingMessagesToS3(batchEntries, smallMessageIndexes);
    }
    return super.publishBatch(publishBatchRequest.toBuilder().publishBatchRequestEntries(batchEntries).build());
}

private long moveIndividuallyLargeMessagesToS3(List<PublishBatchRequestEntry> batchEntriesSrc, List<PublishBatchRequestEntry> batchEntriesTarget, List<Integer> smallMessageIndexes) {
    long totalSize = 0L;
    int index = 0;
    for (PublishBatchRequestEntry entry : batchEntriesSrc) {
        if (StringUtils.isEmpty(entry.message())) {
            String errorMessage = "message cannot be null or empty.";
            log.error(errorMessage);
            throw SdkClientException.create(errorMessage);
        }
        // Check message attributes for ExtendedClient-related constraints.
        checkMessageAttributes(entry.messageAttributes());
        long entrySize = sizeOf(entry);
        if (configuration.isAlwaysThroughS3() || isLarge(entrySize)) {
            log.trace("Storing publish request entry payload to S3");
            entry = storeMessageInS3(entry);
            entrySize = sizeOf(entry);
        } else {
            smallMessageIndexes.add(index);
        }
        totalSize += entrySize;
        batchEntriesTarget.set(index, entry);
        ++index;
    }
    return totalSize;
}

private void moveRemainingMessagesToS3(List<PublishBatchRequestEntry> batchEntries, List<Integer> smallMessageIndexes) {
    for (Integer index : smallMessageIndexes) {
        batchEntries.set(index, storeMessageInS3(batchEntries.get(index)));
    }
}
LMK what you think about this. If you agree with the direction, I can work on a PR.
I believe there is room for improvement in the suggested code snippet above.
The idea there was the following:
1. Move each individually large (over-the-limit) entry to S3.
2. Check the total size:
2.1 If the total size is OK, do nothing.
2.2 If the total size is not OK, move every other message to S3.
The improvement in 2.2 could be:
2.2 Move messages to S3 one by one (maybe starting from the biggest and going down to the smallest) and measure the total at each step. The moment the total is under the threshold, send.
Or maybe:
Do some calculations beforehand to determine the minimal set of messages that needs to be moved to S3 so that the total batch is under the limit.
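The largest-to-smallest variant of 2.2 could be sketched roughly as below. This is a hypothetical illustration, not library code: it works on precomputed entry sizes and assumes that offloading a message to S3 replaces its payload with a small pointer of roughly fixed size (`POINTER_SIZE_BYTES` is my assumption).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GreedyOffload {
    static final long MAX_BATCH_SIZE_BYTES = 262_144L;
    // Assumed size of the S3-pointer payload left behind after offloading.
    static final long POINTER_SIZE_BYTES = 300L;

    // Given the byte size of each batch entry, return the indexes of the
    // entries to move to S3, picking the biggest remaining entry at each
    // step until the projected total fits under the batch limit.
    static List<Integer> selectEntriesToOffload(List<Long> entrySizes) {
        long total = entrySizes.stream().mapToLong(Long::longValue).sum();

        // Candidate indexes sorted by entry size, biggest first, so each
        // offload removes as many bytes from the batch as possible.
        List<Integer> bySizeDesc = new ArrayList<>();
        for (int i = 0; i < entrySizes.size(); i++) {
            bySizeDesc.add(i);
        }
        bySizeDesc.sort(Comparator.comparingLong((Integer i) -> entrySizes.get(i)).reversed());

        List<Integer> toOffload = new ArrayList<>();
        for (Integer index : bySizeDesc) {
            if (total <= MAX_BATCH_SIZE_BYTES) {
                break; // the batch now fits; stop offloading
            }
            // Offloading swaps the entry's size for the pointer size.
            total -= entrySizes.get(index) - POINTER_SIZE_BYTES;
            toOffload.add(index);
        }
        return toOffload;
    }

    public static void main(String[] args) {
        // One 200 KiB entry plus three 30 KiB entries: offloading just the
        // single largest entry already brings the batch under 256 KiB.
        List<Long> sizes = List.of(204_800L, 30_720L, 30_720L, 30_720L);
        System.out.println(selectEntriesToOffload(sizes)); // prints [0]
    }
}
```

Offloading largest-first minimizes the number of S3 round trips for a given batch, which is usually the dominant cost; a true minimal-set computation would have to weigh S3 calls against bytes saved per entry.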