Kafka基础知识
Kafka基础知识
简介
Apache Kafka 是一个由Scala和Java语言开发的分布式流处理平台,基于发布订阅模式。Kafka 主要用于构建实时数据流管道和流应用程序。它可以处理大量的实时数据,确保高吞吐量、低延迟、分布式、容错和可扩展的特点,因此广泛应用于日志处理、监控数据、消息传递、实时分析等场景。
Kafka架构设计
Kafka集群的架构图如下,接下来会详细进行介绍。
图源网络
Kafka集群
Kafka是一个分布式的系统,必然涉及到集群节点之间的协调工作,Kafka通过第三方组件Zookeeper来进行集群的协调工作,Kafka的集群采用的是主从架构(Master-Slave),Kafka的节点在启动时会向Zookeeper中尝试创建一个临时节点(节点为/controller)
,Zookeeper的节点是不允许重复的,成功在Zookeeper创建节点 /controller 的Kafka节点会成为集群的主节点(Master),也称为集群控制器Controller,来管理整个集群。
没有成功成为Master的其他Slave节点会创建Node监听器,用于监听 /controller 节点的状态变化,由于创建的是临时节点,当原来的Master出现故障时,Zookeeper中的节点会被删除,其他Slave节点会重新尝试在Zookeeper创建节点 /controller 来成为新的Master。
Zookeeper介绍
Zookeeper是一个开放源码的分布式应用程序协调服务软件。在当前的Web软件开发中,多节点分布式的架构设计已经成为必然,如何保证架构中不同的节点所运行的环境、系统配置是相同的,就是一个非常重要的话题。一般情况下,我们会采用独立的第三方软件保存分布式系统中的全局环境信息以及系统配置信息,这样系统中的每一个节点在运行时就可以从第三方软件中获取一致的数据。
Zookeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在Zookeeper节点中。
Zookeeper创建数据节点时,会根据业务场景创建临时节点或永久(持久)节点。永久节点就是无论客户端是否连接上Zookeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,Zookeeper也提供了Watch(监控)机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。
核心概念
Kafka的每一个节点,都称为代理Broker,在一个Kafka集群中,为了区分不同的服务节点,每一个broker都应该有一个不重复的全局ID,称之为broker.id,在Kafka的配置文件server.properties
中进行配置。
1 | ############################# Server Basics ############################# |
Kafka集群中的主节点Master负责管理整个集群,包括但不限于Broker、Topic(主题)和Partition(分区)的管理,会在Zookeeper当中存储集群的元数据信息,详细请见下表:
节点 | 类型 | 说明 |
---|---|---|
/admin/delete_topics | 持久化节点 | 配置需要删除的topic,因为删除过程中,可能broker下线,或执行失败,那么就需要在broker重新上线后,根据当前节点继续删除操作,一旦topic所有的分区数据全部删除,那么当前节点的数据才会进行清理 |
/brokers/ids | 持久化节点 | 服务节点ID标识,只要broker启动,那么就会在当前节点中增加子节点,brokerID不能重复 |
/brokers/topics | 持久化节点 | 服务节点中的主题详细信息,包括分区,副本 |
/brokers/seqid | 持久化节点 | seqid主要用于自动生产brokerId |
/config/changes | 持久化节点 | kafka的元数据发生变化时,会向该节点下创建子节点。并写入对应信息 |
/config/clients | 持久化节点 | 客户端配置,默认为空 |
/config/brokers | 持久化节点 | 服务节点相关配置,默认为空 |
/config/ips | 持久化节点 | IP配置,默认为空 |
/config/topics | 持久化节点 | 主题配置,默认为空 |
/config/users | 持久化节点 | 用户配置,默认为空 |
/consumers | 持久化节点 | 消费者节点,用于记录消费者相关信息 |
/isr_change_notification | 持久化节点 | ISR列表发生变更时候的通知,在kafka当中由于存在ISR列表变更的情况发生,为了保证ISR列表更新的及时性,定义了isr_change_notification这个节点,主要用于通知Controller来及时将ISR列表进行变更。 |
/latest_producer_id_block | 持久化节点 | 保存PID块,主要用于能够保证生产者的任意写入请求都能够得到响应。 |
/log_dir_event_notification | 持久化节点 | 主要用于保存当broker当中某些数据路径出现异常时候,例如磁盘损坏,文件读写失败等异常时候,向ZooKeeper当中增加一个通知序号,Controller节点监听到这个节点的变化之后,就会做出对应的处理操作 |
/cluster/id | 持久化节点 | 主要用于保存kafka集群的唯一id信息,每个kafka集群都会给分配要给唯一id,以及对应的版本号 |
主题Topic
Kafka是分布式消息传输系统,采用的数据传输方式为发布订阅模式,也就是说由消息的生产者发布消息,消费者订阅消息后获取数据。为了对消费者订阅的消息进行区分,所以对消息在逻辑上进行了分类,这个分类我们称之为主题:Topic。生产者必须将消息数据发送到某一个主题,而消费者必须从某一个主题中获取消息,并且消费者可以同时消费一个或多个主题的数据。Kafka集群中可以存放多个主题的消息数据。
官方推荐主题的名称中不要同时包含下划线和点。
分区Partition
Kafka消息传输采用发布订阅模式,所以消息生产者必须将数据发送到一个主题,假如发送给这个主题的数据非常多,那么主题所在Broker节点的负载和吞吐量就会受到极大的考验,甚至有可能因为热点问题引起broker节点故障,导致服务不可用。
一个好的方案就是将一个主题从物理上分成几块,然后将不同的数据块均匀地分配到不同的Broker节点上,这样就可以缓解单节点的负载问题。这个主题的分块我们称之为:分区Partition。默认情况下,Topic主题创建时分区数量为1,也就是一块分区。Kafka的分区解决了单一主题topic线性扩展的问题,也解决了负载均衡的问题,同一个Topic的分区会均匀地分布在不同的Broker节点上。
Topic主题的每个分区都会用一个编号进行标记,一般是从0开始的连续整数数字。Partition分区是物理上的概念,也就意味着会以数据文件的方式真实存在。每个Topic包含一个或多个Partition,每个Partition都是一个有序的队列。Partition中每条消息都会分配一个有序的ID,称之为偏移量:Offset。
副本机制
为了保障Kafka分布式系统的高可用,防止某一节点故障导致服务不可用,Kafka的分区拥有副本机制,会给分区数据设定多个备份,这里的备份,我们称之为:副本Replication。Kafka支持多副本,使得主题Topic可以做到更多容错性,牺牲性能与空间去换取更高的可靠性。一个Partition的副本Replication也会分散到不同的Broker节点中(备份放置在同一个Broker中没有意义,一旦出现故障副本也没法使用),Kafka会根据副本分配策略进行分配。
Kafka中,一个Partition的所有的文件都称之为副本,会选择其中的一个作为Leader(主导)副本,其他的文件作为备份文件,称之为Follower(追随)副本。只有Leader副本才能进行数据的读写,Follower副本只做备份使用(2.4 版本之前),Follower副本会定期向Leader副本拉取
还未同步的数据(会携带上自己已经同步到的偏移量信息)。
- ISR:Follower 副本会维护自己与 Leader 副本的同步状态。Kafka 通过一个名为同步副本列表 ISR(In-Sync Replicas)的集合来管理副本的同步状态。如果 Follower 副本与 Leader 副本的数据差异(偏移量Offset)在一定范围内,它就会被认为是 “同步中” 状态,会被加入到 ISR 集合中。如果 Follower 副本落后太多,可能会被移出 ISR 集合,而进入OSR(Out-of-Sync Replicas)集合,直到它重新跟上 Leader 副本的进度。
当某个Partition的Leader副本所在的Broker故障后,会由集群的Controller进行故障转移,会优先从ISR当中选取新的Leader副本。
分区数可以大于节点数,但副本数不能大于节点数。因为副本需要分布在不同的节点上,才能达到备份的目的。
ISR + OSR = 已分配的副本列表AR(Assigned Replicas)
Kafka 2.4 版本对消费者协议进行了扩展,允许消费者在拉取数据时指定从 Follower 副本读取
生产者Provider
Kafka生产者的架构图如下:
图源网络
基本流程
- 主线程中会将用户消息经过拦截器、序列化器、分区器进行处理,处理好的用户消息会封装为
ProducerRecord
并发送到消息累加器(RecordAccumulator)中(用户在发送消息时必须指定主题Topic) - 消息累加器会为每个分区(每个主题的每个分区)对应一个队列,在收到消息后,会根据一定的策略判断是否需要为该消息创建新的
ProducerBatch
,或者将消息加入已有的ProducerBatch
中,以便批量发送数据,提高吞吐量,减少带宽消耗 - 当
ProducerBatch
满足发送条件(可配置)后,会由Sender
线程进行发送,会根据消息所属的主题和分区,选择合适的Kafka节点(Broker),将请求封装为Request
对象(包含了此次发送操作的各种详细信息),构造好的Request
对象会被添加到在途请求缓冲区中,等待Selector
进行处理并发送到 Kafka 集群 - 在发送
Request
后,Selector
会根据配置的acks
参数等待 Kafka 集群的响应。当收到响应时,会根据响应中的信息进行相应处理,如果发送失败,会根据重试策略进行重试。 - 最终
Selector
会通知Sender
线程调用相应的回调函数(如果用户配置了)并将Request
请求从在途请求缓冲区中清除以及将ProducerBatch
清除。
在看懂了基本流程后,接下来将对流程的各个点逐一进行展开。
消息处理
拦截器
生产者在将数据发送给Kafka服务器之前,允许我们对生产的数据进行统一的处理,比如进行数据的加密。这些处理可以通过Kafka提供的拦截器完成。拦截器并不是生产者必须配置的功能,需用户根据实际的情况自行选择使用。拦截器可以配置多个,会按照拦截器声明的顺序依次进行调用,
并且如果某一个拦截器执行时出现异常,只会跳过当前拦截器的逻辑,并不会影响后续拦截器的处理
。方法名 作用 onSend 数据发送前,会执行此方法,进行数据发送前的预处理 onAcknowledgement 数据发送后,获取应答时,会执行此方法 close 生产者关闭时,会执行此方法,完成一些资源回收和释放的操作 configure 创建生产者对象的时候,会执行此方法,可以根据场景对生产者对象的配置进行统一修改或转换 序列化器
Kafka 中的消息是以字节数组的形式在网络上传输和存储的,序列化器的作用就是将数据转换为字节数组,以便在 Kafka 中进行传输和存储。
常用序列化器
- StringSerializer:将字符串类型的消息序列化为字节数组,是 Kafka 中最常用的简单序列化器之一。
- ByteArraySerializer:直接将字节数组作为消息进行传输,不需要额外的序列化操作,适用于已经是字节数组形式的消息。
分区器
Kafka 中的主题可以分为多个分区,当发送消息时未指定发送的分区,分区器会决定将消息发送到哪个分区。用户可以根据自己的业务需求自定义分区器。例如,可以根据消息的内容、业务规则等因素来决定消息的分区。如果生产者没有配置分区器,将使用 Kafka 默认的分区器。
默认分区器
默认分区器根据消息的键来确定消息的分区。如果消息没有键,则会采用优化后的粘性分区策略(不同的Kafka版本可能不一致)进行分区的选择,以实现集群的负载均衡(无论如何选择,效果肯定是这个)。如果消息有键,则会根据键的哈希值(murmur2非加密散列算法计算)对分区数量取模,将消息发送到对应的分区中,这样可以保证
具有相同键的消息总是被发送到同一个分区中
,便于实现消息的顺序性和一致性。自定义分区器
消息可靠性Acks
在 Kafka 中,acks
即 Acknowledgments,是生产者用来控制消息发送确认机制的一个重要参数,它决定了生产者在发送消息后需要从 Kafka 集群收到多少个确认信号,以此来确定消息是否被成功处理,从而在一定程度上保证消息传递的可靠性。根据不同的场景,Kafka提供了3种应答处理,可以根据需要进行配置。
- acks=0:生产者对象将数据通过网络客户端发送到网络数据流中即视为发送成功,不会等待任何确认。在传输过程中如果网络出现了问题,那么数据就丢失了。数据的可靠性没有任何的保证。
- acks=1:生产者发送消息后,会等待首领副本确认收到消息。一旦首领副本将消息写入本地日志文件,就会向生产者发送确认。此时生产者认为消息发送成功,但
消息可能还未同步到其他跟随者副本
。 - acks=all 或 - 1:生产者发送消息后,会等待所有同步副本(ISR)都确认收到消息。只有当所有同步副本都将消息写入本地日志文件后,生产者才会收到确认,这时才认为消息发送成功。
实现原理
Request里面包含了生产者配置的acks
等相关参数。Kafka 集群中的首领副本在收到消息后,会根据acks
参数的设置进行相应的处理。如果acks
为 0,首领副本不会发送任何确认;如果acks
为 1,首领副本在将消息写入本地日志文件后,会通过网络向生产者发送一个确认;如果acks
为all
或-1
,首领副本会等待所有同步副本都将消息写入本地日志文件后,再向生产者发送确认。
消息发送回调
Kafka发送数据时,可以同时传递回调对象(Callback)用于对数据的发送结果进行对应处理。
Kafka发送数据时,底层的实现类似于生产者消费者模式。对应的,底层会由主线程作为生产者向消息累加器中放数据,而数据发送线程Sender会从消息累加器中获取数据进行发送。如果Kafka通过主线程将一条数据放入到消息累加器后,需等待数据发送操作的应答状态,才能发送一下条数据的场合,我们就称之为同步发送
。所以这里的所谓同步,就是生产数据的线程需要等待发送线程的应答(响应)结果。反之则为异步发送。
Kafka的代码实现上,采用的是JDK1.5增加的JUC并发编程的Future接口的get方法实现。
当调用get方法时,即会阻塞同步等待数据发送结果的返回。
消息重复与乱序问题
消息重复:由于网络或服务节点的故障,Kafka在传输数据时,可能会导致数据丢失,当我们配置的acks不为0时,首领副本会发送消息确认给生产者,当首领副本发送给生产者的确认丢失时,生产者会以为kafka没有收到数据,在等待确认超时后,会对超时的请求数据进行重试(retry)操作(如果配置了会进行重试)。通过重试操作尝试将数据再次发送给Kafka,那么Kafka就又收到了数据,而这两条数据是一样的,也就是说,导致了数据的重复。
消息乱序:重试(retry)功能除了可能会导致消息重复以外,还可能会导致消息乱序。假设有两个数据需要发送,第一个数据因为某种故障导致发送失败,但第二个数据发送成功,然后第一个数据再进行重试操作,数据的顺序已经被打乱了。
数据幂等性
为了解决Kafka传输数据时,所产生的数据重复和乱序问题,Kafka引入了幂等性操作。Kafka 数据幂等是指无论生产者向 Kafka 发送多少次相同的消息,Kafka 都只会将其持久化一次,而且是以正确的顺序存储在 Topic 的各个 Partition 中,确保消息在传递过程中不会因重复发送而导致数据不一致或重复处理。
在 Kafka 生产者客户端配置中,将enable.idempotence
属性设置为true
,即可开启消息幂等功能。当开启幂等性后,Kafka 生产者会自动调整一些相关参数的默认值,以确保幂等性的正确实现,但最好还是将相关的参数进行正确的配置
。
配置项 | 配置值 | 说明 |
---|---|---|
enable.idempotence | true | 开启幂等性 |
max.in.flight.requests.per.connection | 小于等于5 | 每个连接的在途请求数,不能大于5,取值范围为[1,5] |
acks | all(-1) | 确认应答,固定值,不能修改 |
retries | >0 | 重试次数,推荐使用Int最大值 |
数据幂等实现原理
Kafka 实现消息幂等的关键在于为每个生产者会话
分配一个唯一的 PID(Producer ID),并为每个消息分配一个单调递增的序列号(seqnum)。当生产者发送消息时,会将 PID 和序列号等信息一同发送给 Kafka 集群。Broker中会给每一个分区记录生产者的生产状态
:采用队列的方式缓存最近的5个批次数据(所以也要求生产者的在途请求缓冲区不要大于5)。队列中的数据按照seqnum进行升序排列(这里的数字5是经过压力测试,均衡空间效率和时间效率所得到的值)。
- 如果当前新的请求批次数据在缓存的5个旧的批次中存在相同的,那么说明有重复。
- 如果当前的请求批次数据在缓存中没有相同的,那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1,如果不是,那么说明消息是乱序的。
从实现原理上不难看出,幂等性仅做到单分区上的幂等性
,即单分区消息有序不重复,且无法实现跨会话的幂等性
,生产者掉线恢复后,是新的会话,PID不同。
对于幂等性在这两方面的不足,可采取Kafka事务的方式解决。
Kafka事务
Kafka的事务机制,满足事务的ACID属性,Kafka的事务比较独特,涉及到 Transaction Producer 和 Transaction Consumer,两者需配合使用才能满足事务的ACID属性
(如果你使用非 Transaction 的 Consumer 来消费 Transaction Producer 生产的消息就丢失了事务 ACID 的支持),通过事务机制,Kafka可以实现对多个 Topic 的多个 Partition 的原子性的写入,要使用Kafka事务,还需配置生产者ID(transactional.id
),它在整个集群中需要是唯一的。
Kafka的事务机制,在底层依赖于幂等生产者,幂等生产者是 Kafka事务的必要不充分条件,开启Kafka事务时,Kafka会自动开启幂等生产者。
开启事务机制后,Kafka 会将transactional.id
与一个Producer ID进行关联,即使跨会话,也能够获取到一样的PID,这解决了跨会话问题,而跨分区问题就需要引入分布式事务来解决了。
事务的使用方式如下:
1 | //初始化事务 |
Kafka事务的实现原理
为支持事务机制,Kafka引入了两个新的组件:Transaction Coordinator(事务协调器) 和 Transaction Log。
Transaction Coordinator是运行在每个 Kafka Broker上的一个模块,Transaction Log是 Kafka 的一个内部 topic(名为 __transaction_state),Transaction Log有多个分区,每个分区都会有一个leader副本,该 leade对应哪个 Kafka Broker,哪个 broker 上的Transaction Coordinator就负责对这些分区的写操作。每个 transactional.id 通过 hash 都对应到了Transaction Log的一个分区,所以每个 transactional.id 都有且仅有一个Transaction Coordinator负责。
由于 Transaction Coordinator 是 Kafka Broker 内部的一个模块,而 Transaction Log 是 Kafka的一个内部 topic, 所以 Kafka可以通过内部的复制协议和选举机制,来确保 transaction coordinator 的可用性和 transaction state的持久性
Transaction Log内部存储的只是事务的最新状态和其相关的元数据信息,Producer生产的原始消息,仍然是只存储在原来的Topic中。事务的状态有:Ongoing、Prepare commit、和Completed。
同时还将底层日志文件的格式进行了扩展,日志中除了普通的消息,还有一种消息专门用来标志事务的状态,它就是控制消息Control Batch,由一系列的Transaction Marker以及一些元数据组成,共有两种类型:commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止。
Kafka事务的执行流程如下:
Producer通过 initTransactions 将 transactional.id 注册到 Transaction Coordinator:该操作对每个 Producer Session 只执行一次,此时Transaction Coordinator会关闭所有有相同 transactional.id 且处于 pending 状态的事务,同时也会递增 epoch 来屏蔽僵尸生产者 (zombie producers)
Producer通过 beginTransaction开启事务,并通过发送消息到目标Topic:此时消息对应的 Partition 会首先被注册到 Transaction Coordinator,然后 Producer 按照正常流程发送消息到目标 Topic
Producer通过 commitTransaction提交事务或通过abortTransaction回滚事务:此时会向 Transaction Coordinator 提交请求,开始两阶段提交协议(分布式事务)
在两阶段提交协议的第一阶段,Transaction Coordinator更新事务状态为Prepare commit,并将该状态持久化到 Transaction Log 中;
在两阶段提交协议的第二阶段,Transaction Coordinator 首先写 Transaction Marker 标记到目标 Topic 的目标 Partition,再更新事务状态为 commit或 abort,并将该状态持久化到 Transaction Log 中。
僵尸生产者是指那些由于各种原因(如进程崩溃但未正确清理资源、网络长时间中断等)导致与Transaction Coordinator失去联系,但在系统中仍被认为处于活动状态的生产者。这些僵尸生产者可能会持有一些资源或处于未完成的事务状态,对系统的正常运行造成潜在威胁。
epoch
(纪元)是一个单调递增的版本号,用于标识生产者与Transaction Coordinator之间的交互状态。当Transaction Coordinator检测到有新的生产者使用相同的transactional.id注册时,除了关闭pending事务外,还会递增epoch的值。在后续的事务操作中,Transaction Coordinator会将poch作为一个重要的标识来区分不同版本的生产者。当收到来自生产者的事务请求时,它会检查请求中的epoch值与自己记录的epoch值是否一致。如果不一致,说明该生产者可能是一个僵尸生产者,Transaction Coordinator会拒绝其请求。
事务隔离性
前面有提到Kafka的事务涉及到Transaction Producer 和 Transaction Consumer,两者需配合使用才能满足事务的ACID属性
,Kafka消费者消费消息时可以指定具体的读隔离级别,当指定使用 read_committed 隔离级别时,在内部会使用存储在目标 Topic-Partition 中的事务控制消息,来过滤掉没有提交的消息,包括回滚的消息和尚未提交的消息。Kafka消费者消费消息时也可以指定使用 read_uncommitted 隔离级别,此时目标 Topic-Partition 中的所有消息都会被返回,不会进行过滤。
需要注意的是,过滤消息时,Kafka 消费者不需要跟 Transactional Coordinator 进行交互,因为 Topic中存储的消息,包括正常的数据消息和控制消息,包含了足够的元数据信息来支持消息过滤
用户消息存储
数据由生产者Producer发送给Kafka集群,当Kafka接收到数据后,会将数据写入本地文件中进行存储。这里也涉及到一些细节。
Acks与副本数量
前面有提到过,当acks为-1(all)时,生产者发送消息后,会等待所有同步副本(ISR)都确认收到消息,但是如果当前ISR列表中只有一个Broker存在,那么此时的消息可靠性与我们期待的就有所差异了
,此时的消息可靠性与acks为1时一致,所以我们可以通过配置min.insync.replicas
参数对消息可靠性进行约束,只有当该消息成功写入至少min.insync.replicas
个副本后,生产者才会收到消息已成功提交的确认。
日志文件滚动
数据存储到文件中,如果数据文件太大,对于查询性能是会有很大影响的,所以副本数据文件并不是一个完整的大的数据文件,而是根据某些条件分成很多的小文件,每个小文件我们称之为文件段。日志文件滚动是指当 Kafka 的日志文件达到一定条件时,自动关闭当前日志文件,并创建一个新的日志文件来继续存储新的消息数据的过程。将文件进行分割的条件有多个,满足其中任意一个条件便会进行分割。
文件大小:可通过
log.segment.bytes
参数配置,默认值为1G。时间间隔:可通过
log.roll.hours
参数配置,默认值为7天。
日志清理
Kafka 的日志存储在一个个日志段文件中,日志清理就是定期删除那些不再需要的日志段文件,以释放磁盘空间并确保日志数据量在可控范围内,可通过log.cleaner.enable
开启日志清理功能。日志清理可以基于时间log.retention.hours(或其他)
也可以基于文件大小(单个分区的大小)log.retention.bytes
,Kafka提供两种日志清理方式:delete和compact,可通过log.cleanup.policy
进行配置。
- delete:将过期数据删除
- compact:日志压缩,按照消息的key进行整合,将相同key的数据,只保留最后一个
数据存储格式
Kafka系统早期设计的目的就是日志数据的采集和传输,所以数据是使用.log作为扩展名的日志文件进行保存的。文件名为长度20位的数字字符串,数字含义为当前日志文件的第一批数据的基础偏移量。
同时为了方便查找,Kafka在提供日志文件保存数据的同时,还提供了用于数据定位的索引文件,kafka的索引数据称之为稀疏索引,因为Kafka的索引文件中的索引信息是不完整的,只记录了一部分。默认情况下,4kb的日志数据才会记录一次索引,可通过log.index.interval.bytes
进行配置。
Kafka提供了两种索引,一种是基于偏移量的,一种是基于时间戳的。
数据刷盘
当我们把数据发送给Kafka之后,其实数据在Kafka Broker的操作系统的PageCache(页缓冲)里面,并没有立刻刷到磁盘上。如果操作系统挂了,数据就丢失了。一方面,应用程序可以调用fsync这个系统调用来强制刷盘,另一方面,操作系统有后台线程,定时刷盘。
Kafka提供了参数进行数据刷盘的控制,可以基于时间定期调用fsync也可以基于数据的条数来调用刷盘。具体的配置参数请见附录-数据存储相关配置,在此处就不介绍了。Kafka官方不建议通过配置参数来强制写盘,数据的可靠性应该通过Replication来保证
,否则会对Kafka集群的吞吐量有影响。
数据一致性
kafka通过副本机制实现了分布式系统的容错,但副本之间的数据同步采用的是异步的方式进行,Follower 副本与 Leader 副本的数据可能存在差异,当进行故障转移后,如果不进行额外的处理,可能会带来数据不一致的问题(消费者能访问到的数据是有差异的)。
为了提升数据的一致性,Kafka引入了高水位(High Watermark,HW)机制,它是一个与 Kafka 分区副本相关的概念,用于标识一个分区中已被所有同步副本(In-Sync Replicas,ISR)完全复制的消息的偏移量边界。简单来说,水位以下的消息是已经被所有同步副本成功存储并且可以安全地被消费者消费的消息,而水位以上的消息则可能还未完全在所有副本中同步完成。
消费者在拉取消息时,只能获取到水位以下的消息。水位以下的消息是已经被所有同步副本成功存储的消息,这样即使系统发生故障进行故障转移后,也不会带来数据不一致的问题。
消费者Consumer
Kafka消费者的架构图如下:
图源网络
基本流程
- 具有相同
group.id
的消费者Consumer属于同一个消费者组Consumer Group - 一个Consumer Group中多个Consumer会对订阅的每个Topic的分区进行负载均衡的消费(通过Broker上的一个组件消费者组协调器Group Coordinator按照一定的分配策略将分区分配给不同的消费者)
- 对于一个Topic的不同分区而言,一个分区同时只能被同一个Consumer Group内的一个Consumer消费(能够被多个Consumer Group消费,但对于一个Consumer Group而言只能有一个Consumer消费它),一个Consumer可以同时消费一个Topic的多个分区
- 消费者主动向 Kafka 集群发送请求来拉取消息
- 消费消息后会将当前标注的消费位移信息以消息的方式提交到位移主题(_consumer_offsets,一个内部Topic)中记录
消费者组中的消费者数量最好不要超出主题分区的数量,这样会有消费者无法进行消费,造成了资源的浪费
当消费者加入Consumer Group时,它会向 Kafka集群中的任意一个 Broker 发送 JoinGroup 请求,该 Broker 会将请求转发给对应的 Group Coordinator, Group Coordinator会从成员中选举出一个领导者(Leader),领导者负责与协调器进行交互,执行分区分配等操作。
在消费者组的运行过程中,当出现成员加入或离开、主题分区数量发生变化等情况时,协调器会触发再平衡操作。
消费者偏移量
消费者偏移量是指消费者在消费一个分区中的消息时,记录其已经消费到的消息位置的一个指标。它表示消费者在该分区的消息流中的位置,通过这个偏移量,消费者可以在下次消费时从上次停止的位置继续,从而保证消息不会被重复消费或遗漏。
每个消费者组在 Kafka 中都有自己独立的偏移量管理机制,不同消费者组对同一个主题的相同分区会维护各自不同的偏移量
。这意味着即使多个消费者组都订阅了相同的主题,它们在消费该主题的消息时,各自的消费进度和位置也是不同的,互不干扰。
起始偏移量
起始偏移量是指消费者首次开始消费一个分区的消息时(位移不存在时),从该分区的消息序列中选择的起始位置。它确定了消费者从哪里开始读取消息,通过auto.offset.reset
进行配置。
latest:消费者将从分区的最新消息开始消费,即从分区的末尾开始,此时消费者将错过在其订阅之前已经产生的消息
earliest:消费者将从分区的最早可用偏移量开始消费消息
none:不自动设置起始偏移量,而是需要用户通过代码或其他方式手动指定起始偏移量
偏移量提交
- 自动提交:Kafka 消费者客户端提供了自动提交偏移量的功能。通过配置参数
enable.auto.commit
设置为true
,消费者会在一定的时间间隔(auto.commit.interval.ms
参数进行配置)或消息数量达到一定阈值后,自动将当前已消费消息的偏移量提交给 Kafka 集群。 - 手动提交:参数
enable.auto.commit
设置为false
。手动提交可以更灵活地控制偏移量提交的时机,适合在一些对消息处理结果有严格要求的场景中使用。在 Kafka 的 Java 客户端中,可以通过commitSync()
方法进行同步手动提交,该方法会阻塞当前线程,直到偏移量提交成功;也可以使用commitAsync()
方法进行异步手动提交,该方法不会阻塞当前线程,提交操作在后台进行。
若消费者消息消费完成,但未及时进行提交,此时消费者出现故障,之后该条消息还会被进行消费,也就是会有重复消费的问题。即使采用最安全的手动同步提交的方式,在某些情况也会出现重复消费的问题,比如执行业务逻辑成功后,进行手动提交时故障了。想要做到消息不重复消费(消息幂等),需要对业务进行改造支持。
之前提到消费者消费消息后会将当前标注的消费位移信息以消息的方式提交到位移主题_consumer_offsets中,该主题的分区数量可以通过offsets.topic.num.partitions
设置,默认为50,Kafka会根据group.id的哈希值对分区数量取模来计算偏移量提交到哪一个分区。
自动提交的机制可能会导致消息丢失。如果消息处理逻辑复杂,处理一条消息需要花费较长时间,而自动提交偏移量的时间间隔较短,可能在消息还未处理完时,偏移量就已经被提交了。一旦消费者在处理过程中出现异常崩溃或重启,那么尚未处理完的消息就会被认为已经消费成功,从而导致消息丢失。
扩展
Kafka集群脑裂
当集群的Controller节点并没有宕掉,而是因为网络的抖动,导致和ZooKeeper之间的会话超时,那么此时,整个Kafka集群就会认为之前的Controller已经下线从而选举出新的Controller,而之前的Controller的网络又恢复了,以为自己还是Controller,继续管理整个集群,那么此时,整个Kafka集群就有两个controller进行管理,这种情况,我们称之为脑裂现象。
为了解决这个问题,Kafka通过纪元epoch的概念来解决,当选举新的时Controller时,这个epoch会增1,当旧的Controller发现集群中有新任Controller的时候(通过比对自己的epoch),那么它就会让自己变成一个普通的Broker。
摆脱Zookeeper(KRaft)
Kafka作为一种高吞吐量的分布式发布订阅消息系统 ,在消息应用中广泛使用,尤其在需要实时数据处理和应用程序活动跟踪的场景,kafka已成为首选服务。在Kafka2.8之前,Kafka强依赖Zookeeper来负责集群元数据的管理,这也导致当Zookeeper集群性能发生抖动时,Kafka的性能也会收到很大的影响。2.8版本之后,kafka开始提供KRaft(Kafka Raft)模式,开始去除对Zookeeper的依赖。
目前的Kafka同时支持Zookeeper和KRaft两种方式来进行集群的管理,目前生产环境肯定还是Zookeeper用的多,官方预计会在Kafka 4.0中移除ZooKeeper,到时候再学KRaft吧。。。
附录
对于配置项来说,不同的版本默认值可能会有变化,建议对核心配置项都进行显示的配置。
数据传输语义
传输语义 | 说明 | 例子 |
---|---|---|
at most once | 最多一次:不管是否能接收到,数据最多只传一次。这样数据可能会丢失 | acks=0 |
at least once | 最少一次:如果未接收到成功响应会重新发送,,所以会发送多次,直到收到为止,有可能出现数据重复 | acks=1 |
exactly once | 精准一次:消息只会一次,不会丢,也不会重复 | 幂等+事务+acks=-1 |
生产者相关配置(部分)
参数名 | 参数作用 | 类型 | 默认值 | 推荐值 |
---|---|---|---|---|
bootstrap.servers | 集群地址,格式为: brokerIP1:端口号,brokerIP2:端口号 | 必须 | ||
key.serializer | 对生产数据Key进行序列化的类完整名称 | 必须 | Kafka提供的字符串序列化类:StringSerializer | |
value.serializer | 对生产数据Value进行序列化的类完整名称 | 必须 | Kafka提供的字符串序列化类:StringSerializer | |
interceptor.classes | 拦截器类名,多个用逗号隔开 | 可选 | ||
batch.size | 数据批次字节大小。此大小会和数据最大估计值进行比较,取大值。估值=61+21+(keySize+1+valueSize+1+1) | 可选 | 16K | |
retries | 重试次数 | 可选 | 整型最大值 | 0或整型最大值 |
request.timeout.ms | 请求超时时间 | 可选 | 30s | |
linger.ms | 数据批次在缓冲区中停留时间 | 可选 | ||
acks | 请求应答类型:all(-1), 0, 1 | 可选 | all(-1) | 根据数据场景进行设置 |
retry.backoff.ms | 两次重试之间的时间间隔 | 可选 | 100ms | |
buffer.memory | 数据收集器缓冲区内存大小 | 可选 | 32M | 64M |
max.in.flight.requests.per.connection | 每个节点连接的最大同时处理请求的数量 | 可选 | 5 | 小于等于5 |
enable.idempotence | 幂等性 | 可选 | true | 根据数据场景进行设置 |
partitioner.ignore.keys | 是否放弃使用数据key选择分区 | 可选 | false | |
partitioner.class | 分区器类名 | 可选 | null |
数据存储相关配置(部分)
参数名 | 参数作用 | 默认值 |
---|---|---|
min.insync.replicas | 最小同步副本数量 | 1 |
log.segment.bytes | 文件段字节数据大小限制 | 1G = 1024 * 1024 * 1024 byte |
log.roll.hours | 文件段强制滚动时间阈值 | 7天 = 24 * 7 * 60 * 60 * 1000L ms |
log.flush.interval.messages | 满足刷写日志文件的数据条数 | Long.MaxValue |
log.flush.interval.ms | 满足刷写日志文件的时间周期 | Long.MaxValue |
log.index.interval.bytes | 刷写索引文件的字节数 | 4 * 1024 |
replica.lag.time.max.ms | 副本延迟同步时间 | 30s |
消费者相关配置(部分)
参数名 | 参数作用 | 类型 | 默认值 | 推荐值 |
---|---|---|---|---|
bootstrap.servers | 集群地址,格式为: brokerIP1:端口号,brokerIP2:端口号 | 必须 | ||
key.deserializer | 对数据Key进行反序列化的类完整名称 | 必须 | Kafka提供的字符串反序列化类:StringSerializer | |
value.deserializer | 对数据Value进行反序列化的类完整名称 | 必须 | Kafka提供的字符串反序列化类:ValueSerializer | |
group.id | 消费者组ID,用于标识完整的消费场景,一个组中可以包含多个不同的消费者对象。 | 必须 | ||
auto.offset.reset | 指定当消费者在启动或位移不存在时如何处理偏移量的重置 | 可选 | latest:如果未找到已提交的偏移量或位移无效,消费者将从分区的最新偏移量开始消费消息,即只消费在消费者启动之后产生的消息 | |
group.instance.id | 消费者实例ID,如果指定,那么在消费者组中使用此ID作为memberId前缀 | 可选 | ||
partition.assignment.strategy | 分区分配策略 | 可选 | ||
enable.auto.commit | 启用偏移量自动提交 | 可选 | true | |
auto.commit.interval.ms | 自动提交周期 | 可选 | 5000ms | |
fetch.max.bytes | 消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响 | 可选 | 52428800(50MB) | |
offsets.topic.num.partitions | 偏移量消费主题分区数 | 可选 | 50 | |
isolation.level | 消息读取隔离级别 | 可选 | read_uncommitted |
部分内容转载自尚硅谷