From 153f4f0719bf904c9d38bf32dbf9069f15760f9d Mon Sep 17 00:00:00 2001
From: Venkatesh Radhakrishnan
Date: Mon, 15 Dec 2014 21:48:16 -0800
Subject: [PATCH] RocksDB: Allow Level-Style Compaction to Place Files in Different Paths

Summary:
Allow Level-style compaction to place files in different paths.
This diff provides the code for task 4854591. Level-style compaction can
now place files in different paths: each db_paths entry names a path and
the target size of data to store there, and levels are assigned to paths
in order, so lower numbered levels fill the earlier paths.

Test Plan: ManualLevelCompactionOutputPathId in db_test.cc

Reviewers: yhchiang, MarkCallaghan, dhruba, yoshinorim, sdong

Reviewed By: sdong

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D29799
---
 HISTORY.md                   |   8 +-
 db/compaction_picker.cc      |  42 ++++-
 db/compaction_picker.h       |   5 +
 db/compaction_picker_test.cc |   3 +
 db/db_impl.cc                |  36 ++++-
 db/db_test.cc                | 302 +++++++++++++++++++++++++++++++++++
 db/flush_job.cc              |   7 +
 7 files changed, 394 insertions(+), 9 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index e1b9f15a1..121b936af 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -14,9 +14,13 @@
 ### Public API changes
 * New API to create a checkpoint added. Given a directory name, creates a new
   database which is an image of the existing database.
-*New API LinkFile added to Env. If you implement your own Env class, an
-  implementation of the API LinkFile will have to be provided.
+* New API LinkFile added to Env. If you implement your own Env class, an
+  implementation of the API LinkFile will have to be provided.
 * MemTableRep takes MemTableAllocator instead of Arena
+* We now allow level compaction to place files in different paths by
+  specifying them in db_paths along with the target_size for each path.
+  Lower numbered levels are placed in earlier entries of the db_paths
+  vector and higher numbered levels in later entries.
 
 ### Improvements
 * RocksDBLite library now becomes smaller and will be compiled with -fno-exceptions flag.

diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index 213daefc1..82653ff70 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -769,6 +769,44 @@ Compaction* LevelCompactionPicker::PickCompaction(
   return c;
 }
 
+/*
+ * Find the optimal path to place a file.
+ * Given a level, find the path where that level and all levels below it
+ * fit within the target sizes of the paths up to and including it.
+ */
+uint32_t LevelCompactionPicker::GetPathId(
+    const ImmutableCFOptions& ioptions,
+    const MutableCFOptions& mutable_cf_options, int level) {
+  uint32_t p = 0;
+  assert(!ioptions.db_paths.empty());
+
+  // size remaining in the most recent path
+  uint64_t current_path_size = ioptions.db_paths[0].target_size;
+
+  uint64_t level_size;
+  int cur_level = 0;
+
+  level_size = mutable_cf_options.max_bytes_for_level_base;
+
+  // Last path is the fallback
+  while (p < ioptions.db_paths.size() - 1) {
+    if (level_size <= current_path_size) {
+      if (cur_level == level) {
+        // Does desired level fit in this path?
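+        // Yes: levels 0 through level - 1 have already been charged
+        // against the target sizes of paths 0 through p, and this level
+        // still fits in what remains of path p.
+        // E.g. with max_bytes_for_level_base = 400KB, the default
+        // multiplier of 10, and path targets {500KB, 4MB, 1GB} (the
+        // LevelCompactionThirdPath setup below), the 400KB base charge
+        // lands on path 0, level 1 (4MB) on path 1, and level 2 (40MB)
+        // on path 2.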
+        return p;
+      } else {
+        current_path_size -= level_size;
+        level_size *= mutable_cf_options.max_bytes_for_level_multiplier;
+        cur_level++;
+        continue;
+      }
+    }
+    p++;
+    current_path_size = ioptions.db_paths[p].target_size;
+  }
+  return p;
+}
+
 Compaction* LevelCompactionPicker::PickCompactionBySize(
     const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage,
     int level, double score) {
@@ -786,7 +824,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
   assert(level + 1 < NumberLevels());
   c = new Compaction(vstorage->num_levels(), level, level + 1,
                      mutable_cf_options.MaxFileSizeForLevel(level + 1),
-                     mutable_cf_options.MaxGrandParentOverlapBytes(level), 0,
+                     mutable_cf_options.MaxGrandParentOverlapBytes(level),
+                     GetPathId(ioptions_, mutable_cf_options, level + 1),
                      GetCompressionType(ioptions_, level + 1));
   c->score_ = score;
 
@@ -960,6 +999,7 @@ uint32_t UniversalCompactionPicker::GetPathId(
   uint64_t future_size = file_size *
     (100 - ioptions.compaction_options_universal.size_ratio) / 100;
   uint32_t p = 0;
+  assert(!ioptions.db_paths.empty());
   for (; p < ioptions.db_paths.size() - 1; p++) {
     uint64_t target_size = ioptions.db_paths[p].target_size;
     if (target_size > file_size &&
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index cfed5109d..ad72e609a 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -188,6 +188,11 @@ class LevelCompactionPicker : public CompactionPicker {
   virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
       override;
 
+  // Pick the path ID on which to place a newly generated file, given its
+  // level
+  static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
+                            const MutableCFOptions& mutable_cf_options,
+                            int level);
+
  private:
   // For the specified level, pick a compaction.
   // Returns nullptr if there is no compaction to be done.
diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc
index 419d239c8..5ffc74f0d 100644
--- a/db/compaction_picker_test.cc
+++ b/db/compaction_picker_test.cc
@@ -4,6 +4,7 @@
 // of patent rights can be found in the PATENTS file in the same directory.
 #include "db/compaction_picker.h"
+#include <limits>
 #include <string>
 #include "util/logging.h"
 #include "util/testharness.h"
@@ -47,6 +48,8 @@ class CompactionPickerTest {
     fifo_options_.max_table_files_size = 1;
     mutable_cf_options_.RefreshDerivedOptions(ioptions_);
     size_being_compacted_.resize(options_.num_levels);
+    ioptions_.db_paths.emplace_back("dummy",
+                                    std::numeric_limits<uint64_t>::max());
   }
 
   ~CompactionPickerTest() {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index b4b423d9d..2d9b32cc8 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -67,6 +67,7 @@
 #include "util/build_version.h"
 #include "util/coding.h"
 #include "util/db_info_dumper.h"
+#include "util/file_util.h"
 #include "util/hash_skiplist_rep.h"
 #include "util/hash_linklist_rep.h"
 #include "util/logging.h"
@@ -2059,10 +2060,31 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
+    FileMetaData ftemp;
+    uint64_t fdnum = f->fd.GetNumber();
+    uint32_t fdpath = f->fd.GetPathId();
     c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
-    c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
-                       f->fd.GetFileSize(), f->smallest, f->largest,
-                       f->smallest_seqno, f->largest_seqno);
+    // Need to move file if file is to be stored in a new path
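+    // A trivial move normally just reattaches the existing file to the
+    // next level. When the output path differs, the bytes have to be
+    // physically copied to the new path under a fresh file number; if
+    // the copy fails, we fall back to keeping the file where it is.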
"); } } diff --git a/db/db_test.cc b/db/db_test.cc index facae2f68..6c995e7a0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1004,6 +1004,12 @@ class DBTest { return size; } + void Compact(int cf, const Slice& start, const Slice& limit, + uint32_t target_path_id) { + ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1, + target_path_id)); + } + void Compact(int cf, const Slice& start, const Slice& limit) { ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); } @@ -4087,6 +4093,233 @@ TEST(DBTest, UniversalCompactionSecondPathRatio) { Destroy(options); } +TEST(DBTest, LevelCompactionThirdPath) { + Options options; + options.db_paths.emplace_back(dbname_, 500 * 1024); + options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024); + options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024); + options.compaction_style = kCompactionStyleLevel; + options.write_buffer_size = 100 << 10; // 100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 4; + options.max_bytes_for_level_base = 400 * 1024; + // options = CurrentOptions(options); + + std::vector filenames; + env_->GetChildren(options.db_paths[1].path, &filenames); + // Delete archival files. + for (size_t i = 0; i < filenames.size(); ++i) { + env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]); + } + env_->DeleteDir(options.db_paths[1].path); + Reopen(options); + + Random rnd(301); + int key_idx = 0; + + // First three 110KB files are not going to second path. + // After that, (100K, 200K) + for (int num = 0; num < 3; num++) { + GenerateNewFile(&rnd, &key_idx); + } + + // Another 110KB triggers a compaction to 400K file to fill up first path + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path)); + + // (1, 4) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4", FilesPerLevel(0)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 1) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,1", FilesPerLevel(0)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 2) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,2", FilesPerLevel(0)); + ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 3) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,3", FilesPerLevel(0)); + ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 4) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,4", FilesPerLevel(0)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 5) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,5", FilesPerLevel(0)); + ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 6) + GenerateNewFile(&rnd, &key_idx); + ASSERT_EQ("1,4,6", FilesPerLevel(0)); + ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path)); + ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); + ASSERT_EQ(1, GetSstFileCount(dbname_)); + + // (1, 4, 7) + GenerateNewFile(&rnd, &key_idx); + 
+  ASSERT_EQ("1,4,7", FilesPerLevel(0));
+  ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  // (1, 4, 8)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,8", FilesPerLevel(0));
+  ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Reopen(options);
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Destroy(options);
+}
+
+TEST(DBTest, LevelCompactionPathUse) {
+  Options options;
+  options.db_paths.emplace_back(dbname_, 500 * 1024);
+  options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
+  options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
+  options.compaction_style = kCompactionStyleLevel;
+  options.write_buffer_size = 100 << 10;  // 100KB
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 4;
+  options.max_bytes_for_level_base = 400 * 1024;
+  // options = CurrentOptions(options);
+
+  std::vector<std::string> filenames;
+  env_->GetChildren(options.db_paths[1].path, &filenames);
+  // Delete archival files.
+  for (size_t i = 0; i < filenames.size(); ++i) {
+    env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
+  }
+  env_->DeleteDir(options.db_paths[1].path);
+  Reopen(options);
+
+  Random rnd(301);
+  int key_idx = 0;
+
+  // Always gets compacted into 1 Level1 file,
+  // 0/1 Level 0 file
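+  // Each round below resets key_idx, so successive files overwrite the
+  // same key range: compaction keeps folding the data into a single
+  // level-1 file that never outgrows the second path's target.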
+  for (int num = 0; num < 3; num++) {
+    key_idx = 0;
+    GenerateNewFile(&rnd, &key_idx);
+  }
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1", FilesPerLevel(0));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+  key_idx = 0;
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,1", FilesPerLevel(0));
+  ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
+  ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+  ASSERT_EQ(1, GetSstFileCount(dbname_));
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Reopen(options);
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Destroy(options);
+}
+
 TEST(DBTest, UniversalCompactionFourPaths) {
   Options options;
   options.db_paths.emplace_back(dbname_, 300 * 1024);
@@ -5953,6 +6186,75 @@ TEST(DBTest, ManualCompactionOutputPathId) {
                   .IsInvalidArgument());
 }
 
+TEST(DBTest, ManualLevelCompactionOutputPathId) {
+  Options options = CurrentOptions();
+  options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
+  options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
+  options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
+  options.max_background_flushes = 1;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
+      << "Need to update this test to match kMaxMemCompactLevel";
+
+  // iter - 0 with 7 levels
+  // iter - 1 with 3 levels
+  for (int iter = 0; iter < 2; ++iter) {
+    MakeTables(3, "p", "q", 1);
+    ASSERT_EQ("3", FilesPerLevel(1));
+    ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
+    ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+    // Compaction range falls before files
+    Compact(1, "", "c");
+    ASSERT_EQ("3", FilesPerLevel(1));
+
+    // Compaction range falls after files
+    Compact(1, "r", "z");
+    ASSERT_EQ("3", FilesPerLevel(1));
+
+    // Compaction range overlaps files
+    Compact(1, "p1", "p9", 1);
+    ASSERT_EQ("0,1", FilesPerLevel(1));
+    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
+    ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+    // Populate a different range
+    MakeTables(3, "c", "e", 1);
+    ASSERT_EQ("3,1", FilesPerLevel(1));
+
+    // Compact just the new range
+    Compact(1, "b", "f", 1);
+    ASSERT_EQ("0,2", FilesPerLevel(1));
+    ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
+    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
+    ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+    // Compact all
+    MakeTables(1, "a", "z", 1);
+    ASSERT_EQ("1,2", FilesPerLevel(1));
+    ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
+    ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
+    db_->CompactRange(handles_[1], nullptr, nullptr, false, 1, 1);
+    ASSERT_EQ("0,1", FilesPerLevel(1));
+    ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
+    ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
+    ASSERT_EQ(0, GetSstFileCount(dbname_));
+
+    if (iter == 0) {
+      DestroyAndReopen(options);
+      options = CurrentOptions();
+      options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
+      options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
+      options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
+      options.max_background_flushes = 1;
+      options.num_levels = 3;
+      options.create_if_missing = true;
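+      // Second pass: rerun the same scenario with only three levels.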
+      CreateAndReopenWithCF({"pikachu"}, options);
+    }
+  }
+}
+
 TEST(DBTest, DBOpen_Options) {
   Options options = CurrentOptions();
   std::string dbname = test::TmpDir(env_) + "/db_options_test";
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 10bd6f96b..ccc0245a3 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -40,6 +40,7 @@
 #include "table/table_builder.h"
 #include "table/two_level_iterator.h"
 #include "util/coding.h"
+#include "util/file_util.h"
 #include "util/logging.h"
 #include "util/log_buffer.h"
 #include "util/mutexlock.h"
@@ -194,6 +195,12 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
       cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
     level = base->storage_info()->PickLevelForMemTableOutput(
         mutable_cf_options_, min_user_key, max_user_key);
+    // Flushes always write to path 0; if the chosen level belongs on a
+    // different path, fall back to flushing into level 0.
+    uint32_t fdpath = LevelCompactionPicker::GetPathId(
+        *cfd_->ioptions(), mutable_cf_options_, level);
+    if (fdpath != 0) {
+      level = 0;
+    }
   }
   edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                 meta.fd.GetFileSize(), meta.smallest, meta.largest,
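
Usage note (not part of the diff): a minimal sketch of how an application
would exercise the feature this patch adds. The paths and target sizes
below are illustrative only; the DB is opened at db_paths[0].path,
mirroring the tests above. Levels fill the paths in order, and the last
path is the fallback.

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.compaction_style = rocksdb::kCompactionStyleLevel;
      // Paths are filled in level order: the hot, low levels stay on the
      // small fast path; colder levels spill onto the larger ones.
      options.db_paths.emplace_back("/flash/db", 512ull << 20);  // 512MB
      options.db_paths.emplace_back("/disk1/db", 8ull << 30);    // 8GB
      options.db_paths.emplace_back("/disk2/db", 128ull << 30);  // 128GB

      rocksdb::DB* db = nullptr;
      rocksdb::Status s = rocksdb::DB::Open(options, "/flash/db", &db);
      if (!s.ok()) {
        return 1;
      }
      delete db;
      return 0;
    }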