Spark SQL 表达式优化策略与实践

摘要:导读随着数据量的快速增长,传统的数据处理方法难以满足对计算速度、资源利用率以及查询响应时间的要求。为了应对这些挑战,Spark SQL 引入了多种优化技术,以提高查询效率,降低计算开销。本文从表达式层面探讨了 Spark SQL 的优化手段,重点分析了三个方面

导读随着数据量的快速增长,传统的数据处理方法难以满足对计算速度、资源利用率以及查询响应时间的要求。为了应对这些挑战,Spark SQL 引入了多种优化技术,以提高查询效率,降低计算开销。本文从表达式层面探讨了 Spark SQL 的优化手段,重点分析了三个方面。一,LIKE ALL 与 LIKE ANY 通过重新设计表达式的基类 LikeAllBase 来避免内存和栈空间消耗过大的问题。二, Trim 函数通过重构原有代码,将通用的逻辑抽象到父类中,减少了冗余的代码并提高了代码复用性,优化了执行效率。三,常量折叠通过在查询优化阶段计算常量表达式的值,并将其缓存,避免了在查询执行时重复计算,从而显著提升了性能。

今天的介绍围绕下面六点展开:

1. 数新智能大数据产品

2. Spark SQL 课程前情回顾

3. Spark SQL 分析层原理

4. LIKE ALL 与 LIKE ANY 优化

5. 改进 TRIM 函数

6. 常量折叠

分享嘉宾|耿嘉安 浙江数新网络有限公司 高级架构专家

编辑整理|叶子涵

内容校对|李瑶

出品社区|DataFun

01

首先要介绍的是赛博数智引擎(CyberEngine)。从其架构图可以看出,CyberEngine 基于统一的元数据服务,能够支持 K8S、Yarn 等资源管理系统的任务调度。上层则支持多种机器学习框架和流批处理引擎,例如 Spark、Flink 等。除此之外,CyberEngine 还具备了自有的 CyberSQL 以及调度引擎 CyberScheduler 等功能。

今天的主题主要讲的是 Spark,因此将特别介绍 Cyber Engine 中针对 Spark 进行的大量的性能优化和稳定性提升。同样,Flink 也做了许多稳定性方面的优化。此外,CyberEngine 基于 Spark 实现了统一的 SQL 查询平台——CyberSQL,能够支持机器查询、联邦查询等多种查询方式,补充了非 ETL 场景下的大数据处理需求。

第二个产品是赛博数据平台(CyberData),这是数新智能研发的一站式多云大数据管理平台。它不仅支持流批一体,还支持湖仓一体、数智一体等多种架构。

说到多云,CyberData 支持公有云、私有云以及混合云的部署方式。在这些云基础架构上,CyberData 还能够支持一些非云场景,帮助用户实现数据治理、数据开发、数据安全、数据质量等数据中台能力。

第一讲主要探讨了 Spark 内核的原理;第二讲详细讲解了 Spark SQL 的原理,涵盖了其实现方式以及用户提交的 SQL 在 Spark 中的处理流程;第三、四讲,从解析层开始,逐层深入到分析层,选取了几例典型的案例,介绍了每一层的详细设计,并分享了每一层的优化案例。今天,将讨论 Spark SQL 的表达式优化。

03Spark SQL 分析层原理

Spark 中的表达式究竟在什么时候产生呢?首先,表达式的生成并不在解析层完成解析时进行。此时,Spark 只是解析出未经分析的物理计划和其他内容,尚不能称为表达式。

表达式的生成发生在分析层。具体来说,分析层通过解析器(Parser)将初始的未解析状态对象转换为可供 Spark 执行引擎使用的结构。分析过程依赖于多个组件,如 Catalog Manager、Catalog Plugin 和函数注册表等,这些组件帮助将物理计划转化为准确且具有实际意义的逻辑计划或表达式。

在分析层,Spark 生成的表达式有以下七种主要和常见的表达式类型。

