Kafka消息存储与索引设计

最后更新:2020-02-09

完全复制的参考资料

1. 日志存储方式

消息中间件的性能好坏,它的消息存储的机制是衡量该性能的最重要指标之一,而 Kafka 具有高性能、高吞吐、低延时的特点,动不动可以上到几十上百万 TPS,离不开它优秀的消息存储设计。在 Kafka 的设计思想中,消息的存储文件被称作日志,我们 Java 后端绝大部分人谈到日志,一般会联想到项目通过 log4j 等日志框架输出的信息,而 Kafka 的消息日志类似于数据库中的提交记录,他们会按照时间顺序进行追加,Kafka 的消息也是严格按照时间顺序并已一定的格式写入日志文件中。

Kafka 将消息封装成一个个 Record,并以自定义的格式序列化成二进制字节数组进行保存:

如上图所示,消息严格按照顺序进行追加,一般来说,左边的消息存储时间都要小于右边的消息,需要注意的一点是,在 0.10.0.0 以后的版本中,Kafka 的消息体中增加了一个用于记录时间戳的字段,而这个字段可以有 Kafka Producer 端自定义,意味着客户端可以打乱日志中时间的顺序性。(待考证)

Kafka 的消息存储会按照该主题的分区进行隔离保存,即每个分区都有属于自己的的日志,在 Kafka 中被称为分区日志(partition log),每条消息在发送前计算到被发往的分区中,broker 收到日志之后把该条消息写入对应分区的日志文件中:

如果每个分区只存在一个日志文件,对于消息的过期清除和检索都是一个大难题,因此 Kafka 会将每个分区的日志文件继续细分成若干个日志文件,这些日志文件也称作日志段文件(log segment file)每个日志段文件都会伴随一个索引文件和时间戳索引文件,在 broker 所属节点打开对应分区日志的目录log.dirs=/tmp/kafka-logs,可以看到相关文件:

# ls -l test-0/
total 20916
-rw-r--r-- 1 root root 10485760 Jan 29 01:47 00000000000000000000.index
-rw-r--r-- 1 root root   440133 Jan 29 01:47 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 29 01:47 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 Jan 29 01:47 leader-epoch-checkpoint

在使用kafka时,消息都是推送到某个topic中,然后由producer计算当前消息会发送到哪个partition,在partition中,kafka会为每条消息设置一个偏移量,也就是说,如果要唯一定位一条消息,使用<topic, partition, offset>三元组即可。基于kafka的架构模式,其会将各个分区平均分配到每个broker中,也就是说每个broker会被分配用来提供一个或多个分区的日志存储服务。在broker服务器上,kafka的日志也是按照partition进行存储的,其会在指定的日志存储目录中为每个topic的partition分别创建一个目录,目录中存储的就是这些分区的日志数据,而目录的名称则会以的格式进行创建。

每个日志段包含了 .log/.index/timeindex 三个文件,而且名字都是相同的。

  • .index 偏移量索引文件,存储数据对应的偏移量
  • .timestamp 时间戳索引文件
  • .log 日志文件,存储生产者生产的数据
  • .snaphot 快照文件
  • Leader-epoch-checkpoint 保存了每一任leader开始写入消息时的offset,会定时更新。 follower被选为leader时会根据这个确定哪些消息可用

1.1. log 文件

.log 后缀文件保存了 Kafka 消息的记录,而且每个 log 文件都有对应的消息记录范围,名字的数字代表了消息记录的初始位移值,并且随着消息数量的增多而增大,因此,每个新创建的分区一定会包含 0 的 log 文件。Kafka 文件名字使用了 20 位来标识位移,对于实际的生产环境已经足够用了。

每个 log 文件的默认大小为 1 GB,可以通过 log.segment.bytes 参数进行控制,每当 log 文件被填满后,Kafka 会自动创建一组新的日志文件和索引文件,也就是说日志段文件一旦被填满后,就不会再继续写入新消息,而是写到新的日志段文件中,而当前可被写入消息的日志段文件也称作当前日志段文件,它是一种特殊的日志段文件,它不会受到 Kafka 任何后台任务的影响,比如日志过期清除、日志 compaction 等任务。

1.2. 索引文件

