Refactor ArenaWrappedDBIter into separate files (#5801)

Summary:
Move definition and implementation for ArenaWrappedDBIter into its own .h/.cc files. Also, change inlining of functions to better comply with the Google C++ style guide.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5801

Test Plan: make check

Differential Revision: D17371012

Pulled By: anand1976

fbshipit-source-id: c1361abc2851575111e357a63d88be3b3d6cb341
main
anand76 5 years ago committed by Facebook Github Bot
parent 6a171724b7
commit 83a6a614e9
  1. 1
      CMakeLists.txt
  2. 1
      TARGETS
  3. 106
      db/arena_wrapped_db_iter.cc
  4. 112
      db/arena_wrapped_db_iter.h
  5. 1
      db/db_blob_index_test.cc
  6. 1
      db/db_impl/db_impl.cc
  7. 1
      db/db_impl/db_impl_readonly.cc
  8. 2
      db/db_impl/db_impl_secondary.cc
  9. 481
      db/db_iter.cc
  10. 339
      db/db_iter.h
  11. 1
      db/db_iterator_test.cc
  12. 1
      src.mk
  13. 1
      utilities/blob_db/blob_db_iterator.h
  14. 1
      utilities/transactions/write_prepared_txn_db.cc
  15. 1
      utilities/transactions/write_unprepared_txn_db.cc

@ -481,6 +481,7 @@ set(SOURCES
cache/clock_cache.cc cache/clock_cache.cc
cache/lru_cache.cc cache/lru_cache.cc
cache/sharded_cache.cc cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/builder.cc db/builder.cc
db/c.cc db/c.cc
db/column_family.cc db/column_family.cc

@ -108,6 +108,7 @@ cpp_library(
"cache/clock_cache.cc", "cache/clock_cache.cc",
"cache/lru_cache.cc", "cache/lru_cache.cc",
"cache/sharded_cache.cc", "cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/builder.cc", "db/builder.cc",
"db/c.cc", "db/c.cc",
"db/column_family.cc", "db/column_family.cc",

@ -0,0 +1,106 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/arena_wrapped_db_iter.h"
#include "memory/arena.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/user_comparator_wrapper.h"
namespace rocksdb {
Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
// First try to pass the value returned from inner iterator.
if (!db_iter_->GetProperty(prop_name, prop).ok()) {
*prop = ToString(sv_number_);
}
return Status::OK();
}
return db_iter_->GetProperty(prop_name, prop);
}
void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
sv_number_ = version_number;
allow_refresh_ = allow_refresh;
}
Status ArenaWrappedDBIter::Refresh() {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
latest_seq);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_valid(false);
}
return Status::OK();
}
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob);
}
return iter;
}
} // namespace rocksdb

@ -0,0 +1,112 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <stdint.h>
#include <string>
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/range_del_aggregator.h"
#include "memory/arena.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "util/autovector.h"
namespace rocksdb {
class Arena;
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed to be allocated. This class is used as an entry point of
// a iterator hierarchy whose memory can be allocated inline. In that way,
// accessing the iterator tree can be more cache friendly. It is also faster
// to allocate.
// When using the class's Iterator interface, the behavior is exactly
// the same as the inner DBIter.
class ArenaWrappedDBIter : public Iterator {
public:
virtual ~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
// Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; }
virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator();
}
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator.
virtual void SetIterUnderDBIter(InternalIterator* iter) {
static_cast<DBIter*>(db_iter_)->SetIter(iter);
}
virtual bool Valid() const override { return db_iter_->Valid(); }
virtual void SeekToFirst() override { db_iter_->SeekToFirst(); }
virtual void SeekToLast() override { db_iter_->SeekToLast(); }
virtual void Seek(const Slice& target) override { db_iter_->Seek(target); }
virtual void SeekForPrev(const Slice& target) override {
db_iter_->SeekForPrev(target);
}
virtual void Next() override { db_iter_->Next(); }
virtual void Prev() override { db_iter_->Prev(); }
virtual Slice key() const override { return db_iter_->key(); }
virtual Slice value() const override { return db_iter_->value(); }
virtual Status status() const override { return db_iter_->status(); }
bool IsBlob() const { return db_iter_->IsBlob(); }
virtual Status GetProperty(std::string prop_name, std::string* prop) override;
virtual Status Refresh() override;
void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh);
// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, ReadCallback* read_callback,
bool allow_blob) {
read_options_ = read_options;
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
}
private:
DBIter* db_iter_;
Arena arena_;
uint64_t sv_number_;
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
bool allow_refresh_ = true;
};
// Generate the arena wrapped iterator class.
// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not
// be supported.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
} // namespace rocksdb