标量子查询:标量子查询是返回单个值的子查询。例如,WHERE … IN (SELECT y FROM z) 返回一个唯一值,称为标量子查询。非标量子查询:与标量子查询返回单个值不同,非标量子查询则返回一个多值的数据结构,如数组或线性结构,通常涉及多条数据记录。函数:函数表达式包括各种常见的字符串函数、数学函数、日期函数等。Spark 支持数百种函数,类似的数据库通常也支持更多的函数。函数通常用于数据转换或计算中,是最常见的表达式类型之一。聚合函数:聚合函数如 MAX、MIN、COUNT 等,常用于对数据进行汇总或统计。在 Spark 中,聚合函数对列存储格式(如数据湖中的列存)具有重要意义,特别是在优化查询时。分析函数:分析函数,如窗口函数,是聚合函数的扩展,提供了如排名、累积和滑动平均等功能。分析函数通常与窗口函数结合使用,共同为数据分析提供更多维度的处理能力。条件表达式:条件表达式通常用于表达式的选择逻辑,例如 IF、ELSE。在 Spark SQL 中,可以使用这些条件表达式来控制计算流程,并根据条件执行不同的操作。常量与变量:常量是在 SQL 查询中具有固定值的表达式。例如,在 WHERE 子句中,日期字段等于某个特定日期时,该日期被视为常量。常量值不随查询执行的环境变化而变化。变量在 Spark SQL 中表示动态变化的值。它们可能由 Spark 内部生成,如随机数,或者作为表字段存在并随数据行变化。例如,某个表的字段值可能随着每一行的变化而不同,这类字段可以视为变量。

在介绍完 Spark SQL 中表达式的基本概念和分类后,接下来我们进入今天的第一个优化点:LIKE ALL 与 LIKE ANY 的优化。

在 SQL 中,LIKE 是常用的模式匹配操作符,通常用于查询符合特定模式的记录。例如,假设有一个班级数据库表,其中包含每位学生的姓名,若要查询所有姓张的学生,可以使用类似 LIKE '张%' 的语法进行匹配。

随着 SQL 语法的发展,很多数据库(包括 Spark SQL)支持在 WHERE 条件中使用多个 LIKE 模式进行匹配。若每个模式都使用单独的 LIKE 子句,查询语句会显得冗余。为此,LIKE ALL 和 LIKE ANY 语法应运而生。

LIKE ALL:表示字段的值必须与多个模式中的每一个都匹配。只有当值同时匹配所有模式时,条件才会成立,谓词才会返回 TRUE。这种方式需要逐一匹配所有模式,因此不存在短路现象。LIKE ANY:表示字段的值只需要与多个模式中的任意一个匹配即可。如果匹配到任意一个模式,条件就返回 TRUE。由于支持短路操作,一旦匹配成功,后续的模式匹配就会被跳过,因此性能通常优于 LIKE ALL。

总结来说,LIKE ALL 需要完全匹配所有模式,性能较差,而 LIKE ANY 由于短路特性,性能相对较好。

在 Spark SQL 的实现中,LIKE 是通过表达式来表示的,可以简单理解为一个判断函数,检查某个值是否符合指定模式。每个 LIKE 操作会生成一个相应的表达式对象。在 JVM 中,这些表达式会分配为对象,并通过继承机制生成新的类。

对于 LIKE ALL 和 LIKE ANY,Spark SQL 采用了复用现有 LIKE 表达式功能的方式。例如,LIKE ALL 会通过将多个 LIKE 表达式通过 AND 操作符连接起来实现。

尽管 Spark SQL 通过 LIKE ALL 和 LIKE ANY 简化了多个模式匹配的实现过程,但在某些情况下,尤其是当模式数量极大时(如 1 万个模式),这种方式可能引发性能问题:

