Skip to content

Commit 844c38d

Browse files
suxiaogang223morningman
authored andcommitted
[enhance](runtime filter) impl partition pruning in runtime filter (#47025)
This PR implements partition pruning through runtime filters. Consider a SQL query like:

```sql
SELECT count(*) FROM int_partition_table WHERE partition_col = ( SELECT partition_col FROM int_partition_table GROUP BY partition_col HAVING count(*) > 0 ORDER BY partition_col DESC LIMIT 1 )
```

During execution, the backend (BE) receives a dynamic runtime filter condition `partition_col = xxx`. Since `partition_col` is a partitioning column, its value can be used to determine whether a partition can be pruned. This mechanism also supports filtering queries like:

```sql
SELECT count(*) FROM int_partition_table WHERE func(partition_col) = xxx
```

If `func` cannot be evaluated at the frontend (FE), the frontend will not perform partition pruning. However, since the backend can compute `func`, this mechanism handles pruning scenarios that are not possible at the frontend, providing a more efficient pruning process on the backend side.
1 parent 320e3b2 commit 844c38d

File tree

20 files changed

+405
-9
lines changed

20 files changed

+405
-9
lines changed

be/src/vec/core/block.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void Block::initialize_index_by_name() {
148148
void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
149149
if (position > data.size()) {
150150
throw Exception(ErrorCode::INTERNAL_ERROR,
151-
"invalid input position, position={}, data.size{}, names={}", position,
151+
"invalid input position, position={}, data.size={}, names={}", position,
152152
data.size(), dump_names());
153153
}
154154

@@ -165,7 +165,7 @@ void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
165165
void Block::insert(size_t position, ColumnWithTypeAndName&& elem) {
166166
if (position > data.size()) {
167167
throw Exception(ErrorCode::INTERNAL_ERROR,
168-
"invalid input position, position={}, data.size{}, names={}", position,
168+
"invalid input position, position={}, data.size={}, names={}", position,
169169
data.size(), dump_names());
170170
}
171171

be/src/vec/exec/scan/vfile_scanner.cpp

+161-6
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,30 @@
2020
#include <fmt/format.h>
2121
#include <gen_cpp/Exprs_types.h>
2222
#include <gen_cpp/Metrics_types.h>
23+
#include <gen_cpp/Opcodes_types.h>
2324
#include <gen_cpp/PaloInternalService_types.h>
2425
#include <gen_cpp/PlanNodes_types.h>
26+
#include <glog/logging.h>
2527

28+
#include <algorithm>
2629
#include <boost/iterator/iterator_facade.hpp>
2730
#include <iterator>
2831
#include <map>
32+
#include <ranges>
2933
#include <tuple>
34+
#include <unordered_map>
3035
#include <utility>
3136

3237
#include "common/compiler_util.h" // IWYU pragma: keep
3338
#include "common/config.h"
3439
#include "common/logging.h"
3540
#include "common/object_pool.h"
41+
#include "common/status.h"
3642
#include "io/cache/block/block_file_cache_profile.h"
3743
#include "runtime/descriptors.h"
3844
#include "runtime/runtime_state.h"
3945
#include "runtime/types.h"
46+
#include "util/runtime_profile.h"
4047
#include "vec/aggregate_functions/aggregate_function.h"
4148
#include "vec/columns/column.h"
4249
#include "vec/columns/column_nullable.h"
@@ -67,6 +74,7 @@
6774
#include "vec/exec/scan/vscan_node.h"
6875
#include "vec/exprs/vexpr.h"
6976
#include "vec/exprs/vexpr_context.h"
77+
#include "vec/exprs/vexpr_fwd.h"
7078
#include "vec/exprs/vslot_ref.h"
7179
#include "vec/functions/function.h"
7280
#include "vec/functions/function_string.h"
@@ -161,6 +169,8 @@ Status VFileScanner::prepare(
161169
ADD_TIMER_WITH_LEVEL(_parent->_scanner_profile, "FileScannerPreFilterTimer", 1);
162170
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(
163171
_parent->_scanner_profile, "FileScannerConvertOuputBlockTime", 1);
172+
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
173+
_parent->_scanner_profile, "FileScannerRuntimeFilterPartitionPruningTime", 1);
164174
_empty_file_counter =
165175
ADD_COUNTER_WITH_LEVEL(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT, 1);
166176
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_parent->_scanner_profile,
@@ -169,6 +179,9 @@ Status VFileScanner::prepare(
169179
ADD_COUNTER_WITH_LEVEL(_parent->_scanner_profile, "FileNumber", TUnit::UNIT, 1);
170180
_has_fully_rf_file_counter = ADD_COUNTER_WITH_LEVEL(_parent->_scanner_profile,
171181
"HasFullyRfFileNumber", TUnit::UNIT, 1);
182+
_runtime_filter_partition_pruned_range_counter =
183+
ADD_COUNTER_WITH_LEVEL(_parent->_scanner_profile,
184+
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
172185
} else {
173186
_get_block_timer =
174187
ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerGetBlockTime", 1);
@@ -184,6 +197,8 @@ Status VFileScanner::prepare(
184197
"FileScannerPreFilterTimer", 1);
185198
_convert_to_output_block_timer = ADD_TIMER_WITH_LEVEL(
186199
_local_state->scanner_profile(), "FileScannerConvertOuputBlockTime", 1);
200+
_runtime_filter_partition_prune_timer = ADD_TIMER_WITH_LEVEL(
201+
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime", 1);
187202
_empty_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
188203
"EmptyFileNum", TUnit::UNIT, 1);
189204
_not_found_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
@@ -192,6 +207,9 @@ Status VFileScanner::prepare(
192207
TUnit::UNIT, 1);
193208
_has_fully_rf_file_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
194209
"HasFullyRfFileNumber", TUnit::UNIT, 1);
210+
_runtime_filter_partition_pruned_range_counter =
211+
ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
212+
"RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT, 1);
195213
}
196214

197215
_file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -231,6 +249,113 @@ Status VFileScanner::prepare(
231249
return Status::OK();
232250
}
233251

252+
// check if the expr is a partition pruning expr
253+
bool VFileScanner::_check_partition_prune_expr(const VExprSPtr& expr) {
254+
if (expr->is_slot_ref()) {
255+
auto* slot_ref = static_cast<VSlotRef*>(expr.get());
256+
return _partition_slot_index_map.find(slot_ref->slot_id()) !=
257+
_partition_slot_index_map.end();
258+
}
259+
if (expr->is_literal()) {
260+
return true;
261+
}
262+
return std::ranges::all_of(expr->children(), [this](const auto& child) {
263+
return _check_partition_prune_expr(child);
264+
});
265+
}
266+
267+
void VFileScanner::_init_runtime_filter_partition_prune_ctxs() {
268+
_runtime_filter_partition_prune_ctxs.clear();
269+
for (auto& conjunct : _conjuncts) {
270+
auto impl = conjunct->root()->get_impl();
271+
// If impl is not null, which means this a conjuncts from runtime filter.
272+
auto expr = impl ? impl : conjunct->root();
273+
if (_check_partition_prune_expr(expr)) {
274+
_runtime_filter_partition_prune_ctxs.emplace_back(conjunct);
275+
}
276+
}
277+
}
278+
279+
void VFileScanner::_init_runtime_filter_partition_prune_block() {
280+
// init block with empty column
281+
for (auto const* slot_desc : _real_tuple_desc->slots()) {
282+
if (!slot_desc->need_materialize()) {
283+
// should be ignored from reading
284+
continue;
285+
}
286+
_runtime_filter_partition_prune_block.insert(
287+
ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
288+
slot_desc->get_data_type_ptr(), slot_desc->col_name()));
289+
}
290+
}
291+
292+
Status VFileScanner::_process_runtime_filters_partition_prune(bool& can_filter_all) {
293+
SCOPED_TIMER(_runtime_filter_partition_prune_timer);
294+
if (_runtime_filter_partition_prune_ctxs.empty() || _partition_col_descs.empty()) {
295+
return Status::OK();
296+
}
297+
size_t partition_value_column_size = 1;
298+
299+
// 1. Get partition key values to string columns.
300+
std::unordered_map<SlotId, MutableColumnPtr> parititon_slot_id_to_column;
301+
for (auto const& partition_col_desc : _partition_col_descs) {
302+
const auto& [partition_value, partition_slot_desc] = partition_col_desc.second;
303+
auto test_serde = partition_slot_desc->get_data_type_ptr()->get_serde();
304+
auto partition_value_column = partition_slot_desc->get_data_type_ptr()->create_column();
305+
auto* col_ptr = static_cast<IColumn*>(partition_value_column.get());
306+
Slice slice(partition_value.data(), partition_value.size());
307+
int num_deserialized = 0;
308+
RETURN_IF_ERROR(test_serde->deserialize_column_from_fixed_json(
309+
*col_ptr, slice, partition_value_column_size, &num_deserialized, {}));
310+
parititon_slot_id_to_column[partition_slot_desc->id()] = std::move(partition_value_column);
311+
}
312+
313+
// 2. Fill _runtime_filter_partition_prune_block from the partition column, then execute conjuncts and filter block.
314+
// 2.1 Fill _runtime_filter_partition_prune_block from the partition column to match the conjuncts executing.
315+
size_t index = 0;
316+
bool first_column_filled = false;
317+
for (auto const* slot_desc : _real_tuple_desc->slots()) {
318+
if (!slot_desc->need_materialize()) {
319+
// should be ignored from reading
320+
continue;
321+
}
322+
if (parititon_slot_id_to_column.find(slot_desc->id()) !=
323+
parititon_slot_id_to_column.end()) {
324+
auto data_type = slot_desc->get_data_type_ptr();
325+
auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
326+
if (data_type->is_nullable()) {
327+
_runtime_filter_partition_prune_block.insert(
328+
index, ColumnWithTypeAndName(
329+
ColumnNullable::create(
330+
std::move(partition_value_column),
331+
ColumnUInt8::create(partition_value_column_size, 0)),
332+
data_type, slot_desc->col_name()));
333+
} else {
334+
_runtime_filter_partition_prune_block.insert(
335+
index, ColumnWithTypeAndName(std::move(partition_value_column), data_type,
336+
slot_desc->col_name()));
337+
}
338+
if (index == 0) {
339+
first_column_filled = true;
340+
}
341+
}
342+
index++;
343+
}
344+
345+
// 2.2 Execute conjuncts.
346+
if (!first_column_filled) {
347+
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
348+
// The following process may be tricky and time-consuming, but we have no other way.
349+
_runtime_filter_partition_prune_block.get_by_position(0).column->assume_mutable()->resize(
350+
partition_value_column_size);
351+
}
352+
IColumn::Filter result_filter(_runtime_filter_partition_prune_block.rows(), 1);
353+
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_prune_ctxs, nullptr,
354+
&_runtime_filter_partition_prune_block,
355+
&result_filter, &can_filter_all));
356+
return Status::OK();
357+
}
358+
234359
Status VFileScanner::_process_conjuncts_for_dict_filter() {
235360
_slot_id_to_filter_conjuncts.clear();
236361
_not_single_slot_filter_conjuncts.clear();
@@ -294,6 +419,11 @@ Status VFileScanner::open(RuntimeState* state) {
294419
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
295420
if (_first_scan_range) {
296421
RETURN_IF_ERROR(_init_expr_ctxes());
422+
if (_state->query_options().enable_runtime_filter_partition_prune &&
423+
!_partition_slot_index_map.empty()) {
424+
_init_runtime_filter_partition_prune_ctxs();
425+
_init_runtime_filter_partition_prune_block();
426+
}
297427
} else {
298428
// there's no scan range in split source. stop scanner directly.
299429
_scanner_eof = true;
@@ -775,6 +905,29 @@ Status VFileScanner::_get_next_reader() {
775905
const TFileRangeDesc& range = _current_range;
776906
_current_range_path = range.path;
777907

908+
if (!_partition_slot_descs.empty()) {
909+
// we need get partition columns first for runtime filter partition pruning
910+
RETURN_IF_ERROR(_generate_parititon_columns());
911+
912+
if (_state->query_options().enable_runtime_filter_partition_prune) {
913+
// if enable_runtime_filter_partition_prune is true, we need to check whether this range can be filtered out
914+
// by runtime filter partition prune
915+
if (_push_down_conjuncts.size() < _conjuncts.size()) {
916+
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
917+
_init_runtime_filter_partition_prune_ctxs();
918+
}
919+
920+
bool can_filter_all = false;
921+
RETURN_IF_ERROR(_process_runtime_filters_partition_prune(can_filter_all));
922+
if (can_filter_all) {
923+
// this range can be filtered out by runtime filter partition pruning
924+
// so we need to skip this range
925+
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
926+
continue;
927+
}
928+
}
929+
}
930+
778931
// create reader for specific format
779932
Status init_status;
780933
// for compatibility, if format_type is not set in range, use the format type of params
@@ -1018,7 +1171,8 @@ Status VFileScanner::_get_next_reader() {
10181171
_missing_cols.clear();
10191172
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
10201173
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1021-
RETURN_IF_ERROR(_generate_fill_columns());
1174+
RETURN_IF_ERROR(_generate_missing_columns());
1175+
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
10221176
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
10231177
fmt::memory_buffer col_buf;
10241178
for (auto& col : _missing_cols) {
@@ -1048,10 +1202,8 @@ Status VFileScanner::_get_next_reader() {
10481202
return Status::OK();
10491203
}
10501204

1051-
Status VFileScanner::_generate_fill_columns() {
1205+
Status VFileScanner::_generate_parititon_columns() {
10521206
_partition_col_descs.clear();
1053-
_missing_col_descs.clear();
1054-
10551207
const TFileRangeDesc& range = _current_range;
10561208
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
10571209
for (const auto& slot_desc : _partition_slot_descs) {
@@ -1072,7 +1224,11 @@ Status VFileScanner::_generate_fill_columns() {
10721224
}
10731225
}
10741226
}
1227+
return Status::OK();
1228+
}
10751229

1230+
Status VFileScanner::_generate_missing_columns() {
1231+
_missing_col_descs.clear();
10761232
if (!_missing_cols.empty()) {
10771233
for (auto slot_desc : _real_tuple_desc->slots()) {
10781234
if (!slot_desc->is_materialized()) {
@@ -1090,8 +1246,7 @@ Status VFileScanner::_generate_fill_columns() {
10901246
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
10911247
}
10921248
}
1093-
1094-
return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
1249+
return Status::OK();
10951250
}
10961251

10971252
Status VFileScanner::_init_expr_ctxes() {

be/src/vec/exec/scan/vfile_scanner.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "vec/core/block.h"
3939
#include "vec/exec/format/generic_reader.h"
4040
#include "vec/exec/scan/vscanner.h"
41+
#include "vec/exprs/vexpr_fwd.h"
4142

4243
namespace doris {
4344
class RuntimeState;
@@ -167,6 +168,8 @@ class VFileScanner : public VScanner {
167168
Block _src_block;
168169

169170
VExprContextSPtrs _push_down_conjuncts;
171+
VExprContextSPtrs _runtime_filter_partition_prune_ctxs;
172+
Block _runtime_filter_partition_prune_block;
170173

171174
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
172175
std::unique_ptr<io::IOContext> _io_ctx;
@@ -183,10 +186,12 @@ class VFileScanner : public VScanner {
183186
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
184187
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
185188
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
189+
RuntimeProfile::Counter* _runtime_filter_partition_prune_timer = nullptr;
186190
RuntimeProfile::Counter* _empty_file_counter = nullptr;
187191
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
188192
RuntimeProfile::Counter* _file_counter = nullptr;
189193
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
194+
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
190195

191196
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
192197
// single slot filter conjuncts
@@ -214,8 +219,13 @@ class VFileScanner : public VScanner {
214219
Status _convert_to_output_block(Block* block);
215220
Status _truncate_char_or_varchar_columns(Block* block);
216221
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
217-
Status _generate_fill_columns();
218222
Status _handle_dynamic_block(Block* block);
223+
Status _generate_parititon_columns();
224+
Status _generate_missing_columns();
225+
bool _check_partition_prune_expr(const VExprSPtr& expr);
226+
void _init_runtime_filter_partition_prune_ctxs();
227+
void _init_runtime_filter_partition_prune_block();
228+
Status _process_runtime_filters_partition_prune(bool& is_partition_pruned);
219229
Status _process_conjuncts_for_dict_filter();
220230
Status _process_late_arrival_conjuncts();
221231
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
create database if not exists partition_tables;
2+
use partition_tables;
3+
4+
CREATE TABLE decimal_partition_table (
5+
id INT,
6+
name STRING,
7+
value FLOAT
8+
)
9+
PARTITIONED BY (partition_col DECIMAL(10, 2))
10+
STORED AS PARQUET
11+
LOCATION '/user/doris/preinstalled_data/partition_tables/decimal_partition_table';
12+
13+
CREATE TABLE int_partition_table (
14+
id INT,
15+
name STRING,
16+
value FLOAT
17+
)
18+
PARTITIONED BY (partition_col INT)
19+
STORED AS PARQUET
20+
LOCATION '/user/doris/preinstalled_data/partition_tables/int_partition_table';
21+
22+
CREATE TABLE string_partition_table (
23+
id INT,
24+
name STRING,
25+
value FLOAT
26+
)
27+
PARTITIONED BY (partition_col STRING)
28+
STORED AS PARQUET
29+
LOCATION '/user/doris/preinstalled_data/partition_tables/string_partition_table';
30+
31+
CREATE TABLE date_partition_table (
32+
id INT,
33+
name STRING,
34+
value FLOAT
35+
)
36+
PARTITIONED BY (partition_col DATE)
37+
STORED AS PARQUET
38+
LOCATION '/user/doris/preinstalled_data/partition_tables/date_partition_table';
39+
40+
41+
msck repair table decimal_partition_table;
42+
msck repair table int_partition_table;
43+
msck repair table string_partition_table;
44+
msck repair table date_partition_table;

0 commit comments

Comments
 (0)