Skip to content

Commit

Permalink
Fix the incorrect value of the max thread size (#9881) (#9883)
Browse files Browse the repository at this point in the history
ref #9745, close #9880

Signed-off-by: JaySon-Huang <[email protected]>
Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: JaySon-Huang <[email protected]>
Co-authored-by: Calvin Neo <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2025
1 parent d8f0c00 commit 4d137a0
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 24 deletions.
12 changes: 11 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,17 @@ namespace DB
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
M(delta_tree_create_node_fail)
M(force_agg_prefetch) \
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(force_fap_worker_throw) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill) \
M(force_checkpoint_dump_throw_datafile) \
M(force_semi_join_time_exceed) \
M(force_set_proxy_state_machine_cpu_cores)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Common/getNumberOfCPUCores.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ UInt16 getNumberOfPhysicalCPUCores()
return CPUCores::number_of_physical_cpu_cores;
}

// We should call this function before Context has been created,
// which will call `getNumberOfLogicalCPUCores`, or we can not
// set cpu cores any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_)
{
CPUCores::number_of_logical_cpu_cores = number_of_logical_cpu_cores_;
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Common/getNumberOfCPUCores.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
UInt16 getNumberOfLogicalCPUCores();
UInt16 getNumberOfPhysicalCPUCores();

// We should call this function before Context has been created,
// which will call `getNumberOfLogicalCPUCores`, or we can not
// set cpu cores any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_);

void computeAndSetNumberOfPhysicalCPUCores(
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Interpreters/SettingsCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,7 @@ struct SettingMaxThreads
is_auto = true;
}

static UInt64 getAutoValue()
{
static auto res = getNumberOfLogicalCPUCores();
return res;
}
static UInt64 getAutoValue() { return getNumberOfLogicalCPUCores(); }

UInt64 get() const { return value; }

Expand Down
17 changes: 5 additions & 12 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
#include <Storages/KVStore/FFI/FileEncryption.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/ProxyStateMachine.h>
#include <Storages/KVStore/TMTContext.h>
#include <Storages/KVStore/TiKVHelpers/PDTiKVClient.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
Expand Down Expand Up @@ -1072,6 +1073,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

Expand Down Expand Up @@ -1140,15 +1144,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
});

/// get CPU/memory/disk info of this server
diagnosticspb::ServerInfoRequest request;
diagnosticspb::ServerInfoResponse response;
request.set_tp(static_cast<diagnosticspb::ServerInfoType>(1));
std::string req = request.SerializeAsString();
ffi_get_server_info_from_proxy(reinterpret_cast<intptr_t>(&helper), strIntoView(&req), &response);
server_info.parseSysInfo(response);
setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores);
computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores);
LOG_INFO(log, "ServerInfo: {}", server_info.debugString());
getServerInfoFromProxy(log, server_info, &helper, settings);

grpc_log = Logger::get("grpc");
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
Expand All @@ -1162,9 +1158,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
// Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer
tiflash_instance_wrap.tmt = nullptr;
});
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;

/// Init File Provider
bool enable_encryption = false;
Expand Down
82 changes: 82 additions & 0 deletions dbms/src/Storages/KVStore/ProxyStateMachine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Common/setThreadName.h>
#include <Core/TiFlashDisaggregatedMode.h>
#include <Interpreters/Settings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/ServerInfo.h>
#include <Storages/FormatVersion.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/KVStore/KVStore.h>
#include <Storages/KVStore/TMTContext.h>

#include <boost/noncopyable.hpp>
#include <chrono>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace DB
{
namespace FailPoints
{
extern const char force_set_proxy_state_machine_cpu_cores[];
} // namespace FailPoints

inline void getServerInfoFromProxy(
LoggerPtr log,
ServerInfo & server_info,
EngineStoreServerHelper * helper,
Settings & global_settings)
{
/// get CPU/memory/disk info of this server
diagnosticspb::ServerInfoRequest request;
diagnosticspb::ServerInfoResponse response;
request.set_tp(static_cast<diagnosticspb::ServerInfoType>(1));
std::string req = request.SerializeAsString();
#ifndef DBMS_PUBLIC_GTEST
// In tests, no proxy is provided, and Server is not linked to.
ffi_get_server_info_from_proxy(reinterpret_cast<intptr_t>(helper), strIntoView(&req), &response);
server_info.parseSysInfo(response);
LOG_INFO(log, "ServerInfo: {}", server_info.debugString());
#else
UNUSED(helper);
#endif
fiu_do_on(FailPoints::force_set_proxy_state_machine_cpu_cores, {
server_info.cpu_info.logical_cores = 12345;
}); // Mock a server_info
setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores);
computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores);

// If the max_threads in global_settings is "0"/"auto", it is set by
// `getNumberOfLogicalCPUCores` in `SettingMaxThreads::getAutoValue`.
// We should let it follow the cpu cores get from proxy.
if (global_settings.max_threads.is_auto && global_settings.max_threads.get() != getNumberOfLogicalCPUCores())
{
// now it should set the max_threads value according to the new logical cores
global_settings.max_threads.setAuto();
LOG_INFO(
log,
"Reset max_threads, max_threads={} logical_cores={}",
global_settings.max_threads.get(),
getNumberOfLogicalCPUCores());
}
}
} // namespace DB
61 changes: 61 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_proxy_state_machine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Interpreters/Settings.h>
#include <Storages/KVStore/ProxyStateMachine.h>
#include <TestUtils/TiFlashTestBasic.h>

#include <ext/scope_guard.h>

// TODO: Move ServerInfo into KVStore, to make it more conhensive.
namespace DB
{
namespace FailPoints
{
extern const char force_set_proxy_state_machine_cpu_cores[];
} // namespace FailPoints

namespace tests
{
TEST(ProxyStateMachineTest, SetLogicalCores)
{
auto log = Logger::get();
{
FailPointHelper::enableFailPoint(FailPoints::force_set_proxy_state_machine_cpu_cores);
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_set_proxy_state_machine_cpu_cores); });
Settings settings;
ServerInfo server_info;
getServerInfoFromProxy(log, server_info, nullptr, settings);
ASSERT_EQ(settings.max_threads.get(), 12345);
}
{
// If user explicitly set `max_threads`, then `getServerInfo` won't overwrite the value
FailPointHelper::enableFailPoint(FailPoints::force_set_proxy_state_machine_cpu_cores);
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_set_proxy_state_machine_cpu_cores); });
Settings settings;
settings.max_threads.set(8);
ServerInfo server_info;
getServerInfoFromProxy(log, server_info, nullptr, settings);
ASSERT_EQ(settings.max_threads.get(), 8);
}
{
Settings settings;
ServerInfo server_info;
getServerInfoFromProxy(log, server_info, nullptr, settings);
ASSERT_EQ(settings.max_threads.get(), std::thread::hardware_concurrency());
}
}
} // namespace tests
} // namespace DB

0 comments on commit 4d137a0

Please sign in to comment.