@ -12,6 +12,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "db/arena_wrapped_db_iter.h"
#include "db/column_family.h" #include "db/column_family.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"

@ -25,6 +25,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h" #include "db/builder.h"
#include "db/compaction/compaction_job.h" #include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h" #include "db/db_info_dumper.h"

@ -3,6 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl_readonly.h" #include "db/db_impl/db_impl_readonly.h"
#include "db/compacted_db_impl.h" #include "db/compacted_db_impl.h"

@ -7,7 +7,7 @@
#include <cinttypes> #include <cinttypes>
#include "db/db_iter.h" #include "db/arena_wrapped_db_iter.h"
#include "db/merge_context.h" #include "db/merge_context.h"
#include "logging/auto_roll_logger.h" #include "logging/auto_roll_logger.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"

@ -46,323 +46,72 @@ static void DumpInternalIter(Iterator* iter) {
} }
#endif #endif
// Memtables and sstables that make the DB representation contain DBIter::DBIter(Env* _env, const ReadOptions& read_options,
// (userkey,seq,type) => uservalue entries. DBIter const ImmutableCFOptions& cf_options,
// combines multiple entries for the same userkey found in the DB const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
// representation into a single entry while accounting for sequence InternalIterator* iter, SequenceNumber s, bool arena_mode,
// numbers, deletion markers, overwrites, etc. uint64_t max_sequential_skip_in_iterations,
class DBIter final: public Iterator { ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
public: bool allow_blob)
// The following is grossly complicated. TODO: clean it up : env_(_env),
// Which direction is the iterator currently moving? logger_(cf_options.info_log),
// (1) When moving forward: user_comparator_(cmp),
// (1a) if current_entry_is_merged_ = false, the internal iterator is merge_operator_(cf_options.merge_operator),
// positioned at the exact entry that yields this->key(), this->value() iter_(iter),
// (1b) if current_entry_is_merged_ = true, the internal iterator is read_callback_(read_callback),
// positioned immediately after the last entry that contributed to the sequence_(s),
// current this->value(). That entry may or may not have key equal to statistics_(cf_options.statistics),
// this->key(). num_internal_keys_skipped_(0),
// (2) When moving backwards, the internal iterator is positioned iterate_lower_bound_(read_options.iterate_lower_bound),
// just before all entries whose user key == this->key(). iterate_upper_bound_(read_options.iterate_upper_bound),
enum Direction { direction_(kForward),
kForward, valid_(false),
kReverse current_entry_is_merged_(false),
}; is_key_seqnum_zero_(false),
prefix_same_as_start_(read_options.prefix_same_as_start),
// LocalStatistics contain Statistics counters that will be aggregated per pin_thru_lifetime_(read_options.pin_data),
// each iterator instance and then will be sent to the global statistics when total_order_seek_(read_options.total_order_seek),
// the iterator is destroyed. allow_blob_(allow_blob),
// is_blob_(false),
// The purpose of this approach is to avoid perf regression happening arena_mode_(arena_mode),
// when multiple threads bump the atomic counters from a DBIter::Next(). range_del_agg_(&cf_options.internal_comparator, s),
struct LocalStatistics { db_impl_(db_impl),
explicit LocalStatistics() { ResetCounters(); } cfd_(cfd),
start_seqnum_(read_options.iter_start_seqnum) {
void ResetCounters() { RecordTick(statistics_, NO_ITERATOR_CREATED);
next_count_ = 0; prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
next_found_count_ = 0; max_skip_ = max_sequential_skip_in_iterations;
prev_count_ = 0; max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
prev_found_count_ = 0; if (pin_thru_lifetime_) {
bytes_read_ = 0; pinned_iters_mgr_.StartPinning();
skip_count_ = 0; }
} if (iter_.iter()) {
void BumpGlobalStatistics(Statistics* global_statistics) {
RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
ResetCounters();
}
// Map to Tickers::NUMBER_DB_NEXT
uint64_t next_count_;
// Map to Tickers::NUMBER_DB_NEXT_FOUND
uint64_t next_found_count_;
// Map to Tickers::NUMBER_DB_PREV
uint64_t prev_count_;
// Map to Tickers::NUMBER_DB_PREV_FOUND
uint64_t prev_found_count_;
// Map to Tickers::ITER_BYTES_READ
uint64_t bytes_read_;
// Map to Tickers::NUMBER_ITER_SKIP
uint64_t skip_count_;
};
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob)
: env_(_env),
logger_(cf_options.info_log),
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),
iter_(iter),
read_callback_(read_callback),
sequence_(s),
statistics_(cf_options.statistics),
num_internal_keys_skipped_(0),
iterate_lower_bound_(read_options.iterate_lower_bound),
iterate_upper_bound_(read_options.iterate_upper_bound),
direction_(kForward),
valid_(false),
current_entry_is_merged_(false),
is_key_seqnum_zero_(false),
prefix_same_as_start_(read_options.prefix_same_as_start),
pin_thru_lifetime_(read_options.pin_data),
total_order_seek_(read_options.total_order_seek),
allow_blob_(allow_blob),
is_blob_(false),
arena_mode_(arena_mode),
range_del_agg_(&cf_options.internal_comparator, s),
db_impl_(db_impl),
cfd_(cfd),
start_seqnum_(read_options.iter_start_seqnum) {
RecordTick(statistics_, NO_ITERATOR_CREATED);
prefix_extractor_ = mutable_cf_options.prefix_extractor.get();
max_skip_ = max_sequential_skip_in_iterations;
max_skippable_internal_keys_ = read_options.max_skippable_internal_keys;
if (pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
if (iter_.iter()) {
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
}
// No copying allowed
DBIter(const DBIter&) = delete;
void operator=(const DBIter&) = delete;
~DBIter() override {
// Release pinned data if any
if (pinned_iters_mgr_.PinningEnabled()) {
pinned_iters_mgr_.ReleasePinnedData();
}
RecordTick(statistics_, NO_ITERATOR_DELETED);
ResetInternalKeysSkippedCounter();
local_stats_.BumpGlobalStatistics(statistics_);
iter_.DeleteIter(arena_mode_);
}
virtual void SetIter(InternalIterator* iter) {
assert(iter_.iter() == nullptr);
iter_.Set(iter);
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_); iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
} }
virtual ReadRangeDelAggregator* GetRangeDelAggregator() { }
return &range_del_agg_;
}
bool Valid() const override { return valid_; }
Slice key() const override {
assert(valid_);
if(start_seqnum_ > 0) {
return saved_key_.GetInternalKey();
} else {
return saved_key_.GetUserKey();
}
}
Slice value() const override {
assert(valid_);
if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
return pinned_value_.data() ? pinned_value_ : saved_value_;
} else if (direction_ == kReverse) {
return pinned_value_;
} else {
return iter_.value();
}
}
Status status() const override {
if (status_.ok()) {
return iter_.status();
} else {
assert(!valid_);
return status_;
}
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
return is_blob_;
}
Status GetProperty(std::string prop_name, std::string* prop) override {
if (prop == nullptr) {
return Status::InvalidArgument("prop is nullptr");
}
if (prop_name == "rocksdb.iterator.super-version-number") {
// First try to pass the value returned from inner iterator.
return iter_.iter()->GetProperty(prop_name, prop);
} else if (prop_name == "rocksdb.iterator.is-key-pinned") {
if (valid_) {
*prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
} else {
*prop = "Iterator is not valid.";
}
return Status::OK();
} else if (prop_name == "rocksdb.iterator.internal-key") {
*prop = saved_key_.GetUserKey().ToString();
return Status::OK();
}
return Status::InvalidArgument("Unidentified property.");
}
inline void Next() final override;
inline void Prev() final override;
inline void Seek(const Slice& target) final override;
inline void SeekForPrev(const Slice& target) final override;
inline void SeekToFirst() final override;
inline void SeekToLast() final override;
Env* env() { return env_; }
void set_sequence(uint64_t s) {
sequence_ = s;
if (read_callback_) {
read_callback_->Refresh(s);
}
}
void set_valid(bool v) { valid_ = v; }
private:
// For all methods in this block:
// PRE: iter_->Valid() && status_.ok()
// Return false if there was an error, and status() is non-ok, valid_ = false;
// in this case callers would usually stop what they were doing and return.
bool ReverseToForward();
bool ReverseToBackward();
bool FindValueForCurrentKey();
bool FindValueForCurrentKeyUsingSeek();
bool FindUserKeyBeforeSavedKey();
inline bool FindNextUserEntry(bool skipping, bool prefix_check);
inline bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
bool ParseKey(ParsedInternalKey* key);
bool MergeValuesNewToOld();
void PrevInternal();
bool TooManyInternalKeysSkipped(bool increment = true);
inline bool IsVisible(SequenceNumber sequence);
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
void TempPinData() {
if (!pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
}
// Release blocks pinned by TempPinData() Status DBIter::GetProperty(std::string prop_name, std::string* prop) {
void ReleaseTempPinnedData() { if (prop == nullptr) {
if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { return Status::InvalidArgument("prop is nullptr");
pinned_iters_mgr_.ReleasePinnedData();
}
} }
if (prop_name == "rocksdb.iterator.super-version-number") {
inline void ClearSavedValue() { // First try to pass the value returned from inner iterator.
if (saved_value_.capacity() > 1048576) { return iter_.iter()->GetProperty(prop_name, prop);
std::string empty; } else if (prop_name == "rocksdb.iterator.is-key-pinned") {
swap(empty, saved_value_);
} else {
saved_value_.clear();
}
}
inline void ResetInternalKeysSkippedCounter() {
local_stats_.skip_count_ += num_internal_keys_skipped_;
if (valid_) { if (valid_) {
local_stats_.skip_count_--; *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0";
} else {
*prop = "Iterator is not valid.";
} }
num_internal_keys_skipped_ = 0; return Status::OK();
} else if (prop_name == "rocksdb.iterator.internal-key") {
*prop = saved_key_.GetUserKey().ToString();
return Status::OK();
} }
return Status::InvalidArgument("Unidentified property.");
}
const SliceTransform* prefix_extractor_; bool DBIter::ParseKey(ParsedInternalKey* ikey) {
Env* const env_;
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
IteratorWrapper iter_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
SequenceNumber sequence_;
IterKey saved_key_;
// Reusable internal key data structure. This is only used inside one function
// and should not be used across functions. Reusing this object can reduce
// overhead of calling construction of the function if creating it each time.
ParsedInternalKey ikey_;
std::string saved_value_;
Slice pinned_value_;
// for prefix seek mode to support prev()
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
uint64_t num_internal_keys_skipped_;
const Slice* iterate_lower_bound_;
const Slice* iterate_upper_bound_;
IterKey prefix_start_buf_;
Status status_;
Slice prefix_start_key_;
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
// True if we know that the current entry's seqnum is 0.
// This information is used as that the next entry will be for another
// user key.
bool is_key_seqnum_zero_;
const bool prefix_same_as_start_;
// Means that we will pin all data blocks we read as long the Iterator
// is not deleted, will be true if ReadOptions::pin_data is true
const bool pin_thru_lifetime_;
const bool total_order_seek_;
bool allow_blob_;
bool is_blob_;
bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;
ReadRangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
#ifdef ROCKSDB_LITE
ROCKSDB_FIELD_UNUSED
#endif
DBImpl* db_impl_;
#ifdef ROCKSDB_LITE
ROCKSDB_FIELD_UNUSED
#endif
ColumnFamilyData* cfd_;
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
};
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
if (!ParseInternalKey(iter_.key(), ikey)) { if (!ParseInternalKey(iter_.key(), ikey)) {
status_ = Status::Corruption("corrupted internal key in DBIter"); status_ = Status::Corruption("corrupted internal key in DBIter");
valid_ = false; valid_ = false;
@ -429,13 +178,13 @@ void DBIter::Next() {
// keys against the prefix of the seeked key. Set to false when // keys against the prefix of the seeked key. Set to false when
// performing a seek without a key (e.g. SeekToFirst). Set to // performing a seek without a key (e.g. SeekToFirst). Set to
// prefix_same_as_start_ for other iterations. // prefix_same_as_start_ for other iterations.
inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) {
PERF_TIMER_GUARD(find_next_user_entry_time); PERF_TIMER_GUARD(find_next_user_entry_time);
return FindNextUserEntryInternal(skipping, prefix_check); return FindNextUserEntryInternal(skipping, prefix_check);
} }
// Actual implementation of DBIter::FindNextUserEntry() // Actual implementation of DBIter::FindNextUserEntry()
inline bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) {
// Loop until we hit an acceptable entry to yield // Loop until we hit an acceptable entry to yield
assert(iter_.Valid()); assert(iter_.Valid());
assert(status_.ok()); assert(status_.ok());
@ -1523,114 +1272,4 @@ Iterator* NewDBIterator(Env* env, const ReadOptions& read_options,
return db_iter; return db_iter;
} }
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
ReadRangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator();
}
void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) {
static_cast<DBIter*>(db_iter_)->SetIter(iter);
}
inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) {
db_iter_->SeekForPrev(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); }
inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
std::string* prop) {
if (prop_name == "rocksdb.iterator.super-version-number") {
// First try to pass the value returned from inner iterator.
if (!db_iter_->GetProperty(prop_name, prop).ok()) {
*prop = ToString(sv_number_);
}
return Status::OK();
}
return db_iter_->GetProperty(prop_name, prop);
}
void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iteration,
uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl,
ColumnFamilyData* cfd, bool allow_blob,
bool allow_refresh) {
auto mem = arena_.AllocateAligned(sizeof(DBIter));
db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options,
cf_options.user_comparator, nullptr, sequence,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
sv_number_ = version_number;
allow_refresh_ = allow_refresh;
}
Status ArenaWrappedDBIter::Refresh() {
if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
return Status::NotSupported("Creating renew iterator is not allowed.");
}
assert(db_iter_ != nullptr);
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// correct behavior. Will be corrected automatically when we take a snapshot
// here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
if (sv_number_ != cur_sv_number) {
Env* env = db_iter_->env();
db_iter_->~DBIter();
arena_.~Arena();
new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
if (read_callback_) {
read_callback_->Refresh(latest_seq);
}
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations,
cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_,
allow_refresh_);
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
latest_seq);
SetIterUnderDBIter(internal_iter);
} else {
db_iter_->set_sequence(latest_seq);
db_iter_->set_valid(false);
}
return Status::OK();
}
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
iter->Init(env, read_options, cf_options, mutable_cf_options, sequence,
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob);
}
return iter;
}
} // namespace rocksdb } // namespace rocksdb

