Blob DB: create a snapshot before every read

Summary:
If GC kicks in between the two steps of a read:

* Get() reads the index entry from the base DB.
* Get() then reads the value from the blob file.

GC can delete the corresponding blob file in between, and the key appears not found. Fortunately, there is existing logic that avoids deleting a blob file while it is referenced by a snapshot, so the fix is to explicitly create a snapshot before reading the index entry from the base DB and release it once the read completes.
Closes https://github.com/facebook/rocksdb/pull/2754
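
For readers who just want the shape of the change, here is a minimal sketch of the snapshot-guarded read pattern the diff below introduces. It mirrors SetSnapshotIfNeeded() plus the Get() path; SnapshotGuardedGet() and ReadBlob() are hypothetical names used only for illustration (ReadBlob() stands in for the blob-file read that BlobDBImpl::CommonGet() performs), not RocksDB APIs.

    #include <string>

    #include "rocksdb/db.h"

    // Hypothetical stand-in for the blob-file read done by BlobDBImpl::CommonGet().
    static rocksdb::Status ReadBlob(const std::string& /*index_entry*/,
                                    std::string* value) {
      value->assign("decoded-blob-value");
      return rocksdb::Status::OK();
    }

    // Sketch: pin a snapshot before fetching the index entry so GC cannot delete
    // the blob file this read still needs, and release it when done.
    rocksdb::Status SnapshotGuardedGet(rocksdb::DB* db,
                                       rocksdb::ColumnFamilyHandle* cf,
                                       const rocksdb::Slice& key,
                                       std::string* blob_value) {
      rocksdb::ReadOptions ro;
      const bool own_snapshot = (ro.snapshot == nullptr);
      if (own_snapshot) {
        // Blob files referenced by this snapshot are kept alive by GC.
        ro.snapshot = db->GetSnapshot();
      }
      std::string index_entry;
      rocksdb::Status s = db->Get(ro, cf, key, &index_entry);
      if (s.ok()) {
        // Safe: the blob file cannot be deleted while the snapshot is held.
        s = ReadBlob(index_entry, blob_value);
      }
      if (own_snapshot) {
        db->ReleaseSnapshot(ro.snapshot);
      }
      return s;
    }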

Differential Revision: D5655956

Pulled By: yiwu-arbug

fbshipit-source-id: e4ccbc51331362542e7343175bbcbdea5830f544
Branch: main
Commit: 5b68b114f1 (parent 4624ae52c9)
Author: yiwu-arbug, committed by Facebook Github Bot

Changed files:
  utilities/blob_db/blob_db_impl.cc (100 lines changed)
  utilities/blob_db/blob_db_impl.h (43 lines changed)
  utilities/blob_db/blob_db_test.cc (92 lines changed)

utilities/blob_db/blob_db_impl.cc
@@ -305,8 +305,8 @@ void BlobDBImpl::StartBackgroundTasks() {
       kDeleteCheckPeriodMillisecs,
       std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1));
   tqueue_.add(
-      kDeleteObsoletedFilesPeriodMillisecs,
-      std::bind(&BlobDBImpl::DeleteObsFiles, this, std::placeholders::_1));
+      kDeleteObsoleteFilesPeriodMillisecs,
+      std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
   tqueue_.add(kSanityCheckPeriodMillisecs,
               std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
   tqueue_.add(kWriteAmplificationStatsPeriodMillisecs,
@@ -1117,15 +1117,25 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
 }
 
 std::vector<Status> BlobDBImpl::MultiGet(
-    const ReadOptions& options,
+    const ReadOptions& read_options,
     const std::vector<ColumnFamilyHandle*>& column_family,
     const std::vector<Slice>& keys, std::vector<std::string>* values) {
+  // Get a snapshot to avoid the blob file being deleted between fetching
+  // the index entry and reading from the blob file.
+  ReadOptions ro(read_options);
+  bool snapshot_created = SetSnapshotIfNeeded(&ro);
   std::vector<std::string> values_lsm;
   values_lsm.resize(keys.size());
-  auto statuses = db_->MultiGet(options, column_family, keys, &values_lsm);
+  auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm);
+  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
+  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");
+  values->resize(keys.size());
+  assert(statuses.size() == keys.size());
   for (size_t i = 0; i < keys.size(); ++i) {
-    if (!statuses[i].ok()) continue;
+    if (!statuses[i].ok()) {
+      continue;
+    }
 
     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
     auto cfd = cfh->cfd();
@@ -1133,9 +1143,21 @@ std::vector<Status> BlobDBImpl::MultiGet(
     Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i]));
     statuses[i] = s;
   }
+  if (snapshot_created) {
+    db_->ReleaseSnapshot(ro.snapshot);
+  }
   return statuses;
 }
 
+bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
+  assert(read_options != nullptr);
+  if (read_options->snapshot != nullptr) {
+    return false;
+  }
+  read_options->snapshot = db_->GetSnapshot();
+  return true;
+}
+
 Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
                              const std::string& index_entry, std::string* value,
                              SequenceNumber* sequence) {
@@ -1172,11 +1194,6 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
     bfile = hitr->second;
   }
 
-  if (bfile->Obsolete()) {
-    return Status::NotFound(
-        "Blob Not Found as blob file was garbage collected");
-  }
-
   // 0 - size
   if (!handle.size() && value != nullptr) {
     value->clear();
@@ -1274,25 +1291,30 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
   return s;
 }
 
-Status BlobDBImpl::Get(const ReadOptions& options,
+Status BlobDBImpl::Get(const ReadOptions& read_options,
                        ColumnFamilyHandle* column_family, const Slice& key,
                        PinnableSlice* value) {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
   auto cfd = cfh->cfd();
+  // Get a snapshot to avoid the blob file being deleted between fetching
+  // the index entry and reading from the blob file.
+  // TODO(yiwu): For Get(), retrying if the file is not found would be a
+  // simpler strategy.
+  ReadOptions ro(read_options);
+  bool snapshot_created = SetSnapshotIfNeeded(&ro);
   Status s;
   std::string index_entry;
-  s = db_->Get(options, column_family, key, &index_entry);
-  if (!s.ok()) {
-    if (debug_level_ >= 3)
-      ROCKS_LOG_WARN(db_options_.info_log,
-                     "Get Failed on LSM KEY: %s status: '%s'",
-                     key.ToString().c_str(), s.ToString().c_str());
-    return s;
+  s = db_->Get(ro, column_family, key, &index_entry);
+  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
+  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
+  if (s.ok()) {
+    s = CommonGet(cfd, key, index_entry, value->GetSelf());
+    value->PinSelf();
+  }
+  if (snapshot_created) {
+    db_->ReleaseSnapshot(ro.snapshot);
   }
-
-  s = CommonGet(cfd, key, index_entry, value->GetSelf());
-  value->PinSelf();
   return s;
 }
@@ -1302,6 +1324,8 @@ Slice BlobDBIterator::value() const {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_);
   auto cfd = cfh->cfd();
+  TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
+  TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
   Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false),
                                  &vpart_);
   return Slice(vpart_);
@@ -1977,7 +2001,7 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
   return false;
 }
 
-std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
+std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
   if (aborted) return std::make_pair(false, -1);
 
   {
@@ -2002,6 +2026,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
       }
     }
 
+    blob_files_.erase(bfile->BlobFileNumber());
     Status s = env_->DeleteFile(bfile->PathName());
     if (!s.ok()) {
       ROCKS_LOG_ERROR(db_options_.info_log,
@@ -2026,7 +2051,9 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
   // put files back into obsolete if for some reason, delete failed
   if (!tobsolete.empty()) {
     WriteLock wl(&mutex_);
-    for (auto bfile : tobsolete) obsolete_files_.push_front(bfile);
+    for (auto bfile : tobsolete) {
+      obsolete_files_.push_front(bfile);
+    }
   }
 
   return std::make_pair(!aborted, -1);
@@ -2212,8 +2239,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
     WriteLock wl(&mutex_);
     for (auto bfile : obsoletes) {
       bool last_file = (bfile == obsoletes.back());
-      // remove from global list so writers
-      blob_files_.erase(bfile->BlobFileNumber());
 
       if (!evict_cb) {
         bfile->SetCanBeDeleted();
@@ -2231,10 +2256,14 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
   return std::make_pair(true, -1);
 }
 
-Iterator* BlobDBImpl::NewIterator(const ReadOptions& opts,
+Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options,
                                   ColumnFamilyHandle* column_family) {
-  return new BlobDBIterator(db_->NewIterator(opts, column_family),
-                            column_family, this);
+  // Get a snapshot to avoid the blob file being deleted between fetching
+  // the index entry and reading from the blob file.
+  ReadOptions ro(read_options);
+  bool snapshot_created = SetSnapshotIfNeeded(&ro);
+  return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family,
+                            this, snapshot_created, ro.snapshot);
 }
 
 Status DestroyBlobDB(const std::string& dbname, const Options& options,
@@ -2283,6 +2312,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
 }
 
 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
+  ReadLock l(&mutex_);
   std::vector<std::shared_ptr<BlobFile>> blob_files;
   for (auto& p : blob_files_) {
     blob_files.emplace_back(p.second);
@@ -2300,6 +2330,10 @@ std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
   return obsolete_files;
 }
 
+void BlobDBImpl::TEST_DeleteObsoleteFiles() {
+  DeleteObsoleteFiles(false /*abort*/);
+}
+
 void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
   CloseSeqWrite(bfile, false /*abort*/);
 }
@@ -2310,6 +2344,16 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
 }
 
 void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
+
+void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile) {
+  uint64_t number = bfile->BlobFileNumber();
+  assert(blob_files_.count(number) > 0);
+  bfile->SetCanBeDeleted();
+  {
+    WriteLock l(&mutex_);
+    obsolete_files_.push_back(bfile);
+  }
+}
 #endif  // !NDEBUG
 
 }  // namespace blob_db

utilities/blob_db/blob_db_impl.h
@@ -200,7 +200,7 @@ class BlobDBImpl : public BlobDB {
   static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
 
   // how often to schedule delete obs files periods
-  static constexpr uint32_t kDeleteObsoletedFilesPeriodMillisecs = 10 * 1000;
+  static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
 
   // how often to schedule check seq files period
   static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
@@ -219,16 +219,16 @@ class BlobDBImpl : public BlobDB {
                       const Slice& key) override;
 
   using rocksdb::StackableDB::Get;
-  Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
+  Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
              const Slice& key, PinnableSlice* value) override;
 
   using rocksdb::StackableDB::NewIterator;
-  virtual Iterator* NewIterator(const ReadOptions& opts,
+  virtual Iterator* NewIterator(const ReadOptions& read_options,
                                 ColumnFamilyHandle* column_family) override;
 
   using rocksdb::StackableDB::MultiGet;
   virtual std::vector<Status> MultiGet(
-      const ReadOptions& options,
+      const ReadOptions& read_options,
       const std::vector<ColumnFamilyHandle*>& column_family,
       const std::vector<Slice>& keys,
       std::vector<std::string>* values) override;
@@ -269,11 +269,19 @@ class BlobDBImpl : public BlobDB {
                                   GCStats* gc_stats);
 
   void TEST_RunGC();
+
+  void TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile);
+
+  void TEST_DeleteObsoleteFiles();
 #endif  // !NDEBUG
 
  private:
   Status OpenPhase1();
 
+  // Create a snapshot if there isn't one in read options.
+  // Return true if a snapshot is created.
+  bool SetSnapshotIfNeeded(ReadOptions* read_options);
+
   Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
                    const std::string& index_entry, std::string* value,
                    SequenceNumber* sequence = nullptr);
@@ -332,7 +340,7 @@ class BlobDBImpl : public BlobDB {
   // delete files which have been garbage collected and marked
   // obsolete. Check whether any snapshots exist which refer to
   // the same
-  std::pair<bool, int64_t> DeleteObsFiles(bool aborted);
+  std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
 
   // Major task to garbage collect expired and deleted blobs
   std::pair<bool, int64_t> RunGC(bool aborted);
@@ -593,7 +601,7 @@ class BlobFile {
 
   // This Read-Write mutex is per file specific and protects
   // all the datastructures
-  port::RWMutex mutex_;
+  mutable port::RWMutex mutex_;
 
   // time when the random access reader was last created.
   std::atomic<std::time_t> last_access_;
@@ -700,12 +708,23 @@ class BlobFile {
 class BlobDBIterator : public Iterator {
  public:
   explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family,
-                          BlobDBImpl* impl)
-      : iter_(iter), cfh_(column_family), db_impl_(impl) {
-    assert(iter_);
+                          BlobDBImpl* impl, bool own_snapshot,
+                          const Snapshot* snapshot)
+      : iter_(iter),
+        cfh_(column_family),
+        db_impl_(impl),
+        own_snapshot_(own_snapshot),
+        snapshot_(snapshot) {
+    assert(iter != nullptr);
+    assert(snapshot != nullptr);
   }
 
-  ~BlobDBIterator() { delete iter_; }
+  ~BlobDBIterator() {
+    if (own_snapshot_) {
+      db_impl_->ReleaseSnapshot(snapshot_);
+    }
+    delete iter_;
+  }
 
   bool Valid() const override { return iter_->Valid(); }
@@ -727,10 +746,14 @@ class BlobDBIterator : public Iterator {
 
   Status status() const override { return iter_->status(); }
 
+  // Iterator::Refresh() not supported.
+
  private:
   Iterator* iter_;
   ColumnFamilyHandle* cfh_;
   BlobDBImpl* db_impl_;
+  bool own_snapshot_;
+  const Snapshot* snapshot_;
   mutable std::string vpart_;
 };

utilities/blob_db/blob_db_test.cc
@@ -723,6 +723,98 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
             obsolete_files[0]->BlobFileNumber());
 }
 
+TEST_F(BlobDBTest, ReadWhileGC) {
+  // Run the same test for Get(), MultiGet() and iterator, one at a time.
+  for (int i = 0; i < 3; i++) {
+    BlobDBOptions bdb_options;
+    bdb_options.disable_background_tasks = true;
+    Open(bdb_options);
+    blob_db_->Put(WriteOptions(), "foo", "bar");
+    BlobDBImpl *blob_db_impl =
+        static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
+    auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+    ASSERT_EQ(1, blob_files.size());
+    std::shared_ptr<BlobFile> bfile = blob_files[0];
+    uint64_t bfile_number = bfile->BlobFileNumber();
+    blob_db_impl->TEST_CloseBlobFile(bfile);
+
+    switch (i) {
+      case 0:
+        SyncPoint::GetInstance()->LoadDependency(
+            {{"BlobDBImpl::Get:AfterIndexEntryGet:1",
+              "BlobDBTest::ReadWhileGC:1"},
+             {"BlobDBTest::ReadWhileGC:2",
+              "BlobDBImpl::Get:AfterIndexEntryGet:2"}});
+        break;
+      case 1:
+        SyncPoint::GetInstance()->LoadDependency(
+            {{"BlobDBImpl::MultiGet:AfterIndexEntryGet:1",
+              "BlobDBTest::ReadWhileGC:1"},
+             {"BlobDBTest::ReadWhileGC:2",
+              "BlobDBImpl::MultiGet:AfterIndexEntryGet:2"}});
+        break;
+      case 2:
+        SyncPoint::GetInstance()->LoadDependency(
+            {{"BlobDBIterator::value:BeforeGetBlob:1",
+              "BlobDBTest::ReadWhileGC:1"},
+             {"BlobDBTest::ReadWhileGC:2",
+              "BlobDBIterator::value:BeforeGetBlob:2"}});
+        break;
+    }
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    auto reader = port::Thread([this, i]() {
+      std::string value;
+      std::vector<std::string> values;
+      std::vector<Status> statuses;
+      switch (i) {
+        case 0:
+          ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value));
+          ASSERT_EQ("bar", value);
+          break;
+        case 1:
+          statuses = blob_db_->MultiGet(ReadOptions(), {"foo"}, &values);
+          ASSERT_EQ(1, statuses.size());
+          ASSERT_EQ(1, values.size());
+          ASSERT_EQ("bar", values[0]);
+          break;
+        case 2:
+          // VerifyDB uses an iterator to scan the DB.
+          VerifyDB({{"foo", "bar"}});
+          break;
+      }
+    });
+
+    TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
+    GCStats gc_stats;
+    ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+    ASSERT_EQ(1, gc_stats.blob_count);
+    ASSERT_EQ(1, gc_stats.num_relocate);
+    ASSERT_EQ(1, gc_stats.relocate_succeeded);
+    blob_db_impl->TEST_ObsoleteFile(blob_files[0]);
+    blob_db_impl->TEST_DeleteObsoleteFiles();
+    // The file shouldn't be deleted yet, since the reader holds a snapshot.
+    blob_files = blob_db_impl->TEST_GetBlobFiles();
+    ASSERT_EQ(2, blob_files.size());
+    ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
+    auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles();
+    ASSERT_EQ(1, obsolete_files.size());
+    ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
+    TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
+    reader.join();
+    SyncPoint::GetInstance()->DisableProcessing();
+
+    // The file is deleted this time.
+    blob_db_impl->TEST_DeleteObsoleteFiles();
+    blob_files = blob_db_impl->TEST_GetBlobFiles();
+    ASSERT_EQ(1, blob_files.size());
+    ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
+    ASSERT_EQ(0, blob_db_impl->TEST_GetObsoleteFiles().size());
+    VerifyDB({{"foo", "bar"}});
+    Destroy();
+  }
+}
+
 }  // namespace blob_db
 }  // namespace rocksdb
