AI 助理
备案 控制台
开发者社区 微服务 文章 正文

深入解析 Kafka Exactly Once 语义设计 & 实现

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 本篇文章主要介绍 Kafka 如何在流计算场景下保证端到端的 Exactly Once 语义,通过其架构上的设计以及源码分析帮助读者理解背后的实现原理。什么是 Exactly-Once?消息的投递语义主要分为三种:At Most Once: 消息投递至多一次,可能会丢但不会出现重复。At Least Once: 消息投递至少一次,可能会出现重复但不会丢。Exactly Once: 消息投递正好一次

本篇文章主要介绍 Kafka 如何在流计算场景下保证端到端的 Exactly Once 语义,通过其架构上的设计以及源码分析帮助读者理解背后的实现原理。


什么是 Exactly-Once?

消息的投递语义主要分为三种:

  • At Most Once: 消息投递至多一次,可能会丢但不会出现重复。
  • At Least Once: 消息投递至少一次,可能会出现重复但不会丢。
  • Exactly Once: 消息投递正好一次,不会出现重复也不会丢。

Kafka 实现的主要是针对流计算场景下的 Exactly Once 能力,当中有个重要的限制条件,数据必须取自 Kafka 且计算结果也必须保存到 Kafka。如果流数据的状态存储依赖的是外部系统(常见的传统消息队列使用场景),则可能无法在系统出现故障时保证 Exactly Once。比如消费端消费一批数据后,在尚未提交消费位点的情况下崩溃重启,此时可能会消费到重复的消息(当然客户端可以自己实现幂等),从而造成错误的计算结果。

所以注意,Kafka 并未实现一般意义上消息队列服务水平中的 Exactly Once 语义(目前市面上也并未有其他消息产品实现),以下所提到的 Exactly Once 也均是基于流计算场景出发。

为什么流计算场景需要 Exactly-Once 能力?

Kafka 实现 Exactly-Once 语义的主要出发点是为了应对流计算的 “Consume-Process-Produce” 任务,其主要流程如下:

  1. 流处理应用(e.g. Kafka Streams)从源主题中消费到一条消息 A。
  2. 触发函数对消息 A 进行处理并更新其状态。
  3. 生产一至多条消息 B1 - Bn 至目标主题。
  4. 等待来自目标主题所属 Broker 的响应。
  5. 向源主题主动提交消息A的位点,表示该消息已被处理,并等待响应。

可以看到在 “Consume-Process-Produce” 任务中,流处理应用会同时作为消费者和生产者对流数据进行处理,而 At Least Once 的语义可能会造成重复消息的出现,比如以下两种常见场景:

  1. 当流处理应用成功发送消息并等待来自目标主题所属 Broker 的响应时,本该抵达的  ACK 因网络问题而丢失,导致应用认为消息发送失败而进行重试,发送成功后服务端会出现重复消息。

  1. 流处理应用发送消息至目标主题并成功获得响应,但在即将把位点(消费进度)提交至源主题前 应用发生崩溃。应用重启后,重新从源主题消费到先前已处理过的消息,再次发送该条消息成功后,将会导致服务端出现重复消息。

根据以上示例我们可以得知,重复消息在 At-least-once随时都有可能出现,由于在流计算场景中,我们不会希望出现数据被重复处理的情况(比如网站浏览数被多算或少算一个),所以如何能够去除重复数据以达成精确一次(Exactly Once)投递至关重要。

Kafka 如何对消息进行去重?

要对消息进行去重,最直接的办法便是对每条消息进行唯一编号并在服务端对这些编号进行维护,每当有新消息便遍历检查是否存在重复的消息编号,如果有则拒绝写入。然而,这样的实现在消息量较大时,效率非常低。

一种更高效的方案是对每个生产者进行唯一编号(Producer ID, PID)并让其维护消息序列号,该序列号会随着消息的发送递增。这种方式同样可对消息进行唯一标识,且服务端不再需要对所有消息编号进行存储。取而代之的是,它只需维护各 PID 及其对应的最高消息序列号映射,以验证新消息的序列号是否有序和没有重复。

另一个问题是生产者应以全局(集群)还是分区维度维护其序列号?试想一个客户端可能发消息至多个分区,如果以全局的方式维护序列号,对客户端来说实现较为简单,但服务端就需较高成本来判断消息是否有序。如果以分区维度维护,则能够让服务端更高效地进行消息的有序验证(合法序列号 = 生产者已发送至该分区的消息最高序列号 + 1)。

总的来说,利用 PID + 以分区维度维护的消息序列号,服务端便可对接收到的消息进行如下判断:

  • 如果消息序列号小于 PID 在当前分区所对应的最高序列号,说明消息出现 重复,放弃写入至日志。
  • 如果消息序列号大于 PID 在当前分区所对应的最高序列号 + 1,说明消息出现 乱序,抛出异常信息并拒绝写入。

补充一点,针对每个 Topic-Partition,Broker 会在其内存中维护每个 PID 及序列号(成功写入日志的消息)的映射关系,然而这样在 Broker 重启时,将会需要读取所有日志来恢复该状态。因此, Kafka 引入 PID Snapshot 机制,定期针对该映射关系和日志位点做快照,以加速状态恢复效率。

 

然而,这样就能百分百保证消息有序了吗?

根据 Kafka 的设置,PID 对每个分区可同时发送最多5个未 ACK 的请求(服务端规定最多只能缓存每个 PID 在每个 Topic-Partition 最近的 5 个请求)。试想下如果我们一次发送 6 个请求,除了第一条其余都返回 ACK 成功。此时客户端会对第一条消息进行重试,但由于服务端仅能缓存 5 条最近的消息的缘故,所以其无法对重试的第一条消息的序列号判重。

