[CF] Log deletion in column families

Summary:
* Added unit test that verifies that obsolete files are deleted.
* Advance log number for empty column family when cutting log file.
* MinLogNumber() bug fix! (caught by the new unit test)

Test Plan: unit test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16311
main
Igor Canadi 11 years ago
parent dc277f0ab7
commit 5ad7ee03ea
  1. 108
      db/column_family_test.cc
  2. 17
      db/db_impl.cc
  3. 5
      db/version_set.h

@ -11,6 +11,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include <algorithm> #include <algorithm>
@ -21,9 +22,17 @@ namespace rocksdb {
using namespace std; using namespace std;
namespace {
std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
}
} // anonymous namespace
class ColumnFamilyTest { class ColumnFamilyTest {
public: public:
ColumnFamilyTest() { ColumnFamilyTest() : rnd_(139) {
env_ = Env::Default(); env_ = Env::Default();
dbname_ = test::TmpDir() + "/column_family_test"; dbname_ = test::TmpDir() + "/column_family_test";
db_options_.create_if_missing = true; db_options_.create_if_missing = true;
@ -39,6 +48,10 @@ class ColumnFamilyTest {
db_ = nullptr; db_ = nullptr;
} }
Status Open() {
return Open({"default"});
}
Status Open(vector<string> cf) { Status Open(vector<string> cf) {
vector<ColumnFamilyDescriptor> column_families; vector<ColumnFamilyDescriptor> column_families;
for (auto x : cf) { for (auto x : cf) {
@ -48,6 +61,8 @@ class ColumnFamilyTest {
return DB::Open(db_options_, dbname_, column_families, &handles_, &db_); return DB::Open(db_options_, dbname_, column_families, &handles_, &db_);
} }
DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }
void Destroy() { void Destroy() {
for (auto h : handles_) { for (auto h : handles_) {
delete h; delete h;
@ -75,6 +90,18 @@ class ColumnFamilyTest {
} }
} }
void PutRandomData(int cf, int bytes) {
int num_insertions = (bytes + 99) / 100;
for (int i = 0; i < num_insertions; ++i) {
// 10 bytes key, 90 bytes value
ASSERT_OK(Put(cf, test::RandomKey(&rnd_, 10), RandomString(&rnd_, 90)));
}
}
void WaitForFlush(int cf) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
}
Status Put(int cf, const string& key, const string& value) { Status Put(int cf, const string& key, const string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
} }
@ -144,6 +171,18 @@ class ColumnFamilyTest {
} }
} }
int CountLiveLogFiles() {
int ret = 0;
VectorLogPtr wal_files;
ASSERT_OK(db_->GetSortedWalFiles(wal_files));
for (const auto& wal : wal_files) {
if (wal->Type() == kAliveLogFile) {
++ret;
}
}
return ret;
}
void CopyFile(const string& source, const string& destination, void CopyFile(const string& source, const string& destination,
uint64_t size = 0) { uint64_t size = 0) {
const EnvOptions soptions; const EnvOptions soptions;
@ -174,6 +213,7 @@ class ColumnFamilyTest {
string dbname_; string dbname_;
DB* db_ = nullptr; DB* db_ = nullptr;
Env* env_; Env* env_;
Random rnd_;
}; };
TEST(ColumnFamilyTest, AddDrop) { TEST(ColumnFamilyTest, AddDrop) {
@ -355,6 +395,72 @@ TEST(ColumnFamilyTest, FlushTest) {
Close(); Close();
} }
// Makes sure that obsolete log files get deleted
TEST(ColumnFamilyTest, LogDeletionTest) {
column_family_options_.write_buffer_size = 100000; // 100KB
ASSERT_OK(Open());
CreateColumnFamilies({"one", "two", "three", "four"});
// Each bracket is one log file. if number is in (), it means
// we don't need it anymore (it's been flushed)
// []
ASSERT_EQ(CountLiveLogFiles(), 0);
PutRandomData(0, 100);
// [0]
PutRandomData(1, 100);
// [0, 1]
PutRandomData(1, 100000);
WaitForFlush(1);
// [0, (1)] [1]
ASSERT_EQ(CountLiveLogFiles(), 2);
PutRandomData(0, 100);
// [0, (1)] [0, 1]
ASSERT_EQ(CountLiveLogFiles(), 2);
PutRandomData(2, 100);
// [0, (1)] [0, 1, 2]
PutRandomData(2, 100000);
WaitForFlush(2);
// [0, (1)] [0, 1, (2)] [2]
ASSERT_EQ(CountLiveLogFiles(), 3);
PutRandomData(2, 100000);
WaitForFlush(2);
// [0, (1)] [0, 1, (2)] [(2)] [2]
ASSERT_EQ(CountLiveLogFiles(), 4);
PutRandomData(3, 100);
// [0, (1)] [0, 1, (2)] [(2)] [2, 3]
PutRandomData(1, 100);
// [0, (1)] [0, 1, (2)] [(2)] [1, 2, 3]
ASSERT_EQ(CountLiveLogFiles(), 4);
PutRandomData(1, 100000);
WaitForFlush(1);
// [0, (1)] [0, (1), (2)] [(2)] [(1), 2, 3] [1]
ASSERT_EQ(CountLiveLogFiles(), 5);
PutRandomData(0, 100000);
WaitForFlush(0);
// [(0), (1)] [(0), (1), (2)] [(2)] [(1), 2, 3] [1, (0)] [0]
// delete obsolete logs -->
// [(1), 2, 3] [1, (0)] [0]
ASSERT_EQ(CountLiveLogFiles(), 3);
PutRandomData(0, 100000);
WaitForFlush(0);
// [(1), 2, 3] [1, (0)], [(0)] [0]
ASSERT_EQ(CountLiveLogFiles(), 4);
PutRandomData(1, 100000);
WaitForFlush(1);
// [(1), 2, 3] [(1), (0)] [(0)] [0, (1)] [1]
ASSERT_EQ(CountLiveLogFiles(), 5);
PutRandomData(2, 100000);
WaitForFlush(2);
// [(1), (2), 3] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2]
ASSERT_EQ(CountLiveLogFiles(), 6);
PutRandomData(3, 100000);
WaitForFlush(3);
// [(1), (2), (3)] [(1), (0)] [(0)] [0, (1)] [1, (2)], [2, (3)] [3]
// delete obsolete logs -->
// [0, (1)] [1, (2)], [2, (3)] [3]
ASSERT_EQ(CountLiveLogFiles(), 4);
Close();
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1112,7 +1112,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
mutex_.Unlock(); mutex_.Unlock();
std::vector<Iterator*> memtables; std::vector<Iterator*> memtables;
for (MemTable* m : mems) { for (MemTable* m : mems) {
Log(options_.info_log, "Flushing memtable with next log file: %lu\n", Log(options_.info_log,
"[CF %u] Flushing memtable with next log file: %lu\n", cfd->GetID(),
(unsigned long)m->GetNextLogNumber()); (unsigned long)m->GetNextLogNumber());
memtables.push_back(m->NewIterator()); memtables.push_back(m->NewIterator());
} }
@ -3578,20 +3579,28 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
if (!s.ok()) { if (!s.ok()) {
// Avoid chewing through file number space in a tight loop. // Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number); versions_->ReuseFileNumber(new_log_number);
assert (!new_mem); assert(!new_mem);
break; break;
} }
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
log_.reset(new log::Writer(std::move(lfile))); log_.reset(new log::Writer(std::move(lfile)));
cfd->mem()->SetNextLogNumber(logfile_number_); cfd->mem()->SetNextLogNumber(logfile_number_);
// TODO also update log number for all column families with empty
// memtables (i.e. don't have data in the old log)
cfd->imm()->Add(cfd->mem()); cfd->imm()->Add(cfd->mem());
if (force) { if (force) {
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
} }
new_mem->Ref(); new_mem->Ref();
alive_log_files_.push_back(logfile_number_); alive_log_files_.push_back(logfile_number_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it
// doesn't need that particular log to stay alive, so we just
// advance the log number. no need to persist this in the manifest
if (cfd->mem()->GetFirstSequenceNumber() == 0 &&
cfd->imm()->size() == 0) {
cfd->SetLogNumber(logfile_number_);
}
}
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
Log(options_.info_log, "New memtable created with log file: #%lu\n", Log(options_.info_log, "New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_); (unsigned long)logfile_number_);

@ -24,6 +24,7 @@
#include <vector> #include <vector>
#include <deque> #include <deque>
#include <atomic> #include <atomic>
#include <limits>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "port/port.h" #include "port/port.h"
@ -359,9 +360,9 @@ class VersionSet {
// Returns the minimum log number such that all // Returns the minimum log number such that all
// log numbers less than or equal to it can be deleted // log numbers less than or equal to it can be deleted
uint64_t MinLogNumber() const { uint64_t MinLogNumber() const {
uint64_t min_log_num = 0; uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (min_log_num == 0 || min_log_num > cfd->GetLogNumber()) { if (min_log_num > cfd->GetLogNumber()) {
min_log_num = cfd->GetLogNumber(); min_log_num = cfd->GetLogNumber();
} }
} }

Loading…
Cancel
Save