摘要:前面章节讨论的都是数据流处理相关的问题,本章开始讨论流计算系统中的数据传输系统。就像楚汉相争天下时,不管刘邦在前线多么气势磅礴、浩浩荡荡,都需要萧何在后方给他及时运送粮草、补充兵马,这样才能够保证刘邦的大军在前线专注于英勇杀敌,并最终取得楚汉相争的胜利。数据传
前面章节讨论的都是数据流处理相关的问题,本章开始讨论流计算系统中的数据传输系统。就像楚汉相争天下时,不管刘邦在前线多么气势磅礴、浩浩荡荡,都需要萧何在后方给他及时运送粮草、补充兵马,这样才能够保证刘邦的大军在前线专注于英勇杀敌,并最终取得楚汉相争的胜利。数据传输系统就是流计算系统中的“萧何”,虽然它不是“打仗”的核心,但是没有它保证“粮草”快速和通顺地运转,流计算系统就不能稳
定、可靠、高效地运行,进而不能发挥出实时流计算系统潜在的价值。数据传输系统承担了整个实时流计算系统的数据传输任务,是遍布实时流计算系统各处的“血管系统”。
数据传输是一个在多系统间进行通信的过程。传统典型的系统间通信方式是远程方法调用,也就是我们常说的RPC,如在Hadoop中广泛使用的AVRO RPC和Protobuf RPC,以及在微服务架构中更加广泛使用的REST。在实时流计算系统中,由于“流数据”的特点,我们通常使用一种与“流”更加契合的通信方式,即基于消息中间件的数据传输方式,如RabbitMQ、Apache Kafka和Apache Pulsar等。
除了消息中间件以外,大多数开源流计算框架在其系统内部实现了自己的数据传输方法。大体上,我们可以将这些在分布式计算节点间进行数据传输的方案理解为一种功能更加专一的跨进程间消息队列。它们的基本功能是在两个进程之间进行数据传输,更复杂的功能则包括数据持久化和反向压力等。我们可以将这些跨进程间的消息队列理解为简化版或定制版的消息中间件。
本章将重点讨论实时流计算系统中多个子系统或业务模块间通过消息中间件进行数据传输的问题。
在计算机领域,但凡在两个不同应用或系统间传递的数据,都可以称为消息。例如,在TCP/IP协议的4层模型中,数据链路层在两个MAC地址间传递的数据可以称为消息,网络层在两个IP地址间传递的数据可以称为消息,传输层在两个套接字间传递的数据可以称为消息,应用层在两个进程间传递的数据也可以称为消息。
在实时流计算应用开发过程中,我们主要关心的是具有业务含义的数据。这些数据在流计算应用的各个业务逻辑处理单元间传递,我们亦称其为消息。这些消息可以表现为字符串、JSON对象或AVRO对象等。当在两个业务逻辑处理单元间传递消息时,需要先将这些消息对象序列化为字节数组,然后经网络传递,最后由消息消费方接收并反序列化,恢复为最初的消息模样。
或许我们会觉得直接在诸如TCP/UDP等网络协议的基础上,开发两个系统之间的消息传递系统是一件简单、轻松的事情。但当我们真正开始着手实现这个系统时,就会发现问题并没有其表面所展现的那么简单。考虑消息需要高性能、高可靠、顺序地传递,系统重启或故障恢复时需要进行消息持久化和多副本存储,消息数据量暴增时需要具备横向扩展处理能力,多种不同平台之间传递数据流时需要实现不用语言的客户端等。可以说,当你千辛万苦完成了所有这些特性的开发时,那么必须恭喜你,你一定已经成为软件开发的大神级人物!但同时,你也一定已经忘记了最初构建业务系统的目的是什么。
当我们专注于实时流计算应用的业务逻辑开发时,急需一个开箱即用并且成熟可靠的数据传输系统。这时候就是消息中间件发挥作用的时刻了。消息中间件替我们解决了流计算系统中数据传输的绝大多数问题。我们只需要使用消息中间件提供给我们的各种API与其对接,就可以轻而易举地实现消息在两个系统之间的传输,而其他关于高性能、高可靠、跨平台等一系列的问题,完全交由消息中间件自行实现和处理。
在此,我们需要“表彰”下消息中间件在帮助我们构建流计算系统过程中的几大功绩。
1.将上下游业务逻辑处理单元解耦
高内聚、低耦合已经是软件领域老生常谈的设计原则了。在第2章讲解数据接收模块时,我们通过队列将数据接收服务内部的几个处理步骤分离开。如果将这个过程放大到更大的系统,将每个处理步骤放大为一个业务模块,那就可以用消息中间件来替换原本在处理步骤间使用的队列。回想我们平时在开发软件系统时,如果有多个开发人员开发业务流程的不同业务模块,那么是不是通过消息中间件将彼此间的开发过程隔离开更好。只要上下游的开发人员间约定好消息的格式,就可以开始各自的开发工作,并且彼此之间的任务边界、责任边界一目了然。当软件上线发生问题时,也只需要查看各个模块的输入/输出是否正常即可,可以非常方便、容易地定位线上问题发生的地方。
2.缓冲消息和平滑流量高峰
第3章谈到,流计算系统是一种天然的异步处理系统。在流计算系统中,上下游之间的业务逻辑的复杂程度不尽相同,从而上下游间的处理速度也会不同。反向压力是解决这种上下游处理速度不一致问题的手段之一。有些时候,线上业务会在某些时间段出现流量高峰。例如,每天早、中、晚3个时间段的广告点击量通常是最高的。又如,商家在做推广活动时,流量突然暴涨也是司空见惯的事情。这个时候,通过消息中间件将短时间内的高峰流量缓存在消息队列中,同时各个业务模块依旧在尽其所能地处理消息队列中积压的消息。如此一来,既保证了系统平稳运行,又最大程度发挥了系统的处理能力。
3.使系统的处理能力能够横向扩展
大部分消息中间件支持M(M≥1)个消息生产者和N(N≥1)个消息消费者的模式。这样,消息的生产者和消费者的数量实际上是完全独立的。消费者来不及处理生产者输入消息中间件的消息时,可以部署更多的消息消费者来处理消息。所以,消息中间件使得流计算系统的横向扩展能力得到显著增强。
4.消息高可靠传递
就像任何软件都有Bug一样,线上服务可能会因为各种各样的原因而失败,可能是软件本身的Bug导致服务进程退出、可能是服务日志没有及时清理而磁盘写满、可能是系统内存不足导致服务进程被操作系统杀掉、可能是云服务厂商的光纤被施工队挖断导致服务器宕机等。
为了保证实时流计算系统中的数据不会因为服务的失败而丢失,最基本的要求是能够对数据进行持久化。而为了提供更可靠的数据恢复保证,通常还会对数据进行多副本保存。当消息中间件和服务重启时,服务失败前尚未处理的消息不会丢失掉。越来越多的消息中间件提供了消息的可靠传递机制,如最少一次传递,甚至是精确一次传递,让我们能够更加专注于业务逻辑的开发,而不是将宝贵的精力和时间耗费在底层的消息传递可靠性保证上。
5.消息的分区和保序
或许有些读者会觉得分区和保序怎么能够搅合在一起?实时上,这是一种de facto的最佳实践经验。如果消息中间件没有提供消息的分区功能,那么要实现保序就只能由客户端使用单一线程来读取消息,然后按照特定的key来将消息分发到多个工作线程的任务队列中去。如若不然,就不能保证消息是严格按照其输入的顺序来被处理的。这是因为多线程的执行通常是相互抢占的,先拿到消息的线程可能会在较后的时间执行,这样就破坏了消息处理的时序。而如果将所有消息都交由同一个线程来处理,这或多或少会掣肘并发度的提高。
而如果消息在一开始输入消息中间件时就按照特定的key进行分区并保证分区内的顺序,那么只需要给每个分区分配一个线程来消费和处理消息,就能够在保证消息在业务逻辑上有序的同时,大幅提高系统的并发处理能力。这种用分区消息局部有序性来取代全体消息整体有序性的做法,在很多业务场景下都能够满足对消息顺序的要求,同时不会影响处理性能的水平扩展,是一种很好的实践经验。
以上就是我们总结的消息中间件的几点“功绩”了。必须提到的是,以消息中间件为核心的SOA系统架构模式曾经深深影响过一代系统开发人员。直到现在,SOA系统架构模式还在许多企业级架构中发挥着重要作用。虽然SOA系统架构模式不是本章重点,但仍然建议读者自行查阅相关资料了解其历史。毕竟“以史为鉴,可以知兴替”,通过对系统架构模式演进过程的研究,我们会更加深刻地理解现代软件系统架构。
消息中间件最简单的工作模式是点对点模式,即我们经常听到的点对点(Point-to-Point,P2P)模式。图8-1展示了消息中间件点对点模式的工作原理。用Java中的BlockingQueue来描述点对点模式是非常合适的,消息生产者将消息发送到消息中间件的某个队列中,同时消息消费者从这个队列的另一端接收消息。生产者和消费者之间是相互独立的。点对点模式的消息中间件支持多个消费者,但是一条消息只能由一个消费者消费。
发布/订阅模式是消息中间件的另一种工作模式。发布/订阅模式的功能更强,使用场景更多,是大多数消息中间件的主要工作模式。
图8-2展示了消息中间件发布/订阅模式的工作原理。在发布/订阅模式中,我们先定义好一个具有特定意义的主题(topic),消息生产者将所有属于这个主题的消息发送到消息中间件中代表这个主题的消息队列上,然后任何订阅了这个主题、对该主题感兴趣的消息消费者都可以接收这些消息。发布/订阅模式使得消息生产者和消息消费者之间的通信不再是一种点到点的传输,而是由消息中间件作为代理人统一管理消息的接收、组织、存储和转发,这样减少了系统中所有生产者和消费者之间的连接数量,从而降低整个系统的复杂度。和点对点方式不同,发布到主题的消息会被所有订阅者消费。
发布/订阅模式存在一个负载均衡问题。当发布者消息量很大时,单个订阅者的处理能力逐渐变得不足,于是我们将多个订阅者节点组成一个订阅组以共同处理某个主题的消息,这样在订阅组内部的订阅者节点之间就实现了负载均衡,使得消费者的处理能力能够水平扩展。图8-3展示了消息中间件中一种带负载均衡功能的发布/订阅模式。
消息中间件包揽了消息传递过程中的大部分事情,当我们开始业务模块的开发时,只需要定义好业务模块和业务模块之间的通信协议(也就是消息模式)即可。所谓消息模式,也可以说是消息的定义,再说白点儿,就是定义消息有哪些字段、字段的类型、字段的排列顺序、字段是否必需等。或许我们觉得这不就是定义消息吗,列出文档不就可以了。但是在实际产品开发过程中,消息模式随产品版本的迭代和更新,有时候会成为一个非常恼人的问题,就像睡觉时耳边嗡嗡做势的蚊子。所以,这里我们要讨论下几种消息模式的处理方式。
1.无模式(弱模式)
无模式(弱模式)也是一种模式。很多Java开发者尤其喜欢用类来表达一个实体,这是非常好的习惯。但是像Python这样动态语言的开发者则并不是十分热衷于预先规定一个对象必须有哪些字段,他们只需要大致知道一个对象有哪些字段,然后在程序需要某个字段的地方有那个字段即可。当数据在实时流系统中被逐步处理和进行信息增强时,一些临时字段、可选字段、推导字段会逐渐附加到消息上。这个时候,使用无格式的消息模式未必不是一种好的选择。以JSON为代表的一类数据格式是无模式消息的典型,也就是说不需要schema文件就可以将其从字节数组反序列化为JSON对象。笔者将这种使用JSON表示数据,直接操作JSON字段,实现数据处理逻辑的设计和开发方式,称为“无schema编程”。当在处理流式数据过程中,需要增加一些消息字段时,这是一种方便、灵活的解决方案。需要注意的是,无schema编程对程序开发者要求较高,需要非常注意在程序的各个地方检查相关处理逻辑必要字段的完整性和合法性。另外,要遵守“数据不变性原则”,即可以增加消息的字段,但是不要修改和覆盖消息的原有字段。换言之,我们对消息的处理仅仅是增强信息,而不是修改信息。
2.强模式
定义严格的数据模式,是大家喜闻乐见的事情,不仅仅是开发,也包括产品、测试、运维和售前。以Avro、Thrif、Protocol buffers为代表的数据组织方案是强消息模式的代表。强模式的好处是字段在反序列化为对象的时候,自动对字段的完整性进行检查,再配合类的定义,使用起来更加方便、高效和安全。当采用强模式表达消息时,应该尽可能地选择既支持前向兼容也支持后向兼容的方案。前向兼容是指当schema新加字段后,如果以旧schema保存的二进制数据用新schema反序列化,那么新加字段应该设置为默认值,而不是抛出异常。所谓后向兼容是指当schema新加字段时,如果以新schema保存的二进制数据用旧schema反序列化,那么新加字段应该被忽略,而不是抛出异常。这样做的原因在于,实际生产中线上系统非常可能混合新旧两种schema对应的二进制数据。例如,在新版本客户端SDK发布后,市面上仍旧会有很多旧版本客户端SDK的用户。虽然Avro、Thrif、Protocol buffers等序列化框架都支持前、后向兼容,但是产品迭代更新schema时,还是应该尽量保证消息字段的一脉相承,并且应该仔细阅读各种序列化框架前、后向兼容的限制条件,不要随意删除字段、修改字段名称和调整字段顺序,否则稍不注意就会出现反序列化结果和预期不一致的问题。
3.版本控制
任何时候,给接口或协议添加版本控制都是明智之举,消息模式亦是如此。在消息模式的首位添加一个版本控制字段总归是好的,这样当数据模式被改得面目全非时,依旧能够通过不同版本执行不同逻辑分支的方式留下处理所有新旧格式消息的余地。另外,版本控制有助于数据处理失败时的问题追溯和分析。
第1章已经讨论了在消息中间件选型时的一些基本考量因素,如吞吐量、延迟、高可用、持久化和水平扩展。这里,我们补充几个在实际开发中需要考虑的问题。
1.消息传达的可靠性
大多数流计算平台会对消息传达的可靠性做出一定程度的保证,如尽力而为、至少一次或精确一次等。关于这点,我们在前面对比各种开源实时流计算平台时已有所讨论,这里不再赘述。作为流计算系统中数据的传输中枢,消息中间件自身对消息传递可靠性的保证亦是如此。很多开发者认为“精确一次”理所当然是最好的消息可靠性保证机制,有了它就不用考虑任何消息传递失败的问题。但笔者认为,不管是消息中间件还是流计算平台,大多数情况下都不要过度依赖消息中间件能够提供给你“精确一次”的保证。一方面,保证“精确一次”会非常显著地降低系统性能;另一方面,不同系统所承诺的“精确一次”语义或多或少有所区别,使用起来还有一定限制。
基本上,所有“精确一次”级别的可靠性保证机制都是通过框架内部的一套封闭且完备的逻辑来实现的。所以,开发者想要使用“精确一次”级别的可靠性服务,就必然限定在框架提供的SPI或服务下完成,如Storm的ACK机制和Trident、Flink的checkpoint机制和KeyedState及Operator State等。相比实现一个独立且完整的模块,一边开发业务逻辑一边还得照顾框架本身的工作机制实在是一件琐碎的事情。另外,当我们需要与不受框架约束的外部存储(如文件、数据库等)交互时,就脱离了流计算框架的保护,到头来开发者还是需要自己去保证消息的“精确一次”。如果我们不明真相,以为框架提供了“精确一次”就万事大吉,忽略了与外部存储交互时对失败的处理,那么开发出来的程序就很不安全了。总的来说,消息中间件至少应该能够提供“至少一次”级别的传
达可靠性保证。至于“精确一次”级别的可靠性,其实现起来更加复杂,使用起来也有更多约束条件,性能也需要考量。“精确一次”和“至少一次”之间的差异就像lock和try lock之间的区别。相比前者,选择后者时更显乐观,但使用时需要更加谨慎,如此才能在保证结果正确的同时获得更好的性能表现。
2.消息重放
昨日重现,或者朝花夕拾,都是美好的事情。消息重放是流计算系统中时不时会遇到的问题。重放的原因可能是某段时间消息处理程序崩溃,需要补跑数据,也可能是模型更新,需要重新训练模型参数。实现消息重放最“直接”的方式是将消息从其保存的地方重新拉出来,再次发往消息中间件,但这种方法可能并非最优。将消息从块存储设备读出来,还原消息格式和顺序,再重新发往消息中间件,整个过程都涉及比较多的定制开发工作。特别是当需要重放的主题比较多时,定制开发工作会变得十分烦琐。好在现在越来越多的消息中间件开始支持数据存储功能。也就是说,它们不再是简单的消息发布/订阅系统,还是流数据存储系统,可以将一定时间范围内的数据流保存下来。当需要重放消息时,只需将“播放点”设置到重放开始的地方,即可完美复现之前的数据流。
来源:大数据架构师