Skip to content

Commit

Permalink
Blob db create a snapshot before every read
Browse files Browse the repository at this point in the history
Summary:
If GC kicks in between the following two steps of a read:

* A Get() reads the index entry from the base DB.
* The Get() then reads the value from the blob file.

then the GC can delete the corresponding blob file in the window between them, causing the key to be reported as not found. Fortunately we already have logic that avoids deleting a blob file while it is referenced by a snapshot, so the fix is to explicitly create a snapshot before reading the index entry from the base DB.
Closes facebook#2754

Differential Revision: D5655956

Pulled By: yiwu-arbug

fbshipit-source-id: e4ccbc51331362542e7343175bbcbdea5830f544
  • Loading branch information
yiwu-arbug authored and facebook-github-bot committed Aug 21, 2017
1 parent 4624ae5 commit 5b68b11
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 38 deletions.
100 changes: 72 additions & 28 deletions utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ void BlobDBImpl::StartBackgroundTasks() {
kDeleteCheckPeriodMillisecs,
std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
tqueue_.add(
kDeleteObsoletedFilesPeriodMillisecs,
std::bind(&BlobDBImpl::DeleteObsFiles, this, std::placeholders::_1));
kDeleteObsoleteFilesPeriodMillisecs,
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
tqueue_.add(kSanityCheckPeriodMillisecs,
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
tqueue_.add(kWriteAmplificationStatsPeriodMillisecs,
Expand Down Expand Up @@ -1117,25 +1117,47 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
}

std::vector<Status> BlobDBImpl::MultiGet(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
  // Pin a snapshot so that a blob file referenced by an index entry cannot
  // be deleted by GC between fetching the index entry from the base DB and
  // reading the value from the blob file.
  ReadOptions ro(read_options);
  bool snapshot_created = SetSnapshotIfNeeded(&ro);
  // First read the blob index entries for all keys from the base DB.
  std::vector<std::string> values_lsm;
  values_lsm.resize(keys.size());
  auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm);
  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");

  values->resize(keys.size());
  assert(statuses.size() == keys.size());
  for (size_t i = 0; i < keys.size(); ++i) {
    if (!statuses[i].ok()) {
      // Key missing (or errored) in the base DB; keep that status as-is.
      continue;
    }

    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
    auto cfd = cfh->cfd();

    // Resolve the index entry into the actual blob value.
    Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i]));
    statuses[i] = s;
  }
  if (snapshot_created) {
    // Only release the snapshot we created; a caller-provided one is theirs.
    db_->ReleaseSnapshot(ro.snapshot);
  }
  return statuses;
}

// Ensure read_options carries a snapshot: if the caller did not supply one,
// take a fresh snapshot from the base DB. Returns true iff a snapshot was
// created here (and must therefore be released by the caller).
bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
  assert(read_options != nullptr);
  const bool needs_snapshot = (read_options->snapshot == nullptr);
  if (needs_snapshot) {
    read_options->snapshot = db_->GetSnapshot();
  }
  return needs_snapshot;
}

Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
const std::string& index_entry, std::string* value,
SequenceNumber* sequence) {
Expand Down Expand Up @@ -1172,11 +1194,6 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
bfile = hitr->second;
}

if (bfile->Obsolete()) {
return Status::NotFound(
"Blob Not Found as blob file was garbage collected");
}

// 0 - size
if (!handle.size() && value != nullptr) {
value->clear();
Expand Down Expand Up @@ -1274,25 +1291,30 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
return s;
}

Status BlobDBImpl::Get(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* value) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Pin a snapshot so that the blob file referenced by the index entry
  // cannot be deleted by GC between fetching the index entry from the base
  // DB and reading the value from the blob file.
  // TODO(yiwu): For Get(), retrying if the file is not found would be a
  // simpler strategy.
  ReadOptions ro(read_options);
  bool snapshot_created = SetSnapshotIfNeeded(&ro);

  Status s;
  std::string index_entry;
  s = db_->Get(ro, column_family, key, &index_entry);
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
  if (s.ok()) {
    // Resolve the index entry into the actual blob value and pin it.
    s = CommonGet(cfd, key, index_entry, value->GetSelf());
    value->PinSelf();
  }
  if (snapshot_created) {
    // Only release the snapshot we created; a caller-provided one is theirs.
    db_->ReleaseSnapshot(ro.snapshot);
  }
  return s;
}

Expand All @@ -1302,6 +1324,8 @@ Slice BlobDBIterator::value() const {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_);
auto cfd = cfh->cfd();

TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false),
&vpart_);
return Slice(vpart_);
Expand Down Expand Up @@ -1977,7 +2001,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
return false;
}

std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
if (aborted) return std::make_pair(false, -1);

{
Expand All @@ -2002,6 +2026,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
}
}

blob_files_.erase(bfile->BlobFileNumber());
Status s = env_->DeleteFile(bfile->PathName());
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
Expand All @@ -2026,7 +2051,9 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
// put files back into obsolete if for some reason, delete failed
if (!tobsolete.empty()) {
WriteLock wl(&mutex_);
for (auto bfile : tobsolete) obsolete_files_.push_front(bfile);
for (auto bfile : tobsolete) {
obsolete_files_.push_front(bfile);
}
}

