Skip to content

Commit

Permalink
Set proxy's memory limit by TiFlash (pingcap#9753)
Browse files Browse the repository at this point in the history
close pingcap#9745

Signed-off-by: Calvin Neo <[email protected]>
Signed-off-by: JaySon-Huang <[email protected]>

Co-authored-by: JaySon <[email protected]>
  • Loading branch information
CalvinNeo and JaySon-Huang committed Feb 17, 2025
1 parent ca5927a commit e31fa88
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 23 deletions.
205 changes: 183 additions & 22 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Server/HTTPHandlerFactory.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/BgStorageInit.h>
#include <Server/Bootstrap.h>
#include <Server/CertificateReloader.h>
#include <Server/MetricsPrometheus.h>
#include <Server/MetricsTransmitter.h>
#include <Server/RaftConfigParser.h>
Expand Down Expand Up @@ -246,12 +251,40 @@ struct TiFlashProxyConfig

explicit TiFlashProxyConfig(Poco::Util::LayeredConfiguration & config)
{
if (!config.has(config_prefix))
return;
std::string key = "--" + k;
val_map[key] = v;
auto iter = val_map.find(key);
args.push_back(iter->first.data());
args.push_back(iter->second.data());
}

// Try to parse start args from `config`.
// Return true if proxy need to be started, and `val_map` will be filled with the
// proxy start params.
// Return false if proxy is not need.
bool tryParseFromConfig(const Poco::Util::LayeredConfiguration & config, bool has_s3_config, const LoggerPtr & log)
{
// tiflash_compute doesn't need proxy.
auto disaggregated_mode = getDisaggregatedMode(config);
if (disaggregated_mode == DisaggregatedMode::Compute && useAutoScaler(config))
{
LOG_INFO(log, "TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified.");
return false;
}

Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
config.keys("flash.proxy", keys);
if (!config.has("raft.pd_addr"))
{
LOG_WARNING(log, "TiFlash Proxy will not start because `raft.pd_addr` is not configured.");
if (!keys.empty())
LOG_WARNING(log, "`flash.proxy.*` is ignored because TiFlash Proxy will not start.");

return false;
}

{
// config items start from `flash.proxy.`
std::unordered_map<std::string, std::string> args_map;
for (const auto & key : keys)
{
Expand All @@ -272,14 +305,54 @@ struct TiFlashProxyConfig
val_map.emplace("--" + k, std::move(v));
}
}
return true;
}

TiFlashProxyConfig(
Poco::Util::LayeredConfiguration & config,
bool has_s3_config,
const StorageFormatVersion & format_version,
const Settings & settings,
const LoggerPtr & log)
{
is_proxy_runnable = tryParseFromConfig(config, has_s3_config, log);

args.push_back("TiFlash Proxy");
for (const auto & v : val_map)
{
args.push_back(v.first.data());
args.push_back(v.second.data());
}
is_proxy_runnable = true;

// Enable unips according to `format_version`
if (format_version.page == PageFormat::V4)
{
LOG_INFO(log, "Using UniPS for proxy");
addExtraArgs("unips-enabled", "1");
}

// Set the proxy's memory by size or ratio
std::visit(
[&](auto && arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, UInt64>)
{
if (arg != 0)
{
LOG_INFO(log, "Limit proxy's memory, size={}", arg);
addExtraArgs("memory-limit-size", std::to_string(arg));
}
}
else if constexpr (std::is_same_v<T, double>)
{
if (arg > 0 && arg <= 1.0)
{
LOG_INFO(log, "Limit proxy's memory, ratio={}", arg);
addExtraArgs("memory-limit-ratio", std::to_string(arg));
}
}
},
settings.max_memory_usage_for_all_queries.get());
}
};

Expand Down Expand Up @@ -453,7 +526,7 @@ struct RaftStoreProxyRunner : boost::noncopyable
pthread_attr_t attribute;
pthread_attr_init(&attribute);
pthread_attr_setstacksize(&attribute, parms.stack_size);
LOG_INFO(log, "start raft store proxy");
LOG_INFO(log, "Start raft store proxy. Args: {}", parms.conf.args);
pthread_create(&thread, &attribute, runRaftStoreProxyFFI, &parms);
pthread_attr_destroy(&attribute);
}
Expand Down Expand Up @@ -835,10 +908,104 @@ int Server::main(const std::vector<std::string> & /*args*/)

TiFlashErrorRegistry::instance(); // This invocation is for initializing

