Allow WAL dir to change with db dir (#8582)

Summary:
Prior to this change, the "wal_dir"  DBOption would always be set (defaults to dbname) when the DBOptions were sanitized.  Because of this setitng in the options file, it was not possible to rename/relocate a database directory after it had been created and use the existing options file.

After this change, the "wal_dir" option is only set under specific circumstances.  Methods were added to the ImmutableDBOptions class to see if it is set and if it is set to something other than the dbname.  Additionally, a method was added to retrieve the effective value of the WAL dir (either the option or the dbname/path).

Tests were added to the core and ldb to test that a database could be created and renamed without issue.  Additional tests for various permutations of wal_dir were also added.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8582

Reviewed By: pdillinger, autopear

Differential Revision: D29881122

Pulled By: mrambacher

fbshipit-source-id: 67d3d033dc8813d59917b0a3fba2550c0efd6dfb
main
mrambacher 3 years ago committed by Facebook GitHub Bot
parent 066b51126d
commit ab7f7c9e49
  1. 2
      HISTORY.md
  2. 9
      db/db_impl/db_impl.cc
  3. 10
      db/db_impl/db_impl_files.cc
  4. 57
      db/db_impl/db_impl_open.cc
  5. 9
      db/db_impl/db_impl_secondary.cc
  6. 17
      db/db_info_dumper.cc
  7. 16
      db/db_test2.cc
  8. 23
      db/repair.cc
  9. 22
      db/wal_manager.cc
  10. 5
      db/wal_manager.h
  11. 11
      file/file_util.cc
  12. 2
      file/file_util.h
  13. 35
      options/db_options.cc
  14. 8
      options/db_options.h
  15. 20
      tools/ldb_cmd.cc
  16. 56
      tools/ldb_cmd_test.cc
  17. 12
      utilities/checkpoint/checkpoint_impl.cc
  18. 124
      utilities/options/options_util_test.cc

@ -12,7 +12,6 @@
## 6.23.0 (2021-07-16) ## 6.23.0 (2021-07-16)
### Behavior Changes ### Behavior Changes
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present. * Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.
### Bug Fixes ### Bug Fixes
* Blob file checksums are now printed in hexadecimal format when using the `manifest_dump` `ldb` command. * Blob file checksums are now printed in hexadecimal format when using the `manifest_dump` `ldb` command.
* `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown. * `GetLiveFilesMetaData()` now populates the `temperature`, `oldest_ancester_time`, and `file_creation_time` fields of its `LiveFileMetaData` results when the information is available. Previously these fields always contained zero indicating unknown.
@ -20,6 +19,7 @@
* Fix continuous logging of an existing background error on every user write * Fix continuous logging of an existing background error on every user write
* Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`. * Fix a bug that `Get()` return Status::OK() and an empty value for non-existent key when `read_options.read_tier = kBlockCacheTier`.
* Fix a bug that stat in `get_context` didn't accumulate to statistics when query is failed. * Fix a bug that stat in `get_context` didn't accumulate to statistics when query is failed.
* Fixed handling of DBOptions::wal_dir with LoadLatestOptions() or ldb --try_load_options on a copied or moved DB. Previously, when the WAL directory is same as DB directory (default), a copied or moved DB would reference the old path of the DB as the WAL directory, potentially corrupting both copies. Under this change, the wal_dir from DB::GetOptions() or LoadLatestOptions() may now be empty, indicating that the current DB directory is used for WALs. This is also a subtle API change.
### New Features ### New Features
* ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to. * ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to.

@ -641,7 +641,7 @@ Status DBImpl::CloseHelper() {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
immutable_db_options_.info_log, immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s", "Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str()); s.ToString().c_str());
// Retain the first error // Retain the first error
if (ret.ok()) { if (ret.ok()) {
@ -735,7 +735,8 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
const Status DBImpl::CreateArchivalDirectory() { const Status DBImpl::CreateArchivalDirectory() {
if (immutable_db_options_.WAL_ttl_seconds > 0 || if (immutable_db_options_.WAL_ttl_seconds > 0 ||
immutable_db_options_.WAL_size_limit_MB > 0) { immutable_db_options_.WAL_size_limit_MB > 0) {
std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir); std::string archivalPath =
ArchivalDirectory(immutable_db_options_.GetWalDir());
return env_->CreateDirIfMissing(archivalPath); return env_->CreateDirIfMissing(archivalPath);
} }
return Status::OK(); return Status::OK();
@ -4051,7 +4052,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env; Env* env = soptions.env;
std::vector<std::string> filenames; std::vector<std::string> filenames;
bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions); bool wal_in_db_path = soptions.IsWalDirSameAsDBPath();
// Reset the logger because it holds a handle to the // Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal // log file and prevents cleanup and directory removal
@ -4117,7 +4118,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
std::vector<std::string> walDirFiles; std::vector<std::string> walDirFiles;
std::string archivedir = ArchivalDirectory(dbname); std::string archivedir = ArchivalDirectory(dbname);
bool wal_dir_exists = false; bool wal_dir_exists = false;
if (dbname != soptions.wal_dir) { if (!soptions.IsWalDirSameAsDBPath(dbname)) {
wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok(); wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
archivedir = ArchivalDirectory(soptions.wal_dir); archivedir = ArchivalDirectory(soptions.wal_dir);
} }

@ -221,7 +221,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
} }
// Add log files in wal_dir // Add log files in wal_dir
if (immutable_db_options_.wal_dir != dbname_) {
if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
std::vector<std::string> log_files; std::vector<std::string> log_files;
Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files); Status s = env_->GetChildren(immutable_db_options_.wal_dir, &log_files);
s.PermitUncheckedError(); // TODO: What should we do on error? s.PermitUncheckedError(); // TODO: What should we do on error?
@ -401,10 +402,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
blob_file.GetPath()); blob_file.GetPath());
} }
auto wal_dir = immutable_db_options_.GetWalDir();
for (auto file_num : state.log_delete_files) { for (auto file_num : state.log_delete_files) {
if (file_num > 0) { if (file_num > 0) {
candidate_files.emplace_back(LogFileName(file_num), candidate_files.emplace_back(LogFileName(file_num), wal_dir);
immutable_db_options_.wal_dir);
} }
} }
for (const auto& filename : state.manifest_delete_files) { for (const auto& filename : state.manifest_delete_files) {
@ -553,8 +554,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
fname = BlobFileName(candidate_file.file_path, number); fname = BlobFileName(candidate_file.file_path, number);
dir_to_sync = candidate_file.file_path; dir_to_sync = candidate_file.file_path;
} else { } else {
dir_to_sync = dir_to_sync = (type == kWalFile) ? wal_dir : dbname_;
(type == kWalFile) ? immutable_db_options_.wal_dir : dbname_;
fname = dir_to_sync + fname = dir_to_sync +
((!dir_to_sync.empty() && dir_to_sync.back() == '/') || ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
(!to_delete.empty() && to_delete.front() == '/') (!to_delete.empty() && to_delete.front() == '/')

@ -111,16 +111,28 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
result.recycle_log_file_num = 0; result.recycle_log_file_num = 0;
} }
if (result.wal_dir.empty()) { if (result.db_paths.size() == 0) {
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
} else if (result.wal_dir.empty()) {
// Use dbname as default // Use dbname as default
result.wal_dir = dbname; result.wal_dir = dbname;
} }
if (result.wal_dir.back() == '/') { if (!result.wal_dir.empty()) {
result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1); // If there is a wal_dir already set, check to see if the wal_dir is the
// same as the dbname AND the same as the db_path[0] (which must exist from
// a few lines ago). If the wal_dir matches both of these values, then clear
// the wal_dir value, which will make wal_dir == dbname. Most likely this
// condition was the result of reading an old options file where we forced
// wal_dir to be set (to dbname).
auto npath = NormalizePath(dbname + "/");
if (npath == NormalizePath(result.wal_dir + "/") &&
npath == NormalizePath(result.db_paths[0].path + "/")) {
result.wal_dir.clear();
}
} }
if (result.db_paths.size() == 0) { if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max()); result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
} }
if (result.use_direct_reads && result.compaction_readahead_size == 0) { if (result.use_direct_reads && result.compaction_readahead_size == 0) {
@ -141,7 +153,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result); ImmutableDBOptions immutable_db_options(result);
if (!IsWalDirSameAsDBPath(&immutable_db_options)) { if (!immutable_db_options.IsWalDirSameAsDBPath()) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we // Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and // cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler) // explicitly cleanup the trash log files (bypass DeleteScheduler)
@ -149,13 +161,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
// DeleteScheduler::CleanupDirectory on the same dir later, it will be // DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe // safe
std::vector<std::string> filenames; std::vector<std::string> filenames;
Status s = result.env->GetChildren(result.wal_dir, &filenames); auto wal_dir = immutable_db_options.GetWalDir();
Status s = result.env->GetChildren(wal_dir, &filenames);
s.PermitUncheckedError(); //**TODO: What to do on error? s.PermitUncheckedError(); //**TODO: What to do on error?
for (std::string& filename : filenames) { for (std::string& filename : filenames) {
if (filename.find(".log.trash", filename.length() - if (filename.find(".log.trash", filename.length() -
std::string(".log.trash").length()) != std::string(".log.trash").length()) !=
std::string::npos) { std::string::npos) {
std::string trash_file = result.wal_dir + "/" + filename; std::string trash_file = wal_dir + "/" + filename;
result.env->DeleteFile(trash_file).PermitUncheckedError(); result.env->DeleteFile(trash_file).PermitUncheckedError();
} }
} }
@ -543,12 +556,12 @@ Status DBImpl::Recover(
// Note that prev_log_number() is no longer used, but we pay // Note that prev_log_number() is no longer used, but we pay
// attention to it in case we are recovering a database // attention to it in case we are recovering a database
// produced by an older version of rocksdb. // produced by an older version of rocksdb.
auto wal_dir = immutable_db_options_.GetWalDir();
if (!immutable_db_options_.best_efforts_recovery) { if (!immutable_db_options_.best_efforts_recovery) {
s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir); s = env_->GetChildren(wal_dir, &files_in_wal_dir);
} }
if (s.IsNotFound()) { if (s.IsNotFound()) {
return Status::InvalidArgument("wal_dir not found", return Status::InvalidArgument("wal_dir not found", wal_dir);
immutable_db_options_.wal_dir);
} else if (!s.ok()) { } else if (!s.ok()) {
return s; return s;
} }
@ -564,8 +577,7 @@ Status DBImpl::Recover(
"existing log file: ", "existing log file: ",
file); file);
} else { } else {
wal_files[number] = wal_files[number] = LogFileName(wal_dir, number);
LogFileName(immutable_db_options_.wal_dir, number);
} }
} }
} }
@ -645,7 +657,7 @@ Status DBImpl::Recover(
if (s.ok()) { if (s.ok()) {
const std::string normalized_dbname = NormalizePath(dbname_); const std::string normalized_dbname = NormalizePath(dbname_);
const std::string normalized_wal_dir = const std::string normalized_wal_dir =
NormalizePath(immutable_db_options_.wal_dir); NormalizePath(immutable_db_options_.GetWalDir());
if (immutable_db_options_.best_efforts_recovery) { if (immutable_db_options_.best_efforts_recovery) {
filenames = std::move(files_in_dbname); filenames = std::move(files_in_dbname);
} else if (normalized_dbname == normalized_wal_dir) { } else if (normalized_dbname == normalized_wal_dir) {
@ -850,7 +862,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// update the file number allocation counter in VersionSet. // update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(wal_number); versions_->MarkFileNumberUsed(wal_number);
// Open the log file // Open the log file
std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number); std::string fname =
LogFileName(immutable_db_options_.GetWalDir(), wal_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", wal_number, "Recovering log #%" PRIu64 " mode %d", wal_number,
@ -1275,7 +1288,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate, Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
LogFileNumberSize* log_ptr) { LogFileNumberSize* log_ptr) {
LogFileNumberSize log(wal_number); LogFileNumberSize log(wal_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number); std::string fname =
LogFileName(immutable_db_options_.GetWalDir(), wal_number);
Status s; Status s;
// This gets the appear size of the wals, not including preallocated space. // This gets the appear size of the wals, not including preallocated space.
s = env_->GetFileSize(fname, &log.size); s = env_->GetFileSize(fname, &log.size);
@ -1515,15 +1529,14 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
BuildDBOptions(immutable_db_options_, mutable_db_options_); BuildDBOptions(immutable_db_options_, mutable_db_options_);
FileOptions opt_file_options = FileOptions opt_file_options =
fs_->OptimizeForLogWrite(file_options_, db_options); fs_->OptimizeForLogWrite(file_options_, db_options);
std::string log_fname = std::string wal_dir = immutable_db_options_.GetWalDir();
LogFileName(immutable_db_options_.wal_dir, log_file_num); std::string log_fname = LogFileName(wal_dir, log_file_num);
if (recycle_log_number) { if (recycle_log_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"reusing log %" PRIu64 " from recycle list\n", "reusing log %" PRIu64 " from recycle list\n",
recycle_log_number); recycle_log_number);
std::string old_log_fname = std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1"); TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2"); TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options, io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
@ -1574,7 +1587,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
} }
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn); DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir); s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) { if (s.ok()) {
std::vector<std::string> paths; std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) { for (auto& db_path : impl->immutable_db_options_.db_paths) {
@ -1606,7 +1619,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
return s; return s;
} }
impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_); impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock(); impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists // Handles create_if_missing, error_if_exists

