Skip to content

Commit

Permalink
Merge branch 'master' into add-cases-for-ip
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Oct 12, 2024
2 parents 0e6b19a + 02c671b commit f127902
Show file tree
Hide file tree
Showing 2,767 changed files with 173,747 additions and 20,422 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ jobs:
popd
export PATH="${DEFAULT_DIR}/ldb-toolchain/bin/:$(pwd)/thirdparty/installed/bin/:${PATH}"
DISABLE_BE_JAVA_EXTENSIONS=ON DO_NOT_CHECK_JAVA_ENV=ON DORIS_TOOLCHAIN=clang ENABLE_PCH=OFF OUTPUT_BE_BINARY=0 ./build.sh --be --cloud
DISABLE_BE_JAVA_EXTENSIONS=ON DO_NOT_CHECK_JAVA_ENV=ON DORIS_TOOLCHAIN=clang ENABLE_PCH=OFF OUTPUT_BE_BINARY=0 ./build.sh --be
fi
echo "should_check=${{ steps.filter.outputs.cpp_changes }}" >>${GITHUB_OUTPUT}
Expand Down
11 changes: 6 additions & 5 deletions .github/workflows/scope-label.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
---
name: Add Scope Labeler

on:
pull_request_target:
types:
- opened
- synchronize
# This action has some error, skip it temporarily
#on:
# pull_request_target:
# types:
# - opened
# - synchronize

jobs:
process:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
docker/thirdparties/docker-compose/hive/scripts/tvf_data

fe_plugins/output
fe_plugins/**/.factorypath
Expand Down
72 changes: 50 additions & 22 deletions be/src/agent/be_exec_version_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@

#include "agent/be_exec_version_manager.h"

namespace doris {
#include "common/exception.h"

const std::map<int, const std::set<std::string>> AGGREGATION_CHANGE_MAP = {
{AGGREGATION_2_1_VERSION,
{"window_funnel", "stddev_samp", "variance_samp", "percentile_approx_weighted",
"percentile_approx", "covar_samp", "percentile", "percentile_array"}}};
namespace doris {

Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
if (be_exec_version > max_be_exec_version || be_exec_version < min_be_exec_version) {
Expand All @@ -35,19 +32,42 @@ Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
return Status::OK();
}

void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_version,
int data_be_exec_version,
std::string function_name) {
if (current_be_exec_version > AGGREGATION_2_1_VERSION &&
data_be_exec_version <= AGGREGATION_2_1_VERSION &&
AGGREGATION_CHANGE_MAP.find(AGGREGATION_2_1_VERSION)->second.contains(function_name)) {
int BeExecVersionManager::get_function_compatibility(int be_exec_version,
std::string function_name) {
if (_function_restrict_map.contains(function_name) && be_exec_version != get_newest_version()) {
throw Exception(Status::InternalError(
"agg state data with {} is not supported, "
"current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data "
"or set the be_exec_version={} in fe.conf",
function_name, current_be_exec_version, data_be_exec_version,
AGGREGATION_2_1_VERSION));
"function {} do not support old be exec version, maybe it's because doris are "
"doing a rolling upgrade. newest_version={}, input_be_exec_version={}",
function_name, get_newest_version(), be_exec_version));
}

auto it = _function_change_map.find(function_name);
if (it == _function_change_map.end()) {
// 0 means no compatibility issues need to be dealt with
return 0;
}

auto version_it = it->second.lower_bound(be_exec_version);
if (version_it == it->second.end()) {
return 0;
}

return *version_it;
}

void BeExecVersionManager::check_function_compatibility(int current_be_exec_version,
int data_be_exec_version,
std::string function_name) {
if (get_function_compatibility(current_be_exec_version, function_name) ==
get_function_compatibility(data_be_exec_version, function_name)) {
return;
}

throw Exception(Status::InternalError(
"agg state data with {} is not supported, "
"current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data "
"or set the be_exec_version={} in fe.conf temporary",
function_name, current_be_exec_version, data_be_exec_version, data_be_exec_version));
}

/**
Expand All @@ -69,7 +89,7 @@ void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_ver
* 3: start from doris 2.0.0 (by some mistakes)
* a. aggregation function do not serialize bitmap to string.
* b. support window funnel mode.
* 4/5: start from doris 2.1.0
* 4: start from doris 2.1.0
* a. ignore this line, window funnel mode should be enabled from 2.0.
* b. array contains/position/countequal function return nullable in less situations.
* c. cleared old version of Version 2.
Expand All @@ -79,14 +99,22 @@ void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_ver
* g. do local merge of remote runtime filter
* h. "now": ALWAYS_NOT_NULLABLE -> DEPEND_ON_ARGUMENTS
*
* 7: start from doris 3.0.0
* 5: start from doris 3.0.0
* a. change some agg function nullable property: PR #37215
*
* 6: start from doris 3.0.1 and 2.1.6
* a. change the impl of percentile (need fix)
* b. clear old version of version 3->4
* c. change FunctionIsIPAddressInRange from AlwaysNotNullable to DependOnArguments
* d. change some agg function nullable property: PR #37215
* e. change variant serde to fix PR #38413
* d. change variant serde to fix PR #38413
*
* 7: start from doris 3.0.2
* a. window funnel logic change
* b. support const column in serialize/deserialize function: PR #41175
*/
const int BeExecVersionManager::max_be_exec_version = 7;
const int BeExecVersionManager::min_be_exec_version = 0;

const int BeExecVersionManager::max_be_exec_version = 8;
const int BeExecVersionManager::min_be_exec_version = 0;
std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {};
std::set<std::string> BeExecVersionManager::_function_restrict_map;
} // namespace doris
28 changes: 25 additions & 3 deletions be/src/agent/be_exec_version_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,44 @@ constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility is
constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215
constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413
constexpr inline int AGGREGATION_2_1_VERSION =
5; // some aggregation changed the data format after this version
6; // some aggregation changed the data format after this version
constexpr inline int USE_CONST_SERDE =
8; // support const column in serialize/deserialize function: PR #41175

