diff --git a/db/db_test.cc b/db/db_test.cc index fb18bdb8f..6223c3591 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11243,13 +11243,23 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { DestroyAndReopen(options); // When base level is L4, L4 is LZ4. + std::atomic num_zlib(0); + std::atomic num_lz4(0); + std::atomic num_no(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { Compaction* compaction = reinterpret_cast(arg); if (compaction->output_level() == 4) { ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + num_lz4.fetch_add(1); } }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) { + auto* compression = reinterpret_cast(arg); + ASSERT_TRUE(*compression == kNoCompression); + num_no.fetch_add(1); + }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); for (int i = 0; i < 100; i++) { @@ -11264,18 +11274,31 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { ASSERT_EQ(NumTableFilesAtLevel(2), 0); ASSERT_EQ(NumTableFilesAtLevel(3), 0); ASSERT_GT(NumTableFilesAtLevel(4), 0); + ASSERT_GT(num_no.load(), 2); + ASSERT_GT(num_lz4.load(), 0); int prev_num_files_l4 = NumTableFilesAtLevel(4); // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib + num_lz4.store(0); + num_no.store(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { Compaction* compaction = reinterpret_cast(arg); if (compaction->output_level() == 4 && compaction->start_level() == 3) { ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression); + num_zlib.fetch_add(1); } else { ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + num_lz4.fetch_add(1); } }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table:output_compression", [&](void* arg) { + auto* compression = reinterpret_cast(arg); + ASSERT_TRUE(*compression == kNoCompression); + num_no.fetch_add(1); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); for (int i = 101; i < 500; i++) { ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200))); @@ -11291,6 +11314,9 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { ASSERT_EQ(NumTableFilesAtLevel(2), 0); ASSERT_GT(NumTableFilesAtLevel(3), 0); ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4); + ASSERT_GT(num_no.load(), 2); + ASSERT_GT(num_lz4.load(), 0); + ASSERT_GT(num_zlib.load(), 0); } TEST_F(DBTest, DynamicCompactionOptions) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 1dceee603..1a304fa9c 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -180,6 +180,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber()); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", + &output_compression_); s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, cfd_->table_cache(), iter.get(), &meta, cfd_->internal_comparator(),