Kafka Troubleshooting: Who Exactly Is Writing to Me?

Background

Several million messages had piled up in Kafka waiting to be consumed. The ops folks were getting anxious: staring at the hundred-plus connections to Kafka, they wanted an easy way to tell which connections were producers (writers), which of those were busy, and on which topics.

Brainstorming

"Heaven spirits, earth spirits, Taishang Laojun, hurry up and show your power!" Putting my brain to work, I quickly came up with several options:

  1. Does Kafka ship with anything for this? I looked around and found nothing.
  2. How hard would it be to patch the Kafka source, say to record something whenever a producer connection is established? Feasible in theory, but in practice it looked like real work.
  3. Build a traffic-statistics tool that counts TCP connections; the ones with heavy upstream traffic must be producers. But I'd still need the topics, and existing tools didn't seem up to it.
  4. Stand up a proxy that intercepts Kafka traffic, redirected via iptables... Technically impressive, but it sits on the main traffic path, I'm not familiar enough with the technique, and it's not something I could pull off in a hurry.
  5. ...

Dang dang dang: none of these options really works.

Getting to Work

Then it clicked: I had built out-of-band HTTP traffic mirroring before, so why not do the same thing for Kafka? If the Kafka producer request format can be parsed conveniently, the rest is easy. The idea holds up. A quick "Baidu" search readily turned up an open-source project, kafka-sniffer, so I grabbed it and made a few small modifications. Golang is just too convenient for this kind of work; it was done in no time. The result looks like this:

https://github.com/bingoohuang/kafka-sniffer

# kafka-sniffer
2022/02/24 13:34:42 starting capture on interface "eth0"
2022/02/24 13:34:42 client 192.1.1.15:61285-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-logcenter], correlationID: 117377425, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.15:37953-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-metrics], correlationID: 6003063, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.11:24717-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-metrics], correlationID: 196489671, clientID: sarama
2022/02/24 13:34:42 client 192.1.1.7:37233-192.1.1.14:9092 type: *kafka.FetchRequest topic [__consumer_offsets], correlationID: 247189, clientID: consumer-KMOffsetCache-cmak-548974c6c4-sxvgt-1723
2022/02/24 13:34:42 client 192.1.6.17:51404-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-ids], correlationID: 6716609, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.9.23:36866-192.1.1.14:9092 type: *kafka.FetchRequest topic [bq_disaster_recovery], correlationID: 623626, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.7:34038-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_transaction], correlationID: 12480162, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:55214-192.1.1.14:9092 type: *kafka.FetchRequest topic [dev-cloudSignLogServer], correlationID: 3341672, clientID: 2428545257036493
2022/02/24 13:34:42 client 192.1.1.12:6267-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial_disaster], correlationID: 9009620, clientID: consumer-2
2022/02/24 13:34:42 client 192.1.1.11:33378-192.1.1.14:9092 type: *kafka.ProduceRequest topic [dev-gateway], correlationID: 10948681, clientID: producer-1
2022/02/24 13:34:42 client 192.1.1.12:9195-192.1.1.14:9092 type: *kafka.FetchRequest topic [judicial-2tripartite], correlationID: 9011202, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:41426-192.1.1.14:9092 type: *kafka.FetchRequest topic [agent_count_transaction], correlationID: 194647, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.11:22615-192.1.1.14:9092 type: *kafka.FetchRequest topic [ids-message-record-1], correlationID: 8999184, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.12:20394-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction_pro], correlationID: 3240311, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.1.12:7273-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction_pro], correlationID: 3240395, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:6654-192.1.1.14:9092 type: *kafka.FetchRequest topic [count_transaction], correlationID: 572423, clientID: consumer-1
2022/02/24 13:34:42 client 192.1.1.4:48249-192.1.1.14:9092 type: *kafka.FetchRequest topic [transaction], correlationID: 8692411, clientID: consumer-11
2022/02/24 13:34:42 client 192.1.9.23:33500-192.1.1.14:9092 type: *kafka.FetchRequest topic [verif_supplement_file_v1], correlationID: 117992, clientID: consumer-2
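
For reference, here is a minimal sketch of the core idea (not the actual kafka-sniffer code): capture TCP segments headed for the broker port with gopacket and peek at the Kafka request header, whose api_key distinguishes produce (0) from fetch (1) requests. It skips TCP stream reassembly and does not decode the topic list from the request body the way the real tool does, and the interface name and port are assumptions for illustration.

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcap"
)

func main() {
	const device, brokerPort = "eth0", 9092 // assumed capture interface and Kafka port

	handle, err := pcap.OpenLive(device, 65535, true, pcap.BlockForever)
	if err != nil {
		log.Fatal(err)
	}
	defer handle.Close()

	// Only look at traffic going *to* the broker, i.e. client requests.
	if err := handle.SetBPFFilter(fmt.Sprintf("tcp dst port %d", brokerPort)); err != nil {
		log.Fatal(err)
	}

	for packet := range gopacket.NewPacketSource(handle, handle.LinkType()).Packets() {
		tcp, ok := packet.TransportLayer().(*layers.TCP)
		if !ok || len(tcp.Payload) < 8 || packet.NetworkLayer() == nil {
			continue
		}
		src := packet.NetworkLayer().NetworkFlow().Src()
		// Kafka request header: int32 size, int16 api_key, int16 api_version, int32 correlation_id, ...
		switch binary.BigEndian.Uint16(tcp.Payload[4:6]) {
		case 0:
			log.Printf("ProduceRequest from %v:%d", src, tcp.SrcPort)
		case 1:
			log.Printf("FetchRequest from %v:%d", src, tcp.SrcPort)
		}
	}
}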

While I was at it, I also built a small stats API:

# gurl :9870/client
Conn-Session: 127.0.0.1:44828->127.0.0.1:9870 (reused: false, wasIdle: false, idle: 0s)
GET /client? HTTP/1.1
Host: 127.0.0.1:9870
Accept: application/json
Accept-Encoding: gzip, deflate
Content-Type: application/json
Gurl-Date: Thu, 24 Feb 2022 06:30:03 GMT
User-Agent: gurl/1.0.0


HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Feb 2022 06:30:03 GMT

[
  {
    "Start": "2022-02-24T14:29:19.049701376+08:00",
    "Client": "192.1.6.17:51404",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-1",
    "Requests": 89,
    "BytesRead": 7387,
    "Topics": [
      "dev-ids"
    ]
  },
  {
    "Start": "2022-02-24T14:29:19.025437041+08:00",
    "Client": "192.1.8.12:6267",
    "ReqType": "*kafka.FetchRequest",
    "ClientID": "consumer-2",
    "Requests": 89,
    "BytesRead": 7031,
    "Topics": [
      "judicial_disaster"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.301435997+08:00",
    "Client": "192.1.6.15:56324",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 309,
    "BytesRead": 123625,
    "Topics": [
      "dev-metrics"
    ]
  },
  {
    "Start": "2022-02-24T14:29:20.84427283+08:00",
    "Client": "192.1.6.4:54598",
    "ReqType": "*kafka.ProduceRequest",
    "ClientID": "sarama",
    "Requests": 283,
    "BytesRead": 113472,
    "Topics": [
      "dev-metrics"
    ]
  }
]
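
The stats endpoint above is essentially a per-client counter map behind an HTTP handler. Below is a minimal sketch of that shape, assuming the sniffer calls Record() for every decoded request; the field names simply mirror the JSON above and are not the actual kafka-sniffer types.

package main

import (
	"encoding/json"
	"net/http"
	"sync"
	"time"
)

// ClientStat accumulates per-connection counters, keyed by client address.
type ClientStat struct {
	Start     time.Time
	Client    string
	ReqType   string
	ClientID  string
	Requests  int
	BytesRead int
	Topics    []string
}

// Registry holds the stats and protects them for concurrent use.
type Registry struct {
	mu    sync.Mutex
	stats map[string]*ClientStat
}

func NewRegistry() *Registry { return &Registry{stats: map[string]*ClientStat{}} }

// Record is called by the sniffer for every decoded Kafka request.
func (r *Registry) Record(client, reqType, clientID, topic string, bytes int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.stats[client]
	if !ok {
		s = &ClientStat{Start: time.Now(), Client: client, ReqType: reqType, ClientID: clientID}
		r.stats[client] = s
	}
	s.Requests++
	s.BytesRead += bytes
	for _, t := range s.Topics {
		if t == topic {
			return
		}
	}
	s.Topics = append(s.Topics, topic)
}

// ServeHTTP exposes the accumulated stats as JSON, e.g. on :9870/client.
func (r *Registry) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]*ClientStat, 0, len(r.stats))
	for _, s := range r.stats {
		out = append(out, s)
	}
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	_ = json.NewEncoder(w).Encode(out)
}

func main() {
	reg := NewRegistry()
	http.Handle("/client", reg)
	_ = http.ListenAndServe(":9870", nil)
}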

All Done

Now it's fairly easy to find out who is writing to me, to which topics, how many requests they have sent in total, and how many bytes.

Readers, do you have a better solution? Bring it on. If it beats this one, dinner is on me.

Addendum:

On the fourth approach above, the iptables-based one: https://github.com/JackOfMostTrades/tls-tproxy uses this technique; its schematic is shown below:

(diagram: tls-tproxy intercepting traffic via iptables redirection)

Why can't netstat find the corresponding connection?

Viewed from 192.1.8.14

There are quite a few connections from 192.1.8.11 to the local Kafka port 9092:

[beta19 ~]# hostname -I
192.1.8.14 172.17.0.1 10.42.0.0
[beta19 ~]# netstat -atnp | awk 'NR <= 2 || /9092/ && /108.11/' | awk '{if (NR <= 2) { print $0 } else  {print NR-2,$0}}'
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
1 tcp        0      0 192.1.8.14:9092     192.1.8.11:37594    ESTABLISHED 21239/java
2 tcp        0      0 192.1.8.14:9092     192.1.8.11:60318    ESTABLISHED 21239/java
3 tcp        0      0 192.1.8.14:9092     192.1.8.11:47051    ESTABLISHED 21239/java
4 tcp        0      0 192.1.8.14:9092     192.1.8.11:23547    ESTABLISHED 21239/java
5 tcp        0      0 192.1.8.14:9092     192.1.8.11:41188    ESTABLISHED 21239/java
6 tcp        0      0 192.1.8.14:9092     192.1.8.11:22280    ESTABLISHED 21239/java
7 tcp        0      0 192.1.8.14:9092     192.1.8.11:38398    ESTABLISHED 21239/java
8 tcp        0      0 192.1.8.14:9092     192.1.8.11:23467    ESTABLISHED 21239/java
9 tcp        0      0 192.1.8.14:9092     192.1.8.11:47125    ESTABLISHED 21239/java
10 tcp        0      0 192.1.8.14:9092     192.1.8.11:41180    ESTABLISHED 21239/java
11 tcp        0      0 192.1.8.14:9092     192.1.8.11:41292    ESTABLISHED 21239/java
12 tcp        0      0 192.1.8.14:9092     192.1.8.11:5478     ESTABLISHED 21239/java
13 tcp        0      0 192.1.8.14:9092     192.1.8.11:54091    ESTABLISHED 21239/java

Viewed from 192.1.8.11

So I logged into 192.1.8.11 and ran a similar netstat command, but the corresponding connection was nowhere to be found. It did turn up in /proc/net/nf_conntrack, with source 10.42.1.211, which turned out to be a k8s container:

[beta11 ~]# hostname -I
192.1.8.11 172.17.0.1 10.42.1.0
[beta11 ~]# netstat -atnp | grep 192.1.8.11:22280
[beta11 ~]# cat /proc/net/nf_conntrack | grep 22280
ipv4     2 tcp      6 86365 ESTABLISHED src=10.42.1.211 dst=192.1.8.14 sport=39856 dport=9092 src=192.1.8.14 dst=192.1.8.11 sport=9092 dport=22280 [ASSURED] mark=0 zone=0 use=2
[beta17 ~]# kubectl get pod -A -o wide | awk 'NR <=1 || /10.42.1.211/'
NAMESPACE   NAME                                READY   STATUS    RESTARTS   AGE   IP            NODE         NOMINATED NODE   READINESS GATES
cloudsign   timetaskcloudsign-74c59b777-xhkgs   1/1     Running   0          71d   10.42.1.211   192.1.8.11   <none>           <none>
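
If you want to do this lookup programmatically rather than with cat and grep, a small sketch along the same lines (hypothetical, not part of kafka-sniffer) could scan /proc/net/nf_conntrack for the ephemeral port the broker sees and print the pre-NAT source address:

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

func main() {
	port := "22280" // the port the broker sees for this connection; taken from the netstat output above

	f, err := os.Open("/proc/net/nf_conntrack")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		line := sc.Text()
		// The reply direction of a SNATed entry carries "dport=<ephemeral port>".
		if !strings.Contains(line, "dport="+port) {
			continue
		}
		// The first "src=" field on the line is the original (pre-NAT) source address.
		for _, field := range strings.Fields(line) {
			if strings.HasPrefix(field, "src=") {
				fmt.Println("original source:", strings.TrimPrefix(field, "src="))
				break
			}
		}
	}
}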

The article 连接跟踪(conntrack):原理、应用及 Linux 内核实现 ("Connection Tracking (conntrack): Principles, Applications, and the Linux Kernel Implementation") notes:

> Because this connection-tracking mechanism is independent of Netfilter, its conntrack and NAT information is not stored in the kernel's (that is, Netfilter's) conntrack table and NAT table, so the usual conntrack/netstat/ss/lsof tools cannot see it.

It appears that this k8s networking solution implements just such an independent connection-tracking and NAT mechanism, which is why the netstat command cannot see the connection.