Redo handling of recycled logs in full purge

Summary:
This reverts commit 9e4aa798c3,
which doesn't handle all cases (see inline comment).

I reimplemented the logic as suggested in the initial PR: https://github.com/facebook/rocksdb/pull/1313.

This approach has two benefits:

- All the parsing/filtering of full_scan_candidate_files is kept together in PurgeObsoleteFiles.
- We only need to check whether log file is recycled in one place where we've already determined it's a log file

Test Plan:
new unit test, verified fails before the original fix, still passes
now.

Reviewers: IslamAbdelRahman, yiwu, sdong

Reviewed By: yiwu, sdong

Subscribers: leveldb, dhruba, andrewkr

Differential Revision: https://reviews.facebook.net/D64053
main
Andrew Kryczka 8 years ago
parent 27bfe327b2
commit 1b7af5fb1a
  1. 91
      db/db_impl.cc
  2. 36
      db/db_wal_test.cc

@ -822,6 +822,41 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->prev_log_number = versions_->prev_log_number();
versions_->AddLiveFiles(&job_context->sst_live);
if (doing_the_full_scan) {
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
path_id++) {
// set of all files in the directory. We'll exclude files that are still
// alive in the subsequent processings.
std::vector<std::string> files;
env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
&files); // Ignore errors
for (std::string file : files) {
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
job_context->full_scan_candidate_files.emplace_back(
"/" + file, static_cast<uint32_t>(path_id));
}
}
// Add log files in wal_dir
if (immutable_db_options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(immutable_db_options_.wal_dir,
&log_files); // Ignore errors
for (std::string log_file : log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
}
}
// Add info log files in db_log_dir
if (!immutable_db_options_.db_log_dir.empty() &&
immutable_db_options_.db_log_dir != dbname_) {
std::vector<std::string> info_log_files;
// Ignore errors
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
for (std::string log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
}
}
}
if (!alive_log_files_.empty()) {
uint64_t min_log_number = job_context->log_number;
@ -862,62 +897,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
assert(!logs_.empty());
}
if (doing_the_full_scan) {
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
path_id++) {
// set of all files in the directory. We'll exclude files that are still
// alive in the subsequent processings.
std::vector<std::string> files;
env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
&files); // Ignore errors
for (std::string file : files) {
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
job_context->full_scan_candidate_files.emplace_back(
"/" + file, static_cast<uint32_t>(path_id));
}
}
//Add log files in wal_dir
if (immutable_db_options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(immutable_db_options_.wal_dir,
&log_files); // Ignore errors
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_);
for (std::string log_file : log_files) {
uint64_t number;
FileType type;
// Ignore file if we cannot recognize it.
if (!ParseFileName(log_file, &number, info_log_prefix.prefix, &type)) {
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Unrecognized log file %s \n", log_file.c_str());
continue;
}
// If the log file is already in the log recycle list , don't put
// it in the candidate list.
if (std::find(log_recycle_files.begin(), log_recycle_files.end(),
number) != log_recycle_files.end()) {
Log(InfoLogLevel::INFO_LEVEL, immutable_db_options_.info_log,
"Log %" PRIu64 " Already added in the recycle list, skipping.\n",
number);
continue;
}
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
}
}
// Add info log files in db_log_dir
if (!immutable_db_options_.db_log_dir.empty() &&
immutable_db_options_.db_log_dir != dbname_) {
std::vector<std::string> info_log_files;
// Ignore errors
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
for (std::string log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
}
}
}
// We're just cleaning up for DB::Write().
assert(job_context->logs_to_free.empty());
job_context->logs_to_free = logs_to_free_;

@ -382,6 +382,42 @@ TEST_F(DBWALTest, PreallocateBlock) {
#endif // !(defined NDEBUG) || !defined(OS_WIN)
#ifndef ROCKSDB_LITE
TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
// For github issue #1303
for (int i = 0; i < 2; ++i) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.recycle_log_file_num = 2;
if (i != 0) {
options.wal_dir = alternative_wal_dir_;
}
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "v1"));
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
ASSERT_GT(log_files.size(), 0);
ASSERT_OK(Flush());
// Now the original WAL is in log_files[0] and should be marked for
// recycling.
// Verify full purge cannot remove this file.
JobContext job_context(0);
dbfull()->TEST_LockMutex();
dbfull()->FindObsoleteFiles(&job_context, true /* force */);
dbfull()->TEST_UnlockMutex();
dbfull()->PurgeObsoleteFiles(job_context);
if (i == 0) {
ASSERT_OK(
env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
} else {
ASSERT_OK(env_->FileExists(
LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
}
}
}
TEST_F(DBWALTest, GetSortedWalFiles) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

Loading…
Cancel
Save