[fix](parquet) Fix data column and null map column not being equal when reading Parquet complex type cross-page data #47734

Open · wants to merge 5 commits into base: master
23 changes: 22 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -329,6 +329,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
// just read the remaining values of the last row in the previous page,
// so there is no new row to be read.
batch_size = 0;
/*
* Since this function is called repeatedly to fetch data up to the batch size,
* each call triggers `_rep_levels.resize(0); _def_levels.resize(0);`, so the
* definition and repetition levels held by the reader cover only the latter
* part of the batch (i.e., the earlier part is missing). Therefore, when the
* definition and repetition levels are used to fill the null_map of structs
* and maps, this function must not be called multiple times before the
* null_map is filled.
* TODO:
* We may need to consider reading the entire batch of data at once, which
* would make the function easier to use. However, if the data spans multiple
* pages, memory usage may increase significantly.
*/
} else {
_rep_levels.resize(0);
_def_levels.resize(0);
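Not part of the diff: a minimal, standalone sketch (hypothetical names and plain STL types, not Doris classes) of why the `batch_size = 0` path above must keep the already-decoded levels instead of resetting them. If the levels are cleared on every call, only the tail of the batch is left when the null_map is filled later.

```cpp
// Hypothetical, simplified illustration; not Doris code.
#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

// Simulates one call that appends the definition levels decoded from a page.
// `reset_first` mimics `_def_levels.resize(0)` at the top of the call.
void read_page_levels(std::vector<level_t>& def_levels,
                      const std::vector<level_t>& page, bool reset_first) {
    if (reset_first) {
        def_levels.clear();  // corresponds to _def_levels.resize(0)
    }
    def_levels.insert(def_levels.end(), page.begin(), page.end());
}

int main() {
    // One logical row whose values are split across two pages.
    const std::vector<level_t> page1 = {2, 2, 2};
    const std::vector<level_t> page2 = {2, 2};

    // Wrong for the cross-page case: resetting between calls keeps only page2.
    std::vector<level_t> lost;
    read_page_levels(lost, page1, /*reset_first=*/true);
    read_page_levels(lost, page2, /*reset_first=*/true);
    std::cout << "reset between calls: " << lost.size() << " levels\n";  // 2

    // What the batch_size = 0 path relies on: keep what was already decoded.
    std::vector<level_t> kept;
    read_page_levels(kept, page1, /*reset_first=*/true);
    read_page_levels(kept, page2, /*reset_first=*/false);
    std::cout << "accumulated:        " << kept.size() << " levels\n";   // 5
}
```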
@@ -835,7 +847,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
continue;
}

_read_column_names.insert(doris_name);
_read_column_names.emplace_back(doris_name);

// select_vector.reset();
size_t field_rows = 0;
@@ -847,6 +859,15 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
is_dict_filter));
*read_rows = field_rows;
*eof = field_eof;
/*
* Because of the cross-page issue in `_read_nested_column` described above,
* repeated calls may leave the definition and repetition levels incomplete.
* Therefore, when the null_map of the struct is filled later, it is crucial
* to use the definition and repetition levels of the first read column
* (for which `_read_nested_column` is not called repeatedly).
*
* It is worth mentioning that, in theory, any sub-column could be chosen to
* fill the null_map, and selecting the shortest one would give better
* performance.
*/
} else {
while (field_rows < *read_rows && !field_eof) {
size_t loop_rows = 0;
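Also not part of the diff: a hedged sketch of the idea behind deriving the struct's null_map from a single sub-column. For a nullable struct, every sub-column shares the same definition-level prefix up to the struct's own level, so any one complete set of levels is enough (the threshold `struct_def_level` below is a hypothetical stand-in for that level, and repetition levels are ignored for simplicity).

```cpp
// Hypothetical, simplified illustration; not Doris code.
#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

// A value whose definition level is below the struct's own level means the
// struct itself is null at that position.
std::vector<uint8_t> build_struct_null_map(const std::vector<level_t>& def_levels,
                                           level_t struct_def_level) {
    std::vector<uint8_t> null_map;
    null_map.reserve(def_levels.size());
    for (level_t def : def_levels) {
        null_map.push_back(def < struct_def_level ? 1 : 0);
    }
    return null_map;
}

int main() {
    // Definition levels taken from one (any) fully-read sub-column of the struct.
    const std::vector<level_t> def_levels = {2, 1, 0, 2};
    for (uint8_t is_null : build_struct_null_map(def_levels, /*struct_def_level=*/1)) {
        std::cout << int(is_null) << ' ';
    }
    std::cout << '\n';  // 0 0 1 0
}
```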
14 changes: 7 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -298,24 +298,24 @@ class StructColumnReader : public ParquetColumnReader {
if (!_read_column_names.empty()) {
// can't use _child_readers[_read_column_names.front()]
// because the operator[] of std::unordered_map is not const :(
return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
return _child_readers.find(_read_column_names.front())->second->get_rep_level();
}
return _child_readers.begin()->second->get_rep_level();
}

const std::vector<level_t>& get_def_level() const override {
if (!_read_column_names.empty()) {
return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
return _child_readers.find(_read_column_names.front())->second->get_def_level();
}
return _child_readers.begin()->second->get_def_level();
}

Statistics statistics() override {
Statistics st;
for (const auto& reader : _child_readers) {
// make sure the field is read
if (_read_column_names.find(reader.first) != _read_column_names.end()) {
Statistics cst = reader.second->statistics();
for (const auto& column_name : _read_column_names) {
auto reader = _child_readers.find(column_name);
if (reader != _child_readers.end()) {
Statistics cst = reader->second->statistics();
st.merge(cst);
}
}
@@ -332,7 +332,7 @@ class StructColumnReader : public ParquetColumnReader {

private:
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
std::set<std::string> _read_column_names;
std::vector<std::string> _read_column_names;
};

}; // namespace doris::vectorized
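A side note on the `find()` pattern kept in the header above (a standalone sketch, not Doris code): `std::unordered_map::operator[]` may insert a default-constructed element, so it is not usable on a const map or inside a const member function; a const accessor has to go through `find()` (or `at()`).

```cpp
#include <string>
#include <unordered_map>

struct Readers {
    std::unordered_map<std::string, int> child;

    int first_level(const std::string& name) const {
        // child[name] would not compile here: operator[] is non-const
        // because it may insert a new element.
        auto it = child.find(name);
        return it != child.end() ? it->second : -1;
    }
};
```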
@@ -65,3 +65,12 @@
-- !viewfs --
25001 25001 25001

-- !row_cross_pages_2 --
149923 149923

-- !row_cross_pages_3 --
74815 74815

-- !row_cross_pages_4 --
457 457

@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
1

-- !2 --
5000

-- !3 --
5000

@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
suite("test_tvf_p0", "p0,external,tvf,external_docker,hive") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String nameNodeHost = context.config.otherConfigs.get("externalEnvIp")
@@ -46,7 +46,7 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
"format" = "orc");
"""

// a row of a complex type may span multiple pages
// (1): a row of a complex type may span multiple pages
qt_row_cross_pages """select count(id), count(m1), count(m2)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
@@ -73,5 +73,25 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
"format" = "parquet",
"fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/",
"fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""

// (2): a row of a complex type may span multiple pages
qt_row_cross_pages_2 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet");
""" //149923

qt_row_cross_pages_3 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet") where id > 49923 ;
""" // 74815

qt_row_cross_pages_4 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet") where id < 300 ;
""" //457

}
}
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_parquet_complex_cross_page", "p2,external,hive,external_remote,external_remote_hive") {

String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
// hudi and hive use the same catalog in p2.
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable test")
return;
}

String props = context.config.otherConfigs.get("hudiEmrCatalog")
String hms_catalog_name = "test_parquet_complex_cross_page"

sql """drop catalog if exists ${hms_catalog_name};"""
sql """
CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
PROPERTIES (
${props}
,'hive.version' = '3.1.3'
);
"""

logger.info("catalog " + hms_catalog_name + " created")
sql """switch ${hms_catalog_name};"""
logger.info("switched to catalog " + hms_catalog_name)
sql """ use regression;"""

sql """ set dry_run_query=true; """

qt_1 """ SELECT * FROM test_parquet_complex_cross_page WHERE device_id='DZ692' and format_time between 1737693770300 and 1737693770500
and date between '20250124' and '20250124' and project='GA20230001' ; """
qt_2 """ SELECT functions_pnc_ssm_road_di_objects from test_parquet_complex_cross_page ; """
qt_3 """ select * from test_parquet_complex_cross_page ; """

sql """drop catalog ${hms_catalog_name};"""
}