Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,8 @@ DEFINE_mInt64(file_cache_background_lru_dump_update_cnt_threshold, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_tail_record_num, "5000000");
DEFINE_mInt64(file_cache_background_lru_log_replay_interval_ms, "1000");
DEFINE_mBool(enable_evaluate_shadow_queue_diff, "false");

// Whether the normal (query) file-cache queue is split into a hot part and a cold part.
// NOTE(review): defined with DEFINE_Bool, unlike the DEFINE_mBool/DEFINE_mInt64 options
// just above — presumably this means it cannot be changed at runtime; confirm.
DEFINE_Bool(enable_normal_queue_cold_hot_separation, "true");
// Percentage of the normal queue budget (size and element count) handed to the cold part.
// parse_conf_cache_paths rejects values <= 0 or >= 100 while the separation is enabled.
DEFINE_Int32(normal_queue_cold_percent, "20");
DEFINE_Int32(file_cache_downloader_thread_num_min, "32");
DEFINE_Int32(file_cache_downloader_thread_num_max, "32");

Expand Down
3 changes: 2 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,8 @@ DECLARE_mInt64(file_cache_background_lru_dump_update_cnt_threshold);
DECLARE_mInt64(file_cache_background_lru_dump_tail_record_num);
DECLARE_mInt64(file_cache_background_lru_log_replay_interval_ms);
DECLARE_mBool(enable_evaluate_shadow_queue_diff);

// Whether the normal (query) file-cache queue is split into a hot part and a cold part.
DECLARE_Bool(enable_normal_queue_cold_hot_separation);
// Percentage of the normal queue budget assigned to the cold part; only used when
// enable_normal_queue_cold_hot_separation is set.
DECLARE_Int32(normal_queue_cold_percent);
// inverted index searcher cache
// cache entry stay time after lookup
DECLARE_mInt32(index_cache_entry_stay_time_after_lookup_s);
Expand Down
219 changes: 199 additions & 20 deletions be/src/io/cache/block_file_cache.cpp

Large diffs are not rendered by default.

18 changes: 13 additions & 5 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,16 @@ class BlockFileCache {
size_t offset, size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);

virtual FileBlockCell* add_cell_directly(const UInt128Wrapper& hash,
const CacheContext& context, size_t offset,
size_t size, FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);

Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);

void update_block_lru(FileBlockSPtr block, std::lock_guard<std::mutex>& cache_lock);