每个 log 文件都会包含两个索引文件,分别是 .index.timeindex,在 Kafka 中它们分别被称为位移索引文件和时间戳索引文件,位移索引文件可根据消息的位移值快速地从查询到消息的物理文件位置,时间戳索引文件可根据时间戳查找到对应的位移信息。

在每个log文件进行分段之后,这两个索引文件也会进行分段,这也就是它们的文件名与log文件一致的原因;

Kafka 的索引文件按照稀疏索引文件的思想进行设计的,每个索引文件包含若干条索引项。稀疏索引的核心即不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。

需要注意的一点是,消息大小还会影响 Kakfa 索引的插入频率,假设每个消息大小均大于 4 KB,会导致每次追加消息的时候,都会伴随一次索引项的增加。因此 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。

关于位移索引文件,这里有两点需要说明:

  • 由于kafka消息都是以batch的形式进行存储,因而索引文件中索引元素的最小单元是batch,也就是说,通过位移索引文件能够定位到消息所在的batch,而没法定位到消息在batch中的具体位置,查找消息的时候,还需要进一步对batch进行遍历;
  • 位移索引文件中记录的位移值并不是消息真正的位移值,而是该位移相对于该位移索引文件的起始位移的偏移量,通过这种方式能够极大的减小位移索引文件的大小。如下图所示为一个位移索引文件的格式示意图:

Kafka 是如何根据索引文件进行快速检索的呢?由于索引文件也是按照消息的顺序性进行增加索引项的,位移索引文件按照位移顺序保存,而时间戳索引文件则按照时间顺序保存索引项,因此 Kafka 可以利用二分查找算法来搜索目标索引项,把时间复杂度降到了 O(lgN),大大减少了查找的时间。

每个日志段的索引文件可通过 log.index.size.max.bytes 参数控制,默认大小为 10 MB。

我的版本没有log.index.size.max.bytes这个参数,还没有考证

1.2.1 位移索引文件

位移索引文件的索引项结构如下:

可以看出,每个索引项的大小为 8 bytes,源码 kafka.log.OffsetIndex#entrySize = 8 限定了索引项的大小。

需要注意的是,索引文件大小必须是索引项的整数倍,假设 log.index.size.max.bytes = 500,则 Kafka 会创建一个大小为 496 bytes 的索引文件。

  • 相对位移:保存于索引文件名字上面的起始位移的差值,假设一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移则为 150 - 100 = 50,这么做的好处是使用 4 字节保存位移即可,可以节省非常多的磁盘空间。
  • 文件物理位置:消息在 log 文件中保存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。

下图来表示 Kafka 是如何快速检索消息:

假设 Kafka 需要找出位移为 3550 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],得到索引项之后,Kafka 会根据该索引项的文件物理位置在 log 文件中从位置 2310272 开始顺序查找,直至找到位移为 3550 的消息记录为止。

1.2.2 时间戳索引文件

Kafka 在 0.10.0.0 以后的版本当中,消息中增加了时间戳信息,为了满足用户需要根据时间戳查询消息记录,Kafka 增加了时间戳索引文件,时间戳索引文件的索引项结构如下:

可以看出,每个索引项的大小为 12 bytes,源码 kafka.log.TimeIndex#entrySize = 12 限定了索引项的大小。

同样地,时间戳索引文件大小也必须为索引项的整数倍大小,计算方式与位移索引文件相同。

下图表示 Kafka 是如何快速检索消息:

使用时间戳查找消息的流程与使用位移查找消息的流程的一些细节少有不同

假设要查询时间戳为 1609087040523 附近的消息,从源码逻辑,根据二分算法找到时间戳索引项 [1609087040112, 5146],然后根据根据位移值从位移索引文件中找到小于 5146 位移的最大索引项[5046, 3111375]。

根据查到的索引项位移值 5046 开始查询,当消息时间戳最接近目标搜索的时间戳并且位移大于等于搜索起始位移时,则该消息即是满足该时间戳条件的消息。

2. 消息格式

2.1. Kafka 0.7.x 消息格式

这个版本消息的格式相对简单,具体如下:

