摘要:在现代分布式查询引擎中,数据过滤优化是提升查询性能的关键技术。Trino 提供了两种重要的过滤机制:静态谓词下推(Static Predicate Pushdown)和动态过滤(Dynamic Filtering)。这两种技术都能显著减少数据扫描量,但在实现方
在现代分布式查询引擎中,数据过滤优化是提升查询性能的关键技术。Trino 提供了两种重要的过滤机制:静态谓词下推(Static Predicate Pushdown)和动态过滤(Dynamic Filtering)。这两种技术都能显著减少数据扫描量,但在实现方式和应用场景上有重要区别。
静态谓词下推是最基础的查询优化技术,它将查询中明确指定的过滤条件直接下推到数据源执行。
工作原理
当用户提交查询时,Trino 查询优化器会分析 WHERE 子句中的条件,并将这些条件转换为数据源可以理解的格式,然后下推执行。
-- 用户查询SELECT customer_name, order_date FROM orders WHERE order_date >= '2025-01-01' AND status = 'completed';-- Trino 下推到 MySQL 的查询SELECT customer_name, order_date FROM orders WHERE order_date >= '2025-01-01' AND status = 'completed';在这个例子中,order_date >= '2025-01-01' 和 status = 'completed' 是静态条件,它们在查询计划阶段就已经确定,并直接下推到 MySQL 数据库执行。
动态过滤是一种更高级的优化技术,主要用于连接查询中。它基于连接操作中一张表的数据来动态生成另一张表的过滤条件。
思路:
收集过滤条件:首先执行连接键值较小的表,收集其连接键的值生成动态条件:基于收集到的键值动态生成 IN 条件应用过滤:将生成的条件应用到大表的查询中-- 用户查询SELECT o.order_id, c.customer_name, o.total_amountFROM orders oJOIN customers c ON o.customer_id = c.customer_idWHERE c.country = 'CHINA' AND c.customer_type = 'premium';-- 第一步 - 执行小表过滤:SELECT DISTINCT customer_id FROM customers WHERE country = 'CHINA' AND customer_type = 'premium';-- 假设结果:[1001, 1003, 1007, 1015, 1023]-- 第二步 - 动态生成条件:o.customer_id IN (1001, 1003, 1007, 1015, 1023)-- 第三步 - 应用到大表,实际发送到订单表数据源的查询SELECT order_id, total_amount, customer_idFROM orders WHERE customer_id IN (1001, 1003, 1007, 1015, 1023);优势:
大幅减少数据扫描:只扫描符合条件的数据降低网络传输:减少在集群中的数据传输量提升查询性能:特别是在大表连接小表的场景中效果显著JdbcDynamicFilteringSplitManagerDynamicFilteringJdbcSplitSourceJdbcSplitJdbcDynamicFilteringSplitManager 检查是否需要等待动态过滤,如果需要则等待动态过滤条件收集完成,创建DynamicFilteringJdbcSplitSource。
@Overridepublic ConnectorSplitSource getSplits(ConnectortransactionHandle transaction,ConnectorSession session,ConnectorTableHandle table,DynamicFilter dynamicFilter,Constraintconstraint){JdbcTableHandle tableHandle = (JdbcTableHandle) table;// 符合条件情况下才会使用动态过滤,否则使用常规 SplitManager 来生成 Splitboolean hasLimit = tableHandle.getLimit.isPresent;if (DynamicFilter == DynamicFilter.EMPTY || hasLimit || !dynamicFilteringEnabled(session)) {return delegateSplitManager.getSplits(transaction, session, table, dynamicFilter, constraint);}return new DynamicFilteringSplitSource(transaction, session, (JdbcTableHandle) table, dynamicFilter, constraint);}DynamicFilteringJdbcSplitSource 将收集到的动态过滤条件附加到每个 Split 。这一步骤是在 Split 生成阶段完成的,也就是在 Coordinator 节点上完成而不在 Worker 节点上。这样避免每个 Worker 节点需等待动态过滤条件的情况,同时考虑到 JDBC 连接器生成的 Split 数量通常较少,附加过滤条件的开销很小。
@Overridepublic CompletableFuture getNextBatch(int maxSize){if (!isEligibleForDynamicFilter(tableHandle)) {return delegateSplitSource.getNextBatch(maxSize);}return delegateSplitSource.getNextBatch(maxSize).thenApply(batch -> {// 获取动态过滤条件TupleDomain dynamicFilterPredicate = dynamicFilter.getCurrentPredicate.transformKeys(JdbcColumnHandle.class::cast);return new ConnectorSplitBatch(batch.getSplits.stream.map(split -> {JdbcSplit jdbcSplit = (JdbcSplit) split;// 为每个 Split 附加动态过滤条件return jdbcSplit.withDynamicFilter(dynamicFilterPredicate);}).collect(toImmutableList),batch.isNoMoreSplits);});}来源:SapphireCoder