@ -100,10 +100,10 @@ Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
assert(logs != nullptr); assert(logs != nullptr);
std::vector<std::string> filenames; std::vector<std::string> filenames;
Status s; Status s;
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames); s = env_->GetChildren(immutable_db_options_.GetWalDir(), &filenames);
if (s.IsNotFound()) { if (s.IsNotFound()) {
return Status::InvalidArgument("Failed to open wal_dir", return Status::InvalidArgument("Failed to open wal_dir",
immutable_db_options_.wal_dir); immutable_db_options_.GetWalDir());
} else if (!s.ok()) { } else if (!s.ok()) {
return s; return s;
} }
@ -143,7 +143,8 @@ Status DBImplSecondary::MaybeInitLogReader(
// initialize log reader from log_number // initialize log reader from log_number
// TODO: min_log_number_to_keep_2pc check needed? // TODO: min_log_number_to_keep_2pc check needed?
// Open the log file // Open the log file
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number); std::string fname =
LogFileName(immutable_db_options_.GetWalDir(), log_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number, "Recovering log #%" PRIu64 " mode %d", log_number,
static_cast<int>(immutable_db_options_.wal_recovery_mode)); static_cast<int>(immutable_db_options_.wal_recovery_mode));
@ -630,7 +631,7 @@ Status DB::OpenAsSecondary(
&impl->write_controller_, impl->io_tracer_)); &impl->write_controller_, impl->io_tracer_));
impl->column_family_memtables_.reset( impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_); impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
impl->mutex_.Lock(); impl->mutex_.Lock();
s = impl->Recover(column_families, true, false, false); s = impl->Recover(column_families, true, false, false);

