diff --git a/HISTORY.md b/HISTORY.md
index 30e42b371..991e30f2a 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,7 @@
 * Charge memory usage of blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for blob cache exceeds the available space left in the block cache at some point (i.e., causing a cache full under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in to this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`.
 * Improve subcompaction range partitioning so that it is likely to be more even. A more even distribution of subcompactions will improve compaction throughput for some workloads. All input files' index blocks are sampled for anchor key points, from which we pick positions to partition the input range. This introduces some CPU overhead in the compaction preparation phase if subcompaction is enabled, but it should be a small fraction of the CPU usage of the whole compaction process. This also brings a behavior change: the number of subcompactions is much more likely to be maxed out than before.
 * Add CompactionPri::kRoundRobin, a compaction picking mode that cycles through all the files with a compact cursor in a round-robin manner. This feature is available since 7.5.
+* Provide support for subcompactions with user-defined timestamps.
 
 ### Public API changes
 * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 0983bd70f..8b1e9aaef 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -664,13 +664,6 @@ bool Compaction::ShouldFormSubcompactions() const {
     return false;
   }
 
-  // Note: the subcompaction boundary picking logic does not currently guarantee
-  // that all user keys that differ only by timestamp get processed by the same
-  // subcompaction.
-  if (cfd_->user_comparator()->timestamp_size() > 0) {
-    return false;
-  }
-
   // Round-Robin pri under leveled compaction allows subcompactions by default
   // and the number of subcompactions can be larger than max_subcompactions_
   if (cfd_->ioptions()->compaction_pri == kRoundRobin &&
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index ba20698c1..3f1dc5c5e 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -240,8 +240,8 @@ void CompactionJob::Prepare() {
   bottommost_level_ = c->bottommost_level();
 
   if (c->ShouldFormSubcompactions()) {
-    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
-    GenSubcompactionBoundaries();
+    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
+    GenSubcompactionBoundaries();
   }
   if (boundaries_.size() > 1) {
     for (size_t i = 0; i <= boundaries_.size(); i++) {
@@ -250,6 +250,11 @@
           (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
                                     : std::nullopt,
          static_cast<uint32_t>(i));
+      // Assert that consecutive boundaries do not share the same user key
+      // (ignoring the timestamp part).
+      assert(i == 0 || i == boundaries_.size() ||
+             cfd->user_comparator()->CompareWithoutTimestamp(
+                 boundaries_[i - 1], true, boundaries_[i], true) < 0);
     }
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
@@ -479,9 +484,20 @@ void CompactionJob::GenSubcompactionBoundaries() {
   std::sort(
       all_anchors.begin(), all_anchors.end(),
       [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
-        return cfd_comparator->Compare(a.user_key, b.user_key) < 0;
+        return cfd_comparator->CompareWithoutTimestamp(a.user_key, true,
+                                                       b.user_key, true) < 0;
       });
 
+  // Remove duplicate entries from the boundaries.
+  all_anchors.erase(
+      std::unique(all_anchors.begin(), all_anchors.end(),
+                  [cfd_comparator](TableReader::Anchor& a,
+                                   TableReader::Anchor& b) -> bool {
+                    return cfd_comparator->CompareWithoutTimestamp(
+                        a.user_key, true, b.user_key, true) == 0;
+                  }),
+      all_anchors.end());
+
   // Get the number of planned subcompactions, may update reserve threads
   // and update extra_num_subcompaction_threads_reserved_ for round-robin
   uint64_t num_planned_subcompactions;
@@ -1024,6 +1040,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   const std::optional<Slice> start = sub_compact->start;
   const std::optional<Slice> end = sub_compact->end;
 
+  std::optional<Slice> start_without_ts;
+  std::optional<Slice> end_without_ts;
+
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
@@ -1034,15 +1053,22 @@
   // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
   read_options.total_order_seek = true;
 
-  // Note: if we're going to support subcompactions for user-defined timestamps,
-  // the timestamp part will have to be stripped from the bounds here.
-  assert((!start.has_value() && !end.has_value()) ||
-         cfd->user_comparator()->timestamp_size() == 0);
+  // Strip the timestamps from the boundaries, since the boundaries created in
+  // GenSubcompactionBoundaries still carry the timestamp part.
+  size_t ts_sz = cfd->user_comparator()->timestamp_size();
   if (start.has_value()) {
     read_options.iterate_lower_bound = &start.value();
+    if (ts_sz > 0) {
+      start_without_ts = StripTimestampFromUserKey(start.value(), ts_sz);
+      read_options.iterate_lower_bound = &start_without_ts.value();
+    }
   }
   if (end.has_value()) {
     read_options.iterate_upper_bound = &end.value();
+    if (ts_sz > 0) {
+      end_without_ts = StripTimestampFromUserKey(end.value(), ts_sz);
+      read_options.iterate_upper_bound = &end_without_ts.value();
+    }
   }
 
   // Although the v2 aggregator is what the level iterator(s) know about,
diff --git a/db/db_with_timestamp_compaction_test.cc b/db/db_with_timestamp_compaction_test.cc
index f5758e01e..c092b5cae 100644
--- a/db/db_with_timestamp_compaction_test.cc
+++ b/db/db_with_timestamp_compaction_test.cc
@@ -109,6 +109,67 @@ TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options.level0_file_num_compaction_trigger = 3;
+  options.max_subcompactions = 3;
+  options.target_file_size_base = 1024;
+  options.statistics = CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  uint64_t ts = 100;
+  uint64_t key = 0;
+  WriteOptions write_opts;
+
+  // Write keys 0, 1, ..., 499 with ts from 100 to 599.
+  {
+    for (; key <= 499; ++key, ++ts) {
+      std::string ts_str = Timestamp(ts);
+      ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+                         "foo_" + std::to_string(key)));
+    }
+  }
+
+  // Write keys 500, ..., 999 with ts from 600 to 1099.
+  {
+    for (; key <= 999; ++key, ++ts) {
+      std::string ts_str = Timestamp(ts);
+      ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+                         "foo_" + std::to_string(key)));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  // Wait for compaction to finish
+  {
+    ASSERT_OK(dbfull()->RunManualCompaction(
+        static_cast_with_check<ColumnFamilyHandleImpl>(
+            db_->DefaultColumnFamily())
+            ->cfd(),
+        0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
+        nullptr /* begin */, nullptr /* end */, true /* exclusive */,
+        true /* disallow_trivial_move */,
+        std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
+        "" /*trim_ts*/));
+  }
+
+  // Check stats to make sure multiple subcompactions were scheduled, i.e.
+  // the subcompaction boundaries were not all nullptr.
+  {
+    HistogramData num_sub_compactions;
+    options.statistics->histogramData(NUM_SUBCOMPACTIONS_SCHEDULED,
+                                      &num_sub_compactions);
+    ASSERT_GT(num_sub_compactions.sum, 1);
+  }
+
+  for (key = 0; key <= 999; ++key) {
+    ASSERT_EQ("foo_" + std::to_string(key), Get(Key1(key), ts));
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 917379e05..630072043 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1470,6 +1470,8 @@ struct ReadOptions {
   // need to have the same prefix. This is because ordering is not guaranteed
   // outside of prefix domain.
   //
+  // If user-defined timestamps are enabled, iterate_lower_bound should point
+  // to a key without the timestamp part.
   // Default: nullptr
   const Slice* iterate_lower_bound;
 
@@ -1489,6 +1491,8 @@
   // If iterate_upper_bound is not null, SeekToLast() will position the
   // iterator at the first key smaller than iterate_upper_bound.
   //
+  // If user-defined timestamps are enabled, iterate_upper_bound should point
+  // to a key without the timestamp part.
   // Default: nullptr
   const Slice* iterate_upper_bound;
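
A note on the boundary-picking changes above: GenSubcompactionBoundaries now sorts and deduplicates the sampled anchor keys while ignoring the timestamp part, and the new assert in CompactionJob::Prepare then checks that consecutive boundaries are strictly increasing under the same timestamp-blind comparison. The following standalone C++ sketch illustrates that sort/unique/assert pipeline using plain std::string keys with a fixed 8-byte timestamp suffix; MakeKey and CompareWithoutTs are illustrative stand-ins, not RocksDB APIs.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in: a user key is a payload followed by a fixed-width
// 8-byte timestamp, loosely mirroring RocksDB's user-defined timestamps.
constexpr size_t kTsSz = 8;

std::string MakeKey(const std::string& user_key, uint64_t ts) {
  std::string key = user_key;
  key.append(reinterpret_cast<const char*>(&ts), sizeof(ts));
  return key;
}

// Compare two keys while ignoring the trailing timestamp, in the spirit of
// the CompareWithoutTimestamp calls used by the patch.
int CompareWithoutTs(const std::string& a, const std::string& b) {
  return a.compare(0, a.size() - kTsSz, b, 0, b.size() - kTsSz);
}

int main() {
  // Anchor keys sampled from several files; some differ only by timestamp.
  std::vector<std::string> anchors = {
      MakeKey("key200", 7), MakeKey("key100", 3), MakeKey("key100", 9),
      MakeKey("key300", 1), MakeKey("key200", 2)};

  // Sort by user key only, so keys that differ only by timestamp end up
  // adjacent.
  std::sort(anchors.begin(), anchors.end(),
            [](const std::string& a, const std::string& b) {
              return CompareWithoutTs(a, b) < 0;
            });

  // Drop entries that differ only by timestamp, as the new std::unique/erase
  // step does for the subcompaction boundaries.
  anchors.erase(std::unique(anchors.begin(), anchors.end(),
                            [](const std::string& a, const std::string& b) {
                              return CompareWithoutTs(a, b) == 0;
                            }),
                anchors.end());

  // The invariant the new assert in CompactionJob::Prepare relies on:
  // consecutive boundaries are strictly increasing once timestamps are
  // ignored.
  for (size_t i = 1; i < anchors.size(); ++i) {
    assert(CompareWithoutTs(anchors[i - 1], anchors[i]) < 0);
  }

  std::cout << "unique boundaries: " << anchors.size() << "\n";  // prints 3
  return 0;
}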
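The ProcessKeyValueCompaction hunk strips the timestamp from the subcompaction boundaries before installing them as ReadOptions::iterate_lower_bound / iterate_upper_bound, and keeps the stripped slices alive in start_without_ts / end_without_ts because the read options only hold a pointer. Below is a minimal sketch of that same pattern, with a hypothetical StripTs helper and a Bounds struct standing in for RocksDB's Slice and ReadOptions.

#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical stand-in for ReadOptions: bounds are held by pointer, so
// whatever they point at must outlive the scan that uses them.
struct Bounds {
  const std::string_view* lower = nullptr;
  const std::string_view* upper = nullptr;
};

// Drop the trailing fixed-width timestamp, analogous in spirit to the
// StripTimestampFromUserKey call in the patch.
std::string_view StripTs(std::string_view key_with_ts, size_t ts_sz) {
  assert(key_with_ts.size() >= ts_sz);
  return key_with_ts.substr(0, key_with_ts.size() - ts_sz);
}

int main() {
  constexpr size_t kTsSz = 8;
  uint64_t ts = 100;

  std::string start = "key100";
  start.append(reinterpret_cast<const char*>(&ts), sizeof(ts));
  std::string end = "key200";
  end.append(reinterpret_cast<const char*>(&ts), sizeof(ts));

  // The stripped views must live at least as long as `bounds` points at them,
  // which is why the patch stores them in start_without_ts/end_without_ts
  // instead of taking the address of temporaries.
  std::optional<std::string_view> start_without_ts;
  std::optional<std::string_view> end_without_ts;

  Bounds bounds;
  start_without_ts = StripTs(start, kTsSz);
  bounds.lower = &start_without_ts.value();
  end_without_ts = StripTs(end, kTsSz);
  bounds.upper = &end_without_ts.value();

  assert(*bounds.lower == "key100");
  assert(*bounds.upper == "key200");
  return 0;
}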
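The options.h comment change also concerns callers outside compaction: with user-defined timestamps enabled, iterator bounds passed via ReadOptions are expected to be user keys without the timestamp suffix. The rough usage sketch below targets the public API, assuming the built-in u64 timestamp comparator, a little-endian platform for the 8-byte timestamp encoding, and an illustrative /tmp path; treat it as a sketch of the documented contract, not a tested program.

#include <cassert>
#include <cstdint>
#include <memory>
#include <string>

#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Built-in bytewise comparator with an 8-byte (uint64_t) timestamp suffix.
  options.comparator = rocksdb::BytewiseComparatorWithU64Ts();

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/udt_iterate_bounds_demo", &db);
  assert(s.ok());

  // Encode timestamps as fixed 8-byte values (little-endian assumed here).
  uint64_t write_ts_val = 100;
  std::string write_ts(reinterpret_cast<const char*>(&write_ts_val),
                       sizeof(write_ts_val));
  s = db->Put(rocksdb::WriteOptions(), "key1", write_ts, "value1");
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key2", write_ts, "value2");
  assert(s.ok());

  // Per the documentation change: the bound is a user key *without* the
  // timestamp part, even though stored keys carry one internally.
  std::string upper = "key2";
  rocksdb::Slice upper_bound(upper);

  uint64_t read_ts_val = 200;
  std::string read_ts(reinterpret_cast<const char*>(&read_ts_val),
                      sizeof(read_ts_val));
  rocksdb::Slice read_ts_slice(read_ts);

  rocksdb::ReadOptions read_options;
  read_options.timestamp = &read_ts_slice;          // read as of ts = 200
  read_options.iterate_upper_bound = &upper_bound;  // timestamp-less bound

  std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(read_options));
  int visible = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    ++visible;  // only "key1" should fall below the upper bound
  }
  assert(it->status().ok());
  assert(visible == 1);

  delete db;
  return 0;
}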