Allow DB reopen with reduced options.num_levels

Summary:
Allow user to reduce number of levels in LSM by issue a full CompactRange() and put the result in a lower level, and then reopen DB with reduced options.num_levels. Previous this will fail on reopen on when recovery replaying the previous MANIFEST and found a historical file was on a higher level than the new options.num_levels. The workaround was after CompactRange(), reopen the DB with old num_levels, which will create a new MANIFEST, and then reopen the DB again with new num_levels.

This patch relax the check of levels during recovery. It allows DB to open if there was a historical file on level > options.num_levels, but was also deleted.
Closes https://github.com/facebook/rocksdb/pull/2740

Differential Revision: D5629354

Pulled By: yiwu-arbug

fbshipit-source-id: 545903f6b36b6083e8cbaf777176aef2f488021d
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 92bfd6c507
commit 3c840d1a6d
  1. 21
      db/db_test2.cc
  2. 71
      db/version_builder.cc
  3. 1
      db/version_builder.h
  4. 17
      db/version_set.cc

@ -2304,6 +2304,27 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
} }
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Make sure DB can be reopen with reduced number of levels, given no file
// is on levels higher than the new num_levels.
TEST_F(DBTest2, ReduceLevel) {
Options options;
options.disable_auto_compactions = true;
options.num_levels = 7;
Reopen(options);
Put("foo", "bar");
Flush();
MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 1;
dbfull()->CompactRange(compact_options, nullptr, nullptr);
ASSERT_EQ("0,1", FilesPerLevel());
options.num_levels = 3;
Reopen(options);
ASSERT_EQ("0,1", FilesPerLevel());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -17,6 +17,7 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <map>
#include <set> #include <set>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
@ -87,7 +88,16 @@ class VersionBuilder::Rep {
Logger* info_log_; Logger* info_log_;
TableCache* table_cache_; TableCache* table_cache_;
VersionStorageInfo* base_vstorage_; VersionStorageInfo* base_vstorage_;
int num_levels_;
LevelState* levels_; LevelState* levels_;
// Store states of levels larger than num_levels_. We do this instead of
// storing them in levels_ to avoid regression in case there are no files
// on invalid levels. The version is not consistent if in the end the files
// on invalid levels don't cancel out.
std::map<int, std::unordered_set<uint64_t>> invalid_levels_;
// Whether there are invalid new files or invalid deletion on levels larger
// than num_levels_.
bool has_invalid_levels_;
FileComparator level_zero_cmp_; FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_; FileComparator level_nonzero_cmp_;
@ -97,8 +107,10 @@ class VersionBuilder::Rep {
: env_options_(env_options), : env_options_(env_options),
info_log_(info_log), info_log_(info_log),
table_cache_(table_cache), table_cache_(table_cache),
base_vstorage_(base_vstorage) { base_vstorage_(base_vstorage),
levels_ = new LevelState[base_vstorage_->num_levels()]; num_levels_(base_vstorage->num_levels()),
has_invalid_levels_(false) {
levels_ = new LevelState[num_levels_];
level_zero_cmp_.sort_method = FileComparator::kLevel0; level_zero_cmp_.sort_method = FileComparator::kLevel0;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
level_nonzero_cmp_.internal_comparator = level_nonzero_cmp_.internal_comparator =
@ -106,7 +118,7 @@ class VersionBuilder::Rep {
} }
~Rep() { ~Rep() {
for (int level = 0; level < base_vstorage_->num_levels(); level++) { for (int level = 0; level < num_levels_; level++) {
const auto& added = levels_[level].added_files; const auto& added = levels_[level].added_files;
for (auto& pair : added) { for (auto& pair : added) {
UnrefFile(pair.second); UnrefFile(pair.second);
@ -137,7 +149,7 @@ class VersionBuilder::Rep {
} }
#endif #endif
// make sure the files are sorted correctly // make sure the files are sorted correctly
for (int level = 0; level < vstorage->num_levels(); level++) { for (int level = 0; level < num_levels_; level++) {
auto& level_files = vstorage->LevelFiles(level); auto& level_files = vstorage->LevelFiles(level);
for (size_t i = 1; i < level_files.size(); i++) { for (size_t i = 1; i < level_files.size(); i++) {
auto f1 = level_files[i - 1]; auto f1 = level_files[i - 1];
@ -196,7 +208,7 @@ class VersionBuilder::Rep {
#endif #endif
// a file to be deleted better exist in the previous version // a file to be deleted better exist in the previous version
bool found = false; bool found = false;
for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) { for (int l = 0; !found && l < num_levels_; l++) {
const std::vector<FileMetaData*>& base_files = const std::vector<FileMetaData*>& base_files =
base_vstorage_->LevelFiles(l); base_vstorage_->LevelFiles(l);
for (size_t i = 0; i < base_files.size(); i++) { for (size_t i = 0; i < base_files.size(); i++) {
@ -210,7 +222,7 @@ class VersionBuilder::Rep {
// if the file did not exist in the previous version, then it // if the file did not exist in the previous version, then it
// is possibly moved from lower level to higher level in current // is possibly moved from lower level to higher level in current
// version // version
for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) { for (int l = level + 1; !found && l < num_levels_; l++) {
auto& level_added = levels_[l].added_files; auto& level_added = levels_[l].added_files;
auto got = level_added.find(number); auto got = level_added.find(number);
if (got != level_added.end()) { if (got != level_added.end()) {
@ -233,6 +245,19 @@ class VersionBuilder::Rep {
} }
} }
bool CheckConsistencyForNumLevels() {
// Make sure there are no files on or beyond num_levels().
if (has_invalid_levels_) {
return false;
}
for (auto& level : invalid_levels_) {
if (level.second.size() > 0) {
return false;
}
}
return true;
}
// Apply all of the edits in *edit to the current state. // Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) { void Apply(VersionEdit* edit) {
CheckConsistency(base_vstorage_); CheckConsistency(base_vstorage_);
@ -242,6 +267,7 @@ class VersionBuilder::Rep {
for (const auto& del_file : del) { for (const auto& del_file : del) {
const auto level = del_file.first; const auto level = del_file.first;
const auto number = del_file.second; const auto number = del_file.second;
if (level < num_levels_) {
levels_[level].deleted_files.insert(number); levels_[level].deleted_files.insert(number);
CheckConsistencyForDeletes(edit, number, level); CheckConsistencyForDeletes(edit, number, level);
@ -250,11 +276,20 @@ class VersionBuilder::Rep {
UnrefFile(exising->second); UnrefFile(exising->second);
levels_[level].added_files.erase(number); levels_[level].added_files.erase(number);
} }
} else {
if (invalid_levels_[level].count(number) > 0) {
invalid_levels_[level].erase(number);
} else {
// Deleting an non-existing file on invalid level.
has_invalid_levels_ = true;
}
}
} }
// Add new files // Add new files
for (const auto& new_file : edit->GetNewFiles()) { for (const auto& new_file : edit->GetNewFiles()) {
const int level = new_file.first; const int level = new_file.first;
if (level < num_levels_) {
FileMetaData* f = new FileMetaData(new_file.second); FileMetaData* f = new FileMetaData(new_file.second);
f->refs = 1; f->refs = 1;
@ -262,6 +297,15 @@ class VersionBuilder::Rep {
levels_[level].added_files.end()); levels_[level].added_files.end());
levels_[level].deleted_files.erase(f->fd.GetNumber()); levels_[level].deleted_files.erase(f->fd.GetNumber());
levels_[level].added_files[f->fd.GetNumber()] = f; levels_[level].added_files[f->fd.GetNumber()] = f;
} else {
uint64_t number = new_file.second.fd.GetNumber();
if (invalid_levels_[level].count(number) == 0) {
invalid_levels_[level].insert(number);
} else {
// Creating an already existing file on invalid level.
has_invalid_levels_ = true;
}
}
} }
} }
@ -270,7 +314,7 @@ class VersionBuilder::Rep {
CheckConsistency(base_vstorage_); CheckConsistency(base_vstorage_);
CheckConsistency(vstorage); CheckConsistency(vstorage);
for (int level = 0; level < base_vstorage_->num_levels(); level++) { for (int level = 0; level < num_levels_; level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
// Merge the set of added files with the set of pre-existing files. // Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v. // Drop any deleted files. Store the result in *v.
@ -325,7 +369,7 @@ class VersionBuilder::Rep {
assert(table_cache_ != nullptr); assert(table_cache_ != nullptr);
// <file metadata, level> // <file metadata, level>
std::vector<std::pair<FileMetaData*, int>> files_meta; std::vector<std::pair<FileMetaData*, int>> files_meta;
for (int level = 0; level < base_vstorage_->num_levels(); level++) { for (int level = 0; level < num_levels_; level++) {
for (auto& file_meta_pair : levels_[level].added_files) { for (auto& file_meta_pair : levels_[level].added_files) {
auto* file_meta = file_meta_pair.second; auto* file_meta = file_meta_pair.second;
assert(!file_meta->table_reader_handle); assert(!file_meta->table_reader_handle);
@ -386,24 +430,35 @@ VersionBuilder::VersionBuilder(const EnvOptions& env_options,
VersionStorageInfo* base_vstorage, VersionStorageInfo* base_vstorage,
Logger* info_log) Logger* info_log)
: rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {} : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
VersionBuilder::~VersionBuilder() { delete rep_; } VersionBuilder::~VersionBuilder() { delete rep_; }
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
rep_->CheckConsistency(vstorage); rep_->CheckConsistency(vstorage);
} }
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
uint64_t number, int level) { uint64_t number, int level) {
rep_->CheckConsistencyForDeletes(edit, number, level); rep_->CheckConsistencyForDeletes(edit, number, level);
} }
bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
}
void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage); rep_->SaveTo(vstorage);
} }
void VersionBuilder::LoadTableHandlers( void VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads, InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache) { bool prefetch_index_and_filter_in_cache) {
rep_->LoadTableHandlers(internal_stats, max_threads, rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache); prefetch_index_and_filter_in_cache);
} }
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
FileMetaData* f) { FileMetaData* f) {
rep_->MaybeAddFile(vstorage, level, f); rep_->MaybeAddFile(vstorage, level, f);

@ -29,6 +29,7 @@ class VersionBuilder {
void CheckConsistency(VersionStorageInfo* vstorage); void CheckConsistency(VersionStorageInfo* vstorage);
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
int level); int level);
bool CheckConsistencyForNumLevels();
void Apply(VersionEdit* edit); void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage); void SaveTo(VersionStorageInfo* vstorage);
void LoadTableHandlers(InternalStats* internal_stats, int max_threads, void LoadTableHandlers(InternalStats* internal_stats, int max_threads,

@ -2842,11 +2842,6 @@ Status VersionSet::Recover(
cfd = column_family_set_->GetColumnFamily(edit.column_family_); cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true // this should never happen since cf_in_builders is true
assert(cfd != nullptr); assert(cfd != nullptr);
if (edit.max_level_ >= cfd->current()->storage_info()->num_levels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
// if it is not column family add or column family drop, // if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded // then it's a file add/delete, which should be forwarded
@ -2930,6 +2925,18 @@ Status VersionSet::Recover(
list_of_not_found); list_of_not_found);
} }
if (s.ok()) {
for (auto cfd : *column_family_set_) {
assert(builders.count(cfd->GetID()) > 0);
auto* builder = builders[cfd->GetID()]->version_builder();
if (!builder->CheckConsistencyForNumLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
}
}
if (s.ok()) { if (s.ok()) {
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) { if (cfd->IsDropped()) {

Loading…
Cancel
Save