disable size compaction in ldb reduce_levels and added compression and file size parameter to it

Summary:
disable size compaction in ldb reduce_levels, this will avoid compactions rather than the manual comapction,

added --compression=none|snappy|zlib|bzip2 and --file_size= per-file size to ldb reduce_levels command

Test Plan: run ldb

Reviewers: dhruba, MarkCallaghan

Reviewed By: dhruba

CC: sheki, emayanke

Differential Revision: https://reviews.facebook.net/D6597
main
heyongqiang 12 years ago
parent e00c709661
commit 20d18a89a3
  1. 6
      db/db_bench.cc
  2. 2
      db/db_impl.cc
  3. 4
      db/version_set_reduce_num_levels.cc
  4. 2
      include/leveldb/options.h
  5. 6
      tools/db_stress.cc
  6. 92
      util/ldb_cmd.cc
  7. 25
      util/ldb_cmd.h
  8. 2
      util/options.cc

@ -148,7 +148,7 @@ static int FLAGS_target_file_size_base = 2 * 1048576;
static int FLAGS_target_file_size_multiplier = 1; static int FLAGS_target_file_size_multiplier = 1;
// Max bytes for level-1 // Max bytes for level-1
static int FLAGS_max_bytes_for_level_base = 10 * 1048576; static uint64_t FLAGS_max_bytes_for_level_base = 10 * 1048576;
// A multiplier to compute max bytes for level-N // A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 10; static int FLAGS_max_bytes_for_level_multiplier = 10;
@ -1340,8 +1340,8 @@ int main(int argc, char** argv) {
&n, &junk) == 1) { &n, &junk) == 1) {
FLAGS_target_file_size_multiplier = n; FLAGS_target_file_size_multiplier = n;
} else if ( } else if (
sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) { sscanf(argv[i], "--max_bytes_for_level_base=%ld%c", &l, &junk) == 1) {
FLAGS_max_bytes_for_level_base = n; FLAGS_max_bytes_for_level_base = l;
} else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c", } else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
&n, &junk) == 1) { &n, &junk) == 1) {
FLAGS_max_bytes_for_level_multiplier = n; FLAGS_max_bytes_for_level_multiplier = n;

@ -230,7 +230,7 @@ DBImpl::~DBImpl() {
if (flush_on_destroy_) { if (flush_on_destroy_) {
FlushMemTable(FlushOptions()); FlushMemTable(FlushOptions());
} }
mutex_.Lock(); mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
bg_cv_.Wait(); bg_cv_.Wait();

@ -22,6 +22,10 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
Version* current_version = current_; Version* current_version = current_;
int current_levels = NumberLevels(); int current_levels = NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are file only on one level from // Make sure there are file only on one level from
// (new_levels-1) to (current_levels-1) // (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1; int first_nonempty_level = -1;

@ -221,7 +221,7 @@ struct Options {
// by default 'max_bytes_for_level_base' is 10MB. // by default 'max_bytes_for_level_base' is 10MB.
int max_bytes_for_level_base; uint64_t max_bytes_for_level_base;
// by default 'max_bytes_for_level_base' is 10. // by default 'max_bytes_for_level_base' is 10.
int max_bytes_for_level_multiplier; int max_bytes_for_level_multiplier;

@ -87,7 +87,7 @@ static int FLAGS_target_file_size_base = 64 * KB;
static int FLAGS_target_file_size_multiplier = 1; static int FLAGS_target_file_size_multiplier = 1;
// Max bytes for level-0 // Max bytes for level-0
static int FLAGS_max_bytes_for_level_base = 256 * KB; static uint64_t FLAGS_max_bytes_for_level_base = 256 * KB;
// A multiplier to compute max bytes for level-N // A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 2; static int FLAGS_max_bytes_for_level_multiplier = 2;
@ -783,8 +783,8 @@ int main(int argc, char** argv) {
&n, &junk) == 1) { &n, &junk) == 1) {
FLAGS_target_file_size_multiplier = n; FLAGS_target_file_size_multiplier = n;
} else if ( } else if (
sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) { sscanf(argv[i], "--max_bytes_for_level_base=%ld%c", &l, &junk) == 1) {
FLAGS_max_bytes_for_level_base = n; FLAGS_max_bytes_for_level_base = l;
} else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c", } else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
&n, &junk) == 1) { &n, &junk) == 1) {
FLAGS_max_bytes_for_level_multiplier = n; FLAGS_max_bytes_for_level_multiplier = n;

@ -182,18 +182,39 @@ void DBDumper::DoCommand() {
const char* ReduceDBLevels::NEW_LEVLES_ARG = "--new_levels="; const char* ReduceDBLevels::NEW_LEVLES_ARG = "--new_levels=";
const char* ReduceDBLevels::PRINT_OLD_LEVELS_ARG = "--print_old_levels"; const char* ReduceDBLevels::PRINT_OLD_LEVELS_ARG = "--print_old_levels";
const char* ReduceDBLevels::COMPRESSION_TYPE_ARG = "--compression=";
const char* ReduceDBLevels::FILE_SIZE_ARG = "--file_size=";
ReduceDBLevels::ReduceDBLevels(std::string& db_name, ReduceDBLevels::ReduceDBLevels(std::string& db_name,
std::vector<std::string>& args) std::vector<std::string>& args)
: LDBCommand(db_name, args), : LDBCommand(db_name, args),
old_levels_(1 << 16),
new_levels_(-1), new_levels_(-1),
print_old_levels_(false) { print_old_levels_(false) {
file_size_ = leveldb::Options().target_file_size_base;
compression_ = leveldb::Options().compression;
for (unsigned int i = 0; i < args.size(); i++) { for (unsigned int i = 0; i < args.size(); i++) {
std::string& arg = args.at(i); std::string& arg = args.at(i);
if (arg.find(NEW_LEVLES_ARG) == 0) { if (arg.find(NEW_LEVLES_ARG) == 0) {
new_levels_ = atoi(arg.substr(strlen(NEW_LEVLES_ARG)).c_str()); new_levels_ = atoi(arg.substr(strlen(NEW_LEVLES_ARG)).c_str());
} else if (arg.find(PRINT_OLD_LEVELS_ARG) == 0) { } else if (arg.find(PRINT_OLD_LEVELS_ARG) == 0) {
print_old_levels_ = true; print_old_levels_ = true;
} else if (arg.find(COMPRESSION_TYPE_ARG) == 0) {
const char* type = arg.substr(strlen(COMPRESSION_TYPE_ARG)).c_str();
if (!strcasecmp(type, "none"))
compression_ = leveldb::kNoCompression;
else if (!strcasecmp(type, "snappy"))
compression_ = leveldb::kSnappyCompression;
else if (!strcasecmp(type, "zlib"))
compression_ = leveldb::kZlibCompression;
else if (!strcasecmp(type, "bzip2"))
compression_ = leveldb::kBZip2Compression;
else
exec_state_ = LDBCommandExecuteResult::FAILED(
"Invalid compression arg : " + arg);
} else if (arg.find(FILE_SIZE_ARG) == 0) {
file_size_ = atoi(arg.substr(strlen(FILE_SIZE_ARG)).c_str());
} else { } else {
exec_state_ = LDBCommandExecuteResult::FAILED( exec_state_ = LDBCommandExecuteResult::FAILED(
"Unknown argument." + arg); "Unknown argument." + arg);
@ -223,15 +244,45 @@ void ReduceDBLevels::Help(std::string& msg) {
LDBCommand::Help(msg); LDBCommand::Help(msg);
msg.append("[--new_levels=New number of levels] "); msg.append("[--new_levels=New number of levels] ");
msg.append("[--print_old_levels] "); msg.append("[--print_old_levels] ");
msg.append("[--compression=none|snappy|zlib|bzip2] ");
msg.append("[--file_size= per-file size] ");
} }
leveldb::Options ReduceDBLevels::PrepareOptionsForOpenDB() { leveldb::Options ReduceDBLevels::PrepareOptionsForOpenDB() {
leveldb::Options opt = LDBCommand::PrepareOptionsForOpenDB(); leveldb::Options opt = LDBCommand::PrepareOptionsForOpenDB();
// Set to a big value to make sure we can open the db opt.num_levels = old_levels_;
opt.num_levels = 1 << 16; // Disable size compaction
opt.max_bytes_for_level_base = 1 << 60;
opt.max_bytes_for_level_multiplier = 1;
opt.max_mem_compaction_level = 0;
return opt; return opt;
} }
Status ReduceDBLevels::GetOldNumOfLevels(leveldb::Options& opt, int* levels) {
TableCache* tc = new TableCache(db_path_, &opt, 10);
const InternalKeyComparator* cmp = new InternalKeyComparator(
opt.comparator);
VersionSet* versions = new VersionSet(db_path_, &opt,
tc, cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change
// (like LogAndApply) to the manifest file.
Status st = versions->Recover();
if (!st.ok()) {
return st;
}
int max = -1;
for (int i = 0; i < versions->NumberLevels(); i++) {
if (versions->NumLevelFiles(i)) {
max = i;
}
}
*levels = max + 1;
delete versions;
return st;
}
void ReduceDBLevels::DoCommand() { void ReduceDBLevels::DoCommand() {
if (new_levels_ <= 1) { if (new_levels_ <= 1) {
exec_state_ = LDBCommandExecuteResult::FAILED( exec_state_ = LDBCommandExecuteResult::FAILED(
@ -240,34 +291,27 @@ void ReduceDBLevels::DoCommand() {
} }
leveldb::Status st; leveldb::Status st;
leveldb::Options opt = PrepareOptionsForOpenDB(); leveldb::Options opt = PrepareOptionsForOpenDB();
int old_level_num = -1;
st = GetOldNumOfLevels(opt, &old_level_num);
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;
}
if (print_old_levels_) { if (print_old_levels_) {
TableCache* tc = new TableCache(db_path_, &opt, 10); fprintf(stdout, "The old number of levels in use is %d\n", old_level_num);
const InternalKeyComparator* cmp = new InternalKeyComparator( }
opt.comparator);
VersionSet* versions = new VersionSet(db_path_, &opt,
tc, cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change
// (like LogAndApply) to the manifest file.
st = versions->Recover();
int max = -1;
for(int i = 0; i<versions->NumberLevels(); i++) {
if (versions->NumLevelFiles(i)) {
max = i;
}
}
fprintf(stdout, "The old number of levels in use is %d\n", max + 1);
delete versions;
if (!st.ok()) { if (old_level_num <= new_levels_) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); return;
return;
}
} }
old_levels_ = old_level_num;
OpenDB();
// Compact the whole DB to put all files to the highest level. // Compact the whole DB to put all files to the highest level.
fprintf(stdout, "Compacting the db...\n");
db_->CompactRange(NULL, NULL); db_->CompactRange(NULL, NULL);
CloseDB(); CloseDB();

@ -103,6 +103,10 @@ public:
return opt; return opt;
} }
virtual bool NoDBOpen() {
return false;
}
virtual ~LDBCommand() { virtual ~LDBCommand() {
if (db_ != NULL) { if (db_ != NULL) {
delete db_; delete db_;
@ -121,14 +125,18 @@ public:
return; return;
} }
if (db_ == NULL) { if (db_ == NULL && !NoDBOpen()) {
OpenDB(); OpenDB();
} }
DoCommand(); DoCommand();
if (exec_state_.IsNotStarted()) { if (exec_state_.IsNotStarted()) {
exec_state_ = LDBCommandExecuteResult::SUCCEED(""); exec_state_ = LDBCommandExecuteResult::SUCCEED("");
} }
CloseDB ();
if (db_ != NULL) {
CloseDB ();
}
} }
virtual void DoCommand() = 0; virtual void DoCommand() = 0;
@ -230,17 +238,28 @@ public:
virtual leveldb::Options PrepareOptionsForOpenDB(); virtual leveldb::Options PrepareOptionsForOpenDB();
virtual void DoCommand(); virtual void DoCommand();
static void Help(std::string& msg);
virtual bool NoDBOpen() {
return true;
}
static void Help(std::string& msg);
static std::vector<std::string> PrepareArgs(int new_levels, static std::vector<std::string> PrepareArgs(int new_levels,
bool print_old_level = false); bool print_old_level = false);
private: private:
int old_levels_;
int new_levels_; int new_levels_;
bool print_old_levels_; bool print_old_levels_;
int file_size_;
enum leveldb::CompressionType compression_;
static const char* NEW_LEVLES_ARG; static const char* NEW_LEVLES_ARG;
static const char* PRINT_OLD_LEVELS_ARG; static const char* PRINT_OLD_LEVELS_ARG;
static const char* COMPRESSION_TYPE_ARG;
static const char* FILE_SIZE_ARG;
Status GetOldNumOfLevels(leveldb::Options& opt, int* levels);
}; };
} }

@ -104,7 +104,7 @@ Options::Dump(
target_file_size_base); target_file_size_base);
Log(log," Options.target_file_size_multiplier: %d", Log(log," Options.target_file_size_multiplier: %d",
target_file_size_multiplier); target_file_size_multiplier);
Log(log," Options.max_bytes_for_level_base: %d", Log(log," Options.max_bytes_for_level_base: %ld",
max_bytes_for_level_base); max_bytes_for_level_base);
Log(log," Options.max_bytes_for_level_multiplier: %d", Log(log," Options.max_bytes_for_level_multiplier: %d",
max_bytes_for_level_multiplier); max_bytes_for_level_multiplier);

Loading…
Cancel
Save