TiFlashProxyConfig proxy_conf(config());
DM::ScanContext::initCurrentInstanceId(config(), log);

const auto disaggregated_mode = getDisaggregatedMode(config());
const auto use_autoscaler = useAutoScaler(config());

// Some Storage's config is necessary for Proxy
TiFlashStorageConfig storage_config;
// Deprecated settings.
// `global_capacity_quota` will be ignored if `storage_config.main_capacity_quota` is not empty.
// "0" by default, means no quota, the actual disk capacity is used.
size_t global_capacity_quota = 0;
std::tie(global_capacity_quota, storage_config) = TiFlashStorageConfig::parseSettings(config(), log);
if (!storage_config.s3_config.bucket.empty())
{
storage_config.s3_config.enable(/*check_requirements*/ true, log);
}
else if (disaggregated_mode == DisaggregatedMode::Compute && use_autoscaler)
{
// compute node with auto scaler, the requirements will be initted later.
storage_config.s3_config.enable(/*check_requirements*/ false, log);
}

if (storage_config.format_version != 0)
{
if (storage_config.s3_config.isS3Enabled() && !isStorageFormatForDisagg(storage_config.format_version))
{
auto message = fmt::format(
"'storage.format_version' must be set to {} when S3 is enabled!",
getStorageFormatsForDisagg());
LOG_ERROR(log, message);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, message);
}
setStorageFormat(storage_config.format_version);
LOG_INFO(log, "Using format_version={} (explicit storage format detected).", STORAGE_FORMAT_CURRENT.identifier);
}
else
{
if (storage_config.s3_config.isS3Enabled())
{
// If the user does not explicitly set format_version in the config file but
// enables S3, then we set up a proper format version to support S3.
setStorageFormat(DEFAULT_STORAGE_FORMAT_FOR_DISAGG.identifier);
LOG_INFO(log, "Using format_version={} (infer by S3 is enabled).", STORAGE_FORMAT_CURRENT.identifier);
}
else
{
// Use the default settings
LOG_INFO(log, "Using format_version={} (default settings).", STORAGE_FORMAT_CURRENT.identifier);
}
}

// sanitize check for disagg mode
if (storage_config.s3_config.isS3Enabled())
{
if (auto disaggregated_mode = getDisaggregatedMode(config()); disaggregated_mode == DisaggregatedMode::None)
{
const String message = "'flash.disaggregated_mode' must be set when S3 is enabled!";
LOG_ERROR(log, message);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, message);
}
}

// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
// TODO: Parse the settings from config file at the program beginning
global_context->setDefaultProfiles(config());
LOG_INFO(
log,
"Loaded global settings from default_profile and system_profile, changed configs: {{{}}}",
global_context->getSettingsRef().toString());
Settings & settings = global_context->getSettingsRef();

// Init Proxy's config
TiFlashProxyConfig proxy_conf( //
config(),
storage_config.s3_config.isS3Enabled(),
STORAGE_FORMAT_CURRENT,
settings,
log);
EngineStoreServerWrap tiflash_instance_wrap{};
auto helper = GetEngineStoreServerHelper(
&tiflash_instance_wrap);
auto helper = GetEngineStoreServerHelper(&tiflash_instance_wrap);

#ifdef USE_JEMALLOC
LOG_INFO(log, "Using Jemalloc for TiFlash");
#else
LOG_INFO(log, "Not using Jemalloc for TiFlash");
#endif


RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log);

Expand Down Expand Up @@ -898,11 +1065,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
gpr_set_log_function(&printGRPCLog);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = std::make_unique<Context>(Context::createGlobal());
global_context->setGlobalContext(*global_context);
SCOPE_EXIT({
if (!proxy_conf.is_proxy_runnable)
return;

LOG_INFO(log, "Unlink tiflash_instance_wrap.tmt");
// Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer
tiflash_instance_wrap.tmt = nullptr;
});
global_context->setApplicationType(Context::ApplicationType::SERVER);

/// Init File Provider
Expand Down Expand Up @@ -1071,20 +1241,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Init TiFlash metrics.
global_context->initializeTiFlashMetrics();

/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
LOG_INFO(log, "Loaded global settings from default_profile and system_profile.");

///
/// The config value in global settings can only be used from here because we just loaded it from config file.
///

/// Initialize the background & blockable background thread pool.
Settings & settings = global_context->getSettingsRef();
LOG_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
Expand Down

0 comments on commit e31fa88

Please sign in to comment.