@ -109,31 +109,30 @@ void DumpDBFileSummary(const ImmutableDBOptions& options,
} }
// Get wal file in wal_dir // Get wal file in wal_dir
if (dbname.compare(options.wal_dir) != 0) { const auto& wal_dir = options.GetWalDir(dbname);
if (!env->GetChildren(options.wal_dir, &files).ok()) { if (!options.IsWalDirSameAsDBPath(dbname)) {
Error(options.info_log, if (!env->GetChildren(wal_dir, &files).ok()) {
"Error when reading %s dir\n", Error(options.info_log, "Error when reading %s dir\n", wal_dir.c_str());
options.wal_dir.c_str());
return; return;
} }
wal_info.clear(); wal_info.clear();
for (const std::string& file : files) { for (const std::string& file : files) {
if (ParseFileName(file, &number, &type)) { if (ParseFileName(file, &number, &type)) {
if (type == kWalFile) { if (type == kWalFile) {
if (env->GetFileSize(options.wal_dir + "/" + file, &file_size).ok()) { if (env->GetFileSize(wal_dir + "/" + file, &file_size).ok()) {
wal_info.append(file) wal_info.append(file)
.append(" size: ") .append(" size: ")
.append(std::to_string(file_size)) .append(std::to_string(file_size))
.append(" ; "); .append(" ; ");
} else { } else {
Error(options.info_log, "Error when reading LOG file %s/%s\n", Error(options.info_log, "Error when reading LOG file %s/%s\n",
options.wal_dir.c_str(), file.c_str()); wal_dir.c_str(), file.c_str());
} }
} }
} }
} }
} }
Header(options.info_log, "Write Ahead Log file in %s: %s\n", Header(options.info_log, "Write Ahead Log file in %s: %s\n", wal_dir.c_str(),
options.wal_dir.c_str(), wal_info.c_str()); wal_info.c_str());
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -5953,6 +5953,22 @@ TEST_F(DBTest2, PointInTimeRecoveryWithSyncFailureInCFCreation) {
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
ReopenWithColumnFamilies({"default", "test1", "test2"}, options); ReopenWithColumnFamilies({"default", "test1", "test2"}, options);
} }
TEST_F(DBTest2, RenameDirectory) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value0"));
Close();
auto old_dbname = dbname_;
auto new_dbname = dbname_ + "_2";
EXPECT_OK(env_->RenameFile(dbname_, new_dbname));
options.create_if_missing = false;
dbname_ = new_dbname;
ASSERT_OK(TryReopen(options));
ASSERT_EQ("value0", Get("foo"));
Destroy(options);
dbname_ = old_dbname;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -286,23 +286,15 @@ class Repairer {
} }
// search wal_dir if user uses a customize wal_dir // search wal_dir if user uses a customize wal_dir
bool same = false; bool same = immutable_db_options_.IsWalDirSameAsDBPath(dbname_);
Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
if (status.IsNotSupported()) {
same = db_options_.wal_dir == dbname_;
status = Status::OK();
} else if (!status.ok()) {
return status;
}
if (!same) { if (!same) {
to_search_paths.push_back(db_options_.wal_dir); to_search_paths.push_back(immutable_db_options_.wal_dir);
} }
for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) { for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
ROCKS_LOG_INFO(db_options_.info_log, "Searching path %s\n", ROCKS_LOG_INFO(db_options_.info_log, "Searching path %s\n",
to_search_paths[path_id].c_str()); to_search_paths[path_id].c_str());
status = env_->GetChildren(to_search_paths[path_id], &filenames); Status status = env_->GetChildren(to_search_paths[path_id], &filenames);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
@ -339,10 +331,11 @@ class Repairer {
} }
void ConvertLogFilesToTables() { void ConvertLogFilesToTables() {
const auto& wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; i < logs_.size(); i++) { for (size_t i = 0; i < logs_.size(); i++) {
// we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option. // we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
std::string logname = LogFileName(db_options_.wal_dir, logs_[i]); std::string logname = LogFileName(wal_dir, logs_[i]);
Status status = ConvertLogToTable(logs_[i]); Status status = ConvertLogToTable(wal_dir, logs_[i]);
if (!status.ok()) { if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"Log #%" PRIu64 ": ignoring conversion error: %s", "Log #%" PRIu64 ": ignoring conversion error: %s",
@ -352,7 +345,7 @@ class Repairer {
} }
} }
Status ConvertLogToTable(uint64_t log) { Status ConvertLogToTable(const std::string& wal_dir, uint64_t log) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
std::shared_ptr<Logger> info_log; std::shared_ptr<Logger> info_log;
@ -365,7 +358,7 @@ class Repairer {
}; };
// Open the log file // Open the log file
std::string logname = LogFileName(db_options_.wal_dir, log); std::string logname = LogFileName(wal_dir, log);
const auto& fs = env_->GetFileSystem(); const auto& fs = env_->GetFileSystem();
std::unique_ptr<SequentialFileReader> lfile_reader; std::unique_ptr<SequentialFileReader> lfile_reader;
Status status = SequentialFileReader::Create( Status status = SequentialFileReader::Create(

@ -37,7 +37,7 @@ namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
Status WalManager::DeleteFile(const std::string& fname, uint64_t number) { Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname); auto s = env_->DeleteFile(wal_dir_ + "/" + fname);
if (s.ok()) { if (s.ok()) {
MutexLock l(&read_first_record_cache_mutex_); MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number); read_first_record_cache_.erase(number);
@ -52,7 +52,7 @@ Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
Status s; Status s;
// list wal files in main db dir. // list wal files in main db dir.
VectorLogPtr logs; VectorLogPtr logs;
s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile); s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -65,7 +65,7 @@ Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
files.clear(); files.clear();
// list wal files in archive dir. // list wal files in archive dir.
std::string archivedir = ArchivalDirectory(db_options_.wal_dir); std::string archivedir = ArchivalDirectory(wal_dir_);
Status exists = env_->FileExists(archivedir); Status exists = env_->FileExists(archivedir);
if (exists.ok()) { if (exists.ok()) {
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
@ -120,7 +120,7 @@ Status WalManager::GetUpdatesSince(
return s; return s;
} }
iter->reset(new TransactionLogIteratorImpl( iter->reset(new TransactionLogIteratorImpl(
db_options_.wal_dir, &db_options_, read_options, file_options_, seq, wal_dir_, &db_options_, read_options, file_options_, seq,
std::move(wal_files), version_set, seq_per_batch_, io_tracer_)); std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
return (*iter)->status(); return (*iter)->status();
} }
@ -159,7 +159,7 @@ void WalManager::PurgeObsoleteWALFiles() {
purge_wal_files_last_run_ = now_seconds; purge_wal_files_last_run_ = now_seconds;
std::string archival_dir = ArchivalDirectory(db_options_.wal_dir); std::string archival_dir = ArchivalDirectory(wal_dir_);
std::vector<std::string> files; std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files); s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) { if (!s.ok()) {
@ -257,8 +257,7 @@ void WalManager::PurgeObsoleteWALFiles() {
for (size_t i = 0; i < files_del_num; ++i) { for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName(); std::string const file_path = archived_logs[i]->PathName();
s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path, s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
db_options_.wal_dir, false,
/*force_fg=*/!wal_in_db_path_); /*force_fg=*/!wal_in_db_path_);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s", ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
@ -272,7 +271,7 @@ void WalManager::PurgeObsoleteWALFiles() {
} }
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) { void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number); auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace) // The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1"); TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name); Status s = env_->RenameFile(fname, archived_log_name);
@ -388,7 +387,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
} }
Status s; Status s;
if (type == kAliveLogFile) { if (type == kAliveLogFile) {
std::string fname = LogFileName(db_options_.wal_dir, number); std::string fname = LogFileName(wal_dir_, number);
s = ReadFirstLine(fname, number, sequence); s = ReadFirstLine(fname, number, sequence);
if (!s.ok() && env_->FileExists(fname).ok()) { if (!s.ok() && env_->FileExists(fname).ok()) {
// return any error that is not caused by non-existing file // return any error that is not caused by non-existing file
@ -398,8 +397,7 @@ Status WalManager::ReadFirstRecord(const WalFileType type,
if (type == kArchivedLogFile || !s.ok()) { if (type == kArchivedLogFile || !s.ok()) {
// check if the file got moved to archive. // check if the file got moved to archive.
std::string archived_file = std::string archived_file = ArchivedLogFileName(wal_dir_, number);
ArchivedLogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(archived_file, number, sequence); s = ReadFirstLine(archived_file, number, sequence);
// maybe the file was deleted from archive dir. If that's the case, return // maybe the file was deleted from archive dir. If that's the case, return
// Status::OK(). The caller with identify this as empty file because // Status::OK(). The caller with identify this as empty file because
@ -429,7 +427,7 @@ Status WalManager::GetLiveWalFile(uint64_t number,
Status s; Status s;
uint64_t size_bytes; uint64_t size_bytes;
s = env_->GetFileSize(LogFileName(db_options_.wal_dir, number), &size_bytes); s = env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);
if (!s.ok()) { if (!s.ok()) {
return s; return s;

@ -45,7 +45,8 @@ class WalManager {
fs_(db_options.fs, io_tracer), fs_(db_options.fs, io_tracer),
purge_wal_files_last_run_(0), purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch), seq_per_batch_(seq_per_batch),
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)), wal_dir_(db_options_.GetWalDir()),
wal_in_db_path_(db_options_.IsWalDirSameAsDBPath()),
io_tracer_(io_tracer) {} io_tracer_(io_tracer) {}
Status GetSortedWalFiles(VectorLogPtr& files); Status GetSortedWalFiles(VectorLogPtr& files);
@ -106,6 +107,8 @@ class WalManager {
bool seq_per_batch_; bool seq_per_batch_;
const std::string& wal_dir_;
bool wal_in_db_path_; bool wal_in_db_path_;
// obsolete files will be deleted every this seconds if ttl deletion is // obsolete files will be deleted every this seconds if ttl deletion is

@ -112,17 +112,6 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
#endif #endif
} }
bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
bool same = false;
assert(!db_options->db_paths.empty());
Status s = db_options->env->AreFilesSame(db_options->wal_dir,
db_options->db_paths[0].path, &same);
if (s.IsNotSupported()) {
same = db_options->wal_dir == db_options->db_paths[0].path;
}
return same;
}
// requested_checksum_func_name brings the function name of the checksum // requested_checksum_func_name brings the function name of the checksum
// generator in checksum_factory. Empty string is permitted, in which case the // generator in checksum_factory. Empty string is permitted, in which case the
// name of the generator created by the factory is unchecked. When // name of the generator created by the factory is unchecked. When

@ -45,8 +45,6 @@ extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& path_to_sync, const bool force_bg, const std::string& path_to_sync, const bool force_bg,
const bool force_fg); const bool force_fg);
extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
extern IOStatus GenerateOneFileChecksum( extern IOStatus GenerateOneFileChecksum(
FileSystem* fs, const std::string& file_path, FileSystem* fs, const std::string& file_path,
FileChecksumGenFactory* checksum_factory, FileChecksumGenFactory* checksum_factory,

@ -844,6 +844,41 @@ void ImmutableDBOptions::Dump(Logger* log) const {
db_host_id.c_str()); db_host_id.c_str());
} }
bool ImmutableDBOptions::IsWalDirSameAsDBPath() const {
assert(!db_paths.empty());
return IsWalDirSameAsDBPath(db_paths[0].path);
}
bool ImmutableDBOptions::IsWalDirSameAsDBPath(
const std::string& db_path) const {
bool same = wal_dir.empty();
if (!same) {
Status s = env->AreFilesSame(wal_dir, db_path, &same);
if (s.IsNotSupported()) {
same = wal_dir == db_path;
}
}
return same;
}
const std::string& ImmutableDBOptions::GetWalDir() const {
if (wal_dir.empty()) {
assert(!db_paths.empty());
return db_paths[0].path;
} else {
return wal_dir;
}
}
const std::string& ImmutableDBOptions::GetWalDir(
const std::string& path) const {
if (wal_dir.empty()) {
return path;
} else {
return wal_dir;
}
}
MutableDBOptions::MutableDBOptions() MutableDBOptions::MutableDBOptions()
: max_background_jobs(2), : max_background_jobs(2),
base_background_compactions(-1), base_background_compactions(-1),

@ -36,6 +36,9 @@ struct ImmutableDBOptions {
bool use_fsync; bool use_fsync;
std::vector<DbPath> db_paths; std::vector<DbPath> db_paths;
std::string db_log_dir; std::string db_log_dir;
// The wal_dir option from the file. To determine the
// directory in use, the GetWalDir or IsWalDirSameAsDBPath
// methods should be used instead of accessing this variable directly.
std::string wal_dir; std::string wal_dir;
size_t max_log_file_size; size_t max_log_file_size;
size_t log_file_time_to_roll; size_t log_file_time_to_roll;
@ -103,6 +106,11 @@ struct ImmutableDBOptions {
Statistics* stats; Statistics* stats;
Logger* logger; Logger* logger;
std::shared_ptr<CompactionService> compaction_service; std::shared_ptr<CompactionService> compaction_service;
bool IsWalDirSameAsDBPath() const;
bool IsWalDirSameAsDBPath(const std::string& path) const;
const std::string& GetWalDir() const;
const std::string& GetWalDir(const std::string& path) const;
}; };
struct MutableDBOptions { struct MutableDBOptions {

@ -704,11 +704,13 @@ void LDBCommand::PrepareOptions() {
db_ = nullptr; db_ = nullptr;
return; return;
} }
if (options_.env->FileExists(options_.wal_dir).IsNotFound()) { if (!options_.wal_dir.empty()) {
options_.wal_dir = db_path_; if (options_.env->FileExists(options_.wal_dir).IsNotFound()) {
fprintf( options_.wal_dir = db_path_;
stderr, fprintf(
"wal_dir loaded from the option file doesn't exist. Ignore it.\n"); stderr,
"wal_dir loaded from the option file doesn't exist. Ignore it.\n");
}
} }
// If merge operator is not set, set a string append operator. // If merge operator is not set, set a string append operator.
@ -3395,9 +3397,15 @@ void DBFileDumperCommand::DoCommand() {
if (!s.ok()) { if (!s.ok()) {
std::cerr << "Error when getting WAL files" << std::endl; std::cerr << "Error when getting WAL files" << std::endl;
} else { } else {
std::string wal_dir;
if (options_.wal_dir.empty()) {
wal_dir = db_->GetName();
} else {
wal_dir = NormalizePath(options_.wal_dir + "/");
}
for (auto& wal : wal_files) { for (auto& wal : wal_files) {
// TODO(qyang): option.wal_dir should be passed into ldb command // TODO(qyang): option.wal_dir should be passed into ldb command
std::string filename = db_->GetOptions().wal_dir + wal->PathName(); std::string filename = wal_dir + wal->PathName();
std::cout << filename << std::endl; std::cout << filename << std::endl;
// TODO(myabandeh): allow configuring is_write_commited // TODO(myabandeh): allow configuring is_write_commited
DumpWalFile(options_, filename, true, true, true /* is_write_commited */, DumpWalFile(options_, filename, true, true, true /* is_write_commited */,

@ -874,6 +874,62 @@ TEST_F(LdbCmdTest, LoadCFOptionsAndOverride) {
ASSERT_EQ(column_families[1].options.num_levels, cf_opts.num_levels); ASSERT_EQ(column_families[1].options.num_levels, cf_opts.num_levels);
ASSERT_EQ(column_families[1].options.write_buffer_size, 268435456); ASSERT_EQ(column_families[1].options.write_buffer_size, 268435456);
} }
TEST_F(LdbCmdTest, RenameDbAndLoadOptions) {
Env* env = TryLoadCustomOrDefaultEnv();
Options opts;
opts.env = env;
opts.create_if_missing = false;
std::string old_dbname = test::PerThreadDBPath(env, "ldb_cmd_test");
std::string new_dbname = old_dbname + "_2";
DestroyDB(old_dbname, opts);
DestroyDB(new_dbname, opts);
char old_arg[1024];
snprintf(old_arg, sizeof(old_arg), "--db=%s", old_dbname.c_str());
char new_arg[1024];
snprintf(new_arg, sizeof(old_arg), "--db=%s", new_dbname.c_str());
const char* argv1[] = {"./ldb",
old_arg,
"put",
"key1",
"value1",
"--try_load_options",
"--create_if_missing"};
const char* argv2[] = {"./ldb", old_arg, "get", "key1", "--try_load_options"};
const char* argv3[] = {"./ldb", new_arg, "put",
"key2", "value2", "--try_load_options"};
const char* argv4[] = {"./ldb", new_arg, "get", "key1", "--try_load_options"};
const char* argv5[] = {"./ldb", new_arg, "get", "key2", "--try_load_options"};
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(7, argv1, opts, LDBOptions(), nullptr));
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(5, argv2, opts, LDBOptions(), nullptr));
ConfigOptions config_opts;
Options options;
std::vector<ColumnFamilyDescriptor> column_families;
config_opts.env = env;
ASSERT_OK(
LoadLatestOptions(config_opts, old_dbname, &options, &column_families));
ASSERT_EQ(options.wal_dir, "");
ASSERT_OK(env->RenameFile(old_dbname, new_dbname));
ASSERT_NE(
0, LDBCommandRunner::RunCommand(6, argv1, opts, LDBOptions(), nullptr));
ASSERT_NE(
0, LDBCommandRunner::RunCommand(5, argv2, opts, LDBOptions(), nullptr));
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(6, argv3, opts, LDBOptions(), nullptr));
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(5, argv4, opts, LDBOptions(), nullptr));
ASSERT_EQ(
0, LDBCommandRunner::RunCommand(5, argv5, opts, LDBOptions(), nullptr));
DestroyDB(new_dbname, opts);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -434,29 +434,29 @@ Status CheckpointImpl::CreateCustomCheckpoint(
// Link WAL files. Copy exact size of last one because it is the only one // Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush. // that has changes after the last flush.
ImmutableDBOptions ioptions(db_options);
auto wal_dir = ioptions.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) { for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) && if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || (!flush_memtable ||
live_wal_files[i]->LogNumber() >= min_log_num)) { live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) { if (i + 1 == wal_size) {
s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), kWalFile, live_wal_files[i]->SizeFileBytes(), kWalFile,
kUnknownFileChecksumFuncName, kUnknownFileChecksum); kUnknownFileChecksumFuncName, kUnknownFileChecksum);
break; break;
} }
if (same_fs) { if (same_fs) {
// we only care about live log files // we only care about live log files
s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), s = link_file_cb(wal_dir, live_wal_files[i]->PathName(), kWalFile);
kWalFile);
if (s.IsNotSupported()) { if (s.IsNotSupported()) {
same_fs = false; same_fs = false;
s = Status::OK(); s = Status::OK();
} }
} }
if (!same_fs) { if (!same_fs) {
s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0, s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(), 0, kWalFile,
kWalFile, kUnknownFileChecksumFuncName, kUnknownFileChecksumFuncName, kUnknownFileChecksum);
kUnknownFileChecksum);
} }
} }
} }

