From b253f244033f174b7cac9e9282e764b9cc9a8572 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 24 Mar 2014 11:38:44 -0700 Subject: [PATCH 1/5] Rate limiter for BackupableDB Summary: Might be useful if client doesn't want to effect running system during backup too much. Test Plan: added a test case Reviewers: dhruba, haobo, ljin Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17091 --- include/utilities/backupable_db.h | 18 ++++- utilities/backupable/backupable_db.cc | 94 ++++++++++++++++++---- utilities/backupable/backupable_db_test.cc | 48 ++++++++++- 3 files changed, 142 insertions(+), 18 deletions(-) diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index 1ec9e8966..8f85e0614 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -61,6 +61,16 @@ struct BackupableDBOptions { // Default: true bool backup_log_files; + // Max bytes that can be transferred in a second during backup. + // If 0, go as fast as you can + // Default: 0 + uint64_t backup_rate_limit; + + // Max bytes that can be transferred in a second during restore. 
+ // If 0, go as fast as you can + // Default: 0 + uint64_t restore_rate_limit; + void Dump(Logger* logger) const; explicit BackupableDBOptions(const std::string& _backup_dir, @@ -68,14 +78,18 @@ struct BackupableDBOptions { bool _share_table_files = true, Logger* _info_log = nullptr, bool _sync = true, bool _destroy_old_data = false, - bool _backup_log_files = true) + bool _backup_log_files = true, + uint64_t _backup_rate_limit = 0, + uint64_t _restore_rate_limit = 0) : backup_dir(_backup_dir), backup_env(_backup_env), share_table_files(_share_table_files), info_log(_info_log), sync(_sync), destroy_old_data(_destroy_old_data), - backup_log_files(_backup_log_files) {} + backup_log_files(_backup_log_files), + backup_rate_limit(_backup_rate_limit), + restore_rate_limit(_restore_rate_limit) {} }; struct RestoreOptions { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index a78427533..32c3b8481 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -26,17 +26,60 @@ namespace rocksdb { +namespace { +class RateLimiter { + public: + RateLimiter(Env* env, uint64_t max_bytes_per_second, uint64_t bytes_per_check) + : env_(env), + max_bytes_per_second_(max_bytes_per_second), + bytes_per_check_(bytes_per_check), + micros_start_time_(env->NowMicros()), + bytes_since_start_(0) {} + + void ReportAndWait(uint64_t bytes_since_last_call) { + bytes_since_start_ += bytes_since_last_call; + if (bytes_since_start_ < bytes_per_check_) { + // not enough bytes to be rate-limited + return; + } + + uint64_t now = env_->NowMicros(); + uint64_t interval = now - micros_start_time_; + uint64_t should_take_micros = + (bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_; + + if (should_take_micros > interval) { + env_->SleepForMicroseconds(should_take_micros - interval); + now = env_->NowMicros(); + } + // reset interval + micros_start_time_ = now; + bytes_since_start_ = 0; + } + + private: + Env* env_; 
+ uint64_t max_bytes_per_second_; + uint64_t bytes_per_check_; + uint64_t micros_start_time_; + uint64_t bytes_since_start_; + static const uint64_t kMicrosInSecond = 1000 * 1000LL; +}; +} // namespace + void BackupableDBOptions::Dump(Logger* logger) const { - Log(logger, " Options.backup_dir: %s", backup_dir.c_str()); - Log(logger, " Options.backup_env: %p", backup_env); - Log(logger, "Options.share_table_files: %d", + Log(logger, " Options.backup_dir: %s", backup_dir.c_str()); + Log(logger, " Options.backup_env: %p", backup_env); + Log(logger, " Options.share_table_files: %d", static_cast(share_table_files)); - Log(logger, " Options.info_log: %p", info_log); - Log(logger, " Options.sync: %d", static_cast(sync)); - Log(logger, " Options.destroy_old_data: %d", + Log(logger, " Options.info_log: %p", info_log); + Log(logger, " Options.sync: %d", static_cast(sync)); + Log(logger, " Options.destroy_old_data: %d", static_cast(destroy_old_data)); - Log(logger, " Options.backup_log_files: %d", + Log(logger, " Options.backup_log_files: %d", static_cast(backup_log_files)); + Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit); + Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit); } // -------- BackupEngineImpl class --------- @@ -170,6 +213,7 @@ class BackupEngineImpl : public BackupEngine { Env* src_env, Env* dst_env, bool sync, + RateLimiter* rate_limiter, uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, uint64_t size_limit = 0); @@ -179,6 +223,7 @@ class BackupEngineImpl : public BackupEngine { bool shared, const std::string& src_dir, const std::string& src_fname, // starts with "/" + RateLimiter* rate_limiter, uint64_t size_limit = 0); Status CalculateChecksum(const std::string& src, @@ -209,7 +254,8 @@ class BackupEngineImpl : public BackupEngine { unique_ptr meta_directory_; unique_ptr private_directory_; - static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB + static const size_t 
kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB + size_t copy_file_buffer_size_; }; BackupEngine* BackupEngine::NewBackupEngine( @@ -222,9 +268,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, : stop_backup_(false), options_(options), db_env_(db_env), - backup_env_(options.backup_env != nullptr ? options.backup_env - : db_env_) { - + backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), + copy_file_buffer_size_(kDefaultCopyFileBufferSize) { options_.Dump(options_.info_log); // create all the dirs we need @@ -350,6 +395,13 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { s = backup_env_->CreateDir( GetAbsolutePath(GetPrivateFileRel(new_backup_id, true))); + unique_ptr rate_limiter; + if (options_.backup_rate_limit > 0) { + copy_file_buffer_size_ = options_.backup_rate_limit / 10; + rate_limiter.reset(new RateLimiter(db_env_, options_.backup_rate_limit, + copy_file_buffer_size_)); + } + // copy live_files for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { uint64_t number; @@ -371,6 +423,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { options_.share_table_files && type == kTableFile, db->GetName(), /* src_dir */ live_files[i], /* src_fname */ + rate_limiter.get(), (type == kDescriptorFile) ? 
manifest_file_size : 0); } @@ -383,7 +436,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { &new_backup, false, /* not shared */ db->GetOptions().wal_dir, - live_wal_files[i]->PathName()); + live_wal_files[i]->PathName(), + rate_limiter.get()); } } @@ -527,6 +581,12 @@ Status BackupEngineImpl::RestoreDBFromBackup( DeleteChildren(db_dir); } + unique_ptr rate_limiter; + if (options_.restore_rate_limit > 0) { + copy_file_buffer_size_ = options_.restore_rate_limit / 10; + rate_limiter.reset(new RateLimiter(db_env_, options_.restore_rate_limit, + copy_file_buffer_size_)); + } Status s; for (auto& file : backup.GetFiles()) { std::string dst; @@ -551,7 +611,7 @@ Status BackupEngineImpl::RestoreDBFromBackup( Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); uint32_t checksum_value; s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false, - nullptr /* size */, &checksum_value); + rate_limiter.get(), nullptr /* size */, &checksum_value); if (!s.ok()) { break; } @@ -631,7 +691,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) { Status BackupEngineImpl::CopyFile(const std::string& src, const std::string& dst, Env* src_env, - Env* dst_env, bool sync, uint64_t* size, + Env* dst_env, bool sync, + RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value, uint64_t size_limit) { Status s; @@ -684,6 +745,9 @@ Status BackupEngineImpl::CopyFile(const std::string& src, data.size()); } s = dst_file->Append(data); + if (rate_limiter != nullptr) { + rate_limiter->ReportAndWait(data.size()); + } } while (s.ok() && data.size() > 0 && size_limit > 0); if (s.ok() && sync) { @@ -697,6 +761,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src, Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, bool shared, const std::string& src_dir, const std::string& src_fname, + RateLimiter* rate_limiter, uint64_t size_limit) { assert(src_fname.size() > 0 && 
src_fname[0] == '/'); @@ -732,6 +797,7 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, db_env_, backup_env_, options_.sync, + rate_limiter, &size, &checksum_value, size_limit); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index c5ee445a6..88c4af0d3 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -293,13 +293,16 @@ class FileManager : public EnvWrapper { }; // FileManager // utility functions -static void FillDB(DB* db, int from, int to) { +static size_t FillDB(DB* db, int from, int to) { + size_t bytes_written = 0; for (int i = from; i < to; ++i) { std::string key = "testkey" + std::to_string(i); std::string value = "testvalue" + std::to_string(i); + bytes_written += key.size() + value.size(); ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); } + return bytes_written; } static void AssertExists(DB* db, int from, int to) { @@ -780,8 +783,8 @@ TEST(BackupableDBTest, DeleteTmpFiles) { } TEST(BackupableDBTest, KeepLogFiles) { - // basically infinite backupable_options_->backup_log_files = false; + // basically infinite options_.WAL_ttl_seconds = 24 * 60 * 60; OpenBackupableDB(true); FillDB(db_.get(), 0, 100); @@ -800,6 +803,47 @@ TEST(BackupableDBTest, KeepLogFiles) { AssertBackupConsistency(0, 0, 500, 600, true); } +TEST(BackupableDBTest, RateLimiting) { + uint64_t const KB = 1024 * 1024; + size_t const kMicrosPerSec = 1000 * 1000LL; + + std::vector> limits( + {{KB, 5 * KB}, {2 * KB, 3 * KB}}); + + for (const auto& limit : limits) { + // destroy old data + DestroyDB(dbname_, Options()); + + backupable_options_->backup_rate_limit = limit.first; + backupable_options_->restore_rate_limit = limit.second; + options_.compression = kNoCompression; + OpenBackupableDB(true); + size_t bytes_written = FillDB(db_.get(), 0, 100000); + + auto start_backup = env_->NowMicros(); + ASSERT_OK(db_->CreateNewBackup(false)); + auto 
backup_time = env_->NowMicros() - start_backup; + auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / + backupable_options_->backup_rate_limit; + ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); + ASSERT_LT(backup_time, 1.1 * rate_limited_backup_time); + + CloseBackupableDB(); + + OpenRestoreDB(); + auto start_restore = env_->NowMicros(); + ASSERT_OK(restore_db_->RestoreDBFromLatestBackup(dbname_, dbname_)); + auto restore_time = env_->NowMicros() - start_restore; + CloseRestoreDB(); + auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / + backupable_options_->restore_rate_limit; + ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); + ASSERT_LT(restore_time, 1.1 * rate_limited_restore_time); + + AssertBackupConsistency(0, 0, 100000, 100010); + } +} + } // anon namespace } // namespace rocksdb From e6d4b006b69a7eef770f8d60fa261693944dd403 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 24 Mar 2014 11:59:42 -0700 Subject: [PATCH 2/5] Relax backupable RateLimiter unit test for slow environments --- utilities/backupable/backupable_db_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 88c4af0d3..62717f5dc 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -826,7 +826,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / backupable_options_->backup_rate_limit; ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); - ASSERT_LT(backup_time, 1.1 * rate_limited_backup_time); + ASSERT_LT(backup_time, 1.3 * rate_limited_backup_time); CloseBackupableDB(); @@ -838,7 +838,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / backupable_options_->restore_rate_limit; ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); - ASSERT_LT(restore_time, 
1.1 * rate_limited_restore_time); + ASSERT_LT(restore_time, 1.3 * rate_limited_restore_time); AssertBackupConsistency(0, 0, 100000, 100010); } From cda4006e8724c9960ddd07e833371dd8f07c9886 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 24 Mar 2014 17:57:13 -0700 Subject: [PATCH 3/5] Enhance partial merge to support multiple arguments Summary: * PartialMerge api now takes a list of operands instead of two operands. * Add min_pertial_merge_operands to Options, indicating the minimum number of operands to trigger partial merge. * This diff is based on Schalk's previous diff (D14601), but it also includes necessary changes such as updating the pure C api for partial merge. Test Plan: * make check all * develop tests for cases where partial merge takes more than two operands. TODOs (from Schalk): * Add test with min_partial_merge_operands > 2. * Perform benchmarks to measure the performance improvements (can probably use results of task #2837810.) * Add description of problem to doc/index.html. * Change wiki pages to reflect the interface changes. 
Reviewers: haobo, igor, vamsi Reviewed By: haobo CC: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D16815 --- db/builder.cc | 1 + db/c.cc | 64 ++++++------- db/c_test.c | 4 +- db/db_impl.cc | 1 + db/db_iter.cc | 15 --- db/memtable.cc | 13 +-- db/merge_context.h | 8 +- db/merge_helper.cc | 39 ++++---- db/merge_helper.h | 6 +- db/merge_operator.cc | 23 ++++- db/merge_test.cc | 91 +++++++++++++++++-- db/version_set.cc | 13 +-- include/rocksdb/c.h | 4 +- include/rocksdb/merge_operator.h | 51 +++++++++-- include/rocksdb/options.h | 9 ++ util/options.cc | 14 +-- utilities/merge_operators/put.cc | 14 +++ .../string_append/stringappend2.cc | 41 +++++---- .../string_append/stringappend2.h | 24 ++--- utilities/ttl/db_ttl.h | 30 +++--- 20 files changed, 297 insertions(+), 168 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 08e76b539..ce85ae589 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -73,6 +73,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, MergeHelper merge(internal_comparator.user_comparator(), options.merge_operator.get(), options.info_log.get(), + options.min_partial_merge_operands, true /* internal key corruption is not ok */); if (purge) { diff --git a/db/c.cc b/db/c.cc index e4946f351..9084c4a9a 100644 --- a/db/c.cc +++ b/db/c.cc @@ -159,12 +159,10 @@ struct rocksdb_mergeoperator_t : public MergeOperator { const char* const* operands_list, const size_t* operands_list_length, int num_operands, unsigned char* success, size_t* new_value_length); - char* (*partial_merge_)( - void*, - const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, - unsigned char* success, size_t* new_value_length); + char* (*partial_merge_)(void*, const char* key, size_t key_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length); 
void (*delete_value_)( void*, const char* value, size_t value_length); @@ -219,21 +217,23 @@ struct rocksdb_mergeoperator_t : public MergeOperator { return success; } - virtual bool PartialMerge( - const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + size_t operand_count = operand_list.size(); + std::vector operand_pointers(operand_count); + std::vector operand_sizes(operand_count); + for (size_t i = 0; i < operand_count; ++i) { + Slice operand(operand_list[i]); + operand_pointers[i] = operand.data(); + operand_sizes[i] = operand.size(); + } unsigned char success; size_t new_value_len; char* tmp_new_value = (*partial_merge_)( - state_, - key.data(), key.size(), - left_operand.data(), left_operand.size(), - right_operand.data(), right_operand.size(), - &success, &new_value_len); + state_, key.data(), key.size(), &operand_pointers[0], &operand_sizes[0], + operand_count, &success, &new_value_len); new_value->assign(tmp_new_value, new_value_len); if (delete_value_ != nullptr) { @@ -1094,24 +1094,18 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) { } rocksdb_mergeoperator_t* rocksdb_mergeoperator_create( - void* state, - void (*destructor)(void*), - char* (*full_merge)( - void*, - const char* key, size_t key_length, - const char* existing_value, size_t existing_value_length, - const char* const* operands_list, const size_t* operands_list_length, - int num_operands, - unsigned char* success, size_t* new_value_length), - char* (*partial_merge)( - void*, - const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, - unsigned char* success, size_t* new_value_length), - void (*delete_value)( - void*, - const char* value, size_t value_length), + 
void* state, void (*destructor)(void*), + char* (*full_merge)(void*, const char* key, size_t key_length, + const char* existing_value, + size_t existing_value_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length), + char* (*partial_merge)(void*, const char* key, size_t key_length, + const char* const* operands_list, + const size_t* operands_list_length, int num_operands, + unsigned char* success, size_t* new_value_length), + void (*delete_value)(void*, const char* value, size_t value_length), const char* (*name)(void*)) { rocksdb_mergeoperator_t* result = new rocksdb_mergeoperator_t; result->state_ = state; diff --git a/db/c_test.c b/db/c_test.c index a68abca48..f17e37128 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -175,8 +175,8 @@ static char* MergeOperatorFullMerge( static char* MergeOperatorPartialMerge( void* arg, const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, + const char* const* operands_list, const size_t* operands_list_length, + int num_operands, unsigned char* success, size_t* new_value_length) { *new_value_length = 4; *success = 1; diff --git a/db/db_impl.cc b/db/db_impl.cc index 5b79807f5..b8dc9cb4b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2463,6 +2463,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, std::vector delete_key; // for compaction filter MergeHelper merge(user_comparator(), options_.merge_operator.get(), options_.info_log.get(), + options_.min_partial_merge_operands, false /* internal key corruption is expected */); auto compaction_filter = options_.compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; diff --git a/db/db_iter.cc b/db/db_iter.cc index e7491f7e3..bbf8a8115 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -313,21 +313,6 @@ void DBIter::MergeValuesNewToOld() { // when complete, add 
result to operands and continue. const Slice& value = iter_->value(); operands.push_front(value.ToString()); - while(operands.size() >= 2) { - // Call user associative-merge until it returns false - if (user_merge_operator_->PartialMerge(ikey.user_key, - Slice(operands[0]), - Slice(operands[1]), - &merge_result, - logger_)) { - operands.pop_front(); - swap(operands.front(), merge_result); - } else { - // Associative merge returns false ==> stack the operands - break; - } - } - } } diff --git a/db/memtable.cc b/db/memtable.cc index b787ec24e..5fefab04b 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -36,7 +36,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) kWriteBufferSize(options.write_buffer_size), arena_(options.arena_block_size), table_(options.memtable_factory->CreateMemTableRep( - comparator_, &arena_, options.prefix_extractor.get())), + comparator_, &arena_, options.prefix_extractor.get())), flush_in_progress_(false), flush_completed_(false), file_number_(0), @@ -353,17 +353,6 @@ static bool SaveValue(void* arg, const char* entry) { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->merge_in_progress) = true; merge_context->PushOperand(v); - while (merge_context->GetNumOperands() >= 2) { - // Attempt to associative merge. (Returns true if successful) - if (merge_operator->PartialMerge( - s->key->user_key(), merge_context->GetOperand(0), - merge_context->GetOperand(1), &merge_result, s->logger)) { - merge_context->PushPartialMergeResult(merge_result); - } else { - // Stack them because user can't associative merge - break; - } - } return true; } default: diff --git a/db/merge_context.h b/db/merge_context.h index 91d9f8a01..bf483a827 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -25,12 +25,12 @@ public: operand_list->clear(); } } - // Replace the first two operands of merge_result, which are expected be the - // merge results of them. 
+ // Replace all operands with merge_result, which are expected to be the + // merge result of them. void PushPartialMergeResult(std::string& merge_result) { assert (operand_list); - operand_list->pop_front(); - swap(operand_list->front(), merge_result); + operand_list->clear(); + operand_list->push_front(std::move(merge_result)); } // Push a merge operand void PushOperand(const Slice& operand_slice) { diff --git a/db/merge_helper.cc b/db/merge_helper.cc index e3f3adb1f..cc1dac6c1 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -129,28 +129,10 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // => then continue because we haven't yet seen a Put/Delete. assert(!operands_.empty()); // Should have at least one element in it + // keep queuing keys and operands until we either meet a put / delete + // request or later did a partial merge. keys_.push_front(iter->key().ToString()); operands_.push_front(iter->value().ToString()); - while (operands_.size() >= 2) { - // Returns false when the merge_operator can no longer process it - if (user_merge_operator_->PartialMerge(ikey.user_key, - Slice(operands_[0]), - Slice(operands_[1]), - &merge_result, - logger_)) { - // Merging of operands (associative merge) was successful. - // Replace these frontmost two operands with the merge result - keys_.pop_front(); - operands_.pop_front(); - swap(operands_.front(), merge_result); - } else { - // Merging of operands (associative merge) returned false. - // The user merge_operator does not know how to merge these operands. - // So we just stack them up until we find a Put/Delete or end of key. - break; - } - } - continue; } } @@ -192,6 +174,23 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, RecordTick(stats, NUMBER_MERGE_FAILURES); // Do nothing if not success_. Leave keys() and operands() as they are. } + } else { + // We haven't seen the beginning of the key nor a Put/Delete. 
+ // Attempt to use the user's associative merge function to + // merge the stacked merge operands into a single operand. + + if (operands_.size() >= 2 && + operands_.size() >= min_partial_merge_operands_ && + user_merge_operator_->PartialMergeMulti( + ikey.user_key, + std::deque(operands_.begin(), operands_.end()), + &merge_result, logger_)) { + // Merging of operands (associative merge) was successful. + // Replace operands with the merge result + operands_.clear(); + operands_.push_front(std::move(merge_result)); + keys_.erase(keys_.begin(), keys_.end() - 1); + } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 6fe9bfb23..5311555a0 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -22,12 +22,13 @@ class Statistics; class MergeHelper { public: MergeHelper(const Comparator* user_comparator, - const MergeOperator* user_merge_operator, - Logger* logger, + const MergeOperator* user_merge_operator, Logger* logger, + unsigned min_partial_merge_operands, bool assert_valid_internal_key) : user_comparator_(user_comparator), user_merge_operator_(user_merge_operator), logger_(logger), + min_partial_merge_operands_(min_partial_merge_operands), assert_valid_internal_key_(assert_valid_internal_key), keys_(), operands_(), @@ -88,6 +89,7 @@ class MergeHelper { const Comparator* user_comparator_; const MergeOperator* user_merge_operator_; Logger* logger_; + unsigned min_partial_merge_operands_; bool assert_valid_internal_key_; // enforce no internal key corruption? // the scratch area that holds the result of MergeUntil diff --git a/db/merge_operator.cc b/db/merge_operator.cc index 7d1ee4e5f..43a8df371 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -11,6 +11,28 @@ namespace rocksdb { +// The default implementation of PartialMergeMulti, which invokes +// PartialMerge multiple times internally and merges two operands at +// a time. 
+bool MergeOperator::PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, + Logger* logger) const { + // Simply loop through the operands + std::string temp_value; + Slice temp_slice; + for (const auto& operand : operand_list) { + if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { + return false; + } + swap(temp_value, *new_value); + temp_slice = Slice(*new_value); + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + // Given a "real" merge from the library, call the user's // associative merge function one-by-one on each of the operands. // NOTE: It is assumed that the client's merge-operator will handle any errors. @@ -46,7 +68,6 @@ bool AssociativeMergeOperator::PartialMerge( const Slice& right_operand, std::string* new_value, Logger* logger) const { - return Merge(key, &left_operand, right_operand, new_value, logger); } diff --git a/db/merge_test.cc b/db/merge_test.cc index 887d8ad42..4b98f0581 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -24,10 +24,14 @@ using namespace rocksdb; namespace { int numMergeOperatorCalls; - void resetNumMergeOperatorCalls() { numMergeOperatorCalls = 0; } + + int num_partial_merge_calls; + void resetNumPartialMergeCalls() { + num_partial_merge_calls = 0; + } } class CountMergeOperator : public AssociativeMergeOperator { @@ -42,6 +46,11 @@ class CountMergeOperator : public AssociativeMergeOperator { std::string* new_value, Logger* logger) const override { ++numMergeOperatorCalls; + if (existing_value == nullptr) { + new_value->assign(value.data(), value.size()); + return true; + } + return mergeOperator_->PartialMerge( key, *existing_value, @@ -50,6 +59,14 @@ class CountMergeOperator : public AssociativeMergeOperator { logger); } + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + ++num_partial_merge_calls; + return 
mergeOperator_->PartialMergeMulti(key, operand_list, new_value, + logger); + } + virtual const char* Name() const override { return "UInt64AddOperator"; } @@ -58,16 +75,16 @@ class CountMergeOperator : public AssociativeMergeOperator { std::shared_ptr mergeOperator_; }; -std::shared_ptr OpenDb( - const string& dbname, - const bool ttl = false, - const unsigned max_successive_merges = 0) { +std::shared_ptr OpenDb(const string& dbname, const bool ttl = false, + const size_t max_successive_merges = 0, + const uint32_t min_partial_merge_operands = 2) { DB* db; StackableDB* sdb; Options options; options.create_if_missing = true; options.merge_operator = std::make_shared(); options.max_successive_merges = max_successive_merges; + options.min_partial_merge_operands = min_partial_merge_operands; Status s; DestroyDB(dbname, Options()); if (ttl) { @@ -306,6 +323,44 @@ void testSuccessiveMerge( } } +void testPartialMerge(Counters* counters, DB* db, int max_merge, int min_merge, + int count) { + FlushOptions o; + o.wait = true; + + // Test case 1: partial merge should be called when the number of merge + // operands exceeds the threshold. + uint64_t tmp_sum = 0; + resetNumPartialMergeCalls(); + for (int i = 1; i <= count; i++) { + counters->assert_add("b", i); + tmp_sum += i; + } + db->Flush(o); + db->CompactRange(nullptr, nullptr); + ASSERT_EQ(tmp_sum, counters->assert_get("b")); + if (count > max_merge) { + // in this case, FullMerge should be called instead. + ASSERT_EQ(num_partial_merge_calls, 0); + } else { + // if count >= min_merge, then partial merge should be called once. + ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1)); + } + + // Test case 2: partial merge should not be called when a put is found. 
+ resetNumPartialMergeCalls(); + tmp_sum = 0; + db->Put(rocksdb::WriteOptions(), "c", "10"); + for (int i = 1; i <= count; i++) { + counters->assert_add("c", i); + tmp_sum += i; + } + db->Flush(o); + db->CompactRange(nullptr, nullptr); + ASSERT_EQ(tmp_sum, counters->assert_get("c")); + ASSERT_EQ(num_partial_merge_calls, 0); +} + void testSingleBatchSuccessiveMerge( DB* db, int max_num_merges, @@ -370,20 +425,40 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { { cout << "Test merge in memtable... \n"; - unsigned maxMerge = 5; - auto db = OpenDb(dbname, use_ttl, maxMerge); + size_t max_merge = 5; + auto db = OpenDb(dbname, use_ttl, max_merge); MergeBasedCounters counters(db, 0); testCounters(counters, db.get(), compact); - testSuccessiveMerge(counters, maxMerge, maxMerge * 2); + testSuccessiveMerge(counters, max_merge, max_merge * 2); testSingleBatchSuccessiveMerge(db.get(), 5, 7); DestroyDB(dbname, Options()); } + { + cout << "Test Partial-Merge\n"; + size_t max_merge = 100; + for (uint32_t min_merge = 5; min_merge < 25; min_merge += 5) { + for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) { + auto db = OpenDb(dbname, use_ttl, max_merge, min_merge); + MergeBasedCounters counters(db, 0); + testPartialMerge(&counters, db.get(), max_merge, min_merge, count); + DestroyDB(dbname, Options()); + } + { + auto db = OpenDb(dbname, use_ttl, max_merge, min_merge); + MergeBasedCounters counters(db, 0); + testPartialMerge(&counters, db.get(), max_merge, min_merge, + min_merge * 10); + DestroyDB(dbname, Options()); + } + } + } } int main(int argc, char *argv[]) { //TODO: Make this test like a general rocksdb unit-test runTest(argc, test::TmpDir() + "/merge_testdb"); runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database + printf("Passed all tests!\n"); return 0; } diff --git a/db/version_set.cc b/db/version_set.cc index 70da5b9d5..3d9b0f128 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ 
-419,18 +419,7 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, assert(s->state == kNotFound || s->state == kMerge); s->state = kMerge; merge_contex->PushOperand(v); - while (merge_contex->GetNumOperands() >= 2) { - // Attempt to merge operands together via user associateive merge - if (s->merge_operator->PartialMerge( - s->user_key, merge_contex->GetOperand(0), - merge_contex->GetOperand(1), &merge_result, s->logger)) { - merge_contex->PushPartialMergeResult(merge_result); - } else { - // Associative merge returns false ==> stack the operands - break; - } - } - return true; + return true; default: assert(false); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 62be94fe4..23c63f24f 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -425,8 +425,8 @@ extern rocksdb_mergeoperator_t* rocksdb_mergeoperator_create( char* (*partial_merge)( void*, const char* key, size_t key_length, - const char* left_operand, size_t left_operand_length, - const char* right_operand, size_t right_operand_length, + const char* const* operands_list, const size_t* operands_list_length, + int num_operands, unsigned char* success, size_t* new_value_length), void (*delete_value)( void*, diff --git a/include/rocksdb/merge_operator.h b/include/rocksdb/merge_operator.h index bd4c36c07..2ae64c1bc 100644 --- a/include/rocksdb/merge_operator.h +++ b/include/rocksdb/merge_operator.h @@ -32,9 +32,9 @@ class Logger; // // b) MergeOperator - the generic class for all the more abstract / complex // operations; one method (FullMerge) to merge a Put/Delete value with a -// merge operand; and another method (PartialMerge) that merges two -// operands together. this is especially useful if your key values have a -// complex structure but you would still like to support client-specific +// merge operand; and another method (PartialMerge) that merges multiple +// operands together. 
this is especially useful if your key values have +// complex structures but you would still like to support client-specific // incremental updates. // // AssociativeMergeOperator is simpler to implement. MergeOperator is simply @@ -80,6 +80,13 @@ class MergeOperator { // DB::Merge(key, *new_value) would yield the same result as a call // to DB::Merge(key, left_op) followed by DB::Merge(key, right_op). // + // The default implementation of PartialMergeMulti will use this function + // as a helper, for backward compatibility. Any successor class of + // MergeOperator should either implement PartialMerge or PartialMergeMulti, + // although implementing PartialMergeMulti is suggested as it is in general + // more effective to merge multiple operands at a time instead of two + // operands at a time. + // // If it is impossible or infeasible to combine the two operations, // leave new_value unchanged and return false. The library will // internally keep track of the operations, and apply them in the @@ -89,12 +96,38 @@ class MergeOperator { // and simply "return false". For now, the client should simply return // false in any case it cannot perform partial-merge, regardless of reason. // If there is corruption in the data, handle it in the FullMerge() function, - // and return false there. - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const = 0; + // and return false there. The default implementation of PartialMerge will + // always return false. + virtual bool PartialMerge(const Slice& key, const Slice& left_operand, + const Slice& right_operand, std::string* new_value, + Logger* logger) const { + return false; + } + + // This function performs merge when all the operands are themselves merge + // operation types that you would have passed to a DB::Merge() call in the + // same order (front() first) + // (i.e. 
DB::Merge(key, operand_list[0]), followed by + // DB::Merge(key, operand_list[1]), ...) + // + // PartialMergeMulti should combine them into a single merge operation that is + // saved into *new_value, and then it should return true. *new_value should + // be constructed such that a call to DB::Merge(key, *new_value) would yield + // the same result as subsequent individual calls to DB::Merge(key, operand) + // for each operand in operand_list from front() to back(). + // + // The PartialMergeMulti function will be called only when the list of + // operands is long enough. The minimum number of operands that will be + // passed to the function is specified by the "min_partial_merge_operands" + // option. + // + // In the default implementation, PartialMergeMulti will invoke PartialMerge + // multiple times, where each time it only merges two operands. Developers + // should either implement PartialMergeMulti, or implement PartialMerge which + // serves as the helper function of the default PartialMergeMulti. + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const; // The name of the MergeOperator. Used to check for MergeOperator // mismatches (i.e., a DB created with one MergeOperator is diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index bb676f985..0e5ff577e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -723,6 +723,15 @@ struct Options { // Default: 0 (disabled) size_t max_successive_merges; + // The number of partial merge operands to accumulate before partial + // merge will be performed. Partial merge will not be called + // if the list of values to merge is less than min_partial_merge_operands. + // + // If min_partial_merge_operands < 2, then it will be treated as 2. + // + // Default: 2 + uint32_t min_partial_merge_operands; + // Allow RocksDB to use thread local storage to optimize performance.
// Default: true bool allow_thread_local; diff --git a/util/options.cc b/util/options.cc index 17ef3da9f..007584b94 100644 --- a/util/options.cc +++ b/util/options.cc @@ -30,16 +30,15 @@ Options::Options() : comparator(BytewiseComparator()), merge_operator(nullptr), compaction_filter(nullptr), - compaction_filter_factory( - std::shared_ptr( - new DefaultCompactionFilterFactory())), + compaction_filter_factory(std::shared_ptr( + new DefaultCompactionFilterFactory())), create_if_missing(false), error_if_exists(false), paranoid_checks(false), env(Env::Default()), info_log(nullptr), info_log_level(INFO), - write_buffer_size(4<<20), + write_buffer_size(4 << 20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), max_open_files(1000), @@ -95,7 +94,7 @@ Options::Options() is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), - block_size_deviation (10), + block_size_deviation(10), advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), @@ -106,13 +105,14 @@ Options::Options() max_sequential_skip_in_iterations(8), memtable_factory(std::shared_ptr(new SkipListFactory)), table_factory( - std::shared_ptr(new BlockBasedTableFactory())), + std::shared_ptr(new BlockBasedTableFactory())), inplace_update_support(false), inplace_update_num_locks(10000), inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6), max_successive_merges(0), + min_partial_merge_operands(2), allow_thread_local(true) { assert(memtable_factory.get() != nullptr); } @@ -306,6 +306,8 @@ Options::Dump(Logger* log) const inplace_update_support); Log(log, " Options.inplace_update_num_locks: %zd", inplace_update_num_locks); + Log(log, " Options.min_partial_merge_operands: %u", + min_partial_merge_operands); // TODO: easier config for bloom (maybe based on avg key/value size) Log(log, " Options.memtable_prefix_bloom_bits: %d", memtable_prefix_bloom_bits); diff --git 
a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc index e77449d32..333084313 100644 --- a/utilities/merge_operators/put.cc +++ b/utilities/merge_operators/put.cc @@ -1,3 +1,8 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + #include #include "rocksdb/slice.h" #include "rocksdb/merge_operator.h" @@ -38,6 +43,15 @@ class PutOperator : public MergeOperator { return true; } + using MergeOperator::PartialMergeMulti; + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override { + new_value->assign(operand_list.back().data(), operand_list.back().size()); + return true; + } + virtual const char* Name() const override { return "PutOperator"; } diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc index e153a388e..b2e03588f 100644 --- a/utilities/merge_operators/string_append/stringappend2.cc +++ b/utilities/merge_operators/string_append/stringappend2.cc @@ -6,6 +6,7 @@ #include "stringappend2.h" #include +#include #include #include "rocksdb/slice.h" @@ -61,31 +62,39 @@ bool StringAppendTESTOperator::FullMerge( return true; } -bool StringAppendTESTOperator::PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { +bool StringAppendTESTOperator::PartialMergeMulti( + const Slice& key, const std::deque& operand_list, + std::string* new_value, Logger* logger) const { return false; } // A version of PartialMerge that actually performs "partial merging". // Use this to simulate the exact behaviour of the StringAppendOperator. 
-bool StringAppendTESTOperator::_AssocPartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const { - // Clear the *new_value for writing. +bool StringAppendTESTOperator::_AssocPartialMergeMulti( + const Slice& key, const std::deque& operand_list, + std::string* new_value, Logger* logger) const { + // Clear the *new_value for writing assert(new_value); new_value->clear(); + assert(operand_list.size() >= 2); // Generic append - // Reserve correct size for *new_value, and apply concatenation. - new_value->reserve(left_operand.size() + 1 + right_operand.size()); - new_value->assign(left_operand.data(), left_operand.size()); - new_value->append(1,delim_); - new_value->append(right_operand.data(), right_operand.size()); + // Determine and reserve correct size for *new_value. + size_t size = 0; + for (const auto& operand : operand_list) { + size += operand.size(); + } + size += operand_list.size() - 1; // Delimiters + new_value->reserve(size); + + // Apply concatenation + new_value->assign(operand_list.front().data(), operand_list.front().size()); + + for (std::deque::const_iterator it = operand_list.begin() + 1; + it != operand_list.end(); ++it) { + new_value->append(1, delim_); + new_value->append(it->data(), it->size()); + } return true; } diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h index 01a4be4db..5e506ef8f 100644 --- a/utilities/merge_operators/string_append/stringappend2.h +++ b/utilities/merge_operators/string_append/stringappend2.h @@ -11,6 +11,9 @@ */ #pragma once +#include +#include + #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" @@ -18,8 +21,8 @@ namespace rocksdb { class StringAppendTESTOperator : public MergeOperator { public: - - StringAppendTESTOperator(char delim_char); /// Constructor with delimiter + // Constructor with delimiter + explicit StringAppendTESTOperator(char 
delim_char); virtual bool FullMerge(const Slice& key, const Slice* existing_value, @@ -27,22 +30,19 @@ class StringAppendTESTOperator : public MergeOperator { std::string* new_value, Logger* logger) const override; - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const override; + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override; virtual const char* Name() const override; private: // A version of PartialMerge that actually performs "partial merging". // Use this to simulate the exact behaviour of the StringAppendOperator. - bool _AssocPartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const; + bool _AssocPartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const; char delim_; // The delimiter is inserted between elements diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 2fdc664e2..519ae32c7 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -3,6 +3,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once +#include +#include + #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/compaction_filter.h" @@ -268,24 +271,27 @@ class TtlMergeOperator : public MergeOperator { } } - virtual bool PartialMerge(const Slice& key, - const Slice& left_operand, - const Slice& right_operand, - std::string* new_value, - Logger* logger) const override { + virtual bool PartialMergeMulti(const Slice& key, + const std::deque& operand_list, + std::string* new_value, Logger* logger) const + override { const uint32_t ts_len = DBWithTTL::kTSLength; + std::deque operands_without_ts; - if (left_operand.size() < ts_len || right_operand.size() < ts_len) { - Log(logger, "Error: Could not remove timestamp from value."); - return false; + for (const auto& operand : operand_list) { + if (operand.size() < ts_len) { + Log(logger, "Error: Could not remove timestamp from value."); + return false; + } + + operands_without_ts.push_back( + Slice(operand.data(), operand.size() - ts_len)); } // Apply the user partial-merge operator (store result in *new_value) assert(new_value); - Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len); - Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len); - if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts, - new_value, logger)) { + if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, + logger)) { return false; } From b47812fba601e23872349407d565d15f0b41a2fe Mon Sep 17 00:00:00 2001 From: Danny Guo Date: Thu, 9 Jan 2014 17:52:11 -0800 Subject: [PATCH 4/5] [rocksdb] new CompactionFilterV2 API Summary: This diff adds a new CompactionFilterV2 API that roll up the decisions of kv pairs during compactions. These kv pairs must share the same key prefix. They are buffered inside the db. 
typedef std::vector<Slice> SliceVector; virtual std::vector<bool> Filter(int level, const SliceVector& keys, const SliceVector& existing_values, std::vector<std::string>* new_values, std::vector<bool>* values_changed ) const = 0; Applications can override the Filter() function to operate on the buffered kv pairs. More details in the inline documentation. Test Plan: make check. Added unit tests to make sure Keep, Delete, Change all work. Reviewers: haobo CCs: leveldb Differential Revision: https://reviews.facebook.net/D15087 --- HISTORY.md | 2 + db/db_impl.cc | 517 ++++++++++++++++++++++++---- db/db_impl.h | 19 + db/db_test.cc | 254 +++++++++++++- db/merge_helper.cc | 14 +- db/merge_helper.h | 3 +- include/rocksdb/compaction_filter.h | 119 ++++++- include/rocksdb/env.h | 2 +- include/rocksdb/options.h | 5 + util/options.cc | 5 + utilities/ttl/db_ttl.h | 2 +- utilities/ttl/ttl_test.cc | 2 +- 12 files changed, 850 insertions(+), 94 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2fd37cc01..3b71273aa 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -15,11 +15,13 @@ * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added a command "checkconsistency" in ldb tool, which checks if file system state matches DB state (file existence and file sizes) +* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2 ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, we will ignore it. We assume that writers of these records were interrupted and that we can safely ignore it. +* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, processes them in batches, and returns the batched results back to DB.
## 2.7.0 (01/28/2014) diff --git a/db/db_impl.cc b/db/db_impl.cc index b8dc9cb4b..95d228ccf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -70,6 +70,7 @@ namespace rocksdb { int DBImpl::SuperVersion::dummy = 0; void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy; void* const DBImpl::SuperVersion::kSVObsolete = nullptr; +const std::string kNullString = "NULL"; void DumpLeveldbBuildVersion(Logger * log); @@ -118,12 +119,129 @@ struct DBImpl::CompactionState { } // Create a client visible context of this compaction - CompactionFilter::Context GetFilterContext() { - CompactionFilter::Context context; + CompactionFilterContext GetFilterContext() { + CompactionFilterContext context; context.is_full_compaction = compaction->IsFullCompaction(); context.is_manual_compaction = compaction->IsManualCompaction(); return context; } + + std::vector key_buf_; + std::vector existing_value_buf_; + std::vector key_str_buf_; + std::vector existing_value_str_buf_; + // new_value_buf_ will only be appended if a value changes + std::vector new_value_buf_; + // if values_changed_buf_[i] is true + // new_value_buf_ will add a new entry with the changed value + std::vector value_changed_buf_; + // to_delete_buf_[i] is true iff key_buf_[i] is deleted + std::vector to_delete_buf_; + // buffer for the parsed internal keys, the string buffer is backed + // by key_str_buf_ + std::vector ikey_buf_; + + std::vector other_key_buf_; + std::vector other_value_buf_; + std::vector other_key_str_buf_; + std::vector other_value_str_buf_; + + std::vector combined_key_buf_; + std::vector combined_value_buf_; + + std::string cur_prefix_; + + // Buffers the kv-pair that will be run through compaction filter V2 + // in the future. 
+ void BufferKeyValueSlices(const Slice& key, const Slice& value) { + key_str_buf_.emplace_back(key.ToString()); + existing_value_str_buf_.emplace_back(value.ToString()); + key_buf_.emplace_back(Slice(key_str_buf_.back())); + existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back())); + + ParsedInternalKey ikey; + ParseInternalKey(key_buf_.back(), &ikey); + ikey_buf_.emplace_back(ikey); + } + + // Buffers the kv-pair that will not be run through compaction filter V2 + // in the future. + void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) { + other_key_str_buf_.emplace_back(key.ToString()); + other_value_str_buf_.emplace_back(value.ToString()); + other_key_buf_.emplace_back(Slice(other_key_str_buf_.back())); + other_value_buf_.emplace_back(Slice(other_value_str_buf_.back())); + } + + // Add a kv-pair to the combined buffer + void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) { + // The real strings are stored in the batch buffers + combined_key_buf_.emplace_back(key); + combined_value_buf_.emplace_back(value); + } + + // Merging the two buffers + void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) { + size_t i = 0; + size_t j = 0; + size_t total_size = key_buf_.size() + other_key_buf_.size(); + combined_key_buf_.reserve(total_size); + combined_value_buf_.reserve(total_size); + + while (i + j < total_size) { + int comp_res = 0; + if (i < key_buf_.size() && j < other_key_buf_.size()) { + comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]); + } else if (i >= key_buf_.size() && j < other_key_buf_.size()) { + comp_res = 1; + } else if (j >= other_key_buf_.size() && i < key_buf_.size()) { + comp_res = -1; + } + if (comp_res > 0) { + AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]); + j++; + } else if (comp_res < 0) { + AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]); + i++; + } + } + } + + void CleanupBatchBuffer() { + to_delete_buf_.clear(); + 
key_buf_.clear(); + existing_value_buf_.clear(); + key_str_buf_.clear(); + existing_value_str_buf_.clear(); + new_value_buf_.clear(); + value_changed_buf_.clear(); + ikey_buf_.clear(); + + to_delete_buf_.shrink_to_fit(); + key_buf_.shrink_to_fit(); + existing_value_buf_.shrink_to_fit(); + key_str_buf_.shrink_to_fit(); + existing_value_str_buf_.shrink_to_fit(); + new_value_buf_.shrink_to_fit(); + value_changed_buf_.shrink_to_fit(); + ikey_buf_.shrink_to_fit(); + + other_key_buf_.clear(); + other_value_buf_.clear(); + other_key_str_buf_.clear(); + other_value_str_buf_.clear(); + other_key_buf_.shrink_to_fit(); + other_value_buf_.shrink_to_fit(); + other_key_str_buf_.shrink_to_fit(); + other_value_str_buf_.shrink_to_fit(); + } + + void CleanupMergedBuffer() { + combined_key_buf_.clear(); + combined_value_buf_.clear(); + combined_key_buf_.shrink_to_fit(); + combined_value_buf_.shrink_to_fit(); + } }; // Fix user-supplied options to be reasonable @@ -2401,66 +2519,27 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( return 0; } -Status DBImpl::DoCompactionWork(CompactionState* compact, - DeletionState& deletion_state, - LogBuffer* log_buffer) { - assert(compact); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, - "Compacting %d@%d + %d@%d files, score %.2f slots available %d", - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->output_level(), - compact->compaction->score(), - options_.max_background_compactions - bg_compaction_scheduled_); - char scratch[2345]; - compact->compaction->Summary(scratch, sizeof(scratch)); - Log(options_.info_log, "Compaction start summary: %s\n", scratch); - - assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); - assert(compact->builder == nullptr); - assert(!compact->outfile); - - SequenceNumber visible_at_tip = 0; - SequenceNumber earliest_snapshot; - SequenceNumber 
latest_snapshot = 0; - snapshots_.getAll(compact->existing_snapshots); - if (compact->existing_snapshots.size() == 0) { - // optimize for fast path if there are no snapshots - visible_at_tip = versions_->LastSequence(); - earliest_snapshot = visible_at_tip; - } else { - latest_snapshot = compact->existing_snapshots.back(); - // Add the current seqno as the 'latest' virtual - // snapshot to the end of this list. - compact->existing_snapshots.push_back(versions_->LastSequence()); - earliest_snapshot = compact->existing_snapshots[0]; - } - - // Is this compaction producing files at the bottommost level? - bool bottommost_level = compact->compaction->BottomMostLevel(); - - // Allocate the output file numbers before we release the lock - AllocateCompactionOutputFileNumbers(compact); - - // Release mutex while we're actually doing the compaction work - mutex_.Unlock(); - // flush log buffer immediately after releasing the mutex - log_buffer->FlushBufferToLog(); - - const uint64_t start_micros = env_->NowMicros(); - unique_ptr input(versions_->MakeInputIterator(compact->compaction)); - input->SeekToFirst(); +Status DBImpl::ProcessKeyValueCompaction( + SequenceNumber visible_at_tip, + SequenceNumber earliest_snapshot, + SequenceNumber latest_snapshot, + DeletionState& deletion_state, + bool bottommost_level, + int64_t& imm_micros, + Iterator* input, + CompactionState* compact, + bool is_compaction_v2, + LogBuffer* log_buffer) { + size_t combined_idx = 0; Status status; + std::string compaction_filter_value; ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; + std::vector delete_key; // for compaction filter SequenceNumber last_sequence_for_key __attribute__((unused)) = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; - std::string compaction_filter_value; - std::vector delete_key; // for compaction filter MergeHelper merge(user_comparator(), options_.merge_operator.get(), options_.info_log.get(), 
options_.min_partial_merge_operands, @@ -2490,12 +2569,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, imm_micros += (env_->NowMicros() - imm_start); } - Slice key = input->key(); - Slice value = input->value(); + Slice key; + Slice value; + // If is_compaction_v2 is on, kv-pairs are reset to the prefix batch. + // This prefix batch should contain results after calling + // compaction_filter_v2. + // + // If is_compaction_v2 is off, this function will go through all the + // kv-pairs in input. + if (!is_compaction_v2) { + key = input->key(); + value = input->value(); + } else { + if (combined_idx >= compact->combined_key_buf_.size()) { + break; + } + assert(combined_idx < compact->combined_key_buf_.size()); + key = compact->combined_key_buf_[combined_idx]; + value = compact->combined_value_buf_[combined_idx]; + + ++combined_idx; + } if (compact->compaction->ShouldStopBefore(key) && compact->builder != nullptr) { - status = FinishCompactionOutputFile(compact, input.get()); + status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } @@ -2515,15 +2613,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } else { if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, - Slice(current_user_key)) != 0) { + Slice(current_user_key)) != 0) { // First occurrence of this user key current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; - // apply the compaction filter to the first occurrence of the user key - if (compaction_filter && + if (compaction_filter && !is_compaction_v2 && ikey.type == kTypeValue && (visible_at_tip || ikey.sequence > latest_snapshot)) { // If the user has specified a compaction filter and the sequence @@ -2535,15 +2632,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compaction_filter_value.clear(); bool to_delete = 
compaction_filter->Filter(compact->compaction->level(), - ikey.user_key, value, - &compaction_filter_value, - &value_changed); + ikey.user_key, value, + &compaction_filter_value, + &value_changed); if (to_delete) { // make a copy of the original key delete_key.assign(key.data(), key.data() + key.size()); // convert it to a delete UpdateInternalKey(&delete_key[0], delete_key.size(), - ikey.sequence, kTypeDeletion); + ikey.sequence, kTypeDeletion); // anchor the key again key = Slice(&delete_key[0], delete_key.size()); // needed because ikey is backed by key @@ -2565,8 +2662,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, SequenceNumber visible = visible_at_tip ? visible_at_tip : findEarliestVisibleSnapshot(ikey.sequence, - compact->existing_snapshots, - &prev_snapshot); + compact->existing_snapshots, + &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in @@ -2578,8 +2675,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, drop = true; // (A) RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY); } else if (ikey.type == kTypeDeletion && - ikey.sequence <= earliest_snapshot && - compact->compaction->IsBaseLevelForKey(ikey.user_key)) { + ikey.sequence <= earliest_snapshot && + compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers @@ -2596,8 +2693,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // object to minimize change to the existing flow. Turn out this // logic could also be nicely re-used for memtable flush purge // optimization in BuildTable. 
- merge.MergeUntil(input.get(), prev_snapshot, bottommost_level, - options_.statistics.get()); + int steps = 0; + merge.MergeUntil(input, prev_snapshot, bottommost_level, + options_.statistics.get(), &steps); + // Skip the Merge ops + combined_idx = combined_idx - 1 + steps; + current_entry_is_merging = true; if (merge.IsSuccess()) { // Successfully found Put/Delete/(end-of-key-range) while merging @@ -2699,7 +2800,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Close output file if it is big enough if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { - status = FinishCompactionOutputFile(compact, input.get()); + status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } @@ -2736,6 +2837,278 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } } + return status; +} + +void DBImpl::CallCompactionFilterV2(CompactionState* compact, + CompactionFilterV2* compaction_filter_v2) { + if (compact == nullptr || compaction_filter_v2 == nullptr) { + return; + } + + std::vector user_key_buf; + for (const auto& key : compact->ikey_buf_) { + user_key_buf.emplace_back(key.user_key); + } + + // If the user has specified a compaction filter and the sequence + // number is greater than any external snapshot, then invoke the + // filter. + // If the return value of the compaction filter is true, replace + // the entry with a delete marker. + compact->to_delete_buf_ = compaction_filter_v2->Filter( + compact->compaction->level(), + user_key_buf, compact->existing_value_buf_, + &compact->new_value_buf_, + &compact->value_changed_buf_); + + // new_value_buf_.size() <= to_delete__buf_.size(). "=" iff all + // kv-pairs in this compaction run needs to be deleted. 
+ assert(compact->to_delete_buf_.size() == + compact->key_buf_.size()); + assert(compact->to_delete_buf_.size() == + compact->existing_value_buf_.size()); + assert(compact->to_delete_buf_.size() == + compact->value_changed_buf_.size()); + + int new_value_idx = 0; + for (unsigned int i = 0; i < compact->to_delete_buf_.size(); ++i) { + if (compact->to_delete_buf_[i]) { + // update the string buffer directly + // the Slice buffer points to the updated buffer + UpdateInternalKey(&compact->key_str_buf_[i][0], + compact->key_str_buf_[i].size(), + compact->ikey_buf_[i].sequence, + kTypeDeletion); + + // no value associated with delete + compact->existing_value_buf_[i].clear(); + RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER); + } else if (compact->value_changed_buf_[i]) { + compact->existing_value_buf_[i] = + Slice(compact->new_value_buf_[new_value_idx++]); + } + } // for +} + +Status DBImpl::DoCompactionWork(CompactionState* compact, + DeletionState& deletion_state, + LogBuffer* log_buffer) { + assert(compact); + compact->CleanupBatchBuffer(); + compact->CleanupMergedBuffer(); + compact->cur_prefix_ = kNullString; + + int64_t imm_micros = 0; // Micros spent doing imm_ compactions + Log(options_.info_log, + "Compacting %d@%d + %d@%d files, score %.2f slots available %d", + compact->compaction->num_input_files(0), + compact->compaction->level(), + compact->compaction->num_input_files(1), + compact->compaction->output_level(), + compact->compaction->score(), + options_.max_background_compactions - bg_compaction_scheduled_); + char scratch[2345]; + compact->compaction->Summary(scratch, sizeof(scratch)); + Log(options_.info_log, "Compaction start summary: %s\n", scratch); + + assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); + assert(compact->builder == nullptr); + assert(!compact->outfile); + + SequenceNumber visible_at_tip = 0; + SequenceNumber earliest_snapshot; + SequenceNumber latest_snapshot = 0; + 
snapshots_.getAll(compact->existing_snapshots); + if (compact->existing_snapshots.size() == 0) { + // optimize for fast path if there are no snapshots + visible_at_tip = versions_->LastSequence(); + earliest_snapshot = visible_at_tip; + } else { + latest_snapshot = compact->existing_snapshots.back(); + // Add the current seqno as the 'latest' virtual + // snapshot to the end of this list. + compact->existing_snapshots.push_back(versions_->LastSequence()); + earliest_snapshot = compact->existing_snapshots[0]; + } + + // Is this compaction producing files at the bottommost level? + bool bottommost_level = compact->compaction->BottomMostLevel(); + + // Allocate the output file numbers before we release the lock + AllocateCompactionOutputFileNumbers(compact); + + // Release mutex while we're actually doing the compaction work + mutex_.Unlock(); + + const uint64_t start_micros = env_->NowMicros(); + unique_ptr input(versions_->MakeInputIterator(compact->compaction)); + input->SeekToFirst(); + shared_ptr backup_input( + versions_->MakeInputIterator(compact->compaction)); + backup_input->SeekToFirst(); + + Status status; + ParsedInternalKey ikey; + std::unique_ptr compaction_filter_from_factory_v2 + = nullptr; + auto context = compact->GetFilterContext(); + compaction_filter_from_factory_v2 = + options_.compaction_filter_factory_v2->CreateCompactionFilterV2(context); + auto compaction_filter_v2 = + compaction_filter_from_factory_v2.get(); + + // temp_backup_input always point to the start of the current buffer + // temp_backup_input = backup_input; + // iterate through input, + // 1) buffer ineligible keys and value keys into 2 separate buffers; + // 2) send value_buffer to compaction filter and alternate the values; + // 3) merge value_buffer with ineligible_value_buffer; + // 4) run the modified "compaction" using the old for loop. 
+ if (compaction_filter_v2) { + for (; backup_input->Valid() && !shutting_down_.Acquire_Load(); ) { + // Prioritize immutable compaction work + if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { + const uint64_t imm_start = env_->NowMicros(); + LogFlush(options_.info_log); + mutex_.Lock(); + if (imm_.IsFlushPending()) { + FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer); + bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + } + mutex_.Unlock(); + imm_micros += (env_->NowMicros() - imm_start); + } + + Slice key = backup_input->key(); + Slice value = backup_input->value(); + + const SliceTransform* transformer = + options_.compaction_filter_factory_v2->GetPrefixExtractor(); + std::string key_prefix = transformer->Transform(key).ToString(); + if (compact->cur_prefix_ == kNullString) { + compact->cur_prefix_ = key_prefix; + } + if (!ParseInternalKey(key, &ikey)) { + // log error + Log(options_.info_log, "Failed to parse key: %s", + key.ToString().c_str()); + continue; + } else { + // If the prefix remains the same, keep buffering + if (key_prefix == compact->cur_prefix_) { + // Apply the compaction filter V2 to all the kv pairs sharing + // the same prefix + if (ikey.type == kTypeValue && + (visible_at_tip || ikey.sequence > latest_snapshot)) { + // Buffer all keys sharing the same prefix for CompactionFilterV2 + // Iterate through keys to check prefix + compact->BufferKeyValueSlices(key, value); + } else { + // buffer ineligible keys + compact->BufferOtherKeyValueSlices(key, value); + } + backup_input->Next(); + continue; + // finish changing values for eligible keys + } else { + // Now prefix changes, this batch is done. 
+ // Call compaction filter on the buffered values to change the value + if (compact->key_buf_.size() > 0) { + CallCompactionFilterV2(compact, compaction_filter_v2); + } + compact->cur_prefix_ = key_prefix; + } + } + + // Merge this batch of data (values + ineligible keys) + compact->MergeKeyValueSliceBuffer(&internal_comparator_); + + // Done buffering for the current prefix. Spit it out to disk + // Now just iterate through all the kv-pairs + status = ProcessKeyValueCompaction( + visible_at_tip, + earliest_snapshot, + latest_snapshot, + deletion_state, + bottommost_level, + imm_micros, + input.get(), + compact, + true, + log_buffer); + + if (!status.ok()) { + break; + } + + // After writing the kv-pairs, we can safely remove the reference + // to the string buffer and clean them up + compact->CleanupBatchBuffer(); + compact->CleanupMergedBuffer(); + // Buffer the key that triggers the mismatch in prefix + if (ikey.type == kTypeValue && + (visible_at_tip || ikey.sequence > latest_snapshot)) { + compact->BufferKeyValueSlices(key, value); + } else { + compact->BufferOtherKeyValueSlices(key, value); + } + backup_input->Next(); + if (!backup_input->Valid()) { + // If this is the single last value, we need to merge it. 
+ if (compact->key_buf_.size() > 0) { + CallCompactionFilterV2(compact, compaction_filter_v2); + } + compact->MergeKeyValueSliceBuffer(&internal_comparator_); + + status = ProcessKeyValueCompaction( + visible_at_tip, + earliest_snapshot, + latest_snapshot, + deletion_state, + bottommost_level, + imm_micros, + input.get(), + compact, + true, + log_buffer); + + compact->CleanupBatchBuffer(); + compact->CleanupMergedBuffer(); + } + } // done processing all prefix batches + // finish the last batch + if (compact->key_buf_.size() > 0) { + CallCompactionFilterV2(compact, compaction_filter_v2); + } + compact->MergeKeyValueSliceBuffer(&internal_comparator_); + status = ProcessKeyValueCompaction( + visible_at_tip, + earliest_snapshot, + latest_snapshot, + deletion_state, + bottommost_level, + imm_micros, + input.get(), + compact, + true, + log_buffer); + } // checking for compaction filter v2 + + if (!compaction_filter_v2) { + status = ProcessKeyValueCompaction( + visible_at_tip, + earliest_snapshot, + latest_snapshot, + deletion_state, + bottommost_level, + imm_micros, + input.get(), + compact, + false, + log_buffer); + } + if (status.ok() && shutting_down_.Acquire_Load()) { status = Status::ShutdownInProgress( "Database shutdown started during compaction"); diff --git a/db/db_impl.h b/db/db_impl.h index 7ce768d31..6165f93d3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -36,6 +36,7 @@ class TableCache; class Version; class VersionEdit; class VersionSet; +class CompactionFilterV2; class DBImpl : public DB { public: @@ -366,6 +367,24 @@ class DBImpl : public DB { DeletionState& deletion_state, LogBuffer* log_buffer); + // Call compaction filter if is_compaction_v2 is not true. 
Then iterate + // through input and compact the kv-pairs + Status ProcessKeyValueCompaction( + SequenceNumber visible_at_tip, + SequenceNumber earliest_snapshot, + SequenceNumber latest_snapshot, + DeletionState& deletion_state, + bool bottommost_level, + int64_t& imm_micros, + Iterator* input, + CompactionState* compact, + bool is_compaction_v2, + LogBuffer* log_buffer); + + // Call compaction_filter_v2->Filter() on kv-pairs in compact + void CallCompactionFilterV2(CompactionState* compact, + CompactionFilterV2* compaction_filter_v2); + Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status InstallCompactionResults(CompactionState* compact); diff --git a/db/db_test.cc b/db/db_test.cc index 92f6ab840..f881aed98 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2434,7 +2434,7 @@ class KeepFilterFactory : public CompactionFilterFactory { : check_context_(check_context) {} virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { + const CompactionFilterContext& context) override { if (check_context_) { ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction); ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); @@ -2451,7 +2451,7 @@ class KeepFilterFactory : public CompactionFilterFactory { class DeleteFilterFactory : public CompactionFilterFactory { public: virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { + const CompactionFilterContext& context) override { if (context.is_manual_compaction) { return std::unique_ptr(new DeleteFilter()); } else { @@ -2467,7 +2467,7 @@ class ChangeFilterFactory : public CompactionFilterFactory { explicit ChangeFilterFactory() {} virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { + const CompactionFilterContext& context) override { return std::unique_ptr(new 
ChangeFilter()); } @@ -3507,7 +3507,7 @@ TEST(DBTest, CompactionFilterWithValueChange) { // verify that all keys now have the new value that // was set by the compaction process. - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 100001; i++) { char key[100]; snprintf(key, sizeof(key), "B%010d", i); std::string newvalue = Get(key); @@ -3570,6 +3570,252 @@ TEST(DBTest, CompactionFilterContextManual) { delete iter; } +class KeepFilterV2 : public CompactionFilterV2 { + public: + virtual std::vector Filter(int level, + const SliceVector& keys, + const SliceVector& existing_values, + std::vector* new_values, + std::vector* values_changed) + const override { + cfilter_count++; + std::vector ret; + new_values->clear(); + values_changed->clear(); + for (unsigned int i = 0; i < keys.size(); ++i) { + values_changed->push_back(false); + ret.push_back(false); + } + return ret; + } + + virtual const char* Name() const override { + return "KeepFilterV2"; + } +}; + +class DeleteFilterV2 : public CompactionFilterV2 { + public: + virtual std::vector Filter(int level, + const SliceVector& keys, + const SliceVector& existing_values, + std::vector* new_values, + std::vector* values_changed) + const override { + cfilter_count++; + new_values->clear(); + values_changed->clear(); + std::vector ret; + for (unsigned int i = 0; i < keys.size(); ++i) { + values_changed->push_back(false); + ret.push_back(true); + } + return ret; + } + + virtual const char* Name() const override { + return "DeleteFilterV2"; + } +}; + +class ChangeFilterV2 : public CompactionFilterV2 { + public: + virtual std::vector Filter(int level, + const SliceVector& keys, + const SliceVector& existing_values, + std::vector* new_values, + std::vector* values_changed) + const override { + std::vector ret; + new_values->clear(); + values_changed->clear(); + for (unsigned int i = 0; i < keys.size(); ++i) { + values_changed->push_back(true); + new_values->push_back(NEW_VALUE); + ret.push_back(false); + } + return ret; 
+ } + + virtual const char* Name() const override { + return "ChangeFilterV2"; + } +}; + +class KeepFilterFactoryV2 : public CompactionFilterFactoryV2 { + public: + explicit KeepFilterFactoryV2(const SliceTransform* prefix_extractor) + : CompactionFilterFactoryV2(prefix_extractor) { } + + virtual std::unique_ptr + CreateCompactionFilterV2( + const CompactionFilterContext& context) override { + return std::unique_ptr(new KeepFilterV2()); + } + + virtual const char* Name() const override { + return "KeepFilterFactoryV2"; + } +}; + +class DeleteFilterFactoryV2 : public CompactionFilterFactoryV2 { + public: + explicit DeleteFilterFactoryV2(const SliceTransform* prefix_extractor) + : CompactionFilterFactoryV2(prefix_extractor) { } + + virtual std::unique_ptr + CreateCompactionFilterV2( + const CompactionFilterContext& context) override { + return std::unique_ptr(new DeleteFilterV2()); + } + + virtual const char* Name() const override { + return "DeleteFilterFactoryV2"; + } +}; + +class ChangeFilterFactoryV2 : public CompactionFilterFactoryV2 { + public: + explicit ChangeFilterFactoryV2(const SliceTransform* prefix_extractor) + : CompactionFilterFactoryV2(prefix_extractor) { } + + virtual std::unique_ptr + CreateCompactionFilterV2( + const CompactionFilterContext& context) override { + return std::unique_ptr(new ChangeFilterV2()); + } + + virtual const char* Name() const override { + return "ChangeFilterFactoryV2"; + } +}; + +TEST(DBTest, CompactionFilterV2) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.max_mem_compaction_level = 0; + // extract prefix + auto prefix_extractor = NewFixedPrefixTransform(8); + options.compaction_filter_factory_v2 + = std::make_shared(prefix_extractor); + // In a testing environment, we can only flush the application + // compaction filter buffer using universal compaction + option_config_ = kUniversalCompaction; + options.compaction_style = (rocksdb::CompactionStyle)1; + Reopen(&options); + + // Write 100K 
keys, these are written to a few files in L0. + const std::string value(10, 'x'); + for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%010d", i , i); + Put(key, value); + } + + dbfull()->TEST_FlushMemTable(); + + dbfull()->TEST_CompactRange(0, nullptr, nullptr); + dbfull()->TEST_CompactRange(1, nullptr, nullptr); + + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + + // All the files are in the lowest level. + int count = 0; + int total = 0; + Iterator* iter = dbfull()->TEST_NewInternalIterator(); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + while (iter->Valid()) { + ParsedInternalKey ikey(Slice(), 0, kTypeValue); + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + total++; + if (ikey.sequence != 0) { + count++; + } + iter->Next(); + } + + ASSERT_EQ(total, 100000); + // 1 snapshot only. Since we are using universal compacton, + // the sequence no is cleared for better compression + ASSERT_EQ(count, 1); + delete iter; + + // create a new database with the compaction + // filter in such a way that it deletes all keys + options.compaction_filter_factory_v2 = + std::make_shared(prefix_extractor); + options.create_if_missing = true; + DestroyAndReopen(&options); + + // write all the keys once again. 
+ for (int i = 0; i < 100000; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%010d", i, i); + Put(key, value); + } + + dbfull()->TEST_FlushMemTable(); + ASSERT_NE(NumTableFilesAtLevel(0), 0); + + dbfull()->TEST_CompactRange(0, nullptr, nullptr); + dbfull()->TEST_CompactRange(1, nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(1), 0); + + // Scan the entire database to ensure that nothing is left + iter = db_->NewIterator(ReadOptions()); + iter->SeekToFirst(); + count = 0; + while (iter->Valid()) { + count++; + iter->Next(); + } + + ASSERT_EQ(count, 0); + delete iter; +} + +TEST(DBTest, CompactionFilterV2WithValueChange) { + Options options = CurrentOptions(); + options.num_levels = 3; + options.max_mem_compaction_level = 0; + auto prefix_extractor = NewFixedPrefixTransform(8); + options.compaction_filter_factory_v2 = + std::make_shared(prefix_extractor); + // In a testing environment, we can only flush the application + // compaction filter buffer using universal compaction + option_config_ = kUniversalCompaction; + options.compaction_style = (rocksdb::CompactionStyle)1; + Reopen(&options); + + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points + // to the 100001 key.The compaction filter is not invoked + // on keys that are visible via a snapshot because we + // anyways cannot delete it. + const std::string value(10, 'x'); + for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%010d", i, i); + Put(key, value); + } + + // push all files to lower levels + dbfull()->TEST_FlushMemTable(); + dbfull()->TEST_CompactRange(0, nullptr, nullptr); + dbfull()->TEST_CompactRange(1, nullptr, nullptr); + + // verify that all keys now have the new value that + // was set by the compaction process. 
+ for (int i = 0; i < 100001; i++) { + char key[100]; + snprintf(key, sizeof(key), "B%08d%010d", i, i); + std::string newvalue = Get(key); + ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); + } +} + TEST(DBTest, SparseMerge) { do { Options options = CurrentOptions(); diff --git a/db/merge_helper.cc b/db/merge_helper.cc index cc1dac6c1..f5244498d 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -21,7 +21,7 @@ namespace rocksdb { // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, - bool at_bottom, Statistics* stats) { + bool at_bottom, Statistics* stats, int* steps) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. keys_.clear(); @@ -42,6 +42,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, bool hit_the_next_user_key = false; ParsedInternalKey ikey; std::string merge_result; // Temporary value for merge results + if (steps) { + ++(*steps); + } for (iter->Next(); iter->Valid(); iter->Next()) { assert(operands_.size() >= 1); // Should be invariants! assert(keys_.size() == operands_.size()); @@ -91,6 +94,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // move iter to the next entry (before doing anything else) iter->Next(); + if (steps) { + ++(*steps); + } return; } @@ -119,6 +125,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // move iter to the next entry iter->Next(); + if (steps) { + ++(*steps); + } return; } @@ -133,6 +142,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // request or later did a partial merge. 
keys_.push_front(iter->key().ToString()); operands_.push_front(iter->value().ToString()); + if (steps) { + ++(*steps); + } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 5311555a0..fef153eb0 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -47,7 +47,8 @@ class MergeHelper { // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, - bool at_bottom = false, Statistics* stats = nullptr); + bool at_bottom = false, Statistics* stats = nullptr, + int* steps = nullptr); // Query the merge result // These are valid until the next MergeUntil call diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index dfd2f928b..9576bf2ca 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -10,26 +10,27 @@ #define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ #include +#include namespace rocksdb { class Slice; +class SliceTransform; + +// Context information of a compaction run +struct CompactionFilterContext { + // Does this compaction run include all data files + bool is_full_compaction; + // Is this compaction requested by the client (true), + // or is it occurring as an automatic compaction process + bool is_manual_compaction; +}; // CompactionFilter allows an application to modify/delete a key-value at // the time of compaction. 
class CompactionFilter { public: - - // Context information of a compaction run - struct Context { - // Does this compaction run include all data files - bool is_full_compaction; - // Is this compaction requested by the client (true), - // or is it occurring as an automatic compaction process - bool is_manual_compaction; - }; - virtual ~CompactionFilter() {} // The compaction process invokes this @@ -64,14 +65,47 @@ class CompactionFilter { virtual const char* Name() const = 0; }; +// CompactionFilterV2 that buffers kv pairs sharing the same prefix and let +// application layer to make individual decisions for all the kv pairs in the +// buffer. +class CompactionFilterV2 { + public: + virtual ~CompactionFilterV2() {} + + // The compaction process invokes this method for all the kv pairs + // sharing the same prefix. It is a "roll-up" version of CompactionFilter. + // + // Each entry in the return vector indicates if the corresponding kv should + // be preserved in the output of this compaction run. The application can + // inspect the exisitng values of the keys and make decision based on it. + // + // When a value is to be preserved, the application has the option + // to modify the entry in existing_values and pass it back through an entry + // in new_values. A corresponding values_changed entry needs to be set to + // true in this case. Note that the new_values vector contains only changed + // values, i.e. new_values.size() <= values_changed.size(). + // + typedef std::vector SliceVector; + virtual std::vector Filter(int level, + const SliceVector& keys, + const SliceVector& existing_values, + std::vector* new_values, + std::vector* values_changed) + const = 0; + + // Returns a name that identifies this compaction filter. + // The name will be printed to LOG file on start up for diagnosis. 
+ virtual const char* Name() const = 0; +}; + // Each compaction will create a new CompactionFilter allowing the // application to know about different campactions class CompactionFilterFactory { public: - virtual ~CompactionFilterFactory() { }; + virtual ~CompactionFilterFactory() { } virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) = 0; + const CompactionFilterContext& context) = 0; // Returns a name that identifies this compaction filter factory. virtual const char* Name() const = 0; @@ -82,7 +116,7 @@ class CompactionFilterFactory { class DefaultCompactionFilterFactory : public CompactionFilterFactory { public: virtual std::unique_ptr - CreateCompactionFilter(const CompactionFilter::Context& context) override { + CreateCompactionFilter(const CompactionFilterContext& context) override { return std::unique_ptr(nullptr); } @@ -91,6 +125,65 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory { } }; +// Each compaction will create a new CompactionFilterV2 +// +// CompactionFilterFactoryV2 enables application to specify a prefix and use +// CompactionFilterV2 to filter kv-pairs in batches. Each batch contains all +// the kv-pairs sharing the same prefix. +// +// This is useful for applications that require grouping kv-pairs in +// compaction filter to make a purge/no-purge decision. For example, if the +// key prefix is user id and the rest of key represents the type of value. +// This batching filter will come in handy if the application's compaction +// filter requires knowledge of all types of values for any user id. +// +class CompactionFilterFactoryV2 { + public: + explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor) + : prefix_extractor_(prefix_extractor) { } + + virtual ~CompactionFilterFactoryV2() { } + + virtual std::unique_ptr CreateCompactionFilterV2( + const CompactionFilterContext& context) = 0; + + // Returns a name that identifies this compaction filter factory. 
+ virtual const char* Name() const = 0; + + const SliceTransform* GetPrefixExtractor() const { + return prefix_extractor_; + } + + void SetPrefixExtractor(const SliceTransform* prefix_extractor) { + prefix_extractor_ = prefix_extractor; + } + + private: + // Prefix extractor for compaction filter v2 + // Keys sharing the same prefix will be buffered internally. + // Client can implement a Filter callback function to operate on the buffer + const SliceTransform* prefix_extractor_; +}; + +// Default implementaion of CompactionFilterFactoryV2 which does not +// return any filter +class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 { + public: + explicit DefaultCompactionFilterFactoryV2( + const SliceTransform* prefix_extractor) + : CompactionFilterFactoryV2(prefix_extractor) { } + + virtual std::unique_ptr + CreateCompactionFilterV2( + const CompactionFilterContext& context) override { + return std::unique_ptr(nullptr); + } + + virtual const char* Name() const override { + return "DefaultCompactionFilterFactoryV2"; + } +}; + } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_ diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index f1c579981..a64425174 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -214,7 +214,7 @@ class Env { virtual void StartThread(void (*function)(void* arg), void* arg) = 0; // Wait for all threads started by StartThread to terminate. - virtual void WaitForJoin() = 0; + virtual void WaitForJoin() {} // Get thread pool queue length for specific thrad pool. 
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0e5ff577e..28b2d58bc 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -22,6 +22,7 @@ namespace rocksdb { class Cache; class CompactionFilter; class CompactionFilterFactory; +class CompactionFilterFactoryV2; class Comparator; class Env; enum InfoLogLevel : unsigned char; @@ -123,6 +124,10 @@ struct Options { // Default: a factory that doesn't provide any object std::shared_ptr compaction_filter_factory; + // Version TWO of the compaction_filter_factory + // It supports rolling compaction + std::shared_ptr compaction_filter_factory_v2; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; diff --git a/util/options.cc b/util/options.cc index 007584b94..bc325e9c0 100644 --- a/util/options.cc +++ b/util/options.cc @@ -32,6 +32,9 @@ Options::Options() compaction_filter(nullptr), compaction_filter_factory(std::shared_ptr( new DefaultCompactionFilterFactory())), + compaction_filter_factory_v2( + new DefaultCompactionFilterFactoryV2( + NewFixedPrefixTransform(8))), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -131,6 +134,8 @@ Options::Dump(Logger* log) const compaction_filter? 
compaction_filter->Name() : "None"); Log(log," Options.compaction_filter_factory: %s", compaction_filter_factory->Name()); + Log(log, " Options.compaction_filter_factory_v2: %s", + compaction_filter_factory_v2->Name()); Log(log," Options.memtable_factory: %s", memtable_factory->Name()); Log(log," Options.table_factory: %s", table_factory->Name()); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 519ae32c7..d23ef88cf 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -192,7 +192,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { user_comp_filter_factory_(comp_filter_factory) { } virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) { + const CompactionFilterContext& context) { return std::unique_ptr( new TtlCompactionFilter( ttl_, diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 8804d893c..a981cceb8 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -285,7 +285,7 @@ class TtlTest { virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& context) override { + const CompactionFilterContext& context) override { return std::unique_ptr( new TestFilter(kSampleSize_, kNewValue_)); } From ebaff6f9e281024d1dbb36d628303a153a858db9 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 25 Mar 2014 10:24:37 -0700 Subject: [PATCH 5/5] fix the HISTORY file to describe change happened in b47812fb --- HISTORY.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/HISTORY.md b/HISTORY.md index 3b71273aa..905901853 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,7 +7,7 @@ * Removed arena.h from public header files. * By default, checksums are verified on every read from database * Added is_manual_compaction to CompactionFilter::Context -* Added "virtual void WaitForJoin() = 0" in class Env +* Added "virtual void WaitForJoin()" in class Env. Default operation is no-op. 
* Removed BackupEngine::DeleteBackupsNewerThan() function * Added new option -- verify_checksums_in_compaction * Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership)