Skip to content

Commit

Permalink
Merge branch 'apache:master' into fix-json-quote
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored Aug 27, 2024
2 parents 70a643e + 748103a commit db2a8f9
Show file tree
Hide file tree
Showing 224 changed files with 30,944 additions and 28,112 deletions.
8 changes: 8 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,7 @@ DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: {"path": "/path/to/file_cache", "total_size":53687091200, "normal_percent":85, "disposable_percent":10, "index_percent":5}
DEFINE_String(file_cache_path, "");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

Expand Down Expand Up @@ -1305,6 +1306,13 @@ DEFINE_mBool(enable_hdfs_mem_limiter, "true");

DEFINE_mInt16(topn_agg_limit_multiplier, "2");

// Tablet meta size limit after serialization, 1.5GB
DEFINE_mInt64(tablet_meta_serialize_size_limit, "1610612736");
// Protobuf supports a maximum of 2GB, so the size of the tablet meta after serialization must be less than 2GB
// 1717986918 = 2GB * 0.8
DEFINE_Validator(tablet_meta_serialize_size_limit,
[](const int64_t config) -> bool { return config < 1717986918; });

DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");

// clang-format off
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,8 @@ DECLARE_mBool(enable_hdfs_mem_limiter);
// we should do agg limit opt
DECLARE_mInt16(topn_agg_limit_multiplier);

DECLARE_mInt64(tablet_meta_serialize_size_limit);

DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);

#ifdef BE_TEST
Expand Down
33 changes: 23 additions & 10 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -818,9 +818,9 @@ void BlockFileCache::remove_file_blocks_and_clean_time_maps(
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
size_t& removed_size,
std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock) {
std::lock_guard<std::mutex>& cache_lock, bool is_ttl) {
for (const auto& [entry_key, entry_offset, entry_size] : queue) {
if (!is_overflow(removed_size, size, cur_cache_size)) {
if (!is_overflow(removed_size, size, cur_cache_size, is_ttl)) {
break;
}
auto* cell = get_cell(entry_key, entry_offset, cache_lock);
Expand Down Expand Up @@ -860,7 +860,8 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size,
}
std::vector<FileBlockCell*> to_evict;
auto collect_eliminate_fragments = [&](LRUQueue& queue) {
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
false);
};
if (disposable_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
Expand All @@ -887,7 +888,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size, std::lock_guard<std::mutex
size_t cur_cache_size = _cur_cache_size;

std::vector<FileBlockCell*> to_evict;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
true);
remove_file_blocks_and_clean_time_maps(to_evict, cache_lock);

return !is_overflow(removed_size, size, cur_cache_size);
Expand Down Expand Up @@ -1151,10 +1153,19 @@ bool BlockFileCache::try_reserve_from_other_queue_by_hot_interval(
return !is_overflow(removed_size, size, cur_cache_size);
}

bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,
size_t cur_cache_size) const {
return _disk_resource_limit_mode ? removed_size < need_size
: cur_cache_size + need_size - removed_size > _capacity;
// Decide whether evicting `removed_size` bytes is still insufficient to admit
// `need_size` bytes on top of `cur_cache_size`.
// In disk-resource-limit mode only the freed byte count matters; otherwise the
// projected footprint must fit within _capacity. TTL reservations are further
// capped at max_ttl_cache_ratio percent of capacity.
// NOTE(review): `projected` is size_t arithmetic — if removed_size ever exceeds
// cur_cache_size + need_size this underflows to a huge value; confirm callers
// stop accumulating removed_size once is_overflow() turns false.
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
                                 bool is_ttl) const {
    const size_t projected = cur_cache_size + need_size - removed_size;
    const bool overflow = _disk_resource_limit_mode ? (removed_size < need_size)
                                                    : (projected > _capacity);
    if (!is_ttl) {
        return overflow;
    }
    const size_t ttl_threshold = config::max_ttl_cache_ratio * _capacity / 100;
    return overflow || projected > ttl_threshold;
}

bool BlockFileCache::try_reserve_from_other_queue_by_size(
Expand All @@ -1165,7 +1176,8 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
std::vector<FileBlockCell*> to_evict;
for (FileCacheType cache_type : other_cache_types) {
auto& queue = get_queue(cache_type);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
false);
}
remove_file_blocks(to_evict, cache_lock);
return !is_overflow(removed_size, size, cur_cache_size);
Expand Down Expand Up @@ -1207,7 +1219,8 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
size_t cur_cache_size = _cur_cache_size;

std::vector<FileBlockCell*> to_evict;
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock);
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
false);
remove_file_blocks(to_evict, cache_lock);

if (is_overflow(removed_size, size, cur_cache_size)) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,8 @@ class BlockFileCache {
bool try_reserve_from_other_queue_by_size(std::vector<FileCacheType> other_cache_types,
size_t size, std::lock_guard<std::mutex>& cache_lock);

bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size) const;
bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
bool is_ttl = false) const;

void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&);

Expand All @@ -399,7 +400,7 @@ class BlockFileCache {

void find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
size_t& removed_size, std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock);
std::lock_guard<std::mutex>& cache_lock, bool is_ttl);
// info
std::string _cache_base_path;
size_t _capacity = 0;
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -932,8 +932,7 @@ Status CompactionMixin::modify_rowsets() {
output_rowsets.push_back(_output_rowset);

if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write() &&
_tablet->tablet_schema()->cluster_key_idxes().empty()) {
_tablet->enable_unique_key_merge_on_write()) {
Version version = tablet()->max_version();
DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::unique_ptr<RowLocationSet> missed_rows;
Expand Down Expand Up @@ -961,10 +960,15 @@ Status CompactionMixin::modify_rowsets() {
&output_rowset_delete_bitmap);
if (missed_rows) {
missed_rows_size = missed_rows->size();
std::size_t merged_missed_rows_size = _stats.merged_rows;
if (!_tablet->tablet_meta()->tablet_schema()->cluster_key_idxes().empty()) {
merged_missed_rows_size += _stats.filtered_rows;
}
if (_tablet->tablet_state() == TABLET_RUNNING &&
_stats.merged_rows != missed_rows_size) {
merged_missed_rows_size != missed_rows_size) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" << _stats.merged_rows
<< "), filtered rows(" << _stats.filtered_rows
<< ") is not equal to missed rows(" << missed_rows_size
<< ") in rowid conversion, tablet_id: " << _tablet->tablet_id()
<< ", table_id:" << _tablet->table_id();
Expand All @@ -982,10 +986,11 @@ Status CompactionMixin::modify_rowsets() {
ss << ", version[0-" << version.second + 1 << "]";
}
std::string err_msg = fmt::format(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
_stats.merged_rows, missed_rows_size, _tablet->tablet_id(),
_tablet->table_id());
"cumulative compaction: the merged rows({}), filtered rows({})"
" is not equal to missed rows({}) in rowid conversion,"
" tablet_id: {}, table_id:{}",
_stats.merged_rows, _stats.filtered_rows, missed_rows_size,
_tablet->tablet_id(), _tablet->table_id());
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ bool MemTable::need_agg() const {
return false;
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
Expand All @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
return Status::OK();
}

// Public entry point for flushing the memtable into a Block.
// Delegates to _to_block() and routes the call through
// RETURN_IF_ERROR_OR_CATCH_EXCEPTION so that a non-OK Status is propagated
// and — presumably — any exception thrown during conversion is translated
// into an error Status rather than escaping (TODO confirm the macro's exact
// catch semantics at its definition site).
Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
    return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class MemTable {
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);

// Wrapped by to_block() to apply the exception-handling logic.
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,17 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
"failed to push data. version count: {}, exceed limit: {}, tablet: {}",
tablet->version_count(), config::max_tablet_version_num, tablet->tablet_id());
}

int version_count = tablet->version_count() + tablet->stale_version_count();
if (tablet->avg_rs_meta_serialize_size() * version_count >
config::tablet_meta_serialize_size_limit) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. meta serialize size : {}, exceed limit: {}, "
"tablet: {}",
tablet->avg_rs_meta_serialize_size() * version_count,
config::tablet_meta_serialize_size_limit, tablet->tablet_id());
}

auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
Expand Down
16 changes: 10 additions & 6 deletions be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory,
_num_pages = reader.count();
_ordinals.resize(_num_pages + 1);
_pages.resize(_num_pages);

g_ordinal_index_memory_bytes << sizeof(*this) + _ordinals.size() * sizeof(ordinal_t) +
_pages.size() * sizeof(PagePointer) +
sizeof(OrdinalIndexReader);

for (int i = 0; i < _num_pages; i++) {
Slice key = reader.get_key(i);
ordinal_t ordinal = 0;
Expand All @@ -127,9 +132,6 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory,
}
_ordinals[_num_pages] = _num_values;

g_ordinal_index_memory_bytes << sizeof(*this) + _ordinals.size() * sizeof(ordinal_t) +
_pages.size() * sizeof(PagePointer) +
sizeof(OrdinalIndexReader);
return Status::OK();
}

Expand All @@ -155,9 +157,11 @@ OrdinalPageIndexIterator OrdinalIndexReader::seek_at_or_before(ordinal_t ordinal
}

OrdinalIndexReader::~OrdinalIndexReader() {
    // Only decrement the memory gauge if _load() got far enough to increment
    // it: _load() resizes _ordinals (to _num_pages + 1, i.e. non-empty)
    // immediately before bumping g_ordinal_index_memory_bytes, so a non-empty
    // _ordinals implies the increment happened. This keeps the gauge balanced
    // when load failed early or never ran.
    if (_ordinals.size() > 0) {
        g_ordinal_index_memory_bytes << -sizeof(*this) - _ordinals.size() * sizeof(ordinal_t) -
                                                _pages.size() * sizeof(PagePointer) -
                                                sizeof(OrdinalIndexReader);
    }
}

} // namespace segment_v2
Expand Down
12 changes: 7 additions & 5 deletions be/src/olap/rowset/segment_v2/zone_map_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ Status ZoneMapIndexReader::_load(bool use_page_cache, bool kept_in_memory,

_page_zone_maps.resize(reader.num_values());

g_zone_map_memory_bytes << sizeof(*this) + sizeof(ZoneMapPB) * _page_zone_maps.size() +
sizeof(IndexedColumnMetaPB);

// read and cache all page zone maps
for (int i = 0; i < reader.num_values(); ++i) {
size_t num_to_read = 1;
Expand All @@ -176,16 +179,15 @@ Status ZoneMapIndexReader::_load(bool use_page_cache, bool kept_in_memory,
}
}

g_zone_map_memory_bytes << sizeof(*this) + sizeof(ZoneMapPB) * _page_zone_maps.size() +
sizeof(IndexedColumnMetaPB);

return Status::OK();
}

ZoneMapIndexReader::~ZoneMapIndexReader() {
    // Only decrement the memory gauge if _load() incremented it; _load() bumps
    // g_zone_map_memory_bytes right after resizing _page_zone_maps, so an
    // empty vector means the increment never happened (e.g. load failed before
    // that point) and subtracting here would drive the gauge negative.
    // NOTE(review): if reader.num_values() was 0, _load() still added the
    // sizeof(*this) + sizeof(IndexedColumnMetaPB) terms but this guard skips
    // the matching decrement — confirm that case cannot occur or accept the
    // small gauge drift.
    if (_page_zone_maps.size() > 0) {
        g_zone_map_memory_bytes << -sizeof(*this) - sizeof(ZoneMapPB) * _page_zone_maps.size() -
                                           sizeof(IndexedColumnMetaPB);
    }
}
#define APPLY_FOR_PRIMITITYPE(M) \
M(TYPE_TINYINT) \
Expand Down
10 changes: 10 additions & 0 deletions be/src/olap/rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ Status RowsetBuilder::init() {
RETURN_IF_ERROR(check_tablet_version_count());
}

