Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dbiter value_meta() #57

Open
wants to merge 5 commits into
base: dev.1.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ Status BuildTable(
int_tbl_prop_collector_factories,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories_for_blob,
uint32_t column_family_id, const std::string& column_family_name,
const ValueExtractor* meta_extractor, uint32_t column_family_id,
const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, const CompressionType compression,
Expand Down Expand Up @@ -165,7 +166,7 @@ Status BuildTable(
std::unique_ptr<TableBuilder> builder;
FileMetaData* current_output = nullptr;
TableProperties* current_prop = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
const ValueExtractor* meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;
Expand All @@ -175,7 +176,7 @@ Status BuildTable(
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
meta_extractor);
}

Status TransToSeparate(const Slice& internal_key,
Expand All @@ -194,11 +195,6 @@ Status BuildTable(
return LazyBuffer();
}
} separate_helper;
if (ioptions.value_meta_extractor_factory != nullptr) {
ValueExtractorContext context = {column_family_id};
separate_helper.value_meta_extractor =
ioptions.value_meta_extractor_factory->CreateValueExtractor(context);
}

auto finish_output_blob_sst = [&] {
Status status;
Expand Down Expand Up @@ -292,7 +288,7 @@ Status BuildTable(
status = SeparateHelper::TransToSeparate(
key, value, blob_meta->fd.GetNumber(), Slice(),
GetInternalKeyType(key) == kTypeMerge, false,
separate_helper.value_meta_extractor.get());
separate_helper.meta_extractor);
}
return status;
};
Expand All @@ -307,6 +303,7 @@ Status BuildTable(
c_style_callback(trans_to_separate);
separate_helper.trans_to_separate_callback_args = &trans_to_separate;
}
separate_helper.meta_extractor = meta_extractor;

CompactionIterator c_iter(
iter.get(), &separate_helper, nullptr,
Expand Down
3 changes: 2 additions & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ extern Status BuildTable(
int_tbl_prop_collector_factories,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories_for_blob,
uint32_t column_family_id, const std::string& column_family_name,
const ValueExtractor* meta_extractor, uint32_t column_family_id,
const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, const CompressionType compression,
Expand Down
18 changes: 18 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
return cfd()->user_comparator();
}

// Exposes the value-meta extractor of the column family behind this handle.
// Simply forwards to ColumnFamilyData::meta_extractor(); ownership stays with
// the column family data.
const ValueExtractor* ColumnFamilyHandleImpl::GetMetaExtractor() const {
  auto* data = cfd();
  return data->meta_extractor();
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
if (!cf_options.compression_per_level.empty()) {
for (size_t level = 0; level < cf_options.compression_per_level.size();
Expand Down Expand Up @@ -489,6 +493,12 @@ ColumnFamilyData::ColumnFamilyData(
}
}

if (ioptions_.value_meta_extractor_factory) {
ValueExtractorContext ctx{id_};
meta_extractor_ =
ioptions_.value_meta_extractor_factory->CreateValueExtractor(ctx);
}

RecalculateWriteStallConditions(mutable_cf_options_);
}

Expand Down Expand Up @@ -1476,4 +1486,12 @@ const Comparator* GetColumnFamilyUserComparator(
return nullptr;
}

// Null-tolerant helper: yields the handle's meta extractor, or nullptr when
// no column family handle was supplied.
const ValueExtractor* GetColumnFamilyMetaExtractor(
    ColumnFamilyHandle* column_family) {
  return column_family == nullptr ? nullptr
                                  : column_family->GetMetaExtractor();
}

} // namespace TERARKDB_NAMESPACE
8 changes: 8 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual const std::string& GetName() const override;
virtual Status GetDescriptor(ColumnFamilyDescriptor* desc) override;
virtual const Comparator* GetComparator() const override;
virtual const ValueExtractor* GetMetaExtractor() const override;

private:
ColumnFamilyData* cfd_;
Expand Down Expand Up @@ -318,6 +319,8 @@ class ColumnFamilyData {
// Returns a reference to this column family's internal_comparator_ member.
const InternalKeyComparator& internal_comparator() const {
return internal_comparator_;
}
// Returns a non-owning pointer to the extractor held by meta_extractor_.
// The result is nullptr when meta_extractor_ was never populated, so callers
// must check before dereferencing.
// thread-safe (NOTE(review): as stated by the author; presumably because the
// pointer is not reassigned after construction - confirm)
const ValueExtractor* meta_extractor() const { return meta_extractor_.get(); }

const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories(const MutableCFOptions& moptions) const {
Expand Down Expand Up @@ -436,6 +439,8 @@ class ColumnFamilyData {
const ImmutableCFOptions ioptions_;
MutableCFOptions mutable_cf_options_;

std::unique_ptr<ValueExtractor> meta_extractor_;

const bool is_delete_range_supported_;

std::unique_ptr<TableCache> table_cache_;
Expand Down Expand Up @@ -655,4 +660,7 @@ extern uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family);
extern const Comparator* GetColumnFamilyUserComparator(
ColumnFamilyHandle* column_family);

extern const ValueExtractor* GetColumnFamilyMetaExtractor(
ColumnFamilyHandle* column_family);

} // namespace TERARKDB_NAMESPACE
15 changes: 5 additions & 10 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

struct BuilderSeparateHelper : public SeparateHelper {
SeparateHelper* separate_helper = nullptr;
std::unique_ptr<ValueExtractor> value_meta_extractor;
const ValueExtractor* meta_extractor;
Status (*trans_to_separate_callback)(void* args, const Slice& key,
LazyBuffer& value) = nullptr;
void* trans_to_separate_callback_args = nullptr;
Expand All @@ -1287,7 +1287,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
bool is_index) override {
return SeparateHelper::TransToSeparate(
internal_key, value, value.file_number(), meta, is_merge, is_index,
value_meta_extractor.get());
meta_extractor);
}

Status TransToSeparate(const Slice& key, LazyBuffer& value) override {
Expand All @@ -1303,13 +1303,6 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
return separate_helper->TransToCombined(user_key, sequence, value);
}
} separate_helper;
if (compact_->compaction->immutable_cf_options()
->value_meta_extractor_factory != nullptr) {
ValueExtractorContext context = {cfd->GetID()};
separate_helper.value_meta_extractor =
compact_->compaction->immutable_cf_options()
->value_meta_extractor_factory->CreateValueExtractor(context);
}