从上面可以看出,Kafka 0.7.x 版本的消息格式比较简单,主要包括:

  • magic:这个占用1个字节,主要用于标识 Kafka 版本。这个版本的 Kafka magic有 0 和 1 两个值,不过默认 Message 使用的是 1;
  • attributes:占用1个字节,这里面存储了消息压缩使用的编码。这个版本的 Kafka 仅支持 gzip 和 snappy 两种压缩格式;后四位如果是0001则标识gzip压缩,如果是0010则是snappy压缩,如果是0000则表示没有使用压缩。
  • crc:占用4个字节,主要用于校验消息的内容,也就是上图的Value。
  • value:这个占用的字节为 N - 6,N为Message总字节数,6就是前面几个属性占用的字节和。value即是消息的真实内容,在 Kafka 中这个也叫做payload。

大家在上图还看到 MessageSet 的格式,一个 MessageSet 包含多条消息,其中:

  • offset:占用8个字节,这个是 Kafka 消息存储到磁盘之后的物理偏移量;
  • size:占用4个字节,这是消息的大小。
  • message:占用N个字节,这个就是上图的Message,格式见Message Format。

需要注意的是, Kafka 从 Producer 发送到 Broker 是以 MessageSet 为单位发送的,而不是以 Message 发送的。而且压缩的时候也是以 MessageSet 进行压缩的,并不是只压缩一条消息,这样做的目的是提高效率。压缩之后的消息格式如下:

从上图可以看出,压缩之后的内容作为另外一条消息的内容进行存储,其中包含了多条消息。

2.2. Kafka 0.8.x (0.9.x) 消息格式

Kafka 0.7.x 的消息格式有以下几个缺点:

  • 压缩消息的内部消息不可以通过偏移量进行寻址
  • 对于压缩消息,消费者checkpoint 消息时只能对整个消息进行 checkpoint ,无法对压缩消息内部的消息进行checkpoint ,这使得我们只能实现At-least-once语义。
  • 这种消息格式不适合log compaction

针对这些问题,Kafka 0.8.0 基于 Kafka 0.7.0 消息合适进行了改进,主要如下:

这个版本的 Message 格式加入了 Key 相关的信息,以及 内容的长度等,各个字段的含义介绍如下:

  • crc:占用4个字节,主要用于校验消息的内容;
  • magic:这个占用1个字节,主要用于标识 Kafka 版本。
  • attributes:占用1个字节,这里面存储了消息压缩使用的编码。这个版本的 Kafka 仅支持 gzip、snappy 以及 lz4(0.8.2引入) 三种压缩格式;后四位如果是0001则标识gzip压缩,如果是0010则是snappy压缩,如果是0011则是snappy压缩,如果是0000则表示没有使用压缩。
  • key length:占用4个字节。主要标识 Key 的内容的长度 K;
  • key:占用 K 个字节。存储的是 key 的具体内容
  • value length:占用4个字节。主要标识 value 的内容的长度 V;
  • value:这个占用的字节为 V。value即是消息的真实内容,在 Kafka 中这个也叫做payload。

这个版本的MessageSet 格式和之前一样,就不介绍了。但是需要注意的是,这个版本 MessageSet 中的 offset 字段存储的已经不是消息物理偏移量了,而是逻辑地址,比如0,、1、2….。有了逻辑地址,我们就可以解决之前Kafka 0.7.0遇到的一些问题,比如可以在压缩消息内通过偏移量进行寻址,压缩消息可以checkpoint内部的消息等。

我们在上面说了 Kafka 压缩的时候是将整个 MessageSet 进行压缩的,压缩完之后的内容作为另外一个 Message 的 value,如下:

从上图看到,压缩消息的格式和非压缩格式的消息不一样,少了 Key 的存储空间,而且这时候消息的 value 为压缩之后的消息内容。

2.3. Kafka 0.10.x 消息格式

到了Kafka 0.10.x,其引入了 Kafka Stream,其依赖了消息的时间,所以这个版本的消息加入了时间戳属性,格式如下:

可以看出,这个版本相对于 Kafka 0.8.x版本的消息格式变化不大,各个字段的含义: 这个版本的 Message 格式加入了 Key 相关的信息,以及 内容的长度等,各个字段的含义介绍如下:

  • crc:占用4个字节,主要用于校验消息的内容;
  • magic:这个占用1个字节,主要用于标识 Kafka 版本。Kafka 0.10.x magic默认值为1
  • attributes:占用1个字节,这里面存储了消息压缩使用的编码以及Timestamp类型。这个版本的 Kafka 仅支持 gzip、snappy 以及 lz4(0.8.2引入) 三种压缩格式;后四位如果是 0001 则表示 gzip 压缩,如果是 0010 则是 snappy 压缩,如果是 0011 则是 lz4 压缩,如果是0000则表示没有使用压缩。第4个bit位如果为0,代表使用create time;如果为1代表append time;其余位(第5~8位)保留;
  • key length:占用4个字节。主要标识 Key 的内容的长度 K;
  • key:占用 K 个字节。存储的是 key 的具体内容
  • value length:占用4个字节。主要标识 value 的内容的长度 V;
  • value:这个占用的字节为 V。value即是消息的真实内容,在 Kafka 中这个也叫做payload。

