Trigger compaction in CompactOnDeletionCollector based on deletion ratio (#6806)

Summary:
In level compaction, if the total size (even if compensated after taking account of the deletions) of a level hasn't exceeded the limit, but there are lots of deletion entries in some SST files of the level, these files should also be good candidates for compaction. Otherwise, queries for the deleted keys might be slow because they need to go over all the tombstones.

This PR adds an option `deletion_ratio` to the factory of `CompactOnDeletionCollector` to configure it to trigger compaction when the ratio of tombstones >= `deletion_ratio`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6806

Test Plan:
Added new unit test in `compact_on_deletion_collector_test.cc`.
make compact_on_deletion_collector_test && ./compact_on_deletion_collector_test

Reviewed By: ajkr

Differential Revision: D21511981

Pulled By: cheng-chang

fbshipit-source-id: 65a9d0150e8c9c00337787686475252e4535a3e1
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent d790e6004f
commit b9d65f5aa6
  1. 33
      include/rocksdb/utilities/table_properties_collectors.h
  2. 84
      utilities/table_properties_collectors/compact_on_deletion_collector.cc
  3. 14
      utilities/table_properties_collectors/compact_on_deletion_collector.h
  4. 121
      utilities/table_properties_collectors/compact_on_deletion_collector_test.cc

@ -14,7 +14,8 @@ namespace ROCKSDB_NAMESPACE {
// A factory of a table property collector that marks a SST
// file as need-compaction when it observe at least "D" deletion
// entries in any "N" consecutive entires.
// entries in any "N" consecutive entires or the ratio of tombstone
// entries in the whole file >= the specified deletion ratio.
class CompactOnDeletionCollectorFactory
: public TablePropertiesCollectorFactory {
public:
@ -34,6 +35,13 @@ class CompactOnDeletionCollectorFactory
deletion_trigger_.store(deletion_trigger);
}
// Change deletion ratio.
// @param deletion_ratio, if <= 0 or > 1, disable triggering compaction
// based on deletion ratio.
void SetDeletionRatio(double deletion_ratio) {
deletion_ratio_.store(deletion_ratio);
}
const char* Name() const override {
return "CompactOnDeletionCollector";
}
@ -43,34 +51,45 @@ class CompactOnDeletionCollectorFactory
private:
friend std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
size_t deletion_trigger);
size_t deletion_trigger,
double deletion_ratio);
// A factory of a table property collector that marks a SST
// file as need-compaction when it observe at least "D" deletion
// entries in any "N" consecutive entires.
// entries in any "N" consecutive entires, or the ratio of tombstone
// entries >= deletion_ratio.
//
// @param sliding_window_size "N"
// @param deletion_trigger "D"
// @param deletion_ratio, if <= 0 or > 1, disable triggering compaction
// based on deletion ratio.
CompactOnDeletionCollectorFactory(size_t sliding_window_size,
size_t deletion_trigger)
size_t deletion_trigger,
double deletion_ratio)
: sliding_window_size_(sliding_window_size),
deletion_trigger_(deletion_trigger) {}
deletion_trigger_(deletion_trigger),
deletion_ratio_(deletion_ratio) {}
std::atomic<size_t> sliding_window_size_;
std::atomic<size_t> deletion_trigger_;
std::atomic<double> deletion_ratio_;
};
// Creates a factory of a table property collector that marks a SST
// file as need-compaction when it observe at least "D" deletion
// entries in any "N" consecutive entires.
// entries in any "N" consecutive entires, or the ratio of tombstone
// entries >= deletion_ratio.
//
// @param sliding_window_size "N". Note that this number will be
// round up to the smallest multiple of 128 that is no less
// than the specified size.
// @param deletion_trigger "D". Note that even when "N" is changed,
// the specified number for "D" will not be changed.
// @param deletion_ratio, if <= 0 or > 1, disable triggering compaction
// based on deletion ratio. Disabled by default.
extern std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
size_t deletion_trigger);
size_t deletion_trigger,
double deletion_ratio = 0);
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

