Kafka protocol filter
mattklein123 opened this issue · 70 comments
It's looking like Lyft may be able to fund Kafka protocol support in Envoy sometime this year.
Community, can you please chime in on what you would like to see? I know this will be a very popular feature. Stats (as in the Mongo filter) are a no-brainer. What else? Eventually routing and load balancing for languages in which the Kafka client drivers are not as robust?
First, this is totally awesome.
Second, as someone with Kafka experience but rather new to service meshes, I have two types of suggestions: I have some additional features that I'd want to use if I had a Kafka-Proxy and then I also have a very unbaked suggestion for a different way to look at Kafka/Mesh integration that I'd like to discuss.
Let's start with additional features (not all are my idea, folks from Confluent helped!):
- You can use the proxy to validate events. Because Kafka is "content agnostic", misbehaving clients can write literally anything. A proxy can validate that the message is in Protobufs (or whatever), that it has mandatory headers, etc.
- Rate limiting is useful.
- Add headers that allow tracking lineage of events - this was one of the reasons headers were added to Kafka.
- The message format on the server can't be bumped up until all the clients have upgraded, which can delay the introduction of new features for a very long time. A proxy can convert the format.
- Count events for monitoring
- Really cool if possible: Failover to a DR cluster. This is easy for producers and currently super tricky for consumers (because offsets). Not sure if a service-mesh is enough for that one.
Here's the other point-of-view:
Kafka isn't just a service, "kinda like a database", Kafka is a way of sending messages from one service to another. Kinda like a transport layer, except async and persistent. I wonder if Envoy can integrate with Kafka even deeper and allow services to use Kafka to communicate with other services (instead of REST, gRPC, etc). And then you can "hide" transition from REST to Kafka communication in the same way Lyft used Envoy to move to gRPC.
Not sure if this makes total sense, since async programming is different, but worth mulling over.
This would be dope. A couple use cases to start are request logs and stats. You can also build a nice audit log by taking your request logs and enriching them with the user's info. This could also help people write their own Kafka filters, adding features like upconverting old Kafka clients to newer protocol versions.
Thanks @gwenshap those are all great ideas. Would love to discuss more. If Confluent is potentially interested in helping with this (even if just design) can you or someone else reach out to me? My email address is easy to find or you can DM me on Twitter to connect.
Other interesting ideas that come to mind:
- Add Kafka support to the upcoming tap/dump feature so that we can dump to a Kafka stream.
- Shadow requests to a Kafka stream instead of HTTP/gRPC shadow.
@mattklein123 Do you mind explaining the primary use case you had in mind? Would this be a "Front Envoy" that might be used for ingress into Kubernetes? Or would a side car proxy pretend to be all Kafka brokers to local clients?
By "Front Envoy", I mean something like slide 15 in your deck here.
- tracing/monitoring: who is writing to which topics? Create a graph of data flow from producers to topics to consumers. https://logallthethings.com/2017/05/17/visualizing-the-flow-of-data-as-it-moves-through-a-kafka-cluster/
- add compression to apps which aren't already using it
- add/remove SSL for traffic headed to/from broker
- fault injection. Trigger consumer rebalances, broker down, producer transaction failures
- metrics: byte rate per client id. Byte rate per consumer group.
- like @gwenshap said: validate that requests have certain attributes. Example: CreateTopic requests must have minimum replication factor. Like https://kafka.apache.org/0110/javadoc/org/apache/kafka/server/policy/CreateTopicPolicy.html but for all kafka API types.
- automatic topic name conversion to/from a cluster. Like, an app would publish to topic `foo`, and it would actually go to topic `application.foo`. This would allow multi-tenant clusters, but the application would think they have the whole namespace to themselves.
- consumer lag monitoring for the entire datacenter
- metrics about which apps are using which versions of the client libraries
- +1 on failover for consumers to another datacenter. You can do offset->timestamp conversion on one datacenter, and then do timestamp->offset conversion on the failover datacenter.
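A rough sketch of the timestamp->offset half of that idea, assuming a kafka-python consumer and assuming the application tracked the timestamp of the last record it processed on the primary cluster (`last_consumed_ts_ms` and `dr-cluster:9092` below are illustrative values, not anything Envoy provides today):

```python
# Sketch of resuming consumption on a DR cluster using timestamp->offset conversion.
# last_consumed_ts_ms is a hypothetical value tracked by the application on the primary DC.
from kafka import KafkaConsumer, TopicPartition

def resume_on_failover(topic, partition, last_consumed_ts_ms):
    consumer = KafkaConsumer(bootstrap_servers="dr-cluster:9092", enable_auto_commit=False)
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    # Ask the DR cluster which offset corresponds to the timestamp reached on the primary.
    offsets = consumer.offsets_for_times({tp: last_consumed_ts_ms})
    if offsets.get(tp) is not None:
        consumer.seek(tp, offsets[tp].offset)
    return consumer
```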
Awesome that you guys are looking into Kafka protocol support, that'd be an amazing feature to have!
+1 to @gwenshap and @wushujames ideas, also:
- dynamic routing (for multicluster setups) of producers and consumers. This, in conjunction with a control plane would be a killer-feature: cluster/topic drain and rebalancing operations made easy;
- double-producing (e.g. if user wants to produce data to 2 or more clusters/topics at the same time);
Here are some ideas I would find useful (some already mentioned)
- Monitor consumer lag
- Failover to another cluster/datacenter while maintaining log position (hard due to offset mismatch)
- Mirroring topics to another cluster, or teeing publishes "exactly once" to maintain identical clusters
- Automatic topic switching, e.g. specify `my_topic_*` to consume `my_topic_1` and switch to `my_topic_2` when it becomes available, transparently to the consumer. This would be useful for data migrations without interrupting consumption. In other terms, the ability to hot swap topics unbeknownst to the consumer.
- Filter data on the server before sending to the consumer.
- Producer rate limiting
Between @mattklein123 @gwenshap and @wushujames this is an awesome list of features.
As a general question, particularly for Matt: would you see any value in capturing some of the more generic features and turning them into a higher-level abstraction for messaging support in the service mesh?
Perhaps also look at some of what kafka-pixy does. I find the wrapping of Kafka's native protocol with REST/gRPC to be pretty compelling. This better supports usage from FaaS and apps that don't necessarily have the ability to do a long-lived connection.
I'd like to see Envoy's Zipkin traces reported to Zipkin using Zipkin's Kafka collector.
Thanks everyone for the awesome suggestions that have been added to this issue. From Lyft's perspective, we are primarily interested in:
- L7 protocol parsing for observability (stats, logging, and trace linking with HTTP RPCs)
- Ratelimiting at both the connection and L7 message level
So I think this is where we will focus, probably starting in Q3. I will need to go through and do some basic SWAGing in terms of how much existing code in https://github.com/edenhill/librdkafka can be reused for the protocol parsing portion. We will also coordinate with folks at Confluent on this work. Please reach out if you are also interested in helping.
Are there any plans at this point for how to practically proxy the Kafka protocol to a pool of brokers? In general, clients connect to a seed node and send it a "metadata" request for the topic/partition they're interested in. The response to that includes a hostname and port, which clients then connect to directly. It means that in practice Kafka clients are (by design) very good at dis-intermediating proxies.
@ebroder One way to do it would be to register the proxy address (probably localhost:port if we are using a sidecar) as the brokers' advertised listeners. They'll then return this address to the clients.
In the latest release, advertised hosts will be a dynamic property, so this may become even easier to manage.
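For illustration, from the client's side the effect is that it only ever knows the local Envoy address. A minimal kafka-python sketch, with a made-up sidecar port (`localhost:19092`) and topic name:

```python
# Minimal sketch: the client bootstraps against the local Envoy sidecar only.
# localhost:19092 is a made-up sidecar listener port; with the broker advertising
# that same address, follow-up connections also land on the proxy.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:19092")
producer.send("example-topic", b"hello through the proxy")
producer.flush()
```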
@gwenshap: Interesting. That would imply a Kafka cluster that would only work with the sidecars then, right?
I didn't mean to imply that. That's why I said "one way". I know @travisjeffery and @theduderog have ideas about central proxies. Sidecars do seem to be Envoy's main mode of deployment.
That does require allocating a sidecar port for every Kafka broker you're running, right? It seems like the overhead/management costs there could potentially add up quickly.
I'm not sure? How expensive are ports? Kafka clusters with over 50 brokers are quite rare.
@ebroder @wushujames @gwenshap TBH I really have not gotten into the details yet. If the Kafka protocol does not support a built-in method of proxying (we should discuss), I think there are a few options:
- Pre-configure all broker addresses in Envoy and have the seed node return Envoy addresses. Pro: Conceptually simple, Con: Annoying to configure.
- Use some type of IP tables interception to make sure all Kafka connections go through Envoy on the way to the brokers. Pro: Transparent. Con: Requires kernel/external scripts. Needs more investigation and thinking.
- Have Envoy do active L7 proxying/mutation of the seed communication, and swap broker addresses with local Envoy, and then potentially remember which broker to send which messages to. Pro: No kernel magic, fully transparent. Con: Very complicated, involves handling parts of the client/broker handshake.
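As a very rough illustration of option 3 (a toy sketch, not actual Envoy code; the `MetadataResponse`/broker objects are hypothetical stand-ins for whatever a decoder would produce):

```python
# Toy sketch of option 3: rewrite the broker list in a decoded metadata response so
# clients only ever see local Envoy addresses, while remembering which Envoy
# listener maps to which real broker for later forwarding.
broker_map = {}  # (envoy_host, envoy_port) -> (real broker host, real broker port)

def rewrite_metadata(response, envoy_host="127.0.0.1", base_port=19092):
    for i, broker in enumerate(response.brokers):
        envoy_port = base_port + i
        broker_map[(envoy_host, envoy_port)] = (broker.host, broker.port)
        broker.host, broker.port = envoy_host, envoy_port
    return response
```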
But again I haven't done any investigation. I was going to carve out some time to learn more about all of this in Q2 and potentially find some people who would like to help me learn more about it. :)
@mattklein123 @gwenshap @ebroder: Yeah, I had the same idea as Matt's option 3. Since the initial request to the brokers has to flow through the sidecar anyway, it can intercept and rewrite the response back to the client, and transform the request/responses as they flow between client/broker. Sounds expensive to me, but I know very little about envoy's performance.
@mattklein123 @gwenshap @ebroder @wushujames: take a look at https://medium.com/solo-io/introducing-gloo-nats-bring-events-to-your-api-f7ee450f7f79 & https://github.com/solo-io/envoy-nats-streaming - love to get your thoughts .... we created a NATS filter for Envoy ...
As @wushujames mentioned:
automatic topic name conversion to/from a cluster. Like, an app would publish to topic foo, and it would actually go to topic application.foo. This would allow multi tenant clusters, but the application would think they have the whole namespace to themselves.
This would be very useful for canary releases or blue/green deployments, since it would allow modifying the actual topic without any change in the application.
@mattklein123: There have been a lot of requests in this thread. Will there be a design doc with a list of which requested features will be supported?
@georgeteo yes when I start working on this (unsure when) I will provide a design doc once I do more research.
This issue has been automatically marked as stale because it has not had activity in the last 30 days. It will be closed in the next 7 days unless it is tagged "help wanted" or other activity occurs. Thank you for your contributions.
already solved?
Lots of great points above, I'd add that ideally the service discovery should be able to pick broker/topic combinations based on client requests for data - so that a consumer/producer could ask envoy for a "log publisher" (or whatever) and be directed to the closest brokers + their configured topic.
This is potentially very powerful, keen to see this!
Would it be possible to provide producer backpressure support over Kafka in the proxy?
I'm thinking of something like:
Config - per producer / topic pair - list of consumer groups for lag calculation; lag limit.
Runtime - if sum of lag in configured consumer groups exceeds lag limit, prevent producing.
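A tiny sketch of that runtime check (plain Python; `get_group_lag` is a hypothetical callable standing in for however the proxy would learn consumer-group lag):

```python
# Illustrative gate for the backpressure idea above: block produces when the summed
# lag of the configured consumer groups exceeds the configured limit.
def should_allow_produce(get_group_lag, consumer_groups, lag_limit):
    total_lag = sum(get_group_lag(group) for group in consumer_groups)
    return total_lag <= lag_limit
```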
Something on the authorization side. We were excited to find the sidecar enabling complex authorization patterns at the message level: Service A can only consume this message from Service B if and only if....
With Kafka being how messages move between services, we lose the advantage of the sidecar, as now Kafka becomes responsible for which message can be consumed by which service. Kafka authorization is pretty coarse (topic level; feel free to correct me). If a Kafka proxy can help with fine-grained authorization, Kafka can fit as the messaging layer between services without losing the benefits of the sidecar.
Is it too late to consider this? We are debating whether to keep Kafka as the messaging layer (asynchronous communication) or switch to synchronous communication due to this limitation.
@mattklein123 dumping incoming traffic to a kafka stream is a killer feature, we're looking for something like this as a collector for our analytics platform.
@mattklein123 I think we could consider pure-Envoy (one proxy in front of whole cluster) or Istio-Envoy installations (or to be more generic, one proxy in front of each Kafka broker) separately.
With a pure Envoy installation, we have the proxy sitting in front of the whole Kafka cluster, with clients being aware only of the proxy's address (which makes it similar to the current Redis filter):
bootstrap.servers = envoy-host:9092
In such a case it is necessary to decide how much of the underlying Kafka we want to expose to end clients (and what the implications would be).
A Kafka client connects to multiple brokers, but it should do that through the proxy, which means that the proxy itself would need to re-implement a lot of features that are already provided by the Kafka server:
- implement the bootstrap phase (clients at startup want to find other brokers present in the cluster, but this time they are connecting to the proxy, and we don't want to leak internals)
- the proxy would need to keep partition<->broker assignment metadata - it would receive fetch requests, and then would need to forward them to the correct broker - same with produce requests (as IMHO letting clients talk to brokers directly defeats the purpose of the proxy - we want to be capable of accessing/amending the data transferred)
- metadata information: AdminClient has already started to expose internals of how data is stored via the `describeTopics` API, listing hosts/ports inside - I think these responses would need to be rewritten
Obviously, generic inspection of kafka payloads (see below) would provide extra features.
In the case of deploying Envoy as a sidecar per Kafka instance it gets a bit different - we do not need to amend the data that's sent by the original client (as it is already reaching out to the correct brokers, so it's valid from Kafka's point of view), but we can still get a lot if we are capable of decoding/encoding payloads:
- request counting for monitoring (e.g. `FetchRequest`, `ProduceRequest`, `JoinGroupRequest`)
- request counting per topic (by accessing the `topic_data.topic` field in e.g. `ProduceRequest`) - see the sketch at the end of this comment
- request rewriting (namespacing) per topic (rough idea is rewriting Fetch/Produce requests accessing `prefix1.topic` to something like `prefix2.topic`, but something even more generic is possible (think sed))
- rate limiting
- authorization to resources (even on partition level)
The same also applies to the producer-side, as we would know which client instance talks to which topics etc. due to the above ^.
Obviously, if we are capable of deploying Envoy on both sides of the channel, then we can have TLS termination etc.
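As mentioned above, a toy sketch of the per-topic request counting idea (the decoded request object here is hypothetical; only the `topic_data[].topic` shape mirrors the Kafka protocol spec):

```python
# Toy sketch of per-topic request counting on decoded Produce requests.
from collections import Counter

produce_requests_per_topic = Counter()

def on_produce_request(request):
    # request is a hypothetical decoded ProduceRequest-like object.
    for topic_data in request.topic_data:
        produce_requests_per_topic[topic_data.topic] += 1
```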
@adamkotwasinski agree with your write up. I loosely plan on handling both of those cases.
FWIW, I'm taking a 2 month leave this winter where I plan on doing some part time coding and I'm going to look at getting started at implementing this.
(Assuming no one else wants to start development sooner)
(continuing the minimal sidecar-per-broker idea)
Assume we have a Kafka read/write filter that can process both incoming data (requests sent to broker by clients) and outgoing data (responses).
Would it be possible then to use Envoy as an egress proxy sitting as a sidecar next to the client, so we could capture the client's outbound data? (It would give users visibility into client-level metrics, e.g. which machines in particular are creating more load, etc.)
So in the end it would look a bit like this:
[ client machine ]                             [ broker machine ]
[ client ] <---> [ envoy ] <-----------------> [ envoy ] <---> [ broker ]
                     |                             |
                     v                             v
                  metrics                       metrics
One thing that troubles me is that if that egress proxy were used for all outbound comms, then we'd need to conditionally activate the filter - is it possible to state something like "activate the kafka filter if the outbound port == 9092"?
Is the Envoy Kafka effort discussed with the Knative community? Pivotal has interesting ideas there on Kafka client as sidecar. Knative depends on Istio.
@solsson Do you have more links about it?
Knative's abstraction on top of messaging systems is briefly mentioned in https://github.com/knative/docs/tree/master/eventing#buses, but afaik the implementation is being reworked.
Pivotal are backers of Knative, and maybe their mentions of Kafka were in https://springoneplatform.io/2018/sessions/introducing-knative or https://www.youtube.com/watch?v=_OiGt4QwdlM or https://content.pivotal.io/podcasts/serverless-knative-project-riff-with-mark-fisher-ep-112. Wish I had more info, but I was just guessing that Istio + Kafka could be a topic within the Knative eventing discussions, which is why my comment was only a question :)
@solsson I found the document on how to set it up, but it seems to still be a work in progress.
Status here:
@adamkotwasinski is beginning work on a basic sniffing filter that can run in front of a broker. This will include the encoder/decoder, stats, etc. I will help him review/merge, etc. that. Then this winter I plan on working on a full L7 client filter in which Envoy itself becomes the only broker the client knows about, and handles everything else behind the scenes.
@mattklein123 @adamkotwasinski
What help can you use on this project?
@nippip et al, I think we have a solid plan forward at this point which is starting in #4950. I'm not sure of the status of that work (cc @adamkotwasinski) but I think the next step is to move that plan forward and start getting stuff merged.
@mattklein123 Prior to #4950's inclusion into Envoy what is the best method for using Istio in a cluster which uses Kafka as the main message broker between the other services in the cluster?
Thanks @christian-posta, I will reach out there.
@solsson I guess the current implementation of KNative Eventing[1] is primarily about CloudEvents over HTTP (services do not use Kafka as the communication protocol). However, I'm not quite sure that they want to use a non-CloudEvent-based mechanism for the service-to-service eventing.
[1] https://github.com/knative/docs/tree/master/eventing#architecture
@mattklein123
As far as I understand, with the above implementation, the developer experience would be as follows:
- A Service that produces events treats Envoy as the broker for publishing messages
- A Service that consumes events treats Envoy as the broker that it subscribed to.
- The existence of a (central) Kafka broker is transparent to the user. The rest of the commodity features will be transparently supported via Envoy and its ecosystem.
Is this the pattern that we currently support?
Also, the patterns that we use to integrate Kafka with Envoy must be protocol agnostic and could equally be applied to other event-driven messaging protocols such as NATS, AMQP, etc. Should we generalize the possible patterns that we use in this implementation as references for other protocols?
So great to read this thread and the thoughts around this...
One thing I would like to see (and something @gwenshap hinted at in her Service Mesh Youtube video) is using the sidecar as a way to offload a lot of the responsibilities of the existing Kafka client SDK.
Kafka's broker is very simple (by good design). For example, it's the client SDK's responsibility to rebalance consumer groups and decide which client has which partitions; it's also the client SDK's responsibility to work out which broker to communicate to by requesting metadata. All this responsibility makes the client SDK large and difficult to support across different languages and runtimes. The approach so far has been to have a fully featured JVM story, and second-class non-JVM languages calling into a C SDK. The downside to this is that non-JVM clients lag functionality - they don't have Kafka Streams support, can't be used for Connectors etc...
Envoy could change that by shifting the architecture and moving some of the client SDK responsibilities into a sidecar. The communication between the Kafka client and the sidecar would be analogous to the existing Kafka REST API protocol. The sidecar would communicate with the broker on behalf of connected consumers and handle rebalances and leader changes. The simpler consumer would poll for updates from the localhost sidecar and consume and produce events in much the same way as the Kafka REST Proxy does today. Active polling indicates a healthy client. Like the existing Kafka protocol - there wouldn't be a need for unsolicited events.
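To make the shape of that concrete, here is a purely hypothetical thin-client sketch in the spirit of the Kafka REST Proxy; the sidecar address and URL layout are made up for illustration, not an actual API:

```python
# Hypothetical thin client polling a local sidecar over HTTP; the sidecar owns the
# real Kafka connections, group membership, and rebalances.
import requests

SIDECAR = "http://localhost:8082"  # made-up local sidecar address

def poll_records(consumer_group, topic):
    resp = requests.get(f"{SIDECAR}/consumers/{consumer_group}/topics/{topic}/records")
    resp.raise_for_status()
    return resp.json()  # the sidecar would return a batch of records as JSON
```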
I see some clear advantages with this model:
- Simplifies the client SDK by creating a middle tier. Allows for easier development of non-JVM language client SDKs.
- Functional parity across runtimes - for example (although not impossible now) it should be easier to test and develop Kafka Streams and other stateful processing
- Better integration with Kubernetes orchestration - for example, replica-scaling clients based on consumer lag would be much easier to accomplish with more control over the lifetimes of the clients.
- More efficient rebalance processing by streamlining coordination between consumers via the sidecar
- Other sidecar advantages - such as logging, metrics, health checks, service discovery etc...
- Simplified build process for Kafka consumers (avoid C/C++ library dependency)
There are definitely some gaps between the KNative Cloud Events (which are more geared towards FaaS) and the Kafka protocol (and the closely related Kafka REST Proxy protocol), but they're not insurmountable.
Subscribing..
Hello,
I think this issue is the best place to report a regression introduced by #4950. The Python script `kafka_generator.py` does not work with python3 (everything else within Envoy's default settings works for me).
return len(list(self.used_fields()))
As I found, python3 does not return a list back from `filter`, so I've made it work by adding an additional `list()` conversion, as above. I'm not fluent in Python and don't know if this is a good solution or whether better ways exist. If this is fine, I'm more than happy to prepare my first contribution.
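For anyone hitting the same thing, a minimal illustration of the Python 3 behaviour:

```python
# In Python 3, filter() returns a lazy iterator, so len() on it raises TypeError;
# wrapping it in list() materializes the result first.
fields = ["a", "b", "c"]
used = filter(lambda f: f != "b", fields)
# len(used)          # TypeError: object of type 'filter' has no len()
print(len(list(used)))  # -> 2
```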
@tobiaskohlbau addressed in #6998
Bumping this as well.
any status update on this?
I am interested in the status on this issue as well. Thanks!
Me too.
Me as well
Same!
Also interested in this issue.
Same! Any updates?
I am interested in the status on this issue as well. Thanks!
Folks, please stop the "me too" posts. The way to express that is to thumbs-up Matt's comment at the top.
How can I join the Kafka protocol filter feature development? Is there a checklist so I can pick some tasks to begin with? :) @mattklein123 @adamkotwasinski
I'd like to add transparent encryption and decryption to the list of features. In contrast to just doing TLS, this would allow me to have a zero knowledge broker.
I'm going to close this as the filter is implemented. Let's please open more specific feature requests for the filter so we can track things in a more granular fashion. Thank you @adamkotwasinski!!!
@mattklein123 yeah, I'm planning to revisit this in April (hopefully) when I'll start work on "fat-mesh" filter. Initially it will be very simple: custom cluster (that manages the internal Kafka-discovery (somewhat similar to redis-cluster code)) and the trivial (non-consumer-group) ProduceRequest & FetchRequest handling.
All right, I got some initial "stateful" proxy features implemented allowing Envoy to act as a facade for multiple Kafka clusters:
- producer proxy (handles the `Produce` requests) - #11936 - basically records received by Envoy are going to be resubmitted to librdkafka producers that point at the right clusters,
- consumer proxy (stateful) (handles the `Fetch` requests) - #24372 - Envoy uses embedded librdkafka consumers to consume records from upstream clusters that match the requests received so far, with some caching - by definition this is stateful (we do not translate a downstream Fetch into an upstream Fetch) and distributes the records amongst all the downstream connections that express an interest in the same topic-partition.
More notes and things that might need to be improved at https://github.com/adamkotwasinski/envoy/blob/ff39845987af5cc5ff8796ad3b683f6a7e8dbe3f/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst#notes
All right, given that some code has been pushed to allow for response rewriting, we can now use Envoy without needing Kafka to change its configuration: #30669
This allows any kind of user just to set up their fleet of Envoys (actually listeners) to do their own e.g. limiting or termination without needing to bother the Kafka service owners.
Updated the protocol code to handle Kafka 3.8: #36166
Broker filter can now filter requests by API Key - #36978