AggRecord does not validate record size correctly
In the following code:
public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
    // set the explicit hash key for the message to the partition key -
    // required for encoding
    explicitHashKey = explicitHashKey != null ? explicitHashKey : createExplicitHashKey(partitionKey);

    // validate values from the provided message
    validatePartitionKey(partitionKey);
    validateExplicitHashKey(explicitHashKey);
    validateData(data);

    // Validate new record size won't overflow max size for a
    // PutRecordRequest
    int sizeOfNewRecord = calculateRecordSize(partitionKey, explicitHashKey, data);
    if (getSizeBytes() + sizeOfNewRecord > MAX_BYTES_PER_RECORD) {
        return false;
    }
    // ...
it validates each of the partition key, explicitHashKey, and data separately, but it does not validate that the sum of all of them still fits under MAX_BYTES_PER_RECORD.
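The mismatch can be illustrated with a small standalone sketch. The 1 MiB limit mirrors MAX_BYTES_PER_RECORD; the overhead value and method names are invented for illustration, not taken from the library:

```java
// Standalone sketch of the reported mismatch. ASSUMED_OVERHEAD is a made-up
// illustrative value, not the library's real protobuf framing cost.
public class CombinedSizeCheck {
    static final int MAX_BYTES_PER_RECORD = 1024 * 1024;
    static final int ASSUMED_OVERHEAD = 20; // hypothetical framing bytes

    // Per-field check, analogous to validateData(): the data alone fits.
    static boolean dataAloneFits(byte[] data) {
        return data.length <= MAX_BYTES_PER_RECORD;
    }

    // Combined check: key + data + framing must fit together.
    static boolean combinedFits(String partitionKey, byte[] data) {
        return partitionKey.length() + data.length + ASSUMED_OVERHEAD <= MAX_BYTES_PER_RECORD;
    }

    public static void main(String[] args) {
        byte[] data = new byte[1024 * 1024]; // exactly at the per-field limit
        System.out.println(dataAloneFits(data));        // true
        System.out.println(combinedFits("key", data));  // false
    }
}
```

A payload that is exactly at the per-field limit passes the first check but can never pass the second, because the key and framing bytes count against the same budget.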
This test case shows the problem:
package com.zillow.amazonaws.kinesis.producer;

import org.junit.Assert;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.security.SecureRandom;

public class PutRecordsRequestEntryInformationTest {
    @Test
    public void test() {
        SecureRandom random = new SecureRandom();
        byte[] buffer = new byte[1024 * 1024];
        random.nextBytes(buffer);
        String key = "key";
        try {
            PutRecordsRequestEntryInformation record = new PutRecordsRequestEntryInformation(
                    key,
                    ByteBuffer.wrap(buffer));
            System.out.printf("%d\n", record.sizeInBytes());
            Assert.assertFalse("Expected exception", true);
        } catch (Exception e) {
        }

        buffer = new byte[1024 * 1024 - 20];
        random.nextBytes(buffer);
        try {
            PutRecordsRequestEntryInformation record = new PutRecordsRequestEntryInformation(
                    key,
                    ByteBuffer.wrap(buffer));
            System.out.printf("%d\n", record.sizeInBytes());
            Assert.assertTrue("Expected exception", true);
        } catch (Exception e) {
            e.printStackTrace(System.err);
            Assert.assertTrue("Unexpected exception", false);
        }
    }
}
The first case is expected to fail. The second should succeed, but it doesn't.
Hello,
I can't really tell what the test is doing because I don't have access to the PutRecordsRequestEntryInformation class. However, the code lines that you cite are just structural checks for each of the partition key, EHK, and data buffer separately. The total message size is then checked in the calculateRecordSize() method, which accounts for the incremental size based on whether a partition key is used vs. an EHK, and whether those objects already exist in the protobuf index. If you can include more detail from your test, we might be able to find the issue.
Thx!
Sorry for the bad snippet. Here is the correct one:
@Test
public void recordAggregatorTest() {
    SecureRandom random = new SecureRandom();
    byte[] buffer = new byte[1024 * 1024];
    random.nextBytes(buffer);
    String key = "key";
    try {
        RecordAggregator aggregator = new RecordAggregator();
        aggregator.addUserRecord(key, buffer);
        System.out.printf("%d\n", aggregator.getSizeBytes());
        Assert.assertFalse("Expected exception", true);
    } catch (Exception e) {
    }

    int EXPECTED_MAX = 1024 * 1024 - 40;
    buffer = new byte[EXPECTED_MAX];
    random.nextBytes(buffer);
    try {
        RecordAggregator aggregator = new RecordAggregator();
        aggregator.addUserRecord(key, buffer);
        System.out.printf("%d\n", aggregator.getSizeBytes());
        Assert.assertFalse("Expected exception", true);
    } catch (Exception e) {
        e.printStackTrace(System.err);
        Assert.assertTrue("Unexpected exception", false);
    }
}
So, my issue is not that this is not working... it is working. But it is confusing, because an oversized payload passes the validateData check and only fails later, once the real serialized byte size is computed.

Fine... but at least throw a better exception that helps the user. Something like:

The payload size exceeds the available limit for this partitionKey and explicitHashKey. Payload size is limited to <N> bytes.
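For what it's worth, here is a rough sketch of what such a check and message might look like. The class, the method, the overheadBytes parameter, and the wording are all hypothetical, not part of the library's API:

```java
// Hypothetical sketch only: nothing here is the kinesis-aggregation API.
public class PayloadLimitDemo {
    static final int MAX_BYTES_PER_RECORD = 1024 * 1024;

    // overheadBytes stands in for whatever the key/EHK/protobuf framing costs.
    static void checkPayload(String partitionKey, String explicitHashKey,
                             int payloadSize, int overheadBytes) {
        int available = MAX_BYTES_PER_RECORD - overheadBytes;
        if (payloadSize > available) {
            throw new IllegalArgumentException(String.format(
                "Payload of %d bytes exceeds the available limit for partitionKey '%s' "
                + "and explicitHashKey '%s'; payload is limited to %d bytes here",
                payloadSize, partitionKey, explicitHashKey, available));
        }
    }

    public static void main(String[] args) {
        try {
            checkPayload("key", "0", 1024 * 1024, 40);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point is just that the exception names the offending payload size and the limit that actually applied, so the caller can see why data under 1 MiB was still rejected.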
Unfortunately we can't just track and error on the data size: the MD5 message digest will change as records are added, so there is no single data length that is always allowable. We can certainly provide better error messages, however.
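To illustrate why a single fixed data limit can't work, here is some toy arithmetic. The overhead numbers below are invented; the real overhead comes from the keys already indexed in the protobuf tables, per-record framing, and the record's checksum, all of which vary as the aggregate grows:

```java
// Illustrative arithmetic only: overhead numbers are invented. The point is
// that the room left for user data depends on the aggregate's current state,
// so no single payload length is always safe.
public class VariableLimitDemo {
    static final int MAX_BYTES_PER_RECORD = 1024 * 1024;
    static final int CHECKSUM_BYTES = 16; // e.g. a trailing MD5 digest

    static int availableForData(int keyTableBytes, int framingBytes) {
        return MAX_BYTES_PER_RECORD - keyTableBytes - framingBytes - CHECKSUM_BYTES;
    }

    public static void main(String[] args) {
        // Fresh aggregate, one short key indexed:
        System.out.println(availableForData(8, 12));   // 1048540
        // Fuller aggregate, many keys already indexed:
        System.out.println(availableForData(512, 12)); // 1048036
    }
}
```

Two records with identical data can therefore get different answers from the size check, depending on what was aggregated before them.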