为此,Kafka 实现了以下机制来对有序性做保证:

  • 生产者不可一次发送超过服务端允许的未 ACK 请求数(5 个)。
  • 当服务端发现消息序列号大于预期,代表消息出现乱序,则将后续的乱序请求一并拒绝并返回异常进行重试(e.g. 第一个请求发送失败/超时,则后续同时发送的其他请求也需跟着重试)。
  • 当出现重试请求时,会按照序列号大小将请求重新放到发送队列中的正确位置。
  • 客户端发送重试请求前,会检查当前的重试请求是否是之前未发送成功批次的首个请求,如果不是,则需等待首个请求被重新加入到发送队列中(等于最多允许发送的未 ACK 请求数动态减少到 1)。

这便是 Kafka 为解决因重试导致的消息重复问题所引入的幂等生产者(Idempotent Producer)基本思想,其主要针对以下场景:

  1. 服务端接收消息并保存后,发送 ACK 失败,生产者认为该消息发送失败而进行重试导致消息出现重复。
  2. 消息 A 发送失败,消息 B 发送成功,消息 A 经过重试后发送成功所产生的消息乱序问题。

然而,幂等生产者仅能保证在单会话 & 单分区的 Exactly Once 语义,它仍然没有解决在流计算场景下的一些常见问题:

  1. 流处理应用在收到消息 ACK 前重启,此时服务端无法分辨前后请求来自同个生产者,给重启后的应用分配了新的 PID,故无法正确判断其对应的消息序列号,进而导致无法在消息重投情况下做去重。
  2. 流处理应用在消费一批消息并处理后,可能会将计算结果发往不同分区,我们不希望出现部分发送成功,部分失败的状态。
  3. 流处理应用成功发送消息至目标 Topic 且返回 ACK 成功,但在向源 Topic 提交位点前应用发生崩溃而重启。这种情况下,应用重启后将会消费到重复消息,导致计算处理结果出现错误。

综上所述,可以总结出幂等生产者在保证 Exactly Once 机制上的两大缺陷:1. 应用崩溃重启后,无法保证新会话发送的消息依旧能够保持幂等;2. 无法保证多分区/多个读写操作的原子性。为此,Kafka 引入了事务性机制来加强对 Exactly Once 语义的保证,接下来我们就来看看 Kafka 是如何通过事务来一一解决这些问题的。

Kafka 如何保证跨会话的幂等性写入?

由于幂等生产者的的 PID 会随着每次应用的重启而更新,导致跨会话的情况下,服务端无法定位生产者重启前所发送的消息最高序列号为消息做去重。

为此,Kafka 引入了稳定的唯一 ID —— Transactional ID(事务 ID)来标识一组事务的操作,在跨会话的情况下,只要使用相同的事务 ID,即可接续之前的事务状态继续操作(e.g. 故障恢复后主动回滚上次未完成的事务)。而不同于 PID 的是,事务 ID 是通过用户提供的,而 PID 则是由服务端进行分配。

这里可能会出现问题,如果当前多个生产者使用了相同的事务 ID(e.g. 生产者假死后复活,然而已有新的生产者接续之前未完成的事务),是否会发生脑裂问题(多个生产者可能针对同一事务进行操作)呢?

为此,Kafka 实现了个防护(fencing)机制,引入 epoch 的概念来隔离掉僵尸生产者,服务端在生产者发送事务初始化请求时,便会记录该事务 ID 所对应的最新 epoch 并连同 PID 一同返回给生产者,如果有来自相同事务 ID 的初始化请求便将其对应的 epoch 加 1。这样便能够隔离掉来自较早创建(epoch 较小)的生产者请求。

Transactional ID + 生产者 epoch 解决了幂等生产者无法保证跨会话幂等写入的问题,但同时还需配合服务端针对事务性机制的实现,才可确保新会话启动后,任何先前未完成的操作都已提交/回滚,处于一个“干净”的状态。这就涉及到我们再来要讨论的 Kafka 在多分区写入或有多个读写操作的情况下,如何保证事务的原子性。

Kafka 如何保证多分区写入/多个读写操作的原子性?

Kafka 为了解决多分区写入/多个读写操作的原子性问题,在服务端 Broker 间引入了个关键新角色 —— Transaction Coordinator(事务协调者)。事务协调者主要的职责在于管理事务相关的元数据(e.g. 事务 ID、PID、epoch 等)。

当事务开始后,每当生产者要向一个新 Topic-Partition 发送消息,便会向协调者同步自己要操作哪个 Topic-Partition,之后才会正式向对应分区发消息,待事务执行完后再告诉协调者事务的执行结果。

让我们继续分析,一个事务内所发送的消息具体是如何被提交/回滚的?我们知道 Kafka 有个分区 Leader 的角色(负责读/写该分区消息的 Broker),一个事务内的消息都将被发往各分区 Leader。如果事务完成后,由生产者分别向各个分区 Leader 告知事务消息可提交/回滚,这样对于客户端的压力是否有些大?

如同前面所说,生产者会在正式向各分区 Leader 发送消息前,会同步自己要将消息发往哪个 Topic-Partition 给协调者,事务执行完后再告知协调者事务的执行结果,这说明协调者掌握了事务所涉及的关键信息,并具备了操作整个事务的主动权。因此,当协调者拿到事务的执行结果后,便可由它来负责通知各分区 Leader 要对事务消息进行提交/回滚。

