BlobDB: Implement DisableFileDeletions (#4314)

Summary:
`DB::DiableFileDeletions` and `DB::EnableFileDeletions` are used for applications to stop RocksDB background jobs to delete files while they are doing replication. Implement these methods for BlobDB. `DeleteObsolteFiles` now needs to check `disable_file_deletions_` before starting, and will hold `delete_file_mutex_` the whole time while it is running. `DisableFileDeletions` needs to wait on `delete_file_mutex_` for running `DeleteObsolteFiles` job and set `disable_file_deletions_` flag.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4314

Differential Revision: D9501373

Pulled By: yiwu-arbug

fbshipit-source-id: 81064c1228f1724eff46da22b50ff765b16292cd
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 2f871bc85e
commit a6d3de4e7a
  1. 1
      CMakeLists.txt
  2. 22
      TARGETS
  3. 2
      src.mk
  4. 46
      utilities/blob_db/blob_db_impl.cc
  5. 35
      utilities/blob_db/blob_db_impl.h
  6. 97
      utilities/blob_db/blob_db_impl_filesnapshot.cc
  7. 41
      utilities/blob_db/blob_db_test.cc

@ -612,6 +612,7 @@ set(SOURCES
utilities/blob_db/blob_compaction_filter.cc
utilities/blob_db/blob_db.cc
utilities/blob_db/blob_db_impl.cc
utilities/blob_db/blob_db_impl_filesnapshot.cc
utilities/blob_db/blob_dump_tool.cc
utilities/blob_db/blob_file.cc
utilities/blob_db/blob_log_reader.cc

@ -233,6 +233,7 @@ cpp_library(
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
"utilities/blob_db/blob_dump_tool.cc",
"utilities/blob_db/blob_file.cc",
"utilities/blob_db/blob_log_format.cc",
@ -376,11 +377,6 @@ ROCKS_TESTS = [
"table/block_based_filter_block_test.cc",
"serial",
],
[
"data_block_hash_index_test",
"table/data_block_hash_index_test.cc",
"serial",
],
[
"block_test",
"table/block_test.cc",
@ -506,6 +502,11 @@ ROCKS_TESTS = [
"table/cuckoo_table_reader_test.cc",
"serial",
],
[
"data_block_hash_index_test",
"table/data_block_hash_index_test.cc",
"serial",
],
[
"date_tiered_test",
"utilities/date_tiered/date_tiered_test.cc",
@ -956,11 +957,6 @@ ROCKS_TESTS = [
"tools/sst_dump_test.cc",
"serial",
],
[
"trace_analyzer_test",
"tools/trace_analyzer_test.cc",
"serial",
],
[
"statistics_test",
"monitoring/statistics_test.cc",
@ -996,6 +992,11 @@ ROCKS_TESTS = [
"util/timer_queue_test.cc",
"serial",
],
[
"trace_analyzer_test",
"tools/trace_analyzer_test.cc",
"serial",
],
[
"transaction_test",
"utilities/transactions/transaction_test.cc",
@ -1094,3 +1095,4 @@ if not is_opt_mode:
deps = [":" + test_bin],
command = [TEST_RUNNER, BUCK_BINS + test_bin]
)

@ -122,6 +122,7 @@ LIB_SOURCES = \
table/plain_table_reader.cc \
table/sst_file_writer.cc \
table/table_properties.cc \
tools/trace_analyzer_tool.cc \
table/two_level_iterator.cc \
tools/dump/db_dump_tool.cc \
util/arena.cc \
@ -161,6 +162,7 @@ LIB_SOURCES = \
utilities/blob_db/blob_compaction_filter.cc \
utilities/blob_db/blob_db.cc \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_impl_filesnapshot.cc \
utilities/blob_db/blob_file.cc \
utilities/blob_db/blob_log_format.cc \
utilities/blob_db/blob_log_reader.cc \

@ -621,39 +621,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
return db_->Write(options, blob_inserter.batch());
}
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
if (!s.ok()) {
return s;
}
ret.reserve(ret.size() + blob_files_.size());
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
ret.emplace_back(blob_file->PathName());
}
return Status::OK();
}
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
db_->GetLiveFilesMetaData(metadata);
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
LiveFileMetaData filemetadata;
filemetadata.size = blob_file->GetFileSize();
filemetadata.name = blob_file->PathName();
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
filemetadata.column_family_name = cfh->GetName();
metadata->emplace_back(filemetadata);
}
}
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
return PutUntil(options, key, value, kNoExpiration);
@ -1684,16 +1651,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
}
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
if (aborted) return std::make_pair(false, -1);
if (aborted) {
return std::make_pair(false, -1);
}
{
ReadLock rl(&mutex_);
if (obsolete_files_.empty()) return std::make_pair(true, -1);
MutexLock delete_file_lock(&delete_file_mutex_);
if (disable_file_deletions_ > 0) {
return std::make_pair(true, -1);
}
std::list<std::shared_ptr<BlobFile>> tobsolete;
{
WriteLock wl(&mutex_);
if (obsolete_files_.empty()) {
return std::make_pair(true, -1);
}
tobsolete.swap(obsolete_files_);
}

@ -155,12 +155,6 @@ class BlobDBImpl : public BlobDB {
virtual Status Close() override;
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* ) override;
using BlobDB::PutWithTTL;
Status PutWithTTL(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) override;
@ -175,6 +169,15 @@ class BlobDBImpl : public BlobDB {
const DBOptions& db_options,
const ColumnFamilyOptions& cf_options);
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
~BlobDBImpl();
Status Open(std::vector<ColumnFamilyHandle*>* handles);
@ -408,6 +411,26 @@ class BlobDBImpl : public BlobDB {
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
// DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
// on the mutex to avoid contention.
//
// While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
// the difference. mutex_ only needs to be held when access the
// data-structure, and delete_file_mutex_ needs to be held the whole time
// during DeleteObsoleteFiles to avoid being run simultaneously with
// DisableFileDeletions.
//
// If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
// to hold delete_file_mutex_ first to avoid deadlock.
mutable port::Mutex delete_file_mutex_;
// Each call of DisableFileDeletions will increase disable_file_deletion_
// by 1. EnableFileDeletions will either decrease the count by 1 or reset
// it to zeor, depending on the force flag.
//
// REQUIRES: access with delete_file_mutex_ held.
int disable_file_deletions_ = 0;
uint32_t debug_level_;
};

@ -0,0 +1,97 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db_impl.h"
#include "util/logging.h"
#include "util/mutexlock.h"
// BlobDBImpl methods to get snapshot of files, e.g. for replication.
namespace rocksdb {
namespace blob_db {
Status BlobDBImpl::DisableFileDeletions() {
// Disable base DB file deletions.
Status s = db_impl_->DisableFileDeletions();
if (!s.ok()) {
return s;
}
int count = 0;
{
// Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
// is running.
MutexLock l(&delete_file_mutex_);
count = ++disable_file_deletions_;
}
ROCKS_LOG_INFO(db_options_.info_log,
"Disalbed blob file deletions. count: %d", count);
return Status::OK();
}
Status BlobDBImpl::EnableFileDeletions(bool force) {
// Enable base DB file deletions.
Status s = db_impl_->EnableFileDeletions(force);
if (!s.ok()) {
return s;
}
int count = 0;
{
MutexLock l(&delete_file_mutex_);
if (force) {
disable_file_deletions_ = 0;
} else if (disable_file_deletions_ > 0) {
count = --disable_file_deletions_;
}
assert(count >= 0);
}
ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
count);
// Consider trigger DeleteobsoleteFiles once after re-enabled, if we are to
// make DeleteobsoleteFiles re-run interval configuration.
return Status::OK();
}
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
if (!s.ok()) {
return s;
}
ret.reserve(ret.size() + blob_files_.size());
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
ret.emplace_back(blob_file->PathName());
}
return Status::OK();
}
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
// Hold a lock in the beginning to avoid updates to base DB during the call
ReadLock rl(&mutex_);
db_->GetLiveFilesMetaData(metadata);
for (auto bfile_pair : blob_files_) {
auto blob_file = bfile_pair.second;
LiveFileMetaData filemetadata;
filemetadata.size = blob_file->GetFileSize();
filemetadata.name = blob_file->PathName();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
filemetadata.column_family_name = cfh->GetName();
metadata->emplace_back(filemetadata);
}
}
} // namespace blob_db
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -1415,6 +1415,47 @@ TEST_F(BlobDBTest, EvictExpiredFile) {
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
}
TEST_F(BlobDBTest, DisableFileDeletions) {
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
for (bool force : {true, false}) {
ASSERT_OK(Put("foo", "v", &data));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
auto blob_file = blob_files[0];
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
// Call DisableFileDeletions twice.
ASSERT_OK(blob_db_->DisableFileDeletions());
ASSERT_OK(blob_db_->DisableFileDeletions());
// File deletions should be disabled.
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB(data);
// Enable file deletions once. If force=true, file deletion is enabled.
// Otherwise it needs to enable it for a second time.
ASSERT_OK(blob_db_->EnableFileDeletions(force));
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (!force) {
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB(data);
// Call EnableFileDeletions a second time.
ASSERT_OK(blob_db_->EnableFileDeletions(false));
blob_db_impl()->TEST_DeleteObsoleteFiles();
}
// Regardless of value of `force`, file should be deleted by now.
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB({});
}
}
} // namespace blob_db
} // namespace rocksdb

Loading…
Cancel
Save