[RocksDB] Remove Log file immediately after memtable flush

Summary: As title. The DB log file life cycle is tied up with the memtable it backs. Once the memtable is flushed to sst and committed, we should be able to delete the log file, without holding the mutex. This is part of the bigger change to avoid FindObsoleteFiles at runtime. It deals with log files. sst files will be dealt with later.

Test Plan: make check; db_bench

Reviewers: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11709
main
Haobo Xu 12 years ago
parent 6e2b5809f6
commit 0e422308aa
  1. 7
      db/db_bench.cc
  2. 58
      db/db_impl.cc
  3. 2
      db/db_impl.h
  4. 1
      db/memtable.cc
  5. 12
      db/memtable.h
  6. 5
      include/rocksdb/options.h
  7. 5
      util/options.cc

@ -378,6 +378,9 @@ static enum RepFactory FLAGS_rep_factory;
// The possible merge operators are defined in utilities/merge_operators.h // The possible merge operators are defined in utilities/merge_operators.h
static std::string FLAGS_merge_operator = ""; static std::string FLAGS_merge_operator = "";
static auto FLAGS_purge_log_after_memtable_flush =
leveldb::Options().purge_log_after_memtable_flush;
namespace leveldb { namespace leveldb {
// Helper for quickly generating random data. // Helper for quickly generating random data.
@ -1226,6 +1229,7 @@ class Benchmark {
); );
break; break;
} }
options.purge_log_after_memtable_flush = FLAGS_purge_log_after_memtable_flush;
if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) { if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional.size() != if (FLAGS_max_bytes_for_level_multiplier_additional.size() !=
(unsigned int)FLAGS_num_levels) { (unsigned int)FLAGS_num_levels) {
@ -2518,6 +2522,9 @@ int main(int argc, char** argv) {
FLAGS_filter_deletes = n; FLAGS_filter_deletes = n;
} else if (sscanf(argv[i], "--merge_operator=%s", buf) == 1) { } else if (sscanf(argv[i], "--merge_operator=%s", buf) == 1) {
FLAGS_merge_operator = buf; FLAGS_merge_operator = buf;
} else if (sscanf(argv[i], "--purge_log_after_memtable_flush=%d%c", &n, &junk)
== 1 && (n == 0 || n ==1 )) {
FLAGS_purge_log_after_memtable_flush = n;
} else { } else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]); fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1); exit(1);

