
Commit 5fb11b2

add file_cache_clear api
1 parent 0bff31b commit 5fb11b2

File tree

2 files changed: +155 −48


be/src/io/tools/file_cache_microbench.cpp

+154 −48
@@ -45,6 +45,8 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "gflags/gflags.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "io/cache/cached_remote_file_reader.h"
 #include "io/file_factory.h"
 #include "io/fs/s3_file_system.h"
@@ -56,6 +58,13 @@
 #include "util/bvar_helper.h"
 #include "util/defer_op.h"
 #include "util/stopwatch.hpp"
+#include "util/string_util.h"
+
+using doris::io::FileCacheFactory;
+using doris::io::BlockFileCache;
+
+bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
+bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");

 const std::string HIDDEN_PREFIX = "test_file_cache_microbench/";
 // Just 10^9.
@@ -172,7 +181,10 @@ class MircobenchS3FileWriter {
             _rate_limiter->add(1);
         }
         using namespace doris;
-        SCOPED_BVAR_LATENCY(*write_bvar)
+        if (write_bvar) {
+            SCOPED_BVAR_LATENCY(*write_bvar)
+        }
+        SCOPED_BVAR_LATENCY(microbench_write_latency);
         return _writer.appendv(slices, slices_size);
     }

@@ -196,7 +208,10 @@ class MicrobenchFileReader {
             _rate_limiter->add(1); // consume one token
         }
         using namespace doris;
-        SCOPED_BVAR_LATENCY(*read_bvar)
+        if (read_bvar) {
+            SCOPED_BVAR_LATENCY(*read_bvar)
+        }
+        SCOPED_BVAR_LATENCY(microbench_read_latency);
         return _base_reader->read_at(offset, result, bytes_read, io_ctx);
     }

@@ -327,6 +342,14 @@ std::string get_usage(const std::string& progname) {
     GET /get_help
       Get this help information.

+    GET /file_cache_clear
+      Clear the file cache with the following query parameters:
+      {
+        "sync": <true|false>,       // Whether to synchronize the cache clear operation
+        "segment_path": "<path>"    // Optional path of the segment to clear from the cache
+      }
+      If "segment_path" is not provided, all caches will be cleared based on the "sync" parameter.
+
     Notes:
     - Ensure that the S3 configuration is set correctly in the environment.
     - The program will create and read files in the specified S3 bucket.
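
For reference, the new endpoint can be exercised as below; the host and port are placeholders for wherever the microbench brpc server is listening (not specified in this commit):

    curl "http://<host>:<port>/file_cache_clear?sync=true"
    curl "http://<host>:<port>/file_cache_clear?sync=true&segment_path=<segment_path>"

The first form clears every file cache instance; the second hashes the given segment path and clears only the matching cache entry, as implemented in file_cache_clear() further below.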
@@ -352,6 +375,7 @@ struct JobConfig {
     int64_t read_length_left;
     int64_t read_length_right;
     bool write_file_cache = true;
+    bool bvar_enable = false;

     // Parse the config from JSON
     static JobConfig from_json(const std::string& json_str) {
@@ -364,67 +388,124 @@ struct JobConfig {
             throw std::runtime_error("JSON parse error json args=" + json_str);
         }
         validate(d);
-        if (d.HasMember("write_file_cache") && d["write_file_cache"].GetBool() == false) {
-            config.write_file_cache = false;
+
+        // Check and set write_file_cache
+        if (d.HasMember("write_file_cache") && d["write_file_cache"].IsBool()) {
+            config.write_file_cache = d["write_file_cache"].GetBool();
+        } else {
+            config.write_file_cache = true; // default
+        }
+
+        // Check and set bvar_enable
+        if (d.HasMember("bvar_enable") && d["bvar_enable"].IsBool()) {
+            config.bvar_enable = d["bvar_enable"].GetBool();
+        } else {
+            config.bvar_enable = false; // default
+        }
+
+        // Check and set num_files
+        if (d.HasMember("num_files") && d["num_files"].IsInt()) {
+            config.num_files = d["num_files"].GetInt();
+        } else {
+            config.num_files = 1; // default
+        }
+
+        // Check and set size_bytes_perfile
+        if (d.HasMember("size_bytes_perfile") && d["size_bytes_perfile"].IsInt64()) {
+            config.size_bytes_perfile = d["size_bytes_perfile"].GetInt64();
+            if (config.size_bytes_perfile <= 0) {
+                throw std::runtime_error("size_bytes_perfile must be positive");
+            }
+        } else {
+            config.size_bytes_perfile = 1024 * 1024; // default
+        }
+
+        // Check and set write_iops
+        if (d.HasMember("write_iops") && d["write_iops"].IsInt()) {
+            config.write_iops = d["write_iops"].GetInt();
+        }
+
+        // Check and set read_iops
+        if (d.HasMember("read_iops") && d["read_iops"].IsInt()) {
+            config.read_iops = d["read_iops"].GetInt();
         }
-        config.num_files = d["num_files"].GetInt();
-        if (config.num_files == 0) {
-            config.num_files = 1;
+
+        // Check and set num_threads
+        if (d.HasMember("num_threads") && d["num_threads"].IsInt()) {
+            config.num_threads = d["num_threads"].GetInt();
+            if (config.num_threads <= 0 || config.num_threads > 10000) {
+                throw std::runtime_error("num_threads must be between 1 and 10000");
+            }
+        } else {
+            config.num_threads = 200; // default
         }
-        config.size_bytes_perfile = d["size_bytes_perfile"].GetInt64();
-        config.write_iops = d["write_iops"].GetInt();
-        config.read_iops = d["read_iops"].GetInt();
-        config.num_threads = d["num_threads"].GetInt();
-        if (config.num_threads == 0) {
-            config.num_threads = 200;
+
+        // Check and set file_prefix
+        if (d.HasMember("file_prefix") && d["file_prefix"].IsString()) {
+            config.file_prefix = d["file_prefix"].GetString();
+        } else {
+            throw std::runtime_error("file_prefix is required and must be a string");
         }
-        config.file_prefix = d["file_prefix"].GetString();

-        if (!d.HasMember("cache_type")) {
-            config.cache_type = "NORMAL";
+        // Check and set cache_type
+        if (d.HasMember("cache_type") && d["cache_type"].IsString()) {
+            config.cache_type = d["cache_type"].GetString();
         } else {
-            config.cache_type = d["cache_type"].IsString() ? d["cache_type"].GetString() : "NORMAL";
+            config.cache_type = "NORMAL"; // default
         }

+        // Check the TTL cache type
         if (config.cache_type == "TTL") {
-            if (!d.HasMember("expiration")) {
-                throw std::runtime_error("expiration is required when cache type eq TTL");
+            if (!d.HasMember("expiration") || !d["expiration"].IsInt64()) {
+                throw std::runtime_error(
+                        "expiration is required and must be an integer when cache type is TTL");
             }
             config.expiration = d["expiration"].GetInt64();
             if (config.expiration <= 0) {
-                throw std::runtime_error("expiration <= 0 when cache type eq TTL");
+                throw std::runtime_error("expiration must be positive when cache type is TTL");
             }
         }

-        config.repeat = d["repeat"].GetInt64();
+        // Check and set repeat
+        if (d.HasMember("repeat") && d["repeat"].IsInt64()) {
+            config.repeat = d["repeat"].GetInt64();
+        }

-        config.write_batch_size = d["write_batch_size"].GetInt64();
-        if (config.write_batch_size == 0) {
-            config.write_batch_size = doris::config::s3_write_buffer_size;
+        // Check and set write_batch_size
+        if (d.HasMember("write_batch_size") && d["write_batch_size"].IsInt64()) {
+            config.write_batch_size = d["write_batch_size"].GetInt64();
+        } else {
+            config.write_batch_size = doris::config::s3_write_buffer_size; // default
         }

+        // Check and set read_offset
         if (config.read_iops > 0) {
-            // such as [0, 100)
-            const rapidjson::Value& read_offset_array = d["read_offset"];
-            if (!read_offset_array.IsArray() || read_offset_array.Size() != 2) {
+            if (d.HasMember("read_offset") && d["read_offset"].IsArray() &&
+                d["read_offset"].Size() == 2) {
+                const rapidjson::Value& read_offset_array = d["read_offset"];
+                config.read_offset_left = read_offset_array[0].GetInt64();
+                config.read_offset_right = read_offset_array[1].GetInt64();
+                if (config.read_offset_left >= config.read_offset_right) {
+                    throw std::runtime_error(
+                            "read_offset_left must be less than read_offset_right");
+                }
+            } else {
                 throw std::runtime_error("Invalid read_offset format, expected array of size 2");
             }
-            config.read_offset_left = read_offset_array[0].GetInt64();
-            config.read_offset_right = read_offset_array[1].GetInt64();
-            if (config.read_offset_left >= config.read_offset_right) {
-                throw std::runtime_error("read_offset_left must be less than read_offset_right");
-            }

-            // such as [100, 500) or [-200, -10)
-            const rapidjson::Value& read_length_array = d["read_length"];
-            if (!read_length_array.IsArray() || read_length_array.Size() != 2) {
+            // Check and set read_length
+            if (d.HasMember("read_length") && d["read_length"].IsArray() &&
+                d["read_length"].Size() == 2) {
+                const rapidjson::Value& read_length_array = d["read_length"];
+                config.read_length_left = read_length_array[0].GetInt64();
+                config.read_length_right = read_length_array[1].GetInt64();
+                if (config.read_length_left >= config.read_length_right) {
+                    throw std::runtime_error(
+                            "read_length_left must be less than read_length_right");
+                }
+            } else {
                 throw std::runtime_error("Invalid read_length format, expected array of size 2");
             }
-            config.read_length_left = read_length_array[0].GetInt64();
-            config.read_length_right = read_length_array[1].GetInt64();
-            if (config.read_length_left >= config.read_length_right) {
-                throw std::runtime_error("read_length_left must be less than read_length_right");
-            }
         }

         // Add more parameter validation
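
For reference, an illustrative job submission body covering the fields parsed above. The values are arbitrary examples, not taken from this commit; judging from the parsing above, file_prefix is required, read_offset and read_length are required whenever read_iops > 0, and the other fields fall back to the defaults shown in from_json:

    {
        "file_prefix": "job1",
        "num_files": 10,
        "size_bytes_perfile": 1048576,
        "write_iops": 100,
        "read_iops": 100,
        "num_threads": 16,
        "write_batch_size": 1048576,
        "repeat": 1,
        "cache_type": "NORMAL",
        "write_file_cache": true,
        "bvar_enable": true,
        "read_offset": [0, 1048576],
        "read_length": [1024, 4096]
    }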
@@ -535,16 +616,20 @@ struct Job {
 private:
     void init_latency_recorders(const std::string& id) {
         if (config.write_iops > 0) {
-            write_latency =
-                    std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_append_" + id);
-            write_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
-                    "file_cache_microbench_append_rate_limit_ns_" + id);
+            if (config.bvar_enable) {
+                write_latency = std::make_shared<bvar::LatencyRecorder>(
+                        "file_cache_microbench_append_" + id);
+                write_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+                        "file_cache_microbench_append_rate_limit_ns_" + id);
+            }
         }
         if (config.read_iops > 0) {
-            read_latency =
-                    std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_read_at_" + id);
-            read_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
-                    "file_cache_microbench_read_rate_limit_ns_" + id);
+            if (config.bvar_enable) {
+                read_latency = std::make_shared<bvar::LatencyRecorder>(
+                        "file_cache_microbench_read_at_" + id);
+                read_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+                        "file_cache_microbench_read_rate_limit_ns_" + id);
+            }
         }
     }

@@ -1266,6 +1351,27 @@ class MicrobenchServiceImpl : public microbench::MicrobenchService {
         cntl->response_attachment().append(help_info);
     }

+    void file_cache_clear(google::protobuf::RpcController* cntl_base,
+                          const microbench::HttpRequest* request,
+                          microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        const std::string* sync_str = cntl->http_request().uri().GetQuery("sync");
+        const std::string* segment_path = cntl->http_request().uri().GetQuery("segment_path");
+
+        LOG(INFO) << "Received file cache clear request, sync=" << (sync_str ? *sync_str : "")
+                  << ", segment_path=" << (segment_path ? *segment_path : "");
+        if (segment_path == nullptr) {
+            FileCacheFactory::instance()->clear_file_caches(doris::to_lower(*sync_str) == "true");
+        } else {
+            doris::io::UInt128Wrapper hash = doris::io::BlockFileCache::hash(*segment_path);
+            doris::io::BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash);
+            cache->remove_if_cached(hash);
+        }
+
+        cntl->response_attachment().append(R"({"status": "OK"})");
+    }
+
 private:
     std::string get_status_string(JobStatus status) {
         switch (status) {
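
A minimal standalone sketch of the clear-by-path flow used by the handler above, assuming the same Doris cache headers; the helper name is hypothetical, and only the calls already shown in the diff are used:

    #include "io/cache/block_file_cache.h"
    #include "io/cache/block_file_cache_factory.h"

    // Hypothetical helper mirroring the handler's non-null segment_path branch.
    void clear_segment_from_cache(const std::string& segment_path) {
        // Hash the segment path the same way cached remote reads key their blocks.
        doris::io::UInt128Wrapper hash = doris::io::BlockFileCache::hash(segment_path);
        // Find the cache instance that owns this hash.
        doris::io::BlockFileCache* cache =
                doris::io::FileCacheFactory::instance()->get_by_path(hash);
        if (cache != nullptr) { // the handler above assumes the lookup always succeeds
            cache->remove_if_cached(hash);
        }
    }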

be/src/io/tools/proto/microbench.proto

+1
@@ -31,4 +31,5 @@ service MicrobenchService {
     rpc list_jobs(HttpRequest) returns (HttpResponse);
     rpc cancel_job(HttpRequest) returns (HttpResponse);
     rpc get_help(HttpRequest) returns (HttpResponse);
+    rpc file_cache_clear(HttpRequest) returns (HttpResponse);
 };
