make blob file close synchronous

Summary:
Fixing flaky blob_db_test.

To close a blob file, blob db used to add a CloseSeqWrite job to the background thread to close it. Changing file close to be synchronous in order to simplify logic, and fix flaky blob_db_test.
Closes https://github.com/facebook/rocksdb/pull/2787

Differential Revision: D5699387

Pulled By: yiwu-arbug

fbshipit-source-id: dd07a945cd435cd3808fce7ee4ea57817409474a
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 3c840d1a6d
commit 503db684f7
  1. 82
      utilities/blob_db/blob_db_impl.cc
  2. 16
      utilities/blob_db/blob_db_impl.h
  3. 21
      utilities/blob_db/blob_db_test.cc

@ -891,7 +891,10 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
}
if (blob_inserter.has_put()) {
CloseIf(blob_inserter.last_file());
s = CloseBlobFileIfNeeded(blob_inserter.last_file());
if (!s.ok()) {
return s;
}
}
// add deleted key to list of keys that have been deleted for book-keeping
@ -1022,7 +1025,9 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
extendTTL(&(bfile->ttl_range_), expiration);
}
CloseIf(bfile);
if (s.ok()) {
s = CloseBlobFileIfNeeded(bfile);
}
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
return s;
@ -1362,58 +1367,44 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
return std::make_pair(true, -1);
}
std::pair<bool, int64_t> BlobDBImpl::CloseSeqWrite(
std::shared_ptr<BlobFile> bfile, bool aborted) {
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
Status s;
ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64,
bfile->BlobFileNumber());
{
WriteLock wl(&mutex_);
// this prevents others from picking up this file
open_blob_files_.erase(bfile);
auto findit =
std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
if (findit != open_simple_files_.end()) open_simple_files_.erase(findit);
if (bfile->HasTTL()) {
size_t erased __attribute__((__unused__)) = open_blob_files_.erase(bfile);
assert(erased == 1);
} else {
auto iter = std::find(open_simple_files_.begin(),
open_simple_files_.end(), bfile);
assert(iter != open_simple_files_.end());
open_simple_files_.erase(iter);
}
}
if (!bfile->closed_.load()) {
WriteLock lockbfile_w(&bfile->mutex_);
bfile->WriteFooterAndCloseLocked();
s = bfile->WriteFooterAndCloseLocked();
}
return std::make_pair(false, -1);
}
void BlobDBImpl::CloseIf(const std::shared_ptr<BlobFile>& bfile) {
// atomic read
bool close = bfile->GetFileSize() > bdb_options_.blob_file_size;
if (!close) return;
if (debug_level_ >= 2) {
ROCKS_LOG_DEBUG(db_options_.info_log,
"Scheduling file for close %s fsize: %" PRIu64
" limit: %" PRIu64,
bfile->PathName().c_str(), bfile->GetFileSize(),
bdb_options_.blob_file_size);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to close blob file %" PRIu64 "with error: %s",
bfile->BlobFileNumber(), s.ToString().c_str());
}
{
WriteLock wl(&mutex_);
return s;
}
open_blob_files_.erase(bfile);
auto findit =
std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile);
if (findit != open_simple_files_.end()) {
open_simple_files_.erase(findit);
} else {
ROCKS_LOG_WARN(db_options_.info_log,
"File not found while closing %s fsize: %" PRIu64
" Multithreaded Writes?",
bfile->PathName().c_str(), bfile->GetFileSize());
}
Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
// atomic read
if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
return Status::OK();
}
tqueue_.add(0, std::bind(&BlobDBImpl::CloseSeqWrite, this, bfile,
std::placeholders::_1));
return CloseBlobFile(bfile);
}
bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
@ -1585,7 +1576,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
}
for (auto bfile : process_files) {
CloseSeqWrite(bfile, false);
CloseBlobFile(bfile);
}
return std::make_pair(true, -1);
@ -1916,7 +1907,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
delete transaction;
}
ROCKS_LOG_INFO(
db_options_.info_log, "%s blob file %" PRIu64 ".",
db_options_.info_log,
"%s blob file %" PRIu64
". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
" succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
@ -2334,8 +2326,8 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() {
DeleteObsoleteFiles(false /*abort*/);
}
void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
CloseSeqWrite(bfile, false /*abort*/);
Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
return CloseBlobFile(bfile);
}
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,

@ -263,7 +263,7 @@ class BlobDBImpl : public BlobDB {
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats);
@ -293,11 +293,6 @@ class BlobDBImpl : public BlobDB {
// this handler is called.
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
// timer queue callback to close a file by appending a footer
// removes file from open files list
std::pair<bool, int64_t> CloseSeqWrite(std::shared_ptr<BlobFile> bfile,
bool aborted);
// is this file ready for Garbage collection. if the TTL of the file
// has expired or if threshold of the file has been evicted
// tt - current time
@ -308,8 +303,11 @@ class BlobDBImpl : public BlobDB {
// collect all the blob log files from the blob directory
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
// appends a task into timer queue to close the file
void CloseIf(const std::shared_ptr<BlobFile>& bfile);
// Close a file by appending a footer, and removes file from open files list.
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
// Close a file if its size exceeds blob_file_size
Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice, std::string* new_value);
@ -470,7 +468,7 @@ class BlobDBImpl : public BlobDB {
// epoch or version of the open files.
std::atomic<uint64_t> epoch_of_;
// typically we keep 4 open blob files (simple i.e. no TTL)
// All opened non-TTL blob files.
std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
// all the blob files which are currently being appended to based

@ -185,7 +185,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@ -214,7 +214,7 @@ TEST_F(BlobDBTest, PutUntil) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@ -246,7 +246,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_FALSE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_deletes);
@ -291,7 +291,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
@ -338,7 +338,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
@ -395,7 +395,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
@ -592,7 +592,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
}
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
// Test for data in SST
size_t new_keys = 0;
for (int i = 0; i < 100; i++) {
@ -627,7 +627,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
@ -663,7 +663,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
mock_env_->set_now_micros(300 * 1000000);
SyncPoint::GetInstance()->LoadDependency(
@ -708,7 +708,6 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
ASSERT_EQ(11, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_TRUE(blob_files[0]->Immutable());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
for (int i = 1; i <= 10; i++) {
ASSERT_FALSE(blob_files[i]->HasTTL());
if (i < 10) {
@ -736,7 +735,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
uint64_t bfile_number = bfile->BlobFileNumber();
blob_db_impl->TEST_CloseBlobFile(bfile);
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile));
switch (i) {
case 0:

Loading…
Cancel
Save