Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](PrefetchReader) Make the prefetch timeout one config #27371

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,8 @@ DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
// Download binlog rate limit, unit is KB/s, 0 means no limit
DEFINE_Int32(download_binlog_rate_limit_kbs, "0");

DEFINE_mInt32(buffered_reader_read_timeout_ms, "20000");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1188,6 +1188,8 @@ DECLARE_Int32(ingest_binlog_work_pool_size);
// Download binlog rate limit, unit is KB/s
DECLARE_Int32(download_binlog_rate_limit_kbs);

DECLARE_mInt32(buffered_reader_read_timeout_ms);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
37 changes: 19 additions & 18 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,20 +386,15 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
return Status::OK();
}

// the condition variable would wait at most 10 seconds
// otherwise it would quit the procedure and treat it
// as one time out error status and would make the load
// task failed
constexpr static int WAIT_TIME_OUT_MS = 10000;

// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
void PrefetchBuffer::reset_offset(size_t offset) {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
return;
}
Expand All @@ -426,10 +421,12 @@ void PrefetchBuffer::reset_offset(size_t offset) {
void PrefetchBuffer::prefetch_buffer() {
{
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::RESET ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
}
Expand Down Expand Up @@ -469,7 +466,8 @@ void PrefetchBuffer::prefetch_buffer() {
_statis.prefetch_request_io += 1;
_statis.prefetch_request_bytes += _len;
std::unique_lock lck {_lock};
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status == BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
return;
Expand Down Expand Up @@ -550,10 +548,12 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
{
std::unique_lock lck {_lock};
// buffer must be prefetched or it's closed
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS), [this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
if (!_prefetched.wait_for(
lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() {
return _buffer_status == BufferStatus::PREFETCHED ||
_buffer_status == BufferStatus::CLOSED;
})) {
_prefetch_status = Status::TimedOut("time out when read prefetch buffer");
return _prefetch_status;
}
Expand Down Expand Up @@ -589,7 +589,8 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
void PrefetchBuffer::close() {
std::unique_lock lck {_lock};
// in case _reader still tries to write to the buf after we close the buffer
if (!_prefetched.wait_for(lck, std::chrono::milliseconds(WAIT_TIME_OUT_MS),
if (!_prefetched.wait_for(lck,
std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
[this]() { return _buffer_status != BufferStatus::PENDING; })) {
_prefetch_status = Status::TimedOut("time out when close prefetch buffer");
return;
Expand Down