From d55c2ba305f88106ab37c8981538fc4f0cba9a7e Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Wed, 31 Oct 2012 11:47:18 -0700 Subject: [PATCH 1/4] Add a tool to change number of levels Summary: as subject. Test Plan: manually test it, will add a testcase Reviewers: dhruba, MarkCallaghan Differential Revision: https://reviews.facebook.net/D6345 --- Makefile | 4 + db/version_set.cc | 45 +++-- db/version_set.h | 18 +- db/version_set_reduce_num_levels.cc | 70 +++++++ tools/ldb.cc | 260 +++++------------------- tools/reduce_levels_test.cc | 200 +++++++++++++++++++ util/ldb_cmd.cc | 294 ++++++++++++++++++++++++++++ util/ldb_cmd.h | 248 +++++++++++++++++++++++ 8 files changed, 907 insertions(+), 232 deletions(-) create mode 100644 db/version_set_reduce_num_levels.cc create mode 100644 tools/reduce_levels_test.cc create mode 100644 util/ldb_cmd.cc create mode 100644 util/ldb_cmd.h diff --git a/Makefile b/Makefile index 56b042776..9b8ec13e8 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,7 @@ TESTS = \ table_test \ version_edit_test \ version_set_test \ + reduce_levels_test \ write_batch_test \ filelock_test @@ -173,6 +174,9 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS) +reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS) + write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS) diff --git a/db/version_set.cc b/db/version_set.cc index f11d62321..7d4f9392d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -437,8 +437,8 @@ int Version::PickLevelForMemTableOutput( break; } if (level + 2 >= vset_->NumberLevels()) { - level++; - break; + level++; + break; } GetOverlappingInputs(level + 2, &start, &limit, &overlaps); const int64_t sum = TotalFileSize(overlaps); @@ -714,21 +714,10 @@ VersionSet::VersionSet(const std::string& dbname, descriptor_file_(NULL), descriptor_log_(NULL), dummy_versions_(this), - current_(NULL) { + current_(NULL), + num_levels_(options_->num_levels) { compact_pointer_ = new std::string[options_->num_levels]; - max_file_size_ = new uint64_t[options_->num_levels]; - level_max_bytes_ = new uint64_t[options->num_levels]; - int target_file_size_multiplier = options_->target_file_size_multiplier; - int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; - for (int i = 0; i < options_->num_levels; i++) { - if (i > 1) { - max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; - level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; - } else { - max_file_size_[i] = options_->target_file_size_base; - level_max_bytes_[i] = options_->max_bytes_for_level_base; - } - } + Init(options_->num_levels); AppendVersion(new Version(this)); } @@ -742,6 +731,22 @@ VersionSet::~VersionSet() { delete descriptor_file_; } +void VersionSet::Init(int num_levels) { + max_file_size_ = new uint64_t[num_levels]; + level_max_bytes_ = new uint64_t[num_levels]; + int target_file_size_multiplier = options_->target_file_size_multiplier; + int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; + for (int i = 0; i < num_levels; i++) { + if (i > 1) { + max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; + level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; + } else { + max_file_size_[i] = options_->target_file_size_base; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } + } +} + void VersionSet::AppendVersion(Version* v) { // Make "v" current assert(v->refs_ == 0); @@ -759,7 +764,8 @@ void VersionSet::AppendVersion(Version* v) { v->next_->prev_ = v; } -Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { +Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, + bool new_descriptor_log) { if (edit->has_log_number_) { assert(edit->log_number_ >= log_number_); assert(edit->log_number_ < next_file_number_); @@ -787,10 +793,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { std::string new_manifest_file; uint64_t new_manifest_file_size = 0; Status s; - if (descriptor_log_ == NULL) { + if (descriptor_log_ == NULL || new_descriptor_log) { // No reason to unlock *mu here since we only hit this path in the // first call to LogAndApply (when opening the database). - assert(descriptor_file_ == NULL); + assert(descriptor_file_ == NULL || new_descriptor_log) new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); @@ -1090,7 +1096,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname) { printf("%s \n", v->DebugString().c_str()); } - return s; } diff --git a/db/version_set.h b/db/version_set.h index 66611fdb8..79323ecb4 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -21,6 +21,7 @@ #include "db/dbformat.h" #include "db/version_edit.h" #include "port/port.h" +#include "db/table_cache.h" namespace leveldb { @@ -156,11 +157,20 @@ class VersionSet { // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() - Status LogAndApply(VersionEdit* edit, port::Mutex* mu); + Status LogAndApply(VersionEdit* edit, port::Mutex* mu, + bool new_descriptor_log = false); // Recover the last saved descriptor from persistent storage. Status Recover(); + // Try to reduce the number of levels. This call is valid when + // only one level from the new max level to the old + // max level containing files. + // For example, a db currently has 7 levels [0-6], and a call to + // to reduce to 5 [0-4] can only be executed when only one level + // among [4-6] contains files. + Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu); + // Return the current version. Version* current() const { return current_; } @@ -204,7 +214,7 @@ class VersionSet { // being compacted, or zero if there is no such log file. uint64_t PrevLogNumber() const { return prev_log_number_; } - int NumberLevels() const { return options_->num_levels; } + int NumberLevels() const { return num_levels_; } // Pick level and inputs for a new compaction. // Returns NULL if there is no compaction to be done. @@ -274,6 +284,8 @@ class VersionSet { friend class Compaction; friend class Version; + void Init(int num_levels); + void Finalize(Version* v); void GetRange(const std::vector& inputs, @@ -311,6 +323,8 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + int num_levels_; + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_; diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc new file mode 100644 index 000000000..6032a4da0 --- /dev/null +++ b/db/version_set_reduce_num_levels.cc @@ -0,0 +1,70 @@ +// Copyright (c) 2012 Facebook. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "db/version_set.h" + +#include +#include +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "util/logging.h" + +namespace leveldb { + +Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { + + if(new_levels <= 1) { + return Status::InvalidArgument( + "Number of levels needs to be bigger than 1"); + } + + Version* current_version = current_; + int current_levels = NumberLevels(); + + // Make sure there are file only on one level from + // (new_levels-1) to (current_levels-1) + int first_nonempty_level = -1; + int first_nonempty_level_filenum = 0; + for (int i = new_levels - 1; i < current_levels; i++) { + int file_num = NumLevelFiles(i); + if (file_num != 0) { + if (first_nonempty_level < 0) { + first_nonempty_level = i; + first_nonempty_level_filenum = file_num; + } else { + char msg[255]; + sprintf(msg, "Found at least two levels containing files: " + "[%d:%d],[%d:%d].\n", + first_nonempty_level, first_nonempty_level_filenum, i, file_num); + return Status::InvalidArgument(msg); + } + } + } + + Status st; + std::vector* old_files_list = current_version->files_; + std::vector* new_files_list = + new std::vector[new_levels]; + for (int i = 0; i < new_levels - 1; i++) { + new_files_list[i] = old_files_list[i]; + } + + if (first_nonempty_level > 0) { + new_files_list[new_levels - 1] = old_files_list[first_nonempty_level]; + } + + delete[] current_version->files_; + current_version->files_ = new_files_list; + + delete[] compact_pointer_; + delete[] max_file_size_; + delete[] level_max_bytes_; + num_levels_ = new_levels; + compact_pointer_ = new std::string[new_levels]; + Init(new_levels); + st = LogAndApply(new VersionEdit(new_levels), mu, true); + return st; +} + +} diff --git a/tools/ldb.cc b/tools/ldb.cc index a44a713de..d594d500e 100644 --- a/tools/ldb.cc +++ b/tools/ldb.cc @@ -2,234 +2,74 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include -#include -#include -#include +#include "util/ldb_cmd.h" -#include "leveldb/db.h" -#include "leveldb/options.h" -#include "leveldb/iterator.h" -#include "leveldb/slice.h" - -std::string HexToString(const std::string& str) { - std::string parsed; - for (int i = 0; i < str.length(); ) { - int c; - sscanf(str.c_str() + i, "%2X", &c); - parsed.push_back(c); - i += 2; - } - return parsed; -} - - -static void print_usage() { - fprintf(stderr, - "ldb [compact|dump] " - "--db-path=database_path " - "[--from=START KEY] " - "[--to=END KEY ] " - "[--max_keys=[NUM] (only for dump)] " - "[--hex ] " - "[--count_only (only for dump)] " - "[--stats (only for dump) ] \n"); -} +namespace leveldb { +class LDBCommandRunner { +public: -static void safe_open_db(const std::string& dbname, leveldb::DB** db) { - leveldb::Options options; - options.create_if_missing = false; - leveldb::Status status = leveldb::DB::Open(options, dbname, db); - - if(!status.ok()) { - fprintf( - stderr, - "Could not open db at %s\nERROR: %s", - dbname.data(), - status.ToString().data() - ); - exit(1); - } -} + static void PrintHelp(const char* exec_name) { + std::string ret; + ret.append("--- compact ----:\n"); + ret.append(exec_name); + ret.append(" compact "); + Compactor::Help(ret); + ret.append("\n--- dump ----:\n"); + ret.append(exec_name); + ret.append(" dump "); + DBDumper::Help(ret); -static void dump_db( - const std::string& db_path, - std::string& start, - std::string& end, - int64_t max_keys, - const bool hex, - const bool print_stats, - const bool count_only -) { - // Parse command line args - uint64_t count = 0; + ret.append("\n---reduce_levels ----:\n"); + ret.append(exec_name); + ret.append(" reduce_levels "); + ReduceDBLevels::Help(ret); - - if (hex) { - start = HexToString(start); - end = HexToString(end); + fprintf(stderr, "%s\n", ret.c_str()); } - leveldb::DB *db; - safe_open_db(db_path, &db); - - if (print_stats) { - std::string stats; - if (db->GetProperty("leveldb.stats", &stats)) { - fprintf(stdout, "%s\n", stats.c_str()); + static void RunCommand(int argc, char** argv) { + if (argc <= 2) { + PrintHelp(argv[0]); + exit(1); } - } - - // Setup key iterator - leveldb::Iterator* iter = db->NewIterator(leveldb::ReadOptions()); - leveldb::Status status = iter->status(); - if (!status.ok()) { - fprintf(stderr, "%s\n", status.ToString().c_str()); - delete db; - exit(1); - } - - for (iter->Seek(start); iter->Valid(); iter->Next()) { - // If end marker was specified, we stop before it - if (!end.empty() && (iter->key().ToString() >= end)) - break; - - // Terminate if maximum number of keys have been dumped - if (max_keys == 0) - break; - - --max_keys; - ++count; - - if (!count_only) { - if (hex) { - std::string str = iter->key().ToString(); - for (int i = 0; i < str.length(); ++i) { - fprintf(stdout, "%X", str[i]); - } - fprintf(stdout, " ==> "); - str = iter->value().ToString(); - for (int i = 0; i < str.length(); ++i) { - fprintf(stdout, "%X", str[i]); - } - fprintf(stdout, "\n"); + const char* cmd = argv[1]; + std::string db_name; + std::vector args; + for (int i = 2; i < argc; i++) { + if (strncmp(argv[i], "--db=", strlen("--db=")) == 0) { + db_name = argv[i] + strlen("--db="); } else { - fprintf(stdout, "%s ==> %s\n", - iter->key().ToString().c_str(), - iter->value().ToString().c_str()); + args.push_back(argv[i]); } } - } - - fprintf(stdout, "Keys in range: %lld\n", (long long) count); - - // Clean up - delete iter; - delete db; -} -static void compact( - const std::string dbname, - std::string from, - std::string to, - const bool hex -) { - - leveldb::DB* db; - safe_open_db(dbname, &db); - - if(hex) { - from = HexToString(from); - to = HexToString(to); - } - - leveldb::Slice* begin = from.empty() ? NULL : new leveldb::Slice(from); - leveldb::Slice* end = to.empty() ? NULL : new leveldb::Slice(to); - db->CompactRange(begin, end); - delete db; -} - -int main(int argc, char** argv) { - - enum { - DUMP, COMPACT - } command; - - if (argc < 2) { - print_usage(); - exit(1); - } - - size_t n; - - const std::string dbnameKey = "--db-path="; - const std::string toKey = "--to="; - const std::string fromKey = "--from="; - std::string dbname; - bool dbnameFound = false; - std::string from; - std::string to; - int64_t temp; - int64_t max_keys = -1; - - bool print_stats = false; - bool count_only = false; - bool hex = false; - char junk; - - - std::string commandString = argv[1]; - if (commandString == "dump") { - command = DUMP; - } else if (commandString == "compact") { - command = COMPACT; - } else { - print_usage(); - exit(1); - } - - for (int i = 2; i < argc; i++) { - std::string param(argv[i]); - if ((n = param.find(dbnameKey)) != std::string::npos) { - dbname = param.substr(dbnameKey.size()); - dbnameFound = true; - } else if ((n = param.find(fromKey)) != std::string::npos) { - from = param.substr(fromKey.size()); - } else if ((n = param.find(toKey)) != std::string::npos) { - to = param.substr(toKey.size()); - } else if (sscanf(argv[i], "--max_keys=%ld%c", &temp, &junk) == 1) { - max_keys = temp; - } else if (strncmp(argv[i], "--stats", 7) == 0) { - print_stats = true; - } else if (strncmp(argv[i], "--count_only", 12) == 0) { - count_only = true; - } else if (strncmp(argv[i], "--hex", 5) == 0) { - hex = true; + LDBCommand* cmdObj = NULL; + if (strncmp(cmd, "compact", strlen("compact")) == 0) { + // run compactor + cmdObj = new Compactor(db_name, args); + } else if (strncmp(cmd, "dump", strlen("dump")) == 0) { + // run dump + cmdObj = new DBDumper(db_name, args); + } else if (strncmp(cmd, "reduce_levels", strlen("reduce_levels")) == 0) { + // reduce db levels + cmdObj = new ReduceDBLevels(db_name, args); } else { - print_usage(); + fprintf(stderr, "Unknown command: %s\n", cmd); + PrintHelp(argv[0]); exit(1); } - } - if (!dbnameFound || dbname.empty()) { - fprintf(stderr, "DB path required. See help\n"); - print_usage(); - exit(1); + cmdObj->Run(); + LDBCommandExecuteResult ret = cmdObj->GetExecuteState(); + fprintf(stderr, "%s\n", ret.ToString().c_str()); + delete cmdObj; } +}; - switch(command) { - case DUMP: - dump_db(dbname, from, to, max_keys, hex, print_stats, count_only); - break; - case COMPACT: - compact(dbname, from, to, hex); - break; - default: - print_usage(); - exit(1); - } +} - return 0; +int main(int argc, char** argv) { + leveldb::LDBCommandRunner::RunCommand(argc, argv); } diff --git a/tools/reduce_levels_test.cc b/tools/reduce_levels_test.cc new file mode 100644 index 000000000..6b70a54f3 --- /dev/null +++ b/tools/reduce_levels_test.cc @@ -0,0 +1,200 @@ +// Copyright (c) 2012 Facebook. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + + +#include "leveldb/db.h" +#include "db/db_impl.h" +#include "db/version_set.h" +#include "util/logging.h" +#include "util/testutil.h" +#include "util/testharness.h" +#include "util/ldb_cmd.h" + +namespace leveldb { + +class ReduceLevelTest { +public: + ReduceLevelTest() { + dbname_ = test::TmpDir() + "/db_reduce_levels_test"; + DestroyDB(dbname_, Options()); + db_ = NULL; + } + + Status OpenDB(bool create_if_missing, int levels, + int mem_table_compact_level); + + Status Put(const std::string& k, const std::string& v) { + return db_->Put(WriteOptions(), k, v); + } + + std::string Get(const std::string& k) { + ReadOptions options; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + Status CompactMemTable() { + if (db_ == NULL) { + return Status::InvalidArgument("DB not opened."); + } + DBImpl* db_impl = reinterpret_cast(db_); + return db_impl->TEST_CompactMemTable(); + } + + void CloseDB() { + if (db_ != NULL) { + delete db_; + db_ = NULL; + } + } + + bool ReduceLevels(int target_level); + + int FilesOnLevel(int level) { + std::string property; + ASSERT_TRUE( + db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level), + &property)); + return atoi(property.c_str()); + } + +private: + std::string dbname_; + DB* db_; +}; + +Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels, + int mem_table_compact_level) { + leveldb::Options opt; + opt.num_levels = num_levels; + opt.create_if_missing = create_if_missing; + opt.max_mem_compaction_level = mem_table_compact_level; + leveldb::Status st = leveldb::DB::Open(opt, dbname_, &db_); + if (!st.ok()) { + fprintf(stderr, "Can't open the db:%s\n", st.ToString().c_str()); + } + return st; +} + +bool ReduceLevelTest::ReduceLevels(int target_level) { + std::vector args = leveldb::ReduceDBLevels::PrepareArgs( + target_level, false); + ReduceDBLevels level_reducer(dbname_, args); + level_reducer.Run(); + return level_reducer.GetExecuteState().IsSucceed(); +} + +TEST(ReduceLevelTest, Last_Level) { + // create files on all levels; + ASSERT_OK(OpenDB(true, 4, 3)); + ASSERT_OK(Put("aaaa", "11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(3), 1); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(3)); + ASSERT_OK(OpenDB(true, 3, 1)); + ASSERT_EQ(FilesOnLevel(2), 1); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(2)); + ASSERT_OK(OpenDB(true, 2, 1)); + ASSERT_EQ(FilesOnLevel(1), 1); + CloseDB(); +} + +TEST(ReduceLevelTest, Top_Level) { + // create files on all levels; + ASSERT_OK(OpenDB(true, 5, 0)); + ASSERT_OK(Put("aaaa", "11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(0), 1); + CloseDB(); + + // The CompactRange(NULL, NULL) call in ReduceLevels + // will push this file to level-1 + ASSERT_TRUE(ReduceLevels(4)); + ASSERT_OK(OpenDB(true, 4, 0)); + ASSERT_EQ(FilesOnLevel(1), 1); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(3)); + ASSERT_OK(OpenDB(true, 3, 0)); + ASSERT_EQ(FilesOnLevel(1), 1); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(2)); + ASSERT_OK(OpenDB(true, 2, 0)); + ASSERT_EQ(FilesOnLevel(1), 1); + CloseDB(); +} + +TEST(ReduceLevelTest, All_Levels) { + // create files on all levels; + ASSERT_OK(OpenDB(true, 5, 1)); + ASSERT_OK(Put("a", "a11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(1), 1); + CloseDB(); + + ASSERT_OK(OpenDB(true, 5, 2)); + ASSERT_OK(Put("b", "b11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(1), 1); + ASSERT_EQ(FilesOnLevel(2), 1); + CloseDB(); + + ASSERT_OK(OpenDB(true, 5, 3)); + ASSERT_OK(Put("c", "c11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(1), 1); + ASSERT_EQ(FilesOnLevel(2), 1); + ASSERT_EQ(FilesOnLevel(3), 1); + CloseDB(); + + ASSERT_OK(OpenDB(true, 5, 4)); + ASSERT_OK(Put("d", "d11111")); + ASSERT_OK(CompactMemTable()); + ASSERT_EQ(FilesOnLevel(1), 1); + ASSERT_EQ(FilesOnLevel(2), 1); + ASSERT_EQ(FilesOnLevel(3), 1); + ASSERT_EQ(FilesOnLevel(4), 1); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(4)); + ASSERT_OK(OpenDB(true, 4, 0)); + ASSERT_EQ("a11111", Get("a")); + ASSERT_EQ("b11111", Get("b")); + ASSERT_EQ("c11111", Get("c")); + ASSERT_EQ("d11111", Get("d")); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(3)); + ASSERT_OK(OpenDB(true, 3, 0)); + ASSERT_EQ("a11111", Get("a")); + ASSERT_EQ("b11111", Get("b")); + ASSERT_EQ("c11111", Get("c")); + ASSERT_EQ("d11111", Get("d")); + CloseDB(); + + ASSERT_TRUE(ReduceLevels(2)); + ASSERT_OK(OpenDB(true, 2, 0)); + ASSERT_EQ("a11111", Get("a")); + ASSERT_EQ("b11111", Get("b")); + ASSERT_EQ("c11111", Get("c")); + ASSERT_EQ("d11111", Get("d")); + CloseDB(); +} + +} + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc new file mode 100644 index 000000000..a8e507990 --- /dev/null +++ b/util/ldb_cmd.cc @@ -0,0 +1,294 @@ +// Copyright (c) 2012 Facebook. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "util/ldb_cmd.h" + +namespace leveldb { + +const char* LDBCommand::FROM_ARG = "--from="; +const char* LDBCommand::END_ARG = "--to="; +const char* LDBCommand::HEX_ARG = "--hex"; + +Compactor::Compactor(std::string& db_name, std::vector& args) : + LDBCommand(db_name, args), null_from_(true), null_to_(true), hex_(false) { + for (int i = 0; i < args.size(); i++) { + std::string& arg = args.at(i); + if (arg.find(FROM_ARG) == 0) { + null_from_ = false; + from_ = arg.substr(strlen(FROM_ARG)); + } else if (arg.find(END_ARG) == 0) { + null_to_ = false; + to_ = arg.substr(strlen(END_ARG)); + } else if (arg.find(HEX_ARG) == 0) { + hex_ = true; + } else { + exec_state_ = LDBCommandExecuteResult::FAILED("Unknown argument." + arg); + } + } + + if (hex_) { + if (!null_from_) { + from_ = HexToString(from_); + } + if (!null_to_) { + to_ = HexToString(to_); + } + } +} + +void Compactor::Help(std::string& ret) { + LDBCommand::Help(ret); + ret.append("[--from=START KEY] "); + ret.append("[--to=START KEY] "); + ret.append("[--hex] "); +} + +void Compactor::DoCommand() { + + leveldb::Slice* begin = NULL; + leveldb::Slice* end = NULL; + if (!null_from_) { + begin = new leveldb::Slice(from_); + } + if (!null_to_) { + end = new leveldb::Slice(to_); + } + + db_->CompactRange(begin, end); + exec_state_ = LDBCommandExecuteResult::SUCCEED(""); + + delete begin; + delete end; +} + +const char* DBDumper::MAX_KEYS_ARG = "--max_keys="; +const char* DBDumper::COUNT_ONLY_ARG = "--count_only"; +const char* DBDumper::STATS_ARG = "--stats"; +const char* DBDumper::HEX_OUTPUT_ARG = "--output_hex"; + +DBDumper::DBDumper(std::string& db_name, std::vector& args) : + LDBCommand(db_name, args), null_from_(true), null_to_(true), hex_(false), + count_only_(false), print_stats_(false), max_keys_(-1), + hex_output_(false) { + for (int i = 0; i < args.size(); i++) { + std::string& arg = args.at(i); + if (arg.find(FROM_ARG) == 0) { + null_from_ = false; + from_ = arg.substr(strlen(FROM_ARG)); + } else if (arg.find(END_ARG) == 0) { + null_to_ = false; + to_ = arg.substr(strlen(END_ARG)); + } else if (arg.find(HEX_ARG) == 0) { + hex_ = true; + } else if (arg.find(MAX_KEYS_ARG) == 0) { + max_keys_ = atoi(arg.substr(strlen(MAX_KEYS_ARG)).c_str()); + } else if (arg.find(STATS_ARG) == 0) { + print_stats_ = true; + } else if (arg.find(COUNT_ONLY_ARG) == 0) { + count_only_ = true; + } else if (arg.find(HEX_OUTPUT_ARG) == 0) { + hex_output_ = true; + } else { + exec_state_ = LDBCommandExecuteResult::FAILED("Unknown argument:" + arg); + } + } + + if (hex_) { + if (!null_from_) { + from_ = HexToString(from_); + } + if (!null_to_) { + to_ = HexToString(to_); + } + } +} + +void DBDumper::Help(std::string& ret) { + LDBCommand::Help(ret); + ret.append("[--from=START KEY] "); + ret.append("[--to=END Key] "); + ret.append("[--hex] "); + ret.append("[--output_hex] "); + ret.append("[--max_keys=NUM] "); + ret.append("[--count_only] "); + ret.append("[--stats] "); +} + +void DBDumper::DoCommand() { + // Parse command line args + uint64_t count = 0; + if (print_stats_) { + std::string stats; + if (db_->GetProperty("leveldb.stats", &stats)) { + fprintf(stdout, "%s\n", stats.c_str()); + } + } + + // Setup key iterator + leveldb::Iterator* iter = db_->NewIterator(leveldb::ReadOptions()); + leveldb::Status st = iter->status(); + if (!st.ok()) { + exec_state_ = LDBCommandExecuteResult::FAILED("Iterator error." + + st.ToString()); + } + + if (!null_from_) { + iter->Seek(from_); + } else { + iter->SeekToFirst(); + } + + int max_keys = max_keys_; + for (; iter->Valid(); iter->Next()) { + // If end marker was specified, we stop before it + if (!null_to_ && (iter->key().ToString() >= to_)) + break; + // Terminate if maximum number of keys have been dumped + if (max_keys == 0) + break; + if (max_keys > 0) { + --max_keys; + } + ++count; + if (!count_only_) { + if (hex_output_) { + std::string str = iter->key().ToString(); + for (int i = 0; i < str.length(); ++i) { + fprintf(stdout, "%X", str[i]); + } + fprintf(stdout, " ==> "); + str = iter->value().ToString(); + for (int i = 0; i < str.length(); ++i) { + fprintf(stdout, "%X", str[i]); + } + fprintf(stdout, "\n"); + } else { + fprintf(stdout, "%s ==> %s\n", iter->key().ToString().c_str(), + iter->value().ToString().c_str()); + } + } + } + fprintf(stdout, "Keys in range: %lld\n", (long long) count); + // Clean up + delete iter; +} + + +const char* ReduceDBLevels::NEW_LEVLES_ARG = "--new_levels="; +const char* ReduceDBLevels::PRINT_OLD_LEVELS_ARG = "--print_old_levels"; + +ReduceDBLevels::ReduceDBLevels(std::string& db_name, + std::vector& args) +: LDBCommand(db_name, args), + new_levels_(-1), + print_old_levels_(false) { + for (int i = 0; i < args.size(); i++) { + std::string& arg = args.at(i); + if (arg.find(NEW_LEVLES_ARG) == 0) { + new_levels_ = atoi(arg.substr(strlen(NEW_LEVLES_ARG)).c_str()); + } else if (arg.find(PRINT_OLD_LEVELS_ARG) == 0) { + print_old_levels_ = true; + } else { + exec_state_ = LDBCommandExecuteResult::FAILED( + "Unknown argument." + arg); + } + } + + if(new_levels_ <= 0) { + exec_state_ = LDBCommandExecuteResult::FAILED( + " Use --new_levels to specify a new level number\n"); + } +} + +std::vector ReduceDBLevels::PrepareArgs(int new_levels, + bool print_old_level) { + std::vector ret; + char arg[100]; + sprintf(arg, "%s%d", NEW_LEVLES_ARG, new_levels); + ret.push_back(arg); + if(print_old_level) { + sprintf(arg, "%s", PRINT_OLD_LEVELS_ARG); + ret.push_back(arg); + } + return ret; +} + +void ReduceDBLevels::Help(std::string& msg) { + LDBCommand::Help(msg); + msg.append("[--new_levels=New number of levels] "); + msg.append("[--print_old_levels] "); +} + +leveldb::Options ReduceDBLevels::PrepareOptionsForOpenDB() { + leveldb::Options opt = LDBCommand::PrepareOptionsForOpenDB(); + // Set to a big value to make sure we can open the db + opt.num_levels = 1 << 16; + return opt; +} + +void ReduceDBLevels::DoCommand() { + if (new_levels_ <= 1) { + exec_state_ = LDBCommandExecuteResult::FAILED( + "Invalid number of levels.\n"); + return; + } + + leveldb::Status st; + + leveldb::Options opt = PrepareOptionsForOpenDB(); + if (print_old_levels_) { + TableCache* tc = new TableCache(db_path_, &opt, 10); + const InternalKeyComparator* cmp = new InternalKeyComparator( + opt.comparator); + VersionSet* versions = new VersionSet(db_path_, &opt, + tc, cmp); + // We rely the VersionSet::Recover to tell us the internal data structures + // in the db. And the Recover() should never do any change + // (like LogAndApply) to the manifest file. + st = versions->Recover(); + int max = -1; + for(int i = 0; iNumberLevels(); i++) { + if (versions->NumLevelFiles(i)) { + max = i; + } + } + fprintf(stdout, "The old number of levels in use is %d\n", max + 1); + delete versions; + + if (!st.ok()) { + exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); + return; + } + } + + // Compact the whole DB to put all files to the highest level. + db_->CompactRange(NULL, NULL); + CloseDB(); + + TableCache* tc = new TableCache(db_path_, &opt, 10); + const InternalKeyComparator* cmp = new InternalKeyComparator( + opt.comparator); + VersionSet* versions = new VersionSet(db_path_, &opt, + tc, cmp); + // We rely the VersionSet::Recover to tell us the internal data structures + // in the db. And the Recover() should never do any change (like LogAndApply) + // to the manifest file. + st = versions->Recover(); + if (!st.ok()) { + exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); + return; + } + + port::Mutex mu; + mu.Lock(); + st = versions->ReduceNumberOfLevels(new_levels_, &mu); + mu.Unlock(); + + if (!st.ok()) { + exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); + return; + } +} + +} diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h new file mode 100644 index 000000000..dc4ab0054 --- /dev/null +++ b/util/ldb_cmd.h @@ -0,0 +1,248 @@ +// Copyright (c) 2012 Facebook. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LEVELDB_UTIL_LDB_H_ +#define LEVELDB_UTIL_LDB_H_ + +#include +#include +#include +#include +#include +#include + +#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/iterator.h" +#include "leveldb/slice.h" +#include "db/version_set.h" +#include "util/logging.h" + +namespace leveldb { + +class LDBCommandExecuteResult { +public: + enum State { + EXEC_NOT_STARTED = 0, EXEC_SUCCEED = 1, EXEC_FAILED = 2, + }; + + LDBCommandExecuteResult() { + state_ = EXEC_NOT_STARTED; + message_ = ""; + } + + LDBCommandExecuteResult(State state, std::string& msg) { + state_ = state; + message_ = msg; + } + + std::string ToString() { + std::string ret; + switch (state_) { + case EXEC_SUCCEED: + ret.append("Succeeded."); + break; + case EXEC_FAILED: + ret.append("Failed."); + break; + case EXEC_NOT_STARTED: + ret.append("Not started."); + } + if (!message_.empty()) { + ret.append(message_); + } + return ret; + } + + void Reset() { + state_ = EXEC_NOT_STARTED; + message_ = ""; + } + + bool IsSucceed() { + return state_ == EXEC_SUCCEED; + } + + bool IsNotStarted() { + return state_ == EXEC_NOT_STARTED; + } + + bool IsFailed() { + return state_ == EXEC_FAILED; + } + + static LDBCommandExecuteResult SUCCEED(std::string msg) { + return LDBCommandExecuteResult(EXEC_SUCCEED, msg); + } + + static LDBCommandExecuteResult FAILED(std::string msg) { + return LDBCommandExecuteResult(EXEC_FAILED, msg); + } + +private: + State state_; + std::string message_; + + bool operator==(const LDBCommandExecuteResult&); + bool operator!=(const LDBCommandExecuteResult&); +}; + +class LDBCommand { +public: + + /* Constructor */ + LDBCommand(std::string& db_name, std::vector& args) : + db_path_(db_name), + db_(NULL) { + } + + virtual leveldb::Options PrepareOptionsForOpenDB() { + leveldb::Options opt; + opt.create_if_missing = false; + return opt; + } + + virtual ~LDBCommand() { + if (db_ != NULL) { + delete db_; + db_ = NULL; + } + } + + /* Print the help message */ + static void Help(std::string& ret) { + ret.append("--db=DB_PATH "); + } + + /* Run the command, and return the execute result. */ + void Run() { + if (!exec_state_.IsNotStarted()) { + return; + } + + if (db_ == NULL) { + OpenDB(); + } + DoCommand(); + if (exec_state_.IsNotStarted()) { + exec_state_ = LDBCommandExecuteResult::SUCCEED(""); + } + CloseDB (); + } + + virtual void DoCommand() = 0; + + LDBCommandExecuteResult GetExecuteState() { + return exec_state_; + } + + void ClearPreviousRunState() { + exec_state_.Reset(); + } + + static std::string HexToString(const std::string& str) { + std::string parsed; + for (int i = 0; i < str.length();) { + int c; + sscanf(str.c_str() + i, "%2X", &c); + parsed.push_back(c); + i += 2; + } + return parsed; + } + +protected: + + void OpenDB() { + leveldb::Options opt = PrepareOptionsForOpenDB(); + // Open the DB. + leveldb::Status st = leveldb::DB::Open(opt, db_path_, &db_); + if (!st.ok()) { + std::string msg = st.ToString(); + exec_state_ = LDBCommandExecuteResult::FAILED(msg); + } + } + + void CloseDB () { + if (db_ != NULL) { + delete db_; + db_ = NULL; + } + } + + static const char* FROM_ARG; + static const char* END_ARG; + static const char* HEX_ARG; + LDBCommandExecuteResult exec_state_; + std::string db_path_; + leveldb::DB* db_; +}; + +class Compactor: public LDBCommand { +public: + Compactor(std::string& db_name, std::vector& args); + + virtual ~Compactor() {} + + static void Help(std::string& ret); + + virtual void DoCommand(); + +private: + bool null_from_; + std::string from_; + bool null_to_; + std::string to_; + bool hex_; +}; + +class DBDumper: public LDBCommand { +public: + DBDumper(std::string& db_name, std::vector& args); + virtual ~DBDumper() {} + static void Help(std::string& ret); + virtual void DoCommand(); +private: + bool null_from_; + std::string from_; + bool null_to_; + std::string to_; + int max_keys_; + bool count_only_; + bool print_stats_; + bool hex_; + bool hex_output_; + + static const char* MAX_KEYS_ARG; + static const char* COUNT_ONLY_ARG; + static const char* STATS_ARG; + static const char* HEX_OUTPUT_ARG; +}; + +class ReduceDBLevels : public LDBCommand { +public: + + ReduceDBLevels (std::string& db_name, std::vector& args); + + ~ReduceDBLevels() {} + + virtual leveldb::Options PrepareOptionsForOpenDB(); + + virtual void DoCommand(); + static void Help(std::string& msg); + + static std::vector PrepareArgs(int new_levels, + bool print_old_level = false); + +private: + int new_levels_; + bool print_old_levels_; + + static const char* NEW_LEVLES_ARG; + static const char* PRINT_OLD_LEVELS_ARG; +}; + +} + +#endif From f1a7c735b5c7c0add9a86fd214c71c66732de533 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Mon, 5 Nov 2012 10:30:19 -0800 Subject: [PATCH 2/4] fix complie error Summary: as subject Test Plan:n/a --- db/version_set.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/version_set.cc b/db/version_set.cc index 7d4f9392d..a8307ac88 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -796,7 +796,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, if (descriptor_log_ == NULL || new_descriptor_log) { // No reason to unlock *mu here since we only hit this path in the // first call to LogAndApply (when opening the database). - assert(descriptor_file_ == NULL || new_descriptor_log) + assert(descriptor_file_ == NULL || new_descriptor_log); new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); s = env_->NewWritableFile(new_manifest_file, &descriptor_file_); From 5273c814835227c85fe26f724f7d5d80c8413a20 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Mon, 29 Oct 2012 01:13:41 -0700 Subject: [PATCH 3/4] Ability to invoke application hook for every key during compaction. Summary: There are certain use-cases where the application intends to delete older keys aftre they have expired a certian time period. One option for those applications is to periodically scan the entire database and delete appropriate keys. A better way is to allow the application to hook into the compaction process. This patch allows the application to set a method callback for every key that is being compacted. If this method returns true, then the key is not preserved in the output of the compaction. Test Plan: This is mostly to preview the proposed new public api. Since it is a public api, please do due diligence on reviewing it. I will be writing test cases for this api in mynext version of this patch. Reviewers: MarkCallaghan, heyongqiang Reviewed By: heyongqiang CC: sheki, adsharma Differential Revision: https://reviews.facebook.net/D6285 --- db/db_impl.cc | 19 ++++- db/db_test.cc | 169 ++++++++++++++++++++++++++++++++++++++ include/leveldb/options.h | 16 ++++ util/options.cc | 5 +- 4 files changed, 207 insertions(+), 2 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 62f8592ca..409b877ac 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1099,6 +1099,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } Slice key = input->key(); + Slice value = input->value(); + Slice* compaction_filter_value = NULL; if (compact->compaction->ShouldStopBefore(key) && compact->builder != NULL) { status = FinishCompactionOutputFile(compact, input); @@ -1138,6 +1140,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; + } else if (options_.CompactionFilter != NULL && + ikey.type != kTypeDeletion && + ikey.sequence < compact->smallest_snapshot) { + // If the user has specified a compaction filter, then invoke + // it. If this key is not visible via any snapshot and the + // return value of the compaction filter is true and then + // drop this key from the output. + drop = options_.CompactionFilter(compact->compaction->level(), + ikey.user_key, value, &compaction_filter_value); + + // If the application wants to change the value, then do so here. + if (compaction_filter_value != NULL) { + value = *compaction_filter_value; + delete compaction_filter_value; + } } last_sequence_for_key = ikey.sequence; @@ -1164,7 +1181,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); + compact->builder->Add(key, value); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 491e88072..7a2256e49 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1195,6 +1195,175 @@ TEST(DBTest, RepeatedWritesToSameKey) { } } +// This is a static filter used for filtering +// kvs during the compaction process. +static int cfilter_count; +static std::string NEW_VALUE = "NewValue"; +static bool keep_filter(int level, const Slice& key, + const Slice& value, Slice** new_value) { + cfilter_count++; + return false; +} +static bool delete_filter(int level, const Slice& key, + const Slice& value, Slice** new_value) { + cfilter_count++; + return true; +} +static bool change_filter(int level, const Slice& key, + const Slice& value, Slice** new_value) { + assert(new_value != NULL); + *new_value = new Slice(NEW_VALUE); + return false; +} + +TEST(DBTest, CompactionFilter) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.CompactionFilter = keep_filter; + Reopen(&options); + + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points + // to the 100001 key.The compaction filter is not invoked + // on keys that are visible via a snapshot because we + // anyways cannot delete it. + const std::string value(10, 'x'); + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + Put(key, value); + } + dbfull()->TEST_CompactMemTable(); + + // Push all files to the highest level L2. Verify that + // the compaction is each level invokes the filter for + // all the keys in that level. + cfilter_count = 0; + dbfull()->TEST_CompactRange(0, NULL, NULL); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + dbfull()->TEST_CompactRange(1, NULL, NULL); + ASSERT_EQ(cfilter_count, 100000); + + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_NE(NumTableFilesAtLevel(2), 0); + cfilter_count = 0; + + // overwrite all the 100K+1 keys once again. + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + Put(key, value); + } + dbfull()->TEST_CompactMemTable(); + + // push all files to the highest level L2. This + // means that all keys should pass at least once + // via the compaction filter + cfilter_count = 0; + dbfull()->TEST_CompactRange(0, NULL, NULL); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + dbfull()->TEST_CompactRange(1, NULL, NULL); + ASSERT_EQ(cfilter_count, 100000); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_NE(NumTableFilesAtLevel(2), 0); + + // create a new database with the compaction + // filter in such a way that it deletes all keys + options.CompactionFilter = delete_filter; + options.create_if_missing = true; + DestroyAndReopen(&options); + + // write all the keys once again. + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + Put(key, value); + } + dbfull()->TEST_CompactMemTable(); + ASSERT_NE(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + ASSERT_EQ(NumTableFilesAtLevel(2), 0); + + // Push all files to the highest level L2. This + // triggers the compaction filter to delete all keys, + // verify that at the end of the compaction process, + // nothing is left. + cfilter_count = 0; + dbfull()->TEST_CompactRange(0, NULL, NULL); + ASSERT_EQ(cfilter_count, 100000); + cfilter_count = 0; + dbfull()->TEST_CompactRange(1, NULL, NULL); + ASSERT_EQ(cfilter_count, 0); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + + // Scan the entire database to ensure that only the + // 100001th key is left in the db. The 100001th key + // is part of the default-most-current snapshot and + // cannot be deleted. + Iterator* iter = db_->NewIterator(ReadOptions()); + iter->SeekToFirst(); + int count = 0; + while (iter->Valid()) { + count++; + iter->Next(); + } + ASSERT_EQ(count, 1); + delete iter; +} + +TEST(DBTest, CompactionFilterWithValueChange) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.CompactionFilter = change_filter; + Reopen(&options); + + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points + // to the 100001 key.The compaction filter is not invoked + // on keys that are visible via a snapshot because we + // anyways cannot delete it. + const std::string value(10, 'x'); + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + Put(key, value); + } + + // push all files to lower levels + dbfull()->TEST_CompactMemTable(); + dbfull()->TEST_CompactRange(0, NULL, NULL); + dbfull()->TEST_CompactRange(1, NULL, NULL); + + // re-write all data again + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + Put(key, value); + } + + // push all files to lower levels. This should + // invoke the compaction filter for all 100000 keys. + dbfull()->TEST_CompactMemTable(); + dbfull()->TEST_CompactRange(0, NULL, NULL); + dbfull()->TEST_CompactRange(1, NULL, NULL); + + // verify that all keys now have the new value that + // was set by the compaction process. + for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%010d", i); + std::string newvalue = Get(key); + ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); + } +} + TEST(DBTest, SparseMerge) { Options options = CurrentOptions(); options.compression = kNoCompression; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 530d8d6d3..cf7f84ee7 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -8,6 +8,7 @@ #include #include #include +#include "leveldb/slice.h" namespace leveldb { @@ -299,6 +300,21 @@ struct Options { Options(); void Dump(Logger * log) const; + + // This method allows an application to modify/delete a key-value at + // the time of compaction. The compaction process invokes this + // method for every kv that is being compacted. A return value + // of false indicates that the kv should be preserved in the + // output of this compaction run and a return value of true + // indicates that this key-value should be removed from the + // output of the compaction. The application can inspect + // the existing value of the key, modify it if needed and + // return back the new value for this key. The application + // should allocate memory for the Slice object that is used to + // return the new value and the leveldb framework will + // free up that memory. + bool (*CompactionFilter)(int level, const Slice& key, + const Slice& existing_value, Slice** new_value); }; // Options that control read operations diff --git a/util/options.cc b/util/options.cc index 765b72e5e..0ade9600d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -47,7 +47,8 @@ Options::Options() table_cache_numshardbits(4), max_log_file_size(0), delete_obsolete_files_period_micros(0), - rate_limit(0.0) { + rate_limit(0.0), + CompactionFilter(NULL) { } void @@ -123,6 +124,8 @@ Options::Dump( delete_obsolete_files_period_micros); Log(log," Options.rate_limit: %.2f", rate_limit); + Log(log," Options.CompactionFilter: %p", + CompactionFilter); } // Options::Dump From cb7a00227ff3565000af4859664c77224106dcb5 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sun, 4 Nov 2012 23:47:06 -0800 Subject: [PATCH 4/4] The method GetOverlappingInputs should use binary search. Summary: The method Version::GetOverlappingInputs used a sequential search to map a kay-range to a set of files. But the files are arranged in ascending order of key, so a biary search is more effective. This patch implements Version::GetOverlappingInputsBinarySearch that finds one file that corresponds to the specified key range and then iterates backwards and forwards to find all overlapping files. This patch is critical for making compactions efficient, especially when there are thousands of files in a single level. I measured that 1000 iterations of TEST_MaxNextLevelOverlappingBytes takes 16000 microseconds without this patch. With this patch, the same method takes about 4600 microseconds. Test Plan: Almost all unit tests in db_test uses this method to lookup keys. Reviewers: heyongqiang Reviewed By: heyongqiang CC: MarkCallaghan, emayanke, sheki Differential Revision: https://reviews.facebook.net/D6465 --- db/version_set.cc | 82 +++++++++++++++++++++++++++++++++++++++++++++++ db/version_set.h | 13 ++++++++ 2 files changed, 95 insertions(+) diff --git a/db/version_set.cc b/db/version_set.cc index a8307ac88..28f741973 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -467,6 +467,10 @@ void Version::GetOverlappingInputs( user_end = end->user_key(); } const Comparator* user_cmp = vset_->icmp_.user_comparator(); + if (begin != NULL && end != NULL && level > 0) { + GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs); + return; + } for (size_t i = 0; i < files_[level].size(); ) { FileMetaData* f = files_[level][i++]; const Slice file_start = f->smallest.user_key(); @@ -494,6 +498,84 @@ void Version::GetOverlappingInputs( } } +// Store in "*inputs" all files in "level" that overlap [begin,end] +// Employ binary search to find at least one file that overlaps the +// specified range. From that file, iterate backwards and +// forwards to find all overlapping files. +void Version::GetOverlappingInputsBinarySearch( + int level, + const Slice& user_begin, + const Slice& user_end, + std::vector* inputs) { + assert(level > 0); + int min = 0; + int mid = 0; + int max = files_[level].size() -1; + bool foundOverlap = false; + const Comparator* user_cmp = vset_->icmp_.user_comparator(); + while (min <= max) { + mid = (min + max)/2; + FileMetaData* f = files_[level][mid]; + const Slice file_start = f->smallest.user_key(); + const Slice file_limit = f->largest.user_key(); + if (user_cmp->Compare(file_limit, user_begin) < 0) { + min = mid + 1; + } else if (user_cmp->Compare(user_end, file_start) < 0) { + max = mid - 1; + } else { + foundOverlap = true; + break; + } + } + + // If there were no overlapping files, return immediately. + if (!foundOverlap) { + return; + } + ExtendOverlappingInputs(level, user_begin, user_end, inputs, mid); +} + +// Store in "*inputs" all files in "level" that overlap [begin,end] +// The midIndex specifies the index of at least one file that +// overlaps the specified range. From that file, iterate backward +// and forward to find all overlapping files. +void Version::ExtendOverlappingInputs( + int level, + const Slice& user_begin, + const Slice& user_end, + std::vector* inputs, + int midIndex) { + + // assert that the file at midIndex overlaps with the range + const Comparator* user_cmp = vset_->icmp_.user_comparator(); + assert(midIndex < files_[level].size()); + assert((user_cmp->Compare(files_[level][midIndex]->largest.user_key(), + user_begin) >= 0) || + (user_cmp->Compare(files_[level][midIndex]->smallest.user_key(), + user_end) <= 0)); + + // check backwards from 'mid' to lower indices + for (size_t i = midIndex; i < files_[level].size(); i--) { + FileMetaData* f = files_[level][i]; + const Slice file_limit = f->largest.user_key(); + if (user_cmp->Compare(file_limit, user_begin) >= 0) { + inputs->insert(inputs->begin(), f); // insert into beginning of vector + } else { + break; + } + } + // check forward from 'mid+1' to higher indices + for (size_t i = midIndex+1; i < files_[level].size(); i++) { + FileMetaData* f = files_[level][i]; + const Slice file_start = f->smallest.user_key(); + if (user_cmp->Compare(file_start, user_end) <= 0) { + inputs->push_back(f); // insert into end of vector + } else { + break; + } + } +} + std::string Version::DebugString() const { std::string r; for (int level = 0; level < vset_->NumberLevels(); level++) { diff --git a/db/version_set.h b/db/version_set.h index 79323ecb4..794163fea 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -89,6 +89,19 @@ class Version { const InternalKey* end, // NULL means after all keys std::vector* inputs); + void GetOverlappingInputsBinarySearch( + int level, + const Slice& begin, // NULL means before all keys + const Slice& end, // NULL means after all keys + std::vector* inputs); + + void ExtendOverlappingInputs( + int level, + const Slice& begin, // NULL means before all keys + const Slice& end, // NULL means after all keys + std::vector* inputs, + int index); // start extending from this index + // Returns true iff some file in the specified level overlaps // some part of [*smallest_user_key,*largest_user_key]. // smallest_user_key==NULL represents a key smaller than all keys in the DB.