Support plugins/interceptors to customize event handling before sending to EventBridge
Closed this issue · 20 comments
What is your idea?
Support plugins/interceptors to influence the event handling behavior before sending to EventBridge. For example #110 (side-lining large events), filtering, masking (PII), etc.
Proposal for discussion:
Add an `aws.eventbridge.interceptor.classes` configuration property: an ordered list of fully-qualified class names implementing an `Interceptor` interface (tbd). An interceptor accepts a single event and returns an `Optional`, so returning an empty `Optional` implements ignore/drop/skip behavior.
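A minimal sketch of what such an interface could look like (all names here are illustrative, the actual interface is tbd):

```java
import java.util.Optional;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch only: the actual Interceptor interface is still tbd.
// An interceptor receives a single event and may pass it through (possibly
// modified) or return Optional.empty() to drop/skip it.
interface Interceptor {
    Optional<String> intercept(String event);
}

// Illustrative example: drop events above a size threshold; a building
// block for filtering or an S3 side-lining (claim-check) decision.
class MaxSizeFilter implements Interceptor {
    private final int maxBytes;

    MaxSizeFilter(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    @Override
    public Optional<String> intercept(String event) {
        return event.getBytes(StandardCharsets.UTF_8).length <= maxBytes
                ? Optional.of(event)
                : Optional.empty();
    }
}
```

Chaining the configured interceptors in order, and stopping as soon as one returns an empty `Optional`, would give the drop/skip semantics described above.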
The connector would provide some out-of-the-box interceptors, e.g. for S3 side-lining (claim-check) or filtering. By default, no interceptor is used (current behavior). These out-of-the-box interceptor classes will use the `aws.eventbridge.interceptor.<class_suffix>.<property>` syntax for configuration. For custom interceptor classes, the developer (provider) is responsible for property naming and for avoiding conflicts with existing properties, e.g. `aws.eventbridge` and Kafka Connect properties.
Example
```json
{
  "name": "eventbridge-e2e",
  "config": {
    "auto.offset.reset": "earliest",
    "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
    "topics": "eventbridge-e2e",
    "aws.eventbridge.connector.id": "eventbridge-e2e-connector",
    "aws.eventbridge.interceptor.classes": "software.amazon.event.kafkaconnector.interceptor.s3offload,com.example.org.piimasker",
    "aws.eventbridge.interceptor.s3offload.key": "myS3EventKeyRef",
    "piimasker.replacement.character": "*",
    "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:000000000000:event-bus/eventbridge-e2e",
    "aws.eventbridge.region": "us-east-1",
    "aws.eventbridge.endpoint.uri": "http://localstack:4566",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}
```
Implementation of this feature will not break existing users.
Related:
Would you be willing to make the change?
Discussed with @agebhar1; he would be interested.
Additional context
Batching is out of scope (since it works on multiple events) and can be exposed as a separate configuration property if needed.
cc/ @maschnetwork @baldawar for feedback
Update: reading more about Kafka's `ConsumerInterceptor`, it seems this is exactly what we want :)
The `ConsumerInterceptor` requires key and value type parameters because it is a generic interface. Currently the sink task retrieves the data via `SinkTask#put` as a collection of `SinkRecord`s. We need to check if and how Kafka handles this.
IIUC, `ConsumerInterceptor` hits before the sink is involved, so this is agnostic to the sink.
While walking through the code/packages of Kafka Connect, I found the interface `Transformation`. The generic record type must extend `ConnectRecord`, which `SinkRecord` does. The de-serialization will be handled by Kafka Connect.
Shipped transformers from Kafka Connect can be found here.
https://docs.confluent.io/platform/current/connect/transforms/overview.html
https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/
Are you saying instead of interceptors we should use transformations?
IMHO our use case is not really transformations but "intercepting", i.e., inspecting and potentially modifying polled records before they're sent to the sink. Transformations serve a specific need and are more scoped by design (unless you do crazy stuff with them) - they also seem well understood by the community.
Do you see any blocker for us in adopting the interceptor approach?
Btw: my understanding of the event flow/chain is:
Kafka Topic -> Connect -> Interceptor(s) -> Sink
> Are you saying instead of interceptors we should use transformations?
I need to understand the interceptor in more detail, i.e. how de-/serialization works to handle different key/value types. The e2e tests already have `io.confluent.monitoring.clients.interceptor.Monitoring{Producer|Consumer}Interceptor` configured. But as far as I can see from the sources, these interceptors de-/serialize neither key nor value.
For mutation of the value (key) we need to de-/serialize it. Not sure if/how we should do this in a fully generic way.
> Kafka Topic -> Connect -> Interceptor(s) -> Sink

If it works, yes.
Ah, got it. SerDe is your concern, ack! See this example to discuss whether we want to follow a similar approach or build a custom solution: https://github.com/irori-ab/claim-check-interceptors
Now I need some time to experiment ...
What we totally missed is that the consumer(/producer) interceptor(s) apply to the whole Kafka Connect instance. So all sinks(/sources) are affected, and they might be configurable by admins only.
I think you're right, though I haven't tried the following:
- Use `consumer.override` to influence that behavior
- Instead of configuring `consumer.interceptor.classes` on the worker, add this to the sink
Related: you might find this KIP a good background read as well: https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
Thanks for the information. Will investigate `consumer.override`.
I've been tinkering with the consumer interceptor and `consumer.override.interceptor.classes`. It is only activated for the consumer task if it is configured.
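For reference, enabling the interceptor per connector looked roughly like this (interceptor class name is illustrative, not an existing class):

```json
{
  "name": "eventbridge-e2e",
  "config": {
    "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
    "consumer.override.interceptor.classes": "com.example.MyConsumerInterceptor"
  }
}
```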
But unfortunately the configuration provided to the interceptor is not the one used for registration of the sink task: instead of the sink configuration, the consumer properties are passed.
Thx so much @agebhar1 for digging into it!
So IIUC, we don't have access to sink-specific properties, such as key and value converters (not good), topics (well, guess not an issue), and all the `aws.` sink-task-related configuration settings.
I'm still not clear how we would then define additional consumer properties for the interceptor classes. Say we want to build the S3 claim-check interceptor - how would it get consumer (interceptor) specific properties, e.g., `<interceptorclass>.s3.bucket`? I thought that was possible with interceptors - but now I'm wondering, because I don't see the custom `aws.` properties we use in `consumerProps`, only in `sinkConfig`.
It's my pleasure.
I will have some time next week for investigation.
After spending some time debugging the code, I understood how the configuration for the interceptor is built. The comment on the method is also a good hint.
The method `org.apache.kafka.connect.runtime.Worker#baseConsumerConfigs` copies all properties with the prefix `consumer.override`. More precisely, it is the `baseConsumerConfigs` overload invoked with the argument `ConnectorClientConfigRequest.ClientType.CONSUMER`.
If we provide the configuration with the required prefix, the interceptor for the sink task gets all properties.
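For example (interceptor class and property names are illustrative), custom interceptor properties would be prefixed in the sink config like this; if my reading of `Worker#baseConsumerConfigs` is right, the `consumer.override.` prefix is stripped and the remainder lands in the consumer properties handed to the interceptor's `configure(Map)`:

```json
{
  "consumer.override.interceptor.classes": "com.example.S3ClaimCheckInterceptor",
  "consumer.override.s3.bucket": "my-claim-check-bucket"
}
```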
@embano1 I've tried to run the `ReplaceField` Single Message Transform (SMT) with the e2e Kafka Connect setup and it just works.
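For reference, a `ReplaceField` SMT can be wired into the sink config roughly like this (transform alias and field names are illustrative):

```json
{
  "transforms": "dropPii",
  "transforms.dropPii.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropPii.exclude": "ssn,creditCard"
}
```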
To summarize the current discussion of this issue.
We evaluated Kafka's concept of `ConsumerInterceptor`, which is called before the Kafka client's `poll` method returns. It can either be configured for the whole Kafka Connect instance or for a dedicated sink task. When intercepting with the consumer interceptor, the payload type must be known and the matching deserializer must be present and configured. Thus, we would need to (re-)implement the deserialization logic of the sink task. See also the prototype in #144.
Another approach we evaluated was Single Message Transforms (SMTs). Each single message is transformed before being sent to the sink connector, and the deserialization is handled by the framework. SMTs should not call out to external APIs, which is clearly stated in How to Use Single Message Transforms in Kafka Connect:

> Don't call out to external APIs or store state, and don't attempt any heavy processing in the transform.
It is also mentioned in the talk Single Message Transformations Are Not the Transformations You’re Looking For.
Currently we only have side-lining of large events/payloads (#110) to AWS S3 planned. To keep things simple, the result of the discussion with @embano1 was to close this issue for now.