diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 9291e6aaf..38656fef3 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -75,7 +75,7 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { // list wal files in archive dir. Status s; - std::string archivedir = ArchivalDirectory(dbname_); + std::string archivedir = ArchivalDirectory(options_.wal_dir); if (env_->FileExists(archivedir)) { s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); if (!s.ok()) { @@ -83,19 +83,19 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { } } // list wal files in main db dir. - return AppendSortedWalsOfType(dbname_, files, kAliveLogFile); + return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); } Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) { Status s; - std::string archivedir = ArchivalDirectory(dbname_); + std::string archivedir = ArchivalDirectory(options_.wal_dir); std::string files_not_deleted; for (const auto& wal : files) { /* Try deleting in the dir that pathname points to for the logfile. This may fail if we try to delete a log file which was live when captured but is archived now. 
Try deleting it from archive also */ - Status st = env_->DeleteFile(dbname_ + "/" + wal->PathName()); + Status st = env_->DeleteFile(options_.wal_dir + "/" + wal->PathName()); if (!st.ok()) { if (wal->Type() == kAliveLogFile && env_->DeleteFile(LogFileName(archivedir, wal->LogNumber())).ok()) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 1a1a5d405..3e4984d97 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -179,6 +179,11 @@ Options SanitizeOptions(const std::string& dbname, Log(result.info_log, "Prefix hash memtable rep is in use."); } } + + if (result.wal_dir.empty()) { + // Use dbname as default + result.wal_dir = dbname; + } return result; } @@ -353,7 +358,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const { const Status DBImpl::CreateArchivalDirectory() { if (options_.WAL_ttl_seconds > 0) { - std::string archivalPath = ArchivalDirectory(dbname_); + std::string archivalPath = ArchivalDirectory(options_.wal_dir); return env_->CreateDirIfMissing(archivalPath); } return Status::OK(); @@ -419,6 +424,17 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { // set of all files in the directory env_->GetChildren(dbname_, &deletion_state.allfiles); // Ignore errors + //Add log files in wal_dir + if (options_.wal_dir != dbname_) { + std::vector log_files; + env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors + deletion_state.allfiles.insert( + deletion_state.allfiles.end(), + log_files.begin(), + log_files.end() + ); + } + // store the current filenum, lognum, etc deletion_state.filenumber = versions_->ManifestFileNumber(); deletion_state.lognumber = versions_->LogNumber(); @@ -427,10 +443,10 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { Status DBImpl::DeleteLogFile(uint64_t number) { Status s; - auto filename = LogFileName(dbname_, number); + auto filename = LogFileName(options_.wal_dir, number); if (options_.WAL_ttl_seconds > 0) { s = env_->RenameFile(filename, - ArchivedLogFileName(dbname_, number)); + 
ArchivedLogFileName(options_.wal_dir, number)); if (!s.ok()) { Log(options_.info_log, "RenameFile logfile #%lu FAILED", number); @@ -448,7 +464,7 @@ Status DBImpl::DeleteLogFile(uint64_t number) { // Diffs the files listed in filenames and those that do not // belong to live files are posibly removed. If the removed file // is a sst file, then it returns the file number in files_to_evict. -// It is not necesary to hold the mutex when invoking this method. +// It is not necessary to hold the mutex when invoking this method. void DBImpl::PurgeObsoleteFiles(DeletionState& state) { uint64_t number; FileType type; @@ -557,7 +573,7 @@ void DBImpl::PurgeObsoleteWALFiles() { return; } std::vector wal_files; - std::string archival_dir = ArchivalDirectory(dbname_); + std::string archival_dir = ArchivalDirectory(options_.wal_dir); env_->GetChildren(archival_dir, &wal_files); for (const auto& f : wal_files) { uint64_t file_m_time; @@ -634,7 +650,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, const uint64_t min_log = versions_->LogNumber(); const uint64_t prev_log = versions_->PrevLogNumber(); std::vector filenames; - s = env_->GetChildren(dbname_, &filenames); + s = env_->GetChildren(options_.wal_dir, &filenames); if (!s.ok()) { return s; } @@ -701,7 +717,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, mutex_.AssertHeld(); // Open the log file - std::string fname = LogFileName(dbname_, log_number); + std::string fname = LogFileName(options_.wal_dir, log_number); unique_ptr file; Status status = env_->NewSequentialFile(fname, &file, storage_options_); if (!status.ok()) { @@ -1111,7 +1127,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, return s; } iter->reset( - new TransactionLogIteratorImpl(dbname_, + new TransactionLogIteratorImpl(options_.wal_dir, &options_, storage_options_, seq, @@ -1147,7 +1163,8 @@ Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type, const 
uint64_t number) { const std::string fname = (type == kAliveLogFile) ? - LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number); + LogFileName(options_.wal_dir, number) : + ArchivedLogFileName(options_.wal_dir, number); uint64_t file_size; Status s = env_->GetFileSize(fname, &file_size); return (s.ok() && (file_size == 0)); @@ -1157,23 +1174,24 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, WriteBatch* const result) { if (type == kAliveLogFile) { - std::string fname = LogFileName(dbname_, number); + std::string fname = LogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); if (!status.ok()) { // check if the file got moved to archive. - std::string archived_file = ArchivedLogFileName(dbname_, number); + std::string archived_file = + ArchivedLogFileName(options_.wal_dir, number); Status s = ReadFirstLine(archived_file, result); if (!s.ok()) { - return Status::IOError("Log File has been deleted"); + return Status::IOError("Log File has been deleted: " + archived_file); } } return Status::OK(); } else if (type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, number); + std::string fname = ArchivedLogFileName(options_.wal_dir, number); Status status = ReadFirstLine(fname, result); return status; } - return Status::NotSupported("File Type Not Known"); + return Status::NotSupported("File Type Not Known: " + type); } Status DBImpl::ReadFirstLine(const std::string& fname, @@ -2811,7 +2829,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { EnvOptions soptions(storage_options_); soptions.use_mmap_writes = false; s = env_->NewWritableFile( - LogFileName(dbname_, new_log_number), + LogFileName(options_.wal_dir, new_log_number), &lfile, soptions ); @@ -3176,8 +3194,15 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { return Status::InvalidArgument( "no_block_cache is true while block_cache is not nullptr"); } + DBImpl* impl = new 
DBImpl(options, dbname); - Status s = impl->CreateArchivalDirectory(); + Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); + if (!s.ok()) { + delete impl; + return s; + } + + s = impl->CreateArchivalDirectory(); if (!s.ok()) { delete impl; return s; @@ -3189,8 +3214,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { uint64_t new_log_number = impl->versions_->NewFileNumber(); unique_ptr lfile; soptions.use_mmap_writes = false; - s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), - &lfile, soptions); + s = options.env->NewWritableFile( + LogFileName(impl->options_.wal_dir, new_log_number), + &lfile, + soptions + ); if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * options.write_buffer_size); edit.SetLogNumber(new_log_number); @@ -3231,13 +3259,24 @@ Snapshot::~Snapshot() { } Status DestroyDB(const std::string& dbname, const Options& options) { - Env* env = options.env; + const InternalKeyComparator comparator(options.comparator); + const InternalFilterPolicy filter_policy(options.filter_policy); + const Options& soptions(SanitizeOptions( + dbname, &comparator, &filter_policy, options)); + Env* env = soptions.env; std::vector filenames; std::vector archiveFiles; + std::string archivedir = ArchivalDirectory(dbname); // Ignore error in case directory does not exist env->GetChildren(dbname, &filenames); - env->GetChildren(ArchivalDirectory(dbname), &archiveFiles); + + if (dbname != soptions.wal_dir) { + std::vector logfilenames; + env->GetChildren(soptions.wal_dir, &logfilenames); + filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end()); + archivedir = ArchivalDirectory(soptions.wal_dir); + } if (filenames.empty()) { return Status::OK(); @@ -3255,6 +3294,8 @@ Status DestroyDB(const std::string& dbname, const Options& options) { Status del; if (type == kMetaDatabase) { del = DestroyDB(dbname + "/" + filenames[i], options); + } else if (type == kLogFile) { + del = 
env->DeleteFile(soptions.wal_dir + "/" + filenames[i]); } else { del = env->DeleteFile(dbname + "/" + filenames[i]); } @@ -3264,23 +3305,24 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } + env->GetChildren(archivedir, &archiveFiles); // Delete archival files. for (size_t i = 0; i < archiveFiles.size(); ++i) { ParseFileName(archiveFiles[i], &number, &type); if (type == kLogFile) { - Status del = env->DeleteFile(ArchivalDirectory(dbname) + "/" + - archiveFiles[i]); + Status del = env->DeleteFile(archivedir + "/" + archiveFiles[i]); if (result.ok() && !del.ok()) { result = del; } } } // ignore case where no archival directory is present. - env->DeleteDir(ArchivalDirectory(dbname)); + env->DeleteDir(archivedir); env->UnlockFile(lock); // Ignore error since state is already gone env->DeleteFile(lockname); env->DeleteDir(dbname); // Ignore error in case dir contains other files + env->DeleteDir(soptions.wal_dir); } return result; } diff --git a/db/db_test.cc b/db/db_test.cc index 17e884a3e..d69280e8a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -220,6 +220,7 @@ class DBTest { kUncompressed, kNumLevel_3, kDBLogDir, + kWalDir, kManifestFileSize, kCompactOnFlush, kPerfOptions, @@ -286,10 +287,14 @@ class DBTest { } // Switch between different compaction styles (we have only 2 now). 
- bool ChangeCompactOptions(Options* options = nullptr) { + bool ChangeCompactOptions(Options* prev_options = nullptr) { if (option_config_ == kDefault) { option_config_ = kUniversalCompaction; - DestroyAndReopen(options); + if (prev_options == nullptr) { + prev_options = &last_options_; + } + Destroy(prev_options); + TryReopen(); return true; } else { return false; @@ -319,6 +324,9 @@ class DBTest { case kDBLogDir: options.db_log_dir = test::TmpDir(); break; + case kWalDir: + options.wal_dir = "/tmp/wal"; + break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes case kCompactOnFlush: @@ -377,7 +385,7 @@ class DBTest { return DB::Open(*options, dbname_, db); } - Status TryReopen(Options* options) { + Status TryReopen(Options* options = nullptr) { delete db_; db_ = nullptr; Options opts; @@ -529,7 +537,13 @@ class DBTest { int CountFiles() { std::vector files; env_->GetChildren(dbname_, &files); - return static_cast(files.size()); + + std::vector logfiles; + if (dbname_ != last_options_.wal_dir) { + env_->GetChildren(last_options_.wal_dir, &logfiles); + } + + return static_cast(files.size() + logfiles.size()); } int CountLiveFiles() { @@ -902,7 +916,6 @@ TEST(DBTest, NonBlockingIteration) { options.statistics = rocksdb::CreateDBStatistics(); non_blocking_opts.read_tier = kBlockCacheTier; Reopen(&options); - // write one kv to the database. 
ASSERT_OK(db_->Put(WriteOptions(), "a", "b")); @@ -2820,16 +2833,16 @@ TEST(DBTest, ComparatorCheck) { BytewiseComparator()->FindShortSuccessor(key); } }; - + Options new_options; + NewComparator cmp; do { - NewComparator cmp; - Options new_options = CurrentOptions(); + new_options = CurrentOptions(); new_options.comparator = &cmp; Status s = TryReopen(&new_options); ASSERT_TRUE(!s.ok()); ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos) << s.ToString(); - } while (ChangeCompactOptions()); + } while (ChangeCompactOptions(&new_options)); } TEST(DBTest, CustomComparator) { @@ -2858,10 +2871,10 @@ TEST(DBTest, CustomComparator) { return val; } }; - + Options new_options; + NumberComparator cmp; do { - NumberComparator cmp; - Options new_options = CurrentOptions(); + new_options = CurrentOptions(); new_options.create_if_missing = true; new_options.comparator = &cmp; new_options.filter_policy = nullptr; // Cannot use bloom filters @@ -2887,7 +2900,7 @@ TEST(DBTest, CustomComparator) { } Compact("[0]", "[1000000]"); } - } while (ChangeCompactOptions()); + } while (ChangeCompactOptions(&new_options)); } TEST(DBTest, ManualCompaction) { diff --git a/db/filename.cc b/db/filename.cc index 170b25069..acba11425 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -57,8 +57,8 @@ std::string LogFileName(const std::string& name, uint64_t number) { return MakeFileName(name, number, "log"); } -std::string ArchivalDirectory(const std::string& dbname) { - return dbname + "/" + ARCHIVAL_DIR; +std::string ArchivalDirectory(const std::string& dir) { + return dir + "/" + ARCHIVAL_DIR; } std::string ArchivedLogFileName(const std::string& name, uint64_t number) { assert(number > 0); diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index b8bb3c253..5ce89b3d4 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -4,13 +4,13 @@ namespace rocksdb { TransactionLogIteratorImpl::TransactionLogIteratorImpl( - const std::string& dbname, + 
const std::string& dir, const Options* options, const EnvOptions& soptions, const SequenceNumber seq, std::unique_ptr files, SequenceNumber const * const lastFlushedSequence) : - dbname_(dbname), + dir_(dir), options_(options), soptions_(soptions), startingSequenceNumber_(seq), @@ -31,15 +31,15 @@ Status TransactionLogIteratorImpl::OpenLogFile( unique_ptr* file) { Env* env = options_->env; if (logFile->Type() == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); + std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); return env->NewSequentialFile(fname, file, soptions_); } else { - std::string fname = LogFileName(dbname_, logFile->LogNumber()); + std::string fname = LogFileName(dir_, logFile->LogNumber()); Status status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { // If cannot open file in DB directory. // Try the archive dir, as it could have moved in the meanwhile. - fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); + fname = ArchivedLogFileName(dir_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { return Status::IOError(" Requested file not present in the dir"); diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index a8d155126..9b4d7c9b5 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -58,7 +58,7 @@ class LogFileImpl : public LogFile { class TransactionLogIteratorImpl : public TransactionLogIterator { public: - TransactionLogIteratorImpl(const std::string& dbname, + TransactionLogIteratorImpl(const std::string& dir, const Options* options, const EnvOptions& soptions, const SequenceNumber seqNum, @@ -74,7 +74,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual BatchResult GetBatch(); private: - const std::string& dbname_; + const std::string& dir_; const Options* options_; const EnvOptions& soptions_; const SequenceNumber startingSequenceNumber_; diff 
--git a/include/rocksdb/options.h b/include/rocksdb/options.h index 75d0775cd..14d98bb25 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -351,13 +351,21 @@ struct Options { // Default value is 1800 (half an hour). int db_stats_log_interval; - // This specifies the log dir. + // This specifies the info LOG dir. // If it is empty, the log files will be in the same dir as data. // If it is non empty, the log files will be in the specified dir, // and the db data dir's absolute path will be used as the log file // name's prefix. std::string db_log_dir; + // This specifies the absolute dir path for write-ahead logs (WAL). + // If it is empty, the log files will be in the same dir as data, + // dbname is used as the data dir by default + // If it is non empty, the log files will be kept in the specified dir. + // When destroying the db, + // all log files in wal_dir and the dir itself are deleted + std::string wal_dir; + // Disable compaction triggered by seek. // With bloomfilter and fast storage, a miss on one level // is very cheap if the file handle is cached in table cache @@ -461,7 +469,7 @@ struct Options { // The number of seconds a WAL(write ahead log) should be kept after it has // been marked as Not Live. If the value is set. The WAL files are moved to - // the archive direcotory and deleted after the given TTL. + // the archive directory and deleted after the given TTL. // If set to 0, WAL files are deleted as soon as they are not required by // the database. 
// If set to std::numeric_limits::max() the WAL files will never be diff --git a/util/options.cc b/util/options.cc index c54e3045c..fe7a3689d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -51,6 +51,7 @@ Options::Options() use_fsync(false), db_stats_log_interval(1800), db_log_dir(""), + wal_dir(""), disable_seek_compaction(false), delete_obsolete_files_period_micros(0), max_background_compactions(1), @@ -188,6 +189,8 @@ Options::Dump(Logger* log) const max_grandparent_overlap_factor); Log(log," Options.db_log_dir: %s", db_log_dir.c_str()); + Log(log," Options.wal_dir: %s", + wal_dir.c_str()); Log(log," Options.disable_seek_compaction: %d", disable_seek_compaction); Log(log," Options.no_block_cache: %d",