接下来,同一事务的消息在还未被提交/回滚前会以什么形式待在服务端呢?如果放在内存肯定是不现实的,一个长事务可能会包含很多消息,造成内存压力过大,所以必定会需要对这些未提交/回滚的消息进行持久化。但如果将事务消息存放到磁盘,进行回滚操作的过程又可能会涉及到多次磁盘的读写,效率极低。

为此,Kafka 引入了新的消息类型 —— Control Messages(控制消息),简单来说就是事务执行结果的标记,协调者会将控制消息发往各分区 Leader 来通知当前事务所涉及的消息应该被提交/回滚。控制消息会和普通消息一同持久化,客户端则会连同控制消息一块消费,并通过它得到事务的执行结果。

到这里,我们可以知道事务协调者这个角色的重要性,然而协调者也有可能出现挂掉的情况,此时协调者的角色可能会随着服务端的 HA 机制(Kafka 多副本机制)转移到其他 Broker,这样要如何保证事务状态信息不会在转移过后丢失呢?

为此,Kafka 引入了个新的内部 Topic (不会被客户端消费)—— __transaction_state,该 Topic 是个 Compacted Topic(简单来说就是 KV 存储),协调者所有的元数据信息都会被持久化到其对应的日志。加上 Kafka 本身的多副本机制,保证了协调者在故障恢复后能够正确恢复状态。

到这里就结束了吗?别忘了我们还需消费端的配合才能够达成端到端的 Exactly Once。

在事务尚未完成的情况下,消息照理来说不应被客户端读取到,因此其中一个可行的办法是在客户端先缓存尚未提交/回滚的事务消息,待消费到控制消息后再判断是否允许客户端获取到数据。然而,这对客户端内存的压力较大,不太合适。

如果是通过服务端来保证消费者能够获取到所有已提交的数据呢?听起来似乎更可行些,但一长事务的日志头尾位点涵盖范围可能会包含其他短事务和其他类型的消息,该长事务提交后,服务端就需要读取出完整的日志段并过滤掉不属于它的数据,或者通过磁盘随机读访问长事务所涉及的日志段。这样对读取性能影响是较大的。

因此,Kafka 选择了相对保守的方案,引入 Last Stable Offset(LSO)的概念在客户端实现了 read_committed(读已提交)策略,顾名思义,在该位点之前的消息都是稳定的、已完成的事务或者其他类型t的消息,它代表的是第一个未完成事务的首消息位点,当客户端设置 read_committed 模式后,服务端便只会读取到 LSO 之前的消息。

LSO 的缺点在于该位点后可能存在其他已提交的事务或者其他类型的消息,但受限于它们的位点之前有某个尚在进行中的事务(e.g. 未提交的长事务所涉及的日志段有可能包含数个已提交的短事务),所以无法被返回给客户端,但也因此减轻了客户端的压力(降低 OOM 风险)和确保了服务端的读取效率。

除了利用 LSO 过滤掉未完成的事务,是否有办法高效地直接过滤掉已回滚的事务?虽然有控制消息可以判断事务是否回滚,但如果每次都需要在客户端缓存数据,等消费到控制消息再确认是否丢弃,是否还是效率低了些?

Kafka 为了再进一步减轻客户端的缓存压力,服务端会记录每个分区的已回滚的事务所涉及的日志起止位点,并持久化到后缀名为 .txnindex 的文件中(为了快速恢复)。这样客户端在拉取数据时,服务端会根据其拉取的位点所涉及到的回滚事务,返回一个已回滚事务集合,供客户端提早过滤掉已回滚的消息。

Kafka 如何在流计算场景实现 Exactly-once 机制?

到这里可以大致总结出,Kafka 通过引入幂等生产者 + 事务性机制解决了跨会话的幂等写入和跨分区/多个读写操作的原子性。接下来我们综合这些概念,来看看 Kafka 在流计算场景实现端到端 Exactly-once 的主要流程。

整体流程

我们先介绍下大致的运行流程,再深入各个步骤的细节实现:

  1. 事务初始化
  1. 生产者寻找 事务协调者(Transactional Coordinator)
  2. 生产者向事务协调者获取  PID
  1. 事务开始
  2. Consume-Process-Produce:
  1. 消费者(流处理应用同时作为消费者和生产者)从源 Topic 消费消息并做处理。
  2. 生产者同步消息所要发往的  Topic-Partition 信息给事务协调者。
  3. 生产者向目标 Topic-Partition 发送消息。
  4. 生产者同步提交位点所要发往的  Topic-Partition (内部 Topic __consumer_offsets)信息给事务协调者。
  5. 生产者通知 消费组协调者(Group Coordinator,服务端负责感知消费组变化的 Broker)提交位点(仅持久化位点,但并 未更新缓存,因此直到事务提交前对消费者不可见)
  1. 提交/回滚事务
  1. 生产者告知事务协调者事务 执行结果(提交/回滚)。
  2. 事务协调者向事务所涉及的分区 Leader 发送 控制消息标记事务执行结果,同时事务协调者给生产者响应事务已提交/回滚成功。
  3. 所有分区 Leader 将控制消息持久化(任何一个失败都会进行无限重试)后,事务协调者将该事务状态修改为 已提交/回滚,事务结束。

这就是 Kafka 实现流计算针对场景实现 Exactly Once 的大体流程,而在我们正式深入每一步骤的实现前,可以先熟悉下 Kafka 的客户端及服务端是怎么对事务状态进行维护的,帮助后面更好地理解细节。

状态转移

