From ec3c911f97bcf686547a33130ffdf0cad08d2491 Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 4 Aug 2022 14:07:48 +0800 Subject: [PATCH] [Feature][Materialized-View] support materialized view on vectorized engine (#10792) --- .gitattributes | 1 + be/src/olap/hll.cpp | 40 -------------- be/src/olap/hll.h | 9 ---- .../rowset/segment_v2/segment_iterator.cpp | 10 ++-- .../olap/rowset/segment_v2/segment_iterator.h | 2 +- be/src/olap/schema_change.cpp | 48 ++++++++--------- be/src/olap/schema_change.h | 1 + be/src/runtime/descriptors.h | 5 -- be/src/runtime/primitive_type.h | 1 - be/src/vec/columns/column_complex.h | 50 +++++++----------- be/src/vec/exec/volap_scanner.cpp | 13 +++-- .../org/apache/doris/alter/RollupJobV2.java | 11 ++-- .../apache/doris/alter/SchemaChangeJobV2.java | 4 +- .../analysis/CreateMaterializedViewStmt.java | 13 +++-- .../rewrite/mvrewrite/CountFieldToSum.java | 6 +-- .../mvrewrite/HLLHashToSlotRefRule.java | 3 +- .../apache/doris/alter/RollupJobV2Test.java | 5 +- .../CreateMaterializedViewStmtTest.java | 21 +++++--- .../data/rollup/test_materialized_view.out | Bin 926 -> 1399 bytes .../rollup/test_materialized_view.groovy | 10 +++- 20 files changed, 100 insertions(+), 153 deletions(-) diff --git a/.gitattributes b/.gitattributes index 6c01695fd205ef..01e299f06ec450 100644 --- a/.gitattributes +++ b/.gitattributes @@ -14,3 +14,4 @@ *.thrift text eol=lf *.proto text eol=lf *.conf text eol=lf +*.out text eol=lf -diff diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp index a2ec6e39a0a12b..bc2ba3de39a6ac 100644 --- a/be/src/olap/hll.cpp +++ b/be/src/olap/hll.cpp @@ -407,44 +407,4 @@ void HllSetResolver::parse() { } } -void HllSetHelper::set_sparse(char* result, const std::map& index_to_value, - int& len) { - result[0] = HLL_DATA_SPARSE; - len = sizeof(HllSetResolver::SetTypeValueType) + sizeof(HllSetResolver::SparseLengthValueType); - char* write_value_pos = result + len; - for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) { - write_value_pos[0] = (char)(iter->first & 0xff); - write_value_pos[1] = (char)(iter->first >> 8 & 0xff); - write_value_pos[2] = iter->second; - write_value_pos += 3; - } - int registers_count = index_to_value.size(); - len += registers_count * - (sizeof(HllSetResolver::SparseIndexType) + sizeof(HllSetResolver::SparseValueType)); - *(int*)(result + 1) = registers_count; -} - -void HllSetHelper::set_explicit(char* result, const std::set& hash_value_set, int& len) { - result[0] = HLL_DATA_EXPLICIT; - result[1] = (HllSetResolver::ExplicitLengthValueType)(hash_value_set.size()); - len = sizeof(HllSetResolver::SetTypeValueType) + - sizeof(HllSetResolver::ExplicitLengthValueType); - char* write_pos = result + len; - for (auto iter = hash_value_set.begin(); iter != hash_value_set.end(); ++iter) { - uint64_t hash_value = *iter; - *(uint64_t*)write_pos = hash_value; - write_pos += 8; - } - len += sizeof(uint64_t) * hash_value_set.size(); -} - -void HllSetHelper::set_full(char* result, const std::map& index_to_value, - const int registers_len, int& len) { - result[0] = HLL_DATA_FULL; - for (auto iter = index_to_value.begin(); iter != index_to_value.end(); ++iter) { - result[1 + iter->first] = iter->second; - } - len = registers_len + sizeof(HllSetResolver::SetTypeValueType); -} - } // namespace doris diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h index 81c17be2529363..a3347f4626ca99 100644 --- a/be/src/olap/hll.h +++ b/be/src/olap/hll.h @@ -370,13 +370,4 @@ class HllSetResolver { SparseLengthValueType* _sparse_count; }; -// todo(kks): remove this when dpp_sink class was removed -class HllSetHelper { -public: - static void set_sparse(char* result, const std::map& index_to_value, int& len); - static void set_explicit(char* result, const std::set& hash_value_set, int& len); - static void set_full(char* result, const std::map& index_to_value, - const int set_len, int& len); -}; - } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 9aa4173fd3773d..1b39703863bbbd 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -1001,16 +1001,15 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro Status SegmentIterator::_read_columns_by_rowids(std::vector& read_column_ids, std::vector& rowid_vector, - uint16_t* sel_rowid_idx, size_t select_size, - vectorized::MutableColumns* mutable_columns) { + uint16_t* sel_rowid_idx, size_t select_size) { SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns); std::vector rowids(select_size); for (size_t i = 0; i < select_size; ++i) { rowids[i] = rowid_vector[sel_rowid_idx[i]]; } for (auto cid : read_column_ids) { - auto& column = (*mutable_columns)[cid]; - RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, column)); + RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, + _current_return_columns[cid])); } return Status::OK(); @@ -1117,8 +1116,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // step3: read non_predicate column RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids, - sel_rowid_idx, selected_size, - &_current_return_columns)); + sel_rowid_idx, selected_size)); // step4: output columns // 4.1 output non-predicate column diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index b99f8b379a9ab5..7170d6010814ff 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -116,7 +116,7 @@ class SegmentIterator : public RowwiseIterator { void _output_non_pred_columns(vectorized::Block* block); Status _read_columns_by_rowids(std::vector& read_column_ids, std::vector& rowid_vector, uint16_t* sel_rowid_idx, - size_t select_size, vectorized::MutableColumns* mutable_columns); + size_t select_size); template Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids, diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 743393e555a948..783ec852725434 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -785,22 +785,11 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block, return Status::OLAPInternalError(OLAP_ERR_NOT_INITED); } - // material-view or rollup task will fail now - if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) { - return Status::NotSupported( - "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup " - "not supported now. "); - } - - std::vector nullable_tuples; - for (int i = 0; i < ref_block->columns(); i++) { - nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable()); - } - ObjectPool pool; RuntimeState* state = pool.add(new RuntimeState()); state->set_desc_tbl(&_desc_tbl); - RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples); + RowDescriptor row_desc = + RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false); const int row_size = ref_block->rows(); const int column_size = new_block->columns(); @@ -811,10 +800,6 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block, for (int idx = 0; idx < column_size; idx++) { int ref_idx = _schema_mapping[idx].ref_column; - if (!_schema_mapping[idx].materialized_function.empty()) { - return Status::NotSupported("Materialized function not supported now. "); - } - if (ref_idx < 0) { // new column, write default value auto value = _schema_mapping[idx].default_value; @@ -1547,15 +1532,14 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea rowset_reader->next_block(ref_block.get()); while (ref_block->rows()) { RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); - if (!_mem_tracker->check_limit(config::memory_limitation_per_thread_for_schema_change_bytes, - new_block->allocated_bytes())) { + if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) { RETURN_IF_ERROR(create_rowset()); - if (!_mem_tracker->check_limit( - config::memory_limitation_per_thread_for_schema_change_bytes, - new_block->allocated_bytes())) { + if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) { LOG(WARNING) << "Memory limitation is too small for Schema Change." - << "memory_limitation=" << _memory_limitation; + << " _memory_limitation=" << _memory_limitation + << ", new_block->allocated_bytes()=" << new_block->allocated_bytes() + << ", consumption=" << _mem_tracker->consumption(); return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR); } } @@ -1649,9 +1633,8 @@ bool SchemaChangeWithSorting::_external_sorting(vector& src_row rs_readers.push_back(rs_reader); } // get cur schema if rowset schema exist, rowset schema must be newer than tablet schema - auto max_version_rowset = src_rowsets.back(); const TabletSchema* cur_tablet_schema = - max_version_rowset->rowset_meta()->tablet_schema().get(); + src_rowsets.back()->rowset_meta()->tablet_schema().get(); if (cur_tablet_schema == nullptr) { cur_tablet_schema = new_tablet->tablet_schema().get(); } @@ -1680,6 +1663,12 @@ Status VSchemaChangeWithSorting::_external_sorting(vector& src_ rs_readers.push_back(rs_reader); } + // get cur schema if rowset schema exist, rowset schema must be newer than tablet schema + auto cur_tablet_schema = src_rowsets.back()->rowset_meta()->tablet_schema(); + if (cur_tablet_schema == nullptr) { + cur_tablet_schema = new_tablet->tablet_schema(); + } + Merger::Statistics stats; RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE, new_tablet->tablet_schema().get(), rs_readers, @@ -1717,6 +1706,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req std::shared_mutex SchemaChangeHandler::_mutex; std::unordered_set SchemaChangeHandler::_tablet_ids_in_converting; +std::set SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap"}; // In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished // It will cost a lot of time to wait and the task is very difficult to understand. @@ -1848,7 +1838,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& LOG(WARNING) << "New tablet has a version " << pair.first << " crossing base tablet's max_version=" << max_rowset->end_version(); - Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED); + return Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED); } } std::vector empty_vec; @@ -1949,9 +1939,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& if (item.__isset.mv_expr) { if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) { mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name; + if (!_supported_functions.count(mv_param.mv_expr)) { + return Status::NotSupported("Unknow materialized view expr " + + mv_param.mv_expr); + } } else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) { mv_param.mv_expr = "count_field"; } + mv_param.expr = std::make_shared(item.mv_expr); } sc_params.materialized_params_map.insert( @@ -2152,6 +2147,7 @@ Status SchemaChangeHandler::_parse_request( const TabletColumn& new_column = new_tablet->tablet_schema()->column(i); const string& column_name = new_column.name(); ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i); + column_mapping->new_column = &new_column; if (new_column.has_reference_column()) { int32_t column_index = base_tablet_schema->field_index(new_column.referenced_column()); diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 0ce3761d4ff108..79b5da2852c462 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -328,6 +328,7 @@ class SchemaChangeHandler { static std::shared_mutex _mutex; static std::unordered_set _tablet_ids_in_converting; + static std::set _supported_functions; }; using RowBlockDeleter = std::function; diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 85f5bea18a9f1a..74f59655aff6cd 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -392,11 +392,6 @@ class RowDescriptor { RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector& row_tuples, const std::vector& nullable_tuples); - static RowDescriptor create_default(const DescriptorTbl& desc_tbl, - const std::vector& nullable_tuples) { - return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples); - } - // standard copy c'tor, made explicit here RowDescriptor(const RowDescriptor& desc) : _tuple_desc_map(desc._tuple_desc_map), diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index 03362acd65b6f0..e6aba461fb55a2 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -209,7 +209,6 @@ struct PrimitiveTypeTraits { using ColumnType = vectorized::ColumnString; }; -// only for adapt get_predicate_column_ptr template struct PredicatePrimitiveTypeTraits { using PredicateFieldType = typename PrimitiveTypeTraits::CppType; diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index 35bfa45b520e16..fe732ec8a9ce08 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -62,41 +62,30 @@ class ColumnComplexType final : public COWHelper> data.push_back(*reinterpret_cast(pos)); } - void insert_many_binary_data(char* data_array, uint32_t* len_array, - uint32_t* start_offset_array, size_t num) override { + void insert_binary_data(const char* pos, size_t length) { + insert_default(); + T* pvalue = &get_element(size() - 1); + if (!length) { + *pvalue = *reinterpret_cast(pos); + return; + } + if constexpr (std::is_same_v) { - for (size_t i = 0; i < num; i++) { - uint32_t len = len_array[i]; - uint32_t start_offset = start_offset_array[i]; - insert_default(); - BitmapValue* pvalue = &get_element(size() - 1); - if (len != 0) { - BitmapValue value; - value.deserialize(data_array + start_offset); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast(data_array + start_offset)); - } - } + pvalue->deserialize(pos); } else if constexpr (std::is_same_v) { - for (size_t i = 0; i < num; i++) { - uint32_t len = len_array[i]; - uint32_t start_offset = start_offset_array[i]; - insert_default(); - HyperLogLog* pvalue = &get_element(size() - 1); - if (len != 0) { - HyperLogLog value; - value.deserialize(Slice(data_array + start_offset, len)); - *pvalue = std::move(value); - } else { - *pvalue = std::move(*reinterpret_cast(data_array + start_offset)); - } - } + pvalue->deserialize(Slice(pos, length)); } else { LOG(FATAL) << "Unexpected type in column complex"; } } + void insert_many_binary_data(char* data_array, uint32_t* len_array, + uint32_t* start_offset_array, size_t num) override { + for (size_t i = 0; i < num; i++) { + insert_binary_data(data_array + start_offset_array[i], len_array[i]); + } + } + void insert_default() override { data.push_back(T()); } void insert_many_defaults(size_t length) override { @@ -299,10 +288,7 @@ template ColumnPtr ColumnComplexType::permute(const IColumn::Permutation& perm, size_t limit) const { size_t size = data.size(); - if (limit == 0) - limit = size; - else - limit = std::min(size, limit); + limit = limit ? std::min(size, limit) : size; if (perm.size() < limit) { LOG(FATAL) << "Size of permutation is less than required."; diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 02fae4de8de274..194da3ca7da23a 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -252,9 +252,13 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) { if (!slot->is_materialized()) { continue; } - int32_t index = slot->col_unique_id() >= 0 - ? _tablet_schema.field_index(slot->col_unique_id()) - : _tablet_schema.field_index(slot->col_name()); + + int32_t index = _tablet_schema.field_index(slot->col_unique_id()); + if (index < 0) { + // rollup/materialized view should use col_name to find index + index = _tablet_schema.field_index(slot->col_name()); + } + if (index < 0) { std::stringstream ss; ss << "field name is invalid. field=" << slot->col_name(); @@ -262,8 +266,9 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) { return Status::InternalError(ss.str()); } _return_columns.push_back(index); - if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable()) + if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable()) { _tablet_columns_convert_to_null_set.emplace(index); + } } // expand the sequence column diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 3412a999bb1921..a1e7d0b149cfee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -138,10 +138,10 @@ private RollupJobV2() { super(JobType.ROLLUP); } - public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, - long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName, - List rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, - short rollupShortKeyColumnCount, OriginStatement origStmt) { + public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId, + long rollupIndexId, String baseIndexName, String rollupIndexName, List rollupSchema, + int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount, + OriginStatement origStmt) { super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs); this.baseIndexId = baseIndexId; @@ -150,6 +150,7 @@ public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long t this.rollupIndexName = rollupIndexName; this.rollupSchema = rollupSchema; + this.baseSchemaHash = baseSchemaHash; this.rollupSchemaHash = rollupSchemaHash; this.rollupKeysType = rollupKeysType; @@ -376,8 +377,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { List fullSchema = tbl.getBaseSchema(true); DescriptorTable descTable = new DescriptorTable(); + TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); for (Column column : fullSchema) { - TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); destSlotDesc.setIsMaterialized(true); destSlotDesc.setColumn(column); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 062b3b5e2791fd..d31769dbec8870 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -416,8 +416,8 @@ protected void runWaitingTxnJob() throws AlterCancelException { List fullSchema = tbl.getBaseSchema(true); DescriptorTable descTable = new DescriptorTable(); + TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); for (Column column : fullSchema) { - TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc); destSlotDesc.setIsMaterialized(true); destSlotDesc.setColumn(column); @@ -648,7 +648,7 @@ private void onFinished(OlapTable tbl) { tbl.setStorageFormat(storageFormat); } - //update max column unique id + // update max column unique id int maxColUniqueId = tbl.getMaxColUniqueId(); for (Column column : tbl.getFullSchema()) { if (column.getUniqueId() > maxColUniqueId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java index 61e33892a46f2a..5a440227486aa0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java @@ -139,7 +139,7 @@ public void analyze(Analyzer analyzer) throws UserException { if (selectStmt.getAggInfo() != null) { mvKeysType = KeysType.AGG_KEYS; } - analyzeSelectClause(); + analyzeSelectClause(analyzer); analyzeFromClause(); if (selectStmt.getWhereClause() != null) { throw new AnalysisException("The where clause is not supported in add materialized view clause, expr:" @@ -156,7 +156,7 @@ public void analyze(Analyzer analyzer) throws UserException { } } - public void analyzeSelectClause() throws AnalysisException { + public void analyzeSelectClause(Analyzer analyzer) throws AnalysisException { SelectList selectList = selectStmt.getSelectList(); if (selectList.getItems().isEmpty()) { throw new AnalysisException("The materialized view must contain at least one column"); @@ -222,7 +222,7 @@ public void analyzeSelectClause() throws AnalysisException { } meetAggregate = true; // build mv column item - mvColumnItemList.add(buildMVColumnItem(functionCallExpr)); + mvColumnItemList.add(buildMVColumnItem(analyzer, functionCallExpr)); // TODO(ml): support REPLACE, REPLACE_IF_NOT_NULL, bitmap_union, hll_union only for aggregate table // TODO(ml): support different type of column, int -> bigint(sum) } @@ -347,7 +347,8 @@ private void supplyOrderColumn() throws AnalysisException { } } - private MVColumnItem buildMVColumnItem(FunctionCallExpr functionCallExpr) throws AnalysisException { + private MVColumnItem buildMVColumnItem(Analyzer analyzer, FunctionCallExpr functionCallExpr) + throws AnalysisException { String functionName = functionCallExpr.getFnName().getFunction(); List slots = new ArrayList<>(); functionCallExpr.collect(SlotRef.class, slots); @@ -399,6 +400,9 @@ private MVColumnItem buildMVColumnItem(FunctionCallExpr functionCallExpr) throws mvColumnName = baseColumnName; } else { mvColumnName = mvColumnBuilder(functionName, baseColumnName); + if (!functionChild0.getType().isStringType()) { + functionChild0.uncheckedCastChild(Type.VARCHAR, 0); + } defineExpr = functionChild0; } mvAggregateType = AggregateType.valueOf(functionName.toUpperCase()); @@ -410,6 +414,7 @@ private MVColumnItem buildMVColumnItem(FunctionCallExpr functionCallExpr) throws defineExpr = new CaseExpr(null, Lists.newArrayList(new CaseWhenClause( new IsNullPredicate(baseColumnRef, false), new IntLiteral(0, Type.BIGINT))), new IntLiteral(1, Type.BIGINT)); + defineExpr.analyze(analyzer); type = Type.BIGINT; break; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/CountFieldToSum.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/CountFieldToSum.java index a636c39f9406e4..ee57c6317a254a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/CountFieldToSum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/CountFieldToSum.java @@ -33,8 +33,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import java.util.List; - /** * Rewrite count(k1) to sum(mv_count_k1) when MV Column exists. * For example: @@ -92,9 +90,7 @@ private Expr rewriteExpr(Column mvColumn, Analyzer analyzer) { // exception to Unknown column, because we can't find an alias which named as origin table name that has // required column. SlotRef mvSlotRef = new SlotRef(null, mvColumn.getName()); - List newFnParams = Lists.newArrayList(); - newFnParams.add(mvSlotRef); - FunctionCallExpr result = new FunctionCallExpr("sum", newFnParams); + FunctionCallExpr result = new FunctionCallExpr("sum", Lists.newArrayList(mvSlotRef)); result.analyzeNoThrow(analyzer); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/HLLHashToSlotRefRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/HLLHashToSlotRefRule.java index 22a311d0f81f8d..cf13f52cb4f147 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/HLLHashToSlotRefRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/mvrewrite/HLLHashToSlotRefRule.java @@ -106,8 +106,7 @@ private Expr rewriteExpr(String fnName, SlotRef queryColumnSlotRef, Column mvCol TableName tableName = queryColumnSlotRef.getTableName(); Preconditions.checkNotNull(tableName); SlotRef mvSlotRef = new SlotRef(tableName, mvColumn.getName()); - List newFnParams = Lists.newArrayList(); - newFnParams.add(mvSlotRef); + List newFnParams = Lists.newArrayList(mvSlotRef); FunctionCallExpr result = new FunctionCallExpr(fnName, newFnParams); result.analyzeNoThrow(analyzer); return result; diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java index 72b06e2efeb563..1637829ccabb71 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/RollupJobV2Test.java @@ -314,8 +314,8 @@ public void testSchemaChangeWhileTabletNotStable() throws Exception { @Test - public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) throws IOException, - AnalysisException { + public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) + throws IOException { // prepare file File file = new File(fileName); file.createNewFile(); @@ -326,6 +326,7 @@ public void testSerializeOfRollupJob(@Mocked CreateMaterializedViewStmt stmt) th String mvColumnName = CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX + "to_bitmap_" + "c1"; Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", ""); columns.add(column); + RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, 1, 1, KeysType.AGG_KEYS, keysCount, new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test", diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java index ce4a9c029d6ca0..0ae3b543b011a1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java @@ -1167,7 +1167,8 @@ public void testBuildMVColumnItem(@Injectable SelectStmt selectStmt, result = Type.LARGEINT; } }; - MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr); + MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr); Assert.assertEquals(Type.LARGEINT, mvColumnItem.getType()); SlotRef slotRef2 = new SlotRef(new TableName(internalCtl, "db", "table"), "a"); @@ -1183,7 +1184,8 @@ public void testBuildMVColumnItem(@Injectable SelectStmt selectStmt, result = Type.BIGINT; } }; - MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr2); + MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr2); Assert.assertEquals(Type.BIGINT, mvColumnItem2.getType()); SlotRef slotRef3 = new SlotRef(new TableName(internalCtl, "db", "table"), "a"); @@ -1199,7 +1201,8 @@ public void testBuildMVColumnItem(@Injectable SelectStmt selectStmt, result = Type.VARCHAR; } }; - MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr3); + MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr3); Assert.assertEquals(Type.VARCHAR, mvColumnItem3.getType()); SlotRef slotRef4 = new SlotRef(new TableName(internalCtl, "db", "table"), "a"); @@ -1215,7 +1218,8 @@ public void testBuildMVColumnItem(@Injectable SelectStmt selectStmt, result = Type.DOUBLE; } }; - MVColumnItem mvColumnItem4 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr4); + MVColumnItem mvColumnItem4 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr4); Assert.assertEquals(Type.DOUBLE, mvColumnItem4.getType()); } @@ -1242,7 +1246,8 @@ public void testKeepScaleAndPrecisionOfType(@Injectable SelectStmt selectStmt, result = ScalarType.createVarchar(50); } }; - MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr); + MVColumnItem mvColumnItem = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr); Assert.assertEquals(50, mvColumnItem.getType().getLength()); SlotRef slotRef2 = new SlotRef(new TableName(internalCtl, "db", "table"), "a"); @@ -1258,7 +1263,8 @@ public void testKeepScaleAndPrecisionOfType(@Injectable SelectStmt selectStmt, result = ScalarType.createDecimalType(10, 1); } }; - MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr2); + MVColumnItem mvColumnItem2 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr2); Assert.assertEquals(new Integer(10), mvColumnItem2.getType().getPrecision()); Assert.assertEquals(1, ((ScalarType) mvColumnItem2.getType()).getScalarScale()); @@ -1275,7 +1281,8 @@ public void testKeepScaleAndPrecisionOfType(@Injectable SelectStmt selectStmt, result = ScalarType.createChar(5); } }; - MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", functionCallExpr3); + MVColumnItem mvColumnItem3 = Deencapsulation.invoke(createMaterializedViewStmt, "buildMVColumnItem", analyzer, + functionCallExpr3); Assert.assertEquals(5, mvColumnItem3.getType().getLength()); } } diff --git a/regression-test/data/rollup/test_materialized_view.out b/regression-test/data/rollup/test_materialized_view.out index 444968df729c64fd3bcafacab8b033dd7da741c7..6c679aa19c136de41ace1d95e17976459aa48735 100644 GIT binary patch delta 130 zcmbQo{+(;XJm$#@85K7+C@>0fLIGD|Zb^J{eraCGWPe8OXlKV@SA}p7S3iY>;>4WP zc%W>8f@iRTU#O3dLI_CQK*7}qsK!vi)z5{~$S64x?xG+aSSC`9>(};@;0B`Cdg#Z8m delta 7 Ocmey)HIIG6JZ1n5g#!2h diff --git a/regression-test/suites/rollup/test_materialized_view.groovy b/regression-test/suites/rollup/test_materialized_view.groovy index 07e040bae3ee9e..231ca4de23c352 100644 --- a/regression-test/suites/rollup/test_materialized_view.groovy +++ b/regression-test/suites/rollup/test_materialized_view.groovy @@ -105,10 +105,16 @@ suite("test_materialized_view", "rollup") { } } sql "SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;" + qt_sql "DESC ${tbName1} ALL;" + + qt_sql "SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;" + + explain { + sql("SELECT store_id, count(sale_amt) FROM ${tbName1} GROUP BY store_id;") + contains "(amt_count)" + } sql "DROP TABLE ${tbName1} FORCE;" sql "DROP TABLE ${tbName2} FORCE;" } - -