kafka高水位

最后更新:2020-02-06

基本复制的参考资料的内容

1. HW

  • HW是 High Watermark 的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。HW 保证了 Kafka 集群中消息的一致性。确切地说,是保证了 Partition 的 Follower 与 Leader 间数 据的一致性。
  • LEO是 Log End Offset 的缩写,日志最后消息的偏移量。消息是被写入到 Kafka 的日志文件中的, 它标识当前日志文件中下一条待写入消息的 offset。

下图说明了两者的关系

如上图所示,第一条消息的 offset(LogStartOffset)为0,最后一条消息的 offset 为8,offset 为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的 HW 为6,表示消费者只能拉取到 offset 在0至5之间的消息,而 offset 为6的消息对消费者而言是不可见的。

对于 Leader 新写入的消息,Consumer 是不能立刻消费的。Leader 会等待该消息被所有 ISR 中的 Partition Follower 同步后才会更新 HW,此时消息才能被 Consumer 消费。

在 Kafka 中,高水位的作用主要有 2 个

  • 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  • 帮助 Kafka 完成副本同步。

在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息。位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。同一个副本对象,其高水位值不会大于 LEO 值。

2. 高水位更新机制

每个副本对象都保存了一组高水位值和 LEO 值,但实际上,在 Leader 副本所在的 Broker 上,还保存了其他 Follower 副本的 LEO 值。

在上图中,我们可以看到,Broker 0 上保存了某分区的 Leader 副本和所有 Follower 副本的 LEO 值,而 Broker 1 上仅仅保存了该分区的某个 Follower 副本。Kafka 把 Broker 0 上保存的这些 Follower 副本又称为远程副本(Remote Replica)。

简单来说,每个副本都有HW和LEO的存储,而leader不但保存自己的HW和LEO, 还保存了每个远端副本的LEO,用于在自身的HW更新时计算值。

Kafka 副本机制在运行过程中,会更新 Broker 1 上 Follower 副本的高水位和 LEO 值,同时也会更新 Broker 0 上 Leader 副本的高水位和 LEO 以及所有远程副本的 LEO,但它不会更新远程副本的高水位值

在 Broker 0 上保存这些远程副本的主要作用是,帮助 Leader 副本确定其高水位,也就是分区高水位

HW和LEO的更新策略如下:

follower自己的LEO Follower从leader副本拉取消息,写入磁盘后,更新LEO值
Leader自己的LEO Leader收到producer消息,写入磁盘后,更新LEO值
Leader的远程LEO Follower fech时带上自己的LEO, leader使用这个值更新远程LEO
Follower的自己HW followerfetch成功更新LEO后,比较leader发来的hw和自己的hw,取较小值
Leader自己的hw Leader更新LEO之后,更新完远程LEO之后,取所有副本的最小LEO

下面我们通过一个例子看一下一次完整的写请求的HW / LEO更新流程:

(1)初始状态

Leader 所有的 HW&LEO都为0, follower 与 leader 建立连接,follower fetch leader, follower 所有 HW & LEO 都为0

(2)Follower 第一次 fetch

Producer 发来一条消息到 leader, 此时 leader 的 LEO=1, follower 带着自己的 HW&LEO(都为0) 开始 fetch, leader的 HW=min(all follower LEO)=0, leader 记录follower的LEO=0;follower 拉取到一条消息,带着消息和leader的 HW(0)&LEO(1)返回自身更新自己的LEO=1, 更新自己的HW=min(follower 自身 LEO(1) 和 leader HW(0))=0

(3)Follower 第二次fetch

Follower带着自己的 HW(0)&LEO(1) 去请求leader .此时leader 的HW更新为1,leader 保存的follower的 LEO更新为1,带着leader 的 HW(1)&LEO(1)返回自身,更新自己的 HW&LEO

这种HW和LEO更新策略有个很明显的问题,即follower的HW更新需要follower的2轮fetch中的leader返回才能更新,而Leader的HW已更新。

在这之间,如果follower和leader的节点发生故障,则follower的HW和leader的HW会处于不一致状态,带来比较多的一致性问题。比如如下场景:

  • Leader更新完分区HW后,follower HW还未更新,此时follower重启;
  • Follower重启后,LEO设置为之前的follower HW值(0), 此时发生消息截断(临时状态);
  • Follower重新同步leader, 此时leader宕机,则不选举则不可用;
  • Follower被选举为leader, 则msg 1 永久丢失了。

min.insync.replicas为1,这种重启后follower发生截断发生的概率会大大提升, 而在多个副本存在的情况下,情况可能还会更加糟糕。

而kafka新版本为了解决这个HW&LEO的同步机制更新缺陷,引入了Epoch的概念。

3. Leader Epoch

依托于高水位,Kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制。不过,我们还是要思考一下这里面存在的问题。

从上面的分析中,我们知道,Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。

  • Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  • 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。

假设现在有两个 Leader Epoch<0, 0> 和 <1, 120>,那么,第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。

Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。

4.参考资料

《Kafka核心技术与实战》

https://mp.weixin.qq.com/s/Tmdq5VwLd48OaURMjG8Wyw

Edgar

Edgar
一个略懂Java的小菜比