Provide support for subcompactions with user-defined timestamps (#10344)

Summary:
The subcompaction logic currently picks file boundaries as subcompaction boundaries. This is not compatible with user-defined timestamps because of two issues.
Issue1: ReadOptions.iterate_lower_bound and ReadOptions.iterate_upper_bound contain timestamps, but BlockBasedTableIterator expects bounds without timestamps. Because of the resulting wrong comparison, the end key is returned as a user key, which triggers an assertion failure.
Issue2: Two keys that differ only by user timestamp might get processed by two different subcompactions (and thus two different CompactionIterator state machines), which in turn can cause data correctness issues.
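
Both issues can be illustrated with a small standalone sketch. It does not use RocksDB: `AppendTimestamp`, `StripTimestamp`, and `CompareWithoutTimestamp` are hypothetical helpers, and the key layout (user key followed by an 8-byte little-endian timestamp) is an assumption made only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <string_view>

// Assumption for this sketch: a key is <user key><8-byte timestamp suffix>.
constexpr std::size_t kTsSize = sizeof(std::uint64_t);

// Hypothetical helper: appends the timestamp as 8 little-endian bytes.
std::string AppendTimestamp(std::string_view user_key, std::uint64_t ts) {
  std::string out(user_key);
  for (std::size_t i = 0; i < kTsSize; ++i) {
    out.push_back(static_cast<char>((ts >> (8 * i)) & 0xff));
  }
  return out;
}

// Hypothetical helper: drops the trailing ts_sz bytes, leaving the user key.
// Precondition: key_with_ts.size() >= ts_sz.
std::string_view StripTimestamp(std::string_view key_with_ts,
                                std::size_t ts_sz) {
  assert(key_with_ts.size() >= ts_sz);
  return key_with_ts.substr(0, key_with_ts.size() - ts_sz);
}

// Hypothetical helper: three-way comparison on the user-key part only.
int CompareWithoutTimestamp(std::string_view a, std::string_view b,
                            std::size_t ts_sz) {
  return StripTimestamp(a, ts_sz).compare(StripTimestamp(b, ts_sz));
}
```

With these helpers, `AppendTimestamp("foo", 100)` and `AppendTimestamp("foo", 200)` are different byte strings, yet `CompareWithoutTimestamp` reports them as equal. That is the situation in Issue2: a boundary chosen between them would split one user key across two subcompactions. Likewise, a bound that still carries its timestamp is not the same byte string as the stripped key an iterator works with, which is the mismatch behind Issue1.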

This PR provides support to re-enable subcompactions with user-defined timestamps.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10344

Test Plan:
Added new unit test
- Without the fix for Issue1, the unit test MultipleSubCompactions fails with the error:
```
db_with_timestamp_compaction_test: ./db/compaction/clipping_iterator.h:247: void rocksdb::ClippingIterator::AssertBounds(): Assertion `!valid_ || !end_ || cmp_->Compare(key(), *end_) < 0' failed.
Received signal 6 (Aborted)
#0   /usr/local/fbcode/platform009/lib/libc.so.6(gsignal+0x100) [0x7f8fbbbfe530]
db_with_timestamp_compaction_test: ./db/compaction/clipping_iterator.h:247: void rocksdb::ClippingIterator::AssertBounds(): Assertion `!valid_ || !end_ || cmp_->Compare(key(), *end_) < 0' failed.
Aborted (core dumped)
```
Ran stress test
`make crash_test_with_ts -j32`

Reviewed By: riversand963

Differential Revision: D38220841

Pulled By: akankshamahajan15

fbshipit-source-id: 5d5cae2bd37fcaeba1e77fce0a69070ad4158ccb
Akanksha Mahajan 2 years ago committed by Facebook GitHub Bot
parent 54aebb2cc5
commit 56463d443d
  1. HISTORY.md (1 changed line)
  2. db/compaction/compaction.cc (7 changed lines)
  3. db/compaction/compaction_job.cc (40 changed lines)
  4. db/db_with_timestamp_compaction_test.cc (61 changed lines)
  5. include/rocksdb/options.h (4 changed lines)

@ -6,6 +6,7 @@
 * Charge memory usage of blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for blob cache exceeds the avaible space left in the block cache at some point (i.e, causing a cache full under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`.
 * Improve subcompaction range partition so that it is likely to be more even. More evenly distribution of subcompaction will improve compaction throughput for some workloads. All input files' index blocks to sample some anchor key points from which we pick positions to partition the input range. This would introduce some CPU overhead in compaction preparation phase, if subcompaction is enabled, but it should be a small fraction of the CPU usage of the whole compaction process. This also brings a behavier change: subcompaction number is much more likely to maxed out than before.
 * Add CompactionPri::kRoundRobin, a compaction picking mode that cycles through all the files with a compact cursor in a round-robin manner. This feature is available since 7.5.
+* Provide support for subcompactions for user_defined_timestamp.
 ### Public API changes
 * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.

@ -664,13 +664,6 @@ bool Compaction::ShouldFormSubcompactions() const {
     return false;
   }
-  // Note: the subcompaction boundary picking logic does not currently guarantee
-  // that all user keys that differ only by timestamp get processed by the same
-  // subcompaction.
-  if (cfd_->user_comparator()->timestamp_size() > 0) {
-    return false;
-  }
   // Round-Robin pri under leveled compaction allows subcompactions by default
   // and the number of subcompactions can be larger than max_subcompactions_
   if (cfd_->ioptions()->compaction_pri == kRoundRobin &&

@ -240,8 +240,8 @@ void CompactionJob::Prepare() {
   bottommost_level_ = c->bottommost_level();
   if (c->ShouldFormSubcompactions()) {
     StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
     GenSubcompactionBoundaries();
   }
   if (boundaries_.size() > 1) {
     for (size_t i = 0; i <= boundaries_.size(); i++) {
@ -250,6 +250,11 @@ void CompactionJob::Prepare() {
           (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
                                     : std::nullopt,
           static_cast<uint32_t>(i));
+      // assert to validate that boundaries don't have same user keys (without
+      // timestamp part).
+      assert(i == 0 || i == boundaries_.size() ||
+             cfd->user_comparator()->CompareWithoutTimestamp(
+                 boundaries_[i - 1], true, boundaries_[i], true) < 0);
     }
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
@ -479,9 +484,20 @@ void CompactionJob::GenSubcompactionBoundaries() {
   std::sort(
       all_anchors.begin(), all_anchors.end(),
       [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
-        return cfd_comparator->Compare(a.user_key, b.user_key) < 0;
+        return cfd_comparator->CompareWithoutTimestamp(a.user_key, true,
+                                                       b.user_key, true) < 0;
       });
+  // Remove duplicated entries from boundaries.
+  all_anchors.erase(
+      std::unique(all_anchors.begin(), all_anchors.end(),
+                  [cfd_comparator](TableReader::Anchor& a,
+                                   TableReader::Anchor& b) -> bool {
+                    return cfd_comparator->CompareWithoutTimestamp(
+                               a.user_key, true, b.user_key, true) == 0;
+                  }),
+      all_anchors.end());
   // Get the number of planned subcompactions, may update reserve threads
   // and update extra_num_subcompaction_threads_reserved_ for round-robin
   uint64_t num_planned_subcompactions;
@ -1024,6 +1040,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   const std::optional<Slice> start = sub_compact->start;
   const std::optional<Slice> end = sub_compact->end;
+  std::optional<Slice> start_without_ts;
+  std::optional<Slice> end_without_ts;
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
@ -1034,15 +1053,22 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
   read_options.total_order_seek = true;
-  // Note: if we're going to support subcompactions for user-defined timestamps,
-  // the timestamp part will have to be stripped from the bounds here.
-  assert((!start.has_value() && !end.has_value()) ||
-         cfd->user_comparator()->timestamp_size() == 0);
+  // Remove the timestamps from boundaries because boundaries created in
+  // GenSubcompactionBoundaries doesn't strip away the timestamp.
+  size_t ts_sz = cfd->user_comparator()->timestamp_size();
   if (start.has_value()) {
     read_options.iterate_lower_bound = &start.value();
+    if (ts_sz > 0) {
+      start_without_ts = StripTimestampFromUserKey(start.value(), ts_sz);
+      read_options.iterate_lower_bound = &start_without_ts.value();
+    }
   }
   if (end.has_value()) {
     read_options.iterate_upper_bound = &end.value();
+    if (ts_sz > 0) {
+      end_without_ts = StripTimestampFromUserKey(end.value(), ts_sz);
+      read_options.iterate_upper_bound = &end_without_ts.value();
+    }
   }
   // Although the v2 aggregator is what the level iterator(s) know about,
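
The boundary handling in the `GenSubcompactionBoundaries` hunk above (sort by user key without timestamp, then drop candidates that share a user key) can be sketched outside RocksDB roughly as follows. This is a minimal illustration, not the library's code: `UserKeyPart`, `DedupBoundaries`, and `StrictlyIncreasing` are hypothetical names, plain `std::string` keys stand in for `TableReader::Anchor`, and the fixed-size timestamp suffix is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Assumption: every key ends with a fixed-size timestamp suffix of ts_sz bytes.
// Precondition: key.size() >= ts_sz.
inline std::string_view UserKeyPart(std::string_view key, std::size_t ts_sz) {
  assert(key.size() >= ts_sz);
  return key.substr(0, key.size() - ts_sz);
}

// Sorts candidate boundaries by user key only, then keeps a single candidate
// per user key so that no two boundaries differ only by timestamp.
std::vector<std::string> DedupBoundaries(std::vector<std::string> anchors,
                                         std::size_t ts_sz) {
  std::sort(anchors.begin(), anchors.end(),
            [ts_sz](const std::string& a, const std::string& b) {
              return UserKeyPart(a, ts_sz) < UserKeyPart(b, ts_sz);
            });
  anchors.erase(
      std::unique(anchors.begin(), anchors.end(),
                  [ts_sz](const std::string& a, const std::string& b) {
                    return UserKeyPart(a, ts_sz) == UserKeyPart(b, ts_sz);
                  }),
      anchors.end());
  return anchors;
}

// Mirrors the intent of the new assert in Prepare(): adjacent boundaries must
// be strictly increasing when compared without their timestamps.
bool StrictlyIncreasing(const std::vector<std::string>& boundaries,
                        std::size_t ts_sz) {
  for (std::size_t i = 1; i < boundaries.size(); ++i) {
    if (!(UserKeyPart(boundaries[i - 1], ts_sz) <
          UserKeyPart(boundaries[i], ts_sz))) {
      return false;
    }
  }
  return true;
}
```

Which of several candidates with the same user key survives is unspecified in this sketch, because `std::sort` is not stable. That does not matter for partitioning, since only the user-key part decides where a subcompaction range starts and ends.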

@ -109,6 +109,67 @@ TEST_F(TimestampCompatibleCompactionTest, UserKeyCrossFileBoundary) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
+TEST_F(TimestampCompatibleCompactionTest, MultipleSubCompactions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_style = kCompactionStyleUniversal;
+  options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+  options.level0_file_num_compaction_trigger = 3;
+  options.max_subcompactions = 3;
+  options.target_file_size_base = 1024;
+  options.statistics = CreateDBStatistics();
+  DestroyAndReopen(options);
+  uint64_t ts = 100;
+  uint64_t key = 0;
+  WriteOptions write_opts;
+  // Write keys 0, 1, ..., 499 with ts from 100 to 599.
+  {
+    for (; key <= 499; ++key, ++ts) {
+      std::string ts_str = Timestamp(ts);
+      ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+                         "foo_" + std::to_string(key)));
+    }
+  }
+  // Write keys 500, ..., 999 with ts from 600 to 1099.
+  {
+    for (; key <= 999; ++key, ++ts) {
+      std::string ts_str = Timestamp(ts);
+      ASSERT_OK(db_->Put(write_opts, Key1(key), ts_str,
+                         "foo_" + std::to_string(key)));
+    }
+    ASSERT_OK(Flush());
+  }
+  // Wait for compaction to finish
+  {
+    ASSERT_OK(dbfull()->RunManualCompaction(
+        static_cast_with_check<ColumnFamilyHandleImpl>(
+            db_->DefaultColumnFamily())
+            ->cfd(),
+        0 /* input_level */, 1 /* output_level */, CompactRangeOptions(),
+        nullptr /* begin */, nullptr /* end */, true /* exclusive */,
+        true /* disallow_trivial_move */,
+        std::numeric_limits<uint64_t>::max() /* max_file_num_to_ignore */,
+        "" /*trim_ts*/));
+  }
+  // Check stats to make sure multiple subcompactions were scheduled for
+  // boundaries not to be nullptr.
+  {
+    HistogramData num_sub_compactions;
+    options.statistics->histogramData(NUM_SUBCOMPACTIONS_SCHEDULED,
+                                      &num_sub_compactions);
+    ASSERT_GT(num_sub_compactions.sum, 1);
+  }
+  for (key = 0; key <= 999; ++key) {
+    ASSERT_EQ("foo_" + std::to_string(key), Get(Key1(key), ts));
+  }
+}
 }  // namespace ROCKSDB_NAMESPACE
 int main(int argc, char** argv) {

@ -1470,6 +1470,8 @@ struct ReadOptions {
   // need to have the same prefix. This is because ordering is not guaranteed
   // outside of prefix domain.
   //
+  // In case of user_defined timestamp, if enabled, iterate_lower_bound should
+  // point to key without timestamp part.
   // Default: nullptr
   const Slice* iterate_lower_bound;
@ -1489,6 +1491,8 @@ struct ReadOptions {
   // If iterate_upper_bound is not null, SeekToLast() will position the iterator
   // at the first key smaller than iterate_upper_bound.
   //
+  // In case of user_defined timestamp, if enabled, iterate_upper_bound should
+  // point to key without timestamp part.
   // Default: nullptr
   const Slice* iterate_upper_bound;
