From 5ad7ee03ea9d2dd30db23e17a32c37cc3d5005a9 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 25 Feb 2014 10:38:04 -0800 Subject: [PATCH] [CF] Log deletion in column families Summary: * Added unit test that verifies that obsolete files are deleted. * Advance log number for empty column family when cutting log file. * MinLogNumber() bug fix! (caught by the new unit test) Test Plan: unit test Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16311 --- db/column_family_test.cc | 108 ++++++++++++++++++++++++++++++++++++++- db/db_impl.cc | 17 ++++-- db/version_set.h | 5 +- 3 files changed, 123 insertions(+), 7 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 80c5b95a9..9917ca0c1 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -11,6 +11,7 @@ #include "rocksdb/env.h" #include "rocksdb/db.h" #include "util/testharness.h" +#include "util/testutil.h" #include "utilities/merge_operators.h" #include @@ -21,9 +22,17 @@ namespace rocksdb { using namespace std; +namespace { +std::string RandomString(Random* rnd, int len) { + std::string r; + test::RandomString(rnd, len, &r); + return r; +} +} // anonymous namespace + class ColumnFamilyTest { public: - ColumnFamilyTest() { + ColumnFamilyTest() : rnd_(139) { env_ = Env::Default(); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; @@ -39,6 +48,10 @@ class ColumnFamilyTest { db_ = nullptr; } + Status Open() { + return Open({"default"}); + } + Status Open(vector cf) { vector column_families; for (auto x : cf) { @@ -48,6 +61,8 @@ class ColumnFamilyTest { return DB::Open(db_options_, dbname_, column_families, &handles_, &db_); } + DBImpl* dbfull() { return reinterpret_cast(db_); } + void Destroy() { for (auto h : handles_) { delete h; @@ -75,6 +90,18 @@ class ColumnFamilyTest { } } + void PutRandomData(int cf, int bytes) { + int num_insertions = (bytes + 99) / 100; + for (int i = 0; i < num_insertions; ++i) { + // 10 bytes key, 90 bytes value + ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 10), RandomString(&rnd_, 90))); + } + } + + void WaitForFlush(int cf) { + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf])); + } + Status Put(int cf, const string& key, const string& value) { return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); } @@ -144,6 +171,18 @@ class ColumnFamilyTest { } } + int CountLiveLogFiles() { + int ret = 0; + VectorLogPtr wal_files; + ASSERT_OK(db_->GetSortedWalFiles(wal_files)); + for (const auto& wal : wal_files) { + if (wal->Type() == kAliveLogFile) { + ++ret; + } + } + return ret; + } + void CopyFile(const string& source, const string& destination, uint64_t size = 0) { const EnvOptions soptions; @@ -174,6 +213,7 @@ class ColumnFamilyTest { string dbname_; DB* db_ = nullptr; Env* env_; + Random rnd_; }; TEST(ColumnFamilyTest, AddDrop) { @@ -355,6 +395,72 @@ TEST(ColumnFamilyTest, FlushTest) { Close(); } +// Makes sure that obsolete log files get deleted +TEST(ColumnFamilyTest, LogDeletionTest) { + column_family_options_.write_buffer_size = 100000; // 100KB + ASSERT_OK(Open()); + CreateColumnFamilies({"one", "two", "three", "four"}); + // Each bracket is one log file. if number is in (), it means + // we don't need it anymore (it's been flushed) + // [] + ASSERT_EQ(CountLiveLogFiles(), 0); + PutRandomData(0, 100); + // [0] + PutRandomData(1, 100); + // [0, 1] + PutRandomData(1, 100000); + WaitForFlush(1); + // [0, (1)] [1] + ASSERT_EQ(CountLiveLogFiles(), 2); + PutRandomData(0, 100); + // [0, (1)] [0, 1] + ASSERT_EQ(CountLiveLogFiles(), 2); + PutRandomData(2, 100); + // [0, (1)] [0, 1, 2] + PutRandomData(2, 100000); + WaitForFlush(2); + // [0, (1)] [0, 1, (2)] [2] + ASSERT_EQ(CountLiveLogFiles(), 3); + PutRandomData(2, 100000); + WaitForFlush(2); + // [0, (1)] [0, 1, (2)] [(2)] [2] + ASSERT_EQ(CountLiveLogFiles(), 4); + PutRandomData(3, 100); + // [0, (1)] [0, 1, (2)] [(2)] [2, 3] + PutRandomData(1, 100); + // [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3] + ASSERT_EQ(CountLiveLogFiles(), 4); + PutRandomData(1, 100000); + WaitForFlush(1); + // [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1] + ASSERT_EQ(CountLiveLogFiles(), 5); + PutRandomData(0, 100000); + WaitForFlush(0); + // [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0] + // delete obsolete logs --> + // [(1), 2, 3] [1, (0)] [0] + ASSERT_EQ(CountLiveLogFiles(), 3); + PutRandomData(0, 100000); + WaitForFlush(0); + // [(1), 2, 3] [1, (0)], [(0)] [0] + ASSERT_EQ(CountLiveLogFiles(), 4); + PutRandomData(1, 100000); + WaitForFlush(1); + // [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1] + ASSERT_EQ(CountLiveLogFiles(), 5); + PutRandomData(2, 100000); + WaitForFlush(2); + // [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2] + ASSERT_EQ(CountLiveLogFiles(), 6); + PutRandomData(3, 100000); + WaitForFlush(3); + // [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3] + // delete obsolete logs --> + // [0, (1)] [1, (2)], [2, (3)] [3] + ASSERT_EQ(CountLiveLogFiles(), 4); + Close(); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 415afe892..fa65c2446 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1112,7 +1112,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, mutex_.Unlock(); std::vector memtables; for (MemTable* m : mems) { - Log(options_.info_log, "Flushing memtable with next log file: %lu\n", + Log(options_.info_log, + "[CF %u] Flushing memtable with next log file: %lu\n", cfd->GetID(), (unsigned long)m->GetNextLogNumber()); memtables.push_back(m->NewIterator()); } @@ -3578,20 +3579,28 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { if (!s.ok()) { // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); - assert (!new_mem); + assert(!new_mem); break; } logfile_number_ = new_log_number; log_.reset(new log::Writer(std::move(lfile))); cfd->mem()->SetNextLogNumber(logfile_number_); - // TODO also update log number for all column families with empty - // memtables (i.e. don't have data in the old log) cfd->imm()->Add(cfd->mem()); if (force) { cfd->imm()->FlushRequested(); } new_mem->Ref(); alive_log_files_.push_back(logfile_number_); + for (auto cfd : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (cfd->mem()->GetFirstSequenceNumber() == 0 && + cfd->imm()->size() == 0) { + cfd->SetLogNumber(logfile_number_); + } + } cfd->SetMemtable(new_mem); Log(options_.info_log, "New memtable created with log file: #%lu\n", (unsigned long)logfile_number_); diff --git a/db/version_set.h b/db/version_set.h index 10a3a50bb..d48a3862b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -24,6 +24,7 @@ #include #include #include +#include #include "db/dbformat.h" #include "db/version_edit.h" #include "port/port.h" @@ -359,9 +360,9 @@ class VersionSet { // Returns the minimum log number such that all // log numbers less than or equal to it can be deleted uint64_t MinLogNumber() const { - uint64_t min_log_num = 0; + uint64_t min_log_num = std::numeric_limits::max(); for (auto cfd : *column_family_set_) { - if (min_log_num == 0 || min_log_num > cfd->GetLogNumber()) { + if (min_log_num > cfd->GetLogNumber()) { min_log_num = cfd->GetLogNumber(); } }