Add option for storing transaction logs in a separate dir

Summary: In some cases, you might not want to store the data log (write-ahead log) files in the same dir as the sst files. An example use case is leaf, which stores sst files in tmpfs and would like to save the log files in a separate dir (on disk) to save memory.

Test Plan: make all. Ran db_test; a few tests are failing (see P2785018). If you guys don't see an obvious problem with the code, maybe somebody from the rocksdb team could help me debug the issue here. Running this on leaf worked well: I could see the logs stored on disk and deleted appropriately after compactions. Obviously this is only one set of options, whereas the unit tests cover different options, so it seems like I'm missing some edge cases.

Reviewers: dhruba, haobo, leveldb

CC: xinyaohu, sumeet

Differential Revision: https://reviews.facebook.net/D13239
main
Naman Gupta 11 years ago
parent 116071411b
commit cbf4a06427
  1. 8
      db/db_filesnapshot.cc
  2. 88
      db/db_impl.cc
  3. 39
      db/db_test.cc
  4. 4
      db/filename.cc
  5. 10
      db/transaction_log_impl.cc
  6. 4
      db/transaction_log_impl.h
  7. 12
      include/rocksdb/options.h
  8. 3
      util/options.cc

@ -75,7 +75,7 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// list wal files in archive dir.
Status s;
std::string archivedir = ArchivalDirectory(dbname_);
std::string archivedir = ArchivalDirectory(options_.wal_dir);
if (env_->FileExists(archivedir)) {
s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
@ -83,19 +83,19 @@ Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
}
}
// list wal files in main db dir.
return AppendSortedWalsOfType(dbname_, files, kAliveLogFile);
return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile);
}
Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) {
Status s;
std::string archivedir = ArchivalDirectory(dbname_);
std::string archivedir = ArchivalDirectory(options_.wal_dir);
std::string files_not_deleted;
for (const auto& wal : files) {
/* Try deleting in the dir that pathname points to for the logfile.
This may fail if we try to delete a log file which was live when captured
but is archived now. Try deleting it from archive also
*/
Status st = env_->DeleteFile(dbname_ + "/" + wal->PathName());
Status st = env_->DeleteFile(options_.wal_dir + "/" + wal->PathName());
if (!st.ok()) {
if (wal->Type() == kAliveLogFile &&
env_->DeleteFile(LogFileName(archivedir, wal->LogNumber())).ok()) {

@ -179,6 +179,11 @@ Options SanitizeOptions(const std::string& dbname,
Log(result.info_log, "Prefix hash memtable rep is in use.");
}
}
if (result.wal_dir.empty()) {
// Use dbname as default
result.wal_dir = dbname;
}
return result;
}
@ -353,7 +358,7 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
const Status DBImpl::CreateArchivalDirectory() {
if (options_.WAL_ttl_seconds > 0) {
std::string archivalPath = ArchivalDirectory(dbname_);
std::string archivalPath = ArchivalDirectory(options_.wal_dir);
return env_->CreateDirIfMissing(archivalPath);
}
return Status::OK();
@ -419,6 +424,17 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
// set of all files in the directory
env_->GetChildren(dbname_, &deletion_state.allfiles); // Ignore errors
//Add log files in wal_dir
if (options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
deletion_state.allfiles.insert(
deletion_state.allfiles.end(),
log_files.begin(),
log_files.end()
);
}
// store the current filenum, lognum, etc
deletion_state.filenumber = versions_->ManifestFileNumber();
deletion_state.lognumber = versions_->LogNumber();
@ -427,10 +443,10 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
Status DBImpl::DeleteLogFile(uint64_t number) {
Status s;
auto filename = LogFileName(dbname_, number);
auto filename = LogFileName(options_.wal_dir, number);
if (options_.WAL_ttl_seconds > 0) {
s = env_->RenameFile(filename,
ArchivedLogFileName(dbname_, number));
ArchivedLogFileName(options_.wal_dir, number));
if (!s.ok()) {
Log(options_.info_log, "RenameFile logfile #%lu FAILED", number);
@ -448,7 +464,7 @@ Status DBImpl::DeleteLogFile(uint64_t number) {
// Diffs the files listed in filenames and those that do not
// belong to live files are possibly removed. If the removed file
// is a sst file, then it returns the file number in files_to_evict.
// It is not necesary to hold the mutex when invoking this method.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
uint64_t number;
FileType type;
@ -557,7 +573,7 @@ void DBImpl::PurgeObsoleteWALFiles() {
return;
}
std::vector<std::string> wal_files;
std::string archival_dir = ArchivalDirectory(dbname_);
std::string archival_dir = ArchivalDirectory(options_.wal_dir);
env_->GetChildren(archival_dir, &wal_files);
for (const auto& f : wal_files) {
uint64_t file_m_time;
@ -634,7 +650,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames);
s = env_->GetChildren(options_.wal_dir, &filenames);
if (!s.ok()) {
return s;
}
@ -701,7 +717,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
mutex_.AssertHeld();
// Open the log file
std::string fname = LogFileName(dbname_, log_number);
std::string fname = LogFileName(options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, storage_options_);
if (!status.ok()) {
@ -1111,7 +1127,7 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
return s;
}
iter->reset(
new TransactionLogIteratorImpl(dbname_,
new TransactionLogIteratorImpl(options_.wal_dir,
&options_,
storage_options_,
seq,
@ -1147,7 +1163,8 @@ Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
const uint64_t number) {
const std::string fname = (type == kAliveLogFile) ?
LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number);
LogFileName(options_.wal_dir, number) :
ArchivedLogFileName(options_.wal_dir, number);
uint64_t file_size;
Status s = env_->GetFileSize(fname, &file_size);
return (s.ok() && (file_size == 0));
@ -1157,23 +1174,24 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
WriteBatch* const result) {
if (type == kAliveLogFile) {
std::string fname = LogFileName(dbname_, number);
std::string fname = LogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
if (!status.ok()) {
// check if the file got moved to archive.
std::string archived_file = ArchivedLogFileName(dbname_, number);
std::string archived_file =
ArchivedLogFileName(options_.wal_dir, number);
Status s = ReadFirstLine(archived_file, result);
if (!s.ok()) {
return Status::IOError("Log File has been deleted");
return Status::IOError("Log File has been deleted: " + archived_file);
}
}
return Status::OK();
} else if (type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, number);
std::string fname = ArchivedLogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
return status;
}
return Status::NotSupported("File Type Not Known");
return Status::NotSupported("File Type Not Known: " + type);
}
Status DBImpl::ReadFirstLine(const std::string& fname,
@ -2811,7 +2829,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
EnvOptions soptions(storage_options_);
soptions.use_mmap_writes = false;
s = env_->NewWritableFile(
LogFileName(dbname_, new_log_number),
LogFileName(options_.wal_dir, new_log_number),
&lfile,
soptions
);
@ -3176,8 +3194,15 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return Status::InvalidArgument(
"no_block_cache is true while block_cache is not nullptr");
}
DBImpl* impl = new DBImpl(options, dbname);
Status s = impl->CreateArchivalDirectory();
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
if (!s.ok()) {
delete impl;
return s;
}
s = impl->CreateArchivalDirectory();
if (!s.ok()) {
delete impl;
return s;
@ -3189,8 +3214,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
soptions.use_mmap_writes = false;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile, soptions);
s = options.env->NewWritableFile(
LogFileName(impl->options_.wal_dir, new_log_number),
&lfile,
soptions
);
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * options.write_buffer_size);
edit.SetLogNumber(new_log_number);
@ -3231,13 +3259,24 @@ Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
Env* env = options.env;
const InternalKeyComparator comparator(options.comparator);
const InternalFilterPolicy filter_policy(options.filter_policy);
const Options& soptions(SanitizeOptions(
dbname, &comparator, &filter_policy, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
std::vector<std::string> archiveFiles;
std::string archivedir = ArchivalDirectory(dbname);
// Ignore error in case directory does not exist
env->GetChildren(dbname, &filenames);
env->GetChildren(ArchivalDirectory(dbname), &archiveFiles);
if (dbname != soptions.wal_dir) {
std::vector<std::string> logfilenames;
env->GetChildren(soptions.wal_dir, &logfilenames);
filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end());
archivedir = ArchivalDirectory(soptions.wal_dir);
}
if (filenames.empty()) {
return Status::OK();
@ -3255,6 +3294,8 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
Status del;
if (type == kMetaDatabase) {
del = DestroyDB(dbname + "/" + filenames[i], options);
} else if (type == kLogFile) {
del = env->DeleteFile(soptions.wal_dir + "/" + filenames[i]);
} else {
del = env->DeleteFile(dbname + "/" + filenames[i]);
}
@ -3264,23 +3305,24 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
}
}
env->GetChildren(archivedir, &archiveFiles);
// Delete archival files.
for (size_t i = 0; i < archiveFiles.size(); ++i) {
ParseFileName(archiveFiles[i], &number, &type);
if (type == kLogFile) {
Status del = env->DeleteFile(ArchivalDirectory(dbname) + "/" +
archiveFiles[i]);
Status del = env->DeleteFile(archivedir + "/" + archiveFiles[i]);
if (result.ok() && !del.ok()) {
result = del;
}
}
}
// ignore case where no archival directory is present.
env->DeleteDir(ArchivalDirectory(dbname));
env->DeleteDir(archivedir);
env->UnlockFile(lock); // Ignore error since state is already gone
env->DeleteFile(lockname);
env->DeleteDir(dbname); // Ignore error in case dir contains other files
env->DeleteDir(soptions.wal_dir);
}
return result;
}

@ -220,6 +220,7 @@ class DBTest {
kUncompressed,
kNumLevel_3,
kDBLogDir,
kWalDir,
kManifestFileSize,
kCompactOnFlush,
kPerfOptions,
@ -286,10 +287,14 @@ class DBTest {
}
// Switch between different compaction styles (we have only 2 now).
bool ChangeCompactOptions(Options* options = nullptr) {
bool ChangeCompactOptions(Options* prev_options = nullptr) {
if (option_config_ == kDefault) {
option_config_ = kUniversalCompaction;
DestroyAndReopen(options);
if (prev_options == nullptr) {
prev_options = &last_options_;
}
Destroy(prev_options);
TryReopen();
return true;
} else {
return false;
@ -319,6 +324,9 @@ class DBTest {
case kDBLogDir:
options.db_log_dir = test::TmpDir();
break;
case kWalDir:
options.wal_dir = "/tmp/wal";
break;
case kManifestFileSize:
options.max_manifest_file_size = 50; // 50 bytes
case kCompactOnFlush:
@ -377,7 +385,7 @@ class DBTest {
return DB::Open(*options, dbname_, db);
}
Status TryReopen(Options* options) {
Status TryReopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
Options opts;
@ -529,7 +537,13 @@ class DBTest {
int CountFiles() {
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
return static_cast<int>(files.size());
std::vector<std::string> logfiles;
if (dbname_ != last_options_.wal_dir) {
env_->GetChildren(last_options_.wal_dir, &logfiles);
}
return static_cast<int>(files.size() + logfiles.size());
}
int CountLiveFiles() {
@ -902,7 +916,6 @@ TEST(DBTest, NonBlockingIteration) {
options.statistics = rocksdb::CreateDBStatistics();
non_blocking_opts.read_tier = kBlockCacheTier;
Reopen(&options);
// write one kv to the database.
ASSERT_OK(db_->Put(WriteOptions(), "a", "b"));
@ -2820,16 +2833,16 @@ TEST(DBTest, ComparatorCheck) {
BytewiseComparator()->FindShortSuccessor(key);
}
};
Options new_options;
NewComparator cmp;
do {
NewComparator cmp;
Options new_options = CurrentOptions();
new_options = CurrentOptions();
new_options.comparator = &cmp;
Status s = TryReopen(&new_options);
ASSERT_TRUE(!s.ok());
ASSERT_TRUE(s.ToString().find("comparator") != std::string::npos)
<< s.ToString();
} while (ChangeCompactOptions());
} while (ChangeCompactOptions(&new_options));
}
TEST(DBTest, CustomComparator) {
@ -2858,10 +2871,10 @@ TEST(DBTest, CustomComparator) {
return val;
}
};
Options new_options;
NumberComparator cmp;
do {
NumberComparator cmp;
Options new_options = CurrentOptions();
new_options = CurrentOptions();
new_options.create_if_missing = true;
new_options.comparator = &cmp;
new_options.filter_policy = nullptr; // Cannot use bloom filters
@ -2887,7 +2900,7 @@ TEST(DBTest, CustomComparator) {
}
Compact("[0]", "[1000000]");
}
} while (ChangeCompactOptions());
} while (ChangeCompactOptions(&new_options));
}
TEST(DBTest, ManualCompaction) {

@ -57,8 +57,8 @@ std::string LogFileName(const std::string& name, uint64_t number) {
return MakeFileName(name, number, "log");
}
std::string ArchivalDirectory(const std::string& dbname) {
return dbname + "/" + ARCHIVAL_DIR;
std::string ArchivalDirectory(const std::string& dir) {
return dir + "/" + ARCHIVAL_DIR;
}
std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
assert(number > 0);

@ -4,13 +4,13 @@
namespace rocksdb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dbname,
const std::string& dir,
const Options* options,
const EnvOptions& soptions,
const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence) :
dbname_(dbname),
dir_(dir),
options_(options),
soptions_(soptions),
startingSequenceNumber_(seq),
@ -31,15 +31,15 @@ Status TransactionLogIteratorImpl::OpenLogFile(
unique_ptr<SequentialFile>* file) {
Env* env = options_->env;
if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber());
return env->NewSequentialFile(fname, file, soptions_);
} else {
std::string fname = LogFileName(dbname_, logFile->LogNumber());
std::string fname = LogFileName(dir_, logFile->LogNumber());
Status status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
// If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
fname = ArchivedLogFileName(dir_, logFile->LogNumber());
status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
return Status::IOError(" Requested file not present in the dir");

@ -58,7 +58,7 @@ class LogFileImpl : public LogFile {
class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(const std::string& dbname,
TransactionLogIteratorImpl(const std::string& dir,
const Options* options,
const EnvOptions& soptions,
const SequenceNumber seqNum,
@ -74,7 +74,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
virtual BatchResult GetBatch();
private:
const std::string& dbname_;
const std::string& dir_;
const Options* options_;
const EnvOptions& soptions_;
const SequenceNumber startingSequenceNumber_;

@ -351,13 +351,21 @@ struct Options {
// Default value is 1800 (half an hour).
int db_stats_log_interval;
// This specifies the log dir.
// This specifies the info LOG dir.
// If it is empty, the log files will be in the same dir as data.
// If it is non empty, the log files will be in the specified dir,
// and the db data dir's absolute path will be used as the log file
// name's prefix.
std::string db_log_dir;
// This specifies the absolute dir path for write-ahead logs (WAL).
// If it is empty, the log files will be in the same dir as data;
// dbname is used as the data dir by default.
// If it is non-empty, the log files will be kept in the specified dir.
// When destroying the db,
// all log files in wal_dir and the dir itself are deleted.
std::string wal_dir;
// Disable compaction triggered by seek.
// With bloomfilter and fast storage, a miss on one level
// is very cheap if the file handle is cached in table cache
@ -461,7 +469,7 @@ struct Options {
// The number of seconds a WAL(write ahead log) should be kept after it has
// been marked as Not Live. If the value is set. The WAL files are moved to
// the archive direcotory and deleted after the given TTL.
// the archive directory and deleted after the given TTL.
// If set to 0, WAL files are deleted as soon as they are not required by
// the database.
// If set to std::numeric_limits<uint64_t>::max() the WAL files will never be

@ -51,6 +51,7 @@ Options::Options()
use_fsync(false),
db_stats_log_interval(1800),
db_log_dir(""),
wal_dir(""),
disable_seek_compaction(false),
delete_obsolete_files_period_micros(0),
max_background_compactions(1),
@ -188,6 +189,8 @@ Options::Dump(Logger* log) const
max_grandparent_overlap_factor);
Log(log," Options.db_log_dir: %s",
db_log_dir.c_str());
Log(log," Options.wal_dir: %s",
wal_dir.c_str());
Log(log," Options.disable_seek_compaction: %d",
disable_seek_compaction);
Log(log," Options.no_block_cache: %d",

Loading…
Cancel
Save