Dedicated Kafka thread
bingoohuang opened this issue · 5 comments
Kafka version selection analysis
Principles:
- Maturity: released at least one year ago (the ecosystem has largely caught up, and others have already hit the pitfalls)
- Adoption: well supported by the public clouds (follow the mainstream)
- Practicality: existing features have no obvious defects; do not chase the newest features
As of today (2021-03-07), the latest version is 2.7.0, released on December 21, 2020.
Wikipedia: In November 2014, several engineers who had worked on Kafka at LinkedIn founded a new company named Confluent, with a focus on Kafka.
Confluent Platform | Apache Kafka® | Release Date | Standard End of Support | Platinum End of Support | ZooKeeper | Java Version |
---|---|---|---|---|---|---|
6.1.x | 2.7.x | February 9, 2021 | February 9, 2023 | February 9, 2024 | 3.4.10 through 3.5.8 | 1.8.0_202, 11.0_4 |
6.0.x | 2.6.x | September 24, 2020 | September 24, 2022 | September 24, 2023 | 3.4.10 through 3.5.8 | 1.8.0_202, 11.0_4 |
5.5.x | 2.5.x | April 24, 2020 | April 24, 2022 | April 24, 2023 | 3.4.10 through 3.5.8 | 1.8.0_202, 11.0_4 |
5.4.x | 2.4.x | January 10, 2020 | January 10, 2022 | January 10, 2023 | 3.4.10 through 3.5.8 | 1.8.0_202, 11.0_4 |
5.3.x | 2.3.x | July 19, 2019 | July 19, 2021 | July 19, 2022 | 3.4.10 through 3.4.14 | 1.8.0_60, 11.0_2 |
5.2.x | 2.2.x | March 28, 2019 | March 28, 2021 | March 28, 2022 | 3.4.10 through 3.4.13 | 1.8.0_60, 11.0_2 |
5.1.x | 2.1.x | December 14, 2018 | December 14, 2020 | December 14, 2021 | 3.4.10 through 3.4.13 | 1.8.0_60 |
5.0.x | 2.0.x | July 31, 2018 | July 31, 2020 | July 31, 2021 | 3.4.10 through 3.4.13 | 1.8.0_60 |
4.1.x | 1.1.x | April 16, 2018 | April 16, 2020 | April 16, 2021 | 3.4.10 | 1.7.0_60, 1.8.0_60 |
4.0.x | 1.0.x | November 28, 2017 | November 28, 2019 | November 28, 2020 | 3.4.10 | 1.7.0_60, 1.8.0_60 |
3.3.x | 0.11.0.x | August 1, 2017 | August 1, 2019 | August 1, 2020 | 3.4.10 | 1.7.0_60, 1.8.0_60 |
3.2.x | 0.10.2.x | March 2, 2017 | March 2, 2019 | March 2, 2020 | 3.4.6 through 3.4.9 | 1.7.0_60, 1.8.0_60 |
3.1.x | 0.10.1.x | November 15, 2016 | November 15, 2018 | November 15, 2019 | 3.4.6 through 3.4.8 | 1.7.0_60, 1.8.0_60 |
3.0.x | 0.10.0.x | May 24, 2016 | May 24, 2018 | May 24, 2019 | 3.4.6 | 1.7.0_60, 1.8.0_60 |
2.0.x | 0.9.0.x | December 7, 2015 | December 7, 2017 | December 7, 2018 | - | 1.7.0_60, 1.8.0_60 |
1.0.0 | – | February 25, 2015 | February 25, 2017 | February 25, 2018 | - | 1.7.0_60, 1.8.0_60 |
Public cloud support
Alibaba Cloud Message Queue for Apache Kafka
The product introduction states:
Out of the box, 100% compatible with open-source Apache Kafka (0.10.0.0 and later), and fully compatible with open-source Apache Kafka clients.
The instance-upgrade help document states:
Message Queue for Apache Kafka instances are deployed with the 0.10.x major version by default. New instances default to 0.10.2, while older instances default to 0.10. Version 0.10 has a certain probability of triggering bugs such as deadlocks and frequent rebalances, so upgrading 0.10 to 0.10.2 is recommended; see "Upgrade the minor version" for how.
Message Queue for Apache Kafka instances support the 0.10.x and 2.x major versions. The 0.10.x major version offers 0.10 and 0.10.2; the 2.x major version offers only 2.2.0.
Tencent Cloud
Message Queue CKafka: a distributed, high-throughput, highly scalable messaging service, 100% compatible with open-source Apache Kafka 0.9 and 0.10.
Standard edition: compatible with open-source 0.9, 0.10, and 1.1; version 1.1 is installed by default; custom versions are not supported.
AWS Amazon Managed Streaming for Apache Kafka (Amazon MSK): a fully managed, highly available, and secure Apache Kafka service. Supported Kafka versions:
- Apache Kafka version 2.7.0
- Apache Kafka version 2.6.1
- Apache Kafka version 2.6.0
- Apache Kafka version 2.5.1
- Amazon MSK bug-fix version 2.4.1.1
- Apache Kafka version 2.4.1 (use 2.4.1.1 instead)
- Apache Kafka version 2.3.1
- Apache Kafka version 2.2.1
- Apache Kafka version 1.1.1 (for existing clusters only)
Notes
Kafka message format changes
From 0.8.x through today's 2.x, Kafka's message format has gone through three versions: v0, v1, and v2.
- The format used from 0.8.x up to (but not including) 0.10.x is commonly called v0.
- From 0.10.0 up to 0.11.0 the format is v1, which adds a single timestamp field to v0, carrying the message's timestamp.
- From 0.11.0 onward the format is v2. This version changes a great deal compared with v0 and v1 and, borrowing from Protocol Buffers, introduces variable-length integers (varints) and ZigZag encoding.
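The varint/ZigZag idea behind the v2 format can be sketched in a few lines. This is a minimal illustration of the encoding technique, not Kafka's actual implementation:

```python
def zigzag_encode(n: int) -> int:
    """Map a signed int to an unsigned int: 0, -1, 1, -2, 2 -> 0, 1, 2, 3, 4."""
    return (n << 1) ^ (n >> 63)  # 64-bit-style arithmetic shift

def varint_encode(u: int) -> bytes:
    """Encode an unsigned int as a base-128 varint, 7 payload bits per byte."""
    out = bytearray()
    while True:
        b = u & 0x7F
        u >>= 7
        if u:
            out.append(b | 0x80)  # set the continuation bit
        else:
            out.append(b)
            return bytes(out)

print(varint_encode(zigzag_encode(-1)).hex())   # 01
print(varint_encode(zigzag_encode(300)).hex())  # d804
```

ZigZag maps integers of small magnitude (positive or negative) to small unsigned values, so v2 record fields such as length and offset deltas stay compact on the wire.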
Kafka's use of ZooKeeper
See Home > Message Queue for Apache Kafka > Product Introduction > Limits.
At the usage and design level, Apache Kafka has hidden ZooKeeper since 0.9.0; that is, clients no longer need to access ZooKeeper.
https://www.infoq.com/articles/apache-kafka-best-practices-to-optimize-your-deployment/
Kafka Docker support
lensesio/box
Lenses Box is a docker image that provides a full installation of Apache Kafka with all relevant components.
Software | Version |
---|---|
Apache Kafka | 2.5.1 |
Kafka Connect | 2.5.1 |
Elasticsearch | 6.8.7 |
Lenses.io | 4.1.0 |
Kafka topic naming conventions
About to create a topic? Here is a convention worth referencing:
<data-center>.<domain>.<classification>.<description>.<version>
examples:
- aws.analytics.fct.pageviews.0
- azure.comms.fct.gps.0
- dc1.identity.cdc.users.1
- gcp.notifications.cmd.emails.3
- gcp.notifications.sys.email-cache.0
Data Center
The data center in which the data resides. This is not required, but it is helpful once an organization reaches the size where it wants an active/active setup or replicates data between data centers. For example, if you have one cluster in AWS and one in Azure, your topics may be prefixed with aws and azure.
Domain
A domain for the data is a well understood, permanent name for the area of the system the data relates to. These should not include any product names, team names, or service names.
Examples of this vary wildly between industries. For example, in a transportation organization, some domains might be:
- comms: all events relating to device communications
- fleet: all events relating to trucking fleet management
- identity: all events relating to identity and auth services
Classification
The classification of data within a Kafka topic tells an end-user how it should be interpreted or used. This should not tell us about data format or contents. I typically use the following classifications:
- fct: Fact data is information about a thing that has happened. It is an immutable event at a specific point in time. Examples of this include data from sensors, user activity events, or notifications.
- cdc: Change data capture (CDC) indicates this topic contains all instances of a specific thing and receives all changes to those things. These topics do not capture deltas and can be used to repopulate data stores or caches. These are commonly found as compacted topics within Kafka.
- cmd: Command topics represent operations that occur against the system. This is typically found as the request-response pattern, where you have a verb and a statement. Examples might include UpdateUser and UserUpdated.
- sys: System topics are used for internal topics within a single service. They are operational topics that do not contain any relevant information outside of the owning system. These topics are not meant for public consumption.
Description
The description is arguably the most important part of the name and is the event name that describes the type of data the topic holds. This is the subject of the data, such as customers, invoices, users, payments, etc.
Version
The version of a topic is often the most forgotten section of a proper topic name. As data evolves within a topic, there may be breaking schema changes or a complete change in the format of the data. By versioning topics, you can allow a transitionary period where consumers can switch to the new data without impacting any old consumers.
By convention, it is preferred to version all topics and to start them at 0.
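For illustration, the convention above can be enforced with a small validator. The regex below is my own approximation of the pattern, not something from the original article:

```python
import re

# <data-center>.<domain>.<classification>.<description>.<version>
TOPIC_RE = re.compile(
    r"^(?P<dc>[a-z][a-z0-9-]*)\."
    r"(?P<domain>[a-z][a-z0-9-]*)\."
    r"(?P<classification>fct|cdc|cmd|sys)\."
    r"(?P<description>[a-z][a-z0-9-]*)\."
    r"(?P<version>\d+)$"
)

def parse_topic(name: str):
    """Return the named parts of a topic name, or None if non-conforming."""
    m = TOPIC_RE.match(name)
    return m.groupdict() if m else None

print(parse_topic("gcp.notifications.cmd.emails.3"))
print(parse_topic("badtopic"))  # None
```

Running the examples from the list above through this parser is a quick way to check that new topic names follow the convention before creation.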
Kafka message compression
(The diagrams illustrating compression in the original post are omitted here.)
## Round-robin strategy
The round-robin strategy is the default strategy of the Kafka Java client producer.
The load-balancing performance of the round-robin strategy is very good: it always distributes messages as evenly as possible across all partitions, which makes it the most reasonable partitioning strategy by default. The message distribution of the round-robin strategy is shown in the figure below:
## Random strategy
The random strategy picks one partition at random from the partition list. The message distribution of the random strategy is roughly as shown in the figure below:
## Partition-by-key strategy
- Kafka allows a message key, key for short, to be defined for each message.
- A key can be a string with clear business meaning: a customer code, a department number, a business ID, or other metadata characterizing the message.
- Once messages are given keys, all messages with the same key are guaranteed to enter the same partition. Because messages within each partition are processed in order, this strategy is known as the partition-by-key ordering strategy.
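The guarantee above can be sketched as follows. Note that the real Java client hashes keys with murmur2; a generic hash stands in here purely to show that equal keys always map to the same partition:

```python
import hashlib

def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Deterministic key -> partition mapping: same key, same partition,
    so per-key ordering within a partition is preserved."""
    h = int.from_bytes(hashlib.md5(key).digest()[:4], "big")
    return h % num_partitions

# Every record keyed b"customer-42" lands in the same partition:
parts = {partition_for_key(b"customer-42", 6) for _ in range(1000)}
print(parts)  # a set containing a single partition number
```

The design consequence: changing the partition count changes the mapping, which is why key-based ordering guarantees only hold while the partition count stays fixed.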
## Custom strategy
- A custom partitioning strategy requires explicitly configuring the producer-side parameter partitioner.class.
- Implement the interface org.apache.kafka.clients.producer.Partitioner.
Message consumption
## Range strategy
Suppose we have a topic named T1 containing 10 partitions, and two consumers (C1, C2) consuming those partitions, where C1's num.streams = 2 and C2's num.streams = 1 (num.streams is the number of consumer threads). The Range strategy works per topic: first the partitions of a topic are sorted by number and the consumer threads are sorted by name. In our example, the sorted partitions are 0 through 9 and the sorted consumer threads are C1-0, C1-1, C2-0. Then the partition count is divided by the total number of consumer threads to determine how many partitions each thread consumes; when the division is not exact, the first few threads each consume one extra partition. Here, 10 partitions divided by 3 threads is 3 with a remainder of 1, so thread C1-0 consumes one extra partition. The final assignment is: C1-0 consumes partitions 0-3, C1-1 consumes partitions 4-6, and C2-0 consumes partitions 7-9. The consumption diagram is as follows:
If we add a partition, going from 10 partitions to 11, the assignment is recomputed: C1-0 consumes partitions 0-3, C1-1 consumes partitions 4-7, and C2-0 consumes partitions 8-10. The final consumption is as follows:
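The range assignment described above can be sketched as a toy function (an illustration of the algorithm, not Kafka's actual RangeAssignor code):

```python
def range_assign(num_partitions: int, threads: list) -> dict:
    """Assign contiguous partition ranges to sorted consumer threads;
    when the division is not exact, the first threads get one extra."""
    threads = sorted(threads)
    per, extra = divmod(num_partitions, len(threads))
    assignment, start = {}, 0
    for i, t in enumerate(threads):
        count = per + (1 if i < extra else 0)
        assignment[t] = list(range(start, start + count))
        start += count
    return assignment

print(range_assign(10, ["C1-0", "C1-1", "C2-0"]))
# {'C1-0': [0, 1, 2, 3], 'C1-1': [4, 5, 6], 'C2-0': [7, 8, 9]}
```

Re-running with 11 partitions reproduces the second example: the first two threads each receive four partitions and the last receives three.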
## RoundRobin strategy
To use the RoundRobin strategy, two prerequisites must be met:
1. All consumers within the same consumer group must have equal num.streams;
2. Every consumer must subscribe to the same topics.
So assume here that both consumers mentioned earlier have num.streams = 2. The RoundRobin strategy works as follows: all topic partitions are composed into a TopicAndPartition list, and that list is sorted by hashCode. In our example, suppose the topic-partitions sorted by hashCode are T1-5, T1-3, T1-0, T1-2, T1-1, T1-4, and the consumer threads sort as C1-0, C1-1, C2-0, C2-1. The final assignment is: C1-0 consumes T1-5 and T1-1; C1-1 consumes T1-3 and T1-4; C2-0 consumes T1-0; C2-1 consumes T1-2. The consumption diagram is as follows:
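The RoundRobin assignment can likewise be sketched as a toy function; the hashCode-sorted partition list is taken as input rather than recomputed here:

```python
def roundrobin_assign(sorted_partitions: list, threads: list) -> dict:
    """Deal sorted partitions to sorted threads one at a time, like cards."""
    threads = sorted(threads)
    assignment = {t: [] for t in threads}
    for i, p in enumerate(sorted_partitions):
        assignment[threads[i % len(threads)]].append(p)
    return assignment

# Partition list already sorted by hashCode, as in the example above:
print(roundrobin_assign(
    ["T1-5", "T1-3", "T1-0", "T1-2", "T1-1", "T1-4"],
    ["C1-0", "C1-1", "C2-0", "C2-1"],
))
# C1-0: T1-5, T1-1; C1-1: T1-3, T1-4; C2-0: T1-0; C2-1: T1-2
```

This reproduces exactly the assignment worked through in the example above.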
Kafka message reliability
Kafka's high reliability comes from its robust replication strategy, namely partition replication. Kafka provides multiple replicas for each partition and distributes them across the brokers in the cluster; the number of replicas is configurable. Among the replicas, one leader is elected and the remaining nodes are followers. All messages are sent to the leader and then synchronized to the followers. If a replica stops working, a new leader election is held; even if some brokers go down, the cluster as a whole stays highly available and no messages are lost.
Upgrading
Client and broker compatibility across Kafka versions
An overview on client and broker version compatibility.
Maintaining compatibility across different Kafka clients and brokers is a common issue. Mismatches among client and broker versions can occur as part of any of the following scenarios:
- Upgrading your Kafka cluster without upgrading your Kafka clients.
- Using a third-party application that produces to or consumes from your Kafka cluster.
- Having a client program communicating with two or more Kafka clusters (such as consuming from one cluster and producing to a different cluster).
- Using Flume or Spark as a Kafka consumer.
In these cases, it is important to understand client/broker compatibility across Kafka versions. Here are general rules that apply:
- Newer Kafka brokers can talk to older Kafka clients. The reverse is not true: older Kafka brokers cannot talk to newer Kafka clients.
- Whether the client and broker are compatible is determined by the major and minor parts of the upstream major.minor version. Differences in the maintenance versions are not considered when determining compatibility.
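The conservative major.minor rule can be sketched as a small helper (a hypothetical function for illustration; KIP-35-enabled clients, discussed below, relax this):

```python
def compatible(client_version: str, broker_version: str) -> bool:
    """Compare only major.minor; maintenance versions are ignored.
    Newer brokers can talk to older clients, but not the reverse."""
    c = tuple(int(x) for x in client_version.split(".")[:2])
    b = tuple(int(x) for x in broker_version.split(".")[:2])
    return c <= b

print(compatible("2.4.1", "2.6.0"))  # True: broker newer than client
print(compatible("2.7.0", "2.5.1"))  # False: client newer than broker
print(compatible("2.6.0", "2.6.1"))  # True: maintenance diff ignored
```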
As a result, the general pattern for upgrading Kafka from version A to version B is:
- Change Kafka server.properties to refer to version A.
- Upgrade the brokers to version B and restart the cluster.
- Upgrade the clients to version B.
- After all the clients are upgraded, change the properties from the first step to version B and restart the cluster.
Determining Kafka client compatibility with Kafka brokers
There's a link to the confluent matrix on the Spring for Apache Kafka project page (along with spring-kafka/kafka-clients compatibility).
0.9 is very, very old.
Typically, clients/brokers newer than 0.10.2.0 can talk to each other, but if records have headers, you will need a client >= 0.11.0.0.
Bidirectional client compatibility is now supported, so you no longer need to worry about the compatibility matrix: for KIP-35-enabled clients, any version combination works. KIP-35 support shipped in the broker protocol from 0.10.0 and in the Java clients from 0.10.2.
Kafka operations command reference
- Start a broker in the foreground
bin/kafka-server-start.sh <path>/server.properties
- Start a broker in the background (daemon)
bin/kafka-server-start.sh -daemon <path>/server.properties
- Stop a broker
bin/kafka-server-stop.sh
- Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 3 --topic topicname
- Delete a topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname
- List topics
bin/kafka-topics.sh --zookeeper localhost:2181 --list
- Describe a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname
- Alter a topic (e.g. increase its partition count)
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 6 --topic topicname
- List consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
- Describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname
- Reset consumer group offsets: to the earliest
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
- To the latest
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
- To a specific offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
- To the earliest offset after a given datetime
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000 --execute
- Delete a consumer group
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete --group groupname
- Producer script
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname
Parameter meanings:
--compression-codec lz4: the compression type
--request-required-acks all: the acks value
--timeout 3000: the linger.ms value
--message-send-max-retries 10: the retries value
--max-partition-memory-bytes: the batch.size value
- Consumer script
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
- Specify a group id
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group
- Specify a partition
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0
- The kafka-run-class script
kafka-run-class.sh kafka.tools.ConsoleConsumer
is equivalent to kafka-console-consumer.sh
kafka-run-class.sh kafka.tools.ConsoleProducer
is equivalent to kafka-console-producer.sh
- Get the current message count of a topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname --time -1
--time -1 means the latest offset; --time -2 means the earliest offset
- Query consumer offsets
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 12 --broker-list localhost:9092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
- MirrorMaker, the cross-datacenter disaster-recovery tool
bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist "topicA|topicB"
$ kafka-consumer-groups.sh --bootstrap-server broker01.example.com:9092 --describe --group flume
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
flume t1 0 1 3 2 test-consumer-group_postamac.local-1456198719410-29ccd54f-0
Why is Kafka fast?
Kafka achieves low-latency message delivery through sequential I/O and the zero-copy principle. The same techniques are commonly used in many other messaging/streaming platforms.
The diagram below illustrates how the data is transmitted between producer and consumer, and what zero-copy means.
🔹 Step 1.1 - 1.3: Producer writes data to the disk
🔹 Step 2: Consumer reads data without zero-copy
2.1: The data is loaded from disk to OS cache
2.2: The data is copied from the OS cache to the Kafka application
2.3: The Kafka application copies the data into the socket buffer
2.4: The data is copied from the socket buffer to the network card
2.5: The network card sends the data out to the consumer
🔹 Step 3: Consumer reads data with zero-copy
3.1: The data is loaded from disk to OS cache
3.2: The OS cache copies the data directly to the network card via the sendfile() system call
3.3: The network card sends the data out to the consumer
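The zero-copy path in step 3 can be demonstrated with the sendfile() system call. This Linux-specific sketch uses a Unix socket pair to stand in for the consumer's network connection; the file's bytes reach the peer without ever passing through a user-space buffer:

```python
import os
import socket
import tempfile

def zero_copy_send(path: str, out_sock: socket.socket) -> None:
    """Push a file to a socket with sendfile(): disk -> OS cache -> socket,
    never copying the bytes through a user-space buffer."""
    with open(path, "rb") as src:
        size = os.fstat(src.fileno()).st_size
        offset = 0
        while offset < size:
            offset += os.sendfile(out_sock.fileno(), src.fileno(),
                                  offset, size - offset)

# Demo: a Unix socket pair stands in for the consumer connection.
payload = b"kafka-log-segment-bytes" * 100
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(payload)

left, right = socket.socketpair()
zero_copy_send(f.name, left)
left.close()

received = b""
while chunk := right.recv(65536):
    received += chunk
right.close()
os.unlink(f.name)
print(received == payload)  # True
```

This is the same mechanism Kafka's broker uses (via Java's FileChannel.transferTo) to serve log segments to consumers.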