首先是服务端的状态管理,总共分为以下几个状态:

  • EMPTY:事务尚未开始。
  • ONGOING:事务已开始并正在进行中。
  • PREPARE_COMMIT:事务准备提交。
  • PREPARE_ABORT:事务准备回滚。
  • COMPLETE_COMMIT:事务已经提交。
  • COMPLETE_ABORT:事务已经回滚。
  • DEAD:事务 ID 已过期且即将从事务内存中移除。
  • PREPARE_EPOCH_FENCE:处于增加 epoch 数及隔离掉过期生产者的过程中。

而客户端的状态管理则分为以下几种状态:

  • UNINITIALIZED:事务生产者初始化前的状态,这个阶段没有任何事务的处理。
  • INITIALIZING:事务初始化的过程,包含寻找协调者以及获取 PID 等。
  • READY:对于全新的事务,生产者收到来自协调者事务初始化请求的响应后,或者对于已有的事务,事务流程完成后会将状态置为 READY。
  • IN_TRANSACTION:事务生产者正式发送消息前,将自己的状态置为 IN_TRANSACTION,标识事务开始。
  • COMMITTING_TRANSACTION:事务生产者告知服务端事务提交前,会先将自己更新至该状态。
  • ABORTING_TRANSACTION:事务生产者告知服务端事务回滚前,会先将自己更新至该状态。
  • ABORTABLE_ERROR:事务流程中,如果有数据发送失败/异常,会转换为该状态,再自动回滚事务。
  • FATAL_ERROR:转移到该状态后,再进行状态转移时会抛出异常。

实现细节

接下来,让我们正式深入 Exactly-once 的实现细节,了解这其中的每一步客户端和服务端都做了哪些事。

1. 事务初始化

1.a 生产者寻找事务协调者

如同前面所说,事务性机制实现的关键就在于事务协调者,所以整个流程的第一步就是生产者需要找到当前事务所对应的协调者。

生产者调用 initTransactions() 方法后,首先会向服务一台任意一台 Broker (一般选择本地连接最少的)发送 FindCoordinatorRequest,该请求会携带自定义的 Transactional ID (事务 ID)。

服务端则会将事务 ID 的哈希码与 __transaction_state 的分区数(默认为50)取模,得到对应分区 Leader(分区所属的 Broker)会作为协调者的角色。

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

1.b 生产者向事务协调者获取 PID

找到协调者后,生产者便会向其发送 InitPidRequest 请求分配 PID 以及该 PID 所对应的 epoch。

而协调者则会通过 ProducerIdManager (与协调者一同初始化的对象,负责管理 PID 信息)向 ZooKeeper 申请一个 PID 段(每次申请1000个 PID),再将该 PID 段区间记录到 ZK 的 “/latest_producer_id” 节点中。当所需 PID 大于该节点所记录的区间时,再重新申请新的 PID 段,否则便将下一个可用的 PID 信息返回给协调者。

def generateProducerId(): Long = {
  this synchronized {
    // 如果分配的 PID 段大于申请的 PID 段区间,便向ZK再申请1000个
    if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
      allocateNewProducerIdBlock()
      nextProducerId = currentProducerIdBlock.producerIdStart
    }
    nextProducerId += 1 // 下一可用 PID
    nextProducerId - 1 // 返回当前分配的 PID
  }
}

除了给生产者返回 PID + epoch 信息,协调者还会初始化该事务 ID 所对应的事务元数据(e.g. 包括 PID、epoch、Topic-Partition等)并持久化到 __transaction_state 中(此时事务状态置为 EMPTY)。

2. 事务开始

这一步生产者会调用 beginTransaction() 方法,表示事务操作正式开始。可以看到,这里并未有任何的客户端-服务端交互,客户端只是简单的将本地的状态转换为 IN_TRANSACTION

public synchronized void beginTransaction () {
  ensureTransactional();
  maybeFailWithError();
  // 将生产者的事务状态置换为 IN_TRANSACTION
  transitionTo(State.IN_TRANSACTION);
}

3. Consume-Process-Produce

3.a 消费者从源 Topic 消费消息并做处理

Consume-Process-Produce 流程的第一步为应用作为消费者的角色从源 Topic 轮询获取消息,并对获取到的消息做处理后进行发送。

for (ConsumerRecord record : records) {
  // 对消费到的消息进行处理
  ProducerRecord customizedRecord = transform(record);
  producer.send(customizedRecord);
}

3.b 生产者同步消息所要发往的分区给事务协调者

生产者发送消息会分为两步,首先会把即将要发往的 Topic-Partition 信息先同步给协调者,再正式向 Topic-Partition 发送消息。

我们来看下代码实现,生产者调用 send() 方法时,会将对应 Topic-Partition 信息记录到本地的 TransactionManager,如果发现该 Topic-Partition 之前不存在,则向协调者发送 AddPartitionsToTxnRequest,添加该信息到事务 ID 对应的元数据中并持久化到 __transaction_state 日志。

// 正式发消息前会对事务消息类型做检查,判断是否需要同步 Topic-Partition 信息给协调者
if (transactionManager != null && transactionManager.isTransactional())
  transactionManager.maybeAddPartitionToTransaction(tp);

public synchronized void maybeAddPartitionToTransaction (TopicPartition topicPartition) {
  // 如果分区信息已添加到 TransactionManager 中
  if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
    return;

  log.debug("Begin adding new partition {} to transaction", topicPartition);
  // 添加事务消息的分区信息到待发送分区集合中(发往协调者)
  topicPartitionBookkeeper.addPartition(topicPartition);
  newPartitionsInTransaction.add(topicPartition);
}

