LinkCompaction: Add Link Compaction related interfaces #232

Open
wants to merge 5 commits into base: link-compaction
1 change: 1 addition & 0 deletions db/compaction.h
@@ -77,6 +77,7 @@ enum CompactionType {
kKeyValueCompaction = 0,
kMapCompaction = 1,
kGarbageCollection = 2,
kLinkCompaction = 3,
};

struct CompactionParams {
2 changes: 1 addition & 1 deletion db/compaction_iterator.cc
@@ -170,7 +170,7 @@ CompactionIterator::CompactionIterator(
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}

if (snapshots_->size() == 0) {
if (snapshots_->empty()) {
// optimize for fast path if there are no snapshots
visible_at_tip_ = true;
earliest_snapshot_ = kMaxSequenceNumber;
164 changes: 152 additions & 12 deletions db/compaction_iterator.h
@@ -6,6 +6,7 @@

#include <algorithm>
#include <deque>
#include <memory>
#include <string>
#include <vector>

@@ -22,7 +23,28 @@

namespace TERARKDB_NAMESPACE {

class CompactionIterator {
class CompactionIteratorBase {
public:
virtual ~CompactionIteratorBase() = 0;

virtual void ResetRecordCounts() = 0;

virtual void SeekToFirst() = 0;
virtual void Next() = 0;

// Getters
virtual const Slice& key() const = 0;
virtual const LazyBuffer& value() const = 0;
virtual const Status& status() const = 0;
virtual const ParsedInternalKey& ikey() const = 0;
virtual bool Valid() const = 0;
virtual const Slice& user_key() const = 0;
virtual const CompactionIterationStats& iter_stats() const = 0;
// For KV Separation or LinkSST, the current value may depend on other file numbers.
virtual const std::vector<uint64_t> depended_file_numbers() const = 0;
};
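
To make the contract of the new interface concrete, here is a minimal consumer sketch. It is an illustration only, not code from this diff: the helper name is made up, and it assumes <unordered_map> and <cstdint> are available.

// Hypothetical consumer (illustration only): drain an iterator and count, for
// each dependent SST, how many emitted records reference it
// (file_number -> reference count).
inline std::unordered_map<uint64_t, uint64_t> CountDependedFiles(
    CompactionIteratorBase& iter) {
  std::unordered_map<uint64_t, uint64_t> dependence;
  for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
    for (uint64_t fn : iter.depended_file_numbers()) {
      ++dependence[fn];  // operator[] value-initializes missing counts to 0
    }
  }
  // Valid() turning false can mean either end of input or an error, so the
  // caller still has to check iter.status() afterwards.
  return dependence;
}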

class CompactionIterator: public CompactionIteratorBase {
public:
friend class CompactionIteratorToInternalIterator;

@@ -96,30 +118,34 @@ class CompactionIterator {
const SequenceNumber preserve_deletes_seqnum = 0,
const chash_set<uint64_t>* b = nullptr);

~CompactionIterator();
~CompactionIterator() override;

void ResetRecordCounts();
void ResetRecordCounts() override;

// Seek to the beginning of the compaction iterator output.
//
// REQUIRED: Call only once.
void SeekToFirst();
void SeekToFirst() override;

// Produces the next record in the compaction.
//
// REQUIRED: SeekToFirst() has been called.
void Next();
void Next() override;

// Getters
const Slice& key() const { return key_; }
const LazyBuffer& value() const { return value_; }
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
const Slice& key() const override { return key_; }
const LazyBuffer& value() const override { return value_; }
const Status& status() const override { return status_; }
const ParsedInternalKey& ikey() const override { return ikey_; }
bool Valid() const override { return valid_; }
const Slice& user_key() const override { return current_user_key_; }
const CompactionIterationStats& iter_stats() const override { return iter_stats_; }
void SetFilterSampleInterval(size_t filter_sample_interval);

const std::vector<uint64_t> depended_file_numbers() const override {
return {value_.file_number()};
}

private:
// Processes the input stream to find the next output
void NextFromInput();
@@ -236,6 +262,120 @@ class CompactionIterator {
}
};

// A LinkCompactionIterator takes an ordinary CompactionIterator as its input
// and iterates over all KVs block by block (refer to LinkBlockRecord).
class LinkCompactionIterator: public CompactionIteratorBase {
public:
// @param c_iter Original compaction iterator that combines all underlying SSTs
// @param group_sz The number of KV items that one LinkBlock contains.
// @param hash_bits The number of bits each key's hash occupies in a LinkBlock.
explicit LinkCompactionIterator(std::unique_ptr<CompactionIterator> c_iter,
IteratorCache* iterator_cache,
int group_sz,
int hash_bits)
: c_iter_(std::move(c_iter)),
iterator_cache_(iterator_cache),
group_sz_(group_sz),
hash_bits_(hash_bits) {}

~LinkCompactionIterator() override {
c_iter_.reset();
}

void ResetRecordCounts() override {
c_iter_->ResetRecordCounts();
}

// We need to construct the first LinkBlockRecord here.
void SeekToFirst() override {
c_iter_->SeekToFirst();
status_ = c_iter_->status();
if(!status_.ok()) {
return;
}
Next();
}

// We need to step forward N (the LinkBlockRecord's size) times to build the
// next LinkBlockRecord (LBR).
// @see LinkSstIterator
void Next() override {
// Each time we proceed to the next LBR, we refill the underlying file numbers.
file_numbers_.clear();

// Obtain the original KV pairs
Slice max_key;
Slice smallest_key; // TODO need to check if we need smallest key
std::vector<uint32_t> hash_values;
for(int i = 0; i < group_sz_ && c_iter_->Valid(); ++i) {
const LazyBuffer& value = c_iter_->value();
hash_values.emplace_back(LinkBlockRecord::hash(c_iter_->user_key(), hash_bits_));
file_numbers_.emplace_back(value.file_number());
if(i == group_sz_ - 1) {
max_key = c_iter_->key();
}
c_iter_->Next();
if(!c_iter_->status().ok()) {
status_ = c_iter_->status();
return;
}
}
curr_lbr_ = std::make_unique<LinkBlockRecord>(iterator_cache_, max_key,
group_sz_, hash_bits_);
curr_lbr_->Encode(file_numbers_, hash_values, smallest_key);

// Parse internal key
const Slice& key = curr_lbr_->MaxKey();
if(!ParseInternalKey(key, &ikey_)) {
status_ = Status::Incomplete("Cannot decode internal key");
}
}

// Obtain the LBR's key, aka the largest key of the block range.
const Slice& key() const override {
return curr_lbr_->MaxKey();
}

const LazyBuffer& value() const override {
return curr_lbr_->EncodedValue();
}

const Status& status() const override {
return status_;
}

const ParsedInternalKey& ikey() const override {
return ikey_;
}

bool Valid() const override {
return status_.ok();
}

const Slice& user_key() const override {
return c_iter_->user_key();
}

const CompactionIterationStats& iter_stats() const override {
return c_iter_->iter_stats();
}

const std::vector<uint64_t> depended_file_numbers() const override {
return file_numbers_;
}

private:
Status status_;
std::unique_ptr<LinkBlockRecord> curr_lbr_;
ParsedInternalKey ikey_;
std::vector<uint64_t> file_numbers_;

std::unique_ptr<CompactionIterator> c_iter_;
IteratorCache* iterator_cache_;
int group_sz_ = 0;
int hash_bits_ = 0;
};
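
The Next() comment above points at LinkSstIterator for the read path, which is not included in this diff. The following is a rough, hypothetical sketch of how the stored per-key hashes and file numbers could be consulted at lookup time; the function name and signature are assumptions made for illustration only.

// Hypothetical read-side helper (illustration only): given the hashes and
// file numbers stored in one link block, return the file numbers whose stored
// hash matches the lookup key. The real decoding lives in LinkBlockRecord /
// LinkSstIterator, which are not part of this diff.
std::vector<uint64_t> CandidateFiles(const Slice& lookup_user_key,
                                     const std::vector<uint32_t>& hash_values,
                                     const std::vector<uint64_t>& file_numbers,
                                     int hash_bits) {
  std::vector<uint64_t> candidates;
  const uint32_t target = LinkBlockRecord::hash(lookup_user_key, hash_bits);
  for (size_t i = 0; i < hash_values.size() && i < file_numbers.size(); ++i) {
    if (hash_values[i] == target) {
      // A hash match only narrows the search: collisions are possible, so the
      // key still has to be confirmed against the underlying SST.
      candidates.push_back(file_numbers[i]);
    }
  }
  return candidates;
}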

InternalIterator* NewCompactionIterator(
CompactionIterator* (*new_compaction_iter_callback)(void*), void* arg,
const Slice* start_user_key = nullptr);
74 changes: 43 additions & 31 deletions db/compaction_job.cc
@@ -149,7 +149,7 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
const Compaction* compaction;
std::unique_ptr<CompactionIterator> c_iter;
std::unique_ptr<CompactionIteratorBase> c_iter;

// The boundaries of the key-range this compaction is interested in. No two
// subcompactions may have overlapping key-ranges.
@@ -304,7 +304,9 @@
}

struct RebuildBlobsInfo {
// File numbers
chash_set<uint64_t> blobs;
// pop_count = planned file count - actual used file count.
size_t pop_count;
};
struct BlobRefInfo {
@@ -1493,6 +1495,9 @@ void CompactionJob::ProcessCompaction(SubcompactionState* sub_compact) {
case kGarbageCollection:
ProcessGarbageCollection(sub_compact);
break;
case kLinkCompaction:
ProcessLinkCompaction(sub_compact);
break;
default:
assert(false);
break;
@@ -1580,34 +1585,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get(), shutting_down_);

struct BuilderSeparateHelper : public SeparateHelper {
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;

Status TransToSeparate(const Slice& internal_key, LazyBuffer& value,
const Slice& meta, bool is_merge,
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
}

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
if (trans_to_separate_callback == nullptr) {
return Status::NotSupported();
}
return trans_to_separate_callback(trans_to_separate_callback_args, key,
value);
}
BuilderSeparateHelper separate_helper;

LazyBuffer TransToCombined(const Slice& user_key, uint64_t sequence,
const LazyBuffer& value) const override {
return separate_helper->TransToCombined(user_key, sequence, value);
}
} separate_helper;
if (compact_->compaction->immutable_cf_options()
->value_meta_extractor_factory != nullptr) {
ValueExtractorContext context = {cfd->GetID()};
@@ -1682,14 +1661,26 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = Status::OK();
}

sub_compact->c_iter.reset(new CompactionIterator(
auto it = std::make_unique<CompactionIterator>(
input.get(), &separate_helper, end, cfd->user_comparator(), &merge,
versions_->LastSequence(), &existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_, env_,
ShouldReportDetailedTime(env_, stats_), false, &range_del_agg,
sub_compact->compaction, mutable_cf_options->get_blob_config(),
compaction_filter, shutting_down_, preserve_deletes_seqnum_,
&rebuild_blobs_info.blobs));
&rebuild_blobs_info.blobs);

// For LinkCompaction, we wrap the ordinary compaction iterator.
if(!sub_compact->compaction->immutable_cf_options()->enable_link_compaction) {
sub_compact->c_iter = std::move(it);
} else {
// TODO: iterator_cache must not be null (currently nullptr is passed).
sub_compact->c_iter = std::make_unique<LinkCompactionIterator>(
std::move(it), nullptr, mutable_cf_options->lbr_group_sz,
mutable_cf_options->lbr_hash_bits);
}
it.reset();

auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();

@@ -1765,6 +1756,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (!sub_compact->compaction->partial_compaction()) {
dict_sample_data.reserve(kSampleBytes);
}

// Records how many entries in each target blob SST are needed by the key
// SST: <file_number, count>
std::unordered_map<uint64_t, uint64_t> dependence;

size_t yield_count = 0;
@@ -1773,6 +1767,7 @@
// returns true.
const Slice& key = c_iter->key();
const LazyBuffer& value = c_iter->value();
// For KV separation, we need to record all dependent SSTs.
if (c_iter->ikey().type == kTypeValueIndex ||
c_iter->ikey().type == kTypeMergeIndex) {
assert(value.file_number() != uint64_t(-1));
@@ -1781,6 +1776,17 @@
++ib.first->second;
}
}
// For LinkSSTs, dependency decoding is slightly different.
// TODO([email protected]) We don't handle MergeIndex yet.
if(c_iter->ikey().type == kTypeLinkIndex) {
// Decode value to find out all underlying SST file numbers.
for(const auto& fn: c_iter->depended_file_numbers()) {
auto ib = dependence.emplace(fn, 1);
if (!ib.second) {
++ib.first->second;
}
}
}

assert(end == nullptr ||
cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
@@ -1824,6 +1830,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (!status.ok()) {
break;
}
// Sample key/value bytes for the compression dictionary.
for (const auto& data_elmt : {key, value.slice()}) {
size_t data_end_offset = data_begin_offset + data_elmt.size();
while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
@@ -2036,7 +2043,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
sub_compact->c_iter.reset();
input.reset();
sub_compact->status = status;
} // namespace TERARKDB_NAMESPACE
}

void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
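// Link compaction reuses the key-value compaction pipeline: the only
// difference is that ProcessKeyValueCompaction wraps the compaction iterator
// in a LinkCompactionIterator when enable_link_compaction is set.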
assert(sub_compact != nullptr);
return ProcessKeyValueCompaction(sub_compact);
}

void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);