Skip to content

Commit

Permalink
Fix the incorrect value of the max thread size (#9881) (#9884)
Browse files Browse the repository at this point in the history
ref #9745, close #9880

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: Calvin Neo <[email protected]>
Co-authored-by: Calvin Neo <[email protected]>
  • Loading branch information
3 people authored Feb 24, 2025
1 parent e9e162e commit 8084e4d
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 216 deletions.
19 changes: 18 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,24 @@ namespace DB
M(force_schema_sync_too_old_schema) \
M(skip_seek_before_read_dmfile) \
M(exception_after_large_write_exceed) \
M(delta_tree_create_node_fail)
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
M(cop_send_failure) \
M(file_cache_fg_download_fail) \
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
M(force_agg_prefetch) \
M(force_set_fap_candidate_store_id) \
M(force_not_clean_fap_on_destroy) \
M(force_fap_worker_throw) \
M(delta_tree_create_node_fail) \
M(disable_flush_cache) \
M(force_agg_two_level_hash_table_before_merge) \
M(force_thread_0_no_agg_spill) \
M(force_checkpoint_dump_throw_datafile) \
M(force_semi_join_time_exceed) \
M(force_set_proxy_state_machine_cpu_cores)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
M(pause_with_alter_locks_acquired) \
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Common/getNumberOfCPUCores.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ UInt16 getNumberOfPhysicalCPUCores()
return CPUCores::number_of_physical_cpu_cores;
}

// We should call this function before Context has been created,
// which will call `getNumberOfLogicalCPUCores`, or we can not
// set cpu cores any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_)
{
CPUCores::number_of_logical_cpu_cores = number_of_logical_cpu_cores_;
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Common/getNumberOfCPUCores.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
UInt16 getNumberOfLogicalCPUCores();
UInt16 getNumberOfPhysicalCPUCores();

// We should call this function before Context has been created,
// which will call `getNumberOfLogicalCPUCores`, or we can not
// set cpu cores any more.
void setNumberOfLogicalCPUCores(UInt16 number_of_logical_cpu_cores_);

void computeAndSetNumberOfPhysicalCPUCores(UInt16 number_of_logical_cpu_cores, UInt16 number_of_hardware_physical_cores);
192 changes: 0 additions & 192 deletions dbms/src/Functions/tests/bench_function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,197 +166,5 @@ try
CATCH
BENCHMARK_REGISTER_F(LikeBench, like)->Iterations(10);

<<<<<<< HEAD:dbms/src/Functions/tests/bench_function_ilike.cpp
=======
class CollationBench : public benchmark::Fixture
{
public:
using ColStringType = typename TypeTraits<String>::FieldType;
using ColUInt8Type = typename TypeTraits<UInt8>::FieldType;

ColumnsWithTypeAndName data{
toVec<String>("col0", std::vector<ColStringType>(1000000, "aaaaaaaaaaaaa")),
toVec<String>("col1", std::vector<ColStringType>(1000000, "aaaaaaaaaaaaa")),
toVec<UInt8>("result", std::vector<ColUInt8Type>{})};

ColumnsWithTypeAndName like_data{
toVec<String>("col0", std::vector<ColStringType>(1000000, "qwdgefwabchfue")),
createConstColumn<String>(1000000, "%abc%"),
createConstColumn<Int32>(1000000, static_cast<Int32>('\\')),
toVec<UInt8>("result", std::vector<ColUInt8Type>{})};
};

class CollationLessBench : public CollationBench
{
public:
void SetUp(const benchmark::State &) override {}
};

class CollationEqBench : public CollationBench
{
public:
void SetUp(const benchmark::State &) override {}
};

class CollationLikeBench : public CollationBench
{
public:
void SetUp(const benchmark::State &) override {}
};

#define BENCH_LESS_COLLATOR(collator) \
BENCHMARK_DEFINE_F(CollationLessBench, collator) \
(benchmark::State & state) \
try \
{ \
FunctionLess fl; \
TiDB::TiDBCollatorPtr collator = TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::collator); \
fl.setCollator(collator); \
Block block(data); \
ColumnNumbers arguments{0, 1}; \
for (auto _ : state) \
{ \
fl.executeImpl(block, arguments, 2); \
} \
} \
CATCH \
BENCHMARK_REGISTER_F(CollationLessBench, collator)->Iterations(10);


#define BENCH_EQ_COLLATOR(collator) \
BENCHMARK_DEFINE_F(CollationEqBench, collator) \
(benchmark::State & state) \
try \
{ \
FunctionEquals fe; \
TiDB::TiDBCollatorPtr collator = TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::collator); \
fe.setCollator(collator); \
Block block(data); \
ColumnNumbers arguments{0, 1}; \
for (auto _ : state) \
{ \
fe.executeImpl(block, arguments, 2); \
} \
} \
CATCH \
BENCHMARK_REGISTER_F(CollationEqBench, collator)->Iterations(10);


