diff --git a/db/compaction.cc b/db/compaction.cc
index 682fb61db..8a1a22a14 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -18,6 +18,7 @@
 
 #include "db/column_family.h"
 #include "util/logging.h"
+#include "util/sync_point.h"
 
 namespace rocksdb {
 
@@ -130,17 +131,28 @@ void Compaction::GenerateFileLevels() {
   }
 }
 
+bool Compaction::InputCompressionMatchesOutput() const {
+  int base_level = input_version_->storage_info()->base_level();
+  bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_,
+                                     base_level) == output_compression_);
+  if (matches) {
+    TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
+    return true;
+  }
+  TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
+  return false;
+}
+
 bool Compaction::IsTrivialMove() const {
   // Avoid a move if there is lots of overlapping grandparent data.
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
   // If start_level_== output_level_, the purpose is to force compaction
   // filter to be applied to that level, and thus cannot be a trivia move.
-  return (start_level_ != output_level_ &&
-          num_input_levels() == 2 &&
-          num_input_files(0) == 1 &&
-          num_input_files(1) == 0 &&
+  return (start_level_ != output_level_ && num_input_levels() == 2 &&
+          num_input_files(0) == 1 && num_input_files(1) == 0 &&
           input(0, 0)->fd.GetPathId() == GetOutputPathId() &&
+          InputCompressionMatchesOutput() &&
           TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
 }
 
diff --git a/db/compaction.h b/db/compaction.h
index 3a95f2416..bb2e70d06 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -262,6 +262,9 @@ class Compaction {
   // In case of compaction error, reset the nextIndex that is used
   // to pick up the next file to be compacted from files_by_size_
   void ResetNextCompactionIndex();
+
+  // Does input compression match the output compression?
+  bool InputCompressionMatchesOutput() const;
 };
 
 // Utility function
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index ecbe89fad..8650b5cfd 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -32,6 +32,8 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
   return sum;
 }
 
+}  // anonymous namespace
+
 // Determine compression type, based on user options, level of the output
 // file and whether compression is disabled.
 // If enable_compression is false, then compression is always disabled no
@@ -39,7 +41,7 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
 // Otherwise, the compression type is determined based on options and level.
 CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
                                    int level, int base_level,
-                                   const bool enable_compression = true) {
+                                   const bool enable_compression) {
   if (!enable_compression) {
     // disable compression
     return kNoCompression;
@@ -62,9 +64,6 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
   }
 }
 
-
-}  // anonymous namespace
-
 CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
                                    const InternalKeyComparator* icmp)
     : ioptions_(ioptions), icmp_(icmp) {}
 
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index d7f0d69c6..43ccfcc9a 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -342,4 +342,8 @@ class NullCompactionPicker : public CompactionPicker {
 };
 #endif  // !ROCKSDB_LITE
 
+CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
+                                   int level, int base_level,
+                                   const bool enable_compression = true);
+
 }  // namespace rocksdb
diff --git a/db/db_impl.cc b/db/db_impl.cc
index a37b660c1..9d729d22a 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2269,6 +2269,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                     c->num_input_files(0));
     *madeProgress = true;
   } else if (!is_manual && c->IsTrivialMove()) {
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
     // Instrument for event update
     // TODO(yhchiang): add op details for showing trivial-move.
     ThreadStatusUtil::SetColumnFamily(c->column_family_data());
@@ -2302,6 +2303,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     // Clear Instrument
     ThreadStatusUtil::ResetThreadStatus();
   } else {
+    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial");
     auto yield_callback = [&]() {
       return CallFlushDuringCompaction(c->column_family_data(),
                                        *c->mutable_cf_options(), job_context,
diff --git a/db/db_test.cc b/db/db_test.cc
index d2f455c15..312f92a02 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -12281,6 +12281,107 @@ TEST_F(DBTest, EmptyCompactedDB) {
   Close();
 }
 
+TEST_F(DBTest, CompressLevelCompaction) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleLevel;
+  options.write_buffer_size = 100 << 10;  // 100KB
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 4;
+  options.max_bytes_for_level_base = 400 * 1024;
+  // First two levels have no compression, so that a trivial move between
+  // them will be allowed. Level 2 has Zlib compression so that a trivial
+  // move to level 3 will not be allowed
+  options.compression_per_level = {kNoCompression, kNoCompression,
+                                   kZlibCompression};
+  int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "Compaction::InputCompressionMatchesOutput:Matches",
+      [&]() { matches++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "Compaction::InputCompressionMatchesOutput:DidntMatch",
+      [&]() { didnt_match++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:NonTrivial", [&]() { non_trivial++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:TrivialMove", [&]() { trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Reopen(options);
+
+  Random rnd(301);
+  int key_idx = 0;
+
+  // First three 110KB files are going to level 0
+  // After that, (100K, 200K)
+  for (int num = 0; num < 3; num++) {
+    GenerateNewFile(&rnd, &key_idx);
+  }
+
+  // Another 110KB triggers a compaction to 400K file to fill up level 0
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ(4, GetSstFileCount(dbname_));
+
+  // (1, 4)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4", FilesPerLevel(0));
+
+  // (1, 4, 1)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,1", FilesPerLevel(0));
+
+  // (1, 4, 2)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,2", FilesPerLevel(0));
+
+  // (1, 4, 3)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,3", FilesPerLevel(0));
+
+  // (1, 4, 4)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,4", FilesPerLevel(0));
+
+  // (1, 4, 5)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,5", FilesPerLevel(0));
+
+  // (1, 4, 6)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,6", FilesPerLevel(0));
+
+  // (1, 4, 7)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,7", FilesPerLevel(0));
+
+  // (1, 4, 8)
+  GenerateNewFile(&rnd, &key_idx);
+  ASSERT_EQ("1,4,8", FilesPerLevel(0));
+
+  ASSERT_EQ(matches, 12);
+  ASSERT_EQ(didnt_match, 8);
+  ASSERT_EQ(trivial_move, 12);
+  ASSERT_EQ(non_trivial, 8);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Reopen(options);
+
+  for (int i = 0; i < key_idx; i++) {
+    auto v = Get(Key(i));
+    ASSERT_NE(v, "NOT_FOUND");
+    ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
+  }
+
+  Destroy(options);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {