diff --git a/db/compaction.h b/db/compaction.h
index a0790c0b26..955ab7a467 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -77,6 +77,7 @@ enum CompactionType {
   kKeyValueCompaction = 0,
   kMapCompaction = 1,
   kGarbageCollection = 2,
+  kLinkCompaction = 3,
 };
 
 struct CompactionParams {
diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc
index 3dd0ef25f5..b9c24449e8 100644
--- a/db/compaction_iterator.cc
+++ b/db/compaction_iterator.cc
@@ -170,7 +170,7 @@ CompactionIterator::CompactionIterator(
     level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
   }
-  if (snapshots_->size() == 0) {
+  if (snapshots_->empty()) {
     // optimize for fast path if there are no snapshots
     visible_at_tip_ = true;
     earliest_snapshot_ = kMaxSequenceNumber;
diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h
index c799937cdf..c4d55ce929 100644
--- a/db/compaction_iterator.h
+++ b/db/compaction_iterator.h
@@ -6,6 +6,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -22,7 +23,28 @@
 
 namespace TERARKDB_NAMESPACE {
 
-class CompactionIterator {
+class CompactionIteratorBase {
+ public:
+  virtual ~CompactionIteratorBase() = default;
+
+  virtual void ResetRecordCounts() = 0;
+
+  virtual void SeekToFirst() = 0;
+  virtual void Next() = 0;
+
+  // Getters
+  virtual const Slice& key() const = 0;
+  virtual const LazyBuffer& value() const = 0;
+  virtual const Status& status() const = 0;
+  virtual const ParsedInternalKey& ikey() const = 0;
+  virtual bool Valid() const = 0;
+  virtual const Slice& user_key() const = 0;
+  virtual const CompactionIterationStats& iter_stats() const = 0;
+  // For KV separation or LinkSST, the current value may depend on other
+  // files; report their file numbers.
+  virtual std::vector<uint64_t> depended_file_numbers() const = 0;
+};
+
+class CompactionIterator : public CompactionIteratorBase {
  public:
   friend class CompactionIteratorToInternalIterator;
 
@@ -96,30 +118,34 @@ class CompactionIterator {
       const SequenceNumber preserve_deletes_seqnum = 0,
       const chash_set<uint64_t>* b = nullptr);
 
-  ~CompactionIterator();
+  ~CompactionIterator() override;
 
-  void ResetRecordCounts();
+  void ResetRecordCounts() override;
 
   // Seek to the beginning of the compaction iterator output.
   //
   // REQUIRED: Call only once.
-  void SeekToFirst();
+  void SeekToFirst() override;
 
   // Produces the next record in the compaction.
   //
   // REQUIRED: SeekToFirst() has been called.
-  void Next();
+  void Next() override;
 
   // Getters
-  const Slice& key() const { return key_; }
-  const LazyBuffer& value() const { return value_; }
-  const Status& status() const { return status_; }
-  const ParsedInternalKey& ikey() const { return ikey_; }
-  bool Valid() const { return valid_; }
-  const Slice& user_key() const { return current_user_key_; }
-  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
+  const Slice& key() const override { return key_; }
+  const LazyBuffer& value() const override { return value_; }
+  const Status& status() const override { return status_; }
+  const ParsedInternalKey& ikey() const override { return ikey_; }
+  bool Valid() const override { return valid_; }
+  const Slice& user_key() const override { return current_user_key_; }
+  const CompactionIterationStats& iter_stats() const override {
+    return iter_stats_;
+  }
 
   void SetFilterSampleInterval(size_t filter_sample_interval);
 
+  std::vector<uint64_t> depended_file_numbers() const override {
+    return {value_.file_number()};
+  }
+
  private:
   // Processes the input stream to find the next output
   void NextFromInput();
 
@@ -236,6 +262,120 @@ class CompactionIterator {
   }
 };
 
+// A LinkCompactionIterator takes an ordinary CompactionIterator as its input
+// and iterates over all KVs block by block (see LinkBlockRecord).
+class LinkCompactionIterator : public CompactionIteratorBase {
+ public:
+  // @param c_iter    Original compaction iterator that combines all the
+  //                  underlying SSTs.
+  // @param group_sz  The total number of KV items a LinkBlock contains.
+  // @param hash_bits The number of bits the hash of a key in a LinkBlock
+  //                  uses.
+  explicit LinkCompactionIterator(std::unique_ptr<CompactionIterator> c_iter,
+                                  IteratorCache* iterator_cache, int group_sz,
+                                  int hash_bits)
+      : c_iter_(std::move(c_iter)),
+        iterator_cache_(iterator_cache),
+        group_sz_(group_sz),
+        hash_bits_(hash_bits) {}
+
+  ~LinkCompactionIterator() override { c_iter_.reset(); }
+
+  void ResetRecordCounts() override { c_iter_->ResetRecordCounts(); }
+
+  // We need to construct the first LinkBlockRecord here.
+  void SeekToFirst() override {
+    c_iter_->SeekToFirst();
+    status_ = c_iter_->status();
+    if (!status_.ok()) {
+      return;
+    }
+    Next();
+  }
+
+  // Step the underlying iterator forward N times (N = the LinkBlockRecord's
+  // size) to build the next LinkBlockRecord (LBR).
+  // @see LinkSstIterator
+  void Next() override {
+    // Each time we proceed to the next LBR, we refill the underlying file
+    // numbers.
+    file_numbers_.clear();
+
+    // Obtain the original KV pairs.
+    Slice max_key;
+    Slice smallest_key;  // TODO check whether we need the smallest key
+    std::vector<uint32_t> hash_values;
+    for (int i = 0; i < group_sz_ && c_iter_->Valid(); ++i) {
+      const LazyBuffer& value = c_iter_->value();
+      hash_values.emplace_back(
+          LinkBlockRecord::hash(c_iter_->user_key(), hash_bits_));
+      file_numbers_.emplace_back(value.file_number());
+      if (i == group_sz_ - 1) {
+        max_key = c_iter_->key();
+      }
+      c_iter_->Next();
+      if (!c_iter_->status().ok()) {
+        status_ = c_iter_->status();
+        return;
+      }
+    }
+    curr_lbr_ = std::make_unique<LinkBlockRecord>(iterator_cache_, max_key,
+                                                  group_sz_, hash_bits_);
+    curr_lbr_->Encode(file_numbers_, hash_values, smallest_key);
+
+    // Parse the internal key.
+    const Slice& key = curr_lbr_->MaxKey();
+    if (!ParseInternalKey(key, &ikey_)) {
+      status_ = Status::Incomplete("Cannot decode internal key");
+    }
+  }
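+  // [Editor's illustration, not part of the original patch] The encoded hash
+  // section of one LBR is byte aligned: group_sz hashes of hash_bits each.
+  // With the defaults cached in db/version_edit.h (group size 16, 11 hash
+  // bits), the hashes of 16 keys pack into 22 bytes:
+  static constexpr size_t HashSectionBytes(int group_sz, int hash_bits) {
+    return (static_cast<size_t>(group_sz) * hash_bits + 7) / 8;
+  }
+  static_assert(HashSectionBytes(16, 11) == 22, "16 * 11 bits -> 22 bytes");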
+  // Obtain the LBR's key, i.e. the largest key of the block range.
+  const Slice& key() const override { return curr_lbr_->MaxKey(); }
+
+  const LazyBuffer& value() const override {
+    return curr_lbr_->EncodedValue();
+  }
+
+  const Status& status() const override { return status_; }
+
+  const ParsedInternalKey& ikey() const override { return ikey_; }
+
+  bool Valid() const override { return status_.ok(); }
+
+  const Slice& user_key() const override { return c_iter_->user_key(); }
+
+  const CompactionIterationStats& iter_stats() const override {
+    return c_iter_->iter_stats();
+  }
+
+  std::vector<uint64_t> depended_file_numbers() const override {
+    return file_numbers_;
+  }
+
+ private:
+  Status status_;
+  std::unique_ptr<LinkBlockRecord> curr_lbr_;
+  ParsedInternalKey ikey_;
+  std::vector<uint64_t> file_numbers_;
+
+  std::unique_ptr<CompactionIterator> c_iter_;
+  IteratorCache* iterator_cache_;
+  int group_sz_ = 0;
+  int hash_bits_ = 0;
+};
+
 InternalIterator* NewCompactionIterator(
     CompactionIterator* (*new_compaction_iter_callback)(void*), void* arg,
     const Slice* start_user_key = nullptr);
diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index aec992c326..cd5c7bd1be 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -149,7 +149,7 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
 // Maintains state for each sub-compaction
 struct CompactionJob::SubcompactionState {
   const Compaction* compaction;
-  std::unique_ptr<CompactionIterator> c_iter;
+  std::unique_ptr<CompactionIteratorBase> c_iter;
 
   // The boundaries of the key-range this compaction is interested in. No two
   // subcompactions may have overlapping key-ranges.
@@ -304,7 +304,9 @@ struct CompactionJob::SubcompactionState {
 }
 
 struct RebuildBlobsInfo {
+  // File numbers of the blob SSTs to rebuild
   chash_set<uint64_t> blobs;
+  // pop_count = planned file count - actually used file count
   size_t pop_count;
 };
 struct BlobRefInfo {
@@ -1493,6 +1495,9 @@ void CompactionJob::ProcessCompaction(SubcompactionState* sub_compact) {
     case kGarbageCollection:
       ProcessGarbageCollection(sub_compact);
       break;
+    case kLinkCompaction:
+      ProcessLinkCompaction(sub_compact);
+      break;
     default:
       assert(false);
       break;
@@ -1580,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       snapshot_checker_, compact_->compaction->level(),
       db_options_.statistics.get(), shutting_down_);
 
-  struct BuilderSeparateHelper : public SeparateHelper {
-    SeparateHelper* separate_helper = nullptr;
-    std::unique_ptr<ValueExtractor> value_meta_extractor;
-    Status (*trans_to_separate_callback)(void* args, const Slice& key,
-                                         LazyBuffer& value) = nullptr;
-    void* trans_to_separate_callback_args = nullptr;
-
-    Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
-                           const Slice& meta, bool is_merge,
-                           bool is_index) override {
-      return SeparateHelper::TransToSeparate(
-          internal_key, value, value.file_number(), meta, is_merge, is_index,
-          value_meta_extractor.get());
-    }
-
-    Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
-      if (trans_to_separate_callback == nullptr) {
-        return Status::NotSupported();
-      }
-      return trans_to_separate_callback(trans_to_separate_callback_args, key,
-                                        value);
-    }
+  BuilderSeparateHelper separate_helper;
 
-    LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
-                               const LazyBuffer& value) const override {
-      return separate_helper->TransToCombined(user_key, sequence, value);
-    }
-  } separate_helper;
   if (compact_->compaction->immutable_cf_options()
           ->value_meta_extractor_factory != nullptr) {
     ValueExtractorContext context = {cfd->GetID()};
@@ -1682,14 +1661,26 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     status = Status::OK();
   }
-  sub_compact->c_iter.reset(new CompactionIterator(
+  auto it = std::make_unique<CompactionIterator>(
       input.get(), &separate_helper, end, cfd->user_comparator(), &merge,
       versions_->LastSequence(), &existing_snapshots_,
       earliest_write_conflict_snapshot_, snapshot_checker_, env_,
       ShouldReportDetailedTime(env_, stats_), false, &range_del_agg,
       sub_compact->compaction, mutable_cf_options->get_blob_config(),
       compaction_filter, shutting_down_, preserve_deletes_seqnum_,
-      &rebuild_blobs_info.blobs));
+      &rebuild_blobs_info.blobs);
+
+  // For LinkCompaction, we wrap the ordinary compaction iterator.
+  if (!sub_compact->compaction->immutable_cf_options()
+           ->enable_link_compaction) {
+    sub_compact->c_iter = std::move(it);
+  } else {
+    // TODO iterator_cache cannot be null
+    sub_compact->c_iter = std::make_unique<LinkCompactionIterator>(
+        std::move(it), nullptr, mutable_cf_options->lbr_group_sz,
+        mutable_cf_options->lbr_hash_bits);
+  }
+
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
@@ -1765,6 +1756,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   if (!sub_compact->compaction->partial_compaction()) {
     dict_sample_data.reserve(kSampleBytes);
   }
+
+  // Maps a target blob SST's file number to the number of records in the
+  // key SST that depend on it:
   std::unordered_map<uint64_t, uint64_t> dependence;
 
   size_t yield_count = 0;
@@ -1773,6 +1767,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
     // returns true.
     const Slice& key = c_iter->key();
     const LazyBuffer& value = c_iter->value();
+    // For KV separation, we need to record all dependency SSTs.
     if (c_iter->ikey().type == kTypeValueIndex ||
         c_iter->ikey().type == kTypeMergeIndex) {
       assert(value.file_number() != uint64_t(-1));
@@ -1781,6 +1776,17 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
         ++ib.first->second;
       }
     }
+    // For LinkSSTs, the dependency decoding is slightly different.
+    // TODO(guokuankuan@bytedance.com) We don't handle MergeIndex yet.
+    if (c_iter->ikey().type == kTypeLinkIndex) {
+      // Decode the value to find all underlying SST file numbers.
+      for (const auto& fn : c_iter->depended_file_numbers()) {
+        auto ib = dependence.emplace(fn, 1);
+        if (!ib.second) {
+          ++ib.first->second;
+        }
+      }
+    }
 
     assert(end == nullptr ||
            cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
@@ -1824,6 +1830,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       if (!status.ok()) {
         break;
       }
+      // For compression dictionary sampling
       for (const auto& data_elmt : {key, value.slice()}) {
         size_t data_end_offset = data_begin_offset + data_elmt.size();
         while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
@@ -2036,7 +2043,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   sub_compact->c_iter.reset();
   input.reset();
   sub_compact->status = status;
-}  // namespace TERARKDB_NAMESPACE
+}
+
+void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
+  assert(sub_compact != nullptr);
+  return ProcessKeyValueCompaction(sub_compact);
+}
 
 void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
   assert(sub_compact != nullptr);
diff --git a/db/compaction_job.h b/db/compaction_job.h
index e7b78eb4d2..2a640d7876 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -64,6 +64,38 @@ class VersionEdit;
 class VersionSet;
 
 class CompactionJob {
+ public:
+  class BuilderSeparateHelper : public SeparateHelper {
+   public:
+    SeparateHelper* separate_helper = nullptr;
+    std::unique_ptr<ValueExtractor> value_meta_extractor;
+    Status (*trans_to_separate_callback)(void* args, const Slice& key,
+                                         LazyBuffer& value) = nullptr;
+    void* trans_to_separate_callback_args = nullptr;
+
+    Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
+                           const Slice& meta, bool is_merge,
+                           bool is_index) override {
+      return SeparateHelper::TransToSeparate(
+          internal_key, value, value.file_number(), meta, is_merge, is_index,
+          value_meta_extractor.get());
+    }
+
+    Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
+      if (trans_to_separate_callback == nullptr) {
+        return Status::NotSupported();
+      }
+      return trans_to_separate_callback(trans_to_separate_callback_args, key,
+                                        value);
+    }
+
+    LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
+                               const LazyBuffer& value) const override {
+      return separate_helper->TransToCombined(user_key, sequence, value);
+    }
+  };
+
  public:
   CompactionJob(int job_id, Compaction* compaction,
                 const ImmutableDBOptions& db_options,
@@ -123,6 +155,7 @@ class CompactionJob {
   // kv-pairs
   void ProcessCompaction(SubcompactionState* sub_compact);
   void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
+  void ProcessLinkCompaction(SubcompactionState* sub_compact);
   void ProcessGarbageCollection(SubcompactionState* sub_compact);
 
   Status FinishCompactionOutputFile(
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index 00b0887ab7..4bfe2cff9e 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -2200,6 +2200,13 @@ class LevelCompactionBuilder {
   // Pick and return a compaction.
   Compaction* PickCompaction();
 
+  // Gather the target files (including blob SSTs (KV separated) and
+  // ordinary SSTs) and composite a compaction for future use.
+  // A link compaction has two major objectives:
+  //   (1) Link the target SST files into one logical linked SST.
+  //   (2) GC the old linked SST files.
+  Compaction* PickLinkCompaction();
+
   // Pick lazy compaction
   Compaction* PickLazyCompaction(const std::vector<SequenceNumber>& snapshots);
@@ -2298,7 +2305,10 @@ void LevelCompactionBuilder::SetupInitialFiles() {
       // In these cases, to reduce L0 file count and thus reduce likelihood
       // of write stalls, we can attempt compacting a span of files within
       // L0.
-      if (PickIntraL0Compaction()) {
+      //
+      // If link compaction is enabled, we skip intra-L0 compaction, since
+      // the linking should finish quite fast.
+      if (!ioptions_.enable_link_compaction && PickIntraL0Compaction()) {
         output_level_ = 0;
         compaction_reason_ = CompactionReason::kLevelL0FilesNum;
         break;
@@ -2431,6 +2441,14 @@ Compaction* LevelCompactionBuilder::PickCompaction() {
   return c;
 }
 
+// TODO(guokuankuan@bytedance.com)
+// We reuse the ordinary compaction picker for the moment, but we should soon
+// pick link compactions in a smarter way.
+Compaction* LevelCompactionBuilder::PickLinkCompaction() {
+  compaction_type_ = CompactionType::kLinkCompaction;
+  return PickCompaction();
+}
+
 Compaction* LevelCompactionBuilder::PickLazyCompaction(
     const std::vector<SequenceNumber>& snapshots) {
   using SortedRun = CompactionPicker::SortedRun;
@@ -2971,7 +2989,7 @@ bool LevelCompactionBuilder::PickFileToCompact() {
   // store where to start the iteration in the next call to PickCompaction
   vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);
 
-  return start_level_inputs_.size() > 0;
+  return !start_level_inputs_.empty();
 }
 
 bool LevelCompactionBuilder::PickIntraL0Compaction() {
@@ -2999,6 +3017,8 @@ Compaction* LevelCompactionPicker::PickCompaction(
                                  mutable_cf_options, ioptions_);
   if (ioptions_.enable_lazy_compaction) {
     return builder.PickLazyCompaction(snapshots);
+  } else if (ioptions_.enable_link_compaction) {
+    return builder.PickLinkCompaction();
   } else {
     return builder.PickCompaction();
   }
diff --git a/db/db_impl.h b/db/db_impl.h
index 66c51a6c3f..a480390be2 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -1258,6 +1258,7 @@ class DBImpl : public DB {
   void BackgroundCallGarbageCollection();
   void BackgroundCallFlush();
   void BackgroundCallPurge();
+
   Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                               LogBuffer* log_buffer,
                               PrepickedCompaction* prepicked_compaction);
diff --git a/db/dbformat.h b/db/dbformat.h
index 7861edeec6..31686e096e 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -64,6 +64,9 @@ enum ValueType : unsigned char {
   // generated by WriteUnprepared write policy is not mistakenly read by
   // another.
   kTypeBeginUnprepareXID = 0x13,  // WAL only.
+  // Similar to kTypeValueIndex, this means the current value belongs to a
+  // LinkSST and points to the actual value file.
+  kTypeLinkIndex = 0x14,  // LinkSST only
   kMaxValue = 0x7F  // Not used for storing records.
 };
@@ -784,6 +787,12 @@ struct ParsedInternalKeyComparator {
   const InternalKeyComparator* cmp;
 };
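+// [Editor's illustration, not part of the original patch] A hedged sketch of
+// how a reader distinguishes index-style entries (including the new
+// kTypeLinkIndex) from plain values. The helper name is hypothetical, but
+// ParseInternalKey() and the ValueType constants are the ones declared in
+// this header.
+inline bool IsSeparatedOrLinkedValue(const Slice& internal_key) {
+  ParsedInternalKey ikey;
+  if (!ParseInternalKey(internal_key, &ikey)) {
+    return false;  // malformed internal key
+  }
+  return ikey.type == kTypeValueIndex || ikey.type == kTypeMergeIndex ||
+         ikey.type == kTypeLinkIndex;
+}
+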
+// This interface transfers the data format between KV-separated and combined
+// KV pairs.
+// An implementation instance should be supplied when processing key-value
+// pairs during compactions.
+// Note that `Version` is also ported to this interface, since we may need to
+// fetch values from any version that has separated values.
 class SeparateHelper {
  public:
   virtual ~SeparateHelper() = default;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 75eee2d558..3d1827c3eb 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -85,7 +85,7 @@ static bool InheritanceMismatch(const FileMetaData& sst_meta,
   return true;
 }
 
-// Store params for create depend table iterator in future
+// Store the params for creating a dependency table iterator in the future
 class LazyCreateIterator : public Snapshot {
   TableCache* table_cache_;
   ReadOptions options_;  // deep copy
@@ -292,7 +292,7 @@ InternalIterator* TableCache::NewIterator(
   }
   size_t readahead = 0;
   bool record_stats = !for_compaction;
-  if (file_meta.prop.is_map_sst()) {
+  if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
     record_stats = false;
   } else {
     // MapSST don't handle these
@@ -340,15 +340,9 @@ InternalIterator* TableCache::NewIterator(
   }
   InternalIterator* result = nullptr;
   if (s.ok()) {
-    if (!file_meta.prop.is_map_sst()) {
-      if (options.table_filter &&
-          !options.table_filter(*table_reader->GetTableProperties())) {
-        result = NewEmptyInternalIterator(arena);
-      } else {
-        result = table_reader->NewIterator(options, prefix_extractor, arena,
-                                           skip_filters, for_compaction);
-      }
-    } else {
+    // For map & link SSTs, we should expand their underlying key-value
+    // pairs, not simply iterate the input SST's own key-values.
+    if (file_meta.prop.is_map_sst() || file_meta.prop.is_link_sst()) {
       ReadOptions map_options = options;
       map_options.total_order_seek = true;
       map_options.readahead_size = 0;
       result =
           table_reader->NewIterator(map_options, prefix_extractor, arena,
                                     skip_filters, false /* for_compaction */);
       if (!dependence_map.empty()) {
+        // Map SSTs handle range deletions internally, so we can skip them
+        // here.
         bool ignore_range_deletions =
             options.ignore_range_deletions ||
             file_meta.prop.map_handle_range_deletions();
@@ -365,33 +360,54 @@ InternalIterator* TableCache::NewIterator(
           lazy_create_iter = new (buffer) LazyCreateIterator(
               this, options, env_options, range_del_agg, prefix_extractor,
               for_compaction, skip_filters, ignore_range_deletions, level);
-
         } else {
           lazy_create_iter = new LazyCreateIterator(
               this, options, env_options, range_del_agg, prefix_extractor,
               for_compaction, skip_filters, ignore_range_deletions, level);
         }
-        auto map_sst_iter = NewMapSstIterator(
-            &file_meta, result, dependence_map, ioptions_.internal_comparator,
-            lazy_create_iter, c_style_callback(*lazy_create_iter), arena);
+
+        // For map & link SSTs, we should expand their dependencies and merge
+        // all related iterators into one combined iterator for further
+        // reads.
+        InternalIterator* sst_iter = nullptr;
+        if (file_meta.prop.is_map_sst()) {
+          sst_iter = NewMapSstIterator(&file_meta, result, dependence_map,
+                                       ioptions_.internal_comparator,
+                                       lazy_create_iter,
+                                       c_style_callback(*lazy_create_iter),
+                                       arena);
+        } else {
+          assert(file_meta.prop.is_link_sst());
+          sst_iter = NewLinkSstIterator(&file_meta, result, dependence_map,
+                                        ioptions_.internal_comparator,
+                                        lazy_create_iter,
+                                        c_style_callback(*lazy_create_iter),
+                                        arena);
+        }
+
         if (arena != nullptr) {
-          map_sst_iter->RegisterCleanup(
+          sst_iter->RegisterCleanup(
               [](void* arg1, void* arg2) {
                 static_cast<InternalIterator*>(arg1)->~InternalIterator();
                 static_cast<LazyCreateIterator*>(arg2)->~LazyCreateIterator();
               },
               result, lazy_create_iter);
         } else {
-          map_sst_iter->RegisterCleanup(
+          sst_iter->RegisterCleanup(
               [](void* arg1, void* arg2) {
                 delete static_cast<InternalIterator*>(arg1);
                 delete static_cast<LazyCreateIterator*>(arg2);
               },
               result, lazy_create_iter);
         }
-        result = map_sst_iter;
+        result = sst_iter;
+      }
+    } else {
+      if (options.table_filter &&
+          !options.table_filter(*table_reader->GetTableProperties())) {
+        result = NewEmptyInternalIterator(arena);
+      } else {
+        result = table_reader->NewIterator(options, prefix_extractor, arena,
+                                           skip_filters, for_compaction);
+      }
     }
   }
+
   if (create_new_table_reader) {
     assert(handle == nullptr);
     result->RegisterCleanup(&DeleteTableReader, table_reader,
@@ -473,11 +489,42 @@ Status TableCache::Get(const ReadOptions& options,
   if (s.ok()) {
     t->UpdateMaxCoveringTombstoneSeq(options, ExtractUserKey(k),
                                      get_context->max_covering_tombstone_seq());
-    if (!file_meta.prop.is_map_sst()) {
+    if (!file_meta.prop.is_map_sst() && !file_meta.prop.is_link_sst()) {
+      // For an ordinary SST we can read directly from the TableReader.
       s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
     } else if (dependence_map.empty()) {
+      // TODO(guokuankuan@bytedance.com) What shall we do with the ordinary
+      // SSTs during LinkSST GC?
+      // Both MapSST & LinkSST need the data from the dependence_map.
       s = Status::Corruption(
           "TableCache::Get: Composite sst depend files missing");
+    } else if (file_meta.prop.is_link_sst()) {
+      ReadOptions forward_options = options;
+      forward_options.ignore_range_deletions |=
+          file_meta.prop.link_handle_range_deletions();
+
+      // Find the target LinkBlockRecord & check the LBR's key hashes to
+      // determine which ordinary SST we should look into.
+      uint64_t file_number;
+      auto* iter_ptr = reinterpret_cast<LinkSstIterator*>(
+          t->NewIterator(ReadOptions(), prefix_extractor,
+                         nullptr /* arena, so unique_ptr may delete it */));
+      std::unique_ptr<LinkSstIterator> link_sst_iter(iter_ptr);
+      s = link_sst_iter->GetTargetFileNumber(k, &file_number);
+
+      // Forward Get() to the target ordinary SST.
+      if (s.ok()) {
+        auto find = dependence_map.find(file_number);
+        if (find == dependence_map.end()) {
+          s = Status::Corruption("Link sst dependence missing");
+        } else {
+          assert(find->second->fd.GetNumber() == file_number);
+          s = Get(forward_options, *find->second, dependence_map, k,
+                  get_context, prefix_extractor, file_read_hist, skip_filters,
+                  level, inheritance);
+        }
+      }
+      // TODO(guokuankuan@bytedance.com) Shall we recover with
+      // min_seq_backup?
+      // get_context->SetMinSequenceAndType(min_seq_type_backup);
     } else {
       // Forward query to target sst
       ReadOptions forward_options = options;
diff --git a/db/table_cache.h b/db/table_cache.h
index 6659a9cb85..46f0328110 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -25,6 +25,7 @@
 #include "rocksdb/table.h"
 #include "rocksdb/terark_namespace.h"
 #include "table/table_reader.h"
+#include "table/two_level_iterator.h"
 #include "util/iterator_cache.h"
 
 namespace TERARKDB_NAMESPACE {
diff --git a/db/version_edit.h b/db/version_edit.h
index 86049f46ce..51bcfc3c9a 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -85,6 +85,7 @@ struct TablePropertyCache {
     kMapHandleRangeDeletions = 1ULL << 0,
     kHasSnapshots = 1ULL << 1,
     kNoRangeDeletions = 1ULL << 2,
+    kLinkHandleRangeDeletions = 1ULL << 3,
   };
   uint64_t num_entries = 0;    // the number of entries.
   uint64_t num_deletions = 0;  // the number of deletion entries.
@@ -98,12 +99,18 @@ struct TablePropertyCache {
   std::vector<uint64_t> inheritance;  // inheritance set
   uint64_t earliest_time_begin_compact = port::kMaxUint64;
   uint64_t latest_time_end_compact = port::kMaxUint64;
+  uint32_t lbr_hash_bits = 11;   // hash bits for each LinkSST KV
+  uint32_t lbr_group_size = 16;  // group size for each LinkBlockRecord
 
   bool is_map_sst() const { return purpose == kMapSst; }
+  bool is_link_sst() const { return purpose == kLinkSst; }
   bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; }
   bool map_handle_range_deletions() const {
     return (flags & kMapHandleRangeDeletions) != 0;
   }
+  bool link_handle_range_deletions() const {
+    return (flags & kLinkHandleRangeDeletions) != 0;
+  }
   bool has_snapshots() const { return (flags & kHasSnapshots) != 0; }
 };
diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h
index 6df82cd088..e5dbfc88c9 100644
--- a/include/rocksdb/advanced_options.h
+++ b/include/rocksdb/advanced_options.h
@@ -216,6 +216,11 @@ struct AdvancedColumnFamilyOptions {
   // LazyCompaction, default false
   bool enable_lazy_compaction = false;
 
+  // Link compaction is a replacement policy for general compactions. It does
+  // not physically compact the target SST files immediately; instead it
+  // links them logically. The actual physical compaction is triggered in the
+  // background.
+  bool enable_link_compaction = false;
+
   // Read TableProperties from file if false
   bool pin_table_properties_in_reader = true;
diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h
index afe5e012f6..a985b4ddab 100644
--- a/include/rocksdb/compaction_filter.h
+++ b/include/rocksdb/compaction_filter.h
@@ -200,6 +200,10 @@ class CompactionFilter
   // Determines whether value changed by compaction filter were stable.
   // Default as false, which means stability of outcome is not promised.
+  // "Stable" means the changed value will not change again if the same
+  // operation is applied to it multiple times.
+  // Creators of the compaction filter should override this function;
+  // otherwise the behavior of the stability check is undefined.
   virtual bool IsStableChangeValue() const { return false; }
 
   // Returns a name that identifies this compaction filter.
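A hedged illustration of the "stable change" contract above (hypothetical
example, not part of this patch): truncation is idempotent, so re-applying
the filter to its own output changes nothing. The class name is invented;
the Filter() signature is the standard one from this header.

  class TruncatingFilter : public CompactionFilter {
   public:
    // Truncate oversized values; truncating an already-truncated value is a
    // no-op, so the outcome is stable across repeated compactions.
    bool Filter(int /*level*/, const Slice& /*key*/,
                const Slice& existing_value, std::string* new_value,
                bool* value_changed) const override {
      if (existing_value.size() > kMaxLen) {
        new_value->assign(existing_value.data(), kMaxLen);
        *value_changed = true;
      }
      return false;  // never drop the record
    }
    bool IsStableChangeValue() const override { return true; }
    const char* Name() const override { return "TruncatingFilter"; }

   private:
    static constexpr size_t kMaxLen = 1024;
  };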
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 110f5ec52a..a8e546d56f 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -88,6 +88,7 @@ enum SstPurpose {
   kEssenceSst,  // Actual data storage sst
   kLogSst,      // Log as sst
   kMapSst,      // Dummy sst
+  kLinkSst,     // Link SST
 };
 
 struct Options;
diff --git a/options/cf_options.h b/options/cf_options.h
index 8aa25aae6f..eae706fe4b 100644
--- a/options/cf_options.h
+++ b/options/cf_options.h
@@ -53,6 +53,8 @@ struct ImmutableCFOptions {
   bool enable_lazy_compaction;
 
+  bool enable_link_compaction;
+
   bool pin_table_properties_in_reader;
 
   bool inplace_update_support;
@@ -237,6 +239,10 @@ struct MutableCFOptions {
   std::vector<int> max_bytes_for_level_multiplier_additional;
   CompactionOptionsUniversal compaction_options_universal;
 
+  // Link compaction related options
+  uint32_t lbr_group_sz = 10;
+  uint32_t lbr_hash_bits = 7;
+
   // Misc options
   uint64_t max_sequential_skip_in_iterations;
   bool paranoid_file_checks;
diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc
index 50c000f5b0..8c3049b8a3 100644
--- a/table/two_level_iterator.cc
+++ b/table/two_level_iterator.cc
@@ -9,7 +9,6 @@
 
 #include "table/two_level_iterator.h"
 
-#include "db/version_edit.h"
 #include "rocksdb/options.h"
 #include "rocksdb/terark_namespace.h"
 #include "table/block.h"
@@ -214,21 +213,18 @@ class MapSstIterator final : public InternalIterator {
   int include_largest_;
   std::vector link_;
   struct HeapElement {
-    InternalIterator* iter;
+    InternalIterator* iter{};
     Slice key;
   };
   template <bool is_less>
   class HeapComparator {
    public:
    HeapComparator(const InternalKeyComparator& comparator) : c_(comparator) {}
-
     bool operator()(const HeapElement& a, const HeapElement& b) const {
       return is_less ? c_.Compare(a.key, b.key) < 0
                      : c_.Compare(a.key, b.key) > 0;
     }
-
     const InternalKeyComparator& internal_comparator() const { return c_; }
-
    private:
     const InternalKeyComparator& c_;
   };
@@ -488,7 +484,7 @@ class MapSstIterator final : public InternalIterator {
     if (min_heap_.empty() ||
         icomp.Compare(min_heap_.top().key, largest_key_) >=
             include_largest_) {
-      // out of largest bound
+      // out of the largest bound
       first_level_value_.reset();
       first_level_iter_->Next();
       if (InitFirstLevelIter()) {
@@ -521,7 +517,7 @@ class MapSstIterator final : public InternalIterator {
     if (max_heap_.empty() ||
         icomp.Compare(smallest_key_, max_heap_.top().key) >=
            include_smallest_) {
-      // out of smallest bound
+      // out of the smallest bound
      first_level_value_.reset();
      first_level_iter_->Prev();
      if (InitFirstLevelIter()) {
@@ -544,7 +540,7 @@ class MapSstIterator final : public InternalIterator {
   virtual Status status() const override { return status_; }
 };
 
-}  // namespace
+}
 
 InternalIteratorBase<BlockHandle>* NewTwoLevelIterator(
     TwoLevelIteratorState* state,
     InternalIteratorBase<BlockHandle>* first_level_iter) {
   return new TwoLevelIndexIterator(state, first_level_iter);
 }
 
+InternalIterator* NewLinkSstIterator(
+    const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
+    const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
+    void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
+    Arena* arena) {
+  assert(file_meta == nullptr || file_meta->prop.is_link_sst());
+  if (arena == nullptr) {
+    return new LinkSstIterator(file_meta, mediate_sst_iter, dependence_map,
+                               icomp, callback_arg, create_iter);
+  } else {
+    void* buffer = arena->AllocateAligned(sizeof(LinkSstIterator));
+    return new (buffer) LinkSstIterator(file_meta, mediate_sst_iter,
+                                        dependence_map, icomp, callback_arg,
+                                        create_iter);
+  }
+}
+
 InternalIterator* NewMapSstIterator(
     const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
     const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h
index 3a967bc3ea..9640547d1e 100644
--- a/table/two_level_iterator.h
+++ b/table/two_level_iterator.h
@@ -8,6 +8,8 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #pragma once
+
+#include "db/version_edit.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/terark_namespace.h"
@@ -28,6 +30,560 @@ struct TwoLevelIteratorState {
                                               const BlockHandle& handle) = 0;
 };
 
+// Each LinkSST contains a list of LinkBlocks; each LinkBlock contains a
+// certain number of file_numbers, which indicate where the KV pairs are
+// placed.
+//
+// Encoded value layout:
+//   [file_numbers]  block_sz * uint64_t (varint)
+//   [hash_values]   byte aligned (block_sz * hash_bits)
+//   [smallest key]  length-prefixed slice (optional)
+//
+class LinkBlockRecord {
+ public:
+  // TODO(guokuankuan@bytedance.com)
+  // Hash a user key into an integer, limited to max_bits bits.
+  static uint32_t hash(const Slice& user_key, int max_bits) { return 0; }
+
+ public:
+  // @param key      The largest internal key of all items in the current
+  //                 LinkBlock.
+  // @param group_sz Total number of items inside a LinkBlock.
+  LinkBlockRecord(IteratorCache* iterator_cache, const Slice& key,
+                  int group_sz, int hash_bits)
+      : iterator_cache_(iterator_cache),
+        max_key_(key),
+        group_sz_(group_sz),
+        hash_bits_(hash_bits) {
+    key_buffer_.resize(group_sz);
+  }
+  ~LinkBlockRecord() {}
+
+  // TODO(guokuankuan@bytedance.com)
+  // Encode the current LinkBlock into a slice, so we can store it in a
+  // LinkSST.
+  void Encode(std::vector<uint64_t> file_numbers,
+              std::vector<uint32_t> hash_values, const Slice& smallest_key) {
+    // TODO remember to set file_number
+    encoded_value_ = LazyBuffer();
+  }
+
+  // Decode from a LinkSST's value.
+  bool DecodeFrom(Slice& input) {
+    // Decode the file number of each KV pair.
+    for (int i = 0; i < group_sz_; ++i) {
+      uint64_t file_number = 0;
+      if (!GetVarint64(&input, &file_number)) {
+        return false;
+      }
+      file_numbers_.emplace_back(file_number);
+    }
+    assert(file_numbers_.size() == static_cast<size_t>(group_sz_));
+    // Decode the hash values.
+    int total_bits = group_sz_ * hash_bits_;
+    int total_bytes = (total_bits + 7) / 8;
+    assert(input.size() >= static_cast<size_t>(total_bytes));
+    assert(hash_bits_ <= 32);
+    for (int i = 0; i < group_sz_; ++i) {
+      // TODO(guokuankuan@bytedance.com) Add some UT for this function.
+      // Convert the bit representation into uint32 hash values, assuming
+      // LSB-first packing read through a little-endian byte window.
+      int start_pos = i * hash_bits_;
+      int start_bytes = start_pos / 8;
+      int copy_bytes = (start_pos % 8 + hash_bits_ + 7) / 8;
+      uint64_t hash = 0;
+      memcpy(&hash, input.data() + start_bytes, copy_bytes);
+      hash >>= start_pos % 8;
+      hash &= (1ULL << hash_bits_) - 1;
+      hash_values_.emplace_back(static_cast<uint32_t>(hash));
+    }
+    // Consume the hash section before reading the optional smallest key.
+    input.remove_prefix(total_bytes);
+    // Decode the optional smallest key.
+    if (!GetLengthPrefixedSlice(&input, &smallest_key_)) {
+      return false;
+    }
+    return true;
+  }
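+  // [Editor's illustration, not part of the original patch] A minimal sketch
+  // of the bit packing DecodeFrom() assumes (LSB-first within a little-endian
+  // byte window). PackHash is the hypothetical inverse that a finished
+  // Encode() could use; UnpackHash mirrors the decode loop above.
+  static void PackHash(std::string* buf, int idx, int hash_bits,
+                       uint32_t hash) {
+    size_t need = (static_cast<size_t>(idx + 1) * hash_bits + 7) / 8;
+    if (buf->size() < need) {
+      buf->resize(need, 0);
+    }
+    int start_pos = idx * hash_bits;
+    for (int b = 0; b < hash_bits; ++b) {
+      if (hash & (1u << b)) {
+        (*buf)[(start_pos + b) / 8] |=
+            static_cast<char>(1u << ((start_pos + b) % 8));
+      }
+    }
+  }
+  static uint32_t UnpackHash(const char* base, int idx, int hash_bits) {
+    int start_pos = idx * hash_bits;
+    uint64_t window = 0;  // wide enough for 7 offset bits + 32 hash bits
+    memcpy(&window, base + start_pos / 8, (start_pos % 8 + hash_bits + 7) / 8);
+    window >>= start_pos % 8;  // assumes a little-endian host
+    return static_cast<uint32_t>(window & ((1ULL << hash_bits) - 1));
+  }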
+  // There should be only one active LinkBlock during the LinkSST iteration.
+  // Here we load all the underlying iterators within the current LinkBlock
+  // and reset iter_idx for further use.
+  bool ActiveLinkBlock() {
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      if (!it->status().ok()) {
+        return false;
+      }
+    }
+    iter_idx_ = 0;
+    return true;
+  }
+
+  // Seek all iterators to their first item.
+  Status SeekToFirst() {
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      it->SeekToFirst();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+    }
+    iter_idx_ = 0;
+    return Status::OK();
+  }
+
+  // Seek all iterators to their last item.
+  Status SeekToLast() {
+    assert(file_numbers_.size() == static_cast<size_t>(group_sz_));
+    for (int i = 0; i < group_sz_; ++i) {
+      auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+      it->SeekToLast();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+    }
+    iter_idx_ = group_sz_ - 1;
+    return Status::OK();
+  }
+
+  // Get the underlying SST's file number for the target key.
+  Status GetFileNumber(const Slice& target, uint64_t* file_number) {
+    uint32_t target_hash = hash(target, hash_bits_);
+    for (int i = 0; i < group_sz_; ++i) {
+      // Hash collisions may happen, so we double-check the key's content.
+      if (target_hash == hash_values_[i]) {
+        auto it = iterator_cache_->GetIterator(file_numbers_[i]);
+        it->Seek(target);
+        if (!it->status().ok() || it->key().compare(target) != 0) {
+          continue;
+        }
+        *file_number = file_numbers_[i];
+        return Status::OK();
+      }
+    }
+    return Status::NotFound("Target key does not exist");
+  }
+
+  // Seek inside a LinkBlock; fetch & buffer each key we touch.
+  Status Seek(const Slice& target, const InternalKeyComparator& icomp) {
+    // Lower-bound search for the target key.
+    uint32_t left = 0;
+    uint32_t right = group_sz_;
+    while (left < right) {
+      uint32_t mid = left + (right - left) / 2;
+      auto key = buffered_key(mid);
+      // TODO(guokuankuan@bytedance.com) Shall we check the key's hash value
+      // here?
+      if (icomp.Compare(key, target) >= 0) {
+        right = mid;
+      } else {
+        left = mid + 1;
+      }
+    }
+
+    if (left < static_cast<uint32_t>(group_sz_)) {
+      iter_idx_ = left;
+      // We should seek all related iterators within this LinkBlockRecord.
+      std::set<uint64_t> unique_file_numbers(file_numbers_.begin(),
+                                             file_numbers_.end());
+      for (const auto& fn : unique_file_numbers) {
+        auto it = iterator_cache_->GetIterator(fn);
+        it->Seek(target);
+        if (!it->status().ok()) {
+          return it->status();
+        }
+      }
+      return Status::OK();
+    } else {
+      iter_idx_ = -1;
+      return Status::Corruption();
+    }
+  }
+
+  // Move the current position forward, skipping all invalid records
+  // (hash == INVALID_ITEM_HASH).
+  // If all subsequent items are invalid, return a NotFound status.
+  Status Next() {
+    // Find the next valid position in the current LinkBlock.
+    size_t next_iter_idx = iter_idx_ + 1;
+    while (next_iter_idx < hash_values_.size()) {
+      // Move the target iterator forward.
+      auto it = iterator_cache_->GetIterator(file_numbers_[next_iter_idx]);
+      it->Next();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+      if (hash_values_[next_iter_idx] != INVALID_ITEM_HASH) {
+        break;
+      }
+      next_iter_idx++;
+    }
+
+    // We exceeded the max boundary, so the caller should try the next
+    // LinkBlock; iter_idx is now meaningless, since only one LinkBlock is
+    // active at a time during iteration.
+    if (next_iter_idx == hash_values_.size()) {
+      return Status::NotFound("Exceeded LinkBlock's max boundary");
+    }
+
+    // The current LinkBlock is still in use; update iter_idx.
+    iter_idx_ = static_cast<int>(next_iter_idx);
+    return Status::OK();
+  }
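+  // Worked example of the skipping behavior above (illustrative, not part of
+  // the original patch): with hash_values_ = {h0, INVALID_ITEM_HASH, h2},
+  // calling Next() from iter_idx_ == 0 advances the iterators of slots 1 and
+  // 2, skips the deleted slot 1, and lands on iter_idx_ == 2.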
+  // See the comment on `Next()`; the `Prev()` implementation is almost the
+  // same except for the iteration direction.
+  Status Prev() {
+    // Find the previous valid position in the current LinkBlock.
+    int prev_iter_idx = iter_idx_ - 1;
+    while (prev_iter_idx >= 0) {
+      // Move backward.
+      auto it = iterator_cache_->GetIterator(file_numbers_[prev_iter_idx]);
+      it->Prev();
+      if (!it->status().ok()) {
+        return it->status();
+      }
+      if (hash_values_[prev_iter_idx] != INVALID_ITEM_HASH) {
+        break;
+      }
+      prev_iter_idx--;
+    }
+
+    // We exceeded the smallest boundary.
+    if (prev_iter_idx == -1) {
+      return Status::NotFound("Exceeded LinkBlock's smallest boundary");
+    }
+
+    // The current LinkBlock is still in use; update iter_idx.
+    iter_idx_ = prev_iter_idx;
+
+    return Status::OK();
+  }
+
+  // Extract the key from the underlying SST iterator.
+  Slice CurrentKey() const {
+    auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]);
+    return it->key();
+  }
+
+  // Extract the value from the underlying SST iterator.
+  LazyBuffer CurrentValue() const {
+    auto it = iterator_cache_->GetIterator(file_numbers_[iter_idx_]);
+    return it->value();
+  }
+
+  // Return the max key of the current LinkBlock.
+  const Slice& MaxKey() const { return max_key_; }
+
+  const LazyBuffer& EncodedValue() const { return encoded_value_; }
+
+  // If we have a non-empty `smallest_key_`, we should re-init all the
+  // underlying iterators (default: invalid).
+  const Slice& SmallestKey() const { return smallest_key_; }
+
+  bool HasSmallestKey() const { return smallest_key_.valid(); }
+
+  std::vector<uint64_t>& GetFileNumbers() { return file_numbers_; }
+
+ private:
+  // If a key's hash is marked as `INVALID_ITEM_HASH`, the item will be
+  // removed in the next compaction, so we shouldn't read from it.
+  static const uint32_t INVALID_ITEM_HASH = 0;
+
+  // Get the target key from the cache. If it's not present, fetch it from
+  // the iterator & cache it.
+  Slice buffered_key(uint32_t idx) {
+    uint64_t file_number = file_numbers_[idx];
+    if (!key_buffer_[idx].valid()) {
+      // Find the occurrences between `idx` and the last position of the
+      // current file_number, e.g. in [0, 1, 10, 11, 9, 10, 2], if `idx` is 2
+      // the occurrences should be {2, 5}.
+      std::vector<uint32_t> occurrence = {idx};
+      for (uint32_t i = idx + 1; i < static_cast<uint32_t>(group_sz_); ++i) {
+        if (file_numbers_[i] == file_number) {
+          occurrence.emplace_back(i);
+        }
+      }
+
+      // Seek to the last position of the current file_number and `Prev()`
+      // back to position `idx`, filling all touched keys into the key
+      // buffer.
+      auto it = iterator_cache_->GetIterator(file_number);
+      it->SeekForPrev(max_key_);
+      if (!it->status().ok()) {
+        return {nullptr};
+      }
+      for (size_t i = occurrence.size(); i > 0; --i) {
+        uint32_t pos = occurrence[i - 1];
+        if (!key_buffer_[pos].valid()) {
+          key_buffer_[pos] = it->key();
+        }
+        it->Prev();
+      }
+    }
+
+    assert(!key_buffer_[idx].empty());
+    return key_buffer_[idx];
+  }
+
+ private:
+  IteratorCache* iterator_cache_;
+
+  // The end/max key of the current LinkBlock
+  Slice max_key_;
+  // Encoded LBR value (contains all of the current LBR's information)
+  LazyBuffer encoded_value_;
+  // How many KV pairs we group into one LinkBlock
+  int group_sz_ = 8;
+  // Bit count of each key's hash value
+  int hash_bits_ = 11;
+  // Cache of the keys touched while iterating
+  std::vector<Slice> key_buffer_;
+  // Indicates which SST each of the current KV pairs belongs to;
+  // file_numbers_.size() == group_sz_
+  std::vector<uint64_t> file_numbers_;
+  // Each KV pair has a hash value: hash(user_key)
+  std::vector<uint32_t> hash_values_;
+  // Current iteration index within this LinkBlock
+  int iter_idx_ = 0;
+  // Optional. If smallest_key_ exists, one of the underlying iterators has
+  // expired, and we should seek all iterators to the target key again before
+  // further iteration.
+  Slice smallest_key_;
+};
+
+// A LinkSstIterator is a composition of a few ordinary iterators.
+class LinkSstIterator : public InternalIterator {
+ private:
+  const FileMetaData* file_meta_;
+  InternalIterator* link_sst_iter_{};
+  InternalKeyComparator icomp_;
+
+  IteratorCache iterator_cache_;
+  std::vector<LinkBlockRecord> lbr_list_;
+
+  Status status_;
+  uint32_t cur_lbr_idx_{};
+
+  // Some operations may invalidate the current iterator.
+  bool valid_ = true;
+
+  bool lbr_initialized_ = false;
+  bool lbr_actived_ = false;
+
+ public:
+  LinkSstIterator(const FileMetaData* file_meta, InternalIterator* iter,
+                  const DependenceMap& dependence_map,
+                  const InternalKeyComparator& icomp, void* create_arg,
+                  const IteratorCache::CreateIterCallback& create)
+      : file_meta_(file_meta),
+        link_sst_iter_(iter),
+        icomp_(icomp),
+        iterator_cache_(dependence_map, create_arg, create) {
+    if (file_meta != nullptr && !file_meta_->prop.is_link_sst()) {
+      abort();
+    }
+  }
+
+  ~LinkSstIterator() override = default;
+
+ private:
+  // Init all LBRs by decoding all the LinkSST's values.
+  bool InitLinkBlockRecords() {
+    if (lbr_initialized_) {
+      return true;
+    }
+    LazyBuffer current_value;
+    link_sst_iter_->SeekToFirst();
+    while (link_sst_iter_->Valid()) {
+      current_value = link_sst_iter_->value();
+      status_ = current_value.fetch();
+      if (!status_.ok()) {
+        status_ = Status::Corruption("Failed to fetch lazy buffer");
+        return false;
+      }
+      Slice input = current_value.slice();
+      lbr_list_.emplace_back(&iterator_cache_, link_sst_iter_->key(),
+                             file_meta_->prop.lbr_group_size,
+                             file_meta_->prop.lbr_hash_bits);
+      if (!lbr_list_.back().DecodeFrom(input)) {
+        status_ = Status::Corruption("Cannot decode Link SST");
+        return false;
+      }
+      link_sst_iter_->Next();
+    }
+    current_value.reset();
+    lbr_initialized_ = true;
+    return true;
+  }
+
+  // We assume there are many underlying SSTs for each LinkSST, so we simply
+  // initialize all the SST iterators before any iteration.
+  bool InitSecondLevelIterators() {
+    if (lbr_actived_) {
+      return true;
+    }
+    for (auto& lb : lbr_list_) {
+      if (!lb.ActiveLinkBlock()) {
+        return false;
+      }
+    }
+    lbr_actived_ = true;
+    return true;
+  }
+
+ public:
+  bool Valid() const override { return !lbr_list_.empty() && valid_; }
+
+  void SeekToFirst() override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      return;
+    }
+    assert(!lbr_list_.empty());
+    cur_lbr_idx_ = 0;
+    status_ = lbr_list_[cur_lbr_idx_].SeekToFirst();
+    valid_ = true;
+  }
+
+  void SeekToLast() override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      return;
+    }
+    assert(!lbr_list_.empty());
+    cur_lbr_idx_ = static_cast<uint32_t>(lbr_list_.size() - 1);
+    status_ = lbr_list_[cur_lbr_idx_].SeekToLast();
+    valid_ = true;
+  }
+
+  // TODO(guokuankuan@bytedance.com)
+  // Is the input target an InternalKey? If so, what is the default sequence
+  // number?
+  void Seek(const Slice& target) override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      return;
+    }
+    // Find the target LinkBlock's position.
+    auto it =
+        std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target,
+                         [&](const LinkBlockRecord& lbr, const Slice& t) {
+                           return icomp_.Compare(lbr.MaxKey(), t) < 0;
+                         });
+    if (it == lbr_list_.end()) {
+      status_ = Status::NotFound();
+      return;
+    }
+    cur_lbr_idx_ = static_cast<uint32_t>(it - lbr_list_.begin());
+    // Do the seek.
+    status_ = lbr_list_[cur_lbr_idx_].Seek(target, icomp_);
+    valid_ = true;
+  }
+
+  // Position at the first key at or before the target.
+  void SeekForPrev(const Slice& target) override {
+    if (!InitLinkBlockRecords() || !InitSecondLevelIterators()) {
+      status_ = Status::Corruption();
+      return;
+    }
+
+    // We adopt Seek & Prev semantics here.
+    Seek(target);
+
+    // If the position's key is equal to the target, we are good to go.
+    if (status_.ok() && key() == target) {
+      return;
+    }
+
+    // If the key is greater than the target, `Prev()` back to the right
+    // place.
+    while (status_.ok() && key().compare(target) > 0) {
+      Prev();
+    }
+    valid_ = true;
+  }
+
+  void Next() override {
+    assert(Valid());
+    while (cur_lbr_idx_ < lbr_list_.size()) {
+      auto s = lbr_list_[cur_lbr_idx_].Next();
+      if (s.ok()) {
+        break;
+      }
+
+      // If we cannot `Next()` within the current LBR, try the next one.
+      cur_lbr_idx_++;
+      if (cur_lbr_idx_ == lbr_list_.size()) {
+        break;
+      }
+
+      assert(cur_lbr_idx_ < lbr_list_.size());
+      // If the next LBR has a valid smallest key, we should re-seek all the
+      // iterators (the iterators' continuity may break).
+      auto& next_lbr = lbr_list_[cur_lbr_idx_];
+      if (next_lbr.HasSmallestKey()) {
+        next_lbr.Seek(next_lbr.SmallestKey(), icomp_);
+      }
+    }
+
+    // No valid position was found.
+    if (cur_lbr_idx_ == lbr_list_.size()) {
+      status_ = Status::NotFound("End of iterator exceeded");
+      return;
+    }
+  }
+
+  void Prev() override {
+    assert(Valid());
+    while (true) {
+      auto s = lbr_list_[cur_lbr_idx_].Prev();
+      if (s.ok()) {
+        break;
+      }
+      // All items were consumed; exit.
+      if (cur_lbr_idx_ == 0) {
+        status_ = Status::NotFound("No more previous items!");
+        return;
+      }
+
+      // If we cannot `Prev()` within the current LBR, try the previous one.
+      // Note that if the current LBR has a valid smallest key, we should
+      // re-seek the previous LBR.
+      cur_lbr_idx_--;
+      auto& curr_lbr = lbr_list_[cur_lbr_idx_ + 1];
+      auto& prev_lbr = lbr_list_[cur_lbr_idx_];
+      if (curr_lbr.HasSmallestKey()) {
+        prev_lbr.Seek(prev_lbr.MaxKey(), icomp_);
+      }
+    }
+  }
+
+  Slice key() const override {
+    assert(Valid());
+    return lbr_list_[cur_lbr_idx_].CurrentKey();
+  }
+
+  LazyBuffer value() const override {
+    assert(Valid());
+    return lbr_list_[cur_lbr_idx_].CurrentValue();
+  }
+
+  // Find the target key's underlying SST file number (exact match).
+  // TODO(guokuankuan@bytedance.com) Maybe we could reuse Seek() here?
+  Status GetTargetFileNumber(const Slice& target, uint64_t* file_number) {
+    if (!InitLinkBlockRecords()) {
+      return Status::Corruption();
+    }
+    // Find the target LinkBlock's position.
+    auto it =
+        std::lower_bound(lbr_list_.begin(), lbr_list_.end(), target,
+                         [&](const LinkBlockRecord& lbr, const Slice& t) {
+                           return icomp_.Compare(lbr.MaxKey(), t) < 0;
+                         });
+    if (it == lbr_list_.end()) {
+      return Status::NotFound();
+    }
+
+    return it->GetFileNumber(target, file_number);
+  }
+
+  Status status() const override { return status_; }
+};
+
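+// Read-path summary for a LinkSST point lookup (see TableCache::Get in
+// db/table_cache.cc); descriptive note only, not part of the original patch:
+//   1. InitLinkBlockRecords() decodes every LBR from the LinkSST's values.
+//   2. A lower_bound over the LBRs' MaxKey() finds the covering LBR.
+//   3. LinkBlockRecord::GetFileNumber() matches the key's hash, then
+//      verifies the key against the candidate SST to rule out collisions.
+//   4. The Get() is forwarded to the resolved ordinary SST.
+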
 // Return a new two level iterator.  A two-level iterator contains an
 // index iterator whose values point to a sequence of blocks where
 // each block is itself a sequence of key,value pairs.  The returned
@@ -42,12 +598,20 @@
 extern InternalIteratorBase<BlockHandle>* NewTwoLevelIterator(
     TwoLevelIteratorState* state,
     InternalIteratorBase<BlockHandle>* first_level_iter);
 
-// Retuan a two level iterator. for unroll map sst
 // keep all params lifecycle please
+// @return a two level iterator, for unrolling a map sst
 extern InternalIterator* NewMapSstIterator(
     const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
     const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
     void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
     Arena* arena = nullptr);
 
+// A link sst iterator expands all of its dependencies for the callers.
+// @return a two level iterator
+extern InternalIterator* NewLinkSstIterator(
+    const FileMetaData* file_meta, InternalIterator* mediate_sst_iter,
+    const DependenceMap& dependence_map, const InternalKeyComparator& icomp,
+    void* callback_arg, const IteratorCache::CreateIterCallback& create_iter,
+    Arena* arena = nullptr);
+
 }  // namespace TERARKDB_NAMESPACE
diff --git a/util/heap.h b/util/heap.h
index a21015998c..96b34c8087 100644
--- a/util/heap.h
+++ b/util/heap.h
@@ -129,7 +129,7 @@ class BinaryHeap {
     T v = std::move(data_[index]);
     size_t picked_child = port::kMaxSizet;
-    while (1) {
+    while (true) {
       const size_t left_child = get_left(index);
       if (get_left(index) >= data_.size()) {
         break;
diff --git a/util/iterator_cache.h b/util/iterator_cache.h
index d545155df0..c04e7154b6 100644
--- a/util/iterator_cache.h
+++ b/util/iterator_cache.h
@@ -14,10 +14,14 @@
 
 namespace TERARKDB_NAMESPACE {
 
-struct FileMetaData;
 class RangeDelAggregator;
 class TableReader;
 
+// chash_map<file_number, FileMetaData*>
+//
+// TODO(guokuankuan@bytedance.com)
+// Shall we rename this type to `FileMetaMap`? It simply maps file numbers to
+// their related file metadata.
 typedef chash_map<uint64_t, FileMetaData*> DependenceMap;
 
 class IteratorCache {
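
A hedged usage sketch (not part of the patch): enabling the feature through
the public option added in include/rocksdb/advanced_options.h. The DB path is
arbitrary; everything else is the stock open path.

  #include <cassert>
  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    TERARKDB_NAMESPACE::Options options;
    options.create_if_missing = true;
    // Added by this patch; defaults to false. Compactions picked for this
    // column family go through PickLinkCompaction() instead of the ordinary
    // picker.
    options.enable_link_compaction = true;

    TERARKDB_NAMESPACE::DB* db = nullptr;
    auto s = TERARKDB_NAMESPACE::DB::Open(options, "/tmp/link_compaction_demo",
                                          &db);
    assert(s.ok());
    delete db;
    return 0;
  }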