Kafka Troubleshooting: Who Exactly Is Writing to Me?
bingoohuang opened this issue · 1 comment
Background
A few million messages had piled up in Kafka waiting to be consumed, and the ops folks were getting anxious. Staring at a hundred-plus connections to Kafka, they wanted a quick way to tell which connections were producers (writers), which of those were busy, and on which topics.
Brainstorming
"Spirits of heaven and earth, Supreme Lord Laozi, show yourself!" Putting the brain to work, several options came to mind right away:
- Does Kafka ship with something for this? Searched around; found nothing.
- How hard would it be to patch the Kafka source to record this when a producer connects? Feasible in theory, but in practice it looks like a fair amount of work.
- Build a traffic-statistics tool that profiles the TCP connections: heavy upstream traffic means producer. But we also need the topics, and the existing tools don't seem to give us those.
- Stand up a proxy to intercept Kafka traffic, redirecting with iptables... Technically impressive, but it sits on the main traffic path, the technique isn't familiar, and it can't be pulled off in an afternoon.
- ...
Ding ding ding: none of these options really works.
Getting to Work
Right — I had built HTTP bypass traffic mirroring before. Could the same trick work for Kafka? If the producer message format can be parsed conveniently off the wire, the job becomes easy. The idea holds up. A quick search easily turned up an open-source project, kafka-sniffer, so I grabbed it and adapted it slightly. Go is wonderfully convenient for this kind of work — done in no time. The result:
https://github.com/bingoohuang/kafka-sniffer
# kafka-sniffer
2022/02/24 13:34:42 starting capture on interface "eth0"
2022/02/24 13:34:42 client 192.1.1.15:61285-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-logcenter], correlationID: 117377425, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.15:37953-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-metrics], correlationID: 6003063, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.11:24717-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-metrics], correlationID: 196489671, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.7:37233-192.1.1.14:9092 type: *kafka.FetchRequest topic [__consumer_offsets], correlationID: 247189, clientID: consumer-KMOffsetCache-cmak-548974c6c4-sxvgt-1723
2022/02/24 13:34:42 client 192.1.6.17:51404-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-ids], correlationID: 6716609, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.9.23:36866-192.1.1.14:9092 type: *kafka.FetchRequest topic [bq_disaster_recovery], correlationID: 623626, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.7:34038-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_transaction], correlationID: 12480162, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:55214-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-cloudSignLogServer], correlationID: 3341672, clientID: 2428545257036493
2022/02/24 13:34:42 client 192.1.1.12:6267-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial_disaster], correlationID: 9009620, clientID: consumer-2
2022/02/24 13:34:42 client 192.1.1.11:33378-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-gateway], correlationID: 10948681, clientID: producer-1
2022/02/24 13:34:42 client 192.1.1.12:9195-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial-2tripartite], correlationID: 9011202, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:41426-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_count_transaction], correlationID: 194647, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.11:22615-192.1.1.14:9092 type: *kafka.FetchRequest topic [ids-message-record-1], correlationID: 8999184, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:20394-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction_pro], correlationID: 3240311, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.12:7273-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction_pro], correlationID: 3240395, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:6654-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction], correlationID: 572423, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:48249-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction], correlationID: 8692411, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.9.23:33500-192.1.1.14:9092 type: *kafka.FetchRequest topic [verif_supplement_file_v1], correlationID: 117992, clientID: consumer-2
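Telling a producer from a consumer in the captured stream comes down to decoding the fixed header that prefixes every Kafka request: per the Kafka protocol, api_key 0 is Produce and 1 is Fetch, followed by the correlation id and a length-prefixed client id. A minimal Go sketch of that decoding (hand-built sample bytes; this is illustrative, not the actual kafka-sniffer code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// apiKeyNames maps Kafka API keys to request names (small subset).
var apiKeyNames = map[int16]string{0: "ProduceRequest", 1: "FetchRequest"}

// parseHeader decodes the common Kafka request header:
// Size(int32) ApiKey(int16) ApiVersion(int16) CorrelationId(int32)
// ClientId(int16 length + bytes).
func parseHeader(b []byte) (name, clientID string, corrID int32) {
	apiKey := int16(binary.BigEndian.Uint16(b[4:6]))
	corrID = int32(binary.BigEndian.Uint32(b[8:12]))
	idLen := int(binary.BigEndian.Uint16(b[12:14]))
	clientID = string(b[14 : 14+idLen])
	return apiKeyNames[apiKey], clientID, corrID
}

func main() {
	// Hand-built header: a FetchRequest (api key 1) from client "sarama".
	buf := []byte{
		0, 0, 0, 18, // size
		0, 1, // api key = 1 (Fetch)
		0, 11, // api version
		0, 0, 0, 42, // correlation id
		0, 6, 's', 'a', 'r', 'a', 'm', 'a', // client id
	}
	name, clientID, corrID := parseHeader(buf)
	fmt.Printf("%s clientID=%s correlationID=%d\n", name, clientID, corrID)
}
```

Once the header is decoded, anything with api_key 0 is a writer, and the topic names can be pulled out of the Produce request body that follows.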
And while we're at it, a statistics API:
# gurl :9870/client
Conn-Session: 127.0.0.1:44828->127.0.0.1:9870 (reused: false, wasIdle: false, idle: 0s)
GET /client? HTTP/1.1
Host: 127.0.0.1:9870
Accept: application/json
Accept-Encoding: gzip, deflate
Content-Type: application/json
Gurl-Date: Thu, 24 Feb 2022 06:30:03 GMT
User-Agent: gurl/1.0.0
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Feb 2022 06:30:03 GMT
[
  {
    "Start": "2022-02-24T14:29:19.049701376+08:00",
    "Client": "192.1.6.17:51404",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-1",
    "Requests": 89,
    "BytesRead": 7387,
    "Topics": [
      "dev-ids"
    ]
  },
  {
    "Start": "2022-02-24T14:29:19.025437041+08:00",
    "Client": "192.1.8.12:6267",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-2",
    "Requests": 89,
    "BytesRead": 7031,
    "Topics": [
      "judicial_disaster"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.301435997+08:00",
    "Client": "192.1.6.15:56324",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 309,
    "BytesRead": 123625,
    "Topics": [
      "dev-metrics"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.84427283+08:00",
    "Client": "192.1.6.4:54598",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 283,
    "BytesRead": 113472,
    "Topics": [
      "dev-metrics"
    ]
  }
]
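The counters behind such an endpoint can be kept in a simple map keyed by client address, updated once per parsed request. A Go sketch along those lines (the type and field names mirror the JSON above but are illustrative, not the actual kafka-sniffer types):

```go
package main

import (
	"fmt"
	"sort"
)

// ClientStat accumulates per-connection counters, mirroring the
// fields the /client endpoint returns.
type ClientStat struct {
	ReqType   string
	Requests  int
	BytesRead int
	topics    map[string]bool // set of topics seen on this connection
}

// Collector aggregates stats keyed by client "ip:port".
type Collector struct{ stats map[string]*ClientStat }

func NewCollector() *Collector {
	return &Collector{stats: map[string]*ClientStat{}}
}

// Observe records one parsed request against its client address.
func (c *Collector) Observe(client, reqType, topic string, bytes int) {
	s, ok := c.stats[client]
	if !ok {
		s = &ClientStat{ReqType: reqType, topics: map[string]bool{}}
		c.stats[client] = s
	}
	s.Requests++
	s.BytesRead += bytes
	s.topics[topic] = true
}

// Topics returns the sorted topic list for one client.
func (c *Collector) Topics(client string) []string {
	var ts []string
	for t := range c.stats[client].topics {
		ts = append(ts, t)
	}
	sort.Strings(ts)
	return ts
}

func main() {
	col := NewCollector()
	col.Observe("192.1.6.17:51404", "*kafka.FetchRequest", "dev-ids", 83)
	col.Observe("192.1.6.17:51404", "*kafka.FetchRequest", "dev-ids", 83)
	s := col.stats["192.1.6.17:51404"]
	fmt.Println(s.Requests, s.BytesRead, col.Topics("192.1.6.17:51404"))
}
```

Serving this map as JSON from an HTTP handler gives exactly the kind of per-client report shown above.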
Done
There we go. Now it's easy to find out who is writing to me, which topics they write to, how many requests they've made, and how many bytes in total.
Readers: got a better approach? Bring it on — if yours beats this one, dinner's on me.
Postscript:
Regarding the fourth option above, the iptables-based approach: https://github.com/JackOfMostTrades/tls-tproxy uses this technique.
Why can't netstat find the connection?
Viewed from 192.1.8.14
There are many connections from 192.1.8.11 to the local Kafka port 9092:
[beta19 ~]# hostname -I
192.1.8.14 172.17.0.1 10.42.0.0
[beta19 ~]# netstat -atnp | awk 'NR <= 2 || /9092/ && /192.1.8.11/' | awk '{if (NR <= 2) { print $0 } else {print NR-2,$0}}'
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
1 tcp 0 0 192.1.8.14:9092 192.1.8.11:37594 ESTABLISHED 21239/java
2 tcp 0 0 192.1.8.14:9092 192.1.8.11:60318 ESTABLISHED 21239/java
3 tcp 0 0 192.1.8.14:9092 192.1.8.11:47051 ESTABLISHED 21239/java
4 tcp 0 0 192.1.8.14:9092 192.1.8.11:23547 ESTABLISHED 21239/java
5 tcp 0 0 192.1.8.14:9092 192.1.8.11:41188 ESTABLISHED 21239/java
6 tcp 0 0 192.1.8.14:9092 192.1.8.11:22280 ESTABLISHED 21239/java
7 tcp 0 0 192.1.8.14:9092 192.1.8.11:38398 ESTABLISHED 21239/java
8 tcp 0 0 192.1.8.14:9092 192.1.8.11:23467 ESTABLISHED 21239/java
9 tcp 0 0 192.1.8.14:9092 192.1.8.11:47125 ESTABLISHED 21239/java
10 tcp 0 0 192.1.8.14:9092 192.1.8.11:41180 ESTABLISHED 21239/java
11 tcp 0 0 192.1.8.14:9092 192.1.8.11:41292 ESTABLISHED 21239/java
12 tcp 0 0 192.1.8.14:9092 192.1.8.11:5478 ESTABLISHED 21239/java
13 tcp 0 0 192.1.8.14:9092 192.1.8.11:54091 ESTABLISHED 21239/java
Viewed from 192.1.8.11
So I logged in to 192.1.8.11 and ran the same netstat command, but the matching connection was nowhere to be found. Via /proc/net/nf_conntrack I tracked it down: the real source is 10.42.1.211 — it's a k8s pod.
[beta11 ~]# hostname -I
192.1.8.11 172.17.0.1 10.42.1.0
[beta11 ~]# netstat -atnp | grep 192.1.8.11:22280
[beta11 ~]# cat /proc/net/nf_conntrack | grep 22280
ipv4 2 tcp 6 86365 ESTABLISHED src=10.42.1.211 dst=192.1.8.14 sport=39856 dport=9092 src=192.1.8.14 dst=192.1.8.11 sport=9092 dport=22280 [ASSURED] mark=0 zone=0 use=2
[beta17 ~]# kubectl get pod -A -o wide | awk 'NR <=1 || /10.42.1.211/'
NAMESPACE NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
cloudsign timetaskcloudsign-74c59b777-xhkgs 1/1 Running 0 71d 10.42.1.211 192.1.8.11 <none> <none>
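Pulling the pre-NAT source out of an /proc/net/nf_conntrack entry can be scripted: the first src= field on each line is the original (pod) address, while the second tuple describes the reply/NAT direction. A small Go sketch, assuming the line format shown above:

```go
package main

import (
	"fmt"
	"strings"
)

// originalSource returns the first "src=" field of an
// /proc/net/nf_conntrack entry — the pre-NAT client address.
func originalSource(line string) string {
	for _, f := range strings.Fields(line) {
		if strings.HasPrefix(f, "src=") {
			return strings.TrimPrefix(f, "src=")
		}
	}
	return ""
}

func main() {
	line := "ipv4 2 tcp 6 86365 ESTABLISHED src=10.42.1.211 dst=192.1.8.14 " +
		"sport=39856 dport=9092 src=192.1.8.14 dst=192.1.8.11 sport=9092 " +
		"dport=22280 [ASSURED] mark=0 zone=0 use=2"
	fmt.Println(originalSource(line)) // the pod IP behind the SNAT
}
```

Feeding that IP to `kubectl get pod -A -o wide` (as above) then names the pod behind the connection.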
The article "Connection Tracking (conntrack): Principles, Applications and Linux Kernel Implementation" (连接跟踪(conntrack):原理、应用及 Linux 内核实现) notes:
> Since this connection-tracking mechanism is independent of Netfilter, its conntrack and NAT information is not stored in the kernel's (that is, Netfilter's) conntrack table and NAT table, so the usual tools — conntrack/netstat/ss/lsof — cannot see it.
It seems this k8s networking solution implements exactly such an independent connection-tracking and NAT mechanism, which is why the netstat command can't see the connection.