int version_count = tablet()->version_count() + tablet()->stale_version_count();
if (tablet()->avg_rs_meta_serialize_size() * version_count >
config::tablet_meta_serialize_size_limit) {
return Status::Error<TOO_MANY_VERSION>(
"failed to init rowset builder. meta serialize size : {}, exceed limit: {}, "
"tablet: {}",
tablet()->avg_rs_meta_serialize_size() * version_count,
config::tablet_meta_serialize_size_limit, _tablet->tablet_id());
}

RETURN_IF_ERROR(prepare_txn());

DBUG_EXECUTE_IF("BaseRowsetBuilder::init.check_partial_update_column_num", {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& reques
}
}
std::vector<RowsetSharedPtr> empty_vec;
RETURN_IF_ERROR(_new_tablet->modify_rowsets(empty_vec, rowsets_to_delete));
_new_tablet->delete_rowsets(rowsets_to_delete, false);
// inherit cumulative_layer_point from base_tablet
// check if new_tablet.ce_point > base_tablet.ce_point?
_new_tablet->set_cumulative_layer_point(-1);
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class Tablet final : public BaseTablet {

size_t num_rows();
int version_count() const;
int stale_version_count() const;
bool exceed_version_limit(int32_t limit) override;
uint64_t segment_count() const;
Version max_version() const;
Expand All @@ -164,6 +165,7 @@ class Tablet final : public BaseTablet {
double bloom_filter_fpp() const;
size_t next_unique_id() const;
size_t row_size() const;
int64_t avg_rs_meta_serialize_size() const;

// operation in rowsets
Status add_rowset(RowsetSharedPtr rowset);
Expand Down Expand Up @@ -676,6 +678,11 @@ inline int Tablet::version_count() const {
return _tablet_meta->version_count();
}

inline int Tablet::stale_version_count() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->stale_version_count();
}

inline Version Tablet::max_version() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->max_version();
Expand Down Expand Up @@ -730,4 +737,8 @@ inline size_t Tablet::row_size() const {
return _tablet_meta->tablet_schema()->row_size();
}

// Average serialized size in bytes of a single rowset meta, as tracked by the
// tablet meta; callers multiply it by the version count to estimate the total
// serialized tablet-meta size against tablet_meta_serialize_size_limit.
// NOTE(review): unlike version_count()/stale_version_count() this does not
// take _meta_lock — confirm the underlying value is safe to read unlocked.
inline int64_t Tablet::avg_rs_meta_serialize_size() const {
    const auto avg_size = _tablet_meta->avg_rs_meta_serialize_size();
    return avg_size;
}

} // namespace doris
21 changes: 21 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
#include <json2pb/pb_to_json.h>
#include <time.h>

#include <cstdint>
#include <set>
#include <utility>

#include "cloud/config.h"
#include "common/config.h"
#include "gutil/integral_types.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
Expand Down Expand Up @@ -532,6 +534,25 @@ void TabletMeta::serialize(string* meta_binary) {
<< partition_id << " new=" << tablet_meta_pb.DebugString();
});
bool serialize_success = tablet_meta_pb.SerializeToString(meta_binary);
if (!_rs_metas.empty() || !_stale_rs_metas.empty()) {
_avg_rs_meta_serialize_size =
meta_binary->length() / (_rs_metas.size() + _stale_rs_metas.size());
if (meta_binary->length() > config::tablet_meta_serialize_size_limit ||
!serialize_success) {
int64_t origin_meta_size = meta_binary->length();
int64_t stale_rowsets_num = tablet_meta_pb.stale_rs_metas().size();
tablet_meta_pb.clear_stale_rs_metas();
meta_binary->clear();
serialize_success = tablet_meta_pb.SerializeToString(meta_binary);
LOG(WARNING) << "tablet meta serialization size exceeds limit: "
<< config::tablet_meta_serialize_size_limit
<< " clean up stale rowsets, tablet id: " << tablet_id()
<< " stale rowset num: " << stale_rowsets_num
<< " serialization size before clean " << origin_meta_size
<< " serialization size after clean " << meta_binary->length();
}
}

if (!serialize_success) {
LOG(FATAL) << "failed to serialize meta " << tablet_id();
}
Expand Down
Loading

0 comments on commit db2a8f9

Please sign in to comment.