kafka 原理解析

2020/01/29 Knowledge

必看

参考来源

目录

kafka 是大家比较常用的消息中间件,本文主要介绍 kafka 基本组件及其相关原理

基本架构

kafka架构

  • Broker:消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群
  • Topic:Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定一个 topic
  • Producer:消息生产者,向 Broker 发送消息的客户端
  • Consumer:消息消费者,从 Broker 读取消息的客户端
  • ConsumerGroup:每个 Consumer 属于一个特定的 Consumer Group,一条消息可以发送到多个不同的 Consumer Group,但是一个 Consumer Group 中只能有一个 Consumer 能够消费该消息
  • Partition:物理上的概念,一个 topic 可以分为多个 partition,每个 partition 内部是有序的
  • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,follower 和 leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Zookeeper:kafka 集群依赖 zookeeper 来保存集群的的元信息,来保证系统的可用性。

偏移量

offset

Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序性不跨分区。Kafka0.10 以后,使用一个专门的 topic __consumer_offset 保存offset(之前使用 zookeeper)。__consumer_offset 日志留存方式为 compact,也就是说,该 topic 会对 key 相同的消息进行整理

__consumer_offset内保存三类消息:

  • Consumer group 组元数据消息
  • Consumer group 位移消息
  • Tombstone 消息

消息发送

发送

发送数据

我们看上面的架构图中,producer 就是生产者,是数据的入口。注意看图中的红色箭头,producer 在写入数据的时候永远的找 leader(不同 topic 选出的 leader 也不相同),不会直接将数据写入 follower!那 leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:

发送2

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入 leader 后,follower 是主动的去 leader 进行同步的!producer 采用 push 模式将数据发布到 broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!

kafka leader 选举机制原理

kafka 在所有 broker 中选出一个 controller,所有 Partition 的 Leader 选举都由 controller 决定。controller 会将 Leader 的改变直接通过 RPC 的方式(比 Zookeeper Queue 的方式更高效)通知需为此作出响应的 Broker。同时 controller 也负责增删 Topic 以及 Replica 的重新分配。

流程:

  1. Controller 在 Zookeeper 注册 Watch,一旦有 Broker 宕机(这是用宕机代表任何让系统认为其 die 的情景,包括但不限于机器断电,网络不可用,GC 导致的 Stop The World,进程 crash 等),其在 Zookeeper 对应的 znode 会自动被删除,Zookeeper 会关闭 Controller 注册的 watch,Controller 读取最新的幸存的 Broker

  2. Controller 决定 set_p(分区集合),该集合包含了宕机的所有 Broker 上的所有 Partition,因为可能有多个 topic partition 再此 broker 上。

  3. 对 set_p 中的每一个 Partition 做以下处理:
    1. 从 /brokers/topics/[topic]/partitions/[partition]/state 读取该 Partition 当前的 ISR(in-sync Replica 一个副本同步列表)
    2. 决定该 Partition 的新 Leader。如果当前 ISR 中有至少一个 Replica 还幸存,则选择其中一个作为新 Leader,新的 ISR 则包含当前 ISR 中所有幸存的 Replica(选举算法的实现类似于微软的 PacificA)。否则选择该 Partition 中任意一个幸存的 Replica 作为新的 Leader 以及 ISR(该场景下可能会有潜在的数据丢失)。如果该 Partition 的所有 Replica 都宕机了,则将新的 Leader 设置为-1。
    3. 将新的 Leader,ISR 和新的 leader_epoch 及 controller_epoch 写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其 version 在 3.1 至 3.3 的过程中无变化时才会执行,否则跳转到 3.1
  4. 直接通过 RPC 向 set_p 相关的 Broker 发送 LeaderAndISRRequest 命令。Controller 可以在一个 RPC 操作中发送多个命令从而提高效率。

选举leader

存储分区

分区结构

每个 Partition 其实都会对应一个日志目录(kafka-logs):{topicName}-{partitionid}/,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件、“.log”文件和“.timeindex”文件。log 文件就实际是存储 message 的地方,而 index 和 timeindex 文件为索引文件,用于检索消息。

index-log

如上图,这个 partition 有三组 segment 文件,每个 log 文件的大小是一样的,但是存储的 message 数量是不一定相等的(每条的 message 大小不一致)。一个 segment 是固定大小的消息集合(可配置)。文件的命名是以该 segment 最小 offset 来命名的,如 000.index 存储 offset 为 0~368795 的消息,kafka 就是利用分段+索引的方式来解决查找效率的问题。

分区的好处

  1. 方便扩展。因为一个 topic 可以有多个 partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。

  2. 提高并发。以 partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

