awslabs/eventbridge-kafka-connector

Support plugins/interceptors to customize event handling before sending to EventBridge

Closed this issue · 20 comments

What is your idea?

Support plugins/interceptors to influence the event handling behavior before sending to EventBridge, for example #110 (side-lining large events), filtering, or masking (PII).

Proposal for discussion:

Add an aws.eventbridge.interceptor.classes configuration property: an ordered list of fully qualified classes implementing an Interceptor interface (TBD) that accepts a single event and returns an Optional to implement ignore/drop/skip behavior.

The connector would provide some out-of-the-box interceptors, e.g. for S3 side-lining (claim-check) or filtering. By default, no interceptor is used (current behavior). These out-of-the-box interceptor classes will use the aws.eventbridge.interceptor.<class_suffix>.<property> syntax for configuration. For custom interceptor classes, the developer (provider) is responsible for property naming and for avoiding conflicts with existing properties, e.g., aws.eventbridge.* and Kafka Connect properties.
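A minimal sketch of what such an Interceptor interface could look like (package, name, and signatures are illustrative only, assuming it operates on SinkRecords):

package software.amazon.event.kafkaconnector.interceptor;

import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.sink.SinkRecord;

// Hypothetical interface from the proposal above; not part of the connector (yet).
public interface Interceptor {

    // Receives the connector configuration, e.g. to read
    // aws.eventbridge.interceptor.<class_suffix>.<property> values.
    void configure(Map<String, ?> configs);

    // Accepts a single record and returns the (possibly modified) record,
    // or Optional.empty() to ignore/drop/skip it.
    Optional<SinkRecord> intercept(SinkRecord record);
}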

Example

{
    "name": "eventbridge-e2e",
    "config": {
        "auto.offset.reset": "earliest",
        "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
        "topics": "eventbridge-e2e",
        "aws.eventbridge.connector.id": "eventbridge-e2e-connector",
        "aws.eventbridge.interceptor.classes": "software.amazon.event.kafkaconnector.interceptor.s3offload,com.example.org.piimasker",
        "aws.eventbridge.interceptor.s3offload.key": "myS3EventKeyRef",
        "piimasker.replacement.character": "*",
        "aws.eventbridge.eventbus.arn": "arn:aws:events:us-east-1:000000000000:event-bus/eventbridge-e2e",
        "aws.eventbridge.region": "us-east-1",
        "aws.eventbridge.endpoint.uri": "http://localstack:4566",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false
    }
}

Implementation of this feature will not break existing users.

Related:

Would you be willing to make the change?

Discussed with @agebhar1; he would be interested.

Additional context

Batching is out of scope (since it works on multiple events) and can be exposed as a separate configuration property if needed.

cc/ @maschnetwork @baldawar for feedback

Update: reading more about Kafka's ConsumerInterceptor, it seems this is exactly what we want :)

The ConsumerInterceptor requires the key and value classes because it is a generic interface. Currently the sink task retrieves the data via SinkTask#put as a collection of SinkRecords. We need to check if and how Kafka handles this.
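For reference, a minimal no-op sketch of Kafka's ConsumerInterceptor interface; the byte[] typing and class name are only illustrative, and any mutation of key or value would still have to deal with de-/serialization:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PassThroughInterceptor implements ConsumerInterceptor<byte[], byte[]> {

    @Override
    public void configure(Map<String, ?> configs) {
        // receives the consumer configuration (see the discussion further below)
    }

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        // inspect or replace records here, before poll() returns them to the caller
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // no-op
    }

    @Override
    public void close() {
        // no-op
    }
}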

IIUC, ConsumerInterceptor runs before the sink is involved, so this is agnostic to the sink.

While walking through the code/packages of Kafka Connect, I found the interface Transformation. The generic record type must extend ConnectRecord, which SinkRecord does. The deserialization is handled by Kafka Connect.
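For context, a minimal pass-through sketch of the Transformation interface typed over SinkRecord (the class name and no-op behavior are illustrative):

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class PassThroughTransform implements Transformation<SinkRecord> {

    @Override
    public void configure(Map<String, ?> configs) {
        // transform-specific properties, scoped by the runtime via transforms.<alias>.*
    }

    @Override
    public SinkRecord apply(SinkRecord record) {
        // key/value are already deserialized by the configured converters at this point
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // no-op
    }
}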

Transformations shipped with Kafka Connect can be found here:

https://docs.confluent.io/platform/current/connect/transforms/overview.html
https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/

Are you saying instead of interceptor we should use transformations?

IMHO our use case is not really transformations but "intercepting", i.e., inspecting and potentially modifying polled records before they're sent to the sink. Transformations serve a specific need and are more scoped by design (unless you do crazy stuff with them) - they also seem well understood by the community.

Do you see any blocker for us in adopting the interceptor approach?

Btw: my understanding of the event flow/chain is:

Kafka Topic -> Connect -> Interceptor(s) -> Sink

Are you saying instead of interceptor we should use transformations?

I need to understand the interceptor in more detail, in particular how de-/serialization works to handle different key/value types. The e2e tests already have io.confluent.monitoring.clients.interceptor.Monitoring{Producer|Consumer}Interceptor configured, but as far as I can see from the sources, these interceptors deserialize neither key nor value.

To mutate the value (or key) we need to de-/serialize it. Not sure if/how we should do that in a fully generic way.

Kafka Topic -> Connect -> Interceptor(s) -> Sink

If it works, yes.

Ah, got it. SerDe is your concern, ack! See this example to discuss whether we want to follow a similar approach or build a custom solution: https://github.com/irori-ab/claim-check-interceptors

Now I need some time to experiment ...

What we totally missed is that the consumer(/producer) interceptor(s) apply to the whole Kafka Connect instance, so all sinks(/sources) are affected, and they might be configurable by admins only.

I think you're right, though I haven't tried the following:

  1. Use consumer.override to influence that behavior
  2. Instead of configuring consumer.interceptor.classes on the worker, add it to the sink configuration (see the config sketch below)
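A minimal sketch of option 2, adding the override to the sink connector configuration instead of the worker (the interceptor class name is hypothetical; whether the override is honored depends on the worker's connector.client.config.override.policy):

{
    "config": {
        "connector.class": "software.amazon.event.kafkaconnector.EventBridgeSinkConnector",
        "consumer.override.interceptor.classes": "com.example.MyClaimCheckInterceptor"
    }
}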

Related: you might find this KIP a good background read as well: https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Thanks for the information. Will investigate into consumer.override.

I've been tinkering with the consumer interceptor and consumer.override.interceptor.classes. It is only activated for the consumer task if it is configured.

(Screenshot from 2023-09-20 17-22-48)

But unfortunately the configuration provided to the interceptor is not the one used to register the sink task:

(Screenshot from 2023-09-20 17-22-02)

Instead of the sink configuration, the consumer properties are used:

(Screenshot from 2023-09-20 18-03-02)

Thx so much @agebhar1 for digging into it!

So IIUC, we don't have access to sink-specific properties, such as key and value converters (not good), topics (well, I guess not an issue), and all the aws.* sink-task-related configuration settings.

I'm still not clear how we would define additional consumer properties for the interceptor classes then. Say we want to build the S3 claim-check interceptor: how would it get (interceptor-specific) consumer properties, e.g., <interceptorclass>.s3.bucket? I thought that was possible with interceptors, but now I'm wondering because I don't see the custom aws.* properties we use in consumerProps, only in sinkConfig.

It's my pleasure.

I will have some time next week for investigation.

After spending some time debugging the code, I understood how the configuration for the interceptor is built. The comment on the method is also a good hint:

(Screenshot from 2023-09-24 10-41-30)

The method org.apache.kafka.connect.runtime.Worker#baseConsumerConfigs copies all properties with the consumer.override. prefix. More precisely, it is Worker#baseConsumerConfigs invoked with the argument ConnectorClientConfigRequest.ClientType.CONSUMER.

If we provide the configuration with the required prefix, the interceptor for the sink task gets all properties:

(Screenshot from 2023-09-24 10-50-49)
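For illustration, interceptor-specific properties could then be passed the same way with the consumer.override. prefix, e.g. for a hypothetical S3 claim-check interceptor (the class and property names below are made up):

{
    "config": {
        "consumer.override.interceptor.classes": "com.example.MyClaimCheckInterceptor",
        "consumer.override.claimcheck.s3.bucket": "my-claim-check-bucket"
    }
}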

@embano1 I've tried to run the ReplaceField Single Message Transform (SMT) with the e2e Kafka Connect setup and it just works.

The configuration is received, including additional properties:
(Screenshot: Kafka Connect SMT ReplaceField - config)

And the apply method is invoked:
(Screenshot: Kafka Connect SMT ReplaceField - apply)

To summarize the current discussion of this issue.

We evaluated Kafka's ConsumerInterceptor concept, which is called before the Kafka client's poll method returns. It can be configured either for the whole Kafka Connect instance or for a dedicated sink task. When intercepting with the consumer interceptor, the payload format must be known and the specific deserializer must exist, be present, and be configured. Thus, we would need to (re-)implement the deserialization logic of the sink task. See also the prototype in #144.

Another approach we evaluated was Single Message Transforms (SMTs). Each message is transformed before being sent to the sink connector, and deserialization is done by the framework. However, SMTs should not call out to external APIs, which is clearly stated in How to Use Single Message Transforms in Kafka Connect:

Don’t call out to external APIs or store state, and don’t attempt any heavy processing in the transform.

It is also mentioned in the talk Single Message Transformations Are Not the Transformations You’re Looking For.

Currently, only side-lining of large events/payloads (#110) to AWS S3 is planned. To keep things simple, the result of the discussion with @embano1 was to close this issue for now.

Thx for the write-up @agebhar1! Closing as discussed.