From ce2c11d848db3ab02a4c847007d755155c88baed Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 14 Sep 2022 21:59:56 -0700 Subject: [PATCH] Fix a bug by setting up subcompaction bounds properly (#10658) Summary: When user-defined timestamp is enabled, subcompaction bounds should be set up properly. When creating InputIterator for the compaction, the `start` and `end` should have their timestamp portions set to kMaxTimestamp, which is the highest possible timestamp. This is similar to what we do with setting up their sequence numbers to `kMaxSequenceNumber`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10658 Test Plan: ```bash make check rm -rf /dev/shm/rocksdb/* && mkdir /dev/shm/rocksdb/rocksdb_crashtest_expected && ./db_stress --allow_data_in_errors=True --clear_column_family_one_in=0 --continuous_verification_interval=0 --data_block_index_type=1 --db=/dev/shm/rocksdb//rocksdb_crashtest_blackbox --delpercent=5 --delrangepercent=0 --expected_values_dir=/dev/shm/rocksdb//rocksdb_crashtest_expected --iterpercent=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=25000000 --max_write_batch_group_size_bytes=1048576 --nooverwritepercent=1 --ops_per_thread=300000 --paranoid_file_checks=1 --partition_filters=0 --prefix_size=8 --prefixpercent=5 --readpercent=30 --reopen=0 --snapshot_hold_ops=100000 --subcompactions=4 --target_file_size_base=65536 --target_file_size_multiplier=2 --test_batches_snapshots=0 --test_cf_consistency=0 --use_multiget=1 --user_timestamp_size=8 --value_size_mult=32 --verify_checksum=1 --write_buffer_size=65536 --writepercent=60 -disable_wal=1 -column_families=1 ``` Reviewed By: akankshamahajan15 Differential Revision: D39393402 Pulled By: riversand963 fbshipit-source-id: f276e35b19fce51a175c368a502fb0718d1f3871 --- db/compaction/compaction_job.cc | 31 +- db/compaction/compaction_job_test.cc | 444 +++++++++++++++++++++------ db/version_set.cc | 8 +- 3 files changed, 373 insertions(+), 110 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 39550e212..15acde694 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -254,7 +254,7 @@ void CompactionJob::Prepare() { // timestamp part). assert(i == 0 || i == boundaries_.size() || cfd->user_comparator()->CompareWithoutTimestamp( - boundaries_[i - 1], true, boundaries_[i], true) < 0); + boundaries_[i - 1], boundaries_[i]) < 0); } RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, compact_->sub_compact_states.size()); @@ -486,8 +486,8 @@ void CompactionJob::GenSubcompactionBoundaries() { std::sort( all_anchors.begin(), all_anchors.end(), [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool { - return cfd_comparator->CompareWithoutTimestamp(a.user_key, true, - b.user_key, true) < 0; + return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) < + 0; }); // Remove duplicated entries from boundaries. @@ -496,7 +496,7 @@ void CompactionJob::GenSubcompactionBoundaries() { [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool { return cfd_comparator->CompareWithoutTimestamp( - a.user_key, true, b.user_key, true) == 0; + a.user_key, b.user_key) == 0; }), all_anchors.end()); @@ -1085,13 +1085,34 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { Slice start_slice; Slice end_slice; + static constexpr char kMaxTs[] = + "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"; + Slice ts_slice; + std::string max_ts; + if (ts_sz > 0) { + if (ts_sz <= strlen(kMaxTs)) { + ts_slice = Slice(kMaxTs, ts_sz); + } else { + max_ts = std::string(ts_sz, '\xff'); + ts_slice = Slice(max_ts); + } + } + if (start.has_value()) { start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber, kValueTypeForSeek); + if (ts_sz > 0) { + start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek, + &ts_slice); + } start_slice = start_ikey.GetInternalKey(); } if (end.has_value()) { end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek); + if (ts_sz > 0) { + end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek, + &ts_slice); + } end_slice = end_ikey.GetInternalKey(); } @@ -1112,7 +1133,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } std::unique_ptr trim_history_iter; - if (cfd->user_comparator()->timestamp_size() > 0 && !trim_ts_.empty()) { + if (ts_sz > 0 && !trim_ts_.empty()) { trim_history_iter = std::make_unique( input, cfd->user_comparator(), trim_ts_); input = trim_history_iter.get(); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 79b7f7be2..311d29335 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -19,7 +19,9 @@ #include "db/db_impl/db_impl.h" #include "db/error_handler.h" #include "db/version_set.h" +#include "file/random_access_file_reader.h" #include "file/writable_file_writer.h" +#include "options/options_helper.h" #include "rocksdb/cache.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" @@ -194,13 +196,15 @@ class MockTestFileSystem : public FileSystemWrapper { Env::IOPriority write_io_priority_; }; +enum TableTypeForTest : uint8_t { kMockTable = 0, kBlockBasedTable = 1 }; + } // namespace class CompactionJobTestBase : public testing::Test { protected: CompactionJobTestBase(std::string dbname, const Comparator* ucmp, std::function encode_u64_ts, - bool test_io_priority) + bool test_io_priority, TableTypeForTest table_type) : dbname_(std::move(dbname)), ucmp_(ucmp), db_options_(), @@ -217,7 +221,8 @@ class CompactionJobTestBase : public testing::Test { mock_table_factory_(new mock::MockTableFactory()), error_handler_(nullptr, db_options_, &mutex_), encode_u64_ts_(std::move(encode_u64_ts)), - test_io_priority_(test_io_priority) { + test_io_priority_(test_io_priority), + table_type_(table_type) { Env* base_env = Env::Default(); EXPECT_OK( test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_)); @@ -232,11 +237,13 @@ class CompactionJobTestBase : public testing::Test { db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); cf_options_.comparator = ucmp_; - if (test_io_priority_) { + if (table_type_ == TableTypeForTest::kBlockBasedTable) { BlockBasedTableOptions table_options; cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); - } else { + } else if (table_type_ == TableTypeForTest::kMockTable) { cf_options_.table_factory = mock_table_factory_; + } else { + assert(false); } } @@ -358,13 +365,15 @@ class CompactionJobTestBase : public testing::Test { uint64_t file_number = versions_->NewFileNumber(); - uint64_t file_size; - if (test_io_priority_) { + uint64_t file_size = 0; + if (table_type_ == TableTypeForTest::kBlockBasedTable) { CreateTable(GenerateFileName(file_number), contents, file_size); - } else { + } else if (table_type_ == TableTypeForTest::kMockTable) { file_size = 10; EXPECT_OK(mock_table_factory_->CreateMockTable( env_, GenerateFileName(file_number), std::move(contents))); + } else { + assert(false); } VersionEdit edit; @@ -381,6 +390,83 @@ class CompactionJobTestBase : public testing::Test { mutex_.Unlock(); } + void VerifyTables(int output_level, + const std::vector& expected_results, + std::vector expected_oldest_blob_file_numbers) { + if (expected_results.empty()) { + ASSERT_EQ(compaction_job_stats_.num_output_files, 0U); + return; + } + int expected_output_file_num = 0; + for (const auto& e : expected_results) { + if (!e.empty()) { + ++expected_output_file_num; + } + } + ASSERT_EQ(expected_output_file_num, compaction_job_stats_.num_output_files); + if (expected_output_file_num == 0) { + return; + } + + if (expected_oldest_blob_file_numbers.empty()) { + expected_oldest_blob_file_numbers.resize(expected_output_file_num, + kInvalidBlobFileNumber); + } + + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + if (table_type_ == TableTypeForTest::kMockTable) { + assert(expected_results.size() == 1); + mock_table_factory_->AssertLatestFile(expected_results[0]); + } else { + assert(table_type_ == TableTypeForTest::kBlockBasedTable); + } + + auto output_files = + cfd->current()->storage_info()->LevelFiles(output_level); + ASSERT_EQ(expected_output_file_num, output_files.size()); + + if (table_type_ == TableTypeForTest::kMockTable) { + assert(output_files.size() == 1); + const FileMetaData* const output_file = output_files[0]; + ASSERT_EQ(output_file->oldest_blob_file_number, + expected_oldest_blob_file_numbers[0]); + return; + } + + for (size_t i = 0; i < expected_results.size(); ++i) { + const FileMetaData* const output_file = output_files[i]; + std::string file_name = GenerateFileName(output_file->fd.GetNumber()); + const auto& fs = env_->GetFileSystem(); + std::unique_ptr freader; + IOStatus ios = RandomAccessFileReader::Create( + fs, file_name, FileOptions(), &freader, nullptr); + ASSERT_OK(ios); + std::unique_ptr table_reader; + uint64_t file_size = output_file->fd.GetFileSize(); + ReadOptions read_opts; + Status s = cf_options_.table_factory->NewTableReader( + read_opts, + TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(), + cfd_->internal_comparator()), + std::move(freader), file_size, &table_reader, false); + ASSERT_OK(s); + assert(table_reader); + std::unique_ptr iiter( + table_reader->NewIterator(read_opts, nullptr, nullptr, true, + TableReaderCaller::kUncategorized)); + assert(iiter); + + mock::KVVector from_db; + for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) { + const Slice key = iiter->key(); + const Slice value = iiter->value(); + from_db.emplace_back( + make_pair(key.ToString(false), value.ToString(false))); + } + ASSERT_EQ(expected_results[i], from_db); + } + } + void SetLastSequence(const SequenceNumber sequence_number) { versions_->SetLastAllocatedSequence(sequence_number + 1); versions_->SetLastPublishedSequence(sequence_number + 1); @@ -439,6 +525,13 @@ class CompactionJobTestBase : public testing::Test { void NewDB() { EXPECT_OK(DestroyDB(dbname_, Options())); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); + + std::shared_ptr info_log; + DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_); + Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log); + ASSERT_OK(s); + db_options_.info_log = info_log; + versions_.reset( new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, @@ -455,9 +548,9 @@ class CompactionJobTestBase : public testing::Test { const std::string manifest = DescriptorFileName(dbname_, 1); std::unique_ptr file_writer; const auto& fs = env_->GetFileSystem(); - Status s = WritableFileWriter::Create( - fs, manifest, fs->OptimizeForManifestWrite(env_options_), &file_writer, - nullptr); + s = WritableFileWriter::Create(fs, manifest, + fs->OptimizeForManifestWrite(env_options_), + &file_writer, nullptr); ASSERT_OK(s); { @@ -481,27 +574,32 @@ class CompactionJobTestBase : public testing::Test { cfd_ = versions_->GetColumnFamilySet()->GetDefault(); } + // input_files[i] on input_levels[i] void RunLastLevelCompaction( const std::vector>& input_files, + const std::vector input_levels, std::function&& verify_func, const std::vector& snapshots = {}) { const int kLastLevel = cf_options_.num_levels - 1; verify_per_key_placement_ = std::move(verify_func); mock::KVVector empty_map; - RunCompaction(input_files, empty_map, snapshots, kMaxSequenceNumber, - kLastLevel, false); + RunCompaction(input_files, input_levels, {empty_map}, snapshots, + kMaxSequenceNumber, kLastLevel, false); } + // input_files[i] on input_levels[i] void RunCompaction( const std::vector>& input_files, - const mock::KVVector& expected_results, + const std::vector& input_levels, + const std::vector& expected_results, const std::vector& snapshots = {}, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, - uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, + std::vector expected_oldest_blob_file_numbers = {}, bool check_get_priority = false, Env::IOPriority read_io_priority = Env::IO_TOTAL, - Env::IOPriority write_io_priority = Env::IO_TOTAL) { + Env::IOPriority write_io_priority = Env::IO_TOTAL, + int max_subcompactions = 0) { // For compaction, set fs as MockTestFileSystem to check the io_priority. if (test_io_priority_) { db_options_.fs.reset( @@ -512,10 +610,10 @@ class CompactionJobTestBase : public testing::Test { size_t num_input_files = 0; std::vector compaction_input_files; - for (size_t level = 0; level < input_files.size(); level++) { - auto level_files = input_files[level]; + for (size_t i = 0; i < input_files.size(); ++i) { + auto level_files = input_files[i]; CompactionInputFiles compaction_level; - compaction_level.level = static_cast(level); + compaction_level.level = input_levels[i]; compaction_level.files.insert(compaction_level.files.end(), level_files.begin(), level_files.end()); compaction_input_files.push_back(compaction_level); @@ -527,9 +625,10 @@ class CompactionJobTestBase : public testing::Test { *cfd->GetLatestMutableCFOptions(), mutable_db_options_, compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0, kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, - Temperature::kUnknown, 0, {}, true); + Temperature::kUnknown, max_subcompactions, {}, true); compaction.SetInputVersion(cfd->current()); + assert(db_options_.info_log); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); mutex_.Lock(); EventLogger event_logger(db_options_.info_log.get()); @@ -559,23 +658,14 @@ class CompactionJobTestBase : public testing::Test { ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions())); ASSERT_OK(compaction_job.io_status()); mutex_.Unlock(); + log_buffer.FlushBufferToLog(); if (verify) { ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files); - if (expected_results.empty()) { - ASSERT_EQ(compaction_job_stats_.num_output_files, 0U); - } else { - ASSERT_EQ(compaction_job_stats_.num_output_files, 1U); - mock_table_factory_->AssertLatestFile(expected_results); - - auto output_files = - cfd->current()->storage_info()->LevelFiles(output_level); - ASSERT_EQ(output_files.size(), 1); - ASSERT_EQ(output_files[0]->oldest_blob_file_number, - expected_oldest_blob_file_number); - } + VerifyTables(output_level, expected_results, + expected_oldest_blob_file_numbers); } if (check_get_priority) { @@ -635,8 +725,9 @@ class CompactionJobTestBase : public testing::Test { ErrorHandler error_handler_; std::string full_history_ts_low_; const std::function encode_u64_ts_; - bool test_io_priority_; + const bool test_io_priority_; std::function verify_per_key_placement_; + const TableTypeForTest table_type_ = kMockTable; }; // TODO(icanadi) Make it simpler once we mock out VersionSet @@ -645,7 +736,8 @@ class CompactionJobTest : public CompactionJobTestBase { CompactionJobTest() : CompactionJobTestBase( test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(), - [](uint64_t /*ts*/) { return ""; }, false) {} + [](uint64_t /*ts*/) { return ""; }, /*test_io_priority=*/false, + TableTypeForTest::kMockTable) {} }; TEST_F(CompactionJobTest, Simple) { @@ -653,9 +745,10 @@ TEST_F(CompactionJobTest, Simple) { auto expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); ASSERT_EQ(2U, files.size()); - RunCompaction({ files }, expected_results); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) { @@ -663,8 +756,9 @@ TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) { auto expected_results = CreateTwoFiles(true); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U); } @@ -683,8 +777,9 @@ TEST_F(CompactionJobTest, SimpleDeletion) { mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}}); SetLastSequence(4U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, OutputNothing) { @@ -701,8 +796,10 @@ TEST_F(CompactionJobTest, OutputNothing) { auto expected_results = mock::MakeMockFile(); SetLastSequence(4U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, SimpleOverwrite) { @@ -723,8 +820,9 @@ TEST_F(CompactionJobTest, SimpleOverwrite) { {KeyStr("b", 0U, kTypeValue), "val3"}}); SetLastSequence(4U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, SimpleNonLastLevel) { @@ -751,9 +849,12 @@ TEST_F(CompactionJobTest, SimpleNonLastLevel) { {KeyStr("b", 6U, kTypeValue), "val3"}}); SetLastSequence(6U); - auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0); - auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1); - RunCompaction({lvl0_files, lvl1_files}, expected_results); + const std::vector input_levels = {0, 1}; + auto lvl0_files = + cfd_->current()->storage_info()->LevelFiles(input_levels[0]); + auto lvl1_files = + cfd_->current()->storage_info()->LevelFiles(input_levels[1]); + RunCompaction({lvl0_files, lvl1_files}, input_levels, {expected_results}); } TEST_F(CompactionJobTest, SimpleMerge) { @@ -776,8 +877,9 @@ TEST_F(CompactionJobTest, SimpleMerge) { {KeyStr("b", 0U, kTypeValue), "1,2"}}); SetLastSequence(5U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, NonAssocMerge) { @@ -800,8 +902,9 @@ TEST_F(CompactionJobTest, NonAssocMerge) { {KeyStr("b", 0U, kTypeValue), "1,2"}}); SetLastSequence(5U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } // Filters merge operands with value 10. @@ -827,8 +930,9 @@ TEST_F(CompactionJobTest, MergeOperandFilter) { {KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}}); SetLastSequence(5U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, FilterSomeMergeOperands) { @@ -863,8 +967,9 @@ TEST_F(CompactionJobTest, FilterSomeMergeOperands) { }); SetLastSequence(5U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } // Test where all operands/merge results are filtered out. @@ -897,10 +1002,11 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) { AddMockFile(file3, 2); SetLastSequence(11U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); mock::KVVector empty_map; - RunCompaction({files}, empty_map); + RunCompaction({files}, {input_level}, {empty_map}); } TEST_F(CompactionJobTest, SimpleSingleDelete) { @@ -925,8 +1031,9 @@ TEST_F(CompactionJobTest, SimpleSingleDelete) { mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}}); SetLastSequence(6U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, SingleDeleteSnapshots) { @@ -990,8 +1097,9 @@ TEST_F(CompactionJobTest, SingleDeleteSnapshots) { }); SetLastSequence(22U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results, {10U, 20U}, 10U); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U}, 10U); } TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) { @@ -1068,8 +1176,10 @@ TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) { }); SetLastSequence(24U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results, {10U, 20U, 30U}, 20U); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U, 30U}, + 20U); } TEST_F(CompactionJobTest, SingleDeleteZeroSeq) { @@ -1091,8 +1201,9 @@ TEST_F(CompactionJobTest, SingleDeleteZeroSeq) { }); SetLastSequence(22U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results, {}); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}, {}); } TEST_F(CompactionJobTest, MultiSingleDelete) { @@ -1246,8 +1357,9 @@ TEST_F(CompactionJobTest, MultiSingleDelete) { {KeyStr("M", 3U, kTypeSingleDeletion), ""}}); SetLastSequence(22U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results, {10U}, 10U); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}, {10U}, 10U); } // This test documents the behavior where a corrupt key follows a deletion or a @@ -1276,8 +1388,9 @@ TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) { {test::KeyStr("c", 0U, kTypeValue), "val2"}}); SetLastSequence(6U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, OldestBlobFileNumber) { @@ -1322,10 +1435,12 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) { expected_blob4, expected_blob5, expected_blob6}); SetLastSequence(6U); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results, std::vector(), - kMaxSequenceNumber, /* output_level */ 1, /* verify */ true, - /* expected_oldest_blob_file_number */ 19); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}, + std::vector(), kMaxSequenceNumber, + /* output_level */ 1, /* verify */ true, + /* expected_oldest_blob_file_numbers */ {19}); } TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) { @@ -1377,13 +1492,15 @@ TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) { AddMockFile(file3_2, 3); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files0 = cfd->current()->storage_info()->LevelFiles(0); - auto files1 = cfd->current()->storage_info()->LevelFiles(1); - auto files2 = cfd->current()->storage_info()->LevelFiles(2); - auto files3 = cfd->current()->storage_info()->LevelFiles(3); + const std::vector input_levels = {0, 1, 2, 3}; + auto files0 = cfd->current()->storage_info()->LevelFiles(input_levels[0]); + auto files1 = cfd->current()->storage_info()->LevelFiles(input_levels[1]); + auto files2 = cfd->current()->storage_info()->LevelFiles(input_levels[2]); + auto files3 = cfd->current()->storage_info()->LevelFiles(input_levels[3]); RunLastLevelCompaction( - {files0, files1, files2, files3}, /*verify_func=*/[&](Compaction& comp) { + {files0, files1, files2, files3}, input_levels, + /*verify_func=*/[&](Compaction& comp) { for (char c = 'a'; c <= 'z'; c++) { std::string c_str; c_str = c; @@ -1408,8 +1525,9 @@ TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) { SetLastSequence(4U); auto expected_results = mock::MakeMockFile(); - auto files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + auto files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTest, InputSerialization) { @@ -1609,7 +1727,8 @@ class CompactionJobTimestampTest : public CompactionJobTestBase { CompactionJobTimestampTest() : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"), test::BytewiseComparatorWithU64TsWrapper(), - test::EncodeInt, false) {} + test::EncodeInt, /*test_io_priority=*/false, + TableTypeForTest::kMockTable) {} }; TEST_F(CompactionJobTimestampTest, GCDisabled) { @@ -1641,8 +1760,9 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) { {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"}, {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}, {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}}); - const auto& files = cfd_->current()->storage_info()->LevelFiles(0); - RunCompaction({files}, expected_results); + constexpr int input_level = 0; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTimestampTest, NoKeyExpired) { @@ -1667,10 +1787,11 @@ TEST_F(CompactionJobTimestampTest, NoKeyExpired) { {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"}, {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}, {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}}); - const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); full_history_ts_low_ = encode_u64_ts_(0); - RunCompaction({files}, expected_results); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTimestampTest, AllKeysExpired) { @@ -1693,10 +1814,11 @@ TEST_F(CompactionJobTimestampTest, AllKeysExpired) { auto expected_results = mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}}); - const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); full_history_ts_low_ = encode_u64_ts_(std::numeric_limits::max()); - RunCompaction({files}, expected_results); + RunCompaction({files}, {input_level}, {expected_results}); } TEST_F(CompactionJobTimestampTest, SomeKeysExpired) { @@ -1719,10 +1841,121 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) { mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"}, {KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"}, {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}}); - const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); full_history_ts_low_ = encode_u64_ts_(49); - RunCompaction({files}, expected_results); + RunCompaction({files}, {input_level}, {expected_results}); +} + +class CompactionJobTimestampTestWithBbTable : public CompactionJobTestBase { + public: + // Block-based table is needed if we want to test subcompaction partitioning + // with anchors. + explicit CompactionJobTimestampTestWithBbTable() + : CompactionJobTestBase( + test::PerThreadDBPath("compaction_job_ts_bbt_test"), + test::BytewiseComparatorWithU64TsWrapper(), test::EncodeInt, + /*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {} +}; + +TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionAnchorL1) { + cf_options_.target_file_size_base = 20; + mutable_cf_options_.target_file_size_base = 20; + NewDB(); + + const std::vector keys = { + KeyStr("a", 20, ValueType::kTypeValue, 200), + KeyStr("b", 21, ValueType::kTypeValue, 210), + KeyStr("b", 20, ValueType::kTypeValue, 200), + KeyStr("b", 18, ValueType::kTypeValue, 180), + KeyStr("c", 17, ValueType::kTypeValue, 170), + KeyStr("c", 16, ValueType::kTypeValue, 160), + KeyStr("c", 15, ValueType::kTypeValue, 150)}; + const std::vector values = {"a20", "b21", "b20", "b18", + "c17", "c16", "c15"}; + + constexpr int input_level = 1; + + auto file1 = mock::MakeMockFile( + {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}}); + AddMockFile(file1, input_level); + + auto file2 = mock::MakeMockFile( + {{keys[3], values[3]}, {keys[4], values[4]}, {keys[5], values[5]}}); + AddMockFile(file2, input_level); + + auto file3 = mock::MakeMockFile({{keys[6], values[6]}}); + AddMockFile(file3, input_level); + + SetLastSequence(20); + + auto output1 = mock::MakeMockFile({{keys[0], values[0]}}); + auto output2 = mock::MakeMockFile( + {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}}); + auto output3 = mock::MakeMockFile( + {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}}); + + auto expected_results = + std::vector{output1, output2, output3}; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); + + constexpr int output_level = 2; + constexpr int max_subcompactions = 4; + RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{}, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + output_level, /*verify=*/true, {kInvalidBlobFileNumber}, + /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL, + max_subcompactions); +} + +TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionL0) { + cf_options_.target_file_size_base = 20; + mutable_cf_options_.target_file_size_base = 20; + NewDB(); + + const std::vector keys = { + KeyStr("a", 20, ValueType::kTypeValue, 200), + KeyStr("b", 20, ValueType::kTypeValue, 200), + KeyStr("b", 19, ValueType::kTypeValue, 190), + KeyStr("b", 18, ValueType::kTypeValue, 180), + KeyStr("c", 17, ValueType::kTypeValue, 170), + KeyStr("c", 16, ValueType::kTypeValue, 160), + KeyStr("c", 15, ValueType::kTypeValue, 150)}; + const std::vector values = {"a20", "b20", "b19", "b18", + "c17", "c16", "c15"}; + + constexpr int input_level = 0; + + auto file1 = mock::MakeMockFile({{keys[5], values[5]}, {keys[6], values[6]}}); + AddMockFile(file1, input_level); + + auto file2 = mock::MakeMockFile({{keys[3], values[3]}, {keys[4], values[4]}}); + AddMockFile(file2, input_level); + + auto file3 = mock::MakeMockFile( + {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}}); + AddMockFile(file3, input_level); + + SetLastSequence(20); + + auto output1 = mock::MakeMockFile({{keys[0], values[0]}}); + auto output2 = mock::MakeMockFile( + {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}}); + auto output3 = mock::MakeMockFile( + {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}}); + + auto expected_results = + std::vector{output1, output2, output3}; + const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level); + + constexpr int output_level = 1; + constexpr int max_subcompactions = 4; + RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{}, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + output_level, /*verify=*/true, {kInvalidBlobFileNumber}, + /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL, + max_subcompactions); } // The io priority of the compaction reads and writes are different from @@ -1734,7 +1967,8 @@ class CompactionJobIOPriorityTest : public CompactionJobTestBase { CompactionJobIOPriorityTest() : CompactionJobTestBase( test::PerThreadDBPath("compaction_job_io_priority_test"), - BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {} + BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, + /*test_io_priority=*/true, TableTypeForTest::kBlockBasedTable) {} }; TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) { @@ -1742,10 +1976,12 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) { NewDB(); mock::KVVector expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); ASSERT_EQ(2U, files.size()); - RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, - kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW); + RunCompaction({files}, {input_level}, {expected_results}, {}, + kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false, + Env::IO_LOW, Env::IO_LOW); } TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) { @@ -1753,13 +1989,15 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) { NewDB(); mock::KVVector expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); ASSERT_EQ(2U, files.size()); { std::unique_ptr delay_token = write_controller_.GetDelayToken(1000000); - RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, - kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER); + RunCompaction({files}, {input_level}, {expected_results}, {}, + kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false, + Env::IO_USER, Env::IO_USER); } } @@ -1768,13 +2006,15 @@ TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) { NewDB(); mock::KVVector expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); ASSERT_EQ(2U, files.size()); { std::unique_ptr stop_token = write_controller_.GetStopToken(); - RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, - kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER); + RunCompaction({files}, {input_level}, {expected_results}, {}, + kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false, + Env::IO_USER, Env::IO_USER); } } @@ -1782,10 +2022,12 @@ TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) { NewDB(); mock::KVVector expected_results = CreateTwoFiles(false); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); + constexpr int input_level = 0; + auto files = cfd->current()->storage_info()->LevelFiles(input_level); ASSERT_EQ(2U, files.size()); - RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, - kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW); + RunCompaction({files}, {input_level}, {expected_results}, {}, + kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, true, + Env::IO_LOW, Env::IO_LOW); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 49f41d4b8..f2f8d8fb7 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -6433,16 +6433,16 @@ InternalIterator* VersionSet::MakeInputIterator( for (size_t i = 0; i < flevel->num_files; i++) { const FileMetaData& fmd = *flevel->files[i].file_metadata; if (start.has_value() && - cfd->user_comparator()->Compare(start.value(), - fmd.largest.user_key()) > 0) { + cfd->user_comparator()->CompareWithoutTimestamp( + start.value(), fmd.largest.user_key()) > 0) { continue; } // We should be able to filter out the case where the end key // equals to the end boundary, since the end key is exclusive. // We try to be extra safe here. if (end.has_value() && - cfd->user_comparator()->Compare(end.value(), - fmd.smallest.user_key()) < 0) { + cfd->user_comparator()->CompareWithoutTimestamp( + end.value(), fmd.smallest.user_key()) < 0) { continue; }