分区原则

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在 kafka 中,如果某个 topic 有多个 partition,producer 又怎么知道该将数据发往哪个 partition 呢?kafka 中有几个原则:

  1. partition 在写入的时候可以指定需要写入的 partition,如果有指定,则写入对应的 partition。

  2. 如果没有指定 partition,但是设置了数据的 key,则会根据 key 的值 hash 出一个 partition。

  3. 如果既没指定 partition,又没有设置 key,则会轮询选出一个 partition。

消息不丢失

保证消息不丢失是一个消息队列中间件的基本保证,那 producer 在向 kafka 写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过 ACK 应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认 kafka 接收到数据,这个参数可设置的值为 0、1、all。

0 代表 producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。

1 代表 producer 往集群发送数据只要 leader 应答就可以发送下一条,只确保 leader 发送成功。

all 代表 producer 往集群发送数据需要所有的 follower 都完成从 leader 的同步才会发送下一条,确保 leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

Memory Mapped Files

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以 Kafka 的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储,利用内存提高 I/O 效率。

Memory Mapped Files(后面简称 mmap)也被翻译成内存映射文件,在 64 位操作系统中一般可以表示 20G 的数据文件,它的工作原理是直接利用操作系统的 Page 来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

MMAP

通过 mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。

使用这种方式可以获取很大的 I/O 提升,省去了用户空间到内核空间复制的开销(调用文件的 read 会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)也有一个很明显的缺陷——不可靠,写到 mmap 中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用 flush 的时候才把数据真正的写到硬盘。Kafka 提供了一个参数——producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync),写入 mmap 之后立即返回 Producer 不调用 flush 叫异步(async)。

mmap 其实是 Linux 中的一个函数就是用来实现内存映射的,谢谢 Java NIO,它给我提供了一个 mappedbytebuffer 类可以用来实现内存映射(所以是沾了 Java 的光才可以如此神速和 Scala 没关系)

Message 结构

上面说到 log 文件就实际是存储 message 的地方,我们在 producer 往 kafka 写入的也是一条一条的 message,那存储在 log 中的 message 是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:

1、 offset:offset 是一个占 8byte 的有序 id 号,它可以唯一确定每条消息在 parition 内的位置!

2、 消息大小:消息大小占用 4byte,用于描述消息的大小。

3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

存储策略

无论消息是否被消费,kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?

1、 基于时间,默认配置是 168 小时(7天,可配置)。

2、 基于大小,默认配置是 1073741824。

需要注意的是,kafka 读取特定消息的时间复杂度是 O(1),所以这里删除过期的文件并不会提高kafka的性能!

消息消费

消费者组的消费关系

消息存储在 log 文件后,消费者就可以进行消费了。Kafka采用的是点对点的模式,消费者主动的去 kafka 集群拉取消息,与 producer 相同的是,消费者在拉取消息的时候也是找 leader 去拉取。

多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组 id!同一个消费组者的消费者可以消费同一 topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据

消费

这种方式保证了同一个消费者组内的消费者不会消费重复数据。

图示是消费者组内的消费者小于 partition 数量的情况,所以会出现某个消费者消费多个 partition 数据的情况,消费的速度也就不及只处理一个 partition 的消费者的处理速度!如果是消费者组的消费者多于 partition 的数量,那会不会出现多个消费者消费同一个 partition 的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量一致!

消费读取文件

在保存数据的小节里面,我们聊到了 partition 划分为多组 segment,每个 segment 又包含 .log、.index、.timeindex 文件,存放的每条 message 包含 offset、消息大小、消息体……我们多次提到 segment 和 offset,查找消息的时候是怎么利用 segment+offset 配合查找的呢?假如现在需要查找一个 offset 为368801的 message 是什么样的过程呢?我们先看看下面的图:

消费文件

  1. 先找到 offset 的 368801message 所在的 segment 文件(利用二分法查找),这里找到的就是在第二个 segment 文件。

  2. 打开找到的 segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1,我们要查找的 offset 为 368801 的 message 在该 index 内的偏移量为 368796+5=368801,所以这里要查找的相对 offset 为 5)。由于该文件采用的是稀疏索引的方式存储着相对 offset 及对应 message 物理偏移量的关系,所以直接找相对 offset 为 5 的索引找不到,这里同样利用二分法查找相对 offset 小于或者等于指定的相对 offset 的索引条目中最大的那个相对 offset,所以找到的是相对 offset 为 4 的这个索引。

  3. 根据找到的相对 offset 为 4 的索引确定 message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 offset 为 368801 的那条 Message。

这套机制是建立在 offset 为有序的基础上,利用 segment 加有序 offset 加稀疏索引加二分查找加顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的 offset 维护 zookeeper 中,consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 offset 已经直接维护在 kafka 集群的 __consumer_offsets 这个 topic 中。

Search

    Table of Contents