return std::make_pair(!aborted, -1);
Expand Down Expand Up @@ -2212,8 +2239,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
WriteLock wl(&mutex_);
for (auto bfile : obsoletes) {
bool last_file = (bfile == obsoletes.back());
// remove from global list so writers
blob_files_.erase(bfile->BlobFileNumber());

if (!evict_cb) {
bfile->SetCanBeDeleted();
Expand All @@ -2231,10 +2256,14 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
return std::make_pair(true, -1);
}

Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options,
                                  ColumnFamilyHandle* column_family) {
  // Pin a snapshot for the iterator's whole lifetime so that blob files
  // referenced by index entries it yields cannot be deleted by GC between
  // fetching an index entry and reading from the blob file. The iterator
  // takes ownership of the snapshot when snapshot_created is true and
  // releases it in its destructor.
  ReadOptions ro(read_options);
  bool snapshot_created = SetSnapshotIfNeeded(&ro);
  return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family,
                            this, snapshot_created, ro.snapshot);
}

Status DestroyBlobDB(const std::string& dbname, const Options& options,
Expand Down Expand Up @@ -2283,6 +2312,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
}

std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> blob_files;
for (auto& p : blob_files_) {
blob_files.emplace_back(p.second);
Expand All @@ -2300,6 +2330,10 @@ std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
return obsolete_files;
}

// Test hook: synchronously run one pass of obsolete-file deletion.
void BlobDBImpl::TEST_DeleteObsoleteFiles() { DeleteObsoleteFiles(false /*abort*/); }

// Test hook: close the given blob file's sequential writer.
void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) { CloseSeqWrite(bfile, false /*abort*/); }
Expand All @@ -2310,6 +2344,16 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
}

void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }

// Test hook: mark the given blob file as garbage-collected so that the next
// DeleteObsoleteFiles() pass considers it for deletion.
void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile) {
  const uint64_t number = bfile->BlobFileNumber();
  bfile->SetCanBeDeleted();
  // Hold the mutex while reading blob_files_ and mutating obsolete_files_,
  // consistent with TEST_GetBlobFiles()/TEST_GetObsoleteFiles(); the
  // original only guarded the push_back, leaving the blob_files_ lookup
  // racy against background tasks.
  WriteLock l(&mutex_);
  assert(blob_files_.count(number) > 0);
  obsolete_files_.push_back(bfile);
}
#endif // !NDEBUG

} // namespace blob_db
Expand Down
43 changes: 33 additions & 10 deletions utilities/blob_db/blob_db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class BlobDBImpl : public BlobDB {
static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;

// how often to schedule delete obs files periods
static constexpr uint32_t kDeleteObsoletedFilesPeriodMillisecs = 10 * 1000;
static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;

// how often to schedule check seq files period
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
Expand All @@ -219,16 +219,16 @@ class BlobDBImpl : public BlobDB {
const Slice& key) override;

using rocksdb::StackableDB::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;

using rocksdb::StackableDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
virtual Iterator* NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;

using rocksdb::StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
Expand Down Expand Up @@ -269,11 +269,19 @@ class BlobDBImpl : public BlobDB {
GCStats* gc_stats);

void TEST_RunGC();

void TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile);

void TEST_DeleteObsoleteFiles();
#endif // !NDEBUG

private:
Status OpenPhase1();

// Create a snapshot if there isn't one in read options.
// Return true if a snapshot is created.
bool SetSnapshotIfNeeded(ReadOptions* read_options);

Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
const std::string& index_entry, std::string* value,
SequenceNumber* sequence = nullptr);
Expand Down Expand Up @@ -332,7 +340,7 @@ class BlobDBImpl : public BlobDB {
// delete files which have been garbage collected and marked
// obsolete. Check whether any snapshots exist which refer to
// the same
std::pair<bool, int64_t> DeleteObsFiles(bool aborted);
std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);

// Major task to garbage collect expired and deleted blobs
std::pair<bool, int64_t> RunGC(bool aborted);
Expand Down Expand Up @@ -593,7 +601,7 @@ class BlobFile {

// This Read-Write mutex is per file specific and protects
// all the datastructures
port::RWMutex mutex_;
mutable port::RWMutex mutex_;

// time when the random access reader was last created.
std::atomic<std::time_t> last_access_;
Expand Down Expand Up @@ -700,12 +708,23 @@ class BlobFile {
class BlobDBIterator : public Iterator {
public:
explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family,
BlobDBImpl* impl)
: iter_(iter), cfh_(column_family), db_impl_(impl) {
assert(iter_);
BlobDBImpl* impl, bool own_snapshot,
const Snapshot* snapshot)
: iter_(iter),
cfh_(column_family),
db_impl_(impl),
own_snapshot_(own_snapshot),
snapshot_(snapshot) {
assert(iter != nullptr);
assert(snapshot != nullptr);
}

~BlobDBIterator() { delete iter_; }
~BlobDBIterator() {
if (own_snapshot_) {
db_impl_->ReleaseSnapshot(snapshot_);
}
delete iter_;
}

bool Valid() const override { return iter_->Valid(); }

Expand All @@ -727,10 +746,14 @@ class BlobDBIterator : public Iterator {

Status status() const override { return iter_->status(); }

// Iterator::Refresh() not supported.

private:
Iterator* iter_;
ColumnFamilyHandle* cfh_;
BlobDBImpl* db_impl_;
bool own_snapshot_;
const Snapshot* snapshot_;
mutable std::string vpart_;
};

Expand Down
Loading

0 comments on commit 5b68b11

Please sign in to comment.