[CF] Better handling of memtable logs

Summary: DBImpl now keeps a list of alive_log_files_. On every FindObsoleteFiles, it deletes all alive log files that are smaller than versions_->MinLogNumber()

Test Plan:
make check passes
no specific unit tests yet, will add

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16293
main
Igor Canadi 11 years ago
parent d39da4b578
commit b69e7d99d5
  1. 25
      db/db_impl.cc
  2. 1
      db/db_impl.h
  3. 1
      db/memtable.cc
  4. 12
      db/memtable.h

@ -1112,9 +1112,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
mutex_.Unlock();
std::vector<Iterator*> memtables;
for (MemTable* m : mems) {
Log(options_.info_log,
"Flushing memtable with log file: %lu\n",
(unsigned long)m->GetLogNumber());
Log(options_.info_log, "Flushing memtable with next log file: %lu\n",
(unsigned long)m->GetNextLogNumber());
memtables.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(env_, &cfd->internal_comparator(),
@ -1206,11 +1205,6 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd->GetID());
std::vector<uint64_t> logs_to_delete;
for (auto mem : mems) {
logs_to_delete.push_back(mem->GetLogNumber());
}
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
@ -1239,10 +1233,11 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
deletion_state.log_delete_files.insert(
deletion_state.log_delete_files.end(),
logs_to_delete.begin(),
logs_to_delete.end());
while (alive_log_files_.size() &&
*alive_log_files_.begin() < versions_->MinLogNumber()) {
deletion_state.log_delete_files.push_back(*alive_log_files_.begin());
alive_log_files_.pop_front();
}
}
}
return s;
@ -3589,12 +3584,14 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile)));
cfd->mem()->SetNextLogNumber(logfile_number_);
// TODO also update log number for all column families with empty
// memtables (i.e. don't have data in the old log)
cfd->imm()->Add(cfd->mem());
if (force) {
cfd->imm()->FlushRequested();
}
new_mem->Ref();
new_mem->SetLogNumber(logfile_number_);
alive_log_files_.push_back(logfile_number_);
cfd->SetMemtable(new_mem);
Log(options_.info_log, "New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_);
@ -3936,7 +3933,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion());
cfd->mem()->SetLogNumber(impl->logfile_number_);
impl->alive_log_files_.push_back(impl->logfile_number_);
}
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();

@ -386,6 +386,7 @@ class DBImpl : public DB {
unique_ptr<log::Writer> log_;
ColumnFamilyHandleImpl* default_cf_handle_;
unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
std::deque<uint32_t> alive_log_files_;
std::string host_name_;

@ -48,7 +48,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
file_number_(0),
first_seqno_(0),
mem_next_logfile_number_(0),
mem_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks
: 0),
prefix_extractor_(options.prefix_extractor) {

@ -143,14 +143,6 @@ class MemTable {
// be flushed to storage
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// Returns the logfile number that can be safely deleted when this
// memstore is flushed to storage
uint64_t GetLogNumber() { return mem_logfile_number_; }
// Sets the logfile number that can be safely deleted when this
// memstore is flushed to storage
void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; }
// Notify the underlying storage that no more items will be added
void MarkImmutable() { table_->MarkReadOnly(); }
@ -186,10 +178,6 @@ class MemTable {
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
// The log file that backs this memtable (to be deleted when
// memtable flush is done)
uint64_t mem_logfile_number_;
// rw locks for inplace updates
std::vector<port::RWMutex> locks_;

Loading…
Cancel
Save