void use_cell(const FileBlockCell& cell, FileBlocks* result, bool not_need_move,
void use_cell(FileBlockCell& cell, FileBlocks* result, bool not_need_move,
std::lock_guard<std::mutex>& cache_lock);

bool try_reserve_for_lru(const UInt128Wrapper& hash, QueryFileCacheContextPtr query_context,
Expand Down Expand Up @@ -500,6 +505,7 @@ class BlockFileCache {
LRUQueue _normal_queue;
LRUQueue _disposable_queue;
LRUQueue _ttl_queue;
LRUQueue _cold_normal_queue;

// keys for async remove
RecycleFileCacheKeys _recycle_keys;
Expand All @@ -515,17 +521,19 @@ class BlockFileCache {
std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_cold_normal_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_cold_normal_queue_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_cache_size_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_disposable_queue_element_count_metrics;
std::shared_ptr<bvar::Status<size_t>> _cur_disposable_queue_cache_size_metrics;
std::array<std::shared_ptr<bvar::Adder<size_t>>, 4> _queue_evict_size_metrics;
std::array<std::shared_ptr<bvar::Adder<size_t>>, 5> _queue_evict_size_metrics;
std::shared_ptr<bvar::Adder<size_t>> _total_evict_size_metrics;
std::shared_ptr<bvar::Adder<size_t>> _gc_evict_bytes_metrics;
std::shared_ptr<bvar::Adder<size_t>> _gc_evict_count_metrics;
std::shared_ptr<bvar::Adder<size_t>> _evict_by_time_metrics_matrix[4][4];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_size_metrics_matrix[4][4];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_self_lru_metrics_matrix[4];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_time_metrics_matrix[5][5];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_size_metrics_matrix[5][5];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_self_lru_metrics_matrix[5];
std::shared_ptr<bvar::Adder<size_t>> _evict_by_try_release;

std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_hit_blocks_5m;
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/cache/cache_lru_dumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,16 @@ void CacheLRUDumper::restore_queue(LRUQueue& queue, const std::string& queue_nam
ctx.cache_type = FileCacheType::NORMAL;
} else if (queue_name == "disposable") {
ctx.cache_type = FileCacheType::DISPOSABLE;
} else if (queue_name == "cold_normal") {
ctx.cache_type = FileCacheType::COLD_NORMAL;
} else {
LOG_WARNING("unknown queue type for lru restore, skip");
DCHECK(false);
return;
}
// TODO(zhengyu): we don't use stats yet, see if this will cause any problem
_mgr->add_cell(hash, ctx, offset, size, FileBlock::State::DOWNLOADED, cache_lock);
_mgr->add_cell_directly(hash, ctx, offset, size, FileBlock::State::DOWNLOADED,
cache_lock);
}
in.close();
} else {
Expand All @@ -497,7 +500,7 @@ void CacheLRUDumper::restore_queue(LRUQueue& queue, const std::string& queue_nam
};

void CacheLRUDumper::remove_lru_dump_files() {
std::vector<std::string> queue_names = {"disposable", "index", "normal", "ttl"};
std::vector<std::string> queue_names = {"disposable", "index", "normal", "ttl", "cold_normal"};
for (const auto& queue_name : queue_names) {
std::string filename =
fmt::format("{}/lru_dump_{}.tail", _mgr->_cache_base_path, queue_name);
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/file_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ uint64_t FileBlock::get_caller_id() {
return id;
}

// Overwrites the cache type recorded in this block's key metadata.
// The assignment is performed while holding the block's own mutex.
// NOTE(review): the inline getter cache_type() in file_block.h reads the same
// field without taking _mutex — confirm callers are otherwise synchronized.
void FileBlock::set_cache_type(FileCacheType new_type) {
    std::lock_guard guard(_mutex);
    _key.meta.type = new_type;
}

uint64_t FileBlock::get_or_set_downloader() {
std::lock_guard block_lock(_mutex);

Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/file_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class FileBlock {

FileCacheType cache_type() const { return _key.meta.type; }

void set_cache_type(FileCacheType new_type);

static uint64_t get_caller_id();

std::string get_info_for_log() const;
Expand Down
23 changes: 22 additions & 1 deletion be/src/io/cache/file_cache_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ std::string cache_type_to_surfix(FileCacheType type) {
return "";
case FileCacheType::TTL:
return "_ttl";
case FileCacheType::COLD_NORMAL:
return "";
}
return "";
}
Expand All @@ -61,6 +63,8 @@ FileCacheType string_to_cache_type(const std::string& str) {
return FileCacheType::DISPOSABLE;
} else if (str == "ttl") {
return FileCacheType::TTL;
} else if (str == "cold_normal") {
return FileCacheType::COLD_NORMAL;
}
DCHECK(false) << "The string is " << str;
return FileCacheType::NORMAL;
Expand All @@ -75,6 +79,8 @@ std::string cache_type_to_string(FileCacheType type) {
return "normal";
case FileCacheType::TTL:
return "ttl";
case FileCacheType::COLD_NORMAL:
return "cold_normal";
}
DCHECK(false) << "unknown type: " << type;
return "normal";
Expand All @@ -90,7 +96,9 @@ std::string FileCacheSettings::to_string() const {
<< ", index_queue_elements: " << index_queue_elements
<< ", ttl_queue_size: " << ttl_queue_size << ", ttl_queue_elements: " << ttl_queue_elements
<< ", query_queue_size: " << query_queue_size
<< ", query_queue_elements: " << query_queue_elements << ", storage: " << storage;
<< ", query_queue_elements: " << query_queue_elements
<< ", cold_query_queue_size: " << cold_query_queue_size
<< ", cold_query_queue_elements: " << cold_query_queue_elements << ", storage: " << storage;
return ss.str();
}

Expand Down Expand Up @@ -125,6 +133,19 @@ FileCacheSettings get_file_cache_settings(size_t capacity, size_t max_query_cach
settings.query_queue_elements =
std::max(settings.query_queue_size / settings.max_file_block_size,
REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);

// Carve the cold queue out of the normal (query) queue budget computed above.
if (config::enable_normal_queue_cold_hot_separation) {
// One percent of the normal queue. Integer division rounds down, so the cold share
// can be slightly below the exact percentage and is 0 when the budget is < 100.
size_t normal_queue_per_size = settings.query_queue_size / 100;
size_t normal_queue_per_elements = settings.query_queue_elements / 100;

settings.cold_query_queue_size = normal_queue_per_size * config::normal_queue_cold_percent;
settings.cold_query_queue_elements =
normal_queue_per_elements * config::normal_queue_cold_percent;

// The cold share is taken from the normal queue rather than added on top of it,
// so the two queues together keep the original normal-queue budget.
settings.query_queue_size -= settings.cold_query_queue_size;
settings.query_queue_elements -= settings.cold_query_queue_elements;
}

settings.storage = storage;
return settings;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/cache/file_cache_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ enum FileCacheType {
NORMAL = 1,
DISPOSABLE = 0,
TTL = 3,
COLD_NORMAL = 4,
};
std::string cache_type_to_surfix(FileCacheType type);
FileCacheType surfix_to_cache_type(const std::string& str);
Expand Down Expand Up @@ -121,6 +122,8 @@ struct FileCacheSettings {
size_t index_queue_elements {0};
size_t query_queue_size {0};
size_t query_queue_elements {0};
size_t cold_query_queue_size {0};
size_t cold_query_queue_elements {0};
size_t ttl_queue_size {0};
size_t ttl_queue_elements {0};
size_t max_file_block_size {0};
Expand Down
8 changes: 4 additions & 4 deletions be/src/io/cache/fs_file_cache_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,8 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
}
// if the file is tmp, it means it is the old file and it should be removed
if (!args.is_tmp) {
_mgr->add_cell(args.hash, args.ctx, args.offset, args.size,
FileBlock::State::DOWNLOADED, cache_lock);
_mgr->add_cell_directly(args.hash, args.ctx, args.offset, args.size,
FileBlock::State::DOWNLOADED, cache_lock);
return;
}
std::error_code ec;
Expand Down Expand Up @@ -887,8 +887,8 @@ void FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
}
} else {
context_original.cache_type = cache_type;
mgr->add_cell(key.hash, context_original, offset, size,
FileBlock::State::DOWNLOADED, cache_lock);
mgr->add_cell_directly(key.hash, context_original, offset, size,
FileBlock::State::DOWNLOADED, cache_lock);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/lru_queue_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ LRUQueue& LRUQueueRecorder::get_shadow_queue(FileCacheType type) {
return _shadow_normal_queue;
case FileCacheType::TTL:
return _shadow_ttl_queue;
case FileCacheType::COLD_NORMAL:
return _shadow_cold_normal_queue;
default:
LOG(WARNING) << "invalid shadow queue type";
DCHECK(false);
Expand All @@ -113,6 +115,8 @@ CacheLRULogQueue& LRUQueueRecorder::get_lru_log_queue(FileCacheType type) {
return _normal_lru_log_queue;
case FileCacheType::TTL:
return _ttl_lru_log_queue;
case FileCacheType::COLD_NORMAL:
return _cold_normal_lru_log_queue;
default:
LOG(WARNING) << "invalid lru log queue type";
DCHECK(false);
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/cache/lru_queue_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class LRUQueueRecorder {
_lru_queue_update_cnt_from_last_dump[FileCacheType::NORMAL] = 0;
_lru_queue_update_cnt_from_last_dump[FileCacheType::INDEX] = 0;
_lru_queue_update_cnt_from_last_dump[FileCacheType::TTL] = 0;
_lru_queue_update_cnt_from_last_dump[FileCacheType::COLD_NORMAL] = 0;
}
void record_queue_event(FileCacheType type, CacheLRULogType log_type, const UInt128Wrapper hash,
const size_t offset, const size_t size);
Expand All @@ -73,11 +74,13 @@ class LRUQueueRecorder {
LRUQueue _shadow_normal_queue;
LRUQueue _shadow_disposable_queue;
LRUQueue _shadow_ttl_queue;
LRUQueue _shadow_cold_normal_queue;

CacheLRULogQueue _ttl_lru_log_queue;
CacheLRULogQueue _index_lru_log_queue;
CacheLRULogQueue _normal_lru_log_queue;
CacheLRULogQueue _disposable_lru_log_queue;
CacheLRULogQueue _cold_normal_lru_log_queue;

std::unordered_map<FileCacheType, size_t> _lru_queue_update_cnt_from_last_dump;

Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,14 @@ Status parse_conf_cache_paths(const std::string& config_path, std::vector<CacheP
if (total_size < 0 || (config::enable_file_cache_query_limit && query_limit_bytes < 0)) {
return Status::InvalidArgument("total_size or query_limit should not less than zero");
}

// When cold/hot separation of the normal queue is enabled, the cold share must be
// strictly between 0 and 100 percent; anything else is rejected as a config error.
if (config::enable_normal_queue_cold_hot_separation) {
    if (config::normal_queue_cold_percent <= 0) {
        return Status::InvalidArgument(
                "The config normal_queue_cold_percent must be greater than 0.");
    }
    if (config::normal_queue_cold_percent >= 100) {
        // The message previously said "less than 0", contradicting the check above.
        return Status::InvalidArgument(
                "The config normal_queue_cold_percent must be less than 100.");
    }
}
// percent
auto get_percent_value = [&](const std::string& key, size_t& percent) {
auto& value = map.FindMember(key.c_str())->value;
Expand Down
102 changes: 102 additions & 0 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ TEST_F(BlockFileCacheTest, init) {
]
)");
config::enable_file_cache_query_limit = true;
config::enable_normal_queue_cold_hot_separation = false;
std::vector<CachePath> cache_paths;
EXPECT_TRUE(parse_conf_cache_paths(string, cache_paths));
EXPECT_EQ(cache_paths.size(), 2);
Expand Down Expand Up @@ -8222,4 +8223,105 @@ TEST_F(BlockFileCacheTest, cached_remote_file_reader_direct_read_bytes_check) {
FileCacheFactory::instance()->_capacity = 0;
}

// Exercises the cold/hot split of the normal queue: fills the cache with NORMAL blocks,
// re-reads them in two batches, and checks how the bytes are accounted between the
// "normal" and "cold_normal" queues after each batch.
// NOTE(review): the movement between queues is implemented in block_file_cache.cpp
// (not visible here); presumably a repeat access promotes a block from cold to normal.
TEST_F(BlockFileCacheTest, test_normal_queue_cold_hot_separation) {
// NOTE(review): enable_evict_file_cache_in_advance is changed here and not restored
// at the end of the test.
config::enable_evict_file_cache_in_advance = false;
config::enable_normal_queue_cold_hot_separation = true;
// Start from an empty cache directory.
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);
TUniqueId query_id;
query_id.hi = 1;
query_id.lo = 1;
io::FileCacheSettings settings;

// Only the normal and cold-normal queues get a budget (30 bytes / 10 elements each);
// the ttl, index and disposable queues are set to zero.
settings.ttl_queue_size = 0;
settings.ttl_queue_elements = 0;
settings.query_queue_size = 30;
settings.query_queue_elements = 10;
settings.cold_query_queue_size = 30;
settings.cold_query_queue_elements = 10;
settings.index_queue_size = 0;
settings.index_queue_elements = 0;
settings.disposable_queue_size = 0;
settings.disposable_queue_elements = 0;
settings.capacity = 60;
settings.max_file_block_size = 10;
settings.max_query_cache_size = 0;

io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
// Poll up to 100 times, 10 ms apart, until the cache reports its async open succeeded.
int i = 0;
for (; i < 100; i++) {
if (cache.get_async_open_success()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
ASSERT_TRUE(cache.get_async_open_success());

io::CacheContext context1;
ReadStatistics rstats;
context1.stats = &rstats;
context1.cache_type = io::FileCacheType::NORMAL;
context1.query_id = query_id;
auto key1 = io::BlockFileCache::hash("key1");

int64_t offset = 0;

// Phase 1: create and download 12 blocks of 5 bytes each (offsets 0..55), i.e. 60 bytes,
// which equals settings.capacity.
for (; offset < 60; offset += 5) {
auto holder = cache.get_or_set(key1, offset, 5, context1);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);

assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::EMPTY);
ASSERT_TRUE(blocks[0]->get_or_set_downloader() == io::FileBlock::get_caller_id());
download(blocks[0]);
assert_range(2, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::DOWNLOADED);

blocks.clear();
}

offset = 0;

// Phase 2: access the first 6 blocks (offsets 0..25, 30 bytes) a second time; they must
// already be in DOWNLOADED state.
for (; offset < 30; offset += 5) {
auto holder = cache.get_or_set(key1, offset, 5, context1);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);

assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::DOWNLOADED);
blocks.clear();
}

// After phase 2 the 60 cached bytes are expected to be split evenly:
// 30 bytes in the normal queue and 30 bytes in the cold-normal queue.
ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 30);
ASSERT_EQ(cache.get_stats_unsafe()["cold_normal_queue_curr_size"], 30);
ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);

// Phase 3: access the remaining 6 blocks (offsets 30..55) a second time.
for (offset = 30; offset < 60; offset += 5) {
auto holder = cache.get_or_set(key1, offset, 5, context1);
auto blocks = fromHolder(holder);
ASSERT_EQ(blocks.size(), 1);
assert_range(1, blocks[0], io::FileBlock::Range(offset, offset + 4),
io::FileBlock::State::DOWNLOADED);
blocks.clear();
}

// After phase 3 all 60 bytes are expected in the normal queue and none in the cold one.
// Note that 60 exceeds the 30-byte query_queue_size configured above.
ASSERT_EQ(cache.get_stats_unsafe()["ttl_queue_curr_size"], 0);
ASSERT_EQ(cache.get_stats_unsafe()["index_queue_curr_size"], 0);
ASSERT_EQ(cache.get_stats_unsafe()["normal_queue_curr_size"], 60);
ASSERT_EQ(cache.get_stats_unsafe()["cold_normal_queue_curr_size"], 0);
ASSERT_EQ(cache.get_stats_unsafe()["disposable_queue_curr_size"], 0);

if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}

// NOTE(review): this reset is skipped when any ASSERT_* above fails, because a fatal
// assertion returns from the test body; the flag then stays true for later tests.
config::enable_normal_queue_cold_hot_separation = false;
}

} // namespace doris::io
Loading