Let DBSecondary close files after catch up (#6114)

Summary:
After secondary instance replays the logs from primary, certain files become
obsolete. The secondary should find these files, evict their table readers from
table cache and close them. If this is not done, the secondary will hold on to
these files and prevent their space from being freed.

Test plan (devserver):
```
$./db_secondary_test --gtest_filter=DBSecondaryTest.SecondaryCloseFiles
$make check
$./db_stress -ops_per_thread=100000 -enable_secondary=true -threads=32 -secondary_catch_up_one_in=10000 -clear_column_family_one_in=1000 -reopen=100
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6114

Differential Revision: D18769998

Pulled By: riversand963

fbshipit-source-id: 5d1f151567247196164e1b79d8402fa2045b9120
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent 16fa6fd2a6
commit fe1147db1c
  1. 2
      db/db_impl/db_impl.h
  2. 8
      db/db_impl/db_impl_files.cc
  3. 85
      db/db_impl/db_impl_secondary.cc
  4. 26
      db/db_impl/db_impl_secondary.h
  5. 84
      db/db_impl/db_secondary_test.cc

@ -1109,6 +1109,8 @@ class DBImpl : public DB {
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
virtual bool OwnTablesAndLogs() const { return true; }
private:
friend class DB;
friend class ErrorHandler;

@ -385,6 +385,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
w->Close();
}
bool own_files = OwnTablesAndLogs();
std::unordered_set<uint64_t> files_to_del;
for (const auto& candidate_file : candidate_files) {
const std::string& to_delete = candidate_file.file_name;
@ -484,6 +485,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}
#endif // !ROCKSDB_LITE
// If I do not own these files, e.g. secondary instance with max_open_files
// = -1, then no need to delete or schedule delete these files since they
// will be removed by their owner, e.g. the primary instance.
if (!own_files) {
continue;
}
Status file_deletion_status;
if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_);
@ -495,7 +502,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
{
// After purging obsolete files, remove them from files_grabbed_for_purge_.
// Use a temporary vector to perform bulk deletion via swap.
InstrumentedMutexLock guard_lock(&mutex_);
autovector<uint64_t> to_be_removed;
for (auto fn : files_grabbed_for_purge_) {

@ -11,6 +11,7 @@
#include "db/merge_context.h"
#include "logging/auto_roll_logger.h"
#include "monitoring/perf_context_imp.h"
#include "util/cast_util.h"
namespace rocksdb {
@ -497,45 +498,61 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
// read the manifest and apply new changes to the secondary instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
{
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
cfd->GetName().c_str());
continue;
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
}
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
// list wal_dir to discover new WALs and apply new changes to the secondary
// instance
if (s.ok()) {
s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
}
if (s.IsPathNotFound()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Secondary tries to read WAL, but WAL file(s) have already "
"been purged by primary.");
s = Status::OK();
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
}
}
if (s.IsPathNotFound()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Secondary tries to read WAL, but WAL file(s) have already "
"been purged by primary.");
s = Status::OK();
job_context.Clean();
// Cleanup unused, obsolete files.
JobContext purge_files_job_context(0);
{
InstrumentedMutexLock lock_guard(&mutex_);
// Currently, secondary instance does not own the database files, thus it
// is unnecessary for the secondary to force full scan.
FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
}
if (s.ok()) {
for (auto cfd : cfds_changed) {
cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
&job_context.memtables_to_free);
auto& sv_context = job_context.superversion_contexts.back();
cfd->InstallSuperVersion(&sv_context, &mutex_);
sv_context.NewSuperVersion();
}
job_context.Clean();
if (purge_files_job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(purge_files_job_context);
}
purge_files_job_context.Clean();
return s;
}

@ -172,6 +172,24 @@ class DBImplSecondary : public DBImpl {
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SetDBOptions;
Status SetDBOptions(const std::unordered_map<std::string, std::string>&
/*options_map*/) override {
// Currently not supported because changing certain options may cause
// flush/compaction.
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SetOptions;
Status SetOptions(
ColumnFamilyHandle* /*cfd*/,
const std::unordered_map<std::string, std::string>& /*options_map*/)
override {
// Currently not supported because changing certain options may cause
// flush/compaction and/or write to MANIFEST.
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::SyncWAL;
Status SyncWAL() override {
return Status::NotSupported("Not supported operation in secondary mode.");
@ -269,6 +287,14 @@ class DBImplSecondary : public DBImpl {
return s;
}
bool OwnTablesAndLogs() const override {
// Currently, the secondary instance does not own the database files. It
// simply opens the files of the primary instance and tracks their file
// descriptors until they become obsolete. In the future, the secondary may
// create links to database files. OwnTablesAndLogs will return true then.
return false;
}
private:
friend class DB;

@ -195,6 +195,90 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) {
verify_db_func("new_foo_value", "new_bar_value");
}
namespace {
class TraceFileEnv : public EnvWrapper {
public:
explicit TraceFileEnv(Env* target) : EnvWrapper(target) {}
Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& env_options) override {
class TracedRandomAccessFile : public RandomAccessFile {
public:
TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
std::atomic<int>& counter)
: target_(std::move(target)), files_closed_(counter) {}
~TracedRandomAccessFile() override {
files_closed_.fetch_add(1, std::memory_order_relaxed);
}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
return target_->Read(offset, n, result, scratch);
}
private:
std::unique_ptr<RandomAccessFile> target_;
std::atomic<int>& files_closed_;
};
Status s = target()->NewRandomAccessFile(f, r, env_options);
if (s.ok()) {
r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
}
return s;
}
int files_closed() const {
return files_closed_.load(std::memory_order_relaxed);
}
private:
std::atomic<int> files_closed_{0};
};
} // namespace
TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
Options options;
options.env = env_;
options.max_open_files = 1;
options.disable_auto_compactions = true;
Reopen(options);
Options options1;
std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
options1.env = traced_env.get();
OpenSecondary(options1);
static const auto verify_db = [&]() {
std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
for (iter1->SeekToFirst(), iter2->SeekToFirst();
iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
ASSERT_EQ(iter1->key(), iter2->key());
ASSERT_EQ(iter1->value(), iter2->value());
}
ASSERT_FALSE(iter1->Valid());
ASSERT_FALSE(iter2->Valid());
};
ASSERT_OK(Put("a", "value"));
ASSERT_OK(Put("c", "value"));
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db();
ASSERT_OK(Put("b", "value"));
ASSERT_OK(Put("d", "value"));
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
ASSERT_TRUE(s.IsNotSupported());
CloseSecondary();
}
TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
Options options;
options.env = env_;

Loading…
Cancel
Save