这个版本的压缩消息格式和 Kafka 0.8.x 类似。

3. 日志切分

我们可以通过以下配置项来控制logSegment的切分

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824 # 1G
# 新版本没有这个配置?
log.index.size.max.bytes=10485760 # 10M

当满足以下几个条件之一时,就会触发文件的切分

  • 数据文件(后缀为.log)的大小超过log.segment.bytes配置的值,默认是1G
  • index文件(后缀为.index)的大小超过log.index.size.max.bytes配置的值,默认是10M

4. 日志删除

# The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion
# 日志删除检测频率,默认为5分钟,也即每隔5分钟,检测日志是否需要删除
log.retention.check.interval.ms=300000

#The number of hours to keep a log file before deleting it (in hours)
# 日志保存时间,默认是7天,也即7天前的日志需要删除
log.retention.hours=168

# The maximum size of the log before deleting it
# 日志最大值,当所有日志文件大小的总和超过该配置就需要被删除
log.retention.bytes

5. 日志压缩

所谓的日志压缩功能,其主要是针对这样的场景的,比如对某个用户的邮箱数据进行修改,其总共修改了三次,修改过程如下:

email=john@gmail.com
email=john@yahoo.com.cn
email=john@163.com

在这么进行修改之后,很明显,我们主要需要关心的是最后一次修改,因为其是最终数据记录,但是如果我们按顺序处理上述消息,则需要处理三次消息。kafka的日志压缩就是为了解决这个问题而存在的,对于使用相同key的消息,其会只保留最新的一条消息的记录,而中间过程的消息都会被kafka cleaner给清理掉。但是需要注意的是,kafka并不会清理当前处于活跃状态的日志文件中的消息记录。所谓当前处于活跃状态的日志文件,也就是当前正在写入数据的日志文件。如下图所示为一个kafka进行日志压缩的示例图:

图中K1的数据有V1、V3和V4,经过压缩之后只有V4保留了下来,K2的数据则有V2、V6和V10,压缩之后也只有V10保留了下来;同理可推断其他的Key的数据。另外需要注意的是,kafka开启日志压缩使用的是log.cleanup.policy,其默认值为delete,也即我们正常使用的策略,如果将其设置为compaction,则开启了日志压缩策略,但是需要注意的是,开启了日志压缩策略并不代表kafka会清理历史数据,只有将log.cleaner.enable设置为true才会定时清理历史数据。

在kafka中,其本身也在使用日志压缩策略,主要体现在kafka消息的偏移量存储。在旧版本中,kafka将每个consumer分组当前消费的偏移量信息保存在zookeeper中,但是由于zookeeper是一款分布式协调工具,其对于读操作具有非常高的性能,但是对于写操作性能比较低,而consumer的位移提交动作是非常频繁的,这势必会导致zookeeper称为kafka消息消费的瓶颈。因而在最新版本中,kafka将分组消费的位移数据存储在了一个特殊的topic中,即__consumer_offsets,由于每个分组group的位移信息都会提交到该topic,因而kafka默认为其设置了非常多的分区,也即50个分区。另外,consumer在提交位移时,使用的key为groupId+topic+partition,而值则为当前提交的位移,也就是说,对于每一个分组所消费的topic的partition,其都只会保留最新的位移。如果consumer需要读取位移,那么只需要按照上述格式组装key,然后在该topic中读取最新的消息数据即可

__consumeroffsets 的 Partition 数量由下面的 Server 配置决定:

offsets.topic.num.partitions=50

参考资料

https://my.oschina.net/zhangxufeng/blog/3114166

https://mp.weixin.qq.com/s/5UZcm9nMEwlSNjrBlrzIVQ

https://www.iteblog.com/archives/2232.html

Edgar

Edgar
一个略懂Java的小菜比