
Kafka 采用的是发布 - 订阅模型
Kafka的作用
1.解耦合
2.数据持久化
3.扩展与容灾
4.顺序保证
5.缓冲峰值处理能力
6.异步通信
Kafka 多副本机制
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区 (Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的 消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷⻉,它们的 存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选
Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比好的并发能力(负载均衡)。
Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能 力,不过也相应的增加了所需要的存储空间。
Kafka中的ZooKeeper
https://www.cnblogs.com/zhy-heaven/p/10994022.html
GroupCoordinator机制
Kafka 的 Server 端主要有三块内容:GroupCoordinator、Controller 和 ReplicaManager,其中,GroupCoordinator 的内容是与 Consumer 端紧密结合在一起的,简单来说就是,GroupCoordinator 是负责进行 consumer 的 group 成员的rebalance与 offset 管理
我们知道kafka保证同一消费组中的每个consumer能够消费一个或者多个特定的partition数据,一个partition的数据只能被一个consumer消费;因为每个partition里的消息是有序的,这样可以保证partition中的数据被同一个消费者有序消费;同时consumer只需要和自己消费的partition的broker通信就可以,减少开销。
Rebalance
在如下条件下,partition要在consumer中重新分配:
- l 条件1:有新的consumer加入
- l 条件2:旧的consumer挂了
- l 条件3:coordinator挂了,集群选举出新的coordinator
- l 条件4:topic的partition新加
- l 条件5:consumer调用unsubscrible(),取消topic的订阅
Coordinator
在kafka0.9版本之前,consumer的rebalance是通过在zookeeper上注册watch完成的。每个consumer创建的时候,会在在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]下将自己的id注册到消费组下;然后在/consumers/[consumer group]/ids 和/brokers/ids下注册watch;最后强制自己在消费组启动rebalance。
这种做法很容易带来zk的羊群效应,任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance,造成集群内大量的调整;同时由于每个consumer单独通过zookeeper判断Broker和consumer宕机,由于zk的脑裂特性,同一时刻不同consumer通过zk看到的表现可能是不一样,这就可能会造成很多不正确的rebalance尝试;除此之外,由于consumer彼此独立,每个consumer都不知道其他consumer是否rebalance成功,可能会导致consumer group消费不正确。
基于zk的rebalance存在不可避免的羊群效应()和脑裂问题(两个 节点之间失去通信,都认为自己需要成为master,),如何不用zk来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决;因此kafka0.9.*的版本重新设计了consumer端,诞生了这样一个高可用中心Coordinator,大大减少了zookeeper负载。
对于每一个Consumer Group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。coordinator主要做两件事:
- 维持group的成员组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。
- 协调group成员的行为。
Coordinator有如下几种类型:
- GroupCoordinator:broker端的,每个kafka server都有一个实例,管理部分的consumer group和它们的offset
- WorkerCoordinator:broker端的,管理GroupCoordinator程序,主要管理workers的分配。
- ConsumerCoordinator:consumer端的,和GroupCoordinator通信的媒介。
ConsumerCoordinator是KafkaConsumer的一个成员,只负责与GroupCoordinator通信,所以真正的协调者还是GroupCoordinator。

步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator

步骤2:找到coordinator之后,发送JoinGroup请求

步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition
partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。
然后由这个leader进行partition分配。然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。
为什么要在consumer中选一个leader出来,进行分配,而不是由coordinator直接分配呢?关于这个, Kafka的官方文档有详细的分析。其中一个重要原因是为了灵活性:如果让server分配,一旦需要新的分配策略,server集群要重新部署,这对于已经在线上运行的集群来说,代价是很大的;而让client分配,server集群就不需要重新部署了。
Kafka 如何保证消息的消费顺序
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移 量(offset)来保证消息在分区内的顺序性
Kafka 如何保证消息不丢失
生产者丢失数据的情况
生产者(Producer) 调用 send 方法发送消息之后,消息可能因为网络问题并没有发送过去。
所以,我们不能默认在调用 send 方法发送消息之后消息消息发送成功了。为了确定消息是发送成功, 我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实 际上是异步的操作,我们可以通过 get() 方法获取调用结果,
另外这里推荐为 Producer 的 retries (重试次数)设置一个比较合理的值,一般是 3 ,但是为了保 证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发 送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波 动一次你3次一下子就重试完了
消费者丢失消息的情况
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题, 试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消 费,但是 offset 却被自动提交了。
我们手动关闭闭自动提交 offset,每次在真正消费完消息之后之后再自己手动 提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费 完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次
Kafka本身Server弄丢消息
我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副 本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副 本交互。你可以理解为其他副本只是 leader 副本的拷⻉,它们的存在只是为了保证消息存储的安全 性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
设置 acks = all ** 解决办法就是我们设置 **acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送
Kafka 如果保证消息不重复
幂等
为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。 Producer发送每条消息<Topic, Partition>对于Sequence Number会从0开始单调递增,broker端会为每个<PID, Topic, Partition>维护一个序号,每次commit一条消息此序号加一,对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大1以上,则Broker会接受它,否则将其丢弃:
- 序号比Broker维护的序号大1以上,说明存在乱序。
- 序号比Broker维护的序号小,说明此消息以及被保存,为重复数据。
幂等性机制仅解决了单分区上的数据重复和乱序问题,对于跨session和所有分区的重复和乱序问题不能得到解决。于是需要引入事务

事务
事务是指所有的操作作为一个原子,要么都成功,要么都失败,而不会出现部分成功或部分失败的可能
为了解决跨session和所有分区不能EXACTLY-ONCE问题,Kafka从0.11开始引入了事务。 为了支持事务,Kafka引入了Transacation Coordinator来协调整个事务的进行,并可将事务持久化到内部topic里,类似于offset和group的保存。 用户为应用提供一个全局的Transacation ID,应用重启后Transacation ID不会改变。为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。有了Transaction ID后,Kafka可保证:
- 跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的Producer停止工作。
- 跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作

文档信息
- 本文作者:Jessica
- 本文链接:https://jessica0530.github.io/2020/08/30/Kafka-%E5%9F%BA%E7%A1%80%E4%BB%8B%E7%BB%8D/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)