Support sidelining large events/payloads to an external event store
embano1 opened this issue · 23 comments
What is your idea?
EventBridge's PutEvents size limit is currently 256KB. Kafka records are often larger (default 1MB), and the connector drops those records with a warning (or sends them to the DLQ, if configured). With a new configuration property, the connector could instead send (sideline) those records to a configured event store such as S3, using the claim-check pattern: the event payload is replaced with an S3 link (reference).
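To make the size check concrete, here is a minimal sketch of deciding whether a record would need sidelining. The class and method names are hypothetical, and it only measures the serialized detail; the real PutEvents entry size also counts other fields, so treat the threshold as an approximation.

```java
import java.nio.charset.StandardCharsets;

final class OffloadDecision {
    // PutEvents size limit quoted in this issue (256KB).
    static final int MAX_ENTRY_BYTES = 256 * 1024;

    /** Returns true when the serialized detail alone already exceeds the limit. */
    static boolean shouldSideline(String detailJson) {
        return detailJson.getBytes(StandardCharsets.UTF_8).length > MAX_ENTRY_BYTES;
    }

    public static void main(String[] args) {
        String small = "{\"orderItems\":[\"item-1\"]}";
        String large = "{\"blob\":\"" + "x".repeat(300 * 1024) + "\"}";
        System.out.println("small needs sidelining: " + shouldSideline(small));
        System.out.println("large needs sidelining: " + shouldSideline(large));
    }
}
```

Measuring UTF-8 bytes rather than `String.length()` matters because non-ASCII characters take more than one byte each.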
Would you be willing to make the change?
Maybe.
Additional context
We should discuss the API/implementation in this issue before creating a PR. cc/ @baldawar
@agebhar1 thoughts on how much logic could/should we use from https://github.com/awslabs/payload-offloading-java-common-lib-for-aws? There's a lot to consider taking a dependency vs rolling our own so happy to discuss here if this works for our scenario incl defining payload key path for offloading.
I did a quick and dirty PoC locally with the mentioned library, @embano1, which I had found too 😅. DefaultEventBridgeMapper#createJsonPayload was updated to additionally send the customer value to the defined S3 bucket. The log from the integration test EventBridgeSinkConnectorIT shows the expected entry.
Only the methods S3Dao#storeTextInS3 and S3BackedPayloadStore#storeOriginalPayload, together with the interface PayloadStore, are used. The dependencies are Jackson and the AWS SDK. They are not the latest versions, but Maven manages them without conflicts. The result of storeOriginalPayload(...) is (unfortunately) a stringified JSON array:
[
"software.amazon.payloadoffloading.PayloadS3Pointer",
{
"s3BucketName": "s3BucketName",
"s3Key":"s3Key"
}
]
if the S3BackedPayloadStore is used. S3Dao#storeTextInS3 itself is only 20 lines.
@muellerc please keep an eye on this issue. We'll share discussion notes/implementation options here as well to gather feedback on the proposal.
Same for @dirk-froehner (once we have more to share)
!Updated for JSON Path
I will summarize the current status and add the ideas @embano1 and I have discussed.
S3 offloading is activated when the required configuration, the bucket name, is provided. An optional setting is the JSON Path to the JSON sub-document that should be sidelined from the EventBridge event.
The JSON Path must start with $.detail.value, as only sidelining the Kafka record value is supported. The JSON object referenced by the path is removed, together with its key, from the EventBridge event and put to S3. The S3 resource key is added to the event instead.
The key is $.detail.dataref, based on CloudEvents Dataref.
If the JSON Path result is empty, the EventBridge event will not be modified.
Example (non-relevant data is omitted):
JSON Path: $.detail.value.orderItems
Bucket: bucket_name
EventBridge event before:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
}
}
EventBridge event after:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
},
"dataref": "arn:aws:s3:::bucket_name/key_name"
}
}
S3 resource payload:
[ "item-1", "item-2" ]
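The before/after example can be sketched in code as follows. This is only an illustration under assumptions: the event is modeled as nested `Map`s instead of Jackson nodes, and `ObjectStore` is a hypothetical stand-in for the S3 upload, not an interface from the connector or the offloading library.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ClaimCheckSketch {

    /** Hypothetical stand-in for the S3 upload: stores a payload under a key. */
    interface ObjectStore {
        void put(String key, Object payload);
    }

    /**
     * Removes the node addressed by path (for example detail, value, orderItems) from the
     * event, hands it to the store, and adds "dataref" under "detail".
     * Returns false and leaves the event untouched when the path does not resolve.
     */
    @SuppressWarnings("unchecked")
    static boolean sideline(Map<String, Object> event, List<String> path,
                            String bucket, String key, ObjectStore store) {
        if (path.size() < 2 || !"detail".equals(path.get(0)) || !"value".equals(path.get(1))) {
            throw new IllegalArgumentException("path must start with detail.value: " + path);
        }
        Map<String, Object> parent = event;
        for (int i = 0; i < path.size() - 1; i++) {
            Object child = parent.get(path.get(i));
            if (!(child instanceof Map)) {
                return false; // path does not match, nothing to sideline
            }
            parent = (Map<String, Object>) child;
        }
        String last = path.get(path.size() - 1);
        Object payload = parent.get(last);
        if (payload == null) {
            return false;
        }
        store.put(key, payload); // upload first ...
        parent.remove(last);     // ... then drop the payload from the event
        Map<String, Object> detail = (Map<String, Object>) event.get("detail");
        detail.put("dataref", "arn:aws:s3:::" + bucket + "/" + key);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("orderItems", List.of("item-1", "item-2"));
        value.put("orderCreatedTime", "Tue May 23 13:38:46 CEST 2023");
        Map<String, Object> detail = new LinkedHashMap<>();
        detail.put("key", "order-1");
        detail.put("value", value);
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("detail", detail);

        Map<String, Object> fakeBucket = new HashMap<>();
        sideline(event, List.of("detail", "value", "orderItems"),
                "bucket_name", "key_name", fakeBucket::put);

        System.out.println(event);
        System.out.println(fakeBucket);
    }
}
```

Uploading before mutating the event means a failed upload leaves the event unchanged, which keeps a retry simple.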
The connector configuration will be extended by (everything else is omitted):
{
"config": {
"aws.eventbridge.connector.recordOffloading.s3.default.bucket": "bucket_name", // †
"aws.eventbridge.connector.recordOffloading.default.fieldref": "/detail/value" // ‡
}
}
† : mandatory
‡ : optional, /detail/value by default
The *.default.* part is added to support custom configuration in the future.
The S3 resource key should be deterministic, so that a retry after a previous error does not store the data again and exactly-once semantics can be supported. The key could be generated by (only one will be implemented):
- SHA of the JSON object $.detail.value
- composite key of topic, partition and offset
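Both key options can be sketched with the JDK alone. The class and method names below are hypothetical, and the input bytes are assumed to be the already serialized JSON value; the sketch does not claim to be what the connector implements.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class S3KeySketch {

    /** Option 1: hex-encoded SHA-256 of the serialized JSON value. */
    static String sha256Key(byte[] serializedValue) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(serializedValue);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    /** Option 2: composite key built from the record coordinates. */
    static String compositeKey(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }

    public static void main(String[] args) {
        byte[] value = "{\"orderItems\":[\"item-1\",\"item-2\"]}".getBytes(StandardCharsets.UTF_8);
        System.out.println(sha256Key(value));
        System.out.println(compositeKey("json-values-topic", 0, 0));
    }
}
```

Either way, the same input yields the same key, so a retried upload overwrites the same object instead of creating a second one. The SHA variant additionally depends on the serialization being byte-for-byte stable.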
The S3 put requests are sent before the EventBridge event for each entry where the JSON Path matches something.
Detailed error handling for S3 still needs to be discussed.
Thank you so much @agebhar1 for the writeup and proposal!
Some questions:
- Can you please provide more examples, e.g., one where fieldref does not match and one where fieldref=/detail/value?
- How do you feel about the ergonomics of the dataref key always being at the root level vs. where the replacement happens (for example, if fieldref is /detail/value/orderItems, putting dataref at {"detail": {"value": {"dataref" ...}}})?
- Would using the current EventBridge JSONPath expression, e.g., $.detail instead of /detail, work for consistency?
- Suggesting aws.eventbridge.offloading as key prefix
- Can you take a note that this behavior will be customizable in the future if a custom class is provided (with a note on the interface design and how the current implementation [no replacement] will be a no-op class implementing this interface)?
cc/ @baldawar @muellerc @dirk-froehner for feedback
!Updated for JSON Path
My pleasure.
(1)
Example (non-relevant data is omitted) sideline all:
JSON Path: $.detail.value
Bucket: bucket_name
EventBridge event before:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
}
}
EventBridge event after:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"dataref": "arn:aws:s3:::bucket_name/key_name"
}
}
S3 resource payload:
{
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
Example (non-relevant data is omitted) sideline nothing:
JSON Path: $.detail.value.order
Bucket: bucket_name
EventBridge event before:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
}
}
EventBridge event after:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
}
}
no S3 object created
(2)
- fixed dataref position:
  - pro: A client reading an event from EventBridge can immediately recognize whether part of the Kafka record is sidelined, because dataref is at a fixed position.
  - con: The client needs to know the JSON Pointer to reassemble the original JSON.
- dynamic dataref position:
  - pro: The client knows the position needed to reassemble the original JSON.
  - con: A client must read the whole document to recognize whether part of the Kafka record is sidelined.
I have no feeling for which one is more ergonomic. Let's see what the colleagues think.
(3)
The JSONPath subset should work, but the JSON library in use, Jackson, does not support it. Since it's already a subset, we could use a (simple) mapping between the subset and JSON Pointer. The syntax should not support array indices or wildcards (*), so that a path stays simple and matches at most one node.
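One way such a mapping could look, as a sketch only: accept dot-notation paths below $.detail.value, reject everything else, and rewrite the dots as JSON Pointer separators. The class name and the exact character set allowed in member names are assumptions. The resulting pointer string is the form Jackson's `JsonNode#at(String)` accepts.

```java
import java.util.regex.Pattern;

final class JsonPathSubset {
    // Dot-notation member names only, e.g. $.detail.value.orderItems.
    // Brackets, array indices and wildcards do not match this pattern.
    private static final Pattern ALLOWED =
            Pattern.compile("\\$(\\.[A-Za-z_][A-Za-z0-9_-]*)+");
    private static final String REQUIRED_PREFIX = "$.detail.value";

    /** Converts a path of the supported subset into a JSON Pointer string. */
    static String toJsonPointer(String jsonPath) {
        if (!ALLOWED.matcher(jsonPath).matches()) {
            throw new IllegalArgumentException("unsupported JSON Path: " + jsonPath);
        }
        boolean underValue = jsonPath.equals(REQUIRED_PREFIX)
                || jsonPath.startsWith(REQUIRED_PREFIX + ".");
        if (!underValue) {
            throw new IllegalArgumentException("path must start with " + REQUIRED_PREFIX);
        }
        // "$.detail.value" becomes "/detail/value"
        return jsonPath.substring(1).replace('.', '/');
    }

    public static void main(String[] args) {
        System.out.println(toJsonPointer("$.detail.value.orderItems"));
    }
}
```

Member names that themselves contain dots or slashes are not handled here; supporting them would need escaping rules on both sides.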
(4)
The connector configuration will be extended by (everything else is omitted):
{
"config": {
"aws.eventbridge.offloading.s3.default.bucket": "bucket_name", // †
"aws.eventbridge.offloading.default.fieldref": "$.detail.value" // ‡
}
}
† : mandatory
‡ : optional, $.detail.value by default
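A small sketch of how these two properties could be resolved at startup. The property names are the ones proposed above; the class, the method names, and the default being written in JSON Path form are assumptions for illustration.

```java
import java.util.Map;
import java.util.Optional;

final class OffloadingConfigSketch {
    static final String BUCKET_KEY = "aws.eventbridge.offloading.s3.default.bucket";
    static final String FIELDREF_KEY = "aws.eventbridge.offloading.default.fieldref";
    // Assumed default, written in JSON Path form.
    static final String DEFAULT_FIELDREF = "$.detail.value";

    /** Offloading is active only when a non-blank bucket name is configured. */
    static Optional<String> bucket(Map<String, String> props) {
        String bucket = props.get(BUCKET_KEY);
        return (bucket == null || bucket.isBlank()) ? Optional.empty() : Optional.of(bucket);
    }

    /** Falls back to the default when no fieldref is configured. */
    static String fieldRef(Map<String, String> props) {
        return props.getOrDefault(FIELDREF_KEY, DEFAULT_FIELDREF);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(BUCKET_KEY, "bucket_name");
        System.out.println("offloading enabled: " + bucket(props).isPresent());
        System.out.println("fieldref: " + fieldRef(props));
    }
}
```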
(5) follows
To continue the implementation of this feature I need some feedback on:
- how is the S3 key generated: SHA(?) of $.detail.value, topic+partition+offset of the Kafka record, ...
- dataref value: is the ARN to the bucket data okay?
For the S3 API status codes I will have a look into the sources of the SQS extended Python library, but any suggestions are welcome.
how is the S3 key generated
Would a SHA over the whole JSON-serialized Kafka record work, since it includes this information? Or does this happen before you have a full EventBridge JSON object? The SHA would obviously have to be generated before any dataref replacement.
dataref value: is the ARN to the bucket data okay?
Yes, IMHO arn:aws:s3:::bucket_name/key_name would work. key_name would be the SHA?
The raw bytes of the Kafka record are serialized within a subroutine of DefaultEventBridgeMapper's public EventBridgeMappingResult map(List<SinkRecord> records). If we keep sidelining as an extra 'pipeline' step, then it's too late to calculate the SHA from the raw bytes.
Is using ${topic}-${partition}-${offset} not an option? It's also unique and does not require any calculation.
Yes, IMHO arn:aws:s3:::bucket_name/key_name would work. key_name would be the SHA?
Yes, in the case of using SHA. When using topic, partition and offset it is:
arn:aws:s3:::bucket_name/${topic}-${partition}-${offset}
then it's too late to calculate the SHA from the raw bytes
Sorry, I miscommunicated: since you're dealing with PutEventsRequestEntry, you could SHA over it to generate the key and upload before batching/sendToEventBridge, no?
Using ${topic}-${partition}-${offset} is not an option? It's also unique and does not require any calculation.
Strictly speaking, it's not unique, since you can repartition/move records between topics. However, this also affects our current EventBridgeMapper, i.e., it leads to a different outbound event since we use those fields as well. Events from different Kafka clusters using the same bucket will lead to dups as well (if the same topic name is used, which might not be uncommon, e.g., test/dev/prod). We could add source/connectorID to the key, since connector.id is supposed to be unique per our documentation. That still doesn't prevent users from reusing the ID though 🤷
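To illustrate the collision and the connector ID idea, here is a small sketch. The key layout (connector ID as a prefix) and all names are hypothetical; only the arn:aws:s3:::bucket_name/key_name shape comes from the discussion above.

```java
final class DatarefSketch {

    /** Composite key namespaced by connector ID (hypothetical layout). */
    static String objectKey(String connectorId, String topic, int partition, long offset) {
        return connectorId + "/" + topic + "-" + partition + "-" + offset;
    }

    /** Builds the dataref value in the ARN form discussed in this thread. */
    static String dataref(String bucket, String objectKey) {
        return "arn:aws:s3:::" + bucket + "/" + objectKey;
    }

    public static void main(String[] args) {
        // Same topic name and record coordinates on two clusters, e.g. dev and prod.
        String dev = objectKey("connector-dev", "json-values-topic", 0, 0);
        String prod = objectKey("connector-prod", "json-values-topic", 0, 0);
        System.out.println(dataref("bucket_name", dev));
        System.out.println(dataref("bucket_name", prod));
    }
}
```

Without the prefix both records would map to the same object key in a shared bucket; with it they only collide if the connector ID is reused.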
Regarding (2)
fixed dataref position:
pro: A client reading an event from EventBridge can immediately recognize whether part of the Kafka record is sidelined, because dataref is at a fixed position.
con: The client needs to know the JSON Pointer to reassemble the original JSON.
dynamic dataref position:
pro: The client knows the position needed to reassemble the original JSON.
con: A client must read the whole document to recognize whether part of the Kafka record is sidelined.
I have no feeling for which one is more ergonomic. Let's see what the colleagues think.
Yeah, this one needs thinking/discussion. Let's see what @baldawar thinks.
Also, we could provide an option to sideline either the whole event or only the part matched by the JSONPath expression. Perhaps sidelining the whole object, while requiring more space, is the easier approach for clients, placing dataRef under .root? Also a two-way door if we decide to go with the whole event first (least surprise)?
then it's too late to calculate the SHA from the raw bytes
Sorry, I miscommunicated: since you're dealing with PutEventsRequestEntry you could SHA over it to generate the key and upload before batching/sendToEventBridge, no?
My fault, we have MappedSinkRecord<PutEventsRequestEntry>, so the original Kafka SinkRecord is still there and the SHA could be calculated from bytes:
// serialize the record value so that the SHA can be calculated from its bytes
var record = mappedSinkRecord.getSinkRecord();
var bytes = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
Do you think SHA-256 is enough? Java's MessageDigest supports only MD5, SHA-1 and SHA-256.
Using ${topic}-${partition}-${offset} is not an option? It's also unique and does not require any calculation.
Strictly speaking not unique since you can repartition/move records between topics - however this also affects our current EventBridgeMapper, i.e., leading to a different outbound event since we […]
You're right. I forgot the repartitioning.
I have implemented the S3 key generation using the SHA-256 of the SinkRecord value. But with different JSON Paths, the key does not change. Shouldn't we use the SHA of the S3 payload?
@agebhar1 fyi https://aws.amazon.com/about-aws/whats-new/2024/02/amazon-sqs-extended-client-library-python-payloads/
@embano1 all errors on S3 operations are propagated to the user. But I found these https://docs.aws.amazon.com/AmazonS3/latest/userguide/ErrorBestPractices.html and https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html.
But with different JSON Paths, the key does not change. Shouldn't we use the SHA of the S3 payload?
I think I know where you're going, but can you please elaborate on the "with different JSON Paths, the key does not change"? Are you concerned about referencing the same S3 object from different paths?
all errors on S3 operations are propagated to the user. But I found these
We're using the SDK, so should be mostly good here? (exception handling)
But with different JSON Paths, the key does not change. Shouldn't we use the SHA of the S3 payload?
I think I know where you're going, but can you please elaborate on the "with different JSON Paths, the key does not change"? Are you concerned about referencing the same S3 object from different paths?
Sure.
Example EventBridge Event:
{
"detail": {
"topic": "json-values-topic",
"partition": 0,
"offset": 0,
"timestamp": 1684841916831,
"timestampType": "CreateTime",
"headers": [],
"key": "order-1",
"value": {
"orderItems": [
"item-1",
"item-2"
],
"orderCreatedTime": "Tue May 23 13:38:46 CEST 2023"
}
}
}
(a)
JSON Path: $.detail.value.orderItems
SHA: X
S3 Key: X
S3 payload: ["item-1", "item-2"]
(b)
JSON Path: $.detail.value
SHA: X
S3 Key: X
S3 payload: { "orderItems": ["item-1", "item-2"], "orderCreatedTime": "Tue May 23 13:38:46 CEST 2023" }
S3 Key (a) == S3 Key (b)
S3 payload (a) != S3 payload (b)
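The difference between the two key sources can be reproduced with a few lines of JDK code. This is a sketch under assumptions: the JSON strings are hand-written compact serializations of the example above, and the class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PayloadKeySketch {

    /** Hex-encoded SHA-256 of the UTF-8 bytes of the given text. */
    static String sha256Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            // Pad to 64 hex digits so leading zero bytes are not lost.
            return String.format("%064x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        String recordValue = "{\"orderItems\":[\"item-1\",\"item-2\"],"
                + "\"orderCreatedTime\":\"Tue May 23 13:38:46 CEST 2023\"}";
        String payloadA = "[\"item-1\",\"item-2\"]"; // $.detail.value.orderItems
        String payloadB = recordValue;                 // $.detail.value

        // Hashing the record value ignores the JSON Path: one key for both cases.
        System.out.println("record-based key: " + sha256Hex(recordValue));
        // Hashing what is actually uploaded gives one key per distinct payload.
        System.out.println("payload key (a):  " + sha256Hex(payloadA));
        System.out.println("payload key (b):  " + sha256Hex(payloadB));
    }
}
```

With payload-based keys, (a) and (b) no longer share an S3 key, at the cost of requiring a stable serialization of the payload.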
Ah, I see where the confusion comes from :) You're using the SinkRecord, whereas I was referring to the modified PutEventsRequest you have at the end of that stage. I see that it implements Java SerDe, I just don't know how it's really done. If so, you could use the SerDe and then create a SHA over the object? This would be unique per JSON pointer (payload); we might need to think about edge cases for empty payloads, e.g., when the JSON pointer does not exist, is empty, or is null.
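One possible policy for those edge cases, sketched with plain Java types standing in for JSON nodes: treat a missing node, a JSON null, and empty strings, arrays or objects as "nothing to offload" and skip both the upload and the key generation. The class name and the policy itself are assumptions, not a decision from this thread.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

final class EmptyPayloadPolicy {

    /** Returns true when there is nothing worth uploading for the resolved node. */
    static boolean isEmpty(Object node) {
        if (node == null) {
            return true; // path did not resolve, or resolved to JSON null
        }
        if (node instanceof String) {
            return ((String) node).isEmpty();
        }
        if (node instanceof Collection) {
            return ((Collection<?>) node).isEmpty();
        }
        if (node instanceof Map) {
            return ((Map<?, ?>) node).isEmpty();
        }
        return false; // numbers and booleans always carry a value
    }

    public static void main(String[] args) {
        System.out.println(isEmpty(null));
        System.out.println(isEmpty(List.of()));
        System.out.println(isEmpty(List.of("item-1", "item-2")));
    }
}
```

Skipping empty payloads also avoids many events pointing at the same S3 object that only holds an empty value.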