3.c 生产者向目标 Topic-Partition 发送消息

生产者将携带 Batch 消息的 ProduceRequest 发往指定的 Topic-Partition,这里比较重要的逻辑是会给当前 Batch 消息所对应的 Topic-Partition 设置序列号和事务标识,该序列号会随发往该分区的消息而递增。

synchronized Integer sequenceNumber(TopicPartition topicPartition) {
  return topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence;
}

服务端接收到请求后,会验证其携带的 Batch 消息是否重复以及乱序。

def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
  if (batch.producerEpoch != producerEpoch)
    None
  else
    batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
}

def batchWithSequenceRange(
    firstSeq: Int,
    lastSeq: Int
): Option[BatchMetadata] = {
  // ProducerStateEntry#batchMetadata 属性存放了该生产者最新发送的5个消息批次的序号,如果该集合中存在某个消息批次与请求的消息批次的
  // baseSequence(第一条消息的序号)、 lastSequence(最后一条消息的序号)相同,则认为生产者发送的消息批次重复发送,不做处理
  val duplicate = batchMetadata.filter { metadata =>
    firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
  }
  duplicate.headOption
}

// 判断序列号是否连续
private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
  nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
}

3.d 生产者同步所要提交的消息位点分区信息给事务协调者

生产者会调用 sendOffsetsToTransaction() 方法进行消费位点的提交,该步骤可视为与发送事务消息一样,一样分为两步。

首先向协调者发送 AddOffsetsToTxnRequest 将即将要发往的 Topic-Partition(提交位点是向 __consumer_offsets 内部 Topic)同步给协调者,再将位点信息发送给消费组协调者(GroupCoordinator,__consumer_offsets 的分区 Leader,会负责记录消费进度)。

public synchronized TransactionalRequestResult sendOffsetsToTransaction(
    final Map offsets,
    final ConsumerGroupMetadata groupMetadata) {
  ...
  // 生产者发送 AddOffsetsToTxn 请求给事务协调者,请求中携带了ACK偏移量,事务协调者收到该
  // 请求后会将偏移量信息存储到事务状态主题中,生产者收到事务协调者对 AddOffsetsToTxn 
  // 请求的响应后,再正式提交位点发给GroupCoordinator
  log.debug("Begin adding offsets {} for consumer group {} to transaction",
      offsets, groupMetadata);
  AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
      new AddOffsetsToTxnRequestData()
          .setTransactionalId(transactionalId)
          .setProducerId(producerIdAndEpoch.producerId)
          .setProducerEpoch(producerIdAndEpoch.epoch)
          .setGroupId(groupMetadata.groupId()));
  AddOffsetsToTxnHandler handler =
      new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);

  enqueueRequest(handler);
  return handler.result;
}

协调者在收到该请求后,便跟 3.b 一样,将根据指定 group.id(消费者组 ID) 算出对应的 __consumer_offsets 分区信息(group.id 的哈希码与 __consumer_offsets 的分区数取模)写入到当前事务 ID 所对应的元数据并持久化到 __transaction_state 的日志中。

// 计算消费位点信息要发往哪个 __consumer_offsets 分区
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

// 将该分区信息添加到事务元数据中(与处理存储分区信息共用 handleAddPartitionsToTransaction 方法),
// 并将新的事务元数据写入 __transaction_state 事务状态主题
txnCoordinator.handleAddPartitionsToTransaction(
  transactionalId,
  addOffsetsToTxnRequest.data.producerId,
  addOffsetsToTxnRequest.data.producerEpoch,
  Set(offsetTopicPartition),
  sendResponseCallback,
  requestLocal
)

3.e 生产者通知消费组协调者提交位点

收到协调者对 AddOffsetsToTxnRequest 的响应后,生产者才会正式向消费组协调者(GroupCoordinator)发送 TxnOffsetCommitRequest 请求正式提交已消费的消息位点信息。

消费组协调者(GroupCoordinator)接收到 TxnOffsetCommitRequest 请求后,会根据 group.id 计算出对应的 __consumer_offsets 分区,并将位点信息(包含 PID 信息)持久化。

需要注意的一点是,此时消费组协调者还不会更新消费进度缓存,所以对消费者(消费时通过接口获取缓存中的消费进度)尚不可见,只有这个事务完成提交后才会将位点信息写入到缓存。

// 生产者收到来自协调者对 AddOffsetsToTxnRequest 请求的响应后,
// 即发送TxnOffsetCommit请求给Group Coordinator,请求中携带了已消费位点
pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata));

// GroupCoordinator 将位点信息持久化至 __consumer_offsets 日志中
private def doTxnCommitOffsets(group: GroupMetadata,
                                 memberId: String,
                                 groupInstanceId: Option[String],
                                 generationId: Int,
                                 producerId: Long,
                                 producerEpoch: Short,
                                 offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                                 requestLocal: RequestLocal,
                                 responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
	group.inLock {
  	val validationErrorOpt = validateOffsetCommit(group, generationId, memberId, groupInstanceId,
                                                  isTransactional = true)
  	if (validationErrorOpt.isDefined) {
    	responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })
    } else {
      // 将位点信息进行持久化
    	groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,
          producerEpoch, requestLocal)
    }
  }
}

4. 提交/回滚事务

4.a 生产者告知事务协调者事务执行结果

事务操作完成后,生产者会调用 commitTransaction()/abortTransaction() 来提交或回退当前事务,这俩操作都会发送携带提交/回滚标识的 EndTxnRequest 给协调者。

