Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
return Status::OK();
}

// merge small IO
// merge small IO with adaptive window sizing
size_t merge_start = offset + has_read;
const size_t merge_end = merge_start + _merged_read_slice_size;
size_t merge_end = merge_start + _merged_read_slice_size;
// <slice_size, is_content>
std::vector<std::pair<size_t, bool>> merged_slice;
size_t content_size = 0;
Expand All @@ -115,6 +115,16 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
return Status::IOError("Fail to merge small IO");
}
int merge_index = range_index;

// Adaptive parameters:
// - max_single_gap: Maximum gap size to include (512KB by default)
// - adaptive_shrink_threshold: Stop merging if prospective gap ratio > threshold
// - min_content_for_adaptive: Only enable adaptive check after accumulating enough content
constexpr size_t max_single_gap = 512 * 1024; // 512KB
constexpr double adaptive_shrink_threshold = 0.4; // If gap/content would > 40%, stop merging
constexpr size_t min_content_for_adaptive =
SMALL_IO / 4; // 512KB, enable check after this much content

while (merge_start < merge_end && merge_index < _random_access_ranges.size()) {
size_t content_max = _remaining - content_size;
if (content_max == 0) {
Expand Down Expand Up @@ -142,10 +152,30 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b
if (merge_index < _random_access_ranges.size() - 1 && merge_start < merge_end) {
size_t gap = _random_access_ranges[merge_index + 1].start_offset -
_random_access_ranges[merge_index].end_offset;

// Adaptive stopping conditions:
// 1. Single gap is too large (> max_single_gap)
if (gap >= max_single_gap) {
break;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gap >= max_single_gap meas gap >= SMALL_IO is always true

// 2. Original condition: accumulated data > SMALL_IO and gap >= SMALL_IO
if ((content_size + hollow_size) > SMALL_IO && gap >= SMALL_IO) {
// too large gap
break;
}

// 3. Dynamic check: if upcoming gap would push ratio too high, stop here
// Check BEFORE including the gap, not after
// Only enable this check after accumulating enough content to avoid over-conservative behavior
// with very small initial ranges
if (content_size >= min_content_for_adaptive && gap > 0) {
double prospective_gap_ratio = (double)(hollow_size + gap) / (double)content_size;
if (prospective_gap_ratio > adaptive_shrink_threshold) {
// Including this gap would make ratio too high, stop merging
break;
}
}

if (gap < merge_end - merge_start && content_size < _remaining &&
!_range_cached_data[merge_index + 1].has_read) {
hollow_size += gap;
Expand Down
Loading
Loading