When creating a new DB, fail it when wal_dir contains existing log files

Summary: Current behavior of creating new DB is, if there is existing log files, we will go ahead and replay them on top of empty DB. This is a behavior that no user would expect. With this patch, we will fail the creation if a user creates a DB with existing log files.

Test Plan: make all check

Reviewers: haobo, igor, ljin

Reviewed By: haobo

CC: nkg-, yhchiang, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D17817
main
sdong 11 years ago
parent c166615850
commit 0f40fe4bc7
  1. 15
      db/db_impl.cc
  2. 19
      db/db_test.cc

@ -1062,6 +1062,7 @@ Status DBImpl::Recover(
bool error_if_log_file_exist) { bool error_if_log_file_exist) {
mutex_.AssertHeld(); mutex_.AssertHeld();
bool is_new_db = false;
assert(db_lock_ == nullptr); assert(db_lock_ == nullptr);
if (!read_only) { if (!read_only) {
// We call CreateDirIfMissing() as the directory may already exist (if we // We call CreateDirIfMissing() as the directory may already exist (if we
@ -1090,6 +1091,7 @@ Status DBImpl::Recover(
if (options_.create_if_missing) { if (options_.create_if_missing) {
// TODO: add merge_operator name check // TODO: add merge_operator name check
s = NewDB(); s = NewDB();
is_new_db = true;
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -1140,10 +1142,15 @@ Status DBImpl::Recover(
for (size_t i = 0; i < filenames.size(); i++) { for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number; uint64_t number;
FileType type; FileType type;
if (ParseFileName(filenames[i], &number, &type) if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
&& type == kLogFile if (is_new_db) {
&& ((number >= min_log) || (number == prev_log))) { return Status::Corruption(
logs.push_back(number); "While creating a new Db, wal_dir contains "
"existing log file: ",
filenames[i]);
} else if ((number >= min_log) || (number == prev_log)) {
logs.push_back(number);
}
} }
} }

@ -2095,14 +2095,14 @@ TEST(DBTest, IgnoreRecoveredLog) {
ASSERT_EQ(one, Get("bar")); ASSERT_EQ(one, Get("bar"));
Close(); Close();
Destroy(&options); Destroy(&options);
Reopen(&options);
Close();
// copy the logs from backup back to wal dir // copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir); env_->CreateDirIfMissing(options.wal_dir);
for (auto& log : logs) { for (auto& log : logs) {
if (log != ".." && log != ".") { if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
// we won't be needing this file no more
env_->DeleteFile(backup_logs + "/" + log);
} }
} }
// assert that we successfully recovered only from logs, even though we // assert that we successfully recovered only from logs, even though we
@ -2110,7 +2110,20 @@ TEST(DBTest, IgnoreRecoveredLog) {
Reopen(&options); Reopen(&options);
ASSERT_EQ(two, Get("foo")); ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar")); ASSERT_EQ(one, Get("bar"));
Close();
// Recovery will fail if DB directory doesn't exist.
Destroy(&options);
// copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
// we won't be needing this file no more
env_->DeleteFile(backup_logs + "/" + log);
}
}
Status s = TryReopen(&options);
ASSERT_TRUE(!s.ok());
} while (ChangeOptions()); } while (ChangeOptions());
} }

Loading…
Cancel
Save