#define BENCH_LIKE_COLLATOR(collator) \
BENCHMARK_DEFINE_F(CollationLikeBench, collator) \
(benchmark::State & state) \
try \
{ \
FunctionLike3Args fl; \
TiDB::TiDBCollatorPtr collator = TiDB::ITiDBCollator::getCollator(TiDB::ITiDBCollator::collator); \
fl.setCollator(collator); \
Block block(like_data); \
ColumnNumbers arguments{0, 1, 2}; \
for (auto _ : state) \
{ \
fl.executeImpl(block, arguments, 3); \
} \
} \
CATCH \
BENCHMARK_REGISTER_F(CollationLikeBench, collator)->Iterations(10);


BENCH_LESS_COLLATOR(UTF8MB4_BIN);
BENCH_LESS_COLLATOR(UTF8MB4_GENERAL_CI);
BENCH_LESS_COLLATOR(UTF8MB4_UNICODE_CI);
BENCH_LESS_COLLATOR(UTF8MB4_0900_AI_CI);
BENCH_LESS_COLLATOR(UTF8MB4_0900_BIN);
BENCH_LESS_COLLATOR(UTF8_BIN);
BENCH_LESS_COLLATOR(UTF8_GENERAL_CI);
BENCH_LESS_COLLATOR(UTF8_UNICODE_CI);
BENCH_LESS_COLLATOR(ASCII_BIN);
BENCH_LESS_COLLATOR(BINARY);
BENCH_LESS_COLLATOR(LATIN1_BIN);

BENCH_EQ_COLLATOR(UTF8MB4_BIN);
BENCH_EQ_COLLATOR(UTF8MB4_GENERAL_CI);
BENCH_EQ_COLLATOR(UTF8MB4_UNICODE_CI);
BENCH_EQ_COLLATOR(UTF8MB4_0900_AI_CI);
BENCH_EQ_COLLATOR(UTF8MB4_0900_BIN);
BENCH_EQ_COLLATOR(UTF8_BIN);
BENCH_EQ_COLLATOR(UTF8_GENERAL_CI);
BENCH_EQ_COLLATOR(UTF8_UNICODE_CI);
BENCH_EQ_COLLATOR(ASCII_BIN);
BENCH_EQ_COLLATOR(BINARY);
BENCH_EQ_COLLATOR(LATIN1_BIN);

BENCH_LIKE_COLLATOR(UTF8MB4_BIN);
BENCH_LIKE_COLLATOR(UTF8MB4_GENERAL_CI);
BENCH_LIKE_COLLATOR(UTF8MB4_UNICODE_CI);
BENCH_LIKE_COLLATOR(UTF8MB4_0900_AI_CI);
BENCH_LIKE_COLLATOR(UTF8MB4_0900_BIN);
BENCH_LIKE_COLLATOR(UTF8_BIN);
BENCH_LIKE_COLLATOR(UTF8_GENERAL_CI);
BENCH_LIKE_COLLATOR(UTF8_UNICODE_CI);
BENCH_LIKE_COLLATOR(ASCII_BIN);
BENCH_LIKE_COLLATOR(BINARY);
BENCH_LIKE_COLLATOR(LATIN1_BIN);

class LengthBench : public benchmark::Fixture
{
public:
using ColStringType = typename TypeTraits<String>::FieldType;

ColumnsWithTypeAndName data1{toVec<String>("col", std::vector<ColStringType>(data_num, ""))};
ColumnsWithTypeAndName data2{toVec<String>("col", std::vector<ColStringType>(data_num, "aaaaaaaaaa"))};
ColumnsWithTypeAndName data3{toVec<String>("col", std::vector<ColStringType>(data_num, "啊aaaaaaaa"))};

void SetUp(const benchmark::State &) override {}
};

BENCHMARK_DEFINE_F(LengthBench, bench)
(benchmark::State & state)
try
{
FunctionLength function_length;
std::vector<Block> blocks{Block(data1), Block(data2), Block(data3)};
for (auto & block : blocks)
block.insert({nullptr, std::make_shared<DataTypeNumber<UInt8>>(), "res"});
ColumnNumbers arguments{0};
for (auto _ : state)
{
for (auto & block : blocks)
function_length.executeImpl(block, arguments, 1);
}
}
CATCH
BENCHMARK_REGISTER_F(LengthBench, bench)->Iterations(10);

class ASCIIBench : public benchmark::Fixture
{
public:
using ColStringType = typename TypeTraits<String>::FieldType;

ColumnsWithTypeAndName data1{toVec<String>("col", std::vector<ColStringType>(data_num, ""))};
ColumnsWithTypeAndName data2{toVec<String>("col", std::vector<ColStringType>(data_num, "aaaaaaaaaa"))};
ColumnsWithTypeAndName data3{toVec<String>("col", std::vector<ColStringType>(data_num, "啊aaaaaaaa"))};

void SetUp(const benchmark::State &) override {}
};

