Don't roll empty logs

Summary:
With multiple column families, especially when manual Flush is executed, we might roll the log file, although the current log file is empty (no data has been written to the log).

After the diff, we won't create new log file if current is empty.

Next, I will write an algorithm that will flush column families that reference old log files (i.e., that weren't flushed in a while)

Test Plan: Added an unit test. Confirmed that unit test failes in master

Reviewers: dhruba, haobo, ljin, sdong

Reviewed By: ljin

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17631
main
Igor Canadi 11 years ago
parent c87ed0942c
commit e6acb874cd
  1. 42
      db/column_family_test.cc
  2. 67
      db/db_impl.cc
  3. 1
      db/db_impl.h
  4. 2
      db/version_set.cc

@ -29,12 +29,31 @@ std::string RandomString(Random* rnd, int len) {
} }
} // anonymous namespace } // anonymous namespace
// counts how many operations were performed
class EnvCounter : public EnvWrapper {
public:
explicit EnvCounter(Env* base)
: EnvWrapper(base), num_new_writable_file_(0) {}
int GetNumberOfNewWritableFileCalls() {
return num_new_writable_file_;
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& soptions) {
++num_new_writable_file_;
return EnvWrapper::NewWritableFile(f, r, soptions);
}
private:
int num_new_writable_file_;
};
class ColumnFamilyTest { class ColumnFamilyTest {
public: public:
ColumnFamilyTest() : rnd_(139) { ColumnFamilyTest() : rnd_(139) {
env_ = Env::Default(); env_ = new EnvCounter(Env::Default());
dbname_ = test::TmpDir() + "/column_family_test"; dbname_ = test::TmpDir() + "/column_family_test";
db_options_.create_if_missing = true; db_options_.create_if_missing = true;
db_options_.env = env_;
DestroyDB(dbname_, Options(db_options_, column_family_options_)); DestroyDB(dbname_, Options(db_options_, column_family_options_));
} }
@ -299,7 +318,7 @@ class ColumnFamilyTest {
DBOptions db_options_; DBOptions db_options_;
std::string dbname_; std::string dbname_;
DB* db_ = nullptr; DB* db_ = nullptr;
Env* env_; EnvCounter* env_;
Random rnd_; Random rnd_;
}; };
@ -895,6 +914,25 @@ TEST(ColumnFamilyTest, ReadOnlyDBTest) {
ASSERT_TRUE(!s.ok()); ASSERT_TRUE(!s.ok());
} }
TEST(ColumnFamilyTest, DontRollEmptyLogs) {
Open();
CreateColumnFamiliesAndReopen({"one", "two", "three", "four"});
for (int i = 0; i < handles_.size(); ++i) {
PutRandomData(i, 10, 100);
}
int num_writable_file_start = env_->GetNumberOfNewWritableFileCalls();
// this will trigger the flushes
ASSERT_OK(db_->Write(WriteOptions(), nullptr));
for (int i = 0; i < 4; ++i) {
dbfull()->TEST_WaitForFlushMemTable(handles_[i]);
}
int total_new_writable_files =
env_->GetNumberOfNewWritableFileCalls() - num_writable_file_start;
ASSERT_EQ(total_new_writable_files, handles_.size() + 1);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -349,6 +349,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
shutting_down_(nullptr), shutting_down_(nullptr),
bg_cv_(&mutex_), bg_cv_(&mutex_),
logfile_number_(0), logfile_number_(0),
log_empty_(true),
default_cf_handle_(nullptr), default_cf_handle_(nullptr),
tmp_batch_(), tmp_batch_(),
bg_schedule_needed_(false), bg_schedule_needed_(false),
@ -3785,6 +3786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1); RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size()); RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
log_empty_ = false;
if (options_.use_fsync) { if (options_.use_fsync) {
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
status = log_->file()->Fsync(); status = log_->file()->Fsync();
@ -4057,57 +4059,66 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
// Attempt to switch to a new memtable and trigger flush of old. // Attempt to switch to a new memtable and trigger flush of old.
// Do this without holding the dbmutex lock. // Do this without holding the dbmutex lock.
assert(versions_->PrevLogNumber() == 0); assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber(); bool creating_new_log = !log_empty_;
uint64_t new_log_number =
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
SuperVersion* new_superversion = nullptr; SuperVersion* new_superversion = nullptr;
mutex_.Unlock(); mutex_.Unlock();
{ {
DelayLoggingAndReset(); DelayLoggingAndReset();
s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number), if (creating_new_log) {
&lfile, s = env_->NewWritableFile(
env_->OptimizeForLogWrite(storage_options_)); LogFileName(options_.wal_dir, new_log_number), &lfile,
env_->OptimizeForLogWrite(storage_options_));
if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 *
cfd->options()->write_buffer_size);
new_log = new log::Writer(std::move(lfile));
}
}
if (s.ok()) { if (s.ok()) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 *
cfd->options()->write_buffer_size);
new_log = new log::Writer(std::move(lfile));
new_mem = new MemTable(cfd->internal_comparator(), *cfd->options()); new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
new_superversion = new SuperVersion(); new_superversion = new SuperVersion();
} }
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
(unsigned long)new_log_number);
} }
mutex_.Lock(); mutex_.Lock();
if (!s.ok()) { if (!s.ok()) {
// how do we fail if we're not creating new log?
assert(creating_new_log);
// Avoid chewing through file number space in a tight loop. // Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number); versions_->ReuseFileNumber(new_log_number);
assert(!new_mem); assert(!new_mem);
assert(!new_log); assert(!new_log);
break; break;
} }
logfile_number_ = new_log_number; if (creating_new_log) {
assert(new_log != nullptr); logfile_number_ = new_log_number;
// TODO(icanadi) delete outside of mutex assert(new_log != nullptr);
delete log_.release(); // TODO(icanadi) delete outside of mutex
log_.reset(new_log); delete log_.release();
log_.reset(new_log);
log_empty_ = true;
alive_log_files_.push_back(logfile_number_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it
// doesn't need that particular log to stay alive, so we just
// advance the log number. no need to persist this in the manifest
if (cfd->mem()->GetFirstSequenceNumber() == 0 &&
cfd->imm()->size() == 0) {
cfd->SetLogNumber(logfile_number_);
}
}
}
cfd->mem()->SetNextLogNumber(logfile_number_); cfd->mem()->SetNextLogNumber(logfile_number_);
cfd->imm()->Add(cfd->mem()); cfd->imm()->Add(cfd->mem());
if (force) { if (force) {
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
} }
new_mem->Ref(); new_mem->Ref();
alive_log_files_.push_back(logfile_number_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it
// doesn't need that particular log to stay alive, so we just
// advance the log number. no need to persist this in the manifest
if (cfd->mem()->GetFirstSequenceNumber() == 0 &&
cfd->imm()->size() == 0) {
cfd->SetLogNumber(logfile_number_);
}
}
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
Log(options_.info_log, Log(options_.info_log,
"[CF %" PRIu32 "] New memtable created with log file: #%lu\n", "[CF %" PRIu32 "] New memtable created with log file: #%lu\n",

@ -421,6 +421,7 @@ class DBImpl : public DB {
port::CondVar bg_cv_; // Signalled when background work finishes port::CondVar bg_cv_; // Signalled when background work finishes
uint64_t logfile_number_; uint64_t logfile_number_;
unique_ptr<log::Writer> log_; unique_ptr<log::Writer> log_;
bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_; ColumnFamilyHandleImpl* default_cf_handle_;
unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_; unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
std::deque<uint64_t> alive_log_files_; std::deque<uint64_t> alive_log_files_;

@ -1742,7 +1742,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
} }
} }
if (max_log_number_in_batch != 0) { if (max_log_number_in_batch != 0) {
assert(column_family_data->GetLogNumber() < max_log_number_in_batch); assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
column_family_data->SetLogNumber(max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch);
} }
AppendVersion(column_family_data, v); AppendVersion(column_family_data, v);

Loading…
Cancel
Save