From 26894303c1eb55650c9b2f5e5a8a3bfbd9f5f5ce Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 3 Aug 2015 20:42:55 -0700 Subject: [PATCH] Add CompactOnDeletionCollector in utilities/table_properties_collectors. Summary: This diff adds CompactOnDeletionCollector in utilities/table_properties_collectors, which applies a sliding window to a sst file and mark this file as need-compaction when it observe enough deletion entries within the consecutive keys covered by the sliding window. Test Plan: compact_on_deletion_collector_test Reviewers: igor, anthony, IslamAbdelRahman, kradhakrishnan, yoshinorim, sdong Reviewed By: sdong Subscribers: maykov, dhruba Differential Revision: https://reviews.facebook.net/D41175 --- CMakeLists.txt | 2 + Makefile | 8 +- .../utilities/table_properties_collectors.h | 29 +++ src.mk | 6 +- .../compact_on_deletion_collector.cc | 93 +++++++++ .../compact_on_deletion_collector.h | 101 ++++++++++ .../compact_on_deletion_collector_test.cc | 177 ++++++++++++++++++ 7 files changed, 412 insertions(+), 4 deletions(-) create mode 100644 include/rocksdb/utilities/table_properties_collectors.h create mode 100644 utilities/table_properties_collectors/compact_on_deletion_collector.cc create mode 100644 utilities/table_properties_collectors/compact_on_deletion_collector.h create mode 100644 utilities/table_properties_collectors/compact_on_deletion_collector_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 1c5a31818..e9b0a0dae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -216,6 +216,7 @@ set(SOURCES utilities/merge_operators/uint64add.cc utilities/redis/redis_lists.cc utilities/spatialdb/spatial_db.cc + utilities/table_properties_collectors/compact_on_deletion_collector.cc utilities/transactions/optimistic_transaction_impl.cc utilities/transactions/optimistic_transaction_db_impl.cc utilities/ttl/db_ttl_impl.cc @@ -327,6 +328,7 @@ set(TESTS utilities/merge_operators/string_append/stringappend_test.cc 
utilities/redis/redis_lists_test.cc utilities/spatialdb/spatial_db_test.cc + utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/transactions/optimistic_transaction_test.cc utilities/ttl/ttl_test.cc utilities/write_batch_with_index/write_batch_with_index_test.cc diff --git a/Makefile b/Makefile index 5a09840c3..82074fcbd 100644 --- a/Makefile +++ b/Makefile @@ -300,8 +300,9 @@ TESTS = \ perf_context_test \ optimistic_transaction_test \ write_callback_test \ - compaction_job_stats_test \ - heap_test + heap_test \ + compact_on_deletion_collector_test \ + compaction_job_stats_test SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/) @@ -757,6 +758,9 @@ compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS) compaction_job_stats_test: db/compaction_job_stats_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +compact_on_deletion_collector_test: utilities/table_properties_collectors/compact_on_deletion_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/include/rocksdb/utilities/table_properties_collectors.h b/include/rocksdb/utilities/table_properties_collectors.h new file mode 100644 index 000000000..dd66083c6 --- /dev/null +++ b/include/rocksdb/utilities/table_properties_collectors.h @@ -0,0 +1,29 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE +#include + +#include "rocksdb/table_properties.h" + +namespace rocksdb { + +// Creates a factory of a table property collector that marks a SST +// file as need-compaction when it observes at least "D" deletion +// entries in any "N" consecutive entries. 
+// +// @param sliding_window_size "N". Note that this number will be +// rounded up to the smallest multiple of 128 that is no less +// than the specified size. +// @param deletion_trigger "D". Note that even when "N" is changed, +// the specified number for "D" will not be changed. +extern std::shared_ptr + NewCompactOnDeletionCollectorFactory( + size_t sliding_window_size, + size_t deletion_trigger); +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/src.mk b/src.mk index 142595382..8da93881f 100644 --- a/src.mk +++ b/src.mk @@ -111,6 +111,7 @@ LIB_SOURCES = \ utilities/merge_operators/uint64add.cc \ utilities/redis/redis_lists.cc \ utilities/spatialdb/spatial_db.cc \ + utilities/table_properties_collectors/compact_on_deletion_collector.cc \ utilities/transactions/optimistic_transaction_impl.cc \ utilities/transactions/optimistic_transaction_db_impl.cc \ utilities/ttl/db_ttl_impl.cc \ @@ -227,9 +228,10 @@ TEST_BENCH_SOURCES = \ utilities/merge_operators/string_append/stringappend_test.cc \ utilities/redis/redis_lists_test.cc \ utilities/spatialdb/spatial_db_test.cc \ - utilities/transactions/optimistic_transaction_test.cc \ + utilities/table_properties_collectors/compact_on_deletion_collector_test.cc \ + utilities/transactions/optimistic_transaction_test.cc \ utilities/ttl/ttl_test.cc \ - utilities/write_batch_with_index/write_batch_with_index_test.cc \ + utilities/write_batch_with_index/write_batch_with_index_test.cc \ util/log_write_bench.cc \ util/manual_compaction_test.cc \ util/memenv_test.cc \ diff --git a/utilities/table_properties_collectors/compact_on_deletion_collector.cc b/utilities/table_properties_collectors/compact_on_deletion_collector.cc new file mode 100644 index 000000000..be0e53ae6 --- /dev/null +++ b/utilities/table_properties_collectors/compact_on_deletion_collector.cc @@ -0,0 +1,93 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE +#include + +#include "rocksdb/utilities/table_properties_collectors.h" +#include "utilities/table_properties_collectors/compact_on_deletion_collector.h" + +namespace rocksdb { + +CompactOnDeletionCollector::CompactOnDeletionCollector( + size_t sliding_window_size, + size_t deletion_trigger) { + deletion_trigger_ = deletion_trigger; + + // First, compute the number of keys in each bucket. + bucket_size_ = + (sliding_window_size + kNumBuckets - 1) / kNumBuckets; + assert(bucket_size_ > 0U); + + Reset(); +} + +void CompactOnDeletionCollector::Reset() { + for (int i = 0; i < kNumBuckets; ++i) { + num_deletions_in_buckets_[i] = 0; + } + current_bucket_ = 0; + num_keys_in_current_bucket_ = 0; + num_deletions_in_observation_window_ = 0; + need_compaction_ = false; +} + +// AddUserKey() will be called when a new key/value pair is inserted into the +// table. +// @params key the user key that is inserted into the table. +// @params value the value that is inserted into the table. +// @params file_size file size up to now +Status CompactOnDeletionCollector::AddUserKey( + const Slice& key, const Slice& value, + EntryType type, SequenceNumber seq, + uint64_t file_size) { + if (need_compaction_) { + // If the output file already needs to be compacted, skip the check. + return Status::OK(); + } + + if (num_keys_in_current_bucket_ == bucket_size_) { + // When the current bucket is full, advance the cursor of the + // ring buffer to the next bucket. + current_bucket_ = (current_bucket_ + 1) % kNumBuckets; + + // Update the current count of observed deletion keys by excluding + // the number of deletion keys in the oldest bucket in the + // observation window. 
+ assert(num_deletions_in_observation_window_ >= + num_deletions_in_buckets_[current_bucket_]); + num_deletions_in_observation_window_ -= + num_deletions_in_buckets_[current_bucket_]; + num_deletions_in_buckets_[current_bucket_] = 0; + num_keys_in_current_bucket_ = 0; + } + + num_keys_in_current_bucket_++; + if (type == kEntryDelete) { + num_deletions_in_observation_window_++; + num_deletions_in_buckets_[current_bucket_]++; + if (num_deletions_in_observation_window_ >= deletion_trigger_) { + need_compaction_ = true; + } + } + return Status::OK(); +} + +TablePropertiesCollector* CompactOnDeletionCollectorFactory:: + CreateTablePropertiesCollector() { + return new CompactOnDeletionCollector( + sliding_window_size_, deletion_trigger_); +} + +std::shared_ptr + NewCompactOnDeletionCollectorFactory( + size_t sliding_window_size, + size_t deletion_trigger) { + return std::shared_ptr( + new CompactOnDeletionCollectorFactory( + sliding_window_size, deletion_trigger)); +} +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/table_properties_collectors/compact_on_deletion_collector.h b/utilities/table_properties_collectors/compact_on_deletion_collector.h new file mode 100644 index 000000000..0d5e98b3f --- /dev/null +++ b/utilities/table_properties_collectors/compact_on_deletion_collector.h @@ -0,0 +1,101 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#ifndef ROCKSDB_LITE +namespace rocksdb { + +// A factory of a table property collector that marks a SST +// file as need-compaction when it observes at least "D" deletion +// entries in any "N" consecutive entries. 
+class CompactOnDeletionCollectorFactory + : public TablePropertiesCollectorFactory { + public: + // A factory of a table property collector that marks a SST + // file as need-compaction when it observes at least "D" deletion + // entries in any "N" consecutive entries. + // + // @param sliding_window_size "N" + // @param deletion_trigger "D" + CompactOnDeletionCollectorFactory( + size_t sliding_window_size, + size_t deletion_trigger) : + sliding_window_size_(sliding_window_size), + deletion_trigger_(deletion_trigger) {} + + virtual ~CompactOnDeletionCollectorFactory() {} + + virtual TablePropertiesCollector* CreateTablePropertiesCollector() override; + + virtual const char* Name() const override { + return "CompactOnDeletionCollector"; + } + + private: + size_t sliding_window_size_; + size_t deletion_trigger_; +}; + +class CompactOnDeletionCollector : public TablePropertiesCollector { + public: + CompactOnDeletionCollector( + size_t sliding_window_size, + size_t deletion_trigger); + + // AddUserKey() will be called when a new key/value pair is inserted into the + // table. + // @params key the user key that is inserted into the table. + // @params value the value that is inserted into the table. + // @params file_size file size up to now + virtual Status AddUserKey(const Slice& key, const Slice& value, + EntryType type, SequenceNumber seq, + uint64_t file_size) override; + + // Finish() will be called when a table has already been built and is ready + // for writing the properties block. + // @params properties User will add their collected statistics to + // `properties`. + virtual Status Finish(UserCollectedProperties* properties) override { + Reset(); + return Status::OK(); + } + + // Return the human-readable properties, where the key is property name and + // the value is the human-readable form of value. 
+ virtual UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties(); + } + + // The name of the properties collector can be used for debugging purpose. + virtual const char* Name() const { + return "CompactOnDeletionCollector"; + } + + // EXPERIMENTAL Return whether the output file should be further compacted + virtual bool NeedCompact() const { + return need_compaction_; + } + + static const int kNumBuckets = 128; + + private: + void Reset(); + + // A ring buffer used to count the number of deletion entries for every + // "bucket_size_" keys. + size_t num_deletions_in_buckets_[kNumBuckets]; + // the number of keys in a bucket + size_t bucket_size_; + + size_t current_bucket_; + size_t num_keys_in_current_bucket_; + size_t num_deletions_in_observation_window_; + size_t deletion_trigger_; + // true if the current SST file needs to be compacted. + bool need_compaction_; +}; +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc b/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc new file mode 100644 index 000000000..12f4e2e4f --- /dev/null +++ b/utilities/table_properties_collectors/compact_on_deletion_collector_test.cc @@ -0,0 +1,177 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include + +#ifndef ROCKSDB_LITE +#include +#include +#include + +#include "rocksdb/table.h" +#include "rocksdb/utilities/table_properties_collectors.h" +#include "util/random.h" +#include "utilities/table_properties_collectors/compact_on_deletion_collector.h" + +int main(int argc, char** argv) { + const int kWindowSizes[] = + {1000, 10000, 10000, 127, 128, 129, 255, 256, 257, 2, 10000}; + const int kDeletionTriggers[] = + {500, 9500, 4323, 47, 61, 128, 250, 250, 250, 2, 2}; + + std::vector window_sizes; + std::vector deletion_triggers; + // deterministic tests + for (int test = 0; test < 9; ++test) { + window_sizes.emplace_back(kWindowSizes[test]); + deletion_triggers.emplace_back(kDeletionTriggers[test]); + } + + // randomize tests + rocksdb::Random rnd(301); + const int kMaxTestSize = 100000l; + for (int random_test = 0; random_test < 100; random_test++) { + int window_size = rnd.Uniform(kMaxTestSize) + 1; + int deletion_trigger = rnd.Uniform(window_size); + window_sizes.emplace_back(window_size); + deletion_triggers.emplace_back(deletion_trigger); + } + + assert(window_sizes.size() == deletion_triggers.size()); + + for (size_t test = 0; test < window_sizes.size(); ++test) { + const int kBucketSize = 128; + const int kWindowSize = window_sizes[test]; + const int kPaddedWindowSize = + kBucketSize * ((window_sizes[test] + kBucketSize - 1) / kBucketSize); + const int kNumDeletionTrigger = deletion_triggers[test]; + const int kBias = (kNumDeletionTrigger + kBucketSize - 1) / kBucketSize; + // Simple test + { + std::unique_ptr collector; + auto factory = rocksdb::NewCompactOnDeletionCollectorFactory( + kWindowSize, kNumDeletionTrigger); + collector.reset( + factory->CreateTablePropertiesCollector()); + const int kSample = 10; + for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) { + int deletions = 0; + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i % kSample < delete_rate) { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryDelete, 
0, 0); + deletions++; + } else { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryPut, 0, 0); + } + } + if (collector->NeedCompact() != + (deletions >= kNumDeletionTrigger) && + std::abs(deletions - kNumDeletionTrigger) > kBias) { + fprintf(stderr, "[Error] collector->NeedCompact() != (%d >= %d)" + " with kWindowSize = %d and kNumDeletionTrigger = %d\n", + deletions, kNumDeletionTrigger, + kWindowSize, kNumDeletionTrigger); + assert(false); + } + collector->Finish(nullptr); + } + } + + // Only one section of a file satisfies the compaction trigger + { + std::unique_ptr collector; + auto factory = rocksdb::NewCompactOnDeletionCollectorFactory( + kWindowSize, kNumDeletionTrigger); + collector.reset( + factory->CreateTablePropertiesCollector()); + const int kSample = 10; + for (int delete_rate = 0; delete_rate <= kSample; ++delete_rate) { + int deletions = 0; + for (int section = 0; section < 5; ++section) { + int initial_entries = rnd.Uniform(kWindowSize) + kWindowSize; + for (int i = 0; i < initial_entries; ++i) { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryPut, 0, 0); + } + } + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i % kSample < delete_rate) { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryDelete, 0, 0); + deletions++; + } else { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryPut, 0, 0); + } + } + for (int section = 0; section < 5; ++section) { + int ending_entries = rnd.Uniform(kWindowSize) + kWindowSize; + for (int i = 0; i < ending_entries; ++i) { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryPut, 0, 0); + } + } + if (collector->NeedCompact() != (deletions >= kNumDeletionTrigger) && + std::abs(deletions - kNumDeletionTrigger) > kBias) { + fprintf(stderr, "[Error] collector->NeedCompact() %d != (%d >= %d)" + " with kWindowSize = %d, kNumDeletionTrigger = %d\n", + collector->NeedCompact(), + deletions, kNumDeletionTrigger, kWindowSize, + kNumDeletionTrigger); + assert(false); + } 
+ collector->Finish(nullptr); + } + } + + // TEST 3: Issues lots of deletes, but their density is not + // high enough to trigger compaction. + { + std::unique_ptr collector; + auto factory = rocksdb::NewCompactOnDeletionCollectorFactory( + kWindowSize, kNumDeletionTrigger); + collector.reset( + factory->CreateTablePropertiesCollector()); + assert(collector->NeedCompact() == false); + // Insert "kNumDeletionTrigger * 0.95" deletions for every + // "kWindowSize" and verify compaction is not needed. + const int kDeletionsPerSection = kNumDeletionTrigger * 95 / 100; + if (kDeletionsPerSection >= 0) { + for (int section = 0; section < 200; ++section) { + for (int i = 0; i < kPaddedWindowSize; ++i) { + if (i < kDeletionsPerSection) { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryDelete, 0, 0); + } else { + collector->AddUserKey("hello", "rocksdb", + rocksdb::kEntryPut, 0, 0); + } + } + } + if (collector->NeedCompact() && + std::abs(kDeletionsPerSection - kNumDeletionTrigger) > kBias) { + fprintf(stderr, "[Error] collector->NeedCompact() != false" + " with kWindowSize = %d and kNumDeletionTrigger = %d\n", + kWindowSize, kNumDeletionTrigger); + assert(false); + } + collector->Finish(nullptr); + } + } + } + fprintf(stderr, "PASSED\n"); +} +#else +int main(int argc, char** argv) { + fprintf(stderr, "SKIPPED as RocksDBLite does not include utilities.\n"); + return 0; +} +#endif // !ROCKSDB_LITE