class BeExecVersionManager {
public:
BeExecVersionManager() = delete;

static Status check_be_exec_version(int be_exec_version);

static void check_agg_state_compatibility(int current_be_exec_version, int data_be_exec_version,
std::string function_name);
static int get_function_compatibility(int be_exec_version, std::string function_name);

static void check_function_compatibility(int current_be_exec_version, int data_be_exec_version,
std::string function_name);

static int get_newest_version() { return max_be_exec_version; }

static std::string get_function_suffix(int be_exec_version) {
return "_for_old_version_" + std::to_string(be_exec_version);
}

// For example, there are incompatible changes between version=7 and version=6, at this time breaking_old_version is 6.
static void registe_old_function_compatibility(int breaking_old_version,
std::string function_name) {
_function_change_map[function_name].insert(breaking_old_version);
}

static void registe_restrict_function_compatibility(std::string function_name) {
_function_restrict_map.insert(function_name);
}

private:
static const int max_be_exec_version;
static const int min_be_exec_version;
// [function name] -> [breaking change start version]
static std::map<std::string, std::set<int>> _function_change_map;
// those function must has input newest be exec version
static std::set<std::string> _function_restrict_map;
};

} // namespace doris
60 changes: 32 additions & 28 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,42 +240,46 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
master_info.network_address.hostname, master_info.network_address.port);
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

if (master_info.__isset.meta_service_endpoint != config::is_cloud_mode()) {
return Status::InvalidArgument(
"fe and be do not work in same mode, fe cloud mode: {},"
" be cloud mode: {}",
master_info.__isset.meta_service_endpoint, config::is_cloud_mode());
}

if (master_info.__isset.meta_service_endpoint && config::meta_service_endpoint.empty() &&
!master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint << " "
<< st;
LOG(WARNING) << "Detected mismatch in cloud mode configuration between FE and BE. "
<< "FE cloud mode: "
<< (master_info.__isset.meta_service_endpoint ? "true" : "false")
<< ", BE cloud mode: " << (config::is_cloud_mode() ? "true" : "false")
<< ". If fe is earlier than version 3.0.2, the message can be ignored.";
}

if (master_info.__isset.cloud_instance_id) {
if (!config::cloud_instance_id.empty() &&
config::cloud_instance_id != master_info.cloud_instance_id) {
return Status::InvalidArgument(
"cloud_instance_id in fe.conf and be.conf are not same, fe: {}, be: {}",
master_info.cloud_instance_id, config::cloud_instance_id);
if (master_info.__isset.meta_service_endpoint) {
if (config::meta_service_endpoint.empty() && !master_info.meta_service_endpoint.empty()) {
auto st = config::set_config("meta_service_endpoint", master_info.meta_service_endpoint,
true);
LOG(INFO) << "set config meta_service_endpoing " << master_info.meta_service_endpoint
<< " " << st;
}

if (config::cloud_instance_id.empty() && !master_info.cloud_instance_id.empty()) {
auto st = config::set_config("cloud_instance_id", master_info.cloud_instance_id, true);
config::set_cloud_unique_id(master_info.cloud_instance_id);
LOG(INFO) << "set config cloud_instance_id " << master_info.cloud_instance_id << " "
<< st;
if (master_info.meta_service_endpoint != config::meta_service_endpoint) {
LOG(WARNING) << "Detected mismatch in meta_service_endpoint configuration between FE "
"and BE. "
<< "FE meta_service_endpoint: " << master_info.meta_service_endpoint
<< ", BE meta_service_endpoint: " << config::meta_service_endpoint;
return Status::InvalidArgument<false>(
"fe and be do not work in same mode, fe meta_service_endpoint: {},"
" be meta_service_endpoint: {}",
master_info.meta_service_endpoint, config::meta_service_endpoint);
}
}

if (master_info.__isset.cloud_unique_id &&
config::cloud_unique_id != master_info.cloud_unique_id &&
config::enable_use_cloud_unique_id_from_fe) {
auto st = config::set_config("cloud_unique_id", master_info.cloud_unique_id, true);
LOG(INFO) << "set config cloud_unique_id " << master_info.cloud_unique_id << " " << st;
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
}

return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ Status CloudBaseCompaction::modify_rowsets() {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, _rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, _stats.filtered_rows, initiator, output_rowset_delete_bitmap,
_allow_delete_in_cumu_compaction));
LOG_INFO("update delete bitmap in CloudBaseCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
Expand Down
Loading

0 comments on commit f127902

Please sign in to comment.