Skip to content

Commit 907db3e

Browse files
committedApr 7, 2022
[WIP] Add Link Compaction related interfaces
1 parent a4b054c commit 907db3e

13 files changed

+106
-3
lines changed
 

‎db/column_family.cc

+7
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,13 @@ Compaction* ColumnFamilyData::PickCompaction(
994994
return result;
995995
}
996996

997+
// TODO(guokuankuan@bytedance.com)
998+
Compaction* ColumnFamilyData::PickLinkCompaction(
999+
const MutableCFOptions& mutable_options,
1000+
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) {
1001+
return nullptr;
1002+
}
1003+
9971004
Compaction* ColumnFamilyData::PickGarbageCollection(
9981005
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
9991006
StopWatch sw(ioptions_.env, ioptions_.statistics,

‎db/column_family.h

+6
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,12 @@ class ColumnFamilyData {
278278
const std::vector<SequenceNumber>& snapshots,
279279
LogBuffer* log_buffer);
280280

281+
// Pick a Link Compaction and set related version information to it.
282+
Compaction* PickLinkCompaction(const MutableCFOptions& mutable_options,
283+
const std::vector<SequenceNumber>& snapshots,
284+
LogBuffer* log_buffer);
285+
286+
281287
Compaction* PickGarbageCollection(const MutableCFOptions& mutable_options,
282288
LogBuffer* log_buffer);
283289
// Check if the passed range overlap with any running compactions.

‎db/compaction.h

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ enum CompactionType {
7777
kKeyValueCompaction = 0,
7878
kMapCompaction = 1,
7979
kGarbageCollection = 2,
80+
kLinkCompaction = 3,
8081
};
8182

8283
struct CompactionParams {

‎db/compaction_iterator.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ CompactionIterator::CompactionIterator(
170170
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
171171
}
172172

173-
if (snapshots_->size() == 0) {
173+
if (snapshots_->empty()) {
174174
// optimize for fast path if there are no snapshots
175175
visible_at_tip_ = true;
176176
earliest_snapshot_ = kMaxSequenceNumber;

‎db/compaction_job.cc

+14-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,9 @@ struct CompactionJob::SubcompactionState {
304304
}
305305

306306
struct RebuildBlobsInfo {
307+
// File numbers
307308
chash_set<uint64_t> blobs;
309+
// pop_count = planned file count - actual used file count.
308310
size_t pop_count;
309311
};
310312
struct BlobRefInfo {
@@ -1493,6 +1495,9 @@ void CompactionJob::ProcessCompaction(SubcompactionState* sub_compact) {
14931495
case kGarbageCollection:
14941496
ProcessGarbageCollection(sub_compact);
14951497
break;
1498+
case kLinkCompaction:
1499+
ProcessLinkCompaction(sub_compact);
1500+
break;
14961501
default:
14971502
assert(false);
14981503
break;
@@ -1765,6 +1770,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
17651770
if (!sub_compact->compaction->partial_compaction()) {
17661771
dict_sample_data.reserve(kSampleBytes);
17671772
}
1773+
1774+
// Represents how many records in target blob SST that are needed by the key
1775+
// SST
17681776
std::unordered_map<uint64_t, uint64_t> dependence;
17691777

17701778
size_t yield_count = 0;
@@ -2036,7 +2044,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
20362044
sub_compact->c_iter.reset();
20372045
input.reset();
20382046
sub_compact->status = status;
2039-
} // namespace TERARKDB_NAMESPACE
2047+
}
2048+
2049+
// TODO(guokuankuan@bytedance.com)
2050+
void CompactionJob::ProcessLinkCompaction(SubcompactionState* sub_compact) {
2051+
return;
2052+
}
20402053

20412054
void CompactionJob::ProcessGarbageCollection(SubcompactionState* sub_compact) {
20422055
assert(sub_compact != nullptr);

‎db/compaction_job.h

+1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class CompactionJob {
123123
// kv-pairs
124124
void ProcessCompaction(SubcompactionState* sub_compact);
125125
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
126+
void ProcessLinkCompaction(SubcompactionState* sub_compact);
126127
void ProcessGarbageCollection(SubcompactionState* sub_compact);
127128

128129
Status FinishCompactionOutputFile(

‎db/compaction_picker.cc

+10-1
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,15 @@ Compaction* CompactionPicker::PickGarbageCollection(
994994
return c;
995995
}
996996

997+
// TODO (guokuankuan@bytedance.com)
998+
Compaction* CompactionPicker::PickLinkCompaction(
999+
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
1000+
VersionStorageInfo* vstorage,
1001+
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) {
1002+
// Need to be implemented
1003+
return nullptr;
1004+
}
1005+
9971006
void CompactionPicker::InitFilesBeingCompact(
9981007
const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage,
9991008
const InternalKey* begin, const InternalKey* end,
@@ -2971,7 +2980,7 @@ bool LevelCompactionBuilder::PickFileToCompact() {
29712980
// store where to start the iteration in the next call to PickCompaction
29722981
vstorage_->SetNextCompactionIndex(start_level_, cmp_idx);
29732982

2974-
return start_level_inputs_.size() > 0;
2983+
return !start_level_inputs_.empty();
29752984
}
29762985

29772986
bool LevelCompactionBuilder::PickIntraL0Compaction() {

‎db/compaction_picker.h

+10
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,16 @@ class CompactionPicker {
106106
VersionStorageInfo* vstorage,
107107
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) = 0;
108108

109+
// Gather target files (including blob SST(KV separated) and ordinary SST) and
110+
// composite a compaction for future use.
111+
// A Link Compaction contains two major objectives:
112+
// (1) Link target SST files into a logical linked sst
113+
// (2) GC old linked SST files
114+
virtual Compaction* PickLinkCompaction(
115+
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
116+
VersionStorageInfo* vstorage,
117+
const std::vector<SequenceNumber>& snapshots, LogBuffer* log_buffer) = 0;
118+
109119
// Pick compaction which level has map or link sst
110120
Compaction* PickGarbageCollection(const std::string& cf_name,
111121
const MutableCFOptions& mutable_cf_options,

‎db/db_impl.h

+7
Original file line numberDiff line numberDiff line change
@@ -1246,6 +1246,7 @@ class DBImpl : public DB {
12461246
const std::string& dir_to_sync, FileType type,
12471247
uint64_t number, int job_id);
12481248
static void BGWorkCompaction(void* arg);
1249+
static void BGWorkLinkCompaction(void* arg);
12491250
static void BGWorkGarbageCollection(void* arg);
12501251
// Runs a pre-chosen universal compaction involving bottom level in a
12511252
// separate, bottom-pri thread pool.
@@ -1255,12 +1256,18 @@ class DBImpl : public DB {
12551256
static void UnscheduleCallback(void* arg);
12561257
void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
12571258
Env::Priority bg_thread_pri);
1259+
void BackgroundCallLinkCompaction(PrepickedCompaction* prepicked_compaction,
1260+
Env::Priority bg_thread_pri);
12581261
void BackgroundCallGarbageCollection();
12591262
void BackgroundCallFlush();
12601263
void BackgroundCallPurge();
1264+
12611265
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
12621266
LogBuffer* log_buffer,
12631267
PrepickedCompaction* prepicked_compaction);
1268+
Status BackgroundLinkCompaction(bool* madeProgress, JobContext* job_context,
1269+
LogBuffer* log_buffer,
1270+
PrepickedCompaction* prepicked_compaction);
12641271
Status BackgroundGarbageCollection(bool* madeProgress,
12651272
JobContext* job_context,
12661273
LogBuffer* log_buffer);

‎db/db_impl_compaction_flush.cc

+35
Original file line numberDiff line numberDiff line change
@@ -1888,6 +1888,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
18881888
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
18891889
&DBImpl::UnscheduleCallback);
18901890
}
1891+
1892+
// TODO(guokuankuan@bytedance.com)
1893+
// We need to find out a reasonable opportunity to emit Link Compactions
1894+
while(false) {
1895+
CompactionArg* ca = new CompactionArg;
1896+
env_->Schedule(&DBImpl::BGWorkLinkCompaction, ca, Env::Priority::LOW, this,
1897+
&DBImpl::UnscheduleCallback);
1898+
}
18911899
}
18921900

18931901
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
@@ -2072,6 +2080,18 @@ void DBImpl::BGWorkCompaction(void* arg) {
20722080
delete prepicked_compaction;
20732081
}
20742082

2083+
void DBImpl::BGWorkLinkCompaction(void* arg) {
2084+
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2085+
delete reinterpret_cast<CompactionArg*>(arg);
2086+
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2087+
TEST_SYNC_POINT("DBImpl::BGWorkLinkCompaction");
2088+
auto prepicked_compaction =
2089+
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2090+
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallLinkCompaction(
2091+
prepicked_compaction, Env::Priority::LOW);
2092+
delete prepicked_compaction;
2093+
}
2094+
20752095
void DBImpl::BGWorkGarbageCollection(void* arg) {
20762096
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
20772097
delete reinterpret_cast<CompactionArg*>(arg);
@@ -2408,6 +2428,14 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
24082428
}
24092429
}
24102430

2431+
// TODO(guokuankuan@bytedance.com)
2432+
void DBImpl::BackgroundCallLinkCompaction(PrepickedCompaction* prepicked_compaction,
2433+
Env::Priority bg_thread_pri) {
2434+
// Status s = BackgroundLinkCompaction(&made_progress, &job_context, &log_buffer,
2435+
// prepicked_compaction);
2436+
return;
2437+
}
2438+
24112439
void DBImpl::BackgroundCallGarbageCollection() {
24122440
bool made_progress = false;
24132441
JobContext job_context(next_job_id_.fetch_add(1), true);
@@ -2920,6 +2948,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
29202948
return status;
29212949
}
29222950

2951+
Status DBImpl::BackgroundLinkCompaction(bool* made_progress,
2952+
JobContext* job_context,
2953+
LogBuffer* log_buffer,
2954+
PrepickedCompaction* prepicked_compaction) {
2955+
return Status::OK();
2956+
}
2957+
29232958
Status DBImpl::BackgroundGarbageCollection(bool* made_progress,
29242959
JobContext* job_context,
29252960
LogBuffer* log_buffer) {

‎db/dbformat.h

+9
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ enum ValueType : unsigned char {
6464
// generated by WriteUnprepared write policy is not mistakenly read by
6565
// another.
6666
kTypeBeginUnprepareXID = 0x13, // WAL only.
67+
// Similar to kTypeValueIndex, this means current value belongs to
68+
// a LinkSST and pointed to the actual value file.
69+
kTypeLinkIndex = 0x14, // LinkSST only
6770
kMaxValue = 0x7F // Not used for storing records.
6871
};
6972

@@ -784,6 +787,12 @@ struct ParsedInternalKeyComparator {
784787
const InternalKeyComparator* cmp;
785788
};
786789

790+
// This interface is used for transferring data format between KV separated and
791+
// combined kv pairs.
792+
// We should add an implementation instance while processing key value pairs
793+
// during compactions.
794+
// Note that, we also port `Version` to this interface since we may need to fetch
795+
// value from any version that has separated values.
787796
class SeparateHelper {
788797
public:
789798
virtual ~SeparateHelper() = default;

‎include/rocksdb/compaction_filter.h

+4
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ class CompactionFilter
200200

201201
// Determines whether value changed by compaction filter were stable.
202202
// Default as false, which means stability of outcome is not promised.
203+
// "Stable" means the changed value will not change after the same
204+
// operation is applied multiple times.
205+
// Creators of the compaction filter should override this function, or
206+
// the behavior of the stability checking is undefined.
203207
virtual bool IsStableChangeValue() const { return false; }
204208

205209
// Returns a name that identifies this compaction filter.

‎util/iterator_cache.h

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ struct FileMetaData;
1818
class RangeDelAggregator;
1919
class TableReader;
2020

21+
// <FileNumber -> FileMetaData>
2122
typedef chash_map<uint64_t, FileMetaData*> DependenceMap;
2223

2324
class IteratorCache {

0 commit comments

Comments
 (0)