Skip to content

Commit

Permalink
Set proxy's memory limit by TiFlash (#9753) and update tiflash proxy (#…
Browse files Browse the repository at this point in the history
…9853)

ref #4982, close #9745

Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo authored Feb 10, 2025
1 parent ea2b31f commit 7d751d6
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 56 deletions.
73 changes: 54 additions & 19 deletions dbms/src/Interpreters/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,29 @@ extern const int NO_ELEMENTS_IN_CONFIG;
/// Set the configuration by name.
void Settings::set(const String & name, const Field & value)
{
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) NAME.set(value);
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME)(NAME).set(value);

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(TRY_SET)
else { throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); }
else
{
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

#undef TRY_SET
}

/// Set the configuration by name. Read the binary serialized value from the buffer (for interserver interaction).
void Settings::set(const String & name, ReadBuffer & buf)
{
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) NAME.set(buf);
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME)(NAME).set(buf);

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(TRY_SET)
else { throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); }
else
{
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

#undef TRY_SET
}
Expand All @@ -53,9 +59,12 @@ void Settings::ignore(const String & name, ReadBuffer & buf)
{
#define TRY_IGNORE(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) decltype(NAME)(DEFAULT).set(buf);

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(TRY_IGNORE)
else { throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); }
else
{
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

#undef TRY_IGNORE
}
Expand All @@ -64,22 +73,28 @@ void Settings::ignore(const String & name, ReadBuffer & buf)
*/
void Settings::set(const String & name, const String & value)
{
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) NAME.set(value);
#define TRY_SET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME)(NAME).set(value);

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(TRY_SET)
else { throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); }
else
{
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

#undef TRY_SET
}

String Settings::get(const String & name) const
{
#define GET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) return NAME.toString();
#define GET(TYPE, NAME, DEFAULT, DESCRIPTION) else if (name == #NAME) return (NAME).toString();

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(GET)
else { throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); }
else
{
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
}

#undef GET
}
Expand All @@ -89,13 +104,16 @@ bool Settings::tryGet(const String & name, String & value) const
#define TRY_GET(TYPE, NAME, DEFAULT, DESCRIPTION) \
else if (name == #NAME) \
{ \
value = NAME.toString(); \
value = (NAME).toString(); \
return true; \
}

if (false) {}
if (false) {} // NOLINT(readability-simplify-boolean-expr)
APPLY_FOR_SETTINGS(TRY_GET)
else { return false; }
else
{
return false;
}

#undef TRY_GET
}
Expand Down Expand Up @@ -166,10 +184,10 @@ void Settings::deserialize(ReadBuffer & buf)
void Settings::serialize(WriteBuffer & buf) const
{
#define WRITE(TYPE, NAME, DEFAULT, DESCRIPTION) \
if (NAME.changed) \
if ((NAME).changed) \
{ \
writeStringBinary(#NAME, buf); \
NAME.write(buf); \
(NAME).write(buf); \
}

APPLY_FOR_SETTINGS(WRITE)
Expand All @@ -180,4 +198,21 @@ void Settings::serialize(WriteBuffer & buf) const
#undef WRITE
}

String Settings::toString() const
{
FmtBuffer buf;
#define WRITE(TYPE, NAME, DEFAULT, DESCRIPTION) \
if ((NAME).changed) \
{ \
buf.fmtAppend("{}={}(default {}), ", #NAME, (NAME).toString(), #DEFAULT); \
}

APPLY_FOR_SETTINGS(WRITE)

#undef WRITE
if (buf.size() > 2)
buf.resize(buf.size() - 2);
return buf.toString();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ struct Settings

/// Write changed settings to buffer. (For example, to be sent to remote server.)
void serialize(WriteBuffer & buf) const;

String toString() const;
};


Expand Down
128 changes: 92 additions & 36 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Poco/Util/HelpFormatter.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/BgStorageInit.h>
#include <Server/Bootstrap.h>
#include <Server/CertificateReloader.h>
Expand Down Expand Up @@ -284,31 +285,33 @@ struct TiFlashProxyConfig
args.push_back(iter->second.data());
}

explicit TiFlashProxyConfig(Poco::Util::LayeredConfiguration & config, bool has_s3_config)
// Try to parse start args from `config`.
// Return true if proxy need to be started, and `val_map` will be filled with the
// proxy start params.
// Return false if proxy is not need.
bool tryParseFromConfig(const Poco::Util::LayeredConfiguration & config, bool has_s3_config, const LoggerPtr & log)
{
auto disaggregated_mode = getDisaggregatedMode(config);

// tiflash_compute doesn't need proxy.
auto disaggregated_mode = getDisaggregatedMode(config);
if (disaggregated_mode == DisaggregatedMode::Compute && useAutoScaler(config))
{
LOG_INFO(
Logger::get(),
"TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified.");
return;
LOG_INFO(log, "TiFlash Proxy will not start because AutoScale Disaggregated Compute Mode is specified.");
return false;
}

Poco::Util::AbstractConfiguration::Keys keys;
config.keys("flash.proxy", keys);
if (!config.has("raft.pd_addr"))
{
LOG_WARNING(Logger::get(), "TiFlash Proxy will not start because `raft.pd_addr` is not configured.");
LOG_WARNING(log, "TiFlash Proxy will not start because `raft.pd_addr` is not configured.");
if (!keys.empty())
LOG_WARNING(Logger::get(), "`flash.proxy.*` is ignored because TiFlash Proxy will not start.");
LOG_WARNING(log, "`flash.proxy.*` is ignored because TiFlash Proxy will not start.");

return;
return false;
}

{
// config items start from `flash.proxy.`
std::unordered_map<std::string, std::string> args_map;
for (const auto & key : keys)
args_map[key] = config.getString("flash.proxy." + key);
Expand All @@ -327,14 +330,54 @@ struct TiFlashProxyConfig
for (auto && [k, v] : args_map)
val_map.emplace("--" + k, std::move(v));
}
return true;
}

TiFlashProxyConfig(
Poco::Util::LayeredConfiguration & config,
bool has_s3_config,
const StorageFormatVersion & format_version,
const Settings & settings,
const LoggerPtr & log)
{
is_proxy_runnable = tryParseFromConfig(config, has_s3_config, log);

args.push_back("TiFlash Proxy");
for (const auto & v : val_map)
{
args.push_back(v.first.data());
args.push_back(v.second.data());
}
is_proxy_runnable = true;

// Enable unips according to `format_version`
if (format_version.page == PageFormat::V4)
{
LOG_INFO(log, "Using UniPS for proxy");
addExtraArgs("unips-enabled", "1");
}

// Set the proxy's memory by size or ratio
std::visit(
[&](auto && arg) {
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, UInt64>)
{
if (arg != 0)
{
LOG_INFO(log, "Limit proxy's memory, size={}", arg);
addExtraArgs("memory-limit-size", std::to_string(arg));
}
}
else if constexpr (std::is_same_v<T, double>)
{
if (arg > 0 && arg <= 1.0)
{
LOG_INFO(log, "Limit proxy's memory, ratio={}", arg);
addExtraArgs("memory-limit-ratio", std::to_string(arg));
}
}
},
settings.max_memory_usage_for_all_queries.get());
}
};