// 切换状态至正在提交事务COMMITTING_TRANSACTION,并发送EndTxnRequest请求给协调者(beginCompletingTransaction),
// 事务回滚也是相同流程
public synchronized TransactionalRequestResult beginCommit() {
    return handleCachedTransactionRequestResult(() -> {
        maybeFailWithError();
        transitionTo(State.COMMITTING_TRANSACTION);
        return beginCompletingTransaction(TransactionResult.COMMIT);
    }, State.COMMITTING_TRANSACTION);
}

private TransactionalRequestResult beginCompletingTransaction(
    TransactionResult transactionResult) {
  ...
  // 发送EndTxnRequest请求,请求中携带了提交事务或回退事务的标志,这里只关注提交事务的逻辑,协调者收到EndTxnRequest请求后,
  // 会将事务状态切换到PREPARE_COMMIT状态,并将该PREPARE_COMMIT状态写入事务状态主题,写入成功后,协调者返回事务提交成功的
  // 响应给生产者
  if (!(lastError instanceof InvalidPidMappingException)) {
    EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
        new EndTxnRequestData()
            .setTransactionalId(transactionalId)
            .setProducerId(producerIdAndEpoch.producerId)
            .setProducerEpoch(producerIdAndEpoch.epoch)
            .setCommitted(transactionResult.id));

    EndTxnHandler handler = new EndTxnHandler(builder);
    enqueueRequest(handler);
    if (!epochBumpRequired) {
      return handler.result;
    }
  }
  // 将状态转换回 INITIALIZING 状态,请求协调者增加 PID 所对应的 epoch,并回到 READY 状态,
  // 准备进行下次事务操作
  return initializeTransactions(this.producerIdAndEpoch);
}

协调者收到 EndTxnRequest 请求后,会更新 __transaction_state 的元数据,将事务状态置为 PREPARE_COMMIT/PREPARE_ABORT

4.b 事务协调者向事务所涉及的分区 Leader 发送控制消息

待 __transaction_state 元数据更新并持久化成功后,接下来协调者会同时做两件事,分别为向生产者发送执行成功响应(实际还没有),后续事务执行交给协调者来完成,以及向各个事务所涉及的 Topic-Partition 发送 WriteTxnMarkerRequest 请求,标记事务执行结果。

preSendResult match {
  case Left(err) =>
    info(
      s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request"
    )
    responseCallback(err)

  case Right((txnMetadata, newPreSendMetadata)) =>
    // 先返回给客户端成功响应,然后再发送marker消息直到broker成功持久化控制消息
    responseCallback(Errors.NONE)

    // 该方法会为该事务所有事务分区创建一个TxnIdAndMarkerEntry实例(代表一个待发送的 WriteTxnMarkers 请求,目标节点为分区
    // Leader),并添加到TransactionMarkerChannelManager#markersQueuePerBroker中
    txnMarkerChannelManager.addTxnMarkersToSend(
      coordinatorEpoch,
      txnMarkerResult,
      txnMetadata,
      newPreSendMetadata
    )
}

判断控制消息(WriteTxnMarkerRequest)是否成功写入到所有目标 Topic-Partition,如果不成功则需不断进行重试,直到所有控制消息都已持久化。

if (!abortSending) {
  if (retryPartitions.nonEmpty) {
    debug(
      s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +
        s"under coordinator epoch ${txnMarker.coordinatorEpoch}"
    )

    // 如果Broker返回处理失败的响应,则协调者重新生成TxnIdAndMarkerEntry实例并添加到TransactionMarkerChannelManager#markersQueuePerBroker队列中,
    // 后续协调者会重新发送WriteTxnMarkers请求到该Broker,直到事务中所有的Broker处理成功,该事务才算提交完成
    txnMarkerChannelManager.addTxnMarkersToBrokerQueue(
      transactionalId,
      txnMarker.producerId,
      txnMarker.producerEpoch,
      txnMarker.transactionResult,
      txnMarker.coordinatorEpoch,
      retryPartitions.toSet
    )
  } else {
    // 如果Broker返回处理成功的响应
    txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId)
  }
}

4.c 事务协调者将该事务状态修改为已提交/回滚

待所有控制消息都被持久化到对应的日志文件后,协调者才会将 __transaction_state 中,该事务的状态更新为 COMPLETE_COMMIT/COMPLETE_ABORT,表示该事务已完成,可以将相关的缓存内容清除了,这里只需要保留已完成事务的 PID 及事务完成的时间戳,即可通过事务过期处理机制删除过期的事务 ID & PID 映射。

这边需要特别提下事务回滚操作,其流程跟事务提交大同小异,但回滚除了由客户端主动触发,也有可能由协调者本身来进行触发,比如事务超时的场景,协调者会定时扫描检查是否有超时的事务。

scheduler.schedule(
  "transaction-abort",
  () => abortTimedOutTransactions(onEndTransactionComplete),
  txnConfig.abortTimedOutTransactionsIntervalMs,
  txnConfig.abortTimedOutTransactionsIntervalMs
)

此外,回滚操作还会向 .txnIndex 文件更新左右已回滚的事务,方便消费端更高效地过滤掉已回滚的数据,不需等待消费到控制消息。

// 检查已完成的事务中是否有需要记录到 .txnindex 文件中的回滚事务
completedTxns.foreach { completedTxn =>
  val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
  segment.updateTxnIndex(completedTxn, lastStableOffset)
  producerStateManager.completeTxn(completedTxn)
}