@ -17,6 +17,7 @@
#include "options/cf_options.h" #include "options/cf_options.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "table/iterator_wrapper.h"
#include "util/autovector.h" #include "util/autovector.h"
namespace rocksdb { namespace rocksdb {
@ -46,9 +47,268 @@ namespace rocksdb {
// key: BBB value: v1 // key: BBB value: v1
// key: BBC value: v1 // key: BBC value: v1
// //
class Arena;
class DBIter;
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
class DBIter final: public Iterator {
public:
// The following is grossly complicated. TODO: clean it up
// Which direction is the iterator currently moving?
// (1) When moving forward:
// (1a) if current_entry_is_merged_ = false, the internal iterator is
// positioned at the exact entry that yields this->key(), this->value()
// (1b) if current_entry_is_merged_ = true, the internal iterator is
// positioned immediately after the last entry that contributed to the
// current this->value(). That entry may or may not have key equal to
// this->key().
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum Direction {
kForward,
kReverse
};
// LocalStatistics contain Statistics counters that will be aggregated per
// each iterator instance and then will be sent to the global statistics when
// the iterator is destroyed.
//
// The purpose of this approach is to avoid perf regression happening
// when multiple threads bump the atomic counters from a DBIter::Next().
struct LocalStatistics {
explicit LocalStatistics() { ResetCounters(); }
void ResetCounters() {
next_count_ = 0;
next_found_count_ = 0;
prev_count_ = 0;
prev_found_count_ = 0;
bytes_read_ = 0;
skip_count_ = 0;
}
void BumpGlobalStatistics(Statistics* global_statistics) {
RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_);
RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_);
RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_);
RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_);
RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_);
RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_);
PERF_COUNTER_ADD(iter_read_bytes, bytes_read_);
ResetCounters();
}
// Map to Tickers::NUMBER_DB_NEXT
uint64_t next_count_;
// Map to Tickers::NUMBER_DB_NEXT_FOUND
uint64_t next_found_count_;
// Map to Tickers::NUMBER_DB_PREV
uint64_t prev_count_;
// Map to Tickers::NUMBER_DB_PREV_FOUND
uint64_t prev_found_count_;
// Map to Tickers::ITER_BYTES_READ
uint64_t bytes_read_;
// Map to Tickers::NUMBER_ITER_SKIP
uint64_t skip_count_;
};
DBIter(Env* _env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const Comparator* cmp,
InternalIterator* iter, SequenceNumber s, bool arena_mode,
uint64_t max_sequential_skip_in_iterations,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob);
// No copying allowed
DBIter(const DBIter&) = delete;
void operator=(const DBIter&) = delete;
~DBIter() override {
// Release pinned data if any
if (pinned_iters_mgr_.PinningEnabled()) {
pinned_iters_mgr_.ReleasePinnedData();
}
RecordTick(statistics_, NO_ITERATOR_DELETED);
ResetInternalKeysSkippedCounter();
local_stats_.BumpGlobalStatistics(statistics_);
iter_.DeleteIter(arena_mode_);
}
virtual void SetIter(InternalIterator* iter) {
assert(iter_.iter() == nullptr);
iter_.Set(iter);
iter_.iter()->SetPinnedItersMgr(&pinned_iters_mgr_);
}
virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return &range_del_agg_;
}
bool Valid() const override { return valid_; }
Slice key() const override {
assert(valid_);
if(start_seqnum_ > 0) {
return saved_key_.GetInternalKey();
} else {
return saved_key_.GetUserKey();
}
}
Slice value() const override {
assert(valid_);
if (current_entry_is_merged_) {
// If pinned_value_ is set then the result of merge operator is one of
// the merge operands and we should return it.
return pinned_value_.data() ? pinned_value_ : saved_value_;
} else if (direction_ == kReverse) {
return pinned_value_;
} else {
return iter_.value();
}
}
Status status() const override {
if (status_.ok()) {
return iter_.status();
} else {
assert(!valid_);
return status_;
}
}
bool IsBlob() const {
assert(valid_ && (allow_blob_ || !is_blob_));
return is_blob_;
}
Status GetProperty(std::string prop_name, std::string* prop) override;
void Next() final override;
void Prev() final override;
void Seek(const Slice& target) final override;
void SeekForPrev(const Slice& target) final override;
void SeekToFirst() final override;
void SeekToLast() final override;
Env* env() { return env_; }
void set_sequence(uint64_t s) {
sequence_ = s;
if (read_callback_) {
read_callback_->Refresh(s);
}
}
void set_valid(bool v) { valid_ = v; }
private:
// For all methods in this block:
// PRE: iter_->Valid() && status_.ok()
// Return false if there was an error, and status() is non-ok, valid_ = false;
// in this case callers would usually stop what they were doing and return.
bool ReverseToForward();
bool ReverseToBackward();
bool FindValueForCurrentKey();
bool FindValueForCurrentKeyUsingSeek();
bool FindUserKeyBeforeSavedKey();
bool FindNextUserEntry(bool skipping, bool prefix_check);
bool FindNextUserEntryInternal(bool skipping, bool prefix_check);
bool ParseKey(ParsedInternalKey* key);
bool MergeValuesNewToOld();
void PrevInternal();
bool TooManyInternalKeysSkipped(bool increment = true);
bool IsVisible(SequenceNumber sequence);
// Temporarily pin the blocks that we encounter until ReleaseTempPinnedData()
// is called
void TempPinData() {
if (!pin_thru_lifetime_) {
pinned_iters_mgr_.StartPinning();
}
}
// Release blocks pinned by TempPinData()
void ReleaseTempPinnedData() {
if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) {
pinned_iters_mgr_.ReleasePinnedData();
}
}
inline void ClearSavedValue() {
if (saved_value_.capacity() > 1048576) {
std::string empty;
swap(empty, saved_value_);
} else {
saved_value_.clear();
}
}
inline void ResetInternalKeysSkippedCounter() {
local_stats_.skip_count_ += num_internal_keys_skipped_;
if (valid_) {
local_stats_.skip_count_--;
}
num_internal_keys_skipped_ = 0;
}
const SliceTransform* prefix_extractor_;
Env* const env_;
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
IteratorWrapper iter_;
ReadCallback* read_callback_;
// Max visible sequence number. It is normally the snapshot seq unless we have
// uncommitted data in db as in WriteUnCommitted.
SequenceNumber sequence_;
IterKey saved_key_;
// Reusable internal key data structure. This is only used inside one function
// and should not be used across functions. Reusing this object can reduce
// overhead of calling construction of the function if creating it each time.
ParsedInternalKey ikey_;
std::string saved_value_;
Slice pinned_value_;
// for prefix seek mode to support prev()
Statistics* statistics_;
uint64_t max_skip_;
uint64_t max_skippable_internal_keys_;
uint64_t num_internal_keys_skipped_;
const Slice* iterate_lower_bound_;
const Slice* iterate_upper_bound_;
IterKey prefix_start_buf_;
Status status_;
Slice prefix_start_key_;
Direction direction_;
bool valid_;
bool current_entry_is_merged_;
// True if we know that the current entry's seqnum is 0.
// This information is used as that the next entry will be for another
// user key.
bool is_key_seqnum_zero_;
const bool prefix_same_as_start_;
// Means that we will pin all data blocks we read as long the Iterator
// is not deleted, will be true if ReadOptions::pin_data is true
const bool pin_thru_lifetime_;
const bool total_order_seek_;
bool allow_blob_;
bool is_blob_;
bool arena_mode_;
// List of operands for merge operator.
MergeContext merge_context_;
ReadRangeDelAggregator range_del_agg_;
LocalStatistics local_stats_;
PinnedIteratorsManager pinned_iters_mgr_;
#ifdef ROCKSDB_LITE
ROCKSDB_FIELD_UNUSED
#endif
DBImpl* db_impl_;
#ifdef ROCKSDB_LITE
ROCKSDB_FIELD_UNUSED
#endif
ColumnFamilyData* cfd_;
// for diff snapshots we want the lower bound on the seqnum;
// if this value > 0 iterator will return internal keys
SequenceNumber start_seqnum_;
};
// Return a new iterator that converts internal keys (yielded by // Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified `sequence` number // "*internal_iter") that were live at the specified `sequence` number
// into appropriate user keys. // into appropriate user keys.
@ -61,79 +321,4 @@ extern Iterator* NewDBIterator(
ReadCallback* read_callback, DBImpl* db_impl = nullptr, ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false); ColumnFamilyData* cfd = nullptr, bool allow_blob = false);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
// a iterator hierarchy whose memory can be allocated inline. In that way,
// accessing the iterator tree can be more cache friendly. It is also faster
// to allocate.
// When using the class's Iterator interface, the behavior is exactly
// the same as the inner DBIter.
class ArenaWrappedDBIter : public Iterator {
public:
virtual ~ArenaWrappedDBIter();
// Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; }
virtual ReadRangeDelAggregator* GetRangeDelAggregator();
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator.
virtual void SetIterUnderDBIter(InternalIterator* iter);
virtual bool Valid() const override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
virtual void Seek(const Slice& target) override;
virtual void SeekForPrev(const Slice& target) override;
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
virtual Status Refresh() override;
bool IsBlob() const;
virtual Status GetProperty(std::string prop_name, std::string* prop) override;
void Init(Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options,
const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd,
bool allow_blob, bool allow_refresh);
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, ReadCallback* read_callback,
bool allow_blob) {
read_options_ = read_options;
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
allow_blob_ = allow_blob;
}
private:
DBIter* db_iter_;
Arena arena_;
uint64_t sv_number_;
ColumnFamilyData* cfd_ = nullptr;
DBImpl* db_impl_ = nullptr;
ReadOptions read_options_;
ReadCallback* read_callback_;
bool allow_blob_ = false;
bool allow_refresh_ = true;
};
// Generate the arena wrapped iterator class.
// `db_impl` and `cfd` are used for reneweal. If left null, renewal will not
// be supported.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const ReadOptions& read_options,
const ImmutableCFOptions& cf_options,
const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence,
uint64_t max_sequential_skip_in_iterations, uint64_t version_number,
ReadCallback* read_callback, DBImpl* db_impl = nullptr,
ColumnFamilyData* cfd = nullptr, bool allow_blob = false,
bool allow_refresh = true);
} // namespace rocksdb } // namespace rocksdb

@ -9,6 +9,7 @@
#include <functional> #include <functional>
#include "db/arena_wrapped_db_iter.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h" #include "port/port.h"

@ -3,6 +3,7 @@ LIB_SOURCES = \
cache/clock_cache.cc \ cache/clock_cache.cc \
cache/lru_cache.cc \ cache/lru_cache.cc \
cache/sharded_cache.cc \ cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/builder.cc \ db/builder.cc \
db/c.cc \ db/c.cc \
db/column_family.cc \ db/column_family.cc \

@ -6,6 +6,7 @@
#pragma once #pragma once
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "db/arena_wrapped_db_iter.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"

@ -13,6 +13,7 @@
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
#include "db/arena_wrapped_db_iter.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"

@ -5,6 +5,7 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "db/arena_wrapped_db_iter.h"
#include "utilities/transactions/write_unprepared_txn_db.h" #include "utilities/transactions/write_unprepared_txn_db.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h" #include "util/cast_util.h"

Loading…
Cancel
Save