@ -420,6 +420,26 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
deletion_state.prevlognumber = versions_->PrevLogNumber(); deletion_state.prevlognumber = versions_->PrevLogNumber();
} }
Status DBImpl::DeleteLogFile(uint64_t number) {
Status s;
auto filename = LogFileName(dbname_, number);
if (options_.WAL_ttl_seconds > 0) {
s = env_->RenameFile(filename,
ArchivedLogFileName(dbname_, number));
if (!s.ok()) {
Log(options_.info_log, "RenameFile logfile #%lu FAILED", number);
}
} else {
s = env_->DeleteFile(filename);
if(!s.ok()) {
Log(options_.info_log, "Delete logfile #%lu FAILED", number);
}
}
return s;
}
// Diffs the files listed in filenames and those that do not // Diffs the files listed in filenames and those that do not
// belong to live files are posibly removed. If the removed file // belong to live files are posibly removed. If the removed file
// is a sst file, then it returns the file number in files_to_evict. // is a sst file, then it returns the file number in files_to_evict.
@ -474,19 +494,9 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
state.files_to_evict.push_back(number); state.files_to_evict.push_back(number);
} }
Log(options_.info_log, "Delete type=%d #%lu", int(type), number); Log(options_.info_log, "Delete type=%d #%lu", int(type), number);
if (type == kLogFile && options_.WAL_ttl_seconds > 0) {
Status st = env_->RenameFile(
LogFileName(dbname_, number),
ArchivedLogFileName(dbname_, number)
);
if (!st.ok()) { if (type == kLogFile) {
Log( DeleteLogFile(number);
options_.info_log, "RenameFile type=%d #%lu FAILED",
int(type),
number
);
}
} else { } else {
Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]); Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
if (!st.ok()) { if (!st.ok()) {
@ -499,7 +509,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
} }
} }
// Delete old log files. // Delete old info log files.
size_t old_log_file_count = old_log_files.size(); size_t old_log_file_count = old_log_files.size();
// NOTE: Currently we only support log purge when options_.db_log_dir is // NOTE: Currently we only support log purge when options_.db_log_dir is
// located in `dbname` directory. // located in `dbname` directory.
@ -904,7 +914,6 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
} }
// Save the contents of the earliest memtable as a new Table // Save the contents of the earliest memtable as a new Table
// This will release and re-acquire the mutex.
uint64_t file_number; uint64_t file_number;
std::vector<MemTable*> mems; std::vector<MemTable*> mems;
imm_.PickMemtablesToFlush(&mems); imm_.PickMemtablesToFlush(&mems);
@ -918,8 +927,10 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
MemTable* m = mems[0]; MemTable* m = mems[0];
VersionEdit* edit = m->GetEdits(); VersionEdit* edit = m->GetEdits();
edit->SetPrevLogNumber(0); edit->SetPrevLogNumber(0);
edit->SetLogNumber(m->GetLogNumber()); // Earlier logs no longer needed edit->SetLogNumber(m->GetNextLogNumber()); // Earlier logs no longer needed
auto to_delete = m->GetLogNumber();
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &file_number); Status s = WriteLevel0Table(mems, edit, &file_number);
if (s.ok() && shutting_down_.Acquire_Load()) { if (s.ok() && shutting_down_.Acquire_Load()) {
@ -937,10 +948,17 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
if (madeProgress) { if (madeProgress) {
*madeProgress = 1; *madeProgress = 1;
} }
MaybeScheduleLogDBDeployStats(); MaybeScheduleLogDBDeployStats();
// we could have deleted obsolete files here, but it is not // TODO: if log deletion failed for any reason, we probably
// absolutely necessary because it could be also done as part // should store the file number in the shared state, and retry
// of other background compaction // However, for now, PurgeObsoleteFiles will take care of that
// anyways.
if (options_.purge_log_after_memtable_flush && to_delete > 0) {
mutex_.Unlock();
DeleteLogFile(to_delete);
mutex_.Lock();
}
} }
return s; return s;
} }
@ -2743,7 +2761,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile))); log_.reset(new log::Writer(std::move(lfile)));
mem_->SetLogNumber(logfile_number_); mem_->SetNextLogNumber(logfile_number_);
imm_.Add(mem_); imm_.Add(mem_);
if (force) { if (force) {
imm_.FlushRequested(); imm_.FlushRequested();
@ -2751,6 +2769,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mem_ = new MemTable(internal_comparator_, mem_rep_factory_, mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_); NumberLevels(), options_);
mem_->Ref(); mem_->Ref();
mem_->SetLogNumber(logfile_number_);
force = false; // Do not force another compaction if have room force = false; // Do not force another compaction if have room
MaybeScheduleCompaction(); MaybeScheduleCompaction();
} }
@ -3113,6 +3132,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
} }
if (s.ok()) { if (s.ok()) {
impl->mem_->SetLogNumber(impl->logfile_number_);
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction(); impl->MaybeScheduleCompaction();
impl->MaybeScheduleLogDBDeployStats(); impl->MaybeScheduleLogDBDeployStats();

@ -212,6 +212,8 @@ class DBImpl : public DB {
// Removes the file listed in files_to_evict from the table_cache // Removes the file listed in files_to_evict from the table_cache
void EvictObsoleteFiles(DeletionState& deletion_state); void EvictObsoleteFiles(DeletionState& deletion_state);
Status DeleteLogFile(uint64_t number);
void PurgeObsoleteWALFiles(); void PurgeObsoleteWALFiles();
Status AppendSortedWalsOfType(const std::string& path, Status AppendSortedWalsOfType(const std::string& path,

@ -29,6 +29,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
file_number_(0), file_number_(0),
edit_(numlevel), edit_(numlevel),
first_seqno_(0), first_seqno_(0),
mem_next_logfile_number_(0),
mem_logfile_number_(0) { } mem_logfile_number_(0) { }
MemTable::~MemTable() { MemTable::~MemTable() {

@ -92,6 +92,14 @@ class MemTable {
// into the memtable // into the memtable
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
// Returns the next active logfile number when this memtable is about to
// be flushed to storage
uint64_t GetNextLogNumber() { return mem_next_logfile_number_; }
// Sets the next active logfile number when this memtable is about to
// be flushed to storage
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// Returns the logfile number that can be safely deleted when this // Returns the logfile number that can be safely deleted when this
// memstore is flushed to storage // memstore is flushed to storage
uint64_t GetLogNumber() { return mem_logfile_number_; } uint64_t GetLogNumber() { return mem_logfile_number_; }
@ -127,6 +135,10 @@ class MemTable {
SequenceNumber first_seqno_; SequenceNumber first_seqno_;
// The log files earlier than this number can be deleted. // The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
// The log file that backs this memtable (to be deleted when
// memtable flush is done)
uint64_t mem_logfile_number_; uint64_t mem_logfile_number_;
// No copying allowed // No copying allowed

@ -548,6 +548,11 @@ struct Options {
// an application to modify/delete a key-value during background compaction. // an application to modify/delete a key-value during background compaction.
// Default: a factory that doesn't provide any object // Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory; std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// Remove the log file immediately after the corresponding memtable is flushed
// to data file.
// Default: true
bool purge_log_after_memtable_flush;
}; };
// //

@ -85,7 +85,8 @@ Options::Options()
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)), memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
compaction_filter_factory( compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>( std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())) { new DefaultCompactionFilterFactory())),
purge_log_after_memtable_flush(true) {
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
} }
@ -242,6 +243,8 @@ Options::Dump(Logger* log) const
compaction_options_universal.min_merge_width); compaction_options_universal.min_merge_width);
Log(log," Options.compaction_options_universal.max_merge_width: %u", Log(log," Options.compaction_options_universal.max_merge_width: %u",
compaction_options_universal.max_merge_width); compaction_options_universal.max_merge_width);
Log(log," Options.purge_log_after_memtable_flush: %d",
purge_log_after_memtable_flush);
} // Options::Dump } // Options::Dump
// //

Loading…
Cancel
Save