// 如果事务以回滚,便会将事务的起止 offset,以及当前的 LSO 记录到 .txnindex 文件
def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long): Unit = {
  if (completedTxn.isAborted) {
    trace(
      s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset"
    )
    txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))
  }
}

Kafka 如何处理事务流程中的故障/异常?

Kafka 针对流计算场景实现的 Exactly Once 能力看似很完备,但我们都知道在分布式的世界中,事情往往不会那么美好,所以我们有必要来分析下它是如何应对各个事务执行阶段可能发生的异常或故障的。

生产者故障/重启的 Exactly Once 保障情况?

在事务流程中,生产者主要涉及如下几个阶段:

事务初始化(Empty)
  • 生产者发送请求(FindCoordinatorRequest/InitProducerIdRequest)错误/超时:生产者会根据返回的异常信息进行重试/关闭。
  • 生产者故障:如果已成功初始化事务元数据,则会通过服务端的事务 ID 超时机制,清除过期事务 ID。
  • 生产者重启:重启后的生产者会重新向服务端发送 FindCoordinatorRequest 寻找协调者,并向协调者请求获取 PID,请求会携带与重启前一样的 Transactional ID,如果协调者处在重启前已生成并记录该 Transactional ID 对应的事务状态,则服务端会返回其相对应的 PID + epoch 信息(epoch 会加1),否则重新生成事务元数据并返回 PID + epoch 信息。
事务进行中(Empty -> Ongoing)
  • 生产者发送请求(AddPartitionToTxnRequest/ProduceRequest/AddOffsetsToTxnRequest/ TxnOffsetsCommitRequest)错误/超时:生产者会根据返回的异常信息,主动向协调者发送回滚请求回滚该事务。
  • 生产者故障:通过服务端的事务超时机制回滚该事务。
  • 生产者重启:重启后会从头开始事务流程(包括事务初始化 + Consume-Process-Produce流程),在事务初始化阶段,服务端会回滚该事务 ID 重启前的事务,该流程会要求生产者等待一段时间后重试,回滚过期事务后才会继续新事务。
事务提交/回滚(Ongoing -> PrepareCommit/PrepareAbort -> CompleteCommit/CompleteAbort)
  • 生产者发送请求(EndTxnRequest)错误/超时:生产者会根据返回的异常信息,主动回滚事务,或者重试 EndTxnRequest 请求到协调者,而协调者收到该请求后则会发送控制消息到各分区 Leader 来标识事务已完成。
  • 生产者故障:服务端如果有成功收到生产者的 EndTxnRequest,则会接管后续提交/回滚流程,向各分区 Leader 发送控制消息。如果服务端没有收到生产者的 EndTxnRequest,则会通过事务超时机制回滚该事务。
  • 生产者重启:重启后会从头开始事务流程(包括事务初始化 + Consume-Process-Produce流程),在事务初始化阶段,服务端会回滚该事务 ID 重启前的事务,该流程会要求生产者等待一段时间后重试,回滚过期事务后才会继续新事务。

Broker 故障/重启的 Exactly Once 保障情况?

在事务流程中,Broker 主要涉及如下几个阶段:

事务初始化(Empty)
  • 协调者发生故障/重启:通过 HA 机制重新选主客户端重试 FindCoordinatorRequest 找到新协调者,并继续后续事务流程。
事务进行中(Empty -> Ongoing)
  • 协调者发生故障/重启:通过 HA 机制重新选主 + Transaction Log 恢复事务状态(ONGOING),客户端重试 FindCoordinatorRequest 寻找新任新协调者,并继续发送消息。
  • Broker 发生故障/重启:通过 HA 机制重新选主 + PID Snapshot & 日志恢复 PID-消息序列号映射,客户端向对应的分区 Leader 重新发送消息。
事务提交/回滚(Ongoing -> PrepareCommit/PrepareAbort -> CompleteCommit/CompleteAbort)
  • 协调者发生故障/重启:通过 HA 机制重新选主 + Transaction Log 恢复事务状态(PREPARE_COMMIT/PREPARE_ABORT),新任协调者会继续完成后续事务流程(向各分区 Leader 发送控制消息)。
  • Broker 发生故障/重启:通过 HA 机制重新选主 + PID Snapshot & 日志恢复 PID-消息序列号映射,协调者会不断重试发送控制消息直到其成功写入对应的分区 Leader。

事务超时

客户端在事务初始化阶段会设置一个事务超时时间(transaction.timeout.ms,默认60秒),该参数用于限制服务端等待客户端更新事务状态的最大时间,超过该时间则主动回滚当前事务。

对应的服务端则有个最长可允许事务超时时间设置(同样叫 max.transaction.timeout.ms,默认15分钟),客户端的超时时间不可超过服务端该值,否则抛出 InvalidTxnTimeoutException。

除了事务操作超时设置,我们都知道协调者会对事务 ID 的相关元数据进行缓存,为了保证缓存不会被无效占用,所以还有个事务 ID 超时配置 transaction.id.expiration.ms,如果事务 ID 长期(默认7天)无请求发送过来,则会判断该事务 ID 已过期,将该映射进行清除。


