Avoid allocations/copies for large `GetMergeOperands()` results (#10458)

Summary:
This PR avoids allocations and copies for the result of `GetMergeOperands()` when the average operand size is at least 256 bytes and the total operands size is at least 32KB. The `GetMergeOperands()` already included `PinnableSlice` but was calling `PinSelf()` (i.e., allocating and copying) for each operand. When this optimization takes effect, we instead call `PinSlice()` to skip that allocation and copy. Resources are pinned in order for the `PinnableSlice` to point to valid memory even after `GetMergeOperands()` returns.

The pinned resources include a referenced `SuperVersion`, a `MergingContext`, and a `PinnedIteratorsManager`. They are bundled into a `GetMergeOperandsState`. We use `SharedCleanablePtr` to share that bundle among all `PinnableSlice`s populated by `GetMergeOperands()`. That way, the last `PinnableSlice` to be `Reset()` will cleanup the bundle, including unreferencing the `SuperVersion`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10458

Test Plan:
- new DB level test
- measured benefit/regression in a number of memtable scenarios

Setup command:
```
$ ./db_bench -benchmarks=mergerandom -merge_operator=StringAppendOperator -num=$num -writes=16384 -key_size=16 -value_size=$value_sz -compression_type=none -write_buffer_size=1048576000
```

Benchmark command:
```
./db_bench -threads=$threads -use_existing_db=true -avoid_flush_during_recovery=true -write_buffer_size=1048576000 -benchmarks=readrandomoperands -merge_operator=StringAppendOperator -num=$num -duration=10
```

Worst regression is when a key has many tiny operands:

- Parameters: num=1 (implying 16384 operands per key), value_sz=8, threads=1
- `GetMergeOperands()` latency increases 682 micros -> 800 micros (+17%)

The regression disappears into the noise (<1% difference) if we remove the `Reset()` loop and the size counting loop. The former is arguably needed regardless of this PR as the convention in `Get()` and `MultiGet()` is to `Reset()` the input `PinnableSlice`s at the start. The latter could be optimized to count the size as we accumulate operands rather than after the fact.

Best improvement is when a key has large operands and high concurrency:

- Parameters: num=4 (implying 4096 operands per key), value_sz=2KB, threads=32
- `GetMergeOperands()` latency decreases 11492 micros -> 437 micros (-96%).

Reviewed By: cbi42

Differential Revision: D38336578

Pulled By: ajkr

fbshipit-source-id: 48146d127e04cb7f2d4d2939a2b9dff3aba18258
main
Andrew Kryczka 3 years ago committed by Facebook GitHub Bot
parent d23752f672
commit 504fe4de80
  1. 160
      db/db_impl/db_impl.cc
  2. 2
      db/db_impl/db_impl.h
  3. 42
      db/db_merge_operand_test.cc
  4. 5
      db/pinned_iterators_manager.h
  5. 4
      include/rocksdb/db.h
  6. 4
      include/rocksdb/slice.h
  7. 6
      microbench/db_basic_bench.cc
  8. 1
      tools/db_bench_tool.cc

@ -1674,9 +1674,16 @@ void DBImpl::BackgroundCallPurge() {
}
namespace {
struct IterState {
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
bool _background_purge)
// A `SuperVersionHandle` holds a non-null `SuperVersion*` pointing at a
// `SuperVersion` referenced once for this object. It also contains the state
// needed to clean up the `SuperVersion` reference from outside of `DBImpl`
// using `CleanupSuperVersionHandle()`.
struct SuperVersionHandle {
// `_super_version` must be non-nullptr and `Ref()`'d once as long as the
// `SuperVersionHandle` may use it.
SuperVersionHandle(DBImpl* _db, InstrumentedMutex* _mu,
SuperVersion* _super_version, bool _background_purge)
: db(_db),
mu(_mu),
super_version(_super_version),
@ -1688,35 +1695,49 @@ struct IterState {
bool background_purge;
};
static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
IterState* state = reinterpret_cast<IterState*>(arg1);
static void CleanupSuperVersionHandle(void* arg1, void* /*arg2*/) {
SuperVersionHandle* sv_handle = reinterpret_cast<SuperVersionHandle*>(arg1);
if (state->super_version->Unref()) {
if (sv_handle->super_version->Unref()) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(&job_context, false, true);
if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context);
state->db->AddSuperVersionsToFreeQueue(state->super_version);
state->db->SchedulePurge();
sv_handle->mu->Lock();
sv_handle->super_version->Cleanup();
sv_handle->db->FindObsoleteFiles(&job_context, false, true);
if (sv_handle->background_purge) {
sv_handle->db->ScheduleBgLogWriterClose(&job_context);
sv_handle->db->AddSuperVersionsToFreeQueue(sv_handle->super_version);
sv_handle->db->SchedulePurge();
}
state->mu->Unlock();
sv_handle->mu->Unlock();
if (!state->background_purge) {
delete state->super_version;
if (!sv_handle->background_purge) {
delete sv_handle->super_version;
}
if (job_context.HaveSomethingToDelete()) {
state->db->PurgeObsoleteFiles(job_context, state->background_purge);
sv_handle->db->PurgeObsoleteFiles(job_context,
sv_handle->background_purge);
}
job_context.Clean();
}
delete sv_handle;
}
struct GetMergeOperandsState {
MergeContext merge_context;
PinnedIteratorsManager pinned_iters_mgr;
SuperVersionHandle* sv_handle;
};
static void CleanupGetMergeOperandsState(void* arg1, void* /*arg2*/) {
GetMergeOperandsState* state = static_cast<GetMergeOperandsState*>(arg1);
CleanupSuperVersionHandle(state->sv_handle /* arg1 */, nullptr /* arg2 */);
delete state;
}
} // namespace
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
@ -1761,11 +1782,11 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
allow_unprepared_value);
}
internal_iter = merge_iter_builder.Finish();
IterState* cleanup =
new IterState(this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup ||
immutable_db_options_.avoid_unnecessary_blocking_io);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
SuperVersionHandle* cleanup = new SuperVersionHandle(
this, &mutex_, super_version,
read_options.background_purge_on_iterator_cleanup ||
immutable_db_options_.avoid_unnecessary_blocking_io);
internal_iter->RegisterCleanup(CleanupSuperVersionHandle, cleanup, nullptr);
return internal_iter;
} else {
@ -1801,6 +1822,34 @@ Status DBImpl::Get(const ReadOptions& read_options,
return s;
}
bool DBImpl::ShouldReferenceSuperVersion(const MergeContext& merge_context) {
// If both thresholds are reached, a function returning merge operands as
// `PinnableSlice`s should reference the `SuperVersion` to avoid large and/or
// numerous `memcpy()`s.
//
// The below constants enable the optimization conservatively. They are
// verified to not regress `GetMergeOperands()` latency in the following
// scenarios.
//
// - CPU: two socket Intel(R) Xeon(R) Gold 6138 CPU @ 2.00GHz
// - `GetMergeOperands()` threads: 1 - 32
// - Entry size: 32 bytes - 4KB
// - Merges per key: 1 - 16K
// - LSM component: memtable
//
// TODO(ajkr): expand measurement to SST files.
static const size_t kNumBytesForSvRef = 32768;
static const size_t kLog2AvgBytesForSvRef = 8; // 256 bytes
size_t num_bytes = 0;
for (const Slice& sl : merge_context.GetOperands()) {
num_bytes += sl.size();
}
return num_bytes >= kNumBytesForSvRef &&
(num_bytes >> kLog2AvgBytesForSvRef) >=
merge_context.GetOperands().size();
}
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
GetImplOptions& get_impl_options) {
assert(get_impl_options.value != nullptr ||
@ -1848,6 +1897,14 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
}
}
if (get_impl_options.get_merge_operands_options != nullptr) {
for (int i = 0; i < get_impl_options.get_merge_operands_options
->expected_max_number_of_operands;
++i) {
get_impl_options.merge_operands[i].Reset();
}
}
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
@ -1995,19 +2052,68 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
s = Status::Incomplete(
Status::SubCode::KMergeOperandsInsufficientCapacity);
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
// Each operand depends on one of the following resources: `sv`,
// `pinned_iters_mgr`, or `merge_context`. It would be crazy expensive
// to reference `sv` for each operand relying on it because `sv` is
// (un)ref'd in all threads using the DB. Furthermore, we do not track
// on which resource each operand depends.
//
// To solve this, we bundle the resources in a `GetMergeOperandsState`
// and manage them with a `SharedCleanablePtr` shared among the
// `PinnableSlice`s we return. This bundle includes one `sv` reference
// and ownership of the `merge_context` and `pinned_iters_mgr`
// objects.
bool ref_sv = ShouldReferenceSuperVersion(merge_context);
if (ref_sv) {
assert(!merge_context.GetOperands().empty());
SharedCleanablePtr shared_cleanable;
GetMergeOperandsState* state = nullptr;
state = new GetMergeOperandsState();
state->merge_context = std::move(merge_context);
state->pinned_iters_mgr = std::move(pinned_iters_mgr);
sv->Ref();
state->sv_handle = new SuperVersionHandle(
this, &mutex_, sv,
immutable_db_options_.avoid_unnecessary_blocking_io);
shared_cleanable.Allocate();
shared_cleanable->RegisterCleanup(CleanupGetMergeOperandsState,
state /* arg1 */,
nullptr /* arg2 */);
for (size_t i = 0; i < state->merge_context.GetOperands().size();
++i) {
const Slice& sl = state->merge_context.GetOperands()[i];
size += sl.size();
get_impl_options.merge_operands->PinSlice(
sl, nullptr /* cleanable */);
if (i == state->merge_context.GetOperands().size() - 1) {
shared_cleanable.MoveAsCleanupTo(
get_impl_options.merge_operands);
} else {
shared_cleanable.RegisterCopyWith(
get_impl_options.merge_operands);
}
get_impl_options.merge_operands++;
}
} else {
for (const Slice& sl : merge_context.GetOperands()) {
size += sl.size();
get_impl_options.merge_operands->PinSelf(sl);
get_impl_options.merge_operands++;
}
}
}
}
RecordTick(stats_, BYTES_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
RecordInHistogram(stats_, BYTES_PER_READ, size);
ReturnAndCleanupSuperVersion(cfd, sv);
RecordInHistogram(stats_, BYTES_PER_READ, size);
}
return s;
}