BENCHMARK_DEFINE_F(ASCIIBench, bench)
(benchmark::State & state)
try
{
FunctionASCII function_ascii;
std::vector<Block> blocks{Block(data1), Block(data2), Block(data3)};
for (auto & block : blocks)
block.insert({nullptr, std::make_shared<DataTypeNumber<UInt8>>(), "res"});
ColumnNumbers arguments{0};
for (auto _ : state)
{
for (auto & block : blocks)
function_ascii.executeImpl(block, arguments, 1);
}
}
CATCH
BENCHMARK_REGISTER_F(ASCIIBench, bench)->Iterations(10);

>>>>>>> b30c1f5090 (Improve the performance of `length` and `ascii` functions (#9345)):dbms/src/Functions/tests/bench_function_string.cpp
} // namespace tests
} // namespace DB
6 changes: 1 addition & 5 deletions dbms/src/Interpreters/SettingsCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,7 @@ struct SettingMaxThreads
is_auto = true;
}

static UInt64 getAutoValue()
{
static auto res = getNumberOfPhysicalCPUCores();
return res;
}
static UInt64 getAutoValue() { return getNumberOfLogicalCPUCores(); }

UInt64 get() const
{
Expand Down
17 changes: 5 additions & 12 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
#include <Storages/DeltaMerge/ReadThread/SegmentReader.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
#include <Storages/KVStore/ProxyStateMachine.h>
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/PathCapacityMetrics.h>
#include <Storages/S3/FileCache.h>
Expand Down Expand Up @@ -1018,6 +1019,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

Expand Down Expand Up @@ -1086,15 +1090,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
});

/// get CPU/memory/disk info of this server
diagnosticspb::ServerInfoRequest request;
diagnosticspb::ServerInfoResponse response;
request.set_tp(static_cast<diagnosticspb::ServerInfoType>(1));
std::string req = request.SerializeAsString();
ffi_get_server_info_from_proxy(reinterpret_cast<intptr_t>(&helper), strIntoView(&req), &response);
server_info.parseSysInfo(response);
setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores);
computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores);
LOG_INFO(log, "ServerInfo: {}", server_info.debugString());
getServerInfoFromProxy(log, server_info, &helper, settings);

grpc_log = Logger::get("grpc");
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
Expand All @@ -1108,9 +1104,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
// Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer
tiflash_instance_wrap.tmt = nullptr;
});
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;

/// Init File Provider
bool enable_encryption = false;
Expand Down
82 changes: 82 additions & 0 deletions dbms/src/Storages/KVStore/ProxyStateMachine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Common/setThreadName.h>
#include <Core/TiFlashDisaggregatedMode.h>
#include <Interpreters/Settings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/ServerInfo.h>
#include <Storages/FormatVersion.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/TMTContext.h>

#include <boost/noncopyable.hpp>
#include <chrono>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace DB
{
namespace FailPoints
{
extern const char force_set_proxy_state_machine_cpu_cores[];
} // namespace FailPoints

inline void getServerInfoFromProxy(
LoggerPtr log,
ServerInfo & server_info,
EngineStoreServerHelper * helper,
Settings & global_settings)
{
/// get CPU/memory/disk info of this server
diagnosticspb::ServerInfoRequest request;
diagnosticspb::ServerInfoResponse response;
request.set_tp(static_cast<diagnosticspb::ServerInfoType>(1));
std::string req = request.SerializeAsString();
#ifndef DBMS_PUBLIC_GTEST
// In tests, no proxy is provided, and Server is not linked to.
ffi_get_server_info_from_proxy(reinterpret_cast<intptr_t>(helper), strIntoView(&req), &response);
server_info.parseSysInfo(response);
LOG_INFO(log, "ServerInfo: {}", server_info.debugString());
#else
UNUSED(helper);
#endif
fiu_do_on(FailPoints::force_set_proxy_state_machine_cpu_cores, {
server_info.cpu_info.logical_cores = 12345;
}); // Mock a server_info
setNumberOfLogicalCPUCores(server_info.cpu_info.logical_cores);
computeAndSetNumberOfPhysicalCPUCores(server_info.cpu_info.logical_cores, server_info.cpu_info.physical_cores);

// If the max_threads in global_settings is "0"/"auto", it is set by
// `getNumberOfLogicalCPUCores` in `SettingMaxThreads::getAutoValue`.
// We should let it follow the cpu cores get from proxy.
if (global_settings.max_threads.is_auto && global_settings.max_threads.get() != getNumberOfLogicalCPUCores())
{
// now it should set the max_threads value according to the new logical cores
global_settings.max_threads.setAuto();
LOG_INFO(
log,
"Reset max_threads, max_threads={} logical_cores={}",
global_settings.max_threads.get(),
getNumberOfLogicalCPUCores());
}
}
} // namespace DB
Loading

0 comments on commit 8084e4d

Please sign in to comment.