Make VersionSet::ReduceNumberOfLevels() static

Summary:
A lot of our code implicitly assumes number_levels to be static. ReduceNumberOfLevels() breaks that assumption. For example, after calling ReduceNumberOfLevels(), DBImpl::NumberLevels() will be different from VersionSet::NumberLevels(). This is dangerous. Thankfully, it's not in public headers and is only used from LDB cmd tool. LDB tool is only using it statically, i.e. it never calls it with running DB instance. With this diff, we make it explicitly static. This way, we can assume number_levels to be immutable and not break assumption that lot of our code is relying upon. LDB tool can still use the method.

Also, I removed the method from a separate file since it breaks filename completition. version_se<TAB> now completes to "version_set." instead of "version_set" (without the dot). I don't see a big reason that the function should be in a different file.

Test Plan: reduce_levels_test

Reviewers: dhruba, haobo, kailiu, sdong

Reviewed By: kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15303
main
Igor Canadi 11 years ago
parent c583157d49
commit 677fee27c6
  1. 8
      db/compaction_picker.cc
  2. 5
      db/compaction_picker.h
  3. 69
      db/version_set.cc
  4. 8
      db/version_set.h
  5. 77
      db/version_set_reduce_num_levels.cc
  6. 17
      util/ldb_cmd.cc

@ -45,15 +45,7 @@ CompactionPicker::CompactionPicker(const Options* options,
options_(options),
num_levels_(options->num_levels),
icmp_(icmp) {
Init();
}
void CompactionPicker::ReduceNumberOfLevels(int new_levels) {
num_levels_ = new_levels;
Init();
}
void CompactionPicker::Init() {
max_file_size_.reset(new uint64_t[NumberLevels()]);
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
int target_file_size_multiplier = options_->target_file_size_multiplier;

@ -27,9 +27,6 @@ class CompactionPicker {
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// See VersionSet::ReduceNumberOfLevels()
void ReduceNumberOfLevels(int new_levels);
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
@ -120,8 +117,6 @@ class CompactionPicker {
const Options* const options_;
private:
void Init();
int num_levels_;
const InternalKeyComparator* const icmp_;

@ -1730,6 +1730,75 @@ Status VersionSet::Recover() {
return s;
}
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int new_levels) {
if (new_levels <= 1) {
return Status::InvalidArgument(
"Number of levels needs to be bigger than 1");
}
const InternalKeyComparator cmp(options->comparator);
TableCache tc(dbname, options, storage_options, 10);
VersionSet versions(dbname, options, storage_options, &tc, &cmp);
Status status;
status = versions.Recover();
if (!status.ok()) {
return status;
}
Version* current_version = versions.current();
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are file only on one level from
// (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = current_version->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
first_nonempty_level_filenum = file_num;
} else {
char msg[255];
snprintf(msg, sizeof(msg),
"Found at least two levels containing files: "
"[%d:%d],[%d:%d].\n",
first_nonempty_level, first_nonempty_level_filenum, i,
file_num);
return Status::InvalidArgument(msg);
}
}
}
std::vector<FileMetaData*>* old_files_list = current_version->files_;
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[new_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = old_files_list[i];
}
if (first_nonempty_level > 0) {
new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
}
delete[] current_version->files_;
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(&ve, &dummy_mutex, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
bool verbose, bool hex) {
struct LogReporter : public log::Reader::Reporter {

@ -283,10 +283,16 @@ class VersionSet {
// Try to reduce the number of levels. This call is valid when
// only one level from the new max level to the old
// max level containing files.
// The call is static, since number of levels is immutable during
// the lifetime of a RocksDB instance. It reduces number of levels
// in a DB by applying changes to manifest.
// For example, a db currently has 7 levels [0-6], and a call to
// to reduce to 5 [0-4] can only be executed when only one level
// among [4-6] contains files.
Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu);
static Status ReduceNumberOfLevels(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int new_levels);
// Return the current version.
Version* current() const { return current_; }

@ -1,77 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "util/logging.h"
namespace rocksdb {
Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
if(new_levels <= 1) {
return Status::InvalidArgument(
"Number of levels needs to be bigger than 1");
}
Version* current_version = current_;
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
}
// Make sure there are file only on one level from
// (new_levels-1) to (current_levels-1)
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = current_version->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
first_nonempty_level_filenum = file_num;
} else {
char msg[255];
sprintf(msg, "Found at least two levels containing files: "
"[%d:%d],[%d:%d].\n",
first_nonempty_level, first_nonempty_level_filenum, i, file_num);
return Status::InvalidArgument(msg);
}
}
}
Status st;
std::vector<FileMetaData*>* old_files_list = current_version->files_;
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[new_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = old_files_list[i];
}
if (first_nonempty_level > 0) {
new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
}
delete[] current_version->files_;
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
num_levels_ = new_levels;
compaction_picker_->ReduceNumberOfLevels(new_levels);
VersionEdit ve;
st = LogAndApply(&ve, mu, true);
return st;
}
}

@ -1069,23 +1069,8 @@ void ReduceDBLevelsCommand::DoCommand() {
CloseDB();
EnvOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
// We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change (like LogAndApply)
// to the manifest file.
st = versions.Recover();
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;
}
port::Mutex mu;
mu.Lock();
st = versions.ReduceNumberOfLevels(new_levels_, &mu);
mu.Unlock();
st = VersionSet::ReduceNumberOfLevels(db_path_, &opt, soptions, new_levels_);
if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return;

Loading…
Cancel
Save