Expand Down Expand Up @@ -518,7 +561,7 @@ struct RaftStoreProxyRunner : boost::noncopyable
pthread_attr_t attribute;
pthread_attr_init(&attribute);
pthread_attr_setstacksize(&attribute, parms.stack_size);
LOG_INFO(log, "start raft store proxy");
LOG_INFO(log, "Start raft store proxy. Args: {}", parms.conf.args);
pthread_create(&thread, &attribute, runRaftStoreProxyFFI, &parms);
pthread_attr_destroy(&attribute);
}
Expand Down Expand Up @@ -1023,20 +1066,39 @@ int Server::main(const std::vector<std::string> & /*args*/)
// Set whether to use safe point v2.
PDClientHelper::enable_safepoint_v2 = config().getBool("enable_safe_point_v2", false);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
// TODO: Parse the settings from config file at the program beginning
global_context->setDefaultProfiles(config());
LOG_INFO(
log,
"Loaded global settings from default_profile and system_profile, changed configs: {{{}}}",
global_context->getSettingsRef().toString());
Settings & settings = global_context->getSettingsRef();

// Init Proxy's config
TiFlashProxyConfig proxy_conf(config(), storage_config.s3_config.isS3Enabled());
TiFlashProxyConfig proxy_conf( //
config(),
storage_config.s3_config.isS3Enabled(),
STORAGE_FORMAT_CURRENT,
settings,
log);
EngineStoreServerWrap tiflash_instance_wrap{};
auto helper = GetEngineStoreServerHelper(&tiflash_instance_wrap);

if (STORAGE_FORMAT_CURRENT.page == PageFormat::V4)
{
LOG_INFO(log, "Using UniPS for proxy");
proxy_conf.addExtraArgs("unips-enabled", "1");
}
else
{
LOG_INFO(log, "UniPS is not enabled for proxy, page_version={}", STORAGE_FORMAT_CURRENT.page);
}
#ifdef USE_JEMALLOC
LOG_INFO(log, "Using Jemalloc for TiFlash");
#else
LOG_INFO(log, "Not using Jemalloc for TiFlash");
#endif


RaftStoreProxyRunner proxy_runner(RaftStoreProxyRunner::RunRaftStoreProxyParms{&helper, proxy_conf}, log);

Expand Down Expand Up @@ -1090,10 +1152,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
gpr_set_log_function(&printGRPCLog);

/** Context contains all that query execution is dependent:
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = Context::createGlobal();
SCOPE_EXIT({
if (!proxy_conf.is_proxy_runnable)
return;

LOG_INFO(log, "Unlink tiflash_instance_wrap.tmt");
// Reset the `tiflash_instance_wrap.tmt` before `global_context` get released, or it will be a dangling pointer
tiflash_instance_wrap.tmt = nullptr;
});
global_context->setApplicationType(Context::ApplicationType::SERVER);
global_context->getSharedContextDisagg()->disaggregated_mode = disaggregated_mode;
global_context->getSharedContextDisagg()->use_autoscaler = use_autoscaler;
Expand Down Expand Up @@ -1277,21 +1343,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Init TiFlash metrics.
global_context->initializeTiFlashMetrics();

/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
// TODO: Parse the settings from config file at the program beginning
global_context->setDefaultProfiles(config());
LOG_INFO(log, "Loaded global settings from default_profile and system_profile.");

///
/// The config value in global settings can only be used from here because we just loaded it from config file.
///

/// Initialize the background & blockable background thread pool.
Settings & settings = global_context->getSettingsRef();
LOG_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
Expand Down

0 comments on commit 7d751d6

Please sign in to comment.