内存占用:每个 LIKE 表达式都会在 JVM 中分配对象,如果模式数量非常多,会导致内存消耗急剧增加。频繁的垃圾回收(GC):由于大量的对象分配和回收,可能会导致频繁的垃圾回收,甚至发生 OOM(内存溢出)错误。栈空间消耗:Spark SQL 中的表达式采用树状结构。如果嵌套多个 AND 或 LIKE 表达式,树的深度将增加,进而消耗大量栈空间,可能导致栈溢出错误(StackOverflowError)。

为了解决 LIKE ALL 性能问题,可以通过重新设计表达式的基类来避免内存和栈空间消耗过大的问题。

具体的优化方法是重新设计一个基类 LikeAllBase,其中包含一个 sequence 数组,用于存储多个模式(patterns)。通过这种方式,可以避免之前在表达式中直接重复多个 LIKE 条件,而是通过统一的结构来处理多个模式的匹配。

特别地,此类设计中使用的是 UTF8String 而非 Java 自带的 String 类型。因为 UTF8String 是 Spark 内部用于表示字符串的类,具有比 Java 原生字符串更高效的内存管理和性能。

LikeAllBase 类的引入使得 LIKE ALL 和 LIKE ANY 的处理更具可扩展性。在这个基类中,最重要的部分是实现 eval(input: InternalRow) 方法。这个方法的核心逻辑是:

空值检查:首先判断输入值是否为空,若为空则无需继续匹配。模式匹配:对于非空值,逐一与每个模式进行匹配。

与之前的设计相比,这种实现方式避免了在每个模式匹配过程中动态申请栈空间的问题,从而避免了栈空间的深度增加和栈溢出的风险。

通过使用 LikeAllBase 类的设计,Spark SQL 在处理 LIKE ALL 时只需维护一个对象,而不是像 AND 连接多个 LIKE表达式时那样生成大量的对象。这不仅减少了 JVM 内存的消耗,也降低了频繁垃圾回收(GC)和内存溢出(OOM)错误的发生概率。

05改进 TRIM 函数

在许多编程语言中,像 Java,trim 函数可以直接应用于字符串变量,用于去除字符串两端的空格。在 Spark SQL 中,也支持类似的 TRIM 函数,通常用于去除 SQL 查询中字符串值、变量或属性两端的空白字符。这一功能在数据清洗和处理时非常实用,可以有效去除不必要的空格,确保数据的一致性。

尽管 TRIM 函数的功能简单明了,但在一些情况下,它的实现和性能可以进一步优化。通过对其底层实现进行改进,可以提升执行效率,尤其是在大数据量的操作中。

首先,可以看到优化前的代码部分(标记为红色区域),这部分代码将在优化过程中被删除。优化后的代码(标记为绿色区域)则包含了新增的逻辑。

优化前,Spark SQL 中的 string trim 函数注册名为 Trim。该函数的实现依赖于一个名为 eval 的接口,这是函数计算逻辑的核心部分。具体来说,eval 函数实现了 TRIM 操作的主要计算逻辑。尽管 TRIM 函数的功能简单,其实现代码结构并不复杂。在优化过程中,主要调整了函数的结构,简化了冗余部分,以提高性能和可维护性。

除了支持标准的 TRIM 函数,Spark 还提供了 TRIM LEFT 和 TRIM RIGHT 函数,用于分别移除字符串左侧和右侧的空格。

TRIM LEFT:仅删除字符串开头的空格,保留末尾的空格。TRIM RIGHT:仅删除字符串末尾的空格,保留开头的空格。

TRIM LEFT & TRIM RIGHT 与 TRIM 的实现结构非常相似,都包含一个 eval 函数,该函数的逻辑结构基本相同。为了提高性能并减少栈深度,Spark 在版本 1.4 和 1.5 中引入了 doeval 函数,动态生成 Java 类,并将计算过程平铺至这个类中,从而避免了多层函数调用和栈操作,提升了执行效率。

