awslabs/kinesis-aggregation

Feature request: better KCL integration

Closed this issue · 2 comments

Hi,
We are currently looking for a Java alternative to the KPL, and the aggregation library seems like a good start. However, the current aggregation solution doesn't seem to work well with the KCL (#11). Could you add more features for KCL integration, including:

  1. shard-level aggregation and retry logic:
    Similar to #11, the KCL validates whether aggregated data belongs to the correct shard and skips processing batches that fail the validation. To support KCL integration, the producer needs to hash the partition key of each record, map it to a specific shard, and aggregate records per shard.

  2. split/merge shards support:
    Since the producer data is aggregated per shard, if the stream's shards are split or merged, the producer needs to detect the shard changes and re-shuffle records based on the new shard-to-hash-key mappings. This is an additional bonus feature that could build on item 1.

  3. shard-level rate limiting:
    Aggregation is a great way to reduce ProvisionedThroughputExceededException errors. However, with high throughput and multiple Kinesis producers, it would be ideal to have producer-side rate limiting that monitors write traffic for each shard. This could improve producer performance and make requests fail early on the producer rather than at send time.
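A minimal Java sketch of what items 1 to 3 could look like on the producer side, assuming records carry only a partition key (no explicit hash key) and that the open shards and their hash key ranges are fetched elsewhere (for example via the Kinesis ListShards API). Kinesis maps a partition key to a shard by taking the MD5 hash of the key as an unsigned 128-bit integer and choosing the shard whose hash key range contains it, so grouping by that shard keeps every aggregated record within one shard. `ShardRouter`, `ShardRange`, and `ShardRateLimiter` are hypothetical names, not part of this library or the KPL, and records are represented by their partition keys for brevity.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardRouter {
    /** Hypothetical holder for a shard ID and its inclusive hash key range. */
    public record ShardRange(String shardId, BigInteger startingHashKey, BigInteger endingHashKey) {
        boolean contains(BigInteger hashKey) {
            return hashKey.compareTo(startingHashKey) >= 0 && hashKey.compareTo(endingHashKey) <= 0;
        }
    }

    /** MD5 digest of the partition key, read as an unsigned 128-bit integer. */
    public static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    private volatile List<ShardRange> shards;

    public ShardRouter(List<ShardRange> openShards) { this.shards = List.copyOf(openShards); }

    /** Item 2: call after a split/merge is detected so new records follow the new ranges. */
    public void updateShards(List<ShardRange> openShards) { this.shards = List.copyOf(openShards); }

    public String shardFor(String partitionKey) { return shardFor(shards, partitionKey); }

    private static String shardFor(List<ShardRange> ranges, String partitionKey) {
        BigInteger hk = hashKey(partitionKey);
        for (ShardRange s : ranges) {
            if (s.contains(hk)) return s.shardId();
        }
        throw new IllegalStateException("no open shard covers hash key " + hk);
    }

    /** Item 1: groups keys by destination shard; each group can become one aggregated record. */
    public Map<String, List<String>> groupByShard(List<String> partitionKeys) {
        List<ShardRange> snapshot = shards; // one consistent view even if updateShards runs concurrently
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String pk : partitionKeys) {
            groups.computeIfAbsent(shardFor(snapshot, pk), k -> new ArrayList<>()).add(pk);
        }
        return groups;
    }

    /** Item 3: per-shard token bucket over bytes (burst capacity equals one second of budget). */
    public static final class ShardRateLimiter {
        private final double bytesPerSecond;
        private final Map<String, Bucket> buckets = new HashMap<>();

        private static final class Bucket {
            double tokens;
            long lastNanos;
            Bucket(double tokens, long lastNanos) { this.tokens = tokens; this.lastNanos = lastNanos; }
        }

        public ShardRateLimiter(double bytesPerSecond) { this.bytesPerSecond = bytesPerSecond; }

        /** Returns false (caller should back off or retry later) if sending now would exceed the budget. */
        public synchronized boolean tryAcquire(String shardId, long bytes, long nowNanos) {
            Bucket b = buckets.computeIfAbsent(shardId, k -> new Bucket(bytesPerSecond, nowNanos));
            double elapsedSeconds = (nowNanos - b.lastNanos) / 1e9;
            b.tokens = Math.min(bytesPerSecond, b.tokens + elapsedSeconds * bytesPerSecond);
            b.lastNanos = nowNanos;
            if (b.tokens < bytes) return false;
            b.tokens -= bytes;
            return true;
        }
    }
}
```

After a split or merge, records that are already buffered would need to be regrouped against the new ranges in the same way. The rate limiter only approximates the per-shard write limit (1 MiB/s or 1,000 records/s), because every producer writing to a shard shares that budget; the clock is passed in explicitly so the logic is easy to test.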

Thanks!

We have mitigated the first problem by creating our own library kinesis-writer. It's in Scala and works with ScalaPB, so it's pretty custom, but maybe it serves as a starting point for your own project or a fork.

@xutingz Thanks much for the feedback and feature requests!

I think this is a matter of scope and intent more than anything. This project intentionally focused only on the aggregation/deaggregation of records according to the KPL message format specification and ignores anything that actually involves knowledge of the stream (unlike the KPL itself).

If stream awareness features are what you need, I would suggest you look into using the KPL itself. The intention of this library was not to duplicate existing KPL functionality, but rather just make the aggregation logic itself available in other languages (specifically languages available on AWS Lambda).

The emphasis on AWS Lambda environments means that persistent knowledge of shard conditions, rate limiting, etc., is not always possible: my understanding is that when you subscribe an AWS Lambda function to a Kinesis stream, you get a new Lambda invocation with every batch of records. That means you can't buffer in memory or wait too long on any single invocation, since you're not guaranteed your Lambda function will persist between invocations.

I think saying the current solution isn't compatible with the KCL is not entirely accurate. It's totally compatible, but as @cosmincatalin has detailed in #11, you have to be careful how you use it or you can generate invalid batches. I agree that's not great.

That's all a long-winded way of saying that I think the features you mentioned already exist in the KPL and are outside the scope of this project. If you're interested in pursuing this request further, I'd recommend either starting a thread on the AWS Kinesis Forums (https://forums.aws.amazon.com/forum.jspa?forumID=169) or posting an issue directly on the Kinesis Producer Library GitHub repository (https://github.com/awslabs/amazon-kinesis-producer).