@ -13,12 +13,14 @@
namespace ROCKSDB_NAMESPACE {
CompactOnDeletionCollector::CompactOnDeletionCollector(
size_t sliding_window_size, size_t deletion_trigger)
size_t sliding_window_size, size_t deletion_trigger, double deletion_ratio)
: bucket_size_((sliding_window_size + kNumBuckets - 1) / kNumBuckets),
current_bucket_(0),
num_keys_in_current_bucket_(0),
num_deletions_in_observation_window_(0),
deletion_trigger_(deletion_trigger),
deletion_ratio_(deletion_ratio),
deletion_ratio_enabled_(deletion_ratio > 0 && deletion_ratio <= 1),
need_compaction_(false),
finished_(false) {
memset(num_deletions_in_buckets_, 0, sizeof(size_t) * kNumBuckets);
@ -35,7 +37,7 @@ Status CompactOnDeletionCollector::AddUserKey(const Slice& /*key*/,
SequenceNumber /*seq*/,
uint64_t /*file_size*/) {
assert(!finished_);
if (bucket_size_ == 0) {
if (!bucket_size_ && !deletion_ratio_enabled_) {
// This collector is effectively disabled
return Status::OK();
}
@ -45,54 +47,76 @@ Status CompactOnDeletionCollector::AddUserKey(const Slice& /*key*/,
return Status::OK();
}
if (num_keys_in_current_bucket_ == bucket_size_) {
// When the current bucket is full, advance the cursor of the
// ring buffer to the next bucket.
current_bucket_ = (current_bucket_ + 1) % kNumBuckets;
// Update the current count of observed deletion keys by excluding
// the number of deletion keys in the oldest bucket in the
// observation window.
assert(num_deletions_in_observation_window_ >=
num_deletions_in_buckets_[current_bucket_]);
num_deletions_in_observation_window_ -=
num_deletions_in_buckets_[current_bucket_];
num_deletions_in_buckets_[current_bucket_] = 0;
num_keys_in_current_bucket_ = 0;
if (deletion_ratio_enabled_) {
total_entries_++;
if (type == kEntryDelete) {
deletion_entries_++;
}
}
num_keys_in_current_bucket_++;
if (type == kEntryDelete) {
num_deletions_in_observation_window_++;
num_deletions_in_buckets_[current_bucket_]++;
if (num_deletions_in_observation_window_ >= deletion_trigger_) {
need_compaction_ = true;
if (bucket_size_) {
if (num_keys_in_current_bucket_ == bucket_size_) {
// When the current bucket is full, advance the cursor of the
// ring buffer to the next bucket.
current_bucket_ = (current_bucket_ + 1) % kNumBuckets;
// Update the current count of observed deletion keys by excluding
// the number of deletion keys in the oldest bucket in the
// observation window.
assert(num_deletions_in_observation_window_ >=
num_deletions_in_buckets_[current_bucket_]);
num_deletions_in_observation_window_ -=
num_deletions_in_buckets_[current_bucket_];
num_deletions_in_buckets_[current_bucket_] = 0;
num_keys_in_current_bucket_ = 0;
}
num_keys_in_current_bucket_++;
if (type == kEntryDelete) {
num_deletions_in_observation_window_++;
num_deletions_in_buckets_[current_bucket_]++;
if (num_deletions_in_observation_window_ >= deletion_trigger_) {
need_compaction_ = true;
}
}
}
return Status::OK();
}
Status CompactOnDeletionCollector::Finish(
UserCollectedProperties* /*properties*/) {
if (!need_compaction_ && deletion_ratio_enabled_ && total_entries_ > 0) {
double ratio = static_cast<double>(deletion_entries_) / total_entries_;
need_compaction_ = ratio >= deletion_ratio_;
}
finished_ = true;
return Status::OK();
}
TablePropertiesCollector*
CompactOnDeletionCollectorFactory::CreateTablePropertiesCollector(
TablePropertiesCollectorFactory::Context /*context*/) {
return new CompactOnDeletionCollector(
sliding_window_size_.load(), deletion_trigger_.load());
return new CompactOnDeletionCollector(sliding_window_size_.load(),
deletion_trigger_.load(),
deletion_ratio_.load());
}
std::string CompactOnDeletionCollectorFactory::ToString() const {
std::ostringstream cfg;
cfg << Name() << " (Sliding window size = " << sliding_window_size_.load()
<< " Deletion trigger = " << deletion_trigger_.load() << ')';
<< " Deletion trigger = " << deletion_trigger_.load()
<< " Deletion ratio = " << deletion_ratio_.load() << ')';
return cfg.str();
}
std::shared_ptr<CompactOnDeletionCollectorFactory>
NewCompactOnDeletionCollectorFactory(
size_t sliding_window_size,
size_t deletion_trigger) {
NewCompactOnDeletionCollectorFactory(size_t sliding_window_size,
size_t deletion_trigger,
double deletion_ratio) {
return std::shared_ptr<CompactOnDeletionCollectorFactory>(
new CompactOnDeletionCollectorFactory(
sliding_window_size, deletion_trigger));
new CompactOnDeletionCollectorFactory(sliding_window_size,
deletion_trigger, deletion_ratio));
}
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

@ -11,9 +11,8 @@ namespace ROCKSDB_NAMESPACE {
class CompactOnDeletionCollector : public TablePropertiesCollector {
public:
CompactOnDeletionCollector(
size_t sliding_window_size,
size_t deletion_trigger);
CompactOnDeletionCollector(size_t sliding_window_size,
size_t deletion_trigger, double deletion_raatio);
// AddUserKey() will be called when a new key/value pair is inserted into the
// table.
@ -28,10 +27,7 @@ class CompactOnDeletionCollector : public TablePropertiesCollector {
// for writing the properties block.
// @params properties User will add their collected statistics to
// `properties`.
virtual Status Finish(UserCollectedProperties* /*properties*/) override {
finished_ = true;
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* /*properties*/) override;
// Return the human-readable properties, where the key is property name and
// the value is the human-readable form of value.
@ -64,6 +60,10 @@ class CompactOnDeletionCollector : public TablePropertiesCollector {
size_t num_keys_in_current_bucket_;
size_t num_deletions_in_observation_window_;
size_t deletion_trigger_;
const double deletion_ratio_;
const bool deletion_ratio_enabled_;
size_t total_entries_ = 0;
size_t deletion_entries_ = 0;
// true if the current SST file needs to be compacted.
bool need_compaction_;
bool finished_;

@ -14,19 +14,75 @@
#include <cmath>
#include <vector>
#include "port/stack_trace.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/table_properties_collectors.h"
#include "test_util/testharness.h"
#include "util/random.h"
#include "utilities/table_properties_collectors/compact_on_deletion_collector.h"
int main(int /*argc*/, char** /*argv*/) {
namespace ROCKSDB_NAMESPACE {
TEST(CompactOnDeletionCollector, DeletionRatio) {
TablePropertiesCollectorFactory::Context context;
context.column_family_id =
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
const size_t kTotalEntries = 100;
{
// Disable deletion ratio.
for (double deletion_ratio : {-1.5, -1.0, 0.0, 1.5, 2.0}) {
auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio);
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
for (size_t i = 0; i < kTotalEntries; i++) {
// All entries are deletion entries.
collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0);
ASSERT_FALSE(collector->NeedCompact());
}
collector->Finish(nullptr);
ASSERT_FALSE(collector->NeedCompact());
}
}
{
for (double deletion_ratio : {0.3, 0.5, 0.8, 1.0}) {
auto factory = NewCompactOnDeletionCollectorFactory(0, 0, deletion_ratio);
const size_t deletion_entries_trigger =
static_cast<size_t>(deletion_ratio * kTotalEntries);
for (int delta : {-1, 0, 1}) {
// Actual deletion entry ratio <, =, > deletion_ratio
size_t actual_deletion_entries = deletion_entries_trigger + delta;
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
for (size_t i = 0; i < kTotalEntries; i++) {
if (i < actual_deletion_entries) {
collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0);
} else {
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
ASSERT_FALSE(collector->NeedCompact());
}
collector->Finish(nullptr);
if (delta >= 0) {
// >= deletion_ratio
ASSERT_TRUE(collector->NeedCompact());
} else {
ASSERT_FALSE(collector->NeedCompact());
}
}
}
}
}
TEST(CompactOnDeletionCollector, SlidingWindow) {
const int kWindowSizes[] =
{1000, 10000, 10000, 127, 128, 129, 255, 256, 257, 2, 10000};
const int kDeletionTriggers[] =
{500, 9500, 4323, 47, 61, 128, 250, 250, 250, 2, 2};
ROCKSDB_NAMESPACE::TablePropertiesCollectorFactory::Context context;
context.column_family_id = ROCKSDB_NAMESPACE::
TablePropertiesCollectorFactory::Context context;
context.column_family_id =
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
std::vector<int> window_sizes;
@ -38,7 +94,7 @@ int main(int /*argc*/, char** /*argv*/) {
}
// randomize tests
ROCKSDB_NAMESPACE::Random rnd(301);
Random rnd(301);
const int kMaxTestSize = 100000l;
for (int random_test = 0; random_test < 10; random_test++) {
int window_size = rnd.Uniform(kMaxTestSize) + 1;
@ -58,21 +114,19 @@ int main(int /*argc*/, char** /*argv*/) {
const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize;
// Simple test
{
auto factory = ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
kWindowSize, kNumDeletionTrigger);
auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
kNumDeletionTrigger);
const int kSample = 10;
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
std::unique_ptr<ROCKSDB_NAMESPACE::TablePropertiesCollector> collector(
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
int deletions = 0;
for (int i = 0; i < kPaddedWindowSize; ++i) {
if (i % kSample < delete_rate) {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryDelete, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0);
deletions++;
} else {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryPut, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
}
if (collector->NeedCompact() !=
@ -82,7 +136,7 @@ int main(int /*argc*/, char** /*argv*/) {
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
deletions, kNumDeletionTrigger,
kWindowSize, kNumDeletionTrigger);
assert(false);
ASSERT_TRUE(false);
}
collector->Finish(nullptr);
}
@ -90,35 +144,31 @@ int main(int /*argc*/, char** /*argv*/) {
// Only one section of a file satisfies the compaction trigger
{
auto factory = ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
kWindowSize, kNumDeletionTrigger);
auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
kNumDeletionTrigger);
const int kSample = 10;
for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) {
std::unique_ptr<ROCKSDB_NAMESPACE::TablePropertiesCollector> collector(
std::unique_ptr<TablePropertiesCollector> collector(
factory->CreateTablePropertiesCollector(context));
int deletions = 0;
for (int section = 0; section < 5; ++section) {
int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize;
for (int i = 0; i < initial_entries; ++i) {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryPut, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
}
for (int i = 0; i < kPaddedWindowSize; ++i) {
if (i % kSample < delete_rate) {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryDelete, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0);
deletions++;
} else {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryPut, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
}
for (int section = 0; section < 5; ++section) {
int ending_entries = rnd.Uniform(kWindowSize) + kWindowSize;
for (int i = 0; i < ending_entries; ++i) {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryPut, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
}
if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) &&
@ -128,7 +178,7 @@ int main(int /*argc*/, char** /*argv*/) {
collector->NeedCompact(),
deletions, kNumDeletionTrigger, kWindowSize,
kNumDeletionTrigger);
assert(false);
ASSERT_TRUE(false);
}
collector->Finish(nullptr);
}
@ -137,9 +187,9 @@ int main(int /*argc*/, char** /*argv*/) {
// TEST 3: Issues a lots of deletes, but their density is not
// high enough to trigger compaction.
{
std::unique_ptr<ROCKSDB_NAMESPACE::TablePropertiesCollector> collector;
auto factory = ROCKSDB_NAMESPACE::NewCompactOnDeletionCollectorFactory(
kWindowSize, kNumDeletionTrigger);
std::unique_ptr<TablePropertiesCollector> collector;
auto factory = NewCompactOnDeletionCollectorFactory(kWindowSize,
kNumDeletionTrigger);
collector.reset(factory->CreateTablePropertiesCollector(context));
assert(collector->NeedCompact() == false);
// Insert "kNumDeletionTrigger * 0.95" deletions for every
@ -149,11 +199,9 @@ int main(int /*argc*/, char** /*argv*/) {
for (int section = 0; section < 200; ++section) {
for (int i = 0; i < kPaddedWindowSize; ++i) {
if (i < kDeletionsPerSection) {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryDelete, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryDelete, 0, 0);
} else {
collector->AddUserKey("hello", "rocksdb",
ROCKSDB_NAMESPACE::kEntryPut, 0, 0);
collector->AddUserKey("hello", "rocksdb", kEntryPut, 0, 0);
}
}
}
@ -162,13 +210,20 @@ int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "[Error] collector->NeedCompact() != false"
" with kWindowSize = %d and kNumDeletionTrigger = %d\n",
kWindowSize, kNumDeletionTrigger);
assert(false);
ASSERT_TRUE(false);
}
collector->Finish(nullptr);
}
}
}
fprintf(stderr, "PASSED\n");
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
int main(int /*argc*/, char** /*argv*/) {

Loading…
Cancel
Save