游客sn2k7wrgfnq32
目录
相关文章
土木林森
|
2月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
土木林森
124 58
喜欢猪猪
|
1月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
喜欢猪猪
92 0
路边两盏灯
|
2月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
路边两盏灯
26 0
软件求生
|
4月前
|
消息中间件 Kafka 程序员
Kafka面试必备:深度解析Replica副本的作用与机制
**Kafka的Replica副本是保证数据可靠性的关键机制。每个Partition有Leader和Follower副本,Leader处理读写请求及管理同步,Follower被动同步并准备成为新Leader。从Kafka 2.4开始,Follower在完全同步时也可提供读服务,提升性能。数据一致性通过高水位机制和Leader Epoch机制保证,后者更精确地判断和恢复数据一致性,增强系统容错能力。**
软件求生
151 1
软件求生
|
4月前
|
消息中间件 监控 Kafka
深入解析:Kafka 为何不支持全面读写分离?
**Kafka 2.4 引入了有限的读写分离,允许Follower处理只读请求,以缓解Leader压力。但这不适用于所有场景,特别是实时数据流和日志分析,因高一致性需求及PULL同步方式导致的复制延迟,可能影响数据实时性和一致性。在设计系统时需考虑具体业务需求。**
软件求生
52 1
张飞的猪
|
4月前
|
消息中间件 SQL 存储
ClickHouse(21)ClickHouse集成Kafka表引擎详细解析
ClickHouse的Kafka表引擎允许直接从Apache Kafka流中消费数据,支持多种数据格式如JSONEachRow。创建Kafka表时需指定参数如brokers、topics、group和format。关键参数包括`kafka_broker_list`、`kafka_topic_list`、`kafka_group_name`和`kafka_format`。Kafka特性包括发布/订阅、容错存储和流处理。通过设置`kafka_num_consumers`可以调整并行消费者数量。Kafka引擎还支持Kerberos认证。虚拟列如`_topic`、`_offset`等提供元数据信息。
张飞的猪
213 0
公众号:码到三十五
|
4月前
|
消息中间件 存储 运维
RocketMQ与Kafka深度对比:特性与适用场景解析
RocketMQ与Kafka深度对比:特性与适用场景解析
公众号:码到三十五
512 0
Codelinghu
|
6天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
Codelinghu
21 0
Codelinghu
|
6天前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
Codelinghu
19 0
Codelinghu
|
6天前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
Codelinghu
15 0

热门文章

最新文章

  • 1
    解决kafka集群由于默认的__consumer_offsets这个topic的默认的副本数为1而存在的单点故障问题
  • 2
    SQLServer CDC数据通过Kafka connect实时同步至分析型数据库 AnalyticDB For PostgreSQL及OSS
  • 3
    springboot配置kafka生产者和消费者详解
  • 4
    flume-kafka 实例 详细
  • 5
    阿里云Kafka幂等生产者与事务生产者
  • 6
    Apache Kafka开发入门指南
  • 7
    kafka集群搭建
  • 8
    阿里云Flink-自定义kafka sink partitioner实践及相关踩坑记录
  • 9
    FlinkSQL 实时采集Kafka内容到MySQL(实战记录)
  • 10
    kafka web console安装
  • 1
    API在电子商务中的应用与优势:深入解析
    62
  • 2
    Java8 Lambda实现源码解析
    162714
  • 3
    深入解析JavaScript中的模板字符串
    68
  • 4
    linux centos系统搭建samba文件服务器 NetBIOS解析 (超详细)
    184
  • 5
    Linux中搭建DNS 域名解析服务器(详细版)
    338
  • 6
    万字长文深度解析JDK序列化原理及Fury高度兼容的极致性能实现
    168680
  • 7
    移动应用与系统:开发与操作系统的深度解析
    50
  • 8
    解析FormData格式数据:Python实践指南
    250
  • 9
    【AOP入门案例深解析】
    57
  • 10
    ArrayList源码解析
    39
  • 相关课程

    更多
  • 云计算工程师解析与实战-网络专家篇(体验版)
  • 深入解析Docker容器化技术
  • 分布式消息系统 Kafka 快速入门
  • Java面试疑难点解析 - 面试技巧及语言基础
  • Java面试疑难点解析 - Java Web开发
  • Java面试疑难点解析 - 系统架构及项目设计
  • 相关电子书

    更多
  • 神龙云服务器产品及技术深度解析
  • 弹性创造价值:基于ECS的最佳性价比实践解析
  • 又快又稳:阿里云下一代虚拟交换机解析
  • 相关实验场景

    更多
  • 通过云拨测对指定服务器进行Ping/DNS监测
  • 推荐镜像

    更多
  • DNS
  • NTP
  • kali-security
  • 下一篇
    AI助理化繁为简,速取代码参数——使用python SDK 处理OSS存储的图片

    玻璃钢生产厂家邵武玻璃钢雕塑厂家送子观音玻璃钢雕塑广州大型卡通玻璃钢雕塑吉林玻璃钢雕塑定做玻璃钢抽象雕塑造价商场美陈模型图湖北仿铜玻璃钢雕塑联系方式无锡设计玻璃钢雕塑费用中庭商场美陈供应商银川人物玻璃钢雕塑哪家好沈阳鸡西玻璃钢人物雕塑湛江信誉好的玻璃钢雕塑曲周玻璃钢花盆花器浙江四大天王玻璃钢雕塑加工台州学校玻璃钢雕塑销售厂家江苏季节性商场美陈商场美陈气球文案甘肃卡通玻璃钢雕塑大量销售汕头玻璃钢动物雕塑供应商家日照玻璃钢动物雕塑山西高质量玻璃钢雕塑生产厂家深圳宏丰玻璃钢雕塑清远楼盘玻璃钢人物雕塑常州商场大型美陈天水景区玻璃钢雕塑安装开业商场美陈价格山西省玻璃钢雕塑找哪家五华区玻璃钢雕塑哪里有卖玻璃钢雕塑仿不锈钢太原玻璃钢海豚雕塑价格香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

    玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化