TRIM LEFT & TRIM RIGHT 与 TRIM 的实现结构非常相似,都包含一个 eval 函数,该函数的逻辑结构基本相同。所以在优化过程中,重复的逻辑被删除,所有相同的逻辑被提取到父类中,从而实现代码复用。无论是 TRIM LEFT 还是 TRIM,它们都继承自相同的基类 String2TrimExpression。通过将通用逻辑抽象到父类中,优化后的代码结构变得更加简洁和清晰。

虽然该优化可能不像第一个例子那样直接提升性能和稳定性,但它通过精简代码、减少冗余函数调用,在 JVM 层面也能带来性能的改善。精简后的代码结构使得 Spark 的执行效率和稳定性都得到了提升。

常量折叠是编译原理中的一个基础概念,旨在优化代码中的常量计算。无论是在编程语言还是在 SQL 查询中,当遇到常量表达式时,编译器或解释器会在编译或执行前计算出结果,避免重复计算,从而提高效率。

常量折叠的原理:以 Python 中的示例 a = 1 + 2 为例,假设这段代码写在一个函数中。如果每次调用函数时,Python 都重新计算 1 + 2,这显然不是高效的做法。实际上,Python 在执行时会将 1 + 2 计算出结果 3,然后将该结果赋值给变量 a,而不会每次都重新计算。常量折叠的条件:表达式中的常量在运行前即可确定其结果,且该结果可以在程序运行期间反复使用。例如,在 a = 1 + 2 中,1 + 2 是常量表达式,其结果可以在编译时预先计算。常量折叠的过程:常量折叠的过程类似于树状结构的遍历。以 a = 1 + 2 为例,首先分析该表达式的结构。假设 = 是根节点,a 是左子节点,1 + 2 是右子节点,而 1 + 2 又是一个加法表达式,其中 1 和 2 是常量。因此,编译器可以将 1 + 2 直接折叠为 3,从而简化表达式。最终,表达式 a = 1 + 2 会变成 a = 3,避免了重复计算。常量折叠的效果:编译器或执行引擎能够消除不必要的运算,提高执行效率,同时也使得代码更加简洁。

在 SQL 查询中,常量折叠优化是一种基础而重要的性能提升手段。通过常量折叠,可以避免在执行查询时重复计算已知的常量表达式,从而减少计算开销。

例如,在 SELECT 1 + 2 AS a, col FROM Tab; 这个查询中,1 + 2 是一个常量表达式, a 是该表达式的别名,col 是表 Tab 中的某个字段。每次执行此查询时,如果不做优化,Spark SQL 可能会在每一行数据处理时都重新计算 1 + 2,这显然是低效的,尤其当表中有大量数据时。

为了避免这种不必要的重复计算,Spark SQL 在优化阶段会对常量表达式进行折叠。常量折叠是在编译过程中将常量表达式(如 1 + 2)计算为一个固定值(如 3)。因此,查询优化后,执行时 1 + 2 不需要每次重新计算,只需要计算 3 + col,即常量 3 与字段 col 进行运算。这种优化大大减少了计算的复杂性,特别是在处理大数据量时,可以显著提高性能。

3.Spark SQL 中的 array_insert 函数与常量折叠优化

array_insert 函数用于向数组中的某个指定索引位置插入元素。比如向数组的第 5 个位置插入元素 array_insert(array(1, 2, 3), 5, 10),向数组的倒数第 1 个位置插入元素 array_insert(array(1, 2, 3), -1, 10)。

其中,数组的索引可以是常量或变量。当索引是常量时,Spark SQL 会在查询优化阶段直接计算常量索引值并缓存,而不是在每次执行时都重新计算。比如,即使我们将索引写为表达式(例如 2 + 3),Spark SQL 会在运行时将其计算为常量值 5,而不会在每次查询执行时都重新计算该表达式。通过这种常量折叠,Spark SQL 可以提高查询性能,避免重复计算常量索引值,从而减少计算开销。

总之,array_insert 函数提供了一个方便的方式来在数组中插入元素,同时结合常量折叠优化,能够进一步提升查询效率。

以上就是本次分享的内容,谢谢大家。

来源:DataFunTalk

相关推荐