Skip to content

Commit cda74e4

Browse files
suxiaogang223morningman
authored andcommitted
[enhance](runtime filter) impl partition pruning in runtime filer apache#47025
1 parent 443e87e commit cda74e4

File tree

4 files changed

+226
-7
lines changed

4 files changed

+226
-7
lines changed

be/src/vec/exec/scan/vfile_scanner.cpp

+123-6
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,27 @@
2222
#include <gen_cpp/Metrics_types.h>
2323
#include <gen_cpp/PaloInternalService_types.h>
2424
#include <gen_cpp/PlanNodes_types.h>
25+
#include <glog/logging.h>
2526

2627
#include <algorithm>
2728
#include <boost/iterator/iterator_facade.hpp>
2829
#include <iterator>
2930
#include <map>
3031
#include <ostream>
3132
#include <tuple>
33+
#include <unordered_map>
3234
#include <utility>
3335

3436
#include "common/compiler_util.h" // IWYU pragma: keep
3537
#include "common/config.h"
3638
#include "common/logging.h"
3739
#include "common/object_pool.h"
40+
#include "common/status.h"
3841
#include "io/cache/block/block_file_cache_profile.h"
3942
#include "runtime/descriptors.h"
4043
#include "runtime/runtime_state.h"
4144
#include "runtime/types.h"
45+
#include "util/runtime_profile.h"
4246
#include "vec/aggregate_functions/aggregate_function.h"
4347
#include "vec/columns/column.h"
4448
#include "vec/columns/column_nullable.h"
@@ -167,6 +171,10 @@ Status VFileScanner::prepare(
167171
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
168172
_has_fully_rf_file_counter =
169173
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
174+
_runtime_filter_partition_pruning_timer = ADD_TIMER(
175+
_parent->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime");
176+
_runtime_filter_partition_pruned_range_counter = ADD_COUNTER(
177+
_parent->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum", TUnit::UNIT);
170178
} else {
171179
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
172180
_open_reader_timer =
@@ -187,6 +195,11 @@ Status VFileScanner::prepare(
187195
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
188196
_has_fully_rf_file_counter =
189197
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
198+
_runtime_filter_partition_pruning_timer = ADD_TIMER(
199+
_local_state->scanner_profile(), "FileScannerRuntimeFilterPartitionPruningTime");
200+
_runtime_filter_partition_pruned_range_counter =
201+
ADD_COUNTER(_local_state->scanner_profile(), "RuntimeFilterPartitionPrunedRangeNum",
202+
TUnit::UNIT);
190203
}
191204

192205
_file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -226,6 +239,89 @@ Status VFileScanner::prepare(
226239
return Status::OK();
227240
}
228241

242+
void VFileScanner::_init_runtime_filter_partition_pruning_ctxs() {
243+
_runtime_filter_partition_pruning_ctxs.clear();
244+
for (auto& conjunct : _conjuncts) {
245+
auto impl = conjunct->root()->get_impl();
246+
if (impl) {
247+
// If impl is not null, which means this a conjuncts from runtime filter.
248+
DCHECK(impl->get_num_children() > 0 && impl->get_child(0)->is_slot_ref());
249+
const auto* slot_ref = static_cast<const VSlotRef*>(impl->get_child(0).get());
250+
if (_partition_slot_index_map.find(slot_ref->slot_id()) !=
251+
_partition_slot_index_map.end()) {
252+
// If the slot is partition column, add it to runtime filter partition pruning ctxs.
253+
_runtime_filter_partition_pruning_ctxs.emplace_back(conjunct);
254+
}
255+
}
256+
}
257+
}
258+
259+
Status VFileScanner::_process_runtime_filters_partition_pruning(bool& can_filter_all) {
260+
SCOPED_TIMER(_runtime_filter_partition_pruning_timer);
261+
if (_runtime_filter_partition_pruning_ctxs.empty() || _partition_col_descs.empty()) {
262+
return Status::OK();
263+
}
264+
size_t partition_value_column_size = 0;
265+
266+
// 1. Get partition key values to string columns.
267+
std::unordered_map<SlotId, MutableColumnPtr> parititon_slot_id_to_column;
268+
for (auto const& partition_col_desc : _partition_col_descs) {
269+
auto partiton_data = std::get<0>(partition_col_desc.second);
270+
const auto* partiton_slot_desc = std::get<1>(partition_col_desc.second);
271+
auto partition_value_column = ColumnString::create();
272+
partition_value_column->insert_data(partiton_data.c_str(), partiton_data.size());
273+
partition_value_column_size = partition_value_column->size();
274+
parititon_slot_id_to_column[partiton_slot_desc->id()] = std::move(partition_value_column);
275+
}
276+
277+
// 2. Build a temp block from the partition column, then execute conjuncts and filter block.
278+
// 2.1 Build a temp block from the partition column to match the conjuncts executing.
279+
Block temp_block;
280+
int index = 0;
281+
bool first_cloumn_filled = false;
282+
for (auto const* slot_desc : _real_tuple_desc->slots()) {
283+
if (!slot_desc->need_materialize()) {
284+
// should be ignored from reading
285+
continue;
286+
}
287+
if (parititon_slot_id_to_column.find(slot_desc->id()) !=
288+
parititon_slot_id_to_column.end()) {
289+
auto data_type = slot_desc->get_data_type_ptr();
290+
auto partition_value_column = std::move(parititon_slot_id_to_column[slot_desc->id()]);
291+
if (data_type->is_nullable()) {
292+
temp_block.insert(
293+
{ColumnNullable::create(
294+
std::move(partition_value_column),
295+
ColumnUInt8::create(partition_value_column_size, 0)),
296+
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
297+
""});
298+
} else {
299+
temp_block.insert({std::move(partition_value_column),
300+
std::make_shared<DataTypeString>(), ""});
301+
}
302+
if (index == 0) {
303+
first_cloumn_filled = true;
304+
}
305+
} else {
306+
temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
307+
slot_desc->get_data_type_ptr(),
308+
slot_desc->col_name()));
309+
}
310+
index++;
311+
}
312+
313+
// 2.2 Execute conjuncts.
314+
if (!first_cloumn_filled) {
315+
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
316+
// The following process may be tricky and time-consuming, but we have no other way.
317+
temp_block.get_by_position(0).column->assume_mutable()->resize(partition_value_column_size);
318+
}
319+
IColumn::Filter result_filter(temp_block.rows(), 1);
320+
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_runtime_filter_partition_pruning_ctxs, nullptr,
321+
&temp_block, &result_filter, &can_filter_all));
322+
return Status::OK();
323+
}
324+
229325
Status VFileScanner::_process_conjuncts_for_dict_filter() {
230326
_slot_id_to_filter_conjuncts.clear();
231327
_not_single_slot_filter_conjuncts.clear();
@@ -289,6 +385,7 @@ Status VFileScanner::open(RuntimeState* state) {
289385
RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
290386
if (_first_scan_range) {
291387
RETURN_IF_ERROR(_init_expr_ctxes());
388+
_init_runtime_filter_partition_pruning_ctxs();
292389
} else {
293390
// there's no scan range in split source. stop scanner directly.
294391
_scanner_eof = true;
@@ -771,6 +868,24 @@ Status VFileScanner::_get_next_reader() {
771868
const TFileRangeDesc& range = _current_range;
772869
_current_range_path = range.path;
773870

871+
if (!_partition_slot_descs.empty()) {
872+
// we need get partition columns first for runtime filter partition pruning
873+
RETURN_IF_ERROR(_generate_parititon_columns());
874+
if (_push_down_conjuncts.size() < _conjuncts.size()) {
875+
// there are new runtime filters, need to re-init runtime filter partition pruning ctxs
876+
_init_runtime_filter_partition_pruning_ctxs();
877+
}
878+
879+
bool can_filter_all = false;
880+
RETURN_IF_ERROR(_process_runtime_filters_partition_pruning(can_filter_all));
881+
if (can_filter_all) {
882+
// this range can be filtered out by runtime filter partition pruning
883+
// so we need to skip this range
884+
COUNTER_UPDATE(_runtime_filter_partition_pruned_range_counter, 1);
885+
continue;
886+
}
887+
}
888+
774889
// create reader for specific format
775890
Status init_status;
776891
TFileFormatType::type format_type = _params->format_type;
@@ -1019,7 +1134,8 @@ Status VFileScanner::_get_next_reader() {
10191134
_missing_cols.clear();
10201135
RETURN_IF_ERROR(_cur_reader->get_columns(&_name_to_col_type, &_missing_cols));
10211136
_cur_reader->set_push_down_agg_type(_get_push_down_agg_type());
1022-
RETURN_IF_ERROR(_generate_fill_columns());
1137+
RETURN_IF_ERROR(_generate_missing_columns());
1138+
RETURN_IF_ERROR(_cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs));
10231139
if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
10241140
fmt::memory_buffer col_buf;
10251141
for (auto& col : _missing_cols) {
@@ -1049,10 +1165,8 @@ Status VFileScanner::_get_next_reader() {
10491165
return Status::OK();
10501166
}
10511167

1052-
Status VFileScanner::_generate_fill_columns() {
1168+
Status VFileScanner::_generate_parititon_columns() {
10531169
_partition_col_descs.clear();
1054-
_missing_col_descs.clear();
1055-
10561170
const TFileRangeDesc& range = _current_range;
10571171
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
10581172
for (const auto& slot_desc : _partition_slot_descs) {
@@ -1073,7 +1187,11 @@ Status VFileScanner::_generate_fill_columns() {
10731187
}
10741188
}
10751189
}
1190+
return Status::OK();
1191+
}
10761192

1193+
Status VFileScanner::_generate_missing_columns() {
1194+
_missing_col_descs.clear();
10771195
if (!_missing_cols.empty()) {
10781196
for (auto slot_desc : _real_tuple_desc->slots()) {
10791197
if (!slot_desc->is_materialized()) {
@@ -1091,8 +1209,7 @@ Status VFileScanner::_generate_fill_columns() {
10911209
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
10921210
}
10931211
}
1094-
1095-
return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
1212+
return Status::OK();
10961213
}
10971214

10981215
Status VFileScanner::_init_expr_ctxes() {

be/src/vec/exec/scan/vfile_scanner.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class VFileScanner : public VScanner {
165165
Block _src_block;
166166

167167
VExprContextSPtrs _push_down_conjuncts;
168+
VExprContextSPtrs _runtime_filter_partition_pruning_ctxs;
168169

169170
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
170171
std::unique_ptr<io::IOContext> _io_ctx;
@@ -181,10 +182,12 @@ class VFileScanner : public VScanner {
181182
RuntimeProfile::Counter* _fill_missing_columns_timer = nullptr;
182183
RuntimeProfile::Counter* _pre_filter_timer = nullptr;
183184
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
185+
RuntimeProfile::Counter* _runtime_filter_partition_pruning_timer = nullptr;
184186
RuntimeProfile::Counter* _empty_file_counter = nullptr;
185187
RuntimeProfile::Counter* _not_found_file_counter = nullptr;
186188
RuntimeProfile::Counter* _file_counter = nullptr;
187189
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
190+
RuntimeProfile::Counter* _runtime_filter_partition_pruned_range_counter = nullptr;
188191

189192
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
190193
// single slot filter conjuncts
@@ -212,8 +215,11 @@ class VFileScanner : public VScanner {
212215
Status _convert_to_output_block(Block* block);
213216
Status _truncate_char_or_varchar_columns(Block* block);
214217
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
215-
Status _generate_fill_columns();
216218
Status _handle_dynamic_block(Block* block);
219+
Status _generate_parititon_columns();
220+
Status _generate_missing_columns();
221+
void _init_runtime_filter_partition_pruning_ctxs();
222+
Status _process_runtime_filters_partition_pruning(bool& is_partition_pruning);
217223
Status _process_conjuncts_for_dict_filter();
218224
Status _process_late_arrival_conjuncts();
219225
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !runtime_filter_partition_pruning1 --
3+
3994
4+
5+
-- !runtime_filter_partition_pruning2 --
6+
4990
7+
8+
-- !runtime_filter_partition_pruning3 --
9+
1999
10+
11+
-- !runtime_filter_partition_pruning4 --
12+
2994
13+
14+
-- !runtime_filter_partition_pruning1 --
15+
3994
16+
17+
-- !runtime_filter_partition_pruning2 --
18+
4990
19+
20+
-- !runtime_filter_partition_pruning3 --
21+
1999
22+
23+
-- !runtime_filter_partition_pruning4 --
24+
2994
25+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_hive_runtime_filter_partition_pruning", "p0,external,hive,external_docker,external_docker_hive") {
19+
def test_runtime_filter_partition_pruning = {
20+
qt_runtime_filter_partition_pruning1 """
21+
select count(*) from partition_table where nation =
22+
(select nation from partition_table
23+
group by nation having count(*) > 0
24+
order by nation desc limit 1);
25+
"""
26+
qt_runtime_filter_partition_pruning2 """
27+
select count(*) from partition_table where nation in
28+
(select nation from partition_table
29+
group by nation having count(*) > 0
30+
order by nation desc limit 2);
31+
"""
32+
qt_runtime_filter_partition_pruning3 """
33+
select count(*) from partition_table where city =
34+
(select city from partition_table
35+
group by city having count(*) > 0
36+
order by city desc limit 1);
37+
"""
38+
qt_runtime_filter_partition_pruning4 """
39+
select count(*) from partition_table where city in
40+
(select city from partition_table
41+
group by city having count(*) > 0
42+
order by city desc limit 2);
43+
"""
44+
}
45+
46+
String enabled = context.config.otherConfigs.get("enableHiveTest")
47+
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
48+
logger.info("diable Hive test.")
49+
return;
50+
}
51+
52+
for (String hivePrefix : ["hive2", "hive3"]) {
53+
try {
54+
String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
55+
String catalog_name = "${hivePrefix}_test_partitions"
56+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
57+
58+
sql """drop catalog if exists ${catalog_name}"""
59+
sql """create catalog if not exists ${catalog_name} properties (
60+
"type"="hms",
61+
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
62+
);"""
63+
sql """use `${catalog_name}`.`default`"""
64+
65+
test_runtime_filter_partition_pruning()
66+
67+
} finally {
68+
}
69+
}
70+
}
71+

0 commit comments

Comments
 (0)