@ -634,6 +634,130 @@ TEST_F(OptionsUtilTest, BadLatestOptions) {
// Ignore the errors for future releases when ignore_unknown_options=true // Ignore the errors for future releases when ignore_unknown_options=true
ASSERT_OK(LoadLatestOptions(ignore_opts, dbname_, &db_opts, &cf_descs)); ASSERT_OK(LoadLatestOptions(ignore_opts, dbname_, &db_opts, &cf_descs));
} }
TEST_F(OptionsUtilTest, RenameDatabaseDirectory) {
DB* db;
Options options;
DBOptions db_opts;
std::vector<ColumnFamilyDescriptor> cf_descs;
std::vector<ColumnFamilyHandle*> handles;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname_, &db));
ASSERT_OK(db->Put(WriteOptions(), "foo", "value0"));
delete db;
auto new_dbname = dbname_ + "_2";
ASSERT_OK(options.env->RenameFile(dbname_, new_dbname));
ASSERT_OK(LoadLatestOptions(new_dbname, options.env, &db_opts, &cf_descs));
ASSERT_EQ(cf_descs.size(), 1U);
db_opts.create_if_missing = false;
ASSERT_OK(DB::Open(db_opts, new_dbname, cf_descs, &handles, &db));
std::string value;
ASSERT_OK(db->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("value0", value);
// close the db
for (auto* handle : handles) {
delete handle;
}
delete db;
Options new_options(db_opts, cf_descs[0].options);
ASSERT_OK(DestroyDB(new_dbname, new_options, cf_descs));
ASSERT_OK(DestroyDB(dbname_, options));
}
TEST_F(OptionsUtilTest, WalDirSettings) {
DB* db;
Options options;
DBOptions db_opts;
std::vector<ColumnFamilyDescriptor> cf_descs;
std::vector<ColumnFamilyHandle*> handles;
options.create_if_missing = true;
// Open a DB with no wal dir set. The wal_dir should stay empty
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, "");
// Open a DB with wal_dir == dbname. The wal_dir should be set to empty
options.wal_dir = dbname_;
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, "");
// Open a DB with no wal_dir but a db_path==dbname_. The wal_dir should be
// empty
options.wal_dir = "";
options.db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, "");
// Open a DB with no wal_dir==dbname_ and db_path==dbname_. The wal_dir
// should be empty
options.wal_dir = dbname_ + "/";
options.db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, "");
ASSERT_OK(DestroyDB(dbname_, options));
// Open a DB with no wal_dir but db_path != db_name. The wal_dir == dbname_
options.wal_dir = "";
options.db_paths.clear();
options.db_paths.emplace_back(dbname_ + "_0",
std::numeric_limits<uint64_t>::max());
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, dbname_);
ASSERT_OK(DestroyDB(dbname_, options));
// Open a DB with wal_dir != db_name. The wal_dir remains unchanged
options.wal_dir = dbname_ + "/wal";
options.db_paths.clear();
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, dbname_ + "/wal");
ASSERT_OK(DestroyDB(dbname_, options));
}
TEST_F(OptionsUtilTest, WalDirInOptins) {
DB* db;
Options options;
DBOptions db_opts;
std::vector<ColumnFamilyDescriptor> cf_descs;
std::vector<ColumnFamilyHandle*> handles;
// Store an options file with wal_dir=dbname_ and make sure it still loads
// when the input wal_dir is empty
options.create_if_missing = true;
options.wal_dir = "";
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
options.wal_dir = dbname_;
std::string options_file;
ASSERT_OK(GetLatestOptionsFileName(dbname_, options.env, &options_file));
ASSERT_OK(PersistRocksDBOptions(options, {"default"}, {options},
dbname_ + "/" + options_file,
options.env->GetFileSystem().get()));
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, dbname_);
options.wal_dir = "";
ASSERT_OK(DB::Open(options, dbname_, &db));
delete db;
ASSERT_OK(LoadLatestOptions(dbname_, options.env, &db_opts, &cf_descs));
ASSERT_EQ(db_opts.wal_dir, "");
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save