KinesisAggregator and KCL don't play nice together
cosmincatalin opened this issue · 15 comments
Hi,
After a lot of struggling with making the Java version of the KinesisAggregator
play nice with the KCL, I have finally narrowed down the issue.
Issue
On one side, I have an app that uses the KinesisAggregator
to write aggregated Stream Records on a Kinesis Stream. I use PutRecord
as the persist API method. This is working fine.
The records produced with the help of the KinesisAggregator
end up on the stream. You can try this and you will notice no exceptions and all the CloudWatch metrics will show what you would expect.
On the other side of the stream, I have a Spark app that under the hood uses the KCL to ingest data from Kinesis. The cool thing about the KCL is that it should handle aggregated Stream Records transparently to the user, as if there is no aggregation.
When running the said app, however, I can see in CloudWatch that data is being read (from all the shards), but apparently no data is coming into Spark.
Cause
After going deep into the KCL code, I found where the Kinesis User Records go to die:
```java
if (effectiveHashKey.compareTo(startingHashKey) < 0
        || effectiveHashKey.compareTo(endingHashKey) > 0) {
    for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
        result.remove(result.size() - 1);
    }
    break;
}
```
In essence, for each User Record extracted from a Stream Record, the KCL checks that the record's effective hash key (derived from its Explicit Hash Key) falls within the hash key range (startingHashKey to endingHashKey) of the shard the Stream Record was read from.
In other words, User Records need to look as if they come from the shard they were actually read from (as part of a Stream Record). When this check fails for one User Record, all of the User Records from that Stream Record are dropped. I guess this is a good thing, since otherwise I would probably have missed the issue.
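To make the drop-everything behavior concrete, here is a minimal sketch that mirrors the effect of the KCL snippet above for a single aggregated record. It is not KCL code: the class and method names are hypothetical, and it assumes a two-shard stream where shard 0 owns hash keys [0, 2^127 - 1].

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class ShardRangeCheck {
    /**
     * Returns the hash keys of one aggregated record that survive a KCL-style
     * range check. If any key falls outside [startingHashKey, endingHashKey],
     * every user record of that aggregated record is discarded.
     */
    static List<BigInteger> keepIfAllInShard(List<BigInteger> effectiveHashKeys,
                                             BigInteger startingHashKey,
                                             BigInteger endingHashKey) {
        List<BigInteger> result = new ArrayList<>();
        for (BigInteger key : effectiveHashKeys) {
            if (key.compareTo(startingHashKey) < 0 || key.compareTo(endingHashKey) > 0) {
                return new ArrayList<>(); // one out-of-range key drops the whole aggregated record
            }
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger shard0End = half.subtract(BigInteger.ONE); // assumed shard 0 range: [0, 2^127 - 1]
        List<BigInteger> sameShard = List.of(BigInteger.TEN, BigInteger.valueOf(42));
        List<BigInteger> mixed = List.of(BigInteger.TEN, half); // second key belongs to shard 1
        System.out.println(keepIfAllInShard(sameShard, BigInteger.ZERO, shard0End).size()); // 2
        System.out.println(keepIfAllInShard(mixed, BigInteger.ZERO, shard0End).size());     // 0
    }
}
```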
Now, the root cause of the problem is that the KinesisAggregator
neither enforces nor documents the fact that when one produces an aggregated Stream Record destined for a specific shard (as indicated by its Explicit Hash Key), all of the User Records it contains must also have Explicit Hash Keys that fall within that shard's hash key range.
The example provided in the SampleAggregatorProducer.java
file goes something like this:
```java
private static void sendViaBatch(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
    System.out.println("Creating " + ProducerConfig.RECORDS_TO_TRANSMIT + " records...");
    for (int i = 1; i <= ProducerConfig.RECORDS_TO_TRANSMIT; i++) {
        String pk = ProducerUtils.randomPartitionKey();
        String ehk = ProducerUtils.randomExplicitHashKey(); // <---- NEW EXPLICIT HASH KEY FOR EVERY RECORD
        byte[] data = ProducerUtils.randomData(i, ProducerConfig.RECORD_SIZE_BYTES);
        // addUserRecord returns non-null when a full record is ready to
        // transmit
        AggRecord aggRecord = aggregator.addUserRecord(pk, ehk, data);
        if (aggRecord != null) {
            ForkJoinPool.commonPool().execute(() -> {
                sendRecord(producer, streamName, aggRecord);
            });
        }
    }
}
```
You can see how each new User Record receives a new, random Explicit Hash Key that does not necessarily belong to the same shard as the first one in the Stream Record (which is the key actually used by the Stream Record). If even one of the aggregated User Records references an Explicit Hash Key from a different shard, all of the records in that Stream Record are dropped by the KCL.
Possible resolution
There are many ways in which this problem could be solved (these are just some suggestions off the top of my head):
- Document the issue. Leave the code be, it is valid after all, you just need to know how to use it.
- Modify the validateExplicitHashKey method so that it actually checks the Explicit Hash Key against the shard the User Record is supposed to go to. That is to say, make it compatible with the KCL.
- Modify the methods so that the same Explicit Hash Key is transparently applied to all of the User Records inside a Stream Record.
Best regards,
Cosmin
Hi Cosmin! Thank you so much for your detailed deep dive into the issues you've been having. I'm hoping to get a chance to dive into this either tonight or tomorrow. My last round of testing with the aggregator was against a Lambda function using the deaggregator in this project which still seems fine, so I'll make sure to try to run against a KCL app as well before submitting any potential fixes. I'm sorry to hear you've had so many issues, so I'd definitely like to get this fixed for you ASAP.
Hi,
Thanks for looking into this. When you run your tests, make sure to assign multiple random Explicit Hash Keys to the User Records in one Stream Record, otherwise you won't see the issue.
Actually, this is how I work around the problem, I choose an Explicit Hash Key and I assign it to all User Records and to the Stream Record that contains them.
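The workaround can be sketched as choosing one Explicit Hash Key per aggregated record and reusing it for every User Record in that batch. This is a hypothetical, self-contained illustration: UserRecord, buildBatch and randomExplicitHashKey are made-up names, not KinesisAggregator API, and the EHK is assumed to be a decimal string in [0, 2^128).

```java
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;

public class SharedKeyBatch {
    // Hypothetical holder for one user record before aggregation.
    record UserRecord(String partitionKey, String explicitHashKey, byte[] data) {}

    private static final SecureRandom RANDOM = new SecureRandom();

    // Uniformly random EHK in [0, 2^128 - 1], rendered as a decimal string.
    static String randomExplicitHashKey() {
        return new BigInteger(128, RANDOM).toString();
    }

    // Every record in the batch gets the same PK and EHK, so all of them
    // map to the same shard as the aggregated record that will carry them.
    static List<UserRecord> buildBatch(String partitionKey, List<byte[]> payloads) {
        String ehk = randomExplicitHashKey(); // chosen once per aggregated record
        List<UserRecord> batch = new ArrayList<>();
        for (byte[] data : payloads) {
            batch.add(new UserRecord(partitionKey, ehk, data));
        }
        return batch;
    }

    public static void main(String[] args) {
        List<UserRecord> batch = buildBatch("pk-1", List.of(new byte[] {1}, new byte[] {2}));
        batch.forEach(r -> System.out.println(r.explicitHashKey()));
    }
}
```

The same key would also be used for the Stream Record itself when it is sent.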
Best regards,
Cosmin
Hi Cosmin,
Thanks again for the fantastic write-up. I dug in this weekend and found and reproduced the problems you mentioned. I agree with your list of possible resolutions, though I'm still deciding which is best.
This is much trickier than I thought.
I think the major problem I see is that, no matter what I do, the user can potentially change the Explicit Hash Key (EHK) or Partition Key (PK) on the actual Kinesis Record between the time they get it from the AggRecord
class and when they actually send it via the Kinesis SDK. If that happens, the user will likely miss records (like you did) and there's nothing I can do about it. So it definitely seems like there needs to be some better documentation around that.
Otherwise, the KCL seems very strict about checking. Like you said, if the EHK doesn't match the shard, the records are dropped. If you don't set an EHK, however, the KCL calculates the EHK from the PK on the record and still drops things if the EHK doesn't match. It also looks like if neither the PK nor the EHK is set, the KCL will throw a NullPointerException.
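The fallback from EHK to PK can be sketched as follows. Kinesis maps a partition key to a 128-bit hash key with MD5; this sketch assumes the KCL does the same (digest of the UTF-8 bytes, read as an unsigned integer) and is not copied from the KCL. The class name is hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EffectiveHashKey {
    static BigInteger effectiveHashKey(String partitionKey, String explicitHashKey) {
        if (explicitHashKey != null) {
            return new BigInteger(explicitHashKey); // an EHK is a decimal string
        }
        // No EHK: hash the partition key. partitionKey.getBytes(...) throws a
        // NullPointerException when the PK is null too, matching the failure above.
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // interpret the 16-byte digest as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(effectiveHashKey("", null).toString(16)); // d41d8cd98f00b204e9800998ecf8427e
        System.out.println(effectiveHashKey("ignored", "123"));      // 123
    }
}
```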
If we're not happy with only a documentation-based solution (and I'd like to do better than that if possible), it seems like the only way to prevent any issues or confusion for the users of the library is to completely remove the ability to set PKs and EHKs on the individual user records added to the AggRecord. I don't love that either though.
I think the conclusion I'm coming to is similar to yours. I document well (in Javadoc and README) how to be careful of setting PKs/EHKs and when they will/won't get overwritten and then I force all user records added to the AggRecord
to share the same PK/EHK as the first record added so that none will get rejected on the KCL side.
Does that sound reasonable?
I'll try to set this up on a separate branch and let you verify the pull request when I'm ready.
Thanks again for all the details.
~Brent
Hi Brent,
I think forcing the EHK of all the User Records in one Stream Record to match the first User Record in that Stream Record is the way to go. Of course, this will have to be documented. The main thing to understand is that the first record acts as the decider and all other records follow it.
All the best,
Cosmin
Is it possible to hit this problem when using the KPL to send to a Kinesis stream that is then read by Kinesis Firehose?
@zackb I believe what is referenced above is an issue specific to this project. Your question sounds like you're describing:
KPL->Kinesis->Kinesis Firehose
which doesn't involve this library at all and therefore shouldn't be an issue.
@cosmincatalin can't thank you enough for tracking this down!
I ran into this issue with the Node client. To fix it, I went the route you both mentioned of assigning the first record's EHK to all User Records in the Stream Record. I'll get a PR submitted once I test it out some more.
But I'm also wondering about the logic referenced in the KCL and what its use case is. My only thought right now is that it is there to account for a shard split and to prevent duplicate processing; is that a fair assumption? The way I understand a shard split is that new child shards are created and existing data is not redistributed to them; instead, writes to the parent shard are cut off and all future writes are directed to the child shards. I also read that the KCL does not spin up new workers after a reshard until the parent shard has finished processing. If these are true, that leads me to believe that this KCL logic to drop records is unnecessary. Would you agree?
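For reference, a split divides the parent's hash key range at a new starting hash key (the SplitShard API's NewStartingHashKey), so every hash key belongs to exactly one child. The sketch below only models that range arithmetic; HashRange and split are hypothetical names, not Kinesis or KCL types.

```java
import java.math.BigInteger;

public class ShardSplitSketch {
    // A shard's inclusive hash key range.
    record HashRange(BigInteger start, BigInteger end) {
        boolean contains(BigInteger key) {
            return key.compareTo(start) >= 0 && key.compareTo(end) <= 0;
        }
    }

    // Children cover the parent range exactly, with no overlap.
    static HashRange[] split(HashRange parent, BigInteger newStartingHashKey) {
        if (!parent.contains(newStartingHashKey) || newStartingHashKey.equals(parent.start())) {
            throw new IllegalArgumentException("split point must lie strictly inside the parent range");
        }
        return new HashRange[] {
            new HashRange(parent.start(), newStartingHashKey.subtract(BigInteger.ONE)),
            new HashRange(newStartingHashKey, parent.end())
        };
    }

    public static void main(String[] args) {
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        HashRange[] children = split(new HashRange(BigInteger.ZERO, max), BigInteger.ONE.shiftLeft(127));
        BigInteger key = BigInteger.ONE.shiftLeft(126);
        System.out.println(children[0].contains(key) + " " + children[1].contains(key)); // true false
    }
}
```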
I'm fairly new to Kinesis and the KCL so I am likely lacking some understanding, but wanted to get your thoughts just in case.
Thanks!
@aheuermann Having not worked directly on the KCL, I can't answer your question on the use-case, though I definitely had the same question myself when @cosmincatalin pointed it out.
I think your best bet might be to either post a question on the AWS Kinesis forums (https://forums.aws.amazon.com/forum.jspa?forumID=169) or maybe submit an issue directly on the Kinesis Client Library GitHub repository (https://github.com/awslabs/amazon-kinesis-client). I would be interested in the answer as well.
FWIW, your understanding of how shard splits work lines up with mine.
What makes this issue even worse: The partition/hash keys are still transmitted per user record, not only per aggregated record. At the same time, I'm forced to choose the exact same partition/hash key for all user records in an aggregated record. This adds totally unnecessary overhead, in particular for small user records:
Consider small records of 100 bytes. The per-user-record overhead is about 70 bytes. Aggregating 1000 records adds the same hash/partition key 1000 times, increases the agg record size by 70%, and decreases the maximum throughput by about 40%.
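The percentages follow from per-record arithmetic, assuming throughput is limited by bytes written: each user record costs 100 + 70 = 170 bytes, so the record is 70/100 = 70% larger, and only 100/170 (about 59%) of the bytes are user data, i.e. roughly 41% less user throughput. A small sketch (class name hypothetical):

```java
public class AggregationOverhead {
    // Relative growth of the aggregated record caused by per-record key overhead.
    static double sizeIncrease(double payloadBytes, double overheadBytes) {
        return overheadBytes / payloadBytes;
    }

    // Fraction of user-data throughput lost when a fixed byte budget also carries the overhead.
    static double throughputDecrease(double payloadBytes, double overheadBytes) {
        return 1.0 - payloadBytes / (payloadBytes + overheadBytes);
    }

    public static void main(String[] args) {
        System.out.printf("size increase: %.0f%%%n", sizeIncrease(100, 70) * 100);             // 70%
        System.out.printf("throughput decrease: %.1f%%%n", throughputDecrease(100, 70) * 100); // 41.2%
    }
}
```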
@fmthoma We unfortunately don't control the message specification for aggregation; that's defined by the Kinesis Producer Library (and is used by the Kinesis Client Library):
https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
The user records that go into an aggregated record have a required partition_key_index field and the aggregated record stores up all the different partition keys in the partition_key_table.
What that should (in theory) mean is that if smaller records share the same partition key(s), you should only see the bytes for the partition key showing up once in the aggregated record rather than repeatedly. The uint64 partition_key_index will be repeated for every record, but the string partition_key itself should not repeat.
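The table-plus-index idea can be sketched with plain Java collections standing in for the generated protobuf classes. The field names echo partition_key_table and partition_key_index from aggregation-format.md, but this class is a hypothetical illustration, not the library's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeyTableSketch {
    final List<String> partitionKeyTable = new ArrayList<>(); // each distinct key stored once
    final List<Long> partitionKeyIndexes = new ArrayList<>();  // one index per user record
    private final Map<String, Long> indexByKey = new HashMap<>();

    void add(String partitionKey) {
        Long idx = indexByKey.get(partitionKey);
        if (idx == null) {
            // First time this key appears: append it to the table.
            idx = (long) partitionKeyTable.size();
            partitionKeyTable.add(partitionKey);
            indexByKey.put(partitionKey, idx);
        }
        partitionKeyIndexes.add(idx); // repeated keys only cost an index
    }

    public static void main(String[] args) {
        PartitionKeyTableSketch agg = new PartitionKeyTableSketch();
        for (int i = 0; i < 1000; i++) {
            agg.add("same-key");
        }
        System.out.println(agg.partitionKeyTable.size() + " " + agg.partitionKeyIndexes.size()); // 1 1000
    }
}
```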
We're using the standard Google protobuf library interface (generated from the protobuf format linked above) for the aggregation, so I wouldn't expect there's a bug in that particular bit (though always possible). That being said, I'm far from a protobuf mastermind, so if you happen to see any issues or have any suggestions on how to make this better, I'm certainly amenable.
@brentnash You're indeed right, we did observe the overhead in the KPL, and in the kinesis-aggregation library before we switched to packing only records with the same partition key into one aggregated record.
So for kinesis-aggregation, everything works as expected now. For the KPL, we're still having this issue, and looking at the KCL code, it's quite clear why: The KCL validates that the explicit hash key is in a certain range, namely the range of hash keys that are assigned to the given shard. The KPL takes care of only aggregating records for one shard into one agg record, but still every record has its individual partition/hash key. Unfortunately, aggregation in the KPL is completely transparent, so there is a tradeoff between using as few different partition/hash keys as possible (to keep overhead low), and using evenly distributed keys (to balance between shards). Unaware of the tradeoff, we chose random(-looking) keys to achieve optimum balancing, at the expense of large overhead.
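One way to picture the middle ground of that tradeoff is a small fixed set of explicit hash keys, one per shard, so keys repeat (cheap in the partition key table) while still spreading load. The sketch assumes the stream's shards split the 128-bit hash space evenly, which stops being true after uneven resharding; PerShardKeys and midpointKeys are hypothetical names.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class PerShardKeys {
    static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128 possible hash keys

    // Midpoint of each shard's range, assuming shardCount evenly sized shards.
    static List<String> midpointKeys(int shardCount) {
        BigInteger n = BigInteger.valueOf(shardCount);
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < shardCount; i++) {
            BigInteger start = HASH_SPACE.multiply(BigInteger.valueOf(i)).divide(n);
            BigInteger endExclusive = HASH_SPACE.multiply(BigInteger.valueOf(i + 1)).divide(n);
            keys.add(start.add(endExclusive).shiftRight(1).toString()); // decimal string, as EHKs are sent
        }
        return keys;
    }

    public static void main(String[] args) {
        midpointKeys(4).forEach(System.out::println);
    }
}
```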
@cosmincatalin @brentnash Is documenting this limitation and forcing all User Records in an aggregated record to share the same EHK the way to go? Is there a PR for this? If not, I could try to raise one.
@brentnash isn't with AWS anymore so may have limited time to invest in this issue now. If he does, thank you! I'll try looking into this for you.
I think that a documentation update is the minimum we can do; beyond that, setting the EHK to the Primary Key from the client side is going to be a safe bet to ensure that the shard mapping is preserved.
Thanks @IanMeyers. Would it be fine to just track a current PK/EHK in the RecordAggregator and modify the addUserRecord function to force the first record's keys onto all records that go into the same AggRecord, irrespective of what the user passes? It seems like the simplest foolproof way to address this issue and make sure no one falls into this trap again.
Something like this:
```java
// Assumes two new fields on RecordAggregator:
//   private String currentPk;   // PK shared by all user records in the in-flight AggRecord
//   private String currentEhk;  // EHK shared by all user records in the in-flight AggRecord
// Other code paths that flush the in-flight record should also reset both to null.
public AggRecord addUserRecord(String partitionKey, String explicitHashKey, byte[] data) throws Exception {
    if (currentPk == null) {
        // First user record of a new aggregated record: it decides the keys
        this.currentPk = partitionKey;
        this.currentEhk = explicitHashKey;
    }
    LOG.debug("Aggregating record with pk {}", currentPk);
    // Ignore the caller's keys and reuse the pinned ones
    boolean success = this.currentRecord.addUserRecord(currentPk, currentEhk, data);
    if (success) {
        // we were able to add the current data to the in-flight record
        return null;
    } else {
        // this record is full, let all the listeners know
        final AggRecord completeRecord = this.currentRecord;
        for (ListenerExecutorPair pair : this.listeners) {
            pair.getExecutor().execute(() -> {
                pair.getListener().recordComplete(completeRecord);
            });
        }
        // current record is full; clear it out, make a new empty one and
        // add the new user record
        clearRecord();
        // Reset pk and ehk: the rejected user record starts the next aggregated record
        LOG.debug("Completed aggregated record with pk {}", currentPk);
        this.currentPk = partitionKey;
        this.currentEhk = explicitHashKey;
        LOG.debug("Starting new aggregated record with pk {}", currentPk);
        success = this.currentRecord.addUserRecord(currentPk, currentEhk, data);
        if (!success) {
            throw new Exception(String.format("Unable to add User Record %s, %s with data length %s",
                    partitionKey, explicitHashKey, data.length));
        }
        return completeRecord;
    }
}
```
@ssesha I believe I have a solution for this. Are you in a position to help me test at scale? Can you email meyersi@amazon.com if so?