Introduce PinnedIteratorsManager (Reduce PinData() overhead / Refactor PinData)

Summary:
While trying to reuse PinData() / ReleasePinnedData() .. to optimize away some memcpys I realized that there is a significant overhead for using PinData() / ReleasePinnedData if they were called many times.
This diff refactor the pinning logic by introducing PinnedIteratorsManager a centralized component that will be created once and will be notified whenever we need to Pin an Iterator. This implementation have much less overhead than the original implementation

Test Plan:
make check -j64
COMPILE_WITH_ASAN=1 make check -j64

Reviewers: yhchiang, sdong, andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D56493
main
Islam AbdelRahman 9 years ago
parent 1995e34d6a
commit d719b095dc
  1. 61
      db/db_iter.cc
  2. 2
      db/db_iter.h
  3. 25
      db/memtable.cc
  4. 66
      db/pinned_iterators_manager.h
  5. 6
      include/rocksdb/iterator.h
  6. 2
      include/rocksdb/table.h
  7. 22
      table/block.h
  8. 23
      table/internal_iterator.h
  9. 106
      table/iterator_wrapper.h
  10. 58
      table/merger.cc
  11. 34
      table/two_level_iterator.cc

@ -16,6 +16,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/filename.h" #include "db/filename.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/pinned_iterators_manager.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -103,7 +104,7 @@ class DBIter: public Iterator {
InternalIterator* iter, SequenceNumber s, bool arena_mode, InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number, uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
const Slice* iterate_upper_bound = nullptr, const Slice* iterate_upper_bound = nullptr,
bool prefix_same_as_start = false) bool prefix_same_as_start = false, bool pin_data = false)
: arena_mode_(arena_mode), : arena_mode_(arena_mode),
env_(env), env_(env),
logger_(ioptions.info_log), logger_(ioptions.info_log),
@ -118,12 +119,21 @@ class DBIter: public Iterator {
version_number_(version_number), version_number_(version_number),
iterate_upper_bound_(iterate_upper_bound), iterate_upper_bound_(iterate_upper_bound),
prefix_same_as_start_(prefix_same_as_start), prefix_same_as_start_(prefix_same_as_start),
iter_pinned_(false) { pin_thru_lifetime_(pin_data) {
RecordTick(statistics_, NO_ITERATORS); RecordTick(statistics_, NO_ITERATORS);
prefix_extractor_ = ioptions.prefix_extractor; prefix_extractor_ = ioptions.prefix_extractor;
max_skip_ = max_sequential_skip_in_iterations; max_skip_ = max_sequential_skip_in_iterations;
if (pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
if (iter_) {
iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
}
} }
virtual ~DBIter() { virtual ~DBIter() {
if (pin_thru_lifetime_) {
pinned_iters_mgr_.ReleasePinnedIterators();
}
RecordTick(statistics_, NO_ITERATORS, -1); RecordTick(statistics_, NO_ITERATORS, -1);
local_stats_.BumpGlobalStatistics(statistics_); local_stats_.BumpGlobalStatistics(statistics_);
if (!arena_mode_) { if (!arena_mode_) {
@ -135,9 +145,7 @@ class DBIter: public Iterator {
virtual void SetIter(InternalIterator* iter) { virtual void SetIter(InternalIterator* iter) {
assert(iter_ == nullptr); assert(iter_ == nullptr);
iter_ = iter; iter_ = iter;
if (iter_ && iter_pinned_) { iter_->SetPinnedItersMgr(&pinned_iters_mgr_);
iter_->PinData();
}
} }
virtual bool Valid() const override { return valid_; } virtual bool Valid() const override { return valid_; }
virtual Slice key() const override { virtual Slice key() const override {
@ -156,28 +164,6 @@ class DBIter: public Iterator {
return status_; return status_;
} }
} }
virtual Status PinData() {
Status s;
if (iter_) {
s = iter_->PinData();
}
if (s.ok()) {
// Even if iter_ is nullptr, we set iter_pinned_ to true so that when
// iter_ is updated using SetIter, we Pin it.
iter_pinned_ = true;
}
return s;
}
virtual Status ReleasePinnedData() {
Status s;
if (iter_) {
s = iter_->ReleasePinnedData();
}
if (s.ok()) {
iter_pinned_ = false;
}
return s;
}
virtual Status GetProperty(std::string prop_name, virtual Status GetProperty(std::string prop_name,
std::string* prop) override { std::string* prop) override {
@ -192,7 +178,7 @@ class DBIter: public Iterator {
return Status::OK(); return Status::OK();
} else if (prop_name == "rocksdb.iterator.is-key-pinned") { } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
if (valid_) { if (valid_) {
*prop = (iter_pinned_ && saved_key_.IsKeyPinned()) ? "1" : "0"; *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
} else { } else {
*prop = "Iterator is not valid."; *prop = "Iterator is not valid.";
} }
@ -250,10 +236,13 @@ class DBIter: public Iterator {
const Slice* iterate_upper_bound_; const Slice* iterate_upper_bound_;
IterKey prefix_start_; IterKey prefix_start_;
bool prefix_same_as_start_; bool prefix_same_as_start_;
bool iter_pinned_; // Means that we will pin all data blocks we read as long the Iterator
// is not deleted, will be true if ReadOptions::pin_data is true
const bool pin_thru_lifetime_;
// List of operands for merge operator. // List of operands for merge operator.
MergeContext merge_context_; MergeContext merge_context_;
LocalStatistics local_stats_; LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
// No copying allowed // No copying allowed
DBIter(const DBIter&); DBIter(const DBIter&);
@ -890,10 +879,7 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions,
DBIter* db_iter = DBIter* db_iter =
new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence, new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence,
false, max_sequential_skip_in_iterations, version_number, false, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start); iterate_upper_bound, prefix_same_as_start, pin_data);
if (pin_data) {
db_iter->PinData();
}
return db_iter; return db_iter;
} }
@ -916,14 +902,10 @@ inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
inline Status ArenaWrappedDBIter::PinData() { return db_iter_->PinData(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) { std::string* prop) {
return db_iter_->GetProperty(prop_name, prop); return db_iter_->GetProperty(prop_name, prop);
} }
inline Status ArenaWrappedDBIter::ReleasePinnedData() {
return db_iter_->ReleasePinnedData();
}
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
void* arg2) { void* arg2) {
db_iter_->RegisterCleanup(function, arg1, arg2); db_iter_->RegisterCleanup(function, arg1, arg2);
@ -941,12 +923,9 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
DBIter* db_iter = DBIter* db_iter =
new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence, new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence,
true, max_sequential_skip_in_iterations, version_number, true, max_sequential_skip_in_iterations, version_number,
iterate_upper_bound, prefix_same_as_start); iterate_upper_bound, prefix_same_as_start, pin_data);
iter->SetDBIter(db_iter); iter->SetDBIter(db_iter);
if (pin_data) {
iter->PinData();
}
return iter; return iter;
} }

@ -63,8 +63,6 @@ class ArenaWrappedDBIter : public Iterator {
virtual Status status() const override; virtual Status status() const override;
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2); void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
virtual Status PinData();
virtual Status ReleasePinnedData();
virtual Status GetProperty(std::string prop_name, std::string* prop) override; virtual Status GetProperty(std::string prop_name, std::string* prop) override;
private: private:

@ -15,6 +15,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/pinned_iterators_manager.h"
#include "db/writebuffer.h" #include "db/writebuffer.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -231,6 +232,12 @@ class MemTableIterator : public InternalIterator {
} }
~MemTableIterator() { ~MemTableIterator() {
#ifndef NDEBUG
// Assert that the MemTableIterator is never deleted while
// Pinning is Enabled.
assert(!pinned_iters_mgr_ ||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
#endif
if (arena_mode_) { if (arena_mode_) {
iter_->~Iterator(); iter_->~Iterator();
} else { } else {
@ -238,6 +245,14 @@ class MemTableIterator : public InternalIterator {
} }
} }
#ifndef NDEBUG
virtual void SetPinnedItersMgr(
PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
}
PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif
virtual bool Valid() const override { return valid_; } virtual bool Valid() const override { return valid_; }
virtual void Seek(const Slice& k) override { virtual void Seek(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time); PERF_TIMER_GUARD(seek_on_memtable_time);
@ -285,16 +300,6 @@ class MemTableIterator : public InternalIterator {
virtual Status status() const override { return Status::OK(); } virtual Status status() const override { return Status::OK(); }
virtual Status PinData() override {
// memtable data is always pinned
return Status::OK();
}
virtual Status ReleasePinnedData() override {
// memtable data is always pinned
return Status::OK();
}
virtual bool IsKeyPinned() const override { virtual bool IsKeyPinned() const override {
// memtable data is always pinned // memtable data is always pinned
return true; return true;

@ -0,0 +1,66 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#pragma once
#include <algorithm>
#include <memory>
#include <vector>
#include "table/internal_iterator.h"
namespace rocksdb {
// PinnedIteratorsManager will be notified whenever we need to pin an Iterator
// and it will be responsible for deleting pinned Iterators when they are
// not needed anymore.
class PinnedIteratorsManager {
public:
PinnedIteratorsManager() : pinning_enabled(false), pinned_iters_(nullptr) {}
~PinnedIteratorsManager() { assert(!pinning_enabled); }
// Enable Iterators pinning
void StartPinning() {
if (!pinning_enabled) {
pinning_enabled = true;
if (!pinned_iters_) {
pinned_iters_.reset(new std::vector<InternalIterator*>());
}
}
}
// Is pinning enabled ?
bool PinningEnabled() { return pinning_enabled; }
// Take ownership of iter if pinning is enabled and delete it when
// ReleasePinnedIterators() is called
void PinIteratorIfNeeded(InternalIterator* iter) {
if (!pinning_enabled || !iter) {
return;
}
pinned_iters_->push_back(iter);
}
// Release pinned Iterators
void ReleasePinnedIterators() {
if (pinning_enabled) {
pinning_enabled = false;
// Remove duplicate pointers
std::sort(pinned_iters_->begin(), pinned_iters_->end());
std::unique(pinned_iters_->begin(), pinned_iters_->end());
for (auto& iter : *pinned_iters_) {
delete iter;
}
pinned_iters_->clear();
}
}
private:
bool pinning_enabled;
std::unique_ptr<std::vector<InternalIterator*>> pinned_iters_;
};
} // namespace rocksdb

@ -98,13 +98,11 @@ class Iterator : public Cleanable {
// Property "rocksdb.iterator.is-key-pinned": // Property "rocksdb.iterator.is-key-pinned":
// If returning "1", this means that the Slice returned by key() is valid // If returning "1", this means that the Slice returned by key() is valid
// as long as the iterator is not deleted and ReleasePinnedData() is not // as long as the iterator is not deleted.
// called.
// It is guaranteed to always return "1" if // It is guaranteed to always return "1" if
// - Iterator created with ReadOptions::pin_data = true // - Iterator created with ReadOptions::pin_data = true
// - DB tables were created with // - DB tables were created with
// BlockBasedTableOptions::use_delta_encoding // BlockBasedTableOptions::use_delta_encoding = false.
// set to false.
// Property "rocksdb.iterator.super-version-number": // Property "rocksdb.iterator.super-version-number":
// LSM version used by the iterator. The same format as DB Property // LSM version used by the iterator. The same format as DB Property
// kCurrentSuperVersionNumber. See its comment for more information. // kCurrentSuperVersionNumber. See its comment for more information.

@ -130,7 +130,7 @@ struct BlockBasedTableOptions {
int index_block_restart_interval = 1; int index_block_restart_interval = 1;
// Use delta encoding to compress keys in blocks. // Use delta encoding to compress keys in blocks.
// Iterator::PinData() requires this option to be disabled. // ReadOptions::pin_data requires this option to be disabled.
// //
// Default: true // Default: true
bool use_delta_encoding = true; bool use_delta_encoding = true;

@ -14,11 +14,12 @@
#include <malloc.h> #include <malloc.h>
#endif #endif
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "db/dbformat.h"
#include "table/block_prefix_index.h"
#include "table/block_hash_index.h" #include "table/block_hash_index.h"
#include "table/block_prefix_index.h"
#include "table/internal_iterator.h" #include "table/internal_iterator.h"
#include "format.h" #include "format.h"
@ -151,15 +152,18 @@ class BlockIter : public InternalIterator {
virtual void SeekToLast() override; virtual void SeekToLast() override;
virtual Status PinData() override { #ifndef NDEBUG
// block data is always pinned. ~BlockIter() {
return Status::OK(); // Assert that the BlockIter is never deleted while Pinning is Enabled.
assert(!pinned_iters_mgr_ ||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
} }
virtual void SetPinnedItersMgr(
virtual Status ReleasePinnedData() override { PinnedIteratorsManager* pinned_iters_mgr) override {
// block data is always pinned. pinned_iters_mgr_ = pinned_iters_mgr;
return Status::OK();
} }
PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif
virtual bool IsKeyPinned() const override { return key_.IsKeyPinned(); } virtual bool IsKeyPinned() const override { return key_.IsKeyPinned(); }

@ -12,6 +12,8 @@
namespace rocksdb { namespace rocksdb {
class PinnedIteratorsManager;
class InternalIterator : public Cleanable { class InternalIterator : public Cleanable {
public: public:
InternalIterator() {} InternalIterator() {}
@ -61,20 +63,19 @@ class InternalIterator : public Cleanable {
// satisfied without doing some IO, then this returns Status::Incomplete(). // satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0; virtual Status status() const = 0;
// Make sure that all current and future data blocks used by this iterator // Pass the PinnedIteratorsManager to the Iterator, most Iterators dont
// will be pinned in memory and will not be released except when // communicate with PinnedIteratorsManager so default implementation is no-op
// ReleasePinnedData() is called or the iterator is deleted. // but for Iterators that need to communicate with PinnedIteratorsManager
virtual Status PinData() { return Status::NotSupported(""); } // they will implement this function and use the passed pointer to communicate
// with PinnedIteratorsManager.
// Release all blocks that were pinned because of PinData() and no future virtual void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) {}
// blocks will be pinned.
virtual Status ReleasePinnedData() { return Status::NotSupported(""); }
// If true, this means that the Slice returned by key() is valid as long // If true, this means that the Slice returned by key() is valid as long as
// as the iterator is not deleted and ReleasePinnedData() is not called. // PinnedIteratorsManager::ReleasePinnedIterators is not called and the
// Iterator is not deleted.
// //
// IsKeyPinned() is guaranteed to always return true if // IsKeyPinned() is guaranteed to always return true if
// - PinData() is called // - Iterator is created with ReadOptions::pin_data = true
// - DB tables were created with BlockBasedTableOptions::use_delta_encoding // - DB tables were created with BlockBasedTableOptions::use_delta_encoding
// set to false. // set to false.
virtual bool IsKeyPinned() const { return false; } virtual bool IsKeyPinned() const { return false; }

@ -15,95 +15,47 @@
namespace rocksdb { namespace rocksdb {
// A internal wrapper class with an interface similar to Iterator that // A internal wrapper class with an interface similar to Iterator that caches
// caches the valid() and key() results for an underlying iterator. // the valid(), key() and IsKeyPinned() results for an underlying iterator.
// This can help avoid virtual function calls and also gives better // This can help avoid virtual function calls and also gives better
// cache locality. // cache locality.
class IteratorWrapper { class IteratorWrapper {
public: public:
IteratorWrapper() : iter_(nullptr), iters_pinned_(false), valid_(false) {} IteratorWrapper() : iter_(nullptr), valid_(false) {}
explicit IteratorWrapper(InternalIterator* _iter) explicit IteratorWrapper(InternalIterator* _iter) : iter_(nullptr) {
: iter_(nullptr), iters_pinned_(false) {
Set(_iter); Set(_iter);
} }
~IteratorWrapper() {} ~IteratorWrapper() {}
InternalIterator* iter() const { return iter_; } InternalIterator* iter() const { return iter_; }
// Takes the ownership of "_iter" and will delete it when destroyed. // Set the underlying Iterator to _iter and return
// Next call to Set() will destroy "_iter" except if PinData() was called. // previous underlying Iterator.
void Set(InternalIterator* _iter) { InternalIterator* Set(InternalIterator* _iter) {
if (iters_pinned_ && iter_) { InternalIterator* old_iter = iter_;
// keep old iterator until ReleasePinnedData() is called
pinned_iters_.insert(iter_);
} else {
delete iter_;
}
iter_ = _iter; iter_ = _iter;
if (iter_ == nullptr) { if (iter_ == nullptr) {
valid_ = false; valid_ = false;
} else { } else {
Update(); Update();
if (iters_pinned_) {
// Pin new iterator
Status s = iter_->PinData();
assert(s.ok());
}
}
}
Status PinData() {
Status s;
if (iters_pinned_) {
return s;
}
if (iter_) {
s = iter_->PinData();
} }
return old_iter;
if (s.ok()) {
iters_pinned_ = true;
}
return s;
}
Status ReleasePinnedData() {
Status s;
if (!iters_pinned_) {
return s;
} }
void DeleteIter(bool is_arena_mode) {
if (iter_) { if (iter_) {
s = iter_->ReleasePinnedData(); if (!is_arena_mode) {
} delete iter_;
} else {
if (s.ok()) { iter_->~InternalIterator();
iters_pinned_ = false;
// No need to call ReleasePinnedData() for pinned_iters_
// since we will delete them
DeletePinnedIterators(false);
}
return s;
}
bool IsKeyPinned() const {
assert(iter_);
return iters_pinned_ && iter_->IsKeyPinned();
} }
void DeleteIter(bool is_arena_mode) {
if (iter_ && pinned_iters_.find(iter_) == pinned_iters_.end()) {
DestroyIterator(iter_, is_arena_mode);
} }
DeletePinnedIterators(is_arena_mode);
} }
// Iterator interface methods // Iterator interface methods
bool Valid() const { return valid_; } bool Valid() const { return valid_; }
Slice key() const { assert(Valid()); return key_; } Slice key() const { assert(Valid()); return key_; }
bool IsKeyPinned() const { assert(Valid()); return is_key_pinned_; }
Slice value() const { assert(Valid()); return iter_->value(); } Slice value() const { assert(Valid()); return iter_->value(); }
// Methods below require iter() != nullptr // Methods below require iter() != nullptr
Status status() const { assert(iter_); return iter_->status(); } Status status() const { assert(iter_); return iter_->status(); }
@ -112,39 +64,25 @@ class IteratorWrapper {
void Seek(const Slice& k) { assert(iter_); iter_->Seek(k); Update(); } void Seek(const Slice& k) { assert(iter_); iter_->Seek(k); Update(); }
void SeekToFirst() { assert(iter_); iter_->SeekToFirst(); Update(); } void SeekToFirst() { assert(iter_); iter_->SeekToFirst(); Update(); }
void SeekToLast() { assert(iter_); iter_->SeekToLast(); Update(); } void SeekToLast() { assert(iter_); iter_->SeekToLast(); Update(); }
void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) {
assert(iter_);
iter_->SetPinnedItersMgr(pinned_iters_mgr);
Update();
}
private: private:
void Update() { void Update() {
valid_ = iter_->Valid(); valid_ = iter_->Valid();
if (valid_) { if (valid_) {
key_ = iter_->key(); key_ = iter_->key();
} is_key_pinned_ = iter_->IsKeyPinned();
}
void DeletePinnedIterators(bool is_arena_mode) {
for (auto it : pinned_iters_) {
DestroyIterator(it, is_arena_mode);
}
pinned_iters_.clear();
}
inline void DestroyIterator(InternalIterator* it, bool is_arena_mode) {
if (!is_arena_mode) {
delete it;
} else {
it->~InternalIterator();
} }
} }
InternalIterator* iter_; InternalIterator* iter_;
// If set to true, current and future iterators wont be deleted.
bool iters_pinned_;
// List of past iterators that are pinned and wont be deleted as long as
// iters_pinned_ is true. When we are pinning iterators this set will contain
// iterators of previous data blocks to keep them from being deleted.
std::set<InternalIterator*> pinned_iters_;
bool valid_; bool valid_;
Slice key_; Slice key_;
bool is_key_pinned_;
}; };
class Arena; class Arena;

@ -11,6 +11,7 @@
#include <vector> #include <vector>
#include "db/pinned_iterators_manager.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -18,11 +19,11 @@
#include "table/iter_heap.h" #include "table/iter_heap.h"
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
#include "util/arena.h" #include "util/arena.h"
#include "util/autovector.h"
#include "util/heap.h" #include "util/heap.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/perf_context_imp.h"
#include "util/autovector.h"
namespace rocksdb { namespace rocksdb {
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
@ -37,12 +38,12 @@ class MergingIterator : public InternalIterator {
public: public:
MergingIterator(const Comparator* comparator, InternalIterator** children, MergingIterator(const Comparator* comparator, InternalIterator** children,
int n, bool is_arena_mode) int n, bool is_arena_mode)
: data_pinned_(false), : is_arena_mode_(is_arena_mode),
is_arena_mode_(is_arena_mode),
comparator_(comparator), comparator_(comparator),
current_(nullptr), current_(nullptr),
direction_(kForward), direction_(kForward),
minHeap_(comparator_) { minHeap_(comparator_),
pinned_iters_mgr_(nullptr) {
children_.resize(n); children_.resize(n);
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
children_[i].Set(children[i]); children_[i].Set(children[i]);
@ -58,9 +59,8 @@ class MergingIterator : public InternalIterator {
virtual void AddIterator(InternalIterator* iter) { virtual void AddIterator(InternalIterator* iter) {
assert(direction_ == kForward); assert(direction_ == kForward);
children_.emplace_back(iter); children_.emplace_back(iter);
if (data_pinned_) { if (pinned_iters_mgr_) {
Status s = iter->PinData(); iter->SetPinnedItersMgr(pinned_iters_mgr_);
assert(s.ok());
} }
auto new_wrapper = children_.back(); auto new_wrapper = children_.back();
if (new_wrapper.Valid()) { if (new_wrapper.Valid()) {
@ -243,50 +243,21 @@ class MergingIterator : public InternalIterator {
return s; return s;
} }
virtual Status PinData() override { virtual void SetPinnedItersMgr(
Status s; PinnedIteratorsManager* pinned_iters_mgr) override {
if (data_pinned_) { pinned_iters_mgr_ = pinned_iters_mgr;
return s;
}
for (size_t i = 0; i < children_.size(); i++) {
s = children_[i].PinData();
if (!s.ok()) {
// We failed to pin an iterator, clean up
for (size_t j = 0; j < i; j++) {
children_[j].ReleasePinnedData();
}
break;
}
}
data_pinned_ = s.ok();
return s;
}
virtual Status ReleasePinnedData() override {
Status s;
if (!data_pinned_) {
return s;
}
for (auto& child : children_) { for (auto& child : children_) {
Status release_status = child.ReleasePinnedData(); child.SetPinnedItersMgr(pinned_iters_mgr);
if (s.ok() && !release_status.ok()) {
s = release_status;
}
} }
data_pinned_ = false;
return s;
} }
virtual bool IsKeyPinned() const override { virtual bool IsKeyPinned() const override {
assert(Valid()); assert(Valid());
return current_->IsKeyPinned(); return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
current_->IsKeyPinned();
} }
private: private:
bool data_pinned_;
// Clears heaps for both directions, used when changing direction or seeking // Clears heaps for both directions, used when changing direction or seeking
void ClearHeaps(); void ClearHeaps();
// Ensures that maxHeap_ is initialized when starting to go in the reverse // Ensures that maxHeap_ is initialized when starting to go in the reverse
@ -311,6 +282,7 @@ class MergingIterator : public InternalIterator {
// Max heap is used for reverse iteration, which is way less common than // Max heap is used for reverse iteration, which is way less common than
// forward. Lazily initialize it to save memory. // forward. Lazily initialize it to save memory.
std::unique_ptr<MergerMaxIterHeap> maxHeap_; std::unique_ptr<MergerMaxIterHeap> maxHeap_;
PinnedIteratorsManager* pinned_iters_mgr_;
IteratorWrapper* CurrentForward() const { IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward); assert(direction_ == kForward);

@ -9,6 +9,7 @@
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "db/pinned_iterators_manager.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block.h" #include "table/block.h"
@ -26,6 +27,10 @@ class TwoLevelIterator : public InternalIterator {
bool need_free_iter_and_state); bool need_free_iter_and_state);
virtual ~TwoLevelIterator() { virtual ~TwoLevelIterator() {
// Assert that the TwoLevelIterator is never deleted while Pinning is
// Enabled.
assert(!pinned_iters_mgr_ ||
(pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled()));
first_level_iter_.DeleteIter(!need_free_iter_and_state_); first_level_iter_.DeleteIter(!need_free_iter_and_state_);
second_level_iter_.DeleteIter(false); second_level_iter_.DeleteIter(false);
if (need_free_iter_and_state_) { if (need_free_iter_and_state_) {
@ -61,12 +66,17 @@ class TwoLevelIterator : public InternalIterator {
return status_; return status_;
} }
} }
virtual Status PinData() override { return second_level_iter_.PinData(); } virtual void SetPinnedItersMgr(
virtual Status ReleasePinnedData() override { PinnedIteratorsManager* pinned_iters_mgr) override {
return second_level_iter_.ReleasePinnedData(); pinned_iters_mgr_ = pinned_iters_mgr;
first_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
if (second_level_iter_.iter()) {
second_level_iter_.SetPinnedItersMgr(pinned_iters_mgr);
}
} }
virtual bool IsKeyPinned() const override { virtual bool IsKeyPinned() const override {
return second_level_iter_.iter() ? second_level_iter_.IsKeyPinned() : false; return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
second_level_iter_.iter() && second_level_iter_.IsKeyPinned();
} }
private: private:
@ -82,6 +92,7 @@ class TwoLevelIterator : public InternalIterator {
IteratorWrapper first_level_iter_; IteratorWrapper first_level_iter_;
IteratorWrapper second_level_iter_; // May be nullptr IteratorWrapper second_level_iter_; // May be nullptr
bool need_free_iter_and_state_; bool need_free_iter_and_state_;
PinnedIteratorsManager* pinned_iters_mgr_;
Status status_; Status status_;
// If second_level_iter is non-nullptr, then "data_block_handle_" holds the // If second_level_iter is non-nullptr, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the second_level_iter. // "index_value" passed to block_function_ to create the second_level_iter.
@ -93,7 +104,8 @@ TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state,
bool need_free_iter_and_state) bool need_free_iter_and_state)
: state_(state), : state_(state),
first_level_iter_(first_level_iter), first_level_iter_(first_level_iter),
need_free_iter_and_state_(need_free_iter_and_state) {} need_free_iter_and_state_(need_free_iter_and_state),
pinned_iters_mgr_(nullptr) {}
void TwoLevelIterator::Seek(const Slice& target) { void TwoLevelIterator::Seek(const Slice& target) {
if (state_->check_prefix_may_match && if (state_->check_prefix_may_match &&
@ -179,7 +191,17 @@ void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) {
if (second_level_iter_.iter() != nullptr) { if (second_level_iter_.iter() != nullptr) {
SaveError(second_level_iter_.status()); SaveError(second_level_iter_.status());
} }
second_level_iter_.Set(iter);
if (pinned_iters_mgr_ && iter) {
iter->SetPinnedItersMgr(pinned_iters_mgr_);
}
InternalIterator* old_iter = second_level_iter_.Set(iter);
if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
pinned_iters_mgr_->PinIteratorIfNeeded(old_iter);
} else {
delete old_iter;
}
} }
void TwoLevelIterator::InitDataBlock() { void TwoLevelIterator::InitDataBlock() {

Loading…
Cancel
Save