auto trans_to_separate = [&](const Slice& key, LazyBuffer& value) {
assert(value.file_number() == uint64_t(-1));
Expand All @@ -1334,12 +1327,14 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
status = SeparateHelper::TransToSeparate(
key, value, blob_meta->fd.GetNumber(), Slice(),
GetInternalKeyType(key) == kTypeMerge, false,
separate_helper.value_meta_extractor.get());
separate_helper.meta_extractor);
}
return status;
};

separate_helper.separate_helper = sub_compact->compaction->input_version();
separate_helper.meta_extractor =
compact_->compaction->column_family_data()->meta_extractor();
if (!sub_compact->compaction->immutable_cf_options()
->table_factory->IsBuilderNeedSecondPass()) {
separate_helper.trans_to_separate_callback =
Expand Down
1 change: 1 addition & 0 deletions db/comparator_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class KVIter : public Iterator {
}

virtual Slice key() const override { return iter_->first; }
// This test iterator has no value metadata; always reports an invalid slice.
virtual Slice meta() const override { return Slice::Invalid(); }
virtual Slice value() const override { return iter_->second; }
virtual Status status() const override { return Status::OK(); }

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
&meta_vec, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(mutable_cf_options),
cfd->int_tbl_prop_collector_factories_for_blob(mutable_cf_options),
cfd->GetID(), cfd->GetName(), snapshot_seqs,
cfd->meta_extractor(), cfd->GetID(), cfd->GetName(), snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
cfd->ioptions()->compression_opts, paranoid_file_checks,
Expand Down
30 changes: 30 additions & 0 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,31 @@ class DBIter final : public Iterator {
return saved_key_.GetUserKey();
}
}
// Returns the metadata that the column family's ValueExtractor derives from
// the current entry's value, or Slice::Invalid() when no metadata can be
// produced.
//
// Slice::Invalid() is returned when:
//   - the iterator has no column family data (cfd_ == nullptr),
//   - the column family has no meta extractor configured,
//   - the current value cannot be obtained (value() is not valid), or
//   - the extractor reports a non-OK status (valid_ is then cleared too).
//
// A successful result is cached in meta_ (backed by meta_buffer_), so repeated
// calls while meta_ stays valid do not run the extractor again.
virtual Slice meta() const override {
  if (cfd_ == nullptr) {
    return Slice::Invalid();
  }
  // meta_extractor() is nullptr for column families created without a
  // value_meta_extractor_factory; dereferencing it unconditionally would
  // crash, so treat that case as "no metadata".
  const ValueExtractor* extractor = cfd_->meta_extractor();
  if (extractor == nullptr) {
    return Slice::Invalid();
  }
  if (meta_.valid()) {
    return meta_;
  }

  // Assume failure until extraction succeeds.
  meta_ = Slice::Invalid();
  const Slice v = value();
  if (!v.valid()) {
    return meta_;
  }
  const Status s =
      extractor->Extract(saved_key_.GetUserKey(), v, &meta_buffer_);
  if (!s.ok()) {
    // Same reaction as the original code: an extraction error invalidates
    // the iterator position.
    valid_ = false;
    return meta_;
  }
  meta_ = meta_buffer_;
  return meta_;
}
virtual Slice value() const override {
assert(valid_);
auto s = value_.fetch();
Expand Down Expand Up @@ -279,6 +304,8 @@ class DBIter final : public Iterator {
local_stats_.skip_count_--;
}
num_internal_keys_skipped_ = 0;
meta_ = Slice::Invalid();
meta_buffer_.clear();
value_.reset();
if (value_buffer_.capacity() > 1048576) {
std::string().swap(value_buffer_);
Expand All @@ -303,6 +330,8 @@ class DBIter final : public Iterator {
ParsedInternalKey ikey_;
LazyBuffer value_;
std::string value_buffer_;
mutable Slice meta_;
mutable std::string meta_buffer_;
Direction direction_;
mutable bool valid_;
bool current_entry_is_merged_;
Expand Down Expand Up @@ -1445,6 +1474,7 @@ inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
// Forwards to the wrapped db_iter_'s meta().
inline Slice ArenaWrappedDBIter::meta() const { return db_iter_->meta(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
Expand Down
1 change: 1 addition & 0 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ArenaWrappedDBIter : public Iterator {
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice meta() const override;
virtual Slice value() const override;
virtual Status status() const override;
virtual Status Refresh() override;
Expand Down
94 changes: 94 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@

namespace TERARKDB_NAMESPACE {

// Test-only ValueExtractor whose "meta" for an entry is value.size() rendered
// as a decimal string. The key is ignored.
class TestValueExtractor : public ValueExtractor {
 public:
  ~TestValueExtractor() override {}

  // Writes the decimal representation of value.size() into *output and
  // always reports success.
  Status Extract(const Slice& /*key*/, const Slice& value,
                 std::string* output) const override {
    *output = std::to_string(value.size());
    return Status::OK();
  }
};
class TestValueExtractorFactory : public ValueExtractorFactory {
public:
virtual std::unique_ptr<ValueExtractor> CreateValueExtractor(
const Context& context) const override {
return std::make_unique<TestValueExtractor>();
}

// Returns a name that identifies this value extractor factory.
virtual const char* Name() const override { return "TestValueExtractor"; }
};

class DBTest : public DBTestBase {
public:
DBTest() : DBTestBase("/db_test") {}
Expand Down Expand Up @@ -225,6 +248,76 @@ TEST_F(DBTest, WriteEmptyBatch) {
ASSERT_EQ("bar", Get(1, "foo"));
}

// Checks that Iterator::meta() returns the TestValueExtractor output (the
// decimal length of each value) for k1..k7 in key order, first on the freshly
// written column family and then again after reopening the DB.
TEST_F(DBTest, GetValueMeta) {
  Options options = CurrentOptions();
  options.env = env_;
  options.write_buffer_size = 100000;
  options.value_meta_extractor_factory =
      std::make_shared<TestValueExtractorFactory>();

  CreateAndReopenWithCF({"pikachu"}, options);

  // Inserted out of key order on purpose; kN carries a value of length N + 2.
  const char* const kEntries[][2] = {
      {"k4", "123456"}, {"k6", "12345678"}, {"k3", "12345"},
      {"k1", "123"},    {"k2", "1234"},     {"k5", "1234567"},
      {"k7", "123456789"},
  };
  for (const auto& entry : kEntries) {
    ASSERT_OK(Put(1, entry[0], entry[1]));
  }
  WriteBatch empty_batch;
  ASSERT_OK(dbfull()->Write(WriteOptions(), &empty_batch));

  // Expected meta() per entry when iterating k1..k7.
  const char* const kExpectedMeta[] = {"3", "4", "5", "6", "7", "8", "9"};
  const size_t kExpectedCount =
      sizeof(kExpectedMeta) / sizeof(kExpectedMeta[0]);

  std::unique_ptr<Iterator> db_iter;
  for (int pass = 0; pass < 2; ++pass) {
    if (pass == 1) {
      // Second pass: drop the old iterator, reopen, and verify again.
      db_iter.reset();
      ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
    }
    db_iter.reset(dbfull()->NewIterator(ReadOptions(),
                                        dbfull()->GetColumnFamilyHandle(1)));
    for (size_t i = 0; i < kExpectedCount; ++i) {
      if (i == 0) {
        db_iter->SeekToFirst();
      } else {
        db_iter->Next();
      }
      ASSERT_TRUE(db_iter->Valid());
      ASSERT_EQ(db_iter->meta(), kExpectedMeta[i]);
    }
  }
}

TEST_F(DBTest, SkipDelay) {
Options options = CurrentOptions();
options.env = env_;
Expand Down Expand Up @@ -2762,6 +2855,7 @@ class ModelDB : public DB {

virtual Slice key() const override { return iter_->first; }
virtual Slice value() const override { return iter_->second; }
// The model iterator carries no value metadata; always returns an invalid
// slice.
virtual Slice meta() const override { return Slice::Invalid(); }
virtual Status status() const override { return Status::OK(); }

private:
Expand Down
Loading