diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7b0091074..38033b6e0 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1674,9 +1674,16 @@ void DBImpl::BackgroundCallPurge() { } namespace { -struct IterState { - IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version, - bool _background_purge) + +// A `SuperVersionHandle` holds a non-null `SuperVersion*` pointing at a +// `SuperVersion` referenced once for this object. It also contains the state +// needed to clean up the `SuperVersion` reference from outside of `DBImpl` +// using `CleanupSuperVersionHandle()`. +struct SuperVersionHandle { + // `_super_version` must be non-nullptr and `Ref()`'d once as long as the + // `SuperVersionHandle` may use it. + SuperVersionHandle(DBImpl* _db, InstrumentedMutex* _mu, + SuperVersion* _super_version, bool _background_purge) : db(_db), mu(_mu), super_version(_super_version), @@ -1688,35 +1695,49 @@ struct IterState { bool background_purge; }; -static void CleanupIteratorState(void* arg1, void* /*arg2*/) { - IterState* state = reinterpret_cast<IterState*>(arg1); +static void CleanupSuperVersionHandle(void* arg1, void* /*arg2*/) { + SuperVersionHandle* sv_handle = reinterpret_cast<SuperVersionHandle*>(arg1); - if (state->super_version->Unref()) { + if (sv_handle->super_version->Unref()) { // Job id == 0 means that this is not our background process, but rather // user thread JobContext job_context(0); - state->mu->Lock(); - state->super_version->Cleanup(); - state->db->FindObsoleteFiles(&job_context, false, true); - if (state->background_purge) { - state->db->ScheduleBgLogWriterClose(&job_context); - state->db->AddSuperVersionsToFreeQueue(state->super_version); - state->db->SchedulePurge(); + sv_handle->mu->Lock(); + sv_handle->super_version->Cleanup(); + sv_handle->db->FindObsoleteFiles(&job_context, false, true); + if (sv_handle->background_purge) { + sv_handle->db->ScheduleBgLogWriterClose(&job_context); + 
sv_handle->db->AddSuperVersionsToFreeQueue(sv_handle->super_version); + sv_handle->db->SchedulePurge(); } - state->mu->Unlock(); + sv_handle->mu->Unlock(); - if (!state->background_purge) { - delete state->super_version; + if (!sv_handle->background_purge) { + delete sv_handle->super_version; } if (job_context.HaveSomethingToDelete()) { - state->db->PurgeObsoleteFiles(job_context, state->background_purge); + sv_handle->db->PurgeObsoleteFiles(job_context, + sv_handle->background_purge); } job_context.Clean(); } + delete sv_handle; +} + +struct GetMergeOperandsState { + MergeContext merge_context; + PinnedIteratorsManager pinned_iters_mgr; + SuperVersionHandle* sv_handle; +}; + +static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) { + GetMergeOperandsState* state = static_cast<GetMergeOperandsState*>(arg1); + CleanupSuperVersionHandle(state->sv_handle /* arg1 */, nullptr /* arg2 */); delete state; } + } // namespace InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, @@ -1761,11 +1782,11 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, allow_unprepared_value); } internal_iter = merge_iter_builder.Finish(); - IterState* cleanup = - new IterState(this, &mutex_, super_version, - read_options.background_purge_on_iterator_cleanup || - immutable_db_options_.avoid_unnecessary_blocking_io); - internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); + SuperVersionHandle* cleanup = new SuperVersionHandle( + this, &mutex_, super_version, + read_options.background_purge_on_iterator_cleanup || + immutable_db_options_.avoid_unnecessary_blocking_io); + internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr); return internal_iter; } else { @@ -1801,6 +1822,34 @@ Status DBImpl::Get(const ReadOptions& read_options, return s; } +bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) { + // If both thresholds are reached, a function returning merge operands as + // 
`PinnableSlice`s should reference the `SuperVersion` to avoid large and/or + // numerous `memcpy()`s. + // + // The below constants enable the optimization conservatively. They are + // verified to not regress `GetMergeOperands()` latency in the following + // scenarios. + // + // - CPU: two socket Intel(R) Xeon(R) Gold 6138 CPU @ 2.00GHz + // - `GetMergeOperands()` threads: 1 - 32 + // - Entry size: 32 bytes - 4KB + // - Merges per key: 1 - 16K + // - LSM component: memtable + // + // TODO(ajkr): expand measurement to SST files. + static const size_t kNumBytesForSvRef = 32768; + static const size_t kLog2AvgBytesForSvRef = 8; // 256 bytes + + size_t num_bytes = 0; + for (const Slice& sl : merge_context.GetOperands()) { + num_bytes += sl.size(); + } + return num_bytes >= kNumBytesForSvRef && + (num_bytes >> kLog2AvgBytesForSvRef) >= + merge_context.GetOperands().size(); +} + Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, GetImplOptions& get_impl_options) { assert(get_impl_options.value != nullptr || @@ -1848,6 +1897,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, } } + if (get_impl_options.get_merge_operands_options != nullptr) { + for (int i = 0; i < get_impl_options.get_merge_operands_options + ->expected_max_number_of_operands; + ++i) { + get_impl_options.merge_operands[i].Reset(); + } + } + // Acquire SuperVersion SuperVersion* sv = GetAndRefSuperVersion(cfd); @@ -1995,19 +2052,68 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, s = Status::Incomplete( Status::SubCode::KMergeOperandsInsufficientCapacity); } else { - for (const Slice& sl : merge_context.GetOperands()) { - size += sl.size(); - get_impl_options.merge_operands->PinSelf(sl); - get_impl_options.merge_operands++; + // Each operand depends on one of the following resources: `sv`, + // `pinned_iters_mgr`, or `merge_context`. 
It would be crazy expensive + // to reference `sv` for each operand relying on it because `sv` is + // (un)ref'd in all threads using the DB. Furthermore, we do not track + // on which resource each operand depends. + // + // To solve this, we bundle the resources in a `GetMergeOperandsState` + // and manage them with a `SharedCleanablePtr` shared among the + // `PinnableSlice`s we return. This bundle includes one `sv` reference + // and ownership of the `merge_context` and `pinned_iters_mgr` + // objects. + bool ref_sv = ShouldReferenceSuperVersion(merge_context); + if (ref_sv) { + assert(!merge_context.GetOperands().empty()); + SharedCleanablePtr shared_cleanable; + GetMergeOperandsState* state = nullptr; + state = new GetMergeOperandsState(); + state->merge_context = std::move(merge_context); + state->pinned_iters_mgr = std::move(pinned_iters_mgr); + + sv->Ref(); + + state->sv_handle = new SuperVersionHandle( + this, &mutex_, sv, + immutable_db_options_.avoid_unnecessary_blocking_io); + + shared_cleanable.Allocate(); + shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState, + state /* arg1 */, + nullptr /* arg2 */); + for (size_t i = 0; i < state->merge_context.GetOperands().size(); + ++i) { + const Slice& sl = state->merge_context.GetOperands()[i]; + size += sl.size(); + + get_impl_options.merge_operands->PinSlice( + sl, nullptr /* cleanable */); + if (i == state->merge_context.GetOperands().size() - 1) { + shared_cleanable.MoveAsCleanupTo( + get_impl_options.merge_operands); + } else { + shared_cleanable.RegisterCopyWith( + get_impl_options.merge_operands); + } + get_impl_options.merge_operands++; + } + } else { + for (const Slice& sl : merge_context.GetOperands()) { + size += sl.size(); + get_impl_options.merge_operands->PinSelf(sl); + get_impl_options.merge_operands++; + } } } } RecordTick(stats_, BYTES_READ, size); PERF_COUNTER_ADD(get_read_bytes, size); } - RecordInHistogram(stats_, BYTES_PER_READ, size); ReturnAndCleanupSuperVersion(cfd, sv); + + 
RecordInHistogram(stats_, BYTES_PER_READ, size); } return s; } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 2a9a6b9c2..3f718b598 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2260,6 +2260,8 @@ class DBImpl : public DB { Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, std::string ts_low); + bool ShouldReferenceSuperVersion(const MergeContext& merge_context); + // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; diff --git a/db/db_merge_operand_test.cc b/db/db_merge_operand_test.cc index bca35b258..1ae5f3287 100644 --- a/db/db_merge_operand_test.cc +++ b/db/db_merge_operand_test.cc @@ -397,6 +397,48 @@ TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) { ASSERT_EQ(values[3], "ed"); } +TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) { + // These constants are chosen to trigger the large result optimization + // (pinning a bundle of `DBImpl` resources). + const int kNumOperands = 1024; + const int kOperandLen = 1024; + + Options options; + options.create_if_missing = true; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + DestroyAndReopen(options); + + Random rnd(301); + std::vector<std::string> expected_merge_operands; + expected_merge_operands.reserve(kNumOperands); + for (int i = 0; i < kNumOperands; ++i) { + expected_merge_operands.emplace_back(rnd.RandomString(kOperandLen)); + ASSERT_OK(Merge("key", expected_merge_operands.back())); + } + + std::vector<PinnableSlice> merge_operands(kNumOperands); + GetMergeOperandsOptions merge_operands_info; + merge_operands_info.expected_max_number_of_operands = kNumOperands; + int num_merge_operands = 0; + ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(), + "key", merge_operands.data(), + &merge_operands_info, &num_merge_operands)); + ASSERT_EQ(num_merge_operands, kNumOperands); + + // Ensures the large result optimization was used. 
+ for (int i = 0; i < kNumOperands; ++i) { + ASSERT_TRUE(merge_operands[i].IsPinned()); + } + + // Add a Flush() to change the `SuperVersion` to challenge the resource + // pinning. + ASSERT_OK(Flush()); + + for (int i = 0; i < kNumOperands; ++i) { + ASSERT_EQ(expected_merge_operands[i], merge_operands[i]); + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/pinned_iterators_manager.h b/db/pinned_iterators_manager.h index 1336f5420..0fcf231da 100644 --- a/db/pinned_iterators_manager.h +++ b/db/pinned_iterators_manager.h @@ -25,6 +25,11 @@ class PinnedIteratorsManager : public Cleanable { } } + // Move constructor and move assignment is allowed. + PinnedIteratorsManager(PinnedIteratorsManager&& other) noexcept = default; + PinnedIteratorsManager& operator=(PinnedIteratorsManager&& other) noexcept = + default; + // Enable Iterators pinning void StartPinning() { assert(pinning_enabled == false); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 8e72140d0..2c94a7c58 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -581,6 +581,10 @@ class DB { // `merge_operands`- Points to an array of at-least // merge_operands_options.expected_max_number_of_operands and the // caller is responsible for allocating it. + // + // The caller should delete or `Reset()` the `merge_operands` entries when + // they are no longer needed. All `merge_operands` entries must be destroyed + // or `Reset()` before this DB is closed or destroyed. 
virtual Status GetMergeOperands( const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* merge_operands, diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 3722fc4e6..0d7eb5949 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -162,7 +162,9 @@ class PinnableSlice : public Slice, public Cleanable { pinned_ = true; data_ = s.data(); size_ = s.size(); - cleanable->DelegateCleanupsTo(this); + if (cleanable != nullptr) { + cleanable->DelegateCleanupsTo(this); + } assert(pinned_); } diff --git a/microbench/db_basic_bench.cc b/microbench/db_basic_bench.cc index a7aa64774..6c70ad21d 100644 --- a/microbench/db_basic_bench.cc +++ b/microbench/db_basic_bench.cc @@ -841,6 +841,9 @@ static void DBGetMergeOperandsInMemtable(benchmark::State& state) { if (num_value_operands != static_cast<size_t>(kNumEntriesPerKey)) { state.SkipWithError("Unexpected number of merge operands found for key"); } + for (auto& value_operand : value_operands) { + value_operand.Reset(); + } } if (state.thread_index() == 0) { @@ -938,6 +941,9 @@ static void DBGetMergeOperandsInSstFile(benchmark::State& state) { if (num_value_operands != static_cast<size_t>(kNumEntriesPerKey)) { state.SkipWithError("Unexpected number of merge operands found for key"); } + for (auto& value_operand : value_operands) { + value_operand.Reset(); + } } if (state.thread_index() == 0) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 68488cb40..8ee149705 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -6008,6 +6008,7 @@ class Benchmark { bytes += key.size() + pinnable_val.size() + user_timestamp_size_; for (size_t i = 0; i < pinnable_vals.size(); ++i) { bytes += pinnable_vals[i].size(); + pinnable_vals[i].Reset(); } } else if (!s.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());