摘要:AutoMQ 自 2023 年底正式开源以来,凭借其面向云原生场景的创新架构,迅速赢得了全球开发者的关注。目前在 GitHub 上已累计接近 6.5k Star,多次登上 GitHub Trending,受到海外技术社区的高度认可。Kafka 作为流处理领域的
本内容原始内容为英文,如需追求最原汁原味和准确的阅读体验,请直接点击底部 [查看原文] 阅读原始英文素材。
AutoMQ 自 2023 年底正式开源以来,凭借其面向云原生场景的创新架构,迅速赢得了全球开发者的关注。目前在 GitHub 上已累计接近 6.5k Star,多次登上 GitHub Trending,受到海外技术社区的高度认可。Kafka 作为流处理领域的核心组件,其在云环境中的演进备受关注。AutoMQ 基于 S3 构建的新一代 Kafka 存储引擎,提供了更低成本、更高弹性的新选择。
我们为大家带来的是一篇来自海外开发者 Vu Trinh 的优质内容翻译。Vu Trinh 是一位专注于数据工程的技术作者,拥有超过 24,000 名读者关注,长期在 Medium 和 Substack 平台分享关于流处理、OLAP 数据库和云原生架构的深度内容。本文基于 AutoMQ,深入剖析了将 Apache Kafka 构建在对象存储之上所面临的关键挑战与技术解法。相信这篇内容能为希望理解 Kafka 云原生演进路径的读者提供实用启发。
背景
但在深入探讨之前,不妨先问一个简单的问题:“为什么要把数据转移到 S3 上?”,答案是——为了降低成本。
在 Kafka 中,计算与存储是紧耦合的,也就是说,增加存储能就必须添加更多机器,这常常会导致资源利用效率低下。
Kafka 的设计还依赖数据副本来保证数据的持久性。在存储消息后,Leader 必须将数据复制给多个 Follower。由于体系结构的紧耦合,一旦集群成员发生变动,数据就需要在不同机器之间迁移,从而带来额外的开销。
另一个问题是跨可用区(AZ)的传输费用。像 AWS 或 GCP 这样的云服务商会对跨区请求收取额外费用。由于 Producer 只能将消息写入对应的 Partition leader,在云上部署 Kafka 时,如果 Borker 和分区平均分布在三个可用区,Producer 大约有三分之二的时间需要向位于其他可用区的 Leader 写入数据。此外,Kafka 在云上的部署还会产生大量的跨可用区传输费用,因为 Leader 还需要将消息复制给分布在其他可用区的 Follower。
想象一下,如果你将所有数据卸载到像 S3 这样的对象存储上,你可以:
节省存储成本,因为对象存储比磁盘存储更便宜;
实现计算与存储的独立扩展;
避免数据复制,因为对象存储本身具备数据持久性和可用性保障;
让任何 Broker 都可以处理读写请求;
……
基于对象存储构建 Kafka 兼容的解决方案,正成为一股新兴趋势。自 2023 年以来,已有至少五家厂商推出了类似的解决方案。2023 年有 WarpStream 和 AutoMQ,2024 年又有 Confluent Freight Clusters、Bufstream,以及 Redpanda Cloud Topics 等。
抛开这些市场热度不谈,我真正感兴趣的是:构建这样一个基于 S3 的存储层解决方案,究竟会遇到哪些挑战?为此,我选择了 AutoMQ 来进行研究,因为它是目前唯一的开源版本。这使我能够更深入地理解其中的难点与解决方案。
AutoMQ 简介
AutoMQ 是一个 100% 兼容 Kafka 的替代方案。它通过复用 Kafka 的协议栈代码,并重写存储层,利用 Write Ahead Log(预写日志) 技术,实现了在云环境中高效运行 Kafka,并将数据高效卸载到 Object storage(对象存储) 中。关于 AutoMQ 的更多介绍,可以参考我之前的文章 [1]。
接下来,我们将探讨在 Object storage(对象存储) 上构建 Kafka 所面临的潜在挑战,并了解 AutoMQ 是如何逐一应对这些问题的。
01 延迟
第一个也是最直观的挑战是延迟。以下是一些数据,帮助大家更直观地理解:对 Object storage(对象存储)发起 GetObject 请求时,中位延迟大约为 20 毫秒,P90 延迟约为 60 毫秒;而 NVMe SSD 的延迟在 20–100 微秒之间,速度大约快了 1000 倍。
一些厂商选择牺牲低延迟性能。WarpStream 或 Bufstream 认为,为了大幅节省成本和简化运维,这是一个值得的取舍。这类系统会等消息持久化到 Object storage(对象存储)之后,才向 Producer 发送确认(ack)消息。
AutoMQ 则不这么做。它通过 WAL+S3 的架构实现低延迟。为了保持写入延迟 P99 小于 10 毫秒,AutoMQ 的 Broker 首先将数据写入 WAL(预写日志)。WAL 本质上是一个磁盘设备,比如 AWS EBS。在写入 S3 之前,Broker 必须确保消息已经持久化到 WAL 中;只有当消息成功写入 WAL 后,Broker 才会向 Producer 返回 “我已经收到消息” 的响应。而将数据写入对象存储的过程则是异步进行的。
这种设计的核心思想是:利用 WAL 来发挥不同云存储介质的特性,并与 S3 灵活组合,以适应各种使用场景。例如:
使用 EBS 时,WAL 可提供极低的延迟;但当 Producer 向位于其他可用区的 Leader partition 发送消息时,仍会产生跨 AZ 的数据传输费用。
使用 S3 作为 WAL(AutoMQ 将 S3 同时作为主存储和 WAL 使用)时,用户可以完全消除跨 AZ 成本,但相应地写入延迟会有所增加。
02 IOPS
与延迟相关的另一个问题是写入 Object storage(对象存储)的频率。S3 Standard PUT 请求每 1000 次请求的成本是 $0.005。如果一个服务每秒进行 10,000 次写入,一个月下来光是 PUT 请求的费用就会高达 13 万美元。
如果 Broker 在接收到 Producer 的消息后立刻将其写入 Object storage,那么 PUT 请求的数量将是极其庞大的。
为了降低对 Object storage 的请求次数,几乎所有厂商都会让 Broker 在上传前对数据进行批处理:将数据暂存在内存中一段时间,或者直到累计达到某个设定的大小再统一上传。用户可以选择缩短缓存时间以换取更低的延迟,但相应地,也需要承担更多的 PUT 请求费用。
这些 Brokers 可以将来自不同 Topic 或 Partition 的数据进行批处理,以降低单个 Partition 写入带来的成本。在 AutoMQ 的数据批处理过程中,可能会生成两种类型的对象:
Stream Set Object(SSO):包含来自不同 Partition 的连续数据段的对象。
Stream Object(SO):包含来自单个 Partition 的连续 segment 的对象。
在将数据写入 Object storage 时,有两种场景:
如果同一个 Stream 的数据可以填满批处理阈值,Broker 就会以 SO 的形式上传;
如果是多个不同 Partition 的 Stream 数据合并后达到批处理大小,Broker 就会以 SSO 的形式上传。
(需要说明的是,这里的描述并不代表 AutoMQ compaction 过程的真实实现。)
由于这种机制,同一个 Partition 的数据可能被分散到多个对象中,当 Broker 发起读取请求时会增加请求次数,进而影响读取性能。为此,AutoMQ 设置了一个后台 Compaction 过程,用于异步地将同一个 Partition 的数据尽可能合并到最少的对象中,这可以确保同一 Partition 的数据物理上更集中,从而实现 Object storage 中的顺序读取。
03 缓存管理
延续前文关于延迟与 IOPS 的挑战,提高 Object storage 读取性能最简单的方式,就是减少对对象存储的 GET 请求次数。
数据缓存可以有效帮助实现这一目标,带来两个好处:提升读取性能,减少对对象存储的访问频率。但这也引出了一个新的问题:如何高效地管理缓存以提高命中率?(计算机科学中仅有的两个难题之一就是缓存失效,另一个是命名。)
WarpStream 采用一致性哈希环来在 Agents 之间分配负载,每个 Agent 负责一个 Topic 中的一部分数据。当 Agent 收到客户端请求时,它会识别出谁负责相应的数据文件,并将请求路由给对应的 Agent。
AutoMQ 则尝试保留 Kafka 中的“数据本地性(data locality)”特性,Broker 仍然知晓自己负责的 Partition。因此,AutoMQ 的缓存管理机制可以设计为:Broker 缓存其所管理的 Partition 的数据。(后文我们会详细讨论“数据本地性”)
AutoMQ 设计了两种用途不同的缓存系统:Log Cache 用于处理写入和热读(即最近数据);Block Cache 负责处理冷读(即历史数据)。当 Broker 接收到来自 Producer 的消息时,除了写入 WAL,还会同步写入 Log cache,以便快速响应近期读取请求。
如果 Log cache 中没有命中的数据,则会从 Block cache 中读取。Block cache 是通过从对象存储加载数据填充的,它通过预取(Prefetching)和批量读取(Batch reading)等技术,提高了对历史数据的内存命中率,从而在执行冷读操作时仍能保持良好的性能。
04 元数据管理
构建在对象存储上的系统需要比 Kafka 更多的元数据。例如,Kafka 可以通过扫描文件系统目录树列出某个 Partition 下的 Segments。在 S3 中执行相同的操作需要发出 LIST 请求,然而这些请求的性能不佳。此外,由于数据的批量处理,消息的顺序不像在 Kafka 中那样直接。
新的系统需要更多的元数据,以回答“哪些对象持有该 Topic 的数据?”或“如何确保消息的顺序?”这样的问题。
这些元数据的数量与存储在 S3 中的对象总数相关。为确保元数据数量的优化,AutoMQ 运用 IOPS 部分中的 Compaction 技术,将多个小对象组合成较大的对象,从而限制元数据的数量。
此外,Kafka 利用 ZooKeeper 或 Kraft 来存储集群的元数据,比如 Broker 注册或 Topic 配置。WarpStream 或 Bufstream 则依赖事务型数据库来完成这一功能。
Zookeeper 模式与 Kraft 模式
相比之下,AutoMQ 采用的是 Kraft 架构,引入了一个 Controller quorum(控制器仲裁组) 来选举出 Controller leader。整个集群的元数据(包括 Topic/partition 与数据的映射关系、Partition 与 Broker 的映射关系 等)都存储在 Leader 上,只有 Leader 有权限修改这些元数据。如果某个 Broker 想要修改元数据,必须先与 Leader 通信。这些元数据会被复制到每个 Broker 上,任何元数据的变更都会由 Controller 将其同步传播到所有 Broker。
05 Kafka 兼容性
除了要解决上述所有问题之外,Kafka 替代方案还必须具备一个关键能力:让用户可以毫无障碍地从 Kafka 切换到新方案。换句话说,这个新方案必须与 Kafka 兼容。
Kafka 协议的核心技术设计基于一个重要前提:依赖本地磁盘存储数据。这包括:将消息追加写入物理日志、将 Topic 拆分成多个 Partitions、在多个 Broker 之间进行副本复制、实现负载均衡、获取 Leader 信息以生产消息、通过在 Segment 文件中定位 Offset 来为消费者提供消费服务等。
因此,要在 Object storage 上开发一个 Kafka 兼容的方案是极具挑战性的。先不论性能问题,对 Object storage 的写入方式与本地磁盘完全不同。我们无法像在文件系统中那样,打开一个不可变对象并在其末尾追加数据。
那么,这些团队是如何提供一个基于 Object storage 同时又能无缝替代原设使用本地磁盘的解决方案呢?
一些方案(例如 WarpStream、Bufstream)选择完全重写 Kafka 协议,以适应 Object storage 的特性。他们认为这种方式比基于开源 Kafka 协议进行适配更直接。
而 AutoMQ 则采取不同的做法:专注于只重写 Kafka 的存储层,以最大限度地复用开源 Kafka 协议。虽然这一过程可能会遇到很多挑战,但我认为这是值得的。这样一来,他们能够为用户提供与 Kafka 完全兼容的解决方案;即便 Kafka 发布了新功能,他们也能将这些改动合并到 AutoMQ 的代码中。但是,他们是如何构建这个新存储层,使其能够与对象存储协同工作的呢?在揭晓答案前,我们先来回顾一下 Kafka 的内部结构。
Kafka 中有以下关键组件:
网络模块负责管理 Kafka Client 的连接收发;
KafkaApis 根据请求中的 API key 将请求分发到具体的模块;
ReplicaManager 负责消息的发送与接收以及分区管理;Coordinator 负责消费者管理和事务消息处理;Kraft 则负责集群的元数据管理。
Storage(存储):该模块提供可靠的数据存储,并向 ReplicaManager、Coordinator 和 Kraft 提供 Partition(分区)抽象。它被划分为多个层级:
UnifiedLog 通过 ISR 多副本复制机制确保高可靠性的数据存储;
LocalLog 负责本地数据存储,提供“无限”流的存储抽象;
LogSegment 是 Kafka 中最小的存储单元,将 LocalLog 拆分为多个数据段,并映射到相应的物理文件上。
为了实现 Kafka 的 100% 兼容性,AutoMQ 复用了除了存储层以外的全部逻辑。对于新的存储实现,AutoMQ 必须确保依然能提供 Partition 抽象,以便 ReplicaManager、Coordinator 和 Kraft 等 Kafka 模块可以顺利接入。
虽然 Kafka 对外暴露的是通过 Partition 实现的连续流(Stream)抽象,但其内部很多操作仍然依赖 segment 概念,例如:内部的 Compaction(压缩)流程、Kafka 的日志恢复机制、事务与时间戳索引机制,以及读取操作等。
AutoMQ 仍然使用与 Kafka 类似的 Segment 概念,但在 Segment 之上引入了 Stream 抽象,以便将数据卸载到对象存储中。在 API 层面,Stream 的核心方法是:Append(追加写入)和 Fetch(拉取读取)。
与 Kafka 的 Log 相比,AutoMQ 的 Stream 缺少索引、事务索引、时间戳索引以及 Compaction(数据压缩)机制。
为了与 Kafka 的元数据和索引组织方式保持一致,AutoMQ 的 Stream 包含以下内容:
Meta stream 提供类似键值对(KV)的语义,用于在 Partition 层面存储元数据。在 Apache Kafka 中,可以通过扫描文件系统目录树来列出某个 Partition 下的所有 Segment。而在 AutoMQ Kafka 中,Meta S3Stream 使用 ElasticLogMeta 来记录 Segment 列表以及 Segment 与 Stream 的映射关系,这也有助于避免向 Object storage 发送 LIST 请求。
Data stream 负责 Stream 与 Segment 数据之间的映射。它已经具备了基于逻辑 Offset 查询数据的能力,因此可以替代 Kafka 中的 xxx.data 和 xxx.index。
Txn/Time streams 对应于 Kafka 中的 xxx.tnxindex 和 xxx.timeindex 文件。
与 Kafka 的 Segment 抽象仅限于文件系统操作不同,Stream 还承担了更多职责:从缓存消息、写入预写日志(Write-Ahead Log),到异步地将数据卸载至 S3。
06 Shared Nothing 与 Shared Disk 的融合
Shared Nothing 和 Shared Disk 各有优劣。前者可以更高效地执行写入操作并缓存数据;后者的存储方式则具备在不同节点间共享数据的效率优势。理论上,当数据存储在对象存储中时,任何 Broker 都可以读取和写入任何 Partition。
在 Kafka 最初的 Shared Nothing 架构中,Partition 与节点是一一绑定的。读写请求只能访问拥有相应 Partition 的节点。这种绑定机制不仅用于识别处理请求的节点,也用于实现负载均衡。因此,当采用 Shared Disk 架构构建替代方案时,厂商仍需考虑“数据本地性”的问题。
以 WarpStream 为例,它在写入流程中绕过了“数据本地性”,任何与客户端处于同一个可用区(AZ)的 Agent 都可以处理写入操作。但在读取请求方面,仍必须由负责对应数据的 Agent 来处理(参考缓存管理部分的描述)。
尽管 AutoMQ 的设计是将数据完全存储在对象存储中,它依然希望 Broker 知道自己负责哪些 Partition。AutoMQ 有意保留与 Kafka 一样的“数据本地性”特性,即为每个 Partition 分配特定的 Broker。
07 吞吐量
无状态 Broker 的职责比 Kafka broker 要多得多。在 Kafka 中,Broker 将所有存储相关的工作交给操作系统处理;但对于运行在对象存储上的 Kafka 兼容方案,Broker 必须自行负责在内存中缓存数据、上传、Compaction 以及解析对象存储中的数据。
如果设计不够谨慎,这些额外流程会给 broker 带来大量开销——Compaction 过程若管理不当,甚至会影响常规写入请求的处理。
在 AutoMQ 中,存在如下几类网络流量:
消息发送流量 (Message-sending Traffic):Producer -> AutoMQ -> S3
实时读取消费流量 (Tail Read Consumption Traffic):AutoMQ -> Consumer
历史消费流量 (Historical Consumption Traffic):S3 -> AutoMQ -> Consumer
Compaction 读取流量 (Compaction Read Traffic):S3 -> AutoMQ
Compaction 上传流量 (Compaction Upload Traffic):AutoMQ -> S3
为了避免在带宽受限的情况下各类流量相互竞争,AutoMQ 将上述流量按优先级分为四个层级:
Tier-0 消息发送流量
Tier-1 冷读消费流量 (Catch-up Read)
Tier-2 Compaction 读写流量 (Compaction Read/Write Traffic)
Tier-3 热读消费流量 (Chasing Read)
AutoMQ 基于优先级队列和令牌桶算法,实现了一个异步多层级速率限制器,用以保障各类流量的分层隔离和服务质量。
令牌桶(Token Bucket:):令牌桶是一种速率限制算法,定期为“桶”内填充令牌,每个令牌代表一个请求进行的许可。当桶为空时,延迟或放弃请求以防止系统过载。
对于 Tier-0 级别请求,限流器不用于流量控制。
对于 Tier-1 到 Tier-3 级别请求,当可用令牌不足时,请求会按优先级顺序排队。定期添加令牌到令牌桶时,回调线程会被激活,尝试处理已排队的请求。
08 跨越 AZ 的流量成本
如背景部分所述,原始 Kafka 的设计可能会导致跨 AZ 传输费用飙升,主要原因有两点:
Producer 可能会向不同可用区的 Leader 产生数据。(1)
Leader 必须将数据复制到不同区域的两个 Followers 。(2)
对于基于 S3 构建的解决方案,第 (2) 点可以较为容易地解决,因为对象存储自身就支持数据复制。而第 (1) 点则更为复杂。
像 WarpStream 和 Bufstream 这样的方案尝试通过修改 Kafka 的服务发现协议来规避这个问题。在 Kafka 中,Producer 在发送消息之前,必须先通过 metadata 请求获取指定 Partition 的 Leader 信息,通常该请求会发送到一组 Bootstrap servers 上。WarpStream 或 Bufstream 会尝试让与 Producer 处于同一可用区的 Broker 响应这个 Metadata 请求。对它们而言,任何 Broker 都可以写入消息,并不存在“ Leader ”的概念。
而 AutoMQ 的处理方式则有所不同,因为它仍希望像 Kafka 一样保留“数据本地性”的设计。为了解决跨 AZ 的传输成本问题,AutoMQ 引入了一种基于 S3 实现的 WAL(预写日志)机制。例如,假设 Producer 位于 AZ1,Partition 2(P2)的 Leader Broker(B2)位于 AZ2,而 AZ1 中还有另一个 Broker(B1)。
Producer 依然会向一组 Bootstrap brokers 发起 Metadata 请求,同时携带其所在可用区(AZ)的信息。在 AutoMQ 端,Broker 会根据一致性哈希算法跨不同 AZ 进行映射。假设 AutoMQ 将 B2 分配在 AZ2,而 B1 分配在 AZ1。由于 AutoMQ 从 Metadata 请求中得知 Producer 位于 AZ1,它就会返回 B1 的信息;如果 Producer 位于 B2 所在的 AZ2,则会返回 B2 的信息。核心理念是:确保 Producer 总是与同一 AZ 中的 Broker 通信。
Producer 获取到 B1 的信息后(注意,这个 Broker 实际上并不负责目标 Partition),就开始向 B1 发送消息。B1 会将消息先缓存在内存中,并异步将其作为 WAL 数据写入对象存储(S3)。
当消息成功写入 S3 后,B1 会向实际负责该 Partition 的 Leader Broker——B2 发起一次 RPC 请求,告知其临时数据的写入情况,包括数据在 S3 中的位置(此过程会产生少量跨 AZ 的 Broker 间流量)。随后,B2 会从 S3 中读取该临时数据,并将其追加写入到目标 Partition(P2)中。一旦 B2 完成数据写入,它会回复 B1,B1 随后再向 Producer 发送最终的确认(Acknowledgment)。
结束语
感谢大家读到这里。本文从构建兼容 Kafka 的对象存储解决方案这一趋势出发,结合我对这类系统实现挑战的好奇,逐步展开讨论。接着,探讨了几个值得关注的维度,比如延迟、IOPS 和 Kafka 协议兼容性。在识别每个维度可能遇到的问题后,又进一步分析了 AutoMQ 是如何尝试解决这些问题的。
需要特别说明的是,我并不是 Kafka 方面的专家,只是对这个系统非常感兴趣,并希望能把自己的学习过程分享给社区。所以如果文中有任何不准确之处,欢迎指出和交流。
来源:极客邦科技