From c9dd2ff9e4a1ef9081c4e97f85bd2dcdbbef1ce9 Mon Sep 17 00:00:00 2001 From: Kuankuan Guo Date: Thu, 7 Apr 2022 22:54:07 +0800 Subject: [PATCH 1/5] LinkCompaction: Add LevelCompactionBuilder::PickLinkCompaction --- db/compaction.h | 1 + db/compaction_iterator.cc | 2 +- db/compaction_job.cc | 15 ++++++++++++++- db/compaction_job.h | 1 + db/compaction_picker.cc | 23 +++++++++++++++++++++-- db/db_impl.h | 1 + db/dbformat.h | 9 +++++++++ include/rocksdb/advanced_options.h | 5 +++++ include/rocksdb/compaction_filter.h | 4 ++++ options/cf_options.h | 2 ++ util/iterator_cache.h | 1 + 11 files changed, 60 insertions(+), 4 deletions(-) diff --git a/db/compaction.h b/db/compaction.h index a0790c0b26..955ab7a467 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -77,6 +77,7 @@ enum CompactionType { kKeyValueCompaction = 0, kMapCompaction = 1, kGarbageCollection = 2, + kLinkCompaction = 3, }; struct CompactionParams { diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 3dd0ef25f5..b9c24449e8 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -170,7 +170,7 @@ CompactionIterator::CompactionIterator( level_ptrs_ = std::vector(compaction_->number_levels(), 0); } - if (snapshots_->size() == 0) { + if (snapshots_->empty()) { // optimize for fast path if there are no snapshots visible_at_tip_ = true; earliest_snapshot_ = kMaxSequenceNumber; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index aec992c326..1a7b38bf79 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -304,7 +304,9 @@ struct CompactionJob::SubcompactionState { } struct RebuildBlobsInfo { + // File numbers chash_set blobs; + // pop_count = planned file count - actual used file count. size_t pop_count; }; struct BlobRefInfo { @@ -1493,6 +1495,9 @@ void CompactionJob::ProcessCompaction(SubcompactionState* sub_compact) { case kGarbageCollection: ProcessGarbageCollection(sub_compact); break; + case kLinkCompaction: + ProcessLinkCompaction(sub_compact); + break; default: assert(false); break; @@ -1765,6 +1770,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (!sub_compact->compaction->partial_compaction()) { dict_sample_data.reserve(kSampleBytes); } + + // Represents how many records in target blob SST that are needed by the key + // SST std::unordered_map dependence; size_t yield_count = 0; @@ -2036,7 +2044,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->c_iter.reset(); input.reset(); sub_compact->status = status; -} // namespace TERARKDB_NAMESPACE +} + +// TODO(guokuankuan@bytedance.com) +void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { + return; +} void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); diff --git a/db/compaction_job.h b/db/compaction_job.h index e7b78eb4d2..dfe7dddaab 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -123,6 +123,7 @@ class CompactionJob { // kv-pairs void ProcessCompaction(SubcompactionState* sub_compact); void ProcessKeyValueCompaction(SubcompactionState* sub_compact); + void ProcessLinkCompaction(SubcompactionState* sub_compact); void ProcessGarbageCollection(SubcompactionState* sub_compact); Status FinishCompactionOutputFile( diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 00b0887ab7..43773c7a24 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -2200,6 +2200,13 @@ class LevelCompactionBuilder { // Pick and return a compaction. Compaction* PickCompaction(); + // Gather target files (including blob SST(KV separated) and ordinary SST) and + // composite a compaction for future use. + // A Link Compaction contains two major objectives: + // (1) Link target SST files into a logical linked sst + // (2) GC old linked SST files + Compaction* PickLinkCompaction(); + // Pick lazy compaction Compaction* PickLazyCompaction(const std::vector& snapshots); @@ -2298,7 +2305,10 @@ void LevelCompactionBuilder::SetupInitialFiles() { // In these cases, to reduce L0 file count and thus reduce likelihood // of write stalls, we can attempt compacting a span of files within // L0. - if (PickIntraL0Compaction()) { + // + // If link compaction is enabled, we should skip L0 internal compaction + // since the link should be finished quite fast. + if (!ioptions_.enable_link_compaction && PickIntraL0Compaction()) { output_level_ = 0; compaction_reason_ = CompactionReason::kLevelL0FilesNum; break; @@ -2431,6 +2441,13 @@ Compaction* LevelCompactionBuilder::PickCompaction() { return c; } +// TODO (guokuankuan@bytedance.com) +// We could reuse the ordinary compaction picker at the moment, but sooner we should pick link compactions smarter. +Compaction* LevelCompactionBuilder::PickLinkCompaction() { + compaction_type_ = CompactionType::kLinkCompaction; + return PickCompaction(); +} + Compaction* LevelCompactionBuilder::PickLazyCompaction( const std::vector& snapshots) { using SortedRun = CompactionPicker::SortedRun; @@ -2971,7 +2988,7 @@ bool LevelCompactionBuilder::PickFileToCompact() { // store where to start the iteration in the next call to PickCompaction vstorage_->SetNextCompactionIndex(start_level_, cmp_idx); - return start_level_inputs_.size() > 0; + return !start_level_inputs_.empty(); } bool LevelCompactionBuilder::PickIntraL0Compaction() { @@ -2999,6 +3016,8 @@ Compaction* LevelCompactionPicker::PickCompaction( mutable_cf_options, ioptions_); if (ioptions_.enable_lazy_compaction) { return builder.PickLazyCompaction(snapshots); + } else if (ioptions_.enable_link_compaction) { + return builder.PickLinkCompaction(); } else { return builder.PickCompaction(); } diff --git a/db/db_impl.h b/db/db_impl.h index 66c51a6c3f..a480390be2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1258,6 +1258,7 @@ class DBImpl : public DB { void BackgroundCallGarbageCollection(); void BackgroundCallFlush(); void BackgroundCallPurge(); + Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer, PrepickedCompaction* prepicked_compaction); diff --git a/db/dbformat.h b/db/dbformat.h index 7861edeec6..31686e096e 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -64,6 +64,9 @@ enum ValueType : unsigned char { // generated by WriteUnprepared write policy is not mistakenly read by // another. kTypeBeginUnprepareXID = 0x13, // WAL only. + // Similar to kTypeValueIndex, this means current value belongs to + // a LinkSST and pointed to the actual value file. + kTypeLinkIndex = 0x14, // LinkSST only kMaxValue = 0x7F // Not used for storing records. }; @@ -784,6 +787,12 @@ struct ParsedInternalKeyComparator { const InternalKeyComparator* cmp; }; +// This interface is used for transferring data format between KV separated and +// combined kv pairs. +// We should add an implementation instance while processing key value pairs +// during compactions. +// Note that, we also port `Version` to this interface since we may need to fetch +// value from any version that has separated values. class SeparateHelper { public: virtual ~SeparateHelper() = default; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 6df82cd088..e5dbfc88c9 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -216,6 +216,11 @@ struct AdvancedColumnFamilyOptions { // LazyCompaction, default false bool enable_lazy_compaction = false; + // Link Compaciton is a replacement policy to general compactions. + // It will not physically compacts target SST files immediately but instead link them logically. + // The actually physical compaction will be triggered in the background. + bool enable_link_compaction = false; + // Read TableProperties from file if false bool pin_table_properties_in_reader = true; diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index afe5e012f6..a985b4ddab 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -200,6 +200,10 @@ class CompactionFilter // Determines whether value changed by compaction filter were stable. // Default as false, which means stability of outcome is not promised. + // "Stable" means the changed value will not change after the same + // operation is applied multiple times. + // Creators of the compaction filter should override this function, or + // the behavior of the stability checking is undefined. virtual bool IsStableChangeValue() const { return false; } // Returns a name that identifies this compaction filter. diff --git a/options/cf_options.h b/options/cf_options.h index 8aa25aae6f..8c4ed3cb69 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -53,6 +53,8 @@ struct ImmutableCFOptions { bool enable_lazy_compaction; + bool enable_link_compaction; + bool pin_table_properties_in_reader; bool inplace_update_support; diff --git a/util/iterator_cache.h b/util/iterator_cache.h index d545155df0..011f2dde99 100644 --- a/util/iterator_cache.h +++ b/util/iterator_cache.h @@ -18,6 +18,7 @@ struct FileMetaData; class RangeDelAggregator; class TableReader; +// FileMetaData> typedef chash_map DependenceMap; class IteratorCache { From d9e6677ead47cca57fb90673c7778f6ee44023b2 Mon Sep 17 00:00:00 2001 From: Roy Guo Date: Mon, 18 Apr 2022 10:57:47 +0800 Subject: [PATCH 2/5] LinkCompaction: Add LinkSstIterator and implement all its related interfaces --- db/compaction_job.cc | 31 +-- db/compaction_job.h | 32 +++ db/table_cache.cc | 52 ++-- db/version_edit.h | 3 + include/rocksdb/options.h | 1 + table/two_level_iterator.cc | 497 +++++++++++++++++++++++++++++++++++- table/two_level_iterator.h | 10 +- util/heap.h | 2 +- util/iterator_cache.h | 4 + 9 files changed, 578 insertions(+), 54 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 1a7b38bf79..9a95112df6 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1585,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { snapshot_checker_, compact_->compaction->level(), db_options_.statistics.get(), shutting_down_); - struct BuilderSeparateHelper : public SeparateHelper { - SeparateHelper* separate_helper = nullptr; - std::unique_ptr value_meta_extractor; - Status (*trans_to_separate_callback)(void* args, const Slice& key, - LazyBuffer& value) = nullptr; - void* trans_to_separate_callback_args = nullptr; - - Status TransToSeparate(const Slice& internal_key, LazyBuffer& value, - const Slice& meta, bool is_merge, - bool is_index) override { - return SeparateHelper::TransToSeparate( - internal_key, value, value.file_number(), meta, is_merge, is_index, - value_meta_extractor.get()); - } - - Status TransToSeparate(const Slice& key, LazyBuffer& value) override { - if (trans_to_separate_callback == nullptr) { - return Status::NotSupported(); - } - return trans_to_separate_callback(trans_to_separate_callback_args, key, - value); - } + BuilderSeparateHelper separate_helper; - LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence, - const LazyBuffer& value) const override { - return separate_helper->TransToCombined(user_key, sequence, value); - } - } separate_helper; if (compact_->compaction->immutable_cf_options() ->value_meta_extractor_factory != nullptr) { ValueExtractorContext context = {cfd->GetID()}; @@ -2048,7 +2022,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // TODO(guokuankuan@bytedance.com) void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { - return; + assert(sub_compact != nullptr); + return ProcessKeyValueCompaction(sub_compact); } void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) { diff --git a/db/compaction_job.h b/db/compaction_job.h index dfe7dddaab..2a640d7876 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -64,6 +64,38 @@ class VersionEdit; class VersionSet; class CompactionJob { + public: + + class BuilderSeparateHelper : public SeparateHelper { + public: + SeparateHelper* separate_helper = nullptr; + std::unique_ptr value_meta_extractor; + Status (*trans_to_separate_callback)(void* args, const Slice& key, + LazyBuffer& value) = nullptr; + void* trans_to_separate_callback_args = nullptr; + + Status TransToSeparate(const Slice& internal_key, LazyBuffer& value, + const Slice& meta, bool is_merge, + bool is_index) override { + return SeparateHelper::TransToSeparate( + internal_key, value, value.file_number(), meta, is_merge, is_index, + value_meta_extractor.get()); + } + + Status TransToSeparate(const Slice& key, LazyBuffer& value) override { + if (trans_to_separate_callback == nullptr) { + return Status::NotSupported(); + } + return trans_to_separate_callback(trans_to_separate_callback_args, key, + value); + } + + LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence, + const LazyBuffer& value) const override { + return separate_helper->TransToCombined(user_key, sequence, value); + } + }; + public: CompactionJob(int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, diff --git a/db/table_cache.cc b/db/table_cache.cc index 75eee2d558..22de5b77f7 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -85,7 +85,7 @@ static bool InheritanceMismatch(const FileMetaData& sst_meta, return true; } -// Store params for create depend table iterator in future +// Store params for create dependency table iterator in future class LazyCreateIterator : public Snapshot { TableCache* table_cache_; ReadOptions options_; // deep copy @@ -292,7 +292,7 @@ InternalIterator* TableCache::NewIterator( } size_t readahead = 0; bool record_stats = !for_compaction; - if (file_meta.prop.is_map_sst()) { + if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) { record_stats = false; } else { // MapSST don't handle these @@ -340,15 +340,9 @@ InternalIterator* TableCache::NewIterator( } InternalIterator* result = nullptr; if (s.ok()) { - if (!file_meta.prop.is_map_sst()) { - if (options.table_filter && - !options.table_filter(*table_reader->GetTableProperties())) { - result = NewEmptyInternalIterator(arena); - } else { - result = table_reader->NewIterator(options, prefix_extractor, arena, - skip_filters, for_compaction); - } - } else { + // For map & linked SST, we should expand their underlying key value pairs, + // not simply iterate the input SST key values. + if(file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) { ReadOptions map_options = options; map_options.total_order_seek = true; map_options.readahead_size = 0; @@ -356,6 +350,7 @@ InternalIterator* TableCache::NewIterator( table_reader->NewIterator(map_options, prefix_extractor, arena, skip_filters, false /* for_compaction */); if (!dependence_map.empty()) { + // Map SST will handle range deletion internally, so we can skip here. bool ignore_range_deletions = options.ignore_range_deletions || file_meta.prop.map_handle_range_deletions(); @@ -365,33 +360,54 @@ InternalIterator* TableCache::NewIterator( lazy_create_iter = new (buffer) LazyCreateIterator( this, options, env_options, range_del_agg, prefix_extractor, for_compaction, skip_filters, ignore_range_deletions, level); - } else { lazy_create_iter = new LazyCreateIterator( this, options, env_options, range_del_agg, prefix_extractor, for_compaction, skip_filters, ignore_range_deletions, level); } - auto map_sst_iter = NewMapSstIterator( - &file_meta, result, dependence_map, ioptions_.internal_comparator, - lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + + // For map & linked sst, we should expand their dependencies and merge + // all related iterators into one combined iterator for further reads. + InternalIterator* sst_iter = nullptr; + + if(file_meta.prop.is_map_sst()) { + sst_iter = NewMapSstIterator( + &file_meta, result, dependence_map, ioptions_.internal_comparator, + lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + } else { + assert(file_meta.prop.is_link_sst()); + sst_iter = NewLinkSstIterator( + &file_meta, result, dependence_map, ioptions_.internal_comparator, + lazy_create_iter, c_style_callback(*lazy_create_iter), arena); + } + if (arena != nullptr) { - map_sst_iter->RegisterCleanup( + sst_iter->RegisterCleanup( [](void* arg1, void* arg2) { static_cast(arg1)->~InternalIterator(); static_cast(arg2)->~LazyCreateIterator(); }, result, lazy_create_iter); } else { - map_sst_iter->RegisterCleanup( + sst_iter->RegisterCleanup( [](void* arg1, void* arg2) { delete static_cast(arg1); delete static_cast(arg2); }, result, lazy_create_iter); } - result = map_sst_iter; + result = sst_iter; + } + } else { + if (options.table_filter && + !options.table_filter(*table_reader->GetTableProperties())) { + result = NewEmptyInternalIterator(arena); + } else { + result = table_reader->NewIterator(options, prefix_extractor, arena, + skip_filters, for_compaction); } } + if (create_new_table_reader) { assert(handle == nullptr); result->RegisterCleanup(&DeleteTableReader, table_reader, diff --git a/db/version_edit.h b/db/version_edit.h index 86049f46ce..ff878e4745 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -98,8 +98,11 @@ struct TablePropertyCache { std::vector inheritance; // inheritance set uint64_t earliest_time_begin_compact = port::kMaxUint64; uint64_t latest_time_end_compact = port::kMaxUint64; + uint32_t lbr_hash_bits = 11; // hash bits for each LinkSST KV + uint32_t lbr_group_size = 16; // Group size for LinkBlockRecord bool is_map_sst() const { return purpose == kMapSst; } + bool is_link_sst() const {return purpose == kLinkSst; } bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; } bool map_handle_range_deletions() const { return (flags & kMapHandleRangeDeletions) != 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 110f5ec52a..a8e546d56f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -88,6 +88,7 @@ enum SstPurpose { kEssenceSst, // Actual data storage sst kLogSst, // Log as sst kMapSst, // Dummy sst + kLinkSst, // Link SST }; struct Options; diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 50c000f5b0..f96c9db7d6 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -214,21 +214,18 @@ class MapSstIterator final : public InternalIterator { int include_largest_; std::vector link_; struct HeapElement { - InternalIterator* iter; + InternalIterator* iter{}; Slice key; }; template class HeapComparator { public: HeapComparator(const InternalKeyComparator& comparator) : c_(comparator) {} - bool operator()(const HeapElement& a, const HeapElement& b) const { return is_less ? c_.Compare(a.key, b.key) < 0 : c_.Compare(a.key, b.key) > 0; } - const InternalKeyComparator& internal_comparator() const { return c_; } - private: const InternalKeyComparator& c_; }; @@ -488,7 +485,7 @@ class MapSstIterator final : public InternalIterator { if (min_heap_.empty() || icomp.Compare(min_heap_.top().key, largest_key_) >= include_largest_) { - // out of largest bound + // out of the largest bound first_level_value_.reset(); first_level_iter_->Next(); if (InitFirstLevelIter()) { @@ -521,7 +518,7 @@ class MapSstIterator final : public InternalIterator { if (max_heap_.empty() || icomp.Compare(smallest_key_, max_heap_.top().key) >= include_smallest_) { - // out of smallest bound + // out of the smallest bound first_level_value_.reset(); first_level_iter_->Prev(); if (InitFirstLevelIter()) { @@ -544,6 +541,477 @@ class MapSstIterator final : public InternalIterator { virtual Status status() const override { return status_; } }; +// Each LinkSST contains a list of LinkBlock, each LinkBlock contains a certain +// number of file_numbers which indicate where the KV pairs are placed. +class LinkBlockRecord { + public: + LinkBlockRecord(IteratorCache* iterator_cache, + const Slice& key, int group_sz, int hash_bits) + : iterator_cache_(iterator_cache), + max_key_(key), group_sz_(group_sz), hash_bits_(hash_bits) { + key_buffer_.resize(group_sz); + } + + // TODO(guokuankuan@bytedance.com) + // Encode current LinkBlock into slice, so we can put it into a LinkSST. + // Format: + // [file_numbers] block_sz * uint64_t (varint) + // [hash_values] byte aligned (block_sz * hash_bits) + // [smallest key] length prefixed slice + void Encode() {} + + // Decode from a LinkSST's value. + bool DecodeFrom(Slice& input) { + // Decode all file numbers for each KV pair + for(int i = 0; i < group_sz_; ++i) { + uint64_t file_number = 0; + if(!GetVarint64(&input, &file_number)) { + return false; + } + file_numbers_.emplace_back(file_number); + } + assert(file_numbers_.size() == group_sz_); + // Decode hashed values + int total_bits = group_sz_ * hash_bits_; + int total_bytes = total_bits % 8 == 0 ? total_bits / 8 :total_bits / 8 + 1; + assert(input.size() > total_bytes); + assert(hash_bits_ <= 32); + for(int i = 0; i < group_sz_; ++i) { + // TODO(guokuankuan@bytedance.com) Add some UT for this function. + // convert bit represent into uint32 hash values. + int start_pos = i * hash_bits_; + int start_bytes = start_pos / 8; + int end_bytes = (start_pos + hash_bits_) / 8; + uint32_t hash = 0; + memcpy((char*)&hash, input.data() + start_bytes, end_bytes - start_bytes + 1); + hash << (start_pos % 8); + hash >> ((7 - (start_pos + hash_bits_) % 8) + (start_pos % 8)); + hash_values_.emplace_back(hash); + } + // Decode optional smallest key + if(!GetLengthPrefixedSlice(&input, &smallest_key_)) { + return false; + } + return true; + } + + // There should be only one active LinkBlock during the Link SST iteration. + // Here we load all underlying iterators within current LinkBlock and reset + // iter_idx for further sue. + bool ActiveLinkBlock() { + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + if(!it->status().ok()) { + return false; + } + } + iter_idx = 0; + return true; + } + + // Seek all iterators to their first item. + Status SeekToFirst() { + iter_idx = 0; + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->SeekToFirst(); + if(!it->status().ok()) { + return it->status(); + } + } + return Status::OK(); + } + + // Seek all iterators to their last item. + Status SeekToLast() { + assert(group_sz_ == file_numbers_.size()); + iter_idx = group_sz_ - 1; + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->SeekToLast(); + if(!it->status().ok()) { + return it->status(); + } + } + return Status::OK(); + } + + Status Seek(const Slice& target, const InternalKeyComparator& icomp) { + uint32_t target_hash = hash(target, hash_bits_); + // Lower bound search for target key + uint32_t left = 0; + uint32_t right = group_sz_; + while(left < right) { + uint32_t mid = left + (right - left) / 2; + auto key = buffered_key(mid, file_numbers_[mid]); + // TODO (guokuankuan@bytedance.com) + // Shall we check key's hash value here? + if(icomp.Compare(key, target) >= 0) { + right = mid; + } else { + left = mid + 1; + } + } + + if(left < group_sz_) { + iter_idx = left; + // Prepare target SST's iterator for further use + // TODO Shall we init all other iterators to the right place so we can + // reuse them in later Next()/Prev()? + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->Seek(target); + return Status::OK(); + } else { + iter_idx = -1; + return Status::Corruption(); + } + } + + // Move current iterator to next position, will skip all invalid records + // (hash = 0) + // If all subsequent items are invalid, return an error status. + Status Next() { + // Find the next valid position of current LinkBlock + int next_iter_idx = iter_idx + 1; + while(next_iter_idx < hash_values_.size()) { + // Move forward target iterator + auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); + it->Next(); + assert(it->status().ok()); + if(hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { + break; + } + next_iter_idx++; + } + + // Exceed max boundary, we should try next LinkBlock, the iter_idx is now + // meaningless since there should be only one LinkBlock active at the same + // time during iteration. + if(next_iter_idx == hash_values_.size()) { + return Status::NotFound("Exceed LinkBlock's max boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx = next_iter_idx; + return Status::OK(); + } + + // See the comment `Next()`, the `Prev()` implementation is almost the same + // except iterator direction. + Status Prev() { + // Find the previous valid position of current LinkBlock + int prev_iter_idx = iter_idx - 1; + while(prev_iter_idx >= 0) { + // Move backward + auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); + it->Prev(); + assert(it->status().ok()); + if(hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { + break; + } + prev_iter_idx--; + } + + // Exceed the smallest boundary + if(prev_iter_idx == -1) { + return Status::NotFound("Exceed LinkBlock's smallest boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx = prev_iter_idx; + + return Status::OK(); + } + + // Extract key from the underlying SST iterator + Slice CurrentKey() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + return it->key(); + } + + // Extract value from the underlying SST iterator + LazyBuffer CurrentValue() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + return it->value(); + } + + // Return the max key of current LinkBlock + Slice MaxKey() const { return max_key_; } + + // If we have a non-empty `smallest_key_`, we should re-init all underlying + // iterators (default: empty) + Slice SmallestKey() const {return smallest_key_; } + + bool HasSmallestKey() const { return smallest_key_.valid(); } + + std::vector& GetFileNumbers() { return file_numbers_;} + + private: + // If a key's hash is marked as `INVALID_ITEM_HASH`, it means we should remove + // this item in next compaction, thus we shouldn't read from it. + const int INVALID_ITEM_HASH = 0; + + // TODO(guokuankuan@bytedance.com) + // Hash a user key into an integer, limit the maximum bits. + uint32_t hash(const Slice& user_key, int max_bits) { + return 0; + } + + Slice buffered_key(uint32_t idx, uint64_t file_number) { + if(!key_buffer_[idx].valid()) { + // Find out the occurrence between `idx` and the last position of current + // file_number, e.g. [0 ,1, 10, 11, 9, 10, 2], if `idx` is 2, then the + // occurrence should be {2, 5}. + std::vector occurrence = {idx}; + for(uint32_t i = idx + 1; i < group_sz_; ++i) { + if(file_numbers_[i] == file_number) { + occurrence.emplace_back(i); + } + } + + // Seek to the last position of current file_number and `Prev` back to the + // position `idx`, fill all touched keys into the key buffer. + auto it = iterator_cache_->GetIterator(file_number); + it->SeekForPrev(max_key_); + assert(it->status().ok()); + for(uint32_t i = occurrence.size() - 1; i >=0; --i) { + uint32_t pos = occurrence[i]; + if(!key_buffer_[pos].valid()) { + key_buffer_[pos] = it->key(); + } + it->Prev(); + } + } + + assert(!key_buffer_[idx].empty()); + return key_buffer_[idx]; + } + + private: + IteratorCache* iterator_cache_; + + // The end/max key of current LinkBlock + Slice max_key_; + // How many KV pairs we should group into one LinkBlock. + int group_sz_ = 8; + // Bits count required for each underlying SST file + int hash_bits_ = 11; + // Cache touched keys while iterating + std::vector key_buffer_; + // Indicate which SST current KV pairs belongs to. + // file_numbers_.size() == block_sz_ + std::vector file_numbers_; + // Each KV pair has a hash value (hash(user_key)) + std::vector hash_values_; + // Current iteration index of this LinkBlock. + int iter_idx = 0; + // Optional, if smallest_key_exist, it means one of the underlying iterator + // is expired, we should seek all iterators to target key again for further + // iteration. + Slice smallest_key_; +}; + +// TODO(guokuankuan@bytedance.com) +// A LinkSstIterator is almost the same to MapSstIterator. +class LinkSstIterator : public InternalIterator { + private: + const FileMetaData* file_meta_; + InternalIterator* link_sst_iter_ {}; + InternalKeyComparator icomp_; + // The smallest key of current Link SST +// Slice smallest_key; + // The largest key of current Link SST +// Slice largest_key; + + IteratorCache iterator_cache_; + std::vector lbr_list_; + + Status status_; + uint32_t cur_lbr_idx_{}; + +// BinaryHeap, HeapVectorType> min_heap_; + + public: + LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter, + const DependenceMap& dependence_map, + const InternalKeyComparator& icomp, void* create_arg, + const IteratorCache::CreateIterCallback& create) + : file_meta_(file_meta), + link_sst_iter_(iter), + icomp_(icomp), + iterator_cache_(dependence_map, create_arg, create) { + if (file_meta != nullptr && !file_meta_->prop.is_map_sst()) { + abort(); + } + } + + ~LinkSstIterator() override = default; + + private: + // Init all LBR by decoding all LinkSST's value. + bool InitLinkBlockRecords() { + LazyBuffer current_value_; + link_sst_iter_->SeekToFirst(); + while(link_sst_iter_->Valid()) { + current_value_ = link_sst_iter_->value(); + status_ = current_value_.fetch(); + if(!status_.ok()) { + status_ = Status::Corruption("Failed to fetch lazy buffer"); + return false; + } + Slice input = current_value_.slice(); + lbr_list_.emplace_back(&iterator_cache_, + link_sst_iter_->key(), + file_meta_->prop.lbr_group_size, + file_meta_->prop.lbr_hash_bits); + if(!lbr_list_.back().DecodeFrom(input)) { + status_ = Status::Corruption("Cannot decode Link SST"); + return false; + } + link_sst_iter_->Next(); + } + current_value_.reset(); + return true; + } + + // We assume there should be a lot of underlying SST for each LinkSST, so we + // could simply initialize all SST iterators before any iteration. + bool InitSecondLevelIterators() { + for(auto& lb: lbr_list_) { + if(!lb.ActiveLinkBlock()) { + return false; + } + } + return true; + } + + public: + bool Valid() const override { return !lbr_list_.empty(); } + void SeekToFirst() override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Aborted(); + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = 0; + status_ = lbr_list_[cur_lbr_idx_].SeekToFirst(); + } + + void SeekToLast() override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = lbr_list_.size() - 1; + status_ = lbr_list_[cur_lbr_idx_].SeekToLast(); + } + + // TODO(guokuankuan@bytendance.com) + // Is input target a InternalKey ? then what is the default sequence#? + void Seek(const Slice& target) override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + // Find target LinkBlock's position + auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, + [&](const LinkBlockRecord& lbr, const Slice& target){ + return icomp_.Compare(lbr.MaxKey(), target) < 0; + }); + if(it == lbr_list_.end()) { + status_ = Status::NotFound(); + return; + } + cur_lbr_idx_ = it - lbr_list_.begin(); + // Do the Seek + status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_); + } + + // Position at the first key at or before the target. + void SeekForPrev(const Slice& target) override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + + // We adopt Seek & Prev semantics here + Seek(target); + + // If the position's key is equal to target, we are good to go. + if(status_.ok() && key() == target) { + return; + } + + // If the key is greater than target, then we need to `Prev` to the right + // place. + while(status_.ok() && key().compare(target) > 0) { + Prev(); + } + } + + void Next() override { + while(cur_lbr_idx_ < lbr_list_.size()) { + auto s = lbr_list_[cur_lbr_idx_].Next(); + if(s.ok()) { + break; + } + + // If we cannot `Next()` current LBR properly, try next. + cur_lbr_idx_++; + if(cur_lbr_idx_ == lbr_list_.size()) { + break; + } + + assert(cur_lbr_idx_ < lbr_list_.size()); + // If next LBR has a valid smallest key, we should re-seek all iterators + // (which means the iterators' continuous may break) + auto lbr = lbr_list_[cur_lbr_idx_]; + if(lbr.HasSmallestKey()) { + lbr.Seek(lbr.SmallestKey(), icomp_); + } + } + + // No valid position found + if(cur_lbr_idx_ == lbr_list_.size()) { + status_ = Status::NotFound("End of iterator exceeded"); + return; + } + } + + void Prev() override { + while(cur_lbr_idx_ >= 0) { + auto s = lbr_list_[cur_lbr_idx_].Prev(); + if(s.ok()) { + break; + } + // All items were consumed, exit. + if(cur_lbr_idx_ == 0) { + status_ = Status::NotFound("Not more previous items!"); + return; + } + + // If we cannot `Prev()` current LBR, try previous one, note that if current + // LBR has a valid smallest key, we should re-seek previous LBR. + cur_lbr_idx_--; + auto curr_lbr = lbr_list_[cur_lbr_idx_ + 1]; + auto prev_lbr = lbr_list_[cur_lbr_idx_]; + if(curr_lbr.HasSmallestKey()) { + prev_lbr.Seek(prev_lbr.MaxKey(), icomp_); + } + } + } + + Slice key() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentKey(); + } + LazyBuffer value() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentValue(); + } + Status status() const override { return status_; } +}; } // namespace InternalIteratorBase* NewTwoLevelIterator( @@ -552,6 +1020,23 @@ InternalIteratorBase* NewTwoLevelIterator( return new TwoLevelIndexIterator(state, first_level_iter); } +InternalIterator* NewLinkSstIterator( + const FileMetaData* file_meta, InternalIterator* mediate_sst_iter, + const DependenceMap& dependence_map, const InternalKeyComparator& icomp, + void* callback_arg, const IteratorCache::CreateIterCallback& create_iter, + Arena* arena) { + assert(file_meta == nullptr || file_meta->prop.is_link_sst()); + if (arena == nullptr) { + return new LinkSstIterator(file_meta, mediate_sst_iter, dependence_map, + icomp, callback_arg, create_iter); + } else { + void* buffer = arena->AllocateAligned(sizeof(LinkSstIterator)); + return new (buffer) + LinkSstIterator(file_meta, mediate_sst_iter, dependence_map, icomp, + callback_arg, create_iter); + } +} + InternalIterator* NewMapSstIterator( const FileMetaData* file_meta, InternalIterator* mediate_sst_iter, const DependenceMap& dependence_map, const InternalKeyComparator& icomp, diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 3a967bc3ea..0286fbf153 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -42,12 +42,20 @@ extern InternalIteratorBase* NewTwoLevelIterator( TwoLevelIteratorState* state, InternalIteratorBase* first_level_iter); -// Retuan a two level iterator. for unroll map sst // keep all params lifecycle please +// @return a two level iterator. for unroll map sst extern InternalIterator* NewMapSstIterator( const FileMetaData* file_meta, InternalIterator* mediate_sst_iter, const DependenceMap& dependence_map, const InternalKeyComparator& icomp, void* callback_arg, const IteratorCache::CreateIterCallback& create_iter, Arena* arena = nullptr); +// A link sst iterator should expand all its dependencies for the callers. +// @return TwoLevelIterator +extern InternalIterator* NewLinkSstIterator( + const FileMetaData* file_meta, InternalIterator* mediate_sst_iter, + const DependenceMap& dependence_map, const InternalKeyComparator& icomp, + void* callback_arg, const IteratorCache::CreateIterCallback& create_iter, + Arena* arena = nullptr); + } // namespace TERARKDB_NAMESPACE diff --git a/util/heap.h b/util/heap.h index a21015998c..96b34c8087 100644 --- a/util/heap.h +++ b/util/heap.h @@ -129,7 +129,7 @@ class BinaryHeap { T v = std::move(data_[index]); size_t picked_child = port::kMaxSizet; - while (1) { + while (true) { const size_t left_child = get_left(index); if (get_left(index) >= data_.size()) { break; diff --git a/util/iterator_cache.h b/util/iterator_cache.h index 011f2dde99..bef49e698a 100644 --- a/util/iterator_cache.h +++ b/util/iterator_cache.h @@ -19,6 +19,10 @@ class RangeDelAggregator; class TableReader; // FileMetaData> +// +// TODO(guokuankuan@bytedance.com) +// Shall we change this variable name to `FileMetaMap`? This map is simply map +// file number to it's related file metadata typedef chash_map DependenceMap; class IteratorCache { From 4116c8224146a3a05ca7611c89300b98410a29d5 Mon Sep 17 00:00:00 2001 From: guokuankuan Date: Wed, 20 Apr 2022 18:04:59 +0800 Subject: [PATCH 3/5] LinkCompaction: Add LinkSST support in TableCache::Get --- db/table_cache.cc | 33 ++- db/table_cache.h | 1 + db/version_edit.h | 4 + table/two_level_iterator.cc | 474 +------------------------------- table/two_level_iterator.h | 530 ++++++++++++++++++++++++++++++++++++ util/iterator_cache.h | 5 +- 6 files changed, 570 insertions(+), 477 deletions(-) diff --git a/db/table_cache.cc b/db/table_cache.cc index 22de5b77f7..b0a6c6dacb 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -489,11 +489,42 @@ Status TableCache::Get(const ReadOptions& options, if (s.ok()) { t->UpdateMaxCoveringTombstoneSeq(options, ExtractUserKey(k), get_context->max_covering_tombstone_seq()); - if (!file_meta.prop.is_map_sst()) { + if (!file_meta.prop.is_map_sst() && !file_meta.prop.is_link_sst()) { + // For ordinary SST we could read directly from the TableReader s = t->Get(options, k, get_context, prefix_extractor, skip_filters); } else if (dependence_map.empty()) { + // TODO (guokuankuan@bytedance.com) What shall we do to the ordinary SSTs during LnkSST GC? + // Both MapSST & LinkSST need the data from dependence_map s = Status::Corruption( "TableCache::Get: Composite sst depend files missing"); + } else if(file_meta.prop.is_link_sst()) { + ReadOptions forward_options = options; + forward_options.ignore_range_deletions |= + file_meta.prop.link_handle_range_deletions(); + + // Find target LinkBlockRecord & check LBR's key hash to determine which ordinary SST we + // should look into + Arena arena; + uint64_t file_number; + LinkSstIterator* iter_ptr = reinterpret_cast + (t->NewIterator(ReadOptions(), prefix_extractor, &arena)); + std::unique_ptr link_sst_iter(iter_ptr); + s = link_sst_iter->GetTargetFileNumber(k, &file_number); + + // Forward Get() to target ordinary SST + if(s.ok()) { + auto find = dependence_map.find(file_number); + if (find == dependence_map.end()) { + s = Status::Corruption("Link sst dependence missing"); + } else { + assert(find->second->fd.GetNumber() == file_number); + s = Get(forward_options, *find->second, dependence_map, k, + get_context, prefix_extractor, file_read_hist, skip_filters, + level, inheritance); + } + } + // TODO (guokuankuan@bytedance.com) shall we recovery min_seq_backup? + // get_context->SetMinSequenceAndType(min_seq_type_backup); } else { // Forward query to target sst ReadOptions forward_options = options; diff --git a/db/table_cache.h b/db/table_cache.h index 6659a9cb85..46f0328110 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -25,6 +25,7 @@ #include "rocksdb/table.h" #include "rocksdb/terark_namespace.h" #include "table/table_reader.h" +#include "table/two_level_iterator.h" #include "util/iterator_cache.h" namespace TERARKDB_NAMESPACE { diff --git a/db/version_edit.h b/db/version_edit.h index ff878e4745..51bcfc3c9a 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -85,6 +85,7 @@ struct TablePropertyCache { kMapHandleRangeDeletions = 1ULL << 0, kHasSnapshots = 1ULL << 1, kNoRangeDeletions = 1ULL << 2, + kLinkHandleRangeDeletions = 3ULL << 0, }; uint64_t num_entries = 0; // the number of entries. uint64_t num_deletions = 0; // the number of deletion entries. @@ -107,6 +108,9 @@ struct TablePropertyCache { bool map_handle_range_deletions() const { return (flags & kMapHandleRangeDeletions) != 0; } + bool link_handle_range_deletions() const { + return (flags & kLinkHandleRangeDeletions) != 0; + } bool has_snapshots() const { return (flags & kHasSnapshots) != 0; } }; diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index f96c9db7d6..8c3049b8a3 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -9,7 +9,6 @@ #include "table/two_level_iterator.h" -#include "db/version_edit.h" #include "rocksdb/options.h" #include "rocksdb/terark_namespace.h" #include "table/block.h" @@ -541,478 +540,7 @@ class MapSstIterator final : public InternalIterator { virtual Status status() const override { return status_; } }; -// Each LinkSST contains a list of LinkBlock, each LinkBlock contains a certain -// number of file_numbers which indicate where the KV pairs are placed. -class LinkBlockRecord { - public: - LinkBlockRecord(IteratorCache* iterator_cache, - const Slice& key, int group_sz, int hash_bits) - : iterator_cache_(iterator_cache), - max_key_(key), group_sz_(group_sz), hash_bits_(hash_bits) { - key_buffer_.resize(group_sz); - } - - // TODO(guokuankuan@bytedance.com) - // Encode current LinkBlock into slice, so we can put it into a LinkSST. - // Format: - // [file_numbers] block_sz * uint64_t (varint) - // [hash_values] byte aligned (block_sz * hash_bits) - // [smallest key] length prefixed slice - void Encode() {} - - // Decode from a LinkSST's value. - bool DecodeFrom(Slice& input) { - // Decode all file numbers for each KV pair - for(int i = 0; i < group_sz_; ++i) { - uint64_t file_number = 0; - if(!GetVarint64(&input, &file_number)) { - return false; - } - file_numbers_.emplace_back(file_number); - } - assert(file_numbers_.size() == group_sz_); - // Decode hashed values - int total_bits = group_sz_ * hash_bits_; - int total_bytes = total_bits % 8 == 0 ? total_bits / 8 :total_bits / 8 + 1; - assert(input.size() > total_bytes); - assert(hash_bits_ <= 32); - for(int i = 0; i < group_sz_; ++i) { - // TODO(guokuankuan@bytedance.com) Add some UT for this function. - // convert bit represent into uint32 hash values. - int start_pos = i * hash_bits_; - int start_bytes = start_pos / 8; - int end_bytes = (start_pos + hash_bits_) / 8; - uint32_t hash = 0; - memcpy((char*)&hash, input.data() + start_bytes, end_bytes - start_bytes + 1); - hash << (start_pos % 8); - hash >> ((7 - (start_pos + hash_bits_) % 8) + (start_pos % 8)); - hash_values_.emplace_back(hash); - } - // Decode optional smallest key - if(!GetLengthPrefixedSlice(&input, &smallest_key_)) { - return false; - } - return true; - } - - // There should be only one active LinkBlock during the Link SST iteration. - // Here we load all underlying iterators within current LinkBlock and reset - // iter_idx for further sue. - bool ActiveLinkBlock() { - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[i]); - if(!it->status().ok()) { - return false; - } - } - iter_idx = 0; - return true; - } - - // Seek all iterators to their first item. - Status SeekToFirst() { - iter_idx = 0; - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - it->SeekToFirst(); - if(!it->status().ok()) { - return it->status(); - } - } - return Status::OK(); - } - - // Seek all iterators to their last item. - Status SeekToLast() { - assert(group_sz_ == file_numbers_.size()); - iter_idx = group_sz_ - 1; - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - it->SeekToLast(); - if(!it->status().ok()) { - return it->status(); - } - } - return Status::OK(); - } - - Status Seek(const Slice& target, const InternalKeyComparator& icomp) { - uint32_t target_hash = hash(target, hash_bits_); - // Lower bound search for target key - uint32_t left = 0; - uint32_t right = group_sz_; - while(left < right) { - uint32_t mid = left + (right - left) / 2; - auto key = buffered_key(mid, file_numbers_[mid]); - // TODO (guokuankuan@bytedance.com) - // Shall we check key's hash value here? - if(icomp.Compare(key, target) >= 0) { - right = mid; - } else { - left = mid + 1; - } - } - - if(left < group_sz_) { - iter_idx = left; - // Prepare target SST's iterator for further use - // TODO Shall we init all other iterators to the right place so we can - // reuse them in later Next()/Prev()? - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - it->Seek(target); - return Status::OK(); - } else { - iter_idx = -1; - return Status::Corruption(); - } - } - - // Move current iterator to next position, will skip all invalid records - // (hash = 0) - // If all subsequent items are invalid, return an error status. - Status Next() { - // Find the next valid position of current LinkBlock - int next_iter_idx = iter_idx + 1; - while(next_iter_idx < hash_values_.size()) { - // Move forward target iterator - auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); - it->Next(); - assert(it->status().ok()); - if(hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { - break; - } - next_iter_idx++; - } - - // Exceed max boundary, we should try next LinkBlock, the iter_idx is now - // meaningless since there should be only one LinkBlock active at the same - // time during iteration. - if(next_iter_idx == hash_values_.size()) { - return Status::NotFound("Exceed LinkBlock's max boundary"); - } - - // Current LinkBlock is still in use, update iter_idx. - iter_idx = next_iter_idx; - return Status::OK(); - } - - // See the comment `Next()`, the `Prev()` implementation is almost the same - // except iterator direction. - Status Prev() { - // Find the previous valid position of current LinkBlock - int prev_iter_idx = iter_idx - 1; - while(prev_iter_idx >= 0) { - // Move backward - auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); - it->Prev(); - assert(it->status().ok()); - if(hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { - break; - } - prev_iter_idx--; - } - - // Exceed the smallest boundary - if(prev_iter_idx == -1) { - return Status::NotFound("Exceed LinkBlock's smallest boundary"); - } - - // Current LinkBlock is still in use, update iter_idx. - iter_idx = prev_iter_idx; - - return Status::OK(); - } - - // Extract key from the underlying SST iterator - Slice CurrentKey() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - return it->key(); - } - - // Extract value from the underlying SST iterator - LazyBuffer CurrentValue() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - return it->value(); - } - - // Return the max key of current LinkBlock - Slice MaxKey() const { return max_key_; } - - // If we have a non-empty `smallest_key_`, we should re-init all underlying - // iterators (default: empty) - Slice SmallestKey() const {return smallest_key_; } - - bool HasSmallestKey() const { return smallest_key_.valid(); } - - std::vector& GetFileNumbers() { return file_numbers_;} - - private: - // If a key's hash is marked as `INVALID_ITEM_HASH`, it means we should remove - // this item in next compaction, thus we shouldn't read from it. - const int INVALID_ITEM_HASH = 0; - - // TODO(guokuankuan@bytedance.com) - // Hash a user key into an integer, limit the maximum bits. - uint32_t hash(const Slice& user_key, int max_bits) { - return 0; - } - - Slice buffered_key(uint32_t idx, uint64_t file_number) { - if(!key_buffer_[idx].valid()) { - // Find out the occurrence between `idx` and the last position of current - // file_number, e.g. [0 ,1, 10, 11, 9, 10, 2], if `idx` is 2, then the - // occurrence should be {2, 5}. - std::vector occurrence = {idx}; - for(uint32_t i = idx + 1; i < group_sz_; ++i) { - if(file_numbers_[i] == file_number) { - occurrence.emplace_back(i); - } - } - - // Seek to the last position of current file_number and `Prev` back to the - // position `idx`, fill all touched keys into the key buffer. - auto it = iterator_cache_->GetIterator(file_number); - it->SeekForPrev(max_key_); - assert(it->status().ok()); - for(uint32_t i = occurrence.size() - 1; i >=0; --i) { - uint32_t pos = occurrence[i]; - if(!key_buffer_[pos].valid()) { - key_buffer_[pos] = it->key(); - } - it->Prev(); - } - } - - assert(!key_buffer_[idx].empty()); - return key_buffer_[idx]; - } - - private: - IteratorCache* iterator_cache_; - - // The end/max key of current LinkBlock - Slice max_key_; - // How many KV pairs we should group into one LinkBlock. - int group_sz_ = 8; - // Bits count required for each underlying SST file - int hash_bits_ = 11; - // Cache touched keys while iterating - std::vector key_buffer_; - // Indicate which SST current KV pairs belongs to. - // file_numbers_.size() == block_sz_ - std::vector file_numbers_; - // Each KV pair has a hash value (hash(user_key)) - std::vector hash_values_; - // Current iteration index of this LinkBlock. - int iter_idx = 0; - // Optional, if smallest_key_exist, it means one of the underlying iterator - // is expired, we should seek all iterators to target key again for further - // iteration. - Slice smallest_key_; -}; - -// TODO(guokuankuan@bytedance.com) -// A LinkSstIterator is almost the same to MapSstIterator. -class LinkSstIterator : public InternalIterator { - private: - const FileMetaData* file_meta_; - InternalIterator* link_sst_iter_ {}; - InternalKeyComparator icomp_; - // The smallest key of current Link SST -// Slice smallest_key; - // The largest key of current Link SST -// Slice largest_key; - - IteratorCache iterator_cache_; - std::vector lbr_list_; - - Status status_; - uint32_t cur_lbr_idx_{}; - -// BinaryHeap, HeapVectorType> min_heap_; - - public: - LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter, - const DependenceMap& dependence_map, - const InternalKeyComparator& icomp, void* create_arg, - const IteratorCache::CreateIterCallback& create) - : file_meta_(file_meta), - link_sst_iter_(iter), - icomp_(icomp), - iterator_cache_(dependence_map, create_arg, create) { - if (file_meta != nullptr && !file_meta_->prop.is_map_sst()) { - abort(); - } - } - - ~LinkSstIterator() override = default; - - private: - // Init all LBR by decoding all LinkSST's value. - bool InitLinkBlockRecords() { - LazyBuffer current_value_; - link_sst_iter_->SeekToFirst(); - while(link_sst_iter_->Valid()) { - current_value_ = link_sst_iter_->value(); - status_ = current_value_.fetch(); - if(!status_.ok()) { - status_ = Status::Corruption("Failed to fetch lazy buffer"); - return false; - } - Slice input = current_value_.slice(); - lbr_list_.emplace_back(&iterator_cache_, - link_sst_iter_->key(), - file_meta_->prop.lbr_group_size, - file_meta_->prop.lbr_hash_bits); - if(!lbr_list_.back().DecodeFrom(input)) { - status_ = Status::Corruption("Cannot decode Link SST"); - return false; - } - link_sst_iter_->Next(); - } - current_value_.reset(); - return true; - } - - // We assume there should be a lot of underlying SST for each LinkSST, so we - // could simply initialize all SST iterators before any iteration. - bool InitSecondLevelIterators() { - for(auto& lb: lbr_list_) { - if(!lb.ActiveLinkBlock()) { - return false; - } - } - return true; - } - - public: - bool Valid() const override { return !lbr_list_.empty(); } - void SeekToFirst() override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Aborted(); - return; - } - assert(!lbr_list_.empty()); - cur_lbr_idx_ = 0; - status_ = lbr_list_[cur_lbr_idx_].SeekToFirst(); - } - - void SeekToLast() override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); - return; - } - assert(!lbr_list_.empty()); - cur_lbr_idx_ = lbr_list_.size() - 1; - status_ = lbr_list_[cur_lbr_idx_].SeekToLast(); - } - - // TODO(guokuankuan@bytendance.com) - // Is input target a InternalKey ? then what is the default sequence#? - void Seek(const Slice& target) override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); - return; - } - // Find target LinkBlock's position - auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, - [&](const LinkBlockRecord& lbr, const Slice& target){ - return icomp_.Compare(lbr.MaxKey(), target) < 0; - }); - if(it == lbr_list_.end()) { - status_ = Status::NotFound(); - return; - } - cur_lbr_idx_ = it - lbr_list_.begin(); - // Do the Seek - status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_); - } - - // Position at the first key at or before the target. - void SeekForPrev(const Slice& target) override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); - return; - } - - // We adopt Seek & Prev semantics here - Seek(target); - - // If the position's key is equal to target, we are good to go. - if(status_.ok() && key() == target) { - return; - } - - // If the key is greater than target, then we need to `Prev` to the right - // place. - while(status_.ok() && key().compare(target) > 0) { - Prev(); - } - } - - void Next() override { - while(cur_lbr_idx_ < lbr_list_.size()) { - auto s = lbr_list_[cur_lbr_idx_].Next(); - if(s.ok()) { - break; - } - - // If we cannot `Next()` current LBR properly, try next. - cur_lbr_idx_++; - if(cur_lbr_idx_ == lbr_list_.size()) { - break; - } - - assert(cur_lbr_idx_ < lbr_list_.size()); - // If next LBR has a valid smallest key, we should re-seek all iterators - // (which means the iterators' continuous may break) - auto lbr = lbr_list_[cur_lbr_idx_]; - if(lbr.HasSmallestKey()) { - lbr.Seek(lbr.SmallestKey(), icomp_); - } - } - - // No valid position found - if(cur_lbr_idx_ == lbr_list_.size()) { - status_ = Status::NotFound("End of iterator exceeded"); - return; - } - } - - void Prev() override { - while(cur_lbr_idx_ >= 0) { - auto s = lbr_list_[cur_lbr_idx_].Prev(); - if(s.ok()) { - break; - } - // All items were consumed, exit. - if(cur_lbr_idx_ == 0) { - status_ = Status::NotFound("Not more previous items!"); - return; - } - - // If we cannot `Prev()` current LBR, try previous one, note that if current - // LBR has a valid smallest key, we should re-seek previous LBR. - cur_lbr_idx_--; - auto curr_lbr = lbr_list_[cur_lbr_idx_ + 1]; - auto prev_lbr = lbr_list_[cur_lbr_idx_]; - if(curr_lbr.HasSmallestKey()) { - prev_lbr.Seek(prev_lbr.MaxKey(), icomp_); - } - } - } - - Slice key() const override { - assert(Valid()); - return lbr_list_[cur_lbr_idx_].CurrentKey(); - } - LazyBuffer value() const override { - assert(Valid()); - return lbr_list_[cur_lbr_idx_].CurrentValue(); - } - Status status() const override { return status_; } -}; -} // namespace +} InternalIteratorBase* NewTwoLevelIterator( TwoLevelIteratorState* state, diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 0286fbf153..3ab313dd34 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once + +#include "db/version_edit.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/terark_namespace.h" @@ -28,6 +30,534 @@ struct TwoLevelIteratorState { const BlockHandle& handle) = 0; }; +// Each LinkSST contains a list of LinkBlock, each LinkBlock contains a certain +// number of file_numbers which indicate where the KV pairs are placed. +class LinkBlockRecord { + public: + LinkBlockRecord(IteratorCache* iterator_cache, + const Slice& key, int group_sz, int hash_bits) + : iterator_cache_(iterator_cache), + max_key_(key), group_sz_(group_sz), hash_bits_(hash_bits) { + key_buffer_.resize(group_sz); + } + + // TODO(guokuankuan@bytedance.com) + // Encode current LinkBlock into slice, so we can put it into a LinkSST. + // Format: + // [file_numbers] block_sz * uint64_t (varint) + // [hash_values] byte aligned (block_sz * hash_bits) + // [smallest key] length prefixed slice + void Encode() {} + + // Decode from a LinkSST's value. + bool DecodeFrom(Slice& input) { + // Decode all file numbers for each KV pair + for(int i = 0; i < group_sz_; ++i) { + uint64_t file_number = 0; + if(!GetVarint64(&input, &file_number)) { + return false; + } + file_numbers_.emplace_back(file_number); + } + assert(file_numbers_.size() == group_sz_); + // Decode hashed values + int total_bits = group_sz_ * hash_bits_; + int total_bytes = total_bits % 8 == 0 ? total_bits / 8 :total_bits / 8 + 1; + assert(input.size() > total_bytes); + assert(hash_bits_ <= 32); + for(int i = 0; i < group_sz_; ++i) { + // TODO(guokuankuan@bytedance.com) Add some UT for this function. + // convert bit represent into uint32 hash values. + int start_pos = i * hash_bits_; + int start_bytes = start_pos / 8; + int end_bytes = (start_pos + hash_bits_) / 8; + uint32_t hash = 0; + memcpy((char*)&hash, input.data() + start_bytes, end_bytes - start_bytes + 1); + hash << (start_pos % 8); + hash >> ((7 - (start_pos + hash_bits_) % 8) + (start_pos % 8)); + hash_values_.emplace_back(hash); + } + // Decode optional smallest key + if(!GetLengthPrefixedSlice(&input, &smallest_key_)) { + return false; + } + return true; + } + + // There should be only one active LinkBlock during the Link SST iteration. + // Here we load all underlying iterators within current LinkBlock and reset + // iter_idx for further sue. + bool ActiveLinkBlock() { + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + if(!it->status().ok()) { + return false; + } + } + iter_idx = 0; + return true; + } + + // Seek all iterators to their first item. + Status SeekToFirst() { + iter_idx = 0; + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->SeekToFirst(); + if(!it->status().ok()) { + return it->status(); + } + } + return Status::OK(); + } + + // Seek all iterators to their last item. + Status SeekToLast() { + assert(group_sz_ == file_numbers_.size()); + iter_idx = group_sz_ - 1; + for(int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->SeekToLast(); + if(!it->status().ok()) { + return it->status(); + } + } + return Status::OK(); + } + + // Get the underlying SST's file number of target key + Status GetFileNumber(const Slice& target, uint64_t* file_number) { + uint32_t target_hash = hash(target, hash_bits_); + for(int i = 0; i < group_sz_; ++i) { + // hash collision may happen, so we should double check the key's content. + if(target_hash == hash_values_[i]) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + it->Seek(target); + if(!it->status().ok() || it->key().compare(target) != 0) { + continue; + } + *file_number = file_numbers_[i]; + return Status::OK(); + } + } + return Status::NotFound("Target key is not exist"); + } + + // Seek inside a LinkBlock and will fetch & buffer each key we touched + Status Seek(const Slice& target, const InternalKeyComparator& icomp) { + uint32_t target_hash = hash(target, hash_bits_); + // Lower bound search for target key + uint32_t left = 0; + uint32_t right = group_sz_; + while(left < right) { + uint32_t mid = left + (right - left) / 2; + auto key = buffered_key(mid); + // TODO (guokuankuan@bytedance.com) + // Shall we check key's hash value here? + if(icomp.Compare(key, target) >= 0) { + right = mid; + } else { + left = mid + 1; + } + } + + if(left < group_sz_) { + iter_idx = left; + // Prepare target SST's iterator for further use + // TODO Shall we init all other iterators to the right place so we can + // reuse them in later Next()/Prev()? + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + it->Seek(target); + return Status::OK(); + } else { + iter_idx = -1; + return Status::Corruption(); + } + } + + // Move current iterator to next position, will skip all invalid records + // (hash = 0) + // If all subsequent items are invalid, return an error status. + Status Next() { + // Find the next valid position of current LinkBlock + int next_iter_idx = iter_idx + 1; + while(next_iter_idx < hash_values_.size()) { + // Move forward target iterator + auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); + it->Next(); + assert(it->status().ok()); + if(hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { + break; + } + next_iter_idx++; + } + + // Exceed max boundary, we should try next LinkBlock, the iter_idx is now + // meaningless since there should be only one LinkBlock active at the same + // time during iteration. + if(next_iter_idx == hash_values_.size()) { + return Status::NotFound("Exceed LinkBlock's max boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx = next_iter_idx; + return Status::OK(); + } + + // See the comment `Next()`, the `Prev()` implementation is almost the same + // except iterator direction. + Status Prev() { + // Find the previous valid position of current LinkBlock + int prev_iter_idx = iter_idx - 1; + while(prev_iter_idx >= 0) { + // Move backward + auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); + it->Prev(); + assert(it->status().ok()); + if(hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { + break; + } + prev_iter_idx--; + } + + // Exceed the smallest boundary + if(prev_iter_idx == -1) { + return Status::NotFound("Exceed LinkBlock's smallest boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx = prev_iter_idx; + + return Status::OK(); + } + + // Extract key from the underlying SST iterator + Slice CurrentKey() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + return it->key(); + } + + // Extract value from the underlying SST iterator + LazyBuffer CurrentValue() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + return it->value(); + } + + // Return the max key of current LinkBlock + Slice MaxKey() const { return max_key_; } + + // If we have a non-empty `smallest_key_`, we should re-init all underlying + // iterators (default: invalid) + Slice SmallestKey() const {return smallest_key_; } + + bool HasSmallestKey() const { return smallest_key_.valid(); } + + std::vector& GetFileNumbers() { return file_numbers_;} + + private: + // If a key's hash is marked as `INVALID_ITEM_HASH`, it means we should remove + // this item in next compaction, thus we shouldn't read from it. + const int INVALID_ITEM_HASH = 0; + + // TODO(guokuankuan@bytedance.com) + // Hash a user key into an integer, limit the maximum bits. + uint32_t hash(const Slice& user_key, int max_bits) { + return 0; + } + + // Get target key from cache. If it's not present, fetch from iterator & cache it. + Slice buffered_key(uint32_t idx) { + uint64_t file_number = file_numbers_[idx]; + if(!key_buffer_[idx].valid()) { + // Find out the occurrence between `idx` and the last position of current + // file_number, e.g. [0 ,1, 10, 11, 9, 10, 2], if `idx` is 2, then the + // occurrence should be {2, 5}. + std::vector occurrence = {idx}; + for(uint32_t i = idx + 1; i < group_sz_; ++i) { + if(file_numbers_[i] == file_number) { + occurrence.emplace_back(i); + } + } + + // Seek to the last position of current file_number and `Prev` back to the + // position `idx`, fill all touched keys into the key buffer. + auto it = iterator_cache_->GetIterator(file_number); + it->SeekForPrev(max_key_); + assert(it->status().ok()); + for(uint32_t i = occurrence.size() - 1; i >=0; --i) { + uint32_t pos = occurrence[i]; + if(!key_buffer_[pos].valid()) { + key_buffer_[pos] = it->key(); + } + it->Prev(); + } + } + + assert(!key_buffer_[idx].empty()); + return key_buffer_[idx]; + } + + private: + IteratorCache* iterator_cache_; + + // The end/max key of current LinkBlock + Slice max_key_; + // How many KV pairs we should group into one LinkBlock. + int group_sz_ = 8; + // Bits count required for each underlying SST file + int hash_bits_ = 11; + // Cache touched keys while iterating + std::vector key_buffer_; + // Indicate which SST current KV pairs belongs to. + // file_numbers_.size() == block_sz_ + std::vector file_numbers_; + // Each KV pair has a hash value (hash(user_key)) + std::vector hash_values_; + // Current iteration index of this LinkBlock. + int iter_idx = 0; + // Optional, if smallest_key_exist, it means one of the underlying iterator + // is expired, we should seek all iterators to target key again for further + // iteration. + Slice smallest_key_; + }; + +// LinkSstIterator is a composition of a few ordinary iterators +class LinkSstIterator : public InternalIterator { + private: + const FileMetaData* file_meta_; + InternalIterator* link_sst_iter_ {}; + InternalKeyComparator icomp_; + + IteratorCache iterator_cache_; + std::vector lbr_list_; + + Status status_; + uint32_t cur_lbr_idx_{}; + + // Some operations may invalid current iterator. + bool valid_ = true; + + bool lbr_initialized_ = false; + bool lbr_actived_ = false; + +// BinaryHeap, HeapVectorType> min_heap_; + + public: + LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter, + const DependenceMap& dependence_map, + const InternalKeyComparator& icomp, void* create_arg, + const IteratorCache::CreateIterCallback& create) + : file_meta_(file_meta), + link_sst_iter_(iter), + icomp_(icomp), + iterator_cache_(dependence_map, create_arg, create) { + if (file_meta != nullptr && !file_meta_->prop.is_map_sst()) { + abort(); + } + } + + ~LinkSstIterator() override = default; + + private: + // Init all LBR by decoding all LinkSST's value. + bool InitLinkBlockRecords() { + if(lbr_initialized_) { + return true; + } + LazyBuffer current_value_; + link_sst_iter_->SeekToFirst(); + while(link_sst_iter_->Valid()) { + current_value_ = link_sst_iter_->value(); + status_ = current_value_.fetch(); + if(!status_.ok()) { + status_ = Status::Corruption("Failed to fetch lazy buffer"); + return false; + } + Slice input = current_value_.slice(); + lbr_list_.emplace_back(&iterator_cache_, + link_sst_iter_->key(), + file_meta_->prop.lbr_group_size, + file_meta_->prop.lbr_hash_bits); + if(!lbr_list_.back().DecodeFrom(input)) { + status_ = Status::Corruption("Cannot decode Link SST"); + return false; + } + link_sst_iter_->Next(); + } + current_value_.reset(); + lbr_initialized_ = true; + return true; + } + + // We assume there should be a lot of underlying SST for each LinkSST, so we + // could simply initialize all SST iterators before any iteration. + bool InitSecondLevelIterators() { + if(lbr_actived_) { + return true; + } + for(auto& lb: lbr_list_) { + if(!lb.ActiveLinkBlock()) { + return false; + } + } + lbr_actived_ = true; + return true; + } + + public: + bool Valid() const override { return !lbr_list_.empty() && valid_; } + void SeekToFirst() override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Aborted(); + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = 0; + status_ = lbr_list_[cur_lbr_idx_].SeekToFirst(); + valid_ = true; + } + + void SeekToLast() override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = lbr_list_.size() - 1; + status_ = lbr_list_[cur_lbr_idx_].SeekToLast(); + valid_ = true; + } + + // TODO(guokuankuan@bytendance.com) + // Is input target a InternalKey ? then what is the default sequence#? + void Seek(const Slice& target) override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + // Find target LinkBlock's position + auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, + [&](const LinkBlockRecord& lbr, const Slice& target){ + return icomp_.Compare(lbr.MaxKey(), target) < 0; + }); + if(it == lbr_list_.end()) { + status_ = Status::NotFound(); + return; + } + cur_lbr_idx_ = it - lbr_list_.begin(); + // Do the Seek + status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_); + valid_ = true; + } + + // Position at the first key at or before the target. + void SeekForPrev(const Slice& target) override { + if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + + // We adopt Seek & Prev semantics here + Seek(target); + + // If the position's key is equal to target, we are good to go. + if(status_.ok() && key() == target) { + return; + } + + // If the key is greater than target, then we need to `Prev` to the right + // place. + while(status_.ok() && key().compare(target) > 0) { + Prev(); + } + valid_ = true; + } + + void Next() override { + assert(Valid()); + while(cur_lbr_idx_ < lbr_list_.size()) { + auto s = lbr_list_[cur_lbr_idx_].Next(); + if(s.ok()) { + break; + } + + // If we cannot `Next()` current LBR properly, try next. + cur_lbr_idx_++; + if(cur_lbr_idx_ == lbr_list_.size()) { + break; + } + + assert(cur_lbr_idx_ < lbr_list_.size()); + // If next LBR has a valid smallest key, we should re-seek all iterators + // (which means the iterators' continuous may break) + auto lbr = lbr_list_[cur_lbr_idx_]; + if(lbr.HasSmallestKey()) { + lbr.Seek(lbr.SmallestKey(), icomp_); + } + } + + // No valid position found + if(cur_lbr_idx_ == lbr_list_.size()) { + status_ = Status::NotFound("End of iterator exceeded"); + return; + } + } + + void Prev() override { + assert(Valid()); + while(cur_lbr_idx_ >= 0) { + auto s = lbr_list_[cur_lbr_idx_].Prev(); + if(s.ok()) { + break; + } + // All items were consumed, exit. + if(cur_lbr_idx_ == 0) { + status_ = Status::NotFound("Not more previous items!"); + return; + } + + // If we cannot `Prev()` current LBR, try previous one, note that if current + // LBR has a valid smallest key, we should re-seek previous LBR. + cur_lbr_idx_--; + auto curr_lbr = lbr_list_[cur_lbr_idx_ + 1]; + auto prev_lbr = lbr_list_[cur_lbr_idx_]; + if(curr_lbr.HasSmallestKey()) { + prev_lbr.Seek(prev_lbr.MaxKey(), icomp_); + } + } + } + + Slice key() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentKey(); + } + + LazyBuffer value() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentValue(); + } + + // Find target key's underlying SST file number (exactly match) + // TODO (guokuankuan@bytedance.com) Maybe we could re-use Seek() here? + Status GetTargetFileNumber(const Slice& target, uint64_t* file_number) { + if(!InitLinkBlockRecords()) { + return Status::Corruption(); + } + // Find target LinkBlock's position + auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, + [&](const LinkBlockRecord& lbr, const Slice& target){ + return icomp_.Compare(lbr.MaxKey(), target) < 0; + }); + if(it == lbr_list_.end()) { + return Status::NotFound(); + } + + return it->GetFileNumber(target, file_number); + } + + Status status() const override { return status_; } +}; + // Return a new two level iterator. A two-level iterator contains an // index iterator whose values point to a sequence of blocks where // each block is itself a sequence of key,value pairs. The returned diff --git a/util/iterator_cache.h b/util/iterator_cache.h index bef49e698a..c04e7154b6 100644 --- a/util/iterator_cache.h +++ b/util/iterator_cache.h @@ -14,15 +14,14 @@ namespace TERARKDB_NAMESPACE { -struct FileMetaData; class RangeDelAggregator; class TableReader; // FileMetaData> // // TODO(guokuankuan@bytedance.com) -// Shall we change this variable name to `FileMetaMap`? This map is simply map -// file number to it's related file metadata +// Shall we change this variable name to `FileMetaMap`? This map simply maps file +// numbers to their related file metadata typedef chash_map DependenceMap; class IteratorCache { From 170a03dfba2898270fd32e6552be1cdc52d322c0 Mon Sep 17 00:00:00 2001 From: guokuankuan Date: Wed, 20 Apr 2022 19:05:42 +0800 Subject: [PATCH 4/5] LinkCompaction: Fix a few LinkSstIterator issues --- db/table_cache.cc | 2 +- table/two_level_iterator.h | 64 +++++++++++++++++++++----------------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/db/table_cache.cc b/db/table_cache.cc index b0a6c6dacb..3d1827c3eb 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -523,7 +523,7 @@ Status TableCache::Get(const ReadOptions& options, level, inheritance); } } - // TODO (guokuankuan@bytedance.com) shall we recovery min_seq_backup? + // TODO (guokuankuan@bytedance.com) shall we recover with min_seq_backup? // get_context->SetMinSequenceAndType(min_seq_type_backup); } else { // Forward query to target sst diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 3ab313dd34..959d8a2c35 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -100,28 +100,28 @@ class LinkBlockRecord { // Seek all iterators to their first item. Status SeekToFirst() { - iter_idx = 0; for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + auto it = iterator_cache_->GetIterator(file_numbers_[i]); it->SeekToFirst(); if(!it->status().ok()) { return it->status(); } } + iter_idx_ = 0; return Status::OK(); } // Seek all iterators to their last item. Status SeekToLast() { assert(group_sz_ == file_numbers_.size()); - iter_idx = group_sz_ - 1; for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + auto it = iterator_cache_->GetIterator(file_numbers_[i]); it->SeekToLast(); if(!it->status().ok()) { return it->status(); } } + iter_idx_ = group_sz_ - 1; return Status::OK(); } @@ -152,8 +152,7 @@ class LinkBlockRecord { while(left < right) { uint32_t mid = left + (right - left) / 2; auto key = buffered_key(mid); - // TODO (guokuankuan@bytedance.com) - // Shall we check key's hash value here? + // TODO (guokuankuan@bytedance.com) Shall we check key's hash value here? if(icomp.Compare(key, target) >= 0) { right = mid; } else { @@ -162,15 +161,19 @@ class LinkBlockRecord { } if(left < group_sz_) { - iter_idx = left; - // Prepare target SST's iterator for further use - // TODO Shall we init all other iterators to the right place so we can - // reuse them in later Next()/Prev()? - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); - it->Seek(target); + iter_idx_ = left; + // We should seek all related iterators within a LinkBlockRecord + std::set unique_file_numbers(file_numbers_.begin(), file_numbers_.end()); + for(const auto& fn: unique_file_numbers) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); + it->Seek(target); + if(!it->status().ok()) { + return it->status(); + } + } return Status::OK(); } else { - iter_idx = -1; + iter_idx_ = -1; return Status::Corruption(); } } @@ -180,12 +183,14 @@ class LinkBlockRecord { // If all subsequent items are invalid, return an error status. Status Next() { // Find the next valid position of current LinkBlock - int next_iter_idx = iter_idx + 1; + int next_iter_idx = iter_idx_ + 1; while(next_iter_idx < hash_values_.size()) { // Move forward target iterator auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); it->Next(); - assert(it->status().ok()); + if(!it->status().ok()) { + return it->status(); + } if(hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { break; } @@ -200,7 +205,7 @@ class LinkBlockRecord { } // Current LinkBlock is still in use, update iter_idx. - iter_idx = next_iter_idx; + iter_idx_ = next_iter_idx; return Status::OK(); } @@ -208,12 +213,14 @@ class LinkBlockRecord { // except iterator direction. Status Prev() { // Find the previous valid position of current LinkBlock - int prev_iter_idx = iter_idx - 1; + int prev_iter_idx = iter_idx_ - 1; while(prev_iter_idx >= 0) { // Move backward auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); it->Prev(); - assert(it->status().ok()); + if(!it->status().ok()) { + return it->status(); + } if(hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { break; } @@ -226,20 +233,20 @@ class LinkBlockRecord { } // Current LinkBlock is still in use, update iter_idx. - iter_idx = prev_iter_idx; + iter_idx_ = prev_iter_idx; return Status::OK(); } // Extract key from the underlying SST iterator Slice CurrentKey() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); return it->key(); } // Extract value from the underlying SST iterator LazyBuffer CurrentValue() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx]); + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); return it->value(); } @@ -283,7 +290,9 @@ class LinkBlockRecord { // position `idx`, fill all touched keys into the key buffer. auto it = iterator_cache_->GetIterator(file_number); it->SeekForPrev(max_key_); - assert(it->status().ok()); + if(!it->status().ok()) { + return Slice(nullptr); + } for(uint32_t i = occurrence.size() - 1; i >=0; --i) { uint32_t pos = occurrence[i]; if(!key_buffer_[pos].valid()) { @@ -314,7 +323,7 @@ class LinkBlockRecord { // Each KV pair has a hash value (hash(user_key)) std::vector hash_values_; // Current iteration index of this LinkBlock. - int iter_idx = 0; + int iter_idx_ = 0; // Optional, if smallest_key_exist, it means one of the underlying iterator // is expired, we should seek all iterators to target key again for further // iteration. @@ -408,7 +417,6 @@ class LinkSstIterator : public InternalIterator { bool Valid() const override { return !lbr_list_.empty() && valid_; } void SeekToFirst() override { if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Aborted(); return; } assert(!lbr_list_.empty()); @@ -419,7 +427,6 @@ class LinkSstIterator : public InternalIterator { void SeekToLast() override { if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); return; } assert(!lbr_list_.empty()); @@ -432,7 +439,6 @@ class LinkSstIterator : public InternalIterator { // Is input target a InternalKey ? then what is the default sequence#? void Seek(const Slice& target) override { if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); return; } // Find target LinkBlock's position @@ -490,9 +496,9 @@ class LinkSstIterator : public InternalIterator { assert(cur_lbr_idx_ < lbr_list_.size()); // If next LBR has a valid smallest key, we should re-seek all iterators // (which means the iterators' continuous may break) - auto lbr = lbr_list_[cur_lbr_idx_]; - if(lbr.HasSmallestKey()) { - lbr.Seek(lbr.SmallestKey(), icomp_); + auto next_lbr = lbr_list_[cur_lbr_idx_]; + if(next_lbr.HasSmallestKey()) { + next_lbr.Seek(next_lbr.SmallestKey(), icomp_); } } From ff8e329e6766eaea7d621594d2b09992a9d21cd4 Mon Sep 17 00:00:00 2001 From: Kuankuan Guo Date: Tue, 10 May 2022 23:33:29 +0800 Subject: [PATCH 5/5] [WIP]LinkCompaction: Add LinkCompactionIterator and reformat LinkSstIterator --- db/compaction_iterator.h | 164 +++++- db/compaction_job.cc | 34 +- db/compaction_picker.cc | 5 +- options/cf_options.h | 4 + table/two_level_iterator.h | 1064 ++++++++++++++++++------------------ 5 files changed, 730 insertions(+), 541 deletions(-) diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index c799937cdf..c4d55ce929 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -22,7 +23,28 @@ namespace TERARKDB_NAMESPACE { -class CompactionIterator { +class CompactionIteratorBase { + public: + virtual ~CompactionIteratorBase() = 0; + + virtual void ResetRecordCounts() = 0; + + virtual void SeekToFirst() = 0; + virtual void Next() = 0; + + // Getters + virtual const Slice& key() const = 0; + virtual const LazyBuffer& value() const = 0; + virtual const Status& status() const = 0; + virtual const ParsedInternalKey& ikey() const = 0; + virtual bool Valid() const = 0; + virtual const Slice& user_key() const = 0; + virtual const CompactionIterationStats& iter_stats() const = 0; + // for KV Separation or LinkSST, current value may depend on other file numbers + virtual const std::vector depended_file_numbers() const = 0; +}; + +class CompactionIterator: public CompactionIteratorBase { public: friend class CompactionIteratorToInternalIterator; @@ -96,30 +118,34 @@ class CompactionIterator { const SequenceNumber preserve_deletes_seqnum = 0, const chash_set* b = nullptr); - ~CompactionIterator(); + ~CompactionIterator() override; - void ResetRecordCounts(); + void ResetRecordCounts() override; // Seek to the beginning of the compaction iterator output. // // REQUIRED: Call only once. - void SeekToFirst(); + void SeekToFirst() override; // Produces the next record in the compaction. // // REQUIRED: SeekToFirst() has been called. - void Next(); + void Next() override; // Getters - const Slice& key() const { return key_; } - const LazyBuffer& value() const { return value_; } - const Status& status() const { return status_; } - const ParsedInternalKey& ikey() const { return ikey_; } - bool Valid() const { return valid_; } - const Slice& user_key() const { return current_user_key_; } - const CompactionIterationStats& iter_stats() const { return iter_stats_; } + const Slice& key() const override { return key_; } + const LazyBuffer& value() const override { return value_; } + const Status& status() const override { return status_; } + const ParsedInternalKey& ikey() const override { return ikey_; } + bool Valid() const override { return valid_; } + const Slice& user_key() const override { return current_user_key_; } + const CompactionIterationStats& iter_stats() const override { return iter_stats_; } void SetFilterSampleInterval(size_t filter_sample_interval); + virtual const std::vector depended_file_numbers() const { + return {value_.file_number()}; + } + private: // Processes the input stream to find the next output void NextFromInput(); @@ -236,6 +262,120 @@ class CompactionIterator { } }; +// A LinkCompactionIterator takes a ordinary CompactionIterator as its input and +// iterate all KVs block by block (refer to LinkBlockRecord). +class LinkCompactionIterator: public CompactionIteratorBase { + public: + // @param c_iter Original compaction iterator that combines all underlying SSTs + // @param group_sz The total number of the KV items that a LinkBlock contains. + // @param hash_bits Total bits a key in LinkBlock will use. + explicit LinkCompactionIterator(std::unique_ptr c_iter, + IteratorCache* iterator_cache, + int group_sz, + int hash_bits) + : c_iter_(std::move(c_iter)), + iterator_cache_(iterator_cache), + group_sz_(group_sz), + hash_bits_(hash_bits) {} + + ~LinkCompactionIterator() override { + c_iter_.reset(); + } + + void ResetRecordCounts() override { + c_iter_->ResetRecordCounts(); + } + + // We need to construct the first LinkBlockRecord here. + void SeekToFirst() override { + c_iter_->SeekToFirst(); + status_ = c_iter_->status(); + if(!status_.ok()) { + return; + } + Next(); + } + + // We need to step forward N(LinkBlockRecord's size) times to make sure we can + // get the next LinkBlockRecord(LBR) + // @see LinkSstIterator + void Next() override { + // Each time we proceed to next LBR, we refill underlying file numbers. + file_numbers_.clear(); + + // Obtain the original KV pairs + Slice max_key; + Slice smallest_key; // TODO need to check if we need smallest key + std::vector hash_values; + for(int i = 0; i < group_sz_ && c_iter_->Valid(); ++i) { + const LazyBuffer& value = c_iter_->value(); + hash_values.emplace_back(LinkBlockRecord::hash(c_iter_->user_key(), hash_bits_)); + file_numbers_.emplace_back(value.file_number()); + if(i == group_sz_ - 1) { + max_key = c_iter_->key(); + } + Next(); + if(!c_iter_->status().ok()) { + status_ = c_iter_->status(); + return; + } + } + curr_lbr_ = std::make_unique(iterator_cache_, max_key, + group_sz_, hash_bits_); + curr_lbr_->Encode(file_numbers_, hash_values, smallest_key); + + // Parse internal key + const Slice& key = curr_lbr_->MaxKey(); + if(!ParseInternalKey(key, &ikey_)) { + status_ = Status::Incomplete("Cannot decode internal key"); + } + } + + // Obtain the LBR's key, aka the largest key of the block range. + const Slice& key() const override { + return curr_lbr_->MaxKey(); + } + + const LazyBuffer& value() const override { + return curr_lbr_->EncodedValue(); + } + + const Status& status() const override { + return status_; + } + + const ParsedInternalKey& ikey() const override { + return ikey_; + } + + bool Valid() const override { + return status_.ok(); + } + + const Slice& user_key() const override { + return c_iter_->user_key(); + } + + const CompactionIterationStats& iter_stats() const override { + return c_iter_->iter_stats(); + } + + const std::vector depended_file_numbers() const { + return file_numbers_; + } + + private: + Status status_; + std::unique_ptr curr_lbr_; + ParsedInternalKey ikey_; + std::vector file_numbers_; + + std::unique_ptr c_iter_; + IteratorCache* iterator_cache_; + int group_sz_ = 0; + int hash_bits_ = 0; +}; + InternalIterator* NewCompactionIterator( CompactionIterator* (*new_compaction_iter_callback)(void*), void* arg, const Slice* start_user_key = nullptr); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 9a95112df6..cd5c7bd1be 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -149,7 +149,7 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) { // Maintains state for each sub-compaction struct CompactionJob::SubcompactionState { const Compaction* compaction; - std::unique_ptr c_iter; + std::unique_ptr c_iter; // The boundaries of the key-range this compaction is interested in. No two // subcompactions may have overlapping key-ranges. @@ -1661,14 +1661,26 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { status = Status::OK(); } - sub_compact->c_iter.reset(new CompactionIterator( + auto it = std::make_unique( input.get(), &separate_helper, end, cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false, &range_del_agg, sub_compact->compaction, mutable_cf_options->get_blob_config(), compaction_filter, shutting_down_, preserve_deletes_seqnum_, - &rebuild_blobs_info.blobs)); + &rebuild_blobs_info.blobs); + + // For LinkCompaction, we wrap the old compaction iterator + if(!sub_compact->compaction->immutable_cf_options()->enable_link_compaction) { + sub_compact->c_iter = std::move(it); + } else { + // TODO iterator_cache cannot be null + sub_compact->c_iter = std::make_unique( + std::move(it), nullptr, mutable_cf_options->lbr_group_sz, + mutable_cf_options->lbr_hash_bits); + } + it.reset(); + auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); @@ -1746,7 +1758,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } // Represents how many records in target blob SST that are needed by the key - // SST + // SST: std::unordered_map dependence; size_t yield_count = 0; @@ -1755,6 +1767,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // returns true. const Slice& key = c_iter->key(); const LazyBuffer& value = c_iter->value(); + // For KV separation, we need to record all dependencies SSTs if (c_iter->ikey().type == kTypeValueIndex || c_iter->ikey().type == kTypeMergeIndex) { assert(value.file_number() != uint64_t(-1)); @@ -1763,6 +1776,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { ++ib.first->second; } } + // For LinkSSTs, the dependency decoding is slightly different + // TODO(guokuankuan@bytedance.com) We didn't handle MergeIndex yet. + if(c_iter->ikey().type == kTypeLinkIndex) { + // Decode value to find out all underlying SST file numbers. + for(const auto& fn: c_iter->depended_file_numbers()) { + auto ib = dependence.emplace(fn, 1); + if (!ib.second) { + ++ib.first->second; + } + } + } assert(end == nullptr || cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0); @@ -1806,6 +1830,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (!status.ok()) { break; } + // For compression for (const auto& data_elmt : {key, value.slice()}) { size_t data_end_offset = data_begin_offset + data_elmt.size(); while (sample_begin_offset_iter != sample_begin_offsets.cend() && @@ -2020,7 +2045,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->status = status; } -// TODO(guokuankuan@bytedance.com) void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); return ProcessKeyValueCompaction(sub_compact); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 43773c7a24..4bfe2cff9e 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -2442,7 +2442,8 @@ Compaction* LevelCompactionBuilder::PickCompaction() { } // TODO (guokuankuan@bytedance.com) -// We could reuse the ordinary compaction picker at the moment, but sooner we should pick link compactions smarter. +// We could reuse the ordinary compaction picker at the moment, but sooner we should +// pick link compactions smarter. Compaction* LevelCompactionBuilder::PickLinkCompaction() { compaction_type_ = CompactionType::kLinkCompaction; return PickCompaction(); @@ -3017,7 +3018,7 @@ Compaction* LevelCompactionPicker::PickCompaction( if (ioptions_.enable_lazy_compaction) { return builder.PickLazyCompaction(snapshots); } else if (ioptions_.enable_link_compaction) { - return builder.PickLinkCompaction(); + return builder.PickLinkCompaction(); } else { return builder.PickCompaction(); } diff --git a/options/cf_options.h b/options/cf_options.h index 8c4ed3cb69..eae706fe4b 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -239,6 +239,10 @@ struct MutableCFOptions { std::vector max_bytes_for_level_multiplier_additional; CompactionOptionsUniversal compaction_options_universal; + // Link Compaction related options + uint32_t lbr_group_sz = 10; + uint32_t lbr_hash_bits = 7; + // Misc options uint64_t max_sequential_skip_in_iterations; bool paranoid_file_checks; diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 959d8a2c35..9640547d1e 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -32,536 +32,556 @@ struct TwoLevelIteratorState { // Each LinkSST contains a list of LinkBlock, each LinkBlock contains a certain // number of file_numbers which indicate where the KV pairs are placed. +// +// Encoded value layout: +// [file_numbers] block_sz * uint64_t (variant) +// [hash_values] byte aligned (block_sz * hash_bits) +// [smallest key] length prefixed slice (optional) +// class LinkBlockRecord { - public: - LinkBlockRecord(IteratorCache* iterator_cache, - const Slice& key, int group_sz, int hash_bits) - : iterator_cache_(iterator_cache), - max_key_(key), group_sz_(group_sz), hash_bits_(hash_bits) { - key_buffer_.resize(group_sz); - } - - // TODO(guokuankuan@bytedance.com) - // Encode current LinkBlock into slice, so we can put it into a LinkSST. - // Format: - // [file_numbers] block_sz * uint64_t (varint) - // [hash_values] byte aligned (block_sz * hash_bits) - // [smallest key] length prefixed slice - void Encode() {} - - // Decode from a LinkSST's value. - bool DecodeFrom(Slice& input) { - // Decode all file numbers for each KV pair - for(int i = 0; i < group_sz_; ++i) { - uint64_t file_number = 0; - if(!GetVarint64(&input, &file_number)) { - return false; - } - file_numbers_.emplace_back(file_number); - } - assert(file_numbers_.size() == group_sz_); - // Decode hashed values - int total_bits = group_sz_ * hash_bits_; - int total_bytes = total_bits % 8 == 0 ? total_bits / 8 :total_bits / 8 + 1; - assert(input.size() > total_bytes); - assert(hash_bits_ <= 32); - for(int i = 0; i < group_sz_; ++i) { - // TODO(guokuankuan@bytedance.com) Add some UT for this function. - // convert bit represent into uint32 hash values. - int start_pos = i * hash_bits_; - int start_bytes = start_pos / 8; - int end_bytes = (start_pos + hash_bits_) / 8; - uint32_t hash = 0; - memcpy((char*)&hash, input.data() + start_bytes, end_bytes - start_bytes + 1); - hash << (start_pos % 8); - hash >> ((7 - (start_pos + hash_bits_) % 8) + (start_pos % 8)); - hash_values_.emplace_back(hash); - } - // Decode optional smallest key - if(!GetLengthPrefixedSlice(&input, &smallest_key_)) { - return false; - } - return true; - } - - // There should be only one active LinkBlock during the Link SST iteration. - // Here we load all underlying iterators within current LinkBlock and reset - // iter_idx for further sue. - bool ActiveLinkBlock() { - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[i]); - if(!it->status().ok()) { - return false; - } - } - iter_idx = 0; - return true; - } - - // Seek all iterators to their first item. - Status SeekToFirst() { - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[i]); - it->SeekToFirst(); - if(!it->status().ok()) { - return it->status(); - } - } - iter_idx_ = 0; - return Status::OK(); - } - - // Seek all iterators to their last item. - Status SeekToLast() { - assert(group_sz_ == file_numbers_.size()); - for(int i = 0; i < group_sz_; ++i) { - auto it = iterator_cache_->GetIterator(file_numbers_[i]); - it->SeekToLast(); - if(!it->status().ok()) { - return it->status(); - } - } - iter_idx_ = group_sz_ - 1; - return Status::OK(); + public: + // TODO(guokuankuan@bytedance.com) + // Hash a user key into an integer, limit the maximum bits. + static uint32_t hash(const Slice& user_key, int max_bits) { return 0; } + + public: + // @param key The largest internal key of all items in current LinkBlock + // @param group_sz Total number of items inside a LinkBlock + LinkBlockRecord(IteratorCache* iterator_cache, const Slice& key, int group_sz, + int hash_bits) + : iterator_cache_(iterator_cache), + max_key_(key), + group_sz_(group_sz), + hash_bits_(hash_bits) { + key_buffer_.resize(group_sz); + } + ~LinkBlockRecord() {} + + // TODO(guokuankuan@bytedance.com) + // Encode current LinkBlock into slice, so we can put it into a LinkSST. + void Encode(std::vector file_numbers, + std::vector hash_values, const Slice& smallest_key) { + // TODO remember to set file_number + + encoded_value = LazyBuffer(); + return; + } + + // Decode from a LinkSST's value. + bool DecodeFrom(Slice& input) { + // Decode all file numbers for each KV pair + for (int i = 0; i < group_sz_; ++i) { + uint64_t file_number = 0; + if (!GetVarint64(&input, &file_number)) { + return false; + } + file_numbers_.emplace_back(file_number); + } + assert(file_numbers_.size() == group_sz_); + // Decode hashed values + int total_bits = group_sz_ * hash_bits_; + int total_bytes = total_bits % 8 == 0 ? total_bits / 8 : total_bits / 8 + 1; + assert(input.size() > total_bytes); + assert(hash_bits_ <= 32); + for (int i = 0; i < group_sz_; ++i) { + // TODO(guokuankuan@bytedance.com) Add some UT for this function. + // convert bit represent into uint32 hash values. + int start_pos = i * hash_bits_; + int start_bytes = start_pos / 8; + int end_bytes = (start_pos + hash_bits_) / 8; + uint32_t hash = 0; + memcpy((char*)&hash, input.data() + start_bytes, + end_bytes - start_bytes + 1); + hash << (start_pos % 8); + hash >> ((7 - (start_pos + hash_bits_) % 8) + (start_pos % 8)); + hash_values_.emplace_back(hash); + } + // Decode optional smallest key + if (!GetLengthPrefixedSlice(&input, &smallest_key_)) { + return false; + } + return true; + } + + // There should be only one active LinkBlock during the Link SST iteration. + // Here we load all underlying iterators within current LinkBlock and reset + // iter_idx for further sue. + bool ActiveLinkBlock() { + for (int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + if (!it->status().ok()) { + return false; + } + } + iter_idx_ = 0; + return true; + } + + // Seek all iterators to their first item. + Status SeekToFirst() { + for (int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + it->SeekToFirst(); + if (!it->status().ok()) { + return it->status(); + } + } + iter_idx_ = 0; + return Status::OK(); + } + + // Seek all iterators to their last item. + Status SeekToLast() { + assert(group_sz_ == file_numbers_.size()); + for (int i = 0; i < group_sz_; ++i) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + it->SeekToLast(); + if (!it->status().ok()) { + return it->status(); + } + } + iter_idx_ = group_sz_ - 1; + return Status::OK(); + } + + // Get the underlying SST's file number of target key + Status GetFileNumber(const Slice& target, uint64_t* file_number) { + uint32_t target_hash = hash(target, hash_bits_); + for (int i = 0; i < group_sz_; ++i) { + // hash collision may happen, so we should double check the key's content. + if (target_hash == hash_values_[i]) { + auto it = iterator_cache_->GetIterator(file_numbers_[i]); + it->Seek(target); + if (!it->status().ok() || it->key().compare(target) != 0) { + continue; } - - // Get the underlying SST's file number of target key - Status GetFileNumber(const Slice& target, uint64_t* file_number) { - uint32_t target_hash = hash(target, hash_bits_); - for(int i = 0; i < group_sz_; ++i) { - // hash collision may happen, so we should double check the key's content. - if(target_hash == hash_values_[i]) { - auto it = iterator_cache_->GetIterator(file_numbers_[i]); - it->Seek(target); - if(!it->status().ok() || it->key().compare(target) != 0) { - continue; - } - *file_number = file_numbers_[i]; - return Status::OK(); - } - } - return Status::NotFound("Target key is not exist"); + *file_number = file_numbers_[i]; + return Status::OK(); + } + } + return Status::NotFound("Target key is not exist"); + } + + // Seek inside a LinkBlock and will fetch & buffer each key we touched + Status Seek(const Slice& target, const InternalKeyComparator& icomp) { + uint32_t target_hash = hash(target, hash_bits_); + // Lower bound search for target key + uint32_t left = 0; + uint32_t right = group_sz_; + while (left < right) { + uint32_t mid = left + (right - left) / 2; + auto key = buffered_key(mid); + // TODO (guokuankuan@bytedance.com) Shall we check key's hash value here? + if (icomp.Compare(key, target) >= 0) { + right = mid; + } else { + left = mid + 1; + } + } + + if (left < group_sz_) { + iter_idx_ = left; + // We should seek all related iterators within a LinkBlockRecord + std::set unique_file_numbers(file_numbers_.begin(), + file_numbers_.end()); + for (const auto& fn : unique_file_numbers) { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); + it->Seek(target); + if (!it->status().ok()) { + return it->status(); } - - // Seek inside a LinkBlock and will fetch & buffer each key we touched - Status Seek(const Slice& target, const InternalKeyComparator& icomp) { - uint32_t target_hash = hash(target, hash_bits_); - // Lower bound search for target key - uint32_t left = 0; - uint32_t right = group_sz_; - while(left < right) { - uint32_t mid = left + (right - left) / 2; - auto key = buffered_key(mid); - // TODO (guokuankuan@bytedance.com) Shall we check key's hash value here? - if(icomp.Compare(key, target) >= 0) { - right = mid; - } else { - left = mid + 1; - } - } - - if(left < group_sz_) { - iter_idx_ = left; - // We should seek all related iterators within a LinkBlockRecord - std::set unique_file_numbers(file_numbers_.begin(), file_numbers_.end()); - for(const auto& fn: unique_file_numbers) { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); - it->Seek(target); - if(!it->status().ok()) { - return it->status(); - } - } - return Status::OK(); - } else { - iter_idx_ = -1; - return Status::Corruption(); - } - } - - // Move current iterator to next position, will skip all invalid records - // (hash = 0) - // If all subsequent items are invalid, return an error status. - Status Next() { - // Find the next valid position of current LinkBlock - int next_iter_idx = iter_idx_ + 1; - while(next_iter_idx < hash_values_.size()) { - // Move forward target iterator - auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); - it->Next(); - if(!it->status().ok()) { - return it->status(); - } - if(hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { - break; - } - next_iter_idx++; - } - - // Exceed max boundary, we should try next LinkBlock, the iter_idx is now - // meaningless since there should be only one LinkBlock active at the same - // time during iteration. - if(next_iter_idx == hash_values_.size()) { - return Status::NotFound("Exceed LinkBlock's max boundary"); - } - - // Current LinkBlock is still in use, update iter_idx. - iter_idx_ = next_iter_idx; - return Status::OK(); - } - - // See the comment `Next()`, the `Prev()` implementation is almost the same - // except iterator direction. - Status Prev() { - // Find the previous valid position of current LinkBlock - int prev_iter_idx = iter_idx_ - 1; - while(prev_iter_idx >= 0) { - // Move backward - auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); - it->Prev(); - if(!it->status().ok()) { - return it->status(); - } - if(hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { - break; - } - prev_iter_idx--; - } - - // Exceed the smallest boundary - if(prev_iter_idx == -1) { - return Status::NotFound("Exceed LinkBlock's smallest boundary"); - } - - // Current LinkBlock is still in use, update iter_idx. - iter_idx_ = prev_iter_idx; - - return Status::OK(); - } - - // Extract key from the underlying SST iterator - Slice CurrentKey() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); - return it->key(); - } - - // Extract value from the underlying SST iterator - LazyBuffer CurrentValue() const { - auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); - return it->value(); - } - - // Return the max key of current LinkBlock - Slice MaxKey() const { return max_key_; } - - // If we have a non-empty `smallest_key_`, we should re-init all underlying - // iterators (default: invalid) - Slice SmallestKey() const {return smallest_key_; } - - bool HasSmallestKey() const { return smallest_key_.valid(); } - - std::vector& GetFileNumbers() { return file_numbers_;} - - private: - // If a key's hash is marked as `INVALID_ITEM_HASH`, it means we should remove - // this item in next compaction, thus we shouldn't read from it. - const int INVALID_ITEM_HASH = 0; - - // TODO(guokuankuan@bytedance.com) - // Hash a user key into an integer, limit the maximum bits. - uint32_t hash(const Slice& user_key, int max_bits) { - return 0; + } + return Status::OK(); + } else { + iter_idx_ = -1; + return Status::Corruption(); + } + } + + // Move current iterator to next position, will skip all invalid records + // (hash = 0) + // If all subsequent items are invalid, return an error status. + Status Next() { + // Find the next valid position of current LinkBlock + int next_iter_idx = iter_idx_ + 1; + while (next_iter_idx < hash_values_.size()) { + // Move forward target iterator + auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]); + it->Next(); + if (!it->status().ok()) { + return it->status(); + } + if (hash_values_[next_iter_idx] != INVALID_ITEM_HASH) { + break; + } + next_iter_idx++; + } + + // Exceed max boundary, we should try next LinkBlock, the iter_idx is now + // meaningless since there should be only one LinkBlock active at the same + // time during iteration. + if (next_iter_idx == hash_values_.size()) { + return Status::NotFound("Exceed LinkBlock's max boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx_ = next_iter_idx; + return Status::OK(); + } + + // See the comment `Next()`, the `Prev()` implementation is almost the same + // except iterator direction. + Status Prev() { + // Find the previous valid position of current LinkBlock + int prev_iter_idx = iter_idx_ - 1; + while (prev_iter_idx >= 0) { + // Move backward + auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]); + it->Prev(); + if (!it->status().ok()) { + return it->status(); + } + if (hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) { + break; + } + prev_iter_idx--; + } + + // Exceed the smallest boundary + if (prev_iter_idx == -1) { + return Status::NotFound("Exceed LinkBlock's smallest boundary"); + } + + // Current LinkBlock is still in use, update iter_idx. + iter_idx_ = prev_iter_idx; + + return Status::OK(); + } + + // Extract key from the underlying SST iterator + Slice CurrentKey() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); + return it->key(); + } + + // Extract value from the underlying SST iterator + LazyBuffer CurrentValue() const { + auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]); + return it->value(); + } + + // Return the max key of current LinkBlock + const Slice& MaxKey() const { return max_key_; } + + const LazyBuffer& EncodedValue() const { return encoded_value; } + + // If we have a non-empty `smallest_key_`, we should re-init all underlying + // iterators (default: invalid) + const Slice& SmallestKey() const { return smallest_key_; } + + bool HasSmallestKey() const { return smallest_key_.valid(); } + + std::vector& GetFileNumbers() { return file_numbers_; } + + private: + // If a key's hash is marked as `INVALID_ITEM_HASH`, it means we should remove + // this item in next compaction, thus we shouldn't read from it. + const int INVALID_ITEM_HASH = 0; + + // Get target key from cache. If it's not present, fetch from iterator & cache + // it. + Slice buffered_key(uint32_t idx) { + uint64_t file_number = file_numbers_[idx]; + if (!key_buffer_[idx].valid()) { + // Find out the occurrence between `idx` and the last position of current + // file_number, e.g. [0 ,1, 10, 11, 9, 10, 2], if `idx` is 2, then the + // occurrence should be {2, 5}. + std::vector occurrence = {idx}; + for (uint32_t i = idx + 1; i < group_sz_; ++i) { + if (file_numbers_[i] == file_number) { + occurrence.emplace_back(i); } - - // Get target key from cache. If it's not present, fetch from iterator & cache it. - Slice buffered_key(uint32_t idx) { - uint64_t file_number = file_numbers_[idx]; - if(!key_buffer_[idx].valid()) { - // Find out the occurrence between `idx` and the last position of current - // file_number, e.g. [0 ,1, 10, 11, 9, 10, 2], if `idx` is 2, then the - // occurrence should be {2, 5}. - std::vector occurrence = {idx}; - for(uint32_t i = idx + 1; i < group_sz_; ++i) { - if(file_numbers_[i] == file_number) { - occurrence.emplace_back(i); - } - } - - // Seek to the last position of current file_number and `Prev` back to the - // position `idx`, fill all touched keys into the key buffer. - auto it = iterator_cache_->GetIterator(file_number); - it->SeekForPrev(max_key_); - if(!it->status().ok()) { - return Slice(nullptr); - } - for(uint32_t i = occurrence.size() - 1; i >=0; --i) { - uint32_t pos = occurrence[i]; - if(!key_buffer_[pos].valid()) { - key_buffer_[pos] = it->key(); - } - it->Prev(); - } - } - - assert(!key_buffer_[idx].empty()); - return key_buffer_[idx]; + } + + // Seek to the last position of current file_number and `Prev` back to the + // position `idx`, fill all touched keys into the key buffer. + auto it = iterator_cache_->GetIterator(file_number); + it->SeekForPrev(max_key_); + if (!it->status().ok()) { + return {nullptr}; + } + for (uint32_t i = occurrence.size() - 1; i >= 0; --i) { + uint32_t pos = occurrence[i]; + if (!key_buffer_[pos].valid()) { + key_buffer_[pos] = it->key(); } - - private: - IteratorCache* iterator_cache_; - - // The end/max key of current LinkBlock - Slice max_key_; - // How many KV pairs we should group into one LinkBlock. - int group_sz_ = 8; - // Bits count required for each underlying SST file - int hash_bits_ = 11; - // Cache touched keys while iterating - std::vector key_buffer_; - // Indicate which SST current KV pairs belongs to. - // file_numbers_.size() == block_sz_ - std::vector file_numbers_; - // Each KV pair has a hash value (hash(user_key)) - std::vector hash_values_; - // Current iteration index of this LinkBlock. - int iter_idx_ = 0; - // Optional, if smallest_key_exist, it means one of the underlying iterator - // is expired, we should seek all iterators to target key again for further - // iteration. - Slice smallest_key_; - }; + it->Prev(); + } + } + + assert(!key_buffer_[idx].empty()); + return key_buffer_[idx]; + } + + private: + IteratorCache* iterator_cache_; + + // The end/max key of current LinkBlock + Slice max_key_; + // Encoded LBR value (contains all current LBR's information) + LazyBuffer encoded_value; + // How many KV pairs we should group into one LinkBlock. + int group_sz_ = 8; + // Bits count required for each underlying SST file + int hash_bits_ = 11; + // Cache touched keys while iterating + std::vector key_buffer_; + // Indicate which SST current KV pairs belongs to. + // file_numbers_.size() == block_sz_ + std::vector file_numbers_; + // Each KV pair has a hash value (hash(user_key)) + std::vector hash_values_; + // Current iteration index of this LinkBlock. + int iter_idx_ = 0; + // Optional, if smallest_key_exist, it means one of the underlying iterator + // is expired, we should seek all iterators to target key again for further + // iteration. + Slice smallest_key_; +}; // LinkSstIterator is a composition of a few ordinary iterators class LinkSstIterator : public InternalIterator { - private: - const FileMetaData* file_meta_; - InternalIterator* link_sst_iter_ {}; - InternalKeyComparator icomp_; - - IteratorCache iterator_cache_; - std::vector lbr_list_; - - Status status_; - uint32_t cur_lbr_idx_{}; - - // Some operations may invalid current iterator. - bool valid_ = true; - - bool lbr_initialized_ = false; - bool lbr_actived_ = false; - -// BinaryHeap, HeapVectorType> min_heap_; - - public: - LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter, - const DependenceMap& dependence_map, - const InternalKeyComparator& icomp, void* create_arg, - const IteratorCache::CreateIterCallback& create) - : file_meta_(file_meta), - link_sst_iter_(iter), - icomp_(icomp), - iterator_cache_(dependence_map, create_arg, create) { - if (file_meta != nullptr && !file_meta_->prop.is_map_sst()) { - abort(); - } - } - - ~LinkSstIterator() override = default; - - private: - // Init all LBR by decoding all LinkSST's value. - bool InitLinkBlockRecords() { - if(lbr_initialized_) { - return true; - } - LazyBuffer current_value_; - link_sst_iter_->SeekToFirst(); - while(link_sst_iter_->Valid()) { - current_value_ = link_sst_iter_->value(); - status_ = current_value_.fetch(); - if(!status_.ok()) { - status_ = Status::Corruption("Failed to fetch lazy buffer"); - return false; - } - Slice input = current_value_.slice(); - lbr_list_.emplace_back(&iterator_cache_, - link_sst_iter_->key(), - file_meta_->prop.lbr_group_size, - file_meta_->prop.lbr_hash_bits); - if(!lbr_list_.back().DecodeFrom(input)) { - status_ = Status::Corruption("Cannot decode Link SST"); - return false; - } - link_sst_iter_->Next(); - } - current_value_.reset(); - lbr_initialized_ = true; - return true; - } - - // We assume there should be a lot of underlying SST for each LinkSST, so we - // could simply initialize all SST iterators before any iteration. - bool InitSecondLevelIterators() { - if(lbr_actived_) { - return true; - } - for(auto& lb: lbr_list_) { - if(!lb.ActiveLinkBlock()) { - return false; - } - } - lbr_actived_ = true; - return true; - } - - public: - bool Valid() const override { return !lbr_list_.empty() && valid_; } - void SeekToFirst() override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - return; - } - assert(!lbr_list_.empty()); - cur_lbr_idx_ = 0; - status_ = lbr_list_[cur_lbr_idx_].SeekToFirst(); - valid_ = true; - } - - void SeekToLast() override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - return; - } - assert(!lbr_list_.empty()); - cur_lbr_idx_ = lbr_list_.size() - 1; - status_ = lbr_list_[cur_lbr_idx_].SeekToLast(); - valid_ = true; - } - - // TODO(guokuankuan@bytendance.com) - // Is input target a InternalKey ? then what is the default sequence#? - void Seek(const Slice& target) override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - return; - } - // Find target LinkBlock's position - auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, - [&](const LinkBlockRecord& lbr, const Slice& target){ - return icomp_.Compare(lbr.MaxKey(), target) < 0; - }); - if(it == lbr_list_.end()) { - status_ = Status::NotFound(); - return; - } - cur_lbr_idx_ = it - lbr_list_.begin(); - // Do the Seek - status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_); - valid_ = true; - } - - // Position at the first key at or before the target. - void SeekForPrev(const Slice& target) override { - if(!InitLinkBlockRecords() || !InitSecondLevelIterators()) { - status_ = Status::Corruption(); - return; - } - - // We adopt Seek & Prev semantics here - Seek(target); - - // If the position's key is equal to target, we are good to go. - if(status_.ok() && key() == target) { - return; - } - - // If the key is greater than target, then we need to `Prev` to the right - // place. - while(status_.ok() && key().compare(target) > 0) { - Prev(); - } - valid_ = true; - } - - void Next() override { - assert(Valid()); - while(cur_lbr_idx_ < lbr_list_.size()) { - auto s = lbr_list_[cur_lbr_idx_].Next(); - if(s.ok()) { - break; - } - - // If we cannot `Next()` current LBR properly, try next. - cur_lbr_idx_++; - if(cur_lbr_idx_ == lbr_list_.size()) { - break; - } - - assert(cur_lbr_idx_ < lbr_list_.size()); - // If next LBR has a valid smallest key, we should re-seek all iterators - // (which means the iterators' continuous may break) - auto next_lbr = lbr_list_[cur_lbr_idx_]; - if(next_lbr.HasSmallestKey()) { - next_lbr.Seek(next_lbr.SmallestKey(), icomp_); - } - } - - // No valid position found - if(cur_lbr_idx_ == lbr_list_.size()) { - status_ = Status::NotFound("End of iterator exceeded"); - return; - } - } - - void Prev() override { - assert(Valid()); - while(cur_lbr_idx_ >= 0) { - auto s = lbr_list_[cur_lbr_idx_].Prev(); - if(s.ok()) { - break; - } - // All items were consumed, exit. - if(cur_lbr_idx_ == 0) { - status_ = Status::NotFound("Not more previous items!"); - return; - } - - // If we cannot `Prev()` current LBR, try previous one, note that if current - // LBR has a valid smallest key, we should re-seek previous LBR. - cur_lbr_idx_--; - auto curr_lbr = lbr_list_[cur_lbr_idx_ + 1]; - auto prev_lbr = lbr_list_[cur_lbr_idx_]; - if(curr_lbr.HasSmallestKey()) { - prev_lbr.Seek(prev_lbr.MaxKey(), icomp_); - } - } - } - - Slice key() const override { - assert(Valid()); - return lbr_list_[cur_lbr_idx_].CurrentKey(); - } - - LazyBuffer value() const override { - assert(Valid()); - return lbr_list_[cur_lbr_idx_].CurrentValue(); - } - - // Find target key's underlying SST file number (exactly match) - // TODO (guokuankuan@bytedance.com) Maybe we could re-use Seek() here? - Status GetTargetFileNumber(const Slice& target, uint64_t* file_number) { - if(!InitLinkBlockRecords()) { - return Status::Corruption(); - } - // Find target LinkBlock's position - auto it = std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, - [&](const LinkBlockRecord& lbr, const Slice& target){ - return icomp_.Compare(lbr.MaxKey(), target) < 0; - }); - if(it == lbr_list_.end()) { - return Status::NotFound(); - } - - return it->GetFileNumber(target, file_number); - } - - Status status() const override { return status_; } + private: + const FileMetaData* file_meta_; + InternalIterator* link_sst_iter_{}; + InternalKeyComparator icomp_; + + IteratorCache iterator_cache_; + std::vector lbr_list_; + + Status status_; + uint32_t cur_lbr_idx_{}; + + // Some operations may invalid current iterator. + bool valid_ = true; + + bool lbr_initialized_ = false; + bool lbr_actived_ = false; + + // BinaryHeap, HeapVectorType> min_heap_; + + public: + LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter, + const DependenceMap& dependence_map, + const InternalKeyComparator& icomp, void* create_arg, + const IteratorCache::CreateIterCallback& create) + : file_meta_(file_meta), + link_sst_iter_(iter), + icomp_(icomp), + iterator_cache_(dependence_map, create_arg, create) { + if (file_meta != nullptr && !file_meta_->prop.is_map_sst()) { + abort(); + } + } + + ~LinkSstIterator() override = default; + + private: + // Init all LBR by decoding all LinkSST's value. + bool InitLinkBlockRecords() { + if (lbr_initialized_) { + return true; + } + LazyBuffer current_value_; + link_sst_iter_->SeekToFirst(); + while (link_sst_iter_->Valid()) { + current_value_ = link_sst_iter_->value(); + status_ = current_value_.fetch(); + if (!status_.ok()) { + status_ = Status::Corruption("Failed to fetch lazy buffer"); + return false; + } + Slice input = current_value_.slice(); + lbr_list_.emplace_back(&iterator_cache_, link_sst_iter_->key(), + file_meta_->prop.lbr_group_size, + file_meta_->prop.lbr_hash_bits); + if (!lbr_list_.back().DecodeFrom(input)) { + status_ = Status::Corruption("Cannot decode Link SST"); + return false; + } + link_sst_iter_->Next(); + } + current_value_.reset(); + lbr_initialized_ = true; + return true; + } + + // We assume there should be a lot of underlying SST for each LinkSST, so we + // could simply initialize all SST iterators before any iteration. + bool InitSecondLevelIterators() { + if (lbr_actived_) { + return true; + } + for (auto& lb : lbr_list_) { + if (!lb.ActiveLinkBlock()) { + return false; + } + } + lbr_actived_ = true; + return true; + } + + public: + bool Valid() const override { return !lbr_list_.empty() && valid_; } + void SeekToFirst() override { + if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = 0; + status_ = lbr_list_[cur_lbr_idx_].SeekToFirst(); + valid_ = true; + } + + void SeekToLast() override { + if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + return; + } + assert(!lbr_list_.empty()); + cur_lbr_idx_ = lbr_list_.size() - 1; + status_ = lbr_list_[cur_lbr_idx_].SeekToLast(); + valid_ = true; + } + + // TODO(guokuankuan@bytendance.com) + // Is input target a InternalKey ? then what is the default sequence#? + void Seek(const Slice& target) override { + if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + return; + } + // Find target LinkBlock's position + auto it = + std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, + [&](const LinkBlockRecord& lbr, const Slice& target) { + return icomp_.Compare(lbr.MaxKey(), target) < 0; + }); + if (it == lbr_list_.end()) { + status_ = Status::NotFound(); + return; + } + cur_lbr_idx_ = it - lbr_list_.begin(); + // Do the Seek + status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_); + valid_ = true; + } + + // Position at the first key at or before the target. + void SeekForPrev(const Slice& target) override { + if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) { + status_ = Status::Corruption(); + return; + } + + // We adopt Seek & Prev semantics here + Seek(target); + + // If the position's key is equal to target, we are good to go. + if (status_.ok() && key() == target) { + return; + } + + // If the key is greater than target, then we need to `Prev` to the right + // place. + while (status_.ok() && key().compare(target) > 0) { + Prev(); + } + valid_ = true; + } + + void Next() override { + assert(Valid()); + while (cur_lbr_idx_ < lbr_list_.size()) { + auto s = lbr_list_[cur_lbr_idx_].Next(); + if (s.ok()) { + break; + } + + // If we cannot `Next()` current LBR properly, try next. + cur_lbr_idx_++; + if (cur_lbr_idx_ == lbr_list_.size()) { + break; + } + + assert(cur_lbr_idx_ < lbr_list_.size()); + // If next LBR has a valid smallest key, we should re-seek all iterators + // (which means the iterators' continuous may break) + auto& next_lbr = lbr_list_[cur_lbr_idx_]; + if (next_lbr.HasSmallestKey()) { + next_lbr.Seek(next_lbr.SmallestKey(), icomp_); + } + } + + // No valid position found + if (cur_lbr_idx_ == lbr_list_.size()) { + status_ = Status::NotFound("End of iterator exceeded"); + return; + } + } + + void Prev() override { + assert(Valid()); + while (cur_lbr_idx_ >= 0) { + auto s = lbr_list_[cur_lbr_idx_].Prev(); + if (s.ok()) { + break; + } + // All items were consumed, exit. + if (cur_lbr_idx_ == 0) { + status_ = Status::NotFound("Not more previous items!"); + return; + } + + // If we cannot `Prev()` current LBR, try previous one, note that if + // current LBR has a valid smallest key, we should re-seek previous LBR. + cur_lbr_idx_--; + auto& curr_lbr = lbr_list_[cur_lbr_idx_ + 1]; + auto& prev_lbr = lbr_list_[cur_lbr_idx_]; + if (curr_lbr.HasSmallestKey()) { + prev_lbr.Seek(prev_lbr.MaxKey(), icomp_); + } + } + } + + Slice key() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentKey(); + } + + LazyBuffer value() const override { + assert(Valid()); + return lbr_list_[cur_lbr_idx_].CurrentValue(); + } + + // Find target key's underlying SST file number (exactly match) + // TODO (guokuankuan@bytedance.com) Maybe we could re-use Seek() here? + Status GetTargetFileNumber(const Slice& target, uint64_t* file_number) { + if (!InitLinkBlockRecords()) { + return Status::Corruption(); + } + // Find target LinkBlock's position + auto it = + std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target, + [&](const LinkBlockRecord& lbr, const Slice& target) { + return icomp_.Compare(lbr.MaxKey(), target) < 0; + }); + if (it == lbr_list_.end()) { + return Status::NotFound(); + } + + return it->GetFileNumber(target, file_number); + } + + Status status() const override { return status_; } }; // Return a new two level iterator. A two-level iterator contains an