@ -2260,6 +2260,8 @@ class DBImpl : public DB {
Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
std::string ts_low);
bool ShouldReferenceSuperVersion(const MergeContext& merge_context);
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;

@ -397,6 +397,48 @@ TEST_F(DBMergeOperandTest, BlobDBGetMergeOperandsBasic) {
ASSERT_EQ(values[3], "ed");
}
TEST_F(DBMergeOperandTest, GetMergeOperandsLargeResultOptimization) {
// These constants are chosen to trigger the large result optimization
// (pinning a bundle of `DBImpl` resources).
const int kNumOperands = 1024;
const int kOperandLen = 1024;
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateStringAppendOperator();
DestroyAndReopen(options);
Random rnd(301);
std::vector<std::string> expected_merge_operands;
expected_merge_operands.reserve(kNumOperands);
for (int i = 0; i < kNumOperands; ++i) {
expected_merge_operands.emplace_back(rnd.RandomString(kOperandLen));
ASSERT_OK(Merge("key", expected_merge_operands.back()));
}
std::vector<PinnableSlice> merge_operands(kNumOperands);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands = kNumOperands;
int num_merge_operands = 0;
ASSERT_OK(db_->GetMergeOperands(ReadOptions(), db_->DefaultColumnFamily(),
"key", merge_operands.data(),
&merge_operands_info, &num_merge_operands));
ASSERT_EQ(num_merge_operands, kNumOperands);
// Ensures the large result optimization was used.
for (int i = 0; i < kNumOperands; ++i) {
ASSERT_TRUE(merge_operands[i].IsPinned());
}
// Add a Flush() to change the `SuperVersion` to challenge the resource
// pinning.
ASSERT_OK(Flush());
for (int i = 0; i < kNumOperands; ++i) {
ASSERT_EQ(expected_merge_operands[i], merge_operands[i]);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -25,6 +25,11 @@ class PinnedIteratorsManager : public Cleanable {
}
}
// Move constructor and move assignment is allowed.
PinnedIteratorsManager(PinnedIteratorsManager&& other) noexcept = default;
PinnedIteratorsManager& operator=(PinnedIteratorsManager&& other) noexcept =
default;
// Enable Iterators pinning
void StartPinning() {
assert(pinning_enabled == false);

@ -581,6 +581,10 @@ class DB {
// `merge_operands`- Points to an array of at-least
// merge_operands_options.expected_max_number_of_operands and the
// caller is responsible for allocating it.
//
// The caller should delete or `Reset()` the `merge_operands` entries when
// they are no longer needed. All `merge_operands` entries must be destroyed
// or `Reset()` before this DB is closed or destroyed.
virtual Status GetMergeOperands(
const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* merge_operands,

@ -162,7 +162,9 @@ class PinnableSlice : public Slice, public Cleanable {
pinned_ = true;
data_ = s.data();
size_ = s.size();
cleanable->DelegateCleanupsTo(this);
if (cleanable != nullptr) {
cleanable->DelegateCleanupsTo(this);
}
assert(pinned_);
}

@ -841,6 +841,9 @@ static void DBGetMergeOperandsInMemtable(benchmark::State& state) {
if (num_value_operands != static_cast<int>(kNumEntriesPerKey)) {
state.SkipWithError("Unexpected number of merge operands found for key");
}
for (auto& value_operand : value_operands) {
value_operand.Reset();
}
}
if (state.thread_index() == 0) {
@ -938,6 +941,9 @@ static void DBGetMergeOperandsInSstFile(benchmark::State& state) {
if (num_value_operands != static_cast<int>(kNumEntriesPerKey)) {
state.SkipWithError("Unexpected number of merge operands found for key");
}
for (auto& value_operand : value_operands) {
value_operand.Reset();
}
}
if (state.thread_index() == 0) {

@ -6008,6 +6008,7 @@ class Benchmark {
bytes += key.size() + pinnable_val.size() + user_timestamp_size_;
for (size_t i = 0; i < pinnable_vals.size(); ++i) {
bytes += pinnable_vals[i].size();
pinnable_vals[i].Reset();
}
} else if (!s.IsNotFound()) {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());

Loading…
Cancel
Save