diff --git a/db/column_family.cc b/db/column_family.cc index c081431cd..cffd8b898 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -352,11 +352,13 @@ Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { } Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { return compaction_picker_->CompactRange(current_, input_level, output_level, - begin, end, compaction_end); + output_path_id, begin, end, + compaction_end); } SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( diff --git a/db/column_family.h b/db/column_family.h index 826fcc669..b6620675c 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -185,7 +185,8 @@ class ColumnFamilyData { // See documentation in compaction_picker.h Compaction* PickCompaction(LogBuffer* log_buffer); Compaction* CompactRange(int input_level, int output_level, - const InternalKey* begin, const InternalKey* end, + uint32_t output_path_id, const InternalKey* begin, + const InternalKey* end, InternalKey** compaction_end); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 84fbc6b02..82f659b8b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -330,9 +330,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { } } - Compaction* CompactionPicker::CompactRange(Version* version, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { @@ -372,10 +372,11 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } } } - Compaction* c = new Compaction(version, input_level, output_level, - MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), 0, - GetCompressionType(*options_, output_level)); + assert(output_path_id < static_cast(options_->db_paths.size())); + Compaction* c = new Compaction( + version, input_level, output_level, MaxFileSizeForLevel(output_level), + MaxGrandParentOverlapBytes(input_level), output_path_id, + GetCompressionType(*options_, output_level)); c->inputs_[0].files = inputs; if (ExpandWhileOverlapping(c) == false) { @@ -983,17 +984,19 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, return c; } -Compaction* FIFOCompactionPicker::CompactRange(Version* version, - int input_level, - int output_level, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { +Compaction* FIFOCompactionPicker::CompactRange( + Version* version, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get()); - auto c = PickCompaction(version, &log_buffer); + Compaction* c = PickCompaction(version, &log_buffer); + if (c != nullptr) { + assert(output_path_id < static_cast(options_->db_paths.size())); + c->output_path_id_ = output_path_id; + } log_buffer.FlushBufferToLog(); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ae6472838..6edd3404d 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -48,7 +48,8 @@ class CompactionPicker { // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, const InternalKey* begin, + int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end); @@ -192,7 +193,8 @@ class FIFOCompactionPicker : public CompactionPicker { LogBuffer* log_buffer) override; virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, const InternalKey* begin, + int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) override; diff --git a/db/db_impl.cc b/db/db_impl.cc index d008c7699..e77b5ec40 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1614,7 +1614,12 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, - bool reduce_level, int target_level) { + bool reduce_level, int target_level, + uint32_t target_path_id) { + if (target_path_id >= options_.db_paths.size()) { + return Status::InvalidArgument("Invalid target path ID"); + } + auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); @@ -1640,9 +1645,10 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, if (cfd->options()->compaction_style == kCompactionStyleUniversal || cfd->options()->compaction_style == kCompactionStyleFIFO || level == max_level_with_files) { - s = RunManualCompaction(cfd, level, level, begin, end); + s = RunManualCompaction(cfd, level, level, target_path_id, begin, end); } else { - s = RunManualCompaction(cfd, level, level + 1, begin, end); + s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin, + end); } if (!s.ok()) { LogFlush(options_.info_log); @@ -1775,8 +1781,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { } Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, - int output_level, const Slice* begin, - const Slice* end) { + int output_level, uint32_t output_path_id, + const Slice* begin, const Slice* end) { assert(input_level >= 0); InternalKey begin_storage, end_storage; @@ -1785,6 +1791,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.cfd = cfd; manual.input_level = input_level; manual.output_level = output_level; + manual.output_path_id = output_path_id; manual.done = false; manual.in_progress = false; // For universal compaction, we enforce every manual compaction to compact @@ -2177,8 +2184,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (is_manual) { ManualCompaction* m = manual_compaction_; assert(m->in_progress); - c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin, - m->end, &manual_end)); + c.reset(m->cfd->CompactRange(m->input_level, m->output_level, + m->output_path_id, m->begin, m->end, + &manual_end)); if (!c) { m->done = true; } diff --git a/db/db_impl.h b/db/db_impl.h index 9e9d10817..759e961ed 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -102,7 +102,8 @@ class DBImpl : public DB { using DB::CompactRange; virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1); + bool reduce_level = false, int target_level = -1, + uint32_t target_path_id = 0); using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family); @@ -145,8 +146,8 @@ class DBImpl : public DB { virtual Status GetDbIdentity(std::string& identity); Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, - int output_level, const Slice* begin, - const Slice* end); + int output_level, uint32_t output_path_id, + const Slice* begin, const Slice* end); #ifndef ROCKSDB_LITE // Extra methods (for testing) that are not in the public DB interface @@ -531,6 +532,7 @@ class DBImpl : public DB { ColumnFamilyData* cfd; int input_level; int output_level; + uint32_t output_path_id; bool done; Status status; bool in_progress; // compaction request being processed? diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 927a01a04..8df66f6c6 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -85,7 +85,7 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd->options()->compaction_style == kCompactionStyleFIFO) ? level : level + 1; - return RunManualCompaction(cfd, level, output_level, begin, end); + return RunManualCompaction(cfd, level, output_level, 0, begin, end); } Status DBImpl::TEST_FlushMemTable(bool wait) { diff --git a/db/db_test.cc b/db/db_test.cc index b62c59dea..6f81d207a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5252,6 +5252,54 @@ TEST(DBTest, ManualCompaction) { } +TEST(DBTest, ManualCompactionOutputPathId) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.db_paths.emplace_back(dbname_, 1000000000); + options.db_paths.emplace_back(dbname_ + "_2", 1000000000); + options.compaction_style = kCompactionStyleUniversal; + options.level0_file_num_compaction_trigger = 10; + Destroy(&options); + DestroyAndReopen(&options); + CreateAndReopenWithCF({"pikachu"}, &options); + MakeTables(3, "p", "q", 1); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("3", FilesPerLevel(1)); + ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); + + // Full compaction to DB path 0 + db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1); + ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, &options); + ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); + + MakeTables(1, "p", "q", 1); + ASSERT_EQ("2", FilesPerLevel(1)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); + + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, &options); + ASSERT_EQ("2", FilesPerLevel(1)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); + + // Full compaction to DB path 0 + db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0); + ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); + ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); + + // Fail when compacting to an invalid path ID + ASSERT_TRUE(db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 2) + .IsInvalidArgument()); +} + TEST(DBTest, DBOpen_Options) { std::string dbname = test::TmpDir() + "/db_options_test"; ASSERT_OK(DestroyDB(dbname, Options())); @@ -6559,7 +6607,8 @@ class ModelDB: public DB { using DB::CompactRange; virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* start, const Slice* end, - bool reduce_level, int target_level) { + bool reduce_level, int target_level, + uint32_t output_path_id) { return Status::NotSupported("Not supported operation."); } diff --git a/db/file_indexer.cc b/db/file_indexer.cc index e148777b0..56691bde5 100644 --- a/db/file_indexer.cc +++ b/db/file_indexer.cc @@ -15,10 +15,7 @@ namespace rocksdb { FileIndexer::FileIndexer(const Comparator* ucmp) - : num_levels_(0), - ucmp_(ucmp), - level_rb_(nullptr) { -} + : num_levels_(0), ucmp_(ucmp), level_rb_(nullptr) {} uint32_t FileIndexer::NumLevelIndex() { return next_level_index_.size(); @@ -47,8 +44,9 @@ void FileIndexer::GetNextLevelIndex( const auto& index = index_units[file_index]; if (cmp_smallest < 0) { - *left_bound = (level > 0 && file_index > 0) ? - index_units[file_index - 1].largest_lb : 0; + *left_bound = (level > 0 && file_index > 0) + ? index_units[file_index - 1].largest_lb + : 0; *right_bound = index.smallest_rb; } else if (cmp_smallest == 0) { *left_bound = index.smallest_lb; @@ -71,23 +69,22 @@ void FileIndexer::GetNextLevelIndex( assert(*right_bound <= level_rb_[level + 1]); } -void FileIndexer::UpdateIndex(Arena* arena, - const uint32_t num_levels, +void FileIndexer::UpdateIndex(Arena* arena, const uint32_t num_levels, std::vector* const files) { if (files == nullptr) { return; } - if (num_levels == 0) { // uint_32 0-1 would cause bad behavior + if (num_levels == 0) { // uint_32 0-1 would cause bad behavior num_levels_ = num_levels; return; } - assert(level_rb_ == nullptr); // level_rb_ should be init here + assert(level_rb_ == nullptr); // level_rb_ should be init here num_levels_ = num_levels; next_level_index_.resize(num_levels); char* mem = arena->AllocateAligned(num_levels_ * sizeof(int32_t)); - level_rb_ = new (mem)int32_t[num_levels_]; + level_rb_ = new (mem) int32_t[num_levels_]; for (size_t i = 0; i < num_levels_; i++) { level_rb_[i] = -1; } @@ -104,44 +101,40 @@ void FileIndexer::UpdateIndex(Arena* arena, IndexLevel& index_level = next_level_index_[level]; index_level.num_index = upper_size; char* mem = arena->AllocateAligned(upper_size * sizeof(IndexUnit)); - index_level.index_units = new (mem)IndexUnit[upper_size]; + index_level.index_units = new (mem) IndexUnit[upper_size]; - CalculateLB(upper_files, lower_files, &index_level, - [this](const FileMetaData* a, const FileMetaData* b) -> int { + CalculateLB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { return ucmp_->Compare(a->smallest.user_key(), b->largest.user_key()); }, - [](IndexUnit* index, int32_t f_idx) { - index->smallest_lb = f_idx; - }); - CalculateLB(upper_files, lower_files, &index_level, - [this](const FileMetaData* a, const FileMetaData* b) -> int { + [](IndexUnit* index, int32_t f_idx) { index->smallest_lb = f_idx; }); + CalculateLB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { return ucmp_->Compare(a->largest.user_key(), b->largest.user_key()); }, - [](IndexUnit* index, int32_t f_idx) { - index->largest_lb = f_idx; - }); - CalculateRB(upper_files, lower_files, &index_level, - [this](const FileMetaData* a, const FileMetaData* b) -> int { + [](IndexUnit* index, int32_t f_idx) { index->largest_lb = f_idx; }); + CalculateRB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { return ucmp_->Compare(a->smallest.user_key(), b->smallest.user_key()); }, - [](IndexUnit* index, int32_t f_idx) { - index->smallest_rb = f_idx; - }); - CalculateRB(upper_files, lower_files, &index_level, - [this](const FileMetaData* a, const FileMetaData* b) -> int { + [](IndexUnit* index, int32_t f_idx) { index->smallest_rb = f_idx; }); + CalculateRB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { return ucmp_->Compare(a->largest.user_key(), b->smallest.user_key()); }, - [](IndexUnit* index, int32_t f_idx) { - index->largest_rb = f_idx; - }); + [](IndexUnit* index, int32_t f_idx) { index->largest_rb = f_idx; }); } level_rb_[num_levels_ - 1] = files[num_levels_ - 1].size() - 1; } -void FileIndexer::CalculateLB(const std::vector& upper_files, - const std::vector& lower_files, - IndexLevel *index_level, +void FileIndexer::CalculateLB( + const std::vector& upper_files, + const std::vector& lower_files, IndexLevel* index_level, std::function cmp_op, std::function set_index) { const int32_t upper_size = upper_files.size(); @@ -177,9 +170,9 @@ void FileIndexer::CalculateLB(const std::vector& upper_files, } } -void FileIndexer::CalculateRB(const std::vector& upper_files, - const std::vector& lower_files, - IndexLevel *index_level, +void FileIndexer::CalculateRB( + const std::vector& upper_files, + const std::vector& lower_files, IndexLevel* index_level, std::function cmp_op, std::function set_index) { const int32_t upper_size = upper_files.size(); diff --git a/db/file_indexer.h b/db/file_indexer.h index c49813e51..127b3ee46 100644 --- a/db/file_indexer.h +++ b/db/file_indexer.h @@ -54,8 +54,7 @@ class FileIndexer { const uint32_t level, const uint32_t file_index, const int cmp_smallest, const int cmp_largest, int32_t* left_bound, int32_t* right_bound); - void UpdateIndex(Arena* arena, - const uint32_t num_levels, + void UpdateIndex(Arena* arena, const uint32_t num_levels, std::vector* const files); enum { @@ -119,20 +118,20 @@ class FileIndexer { size_t num_index; IndexUnit* index_units; - IndexLevel(): num_index(0), index_units(nullptr) {} + IndexLevel() : num_index(0), index_units(nullptr) {} }; - void CalculateLB(const std::vector& upper_files, - const std::vector& lower_files, - IndexLevel* index_level, - std::function cmp_op, - std::function set_index); - - void CalculateRB(const std::vector& upper_files, - const std::vector& lower_files, - IndexLevel* index_level, - std::function cmp_op, - std::function set_index); + void CalculateLB( + const std::vector& upper_files, + const std::vector& lower_files, IndexLevel* index_level, + std::function cmp_op, + std::function set_index); + + void CalculateRB( + const std::vector& upper_files, + const std::vector& lower_files, IndexLevel* index_level, + std::function cmp_op, + std::function set_index); autovector next_level_index_; int32_t* level_rb_; diff --git a/db/file_indexer_test.cc b/db/file_indexer_test.cc index fe9caf0a2..673d85a5c 100644 --- a/db/file_indexer_test.cc +++ b/db/file_indexer_test.cc @@ -37,10 +37,8 @@ class IntComparator : public Comparator { struct FileIndexerTest { public: - FileIndexerTest() : - kNumLevels(4), - files(new std::vector[kNumLevels]) { - } + FileIndexerTest() + : kNumLevels(4), files(new std::vector[kNumLevels]) {} ~FileIndexerTest() { ClearFiles(); @@ -73,7 +71,7 @@ struct FileIndexerTest { *left_index = 100; *right_index = 100; indexer->GetNextLevelIndex(level, file_index, cmp_smallest, cmp_largest, - left_index, right_index); + left_index, right_index); } int32_t left = 100; diff --git a/db/version_set.cc b/db/version_set.cc index b7c5b7d49..c27c7f340 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -762,8 +762,9 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, // cfd is nullptr if Version is dummy num_levels_(cfd == nullptr ? 0 : cfd->NumberLevels()), num_non_empty_levels_(num_levels_), - file_indexer_(cfd == nullptr ? nullptr - : cfd->internal_comparator().user_comparator()), + file_indexer_(cfd == nullptr + ? nullptr + : cfd->internal_comparator().user_comparator()), vset_(vset), next_(this), prev_(this), diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 7f61d6e15..c83e4d7ad 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -339,15 +339,17 @@ class DB { // hosting all the files. In this case, client could set reduce_level // to true, to move the files back to the minimum level capable of holding // the data set or a given level (specified by non-negative target_level). + // Compaction outputs should be placed in options.db_paths[target_path_id]. + // Behavior is undefined if target_path_id is out of range. virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, - bool reduce_level = false, - int target_level = -1) = 0; + bool reduce_level = false, int target_level = -1, + uint32_t target_path_id = 0) = 0; virtual Status CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, - int target_level = -1) { + bool reduce_level = false, int target_level = -1, + uint32_t target_path_id = 0) { return CompactRange(DefaultColumnFamily(), begin, end, reduce_level, - target_level); + target_level, target_path_id); } // Number of levels used for this DB. diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 562f95c41..5c8c7fe6e 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -120,10 +120,10 @@ class StackableDB : public DB { using DB::CompactRange; virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end, - bool reduce_level = false, - int target_level = -1) override { + bool reduce_level = false, int target_level = -1, + uint32_t target_path_id = 0) override { return db_->CompactRange(column_family, begin, end, reduce_level, - target_level); + target_level, target_path_id); } using DB::NumberLevels;