Ankit Gupta 11 years ago
commit 08314321fa
  1. build_tools/regression_build_test.sh (6)
  2. db/column_family.cc (4)
  3. db/column_family.h (7)
  4. db/db_bench.cc (28)
  5. db/db_impl.cc (115)
  6. db/db_impl.h (16)
  7. db/db_iter.cc (77)
  8. db/db_iter.h (46)
  9. db/dbformat.h (6)
  10. db/forward_iterator.cc (383)
  11. db/forward_iterator.h (105)
  12. db/memtable.cc (31)
  13. db/memtable.h (7)
  14. db/memtable_list.cc (9)
  15. db/memtable_list.h (4)
  16. db/simple_table_db_test.cc (10)
  17. db/table_cache.cc (7)
  18. db/table_cache.h (3)
  19. db/version_set.cc (26)
  20. db/version_set.h (5)
  21. include/rocksdb/memtablerep.h (18)
  22. include/rocksdb/options.h (9)
  23. table/block_based_table_reader.cc (9)
  24. table/block_based_table_reader.h (2)
  25. table/iterator.cc (20)
  26. table/iterator_wrapper.h (27)
  27. table/merger.cc (68)
  28. table/merger.h (33)
  29. table/plain_table_reader.cc (9)
  30. table/plain_table_reader.h (2)
  31. table/table_reader.h (7)
  32. table/two_level_iterator.cc (13)
  33. table/two_level_iterator.h (7)
  34. tools/db_stress.cc (65)
  35. util/arena.h (2)
  36. util/dynamic_bloom.cc (27)
  37. util/dynamic_bloom.h (30)
  38. util/dynamic_bloom_test.cc (35)
  39. util/hash_cuckoo_rep.cc (10)
  40. util/hash_linklist_rep.cc (20)
  41. util/hash_skiplist_rep.cc (19)
  42. util/options.cc (2)
  43. util/skiplistrep.cc (8)
  44. util/vectorrep.cc (16)
  45. utilities/backupable/backupable_db_test.cc (1)

@ -308,9 +308,9 @@ function send_benchmark_to_ods {
file="$3"
QPS=$(grep $bench $file | awk '{print $5}')
P50_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $3}' )
P75_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $5}' )
P99_MICROS=$(grep $bench $file -A 4 | tail -n1 | awk '{print $7}' )
P50_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $3}' )
P75_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $5}' )
P99_MICROS=$(grep $bench $file -A 6 | grep "Percentiles" | awk '{print $7}' )
send_to_ods rocksdb.build.$bench_key.qps $QPS
send_to_ods rocksdb.build.$bench_key.p50_micros $P50_MICROS

@ -506,6 +506,10 @@ void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
max_column_family_ = std::max(new_max_column_family, max_column_family_);
}
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
return column_families_.size();
}
// under a DB mutex
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,

@ -180,7 +180,7 @@ class ColumnFamilyData {
void SetCurrent(Version* current);
void CreateNewMemtable();
TableCache* table_cache() { return table_cache_.get(); }
TableCache* table_cache() const { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
@ -302,8 +302,8 @@ class ColumnFamilyData {
// family might get dropped when the DB mutex is released
// * GetDefault() -- thread safe
// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock()
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() --
// inside of DB mutex
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(),
// NumberOfColumnFamilies -- inside of DB mutex
class ColumnFamilySet {
public:
// ColumnFamilySet supports iteration
@ -342,6 +342,7 @@ class ColumnFamilySet {
uint32_t GetNextColumnFamilyID();
uint32_t GetMaxColumnFamily();
void UpdateMaxColumnFamily(uint32_t new_max_column_family);
size_t NumberOfColumnFamilies() const;
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,

@ -465,6 +465,8 @@ static auto FLAGS_compaction_fadvice_e =
DEFINE_bool(use_tailing_iterator, false,
"Use tailing iterator to access a series of keys instead of get");
DEFINE_int64(iter_refresh_interval_us, -1,
"How often (in microseconds) to refresh iterators. Refresh is disabled when -1");
DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
"Use adaptive mutex");
@ -1926,7 +1928,7 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read);
thread->stats.AddMessage(msg);
@ -2009,12 +2011,31 @@ class Benchmark {
multi_iters.push_back(db->NewIterator(options));
}
}
uint64_t last_refresh = FLAGS_env->NowMicros();
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
if (!FLAGS_use_tailing_iterator && FLAGS_iter_refresh_interval_us >= 0) {
uint64_t now = FLAGS_env->NowMicros();
if (now - last_refresh > (uint64_t)FLAGS_iter_refresh_interval_us) {
if (db_ != nullptr) {
delete single_iter;
single_iter = db_->NewIterator(options);
} else {
for (auto iter : multi_iters) {
delete iter;
}
multi_iters.clear();
for (DB* db : multi_dbs_) {
multi_iters.push_back(db->NewIterator(options));
}
}
}
last_refresh = now;
}
// Pick an Iterator to use
Iterator* iter_to_use = single_iter;
if (single_iter == nullptr) {
@ -2035,9 +2056,12 @@ class Benchmark {
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > 0) {
thread->stats.AddMessage(perf_context.ToString());
}
}
void SeekRandomWhileWriting(ThreadState* thread) {

@ -36,6 +36,7 @@
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/tailing_iter.h"
#include "db/forward_iterator.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
@ -2067,7 +2068,15 @@ void DBImpl::BackgroundCallCompaction() {
if (madeProgress || bg_schedule_needed_) {
MaybeScheduleFlushOrCompaction();
}
if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
// signal if
// * madeProgress -- need to wake up MakeRoomForWrite
// * bg_compaction_scheduled_ == 0 -- need to wake up ~DBImpl
// * bg_manual_only_ > 0 -- need to wake up RunManualCompaction
// If none of these is true, there is no need to signal since nobody is
// waiting for it
bg_cv_.SignalAll();
}
// IMPORTANT: there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be deallocated and referencing them
@ -2578,7 +2587,7 @@ Status DBImpl::ProcessKeyValueCompaction(
cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) {
// First occurrence of this user key
current_user_key.SetUserKey(ikey.user_key);
current_user_key.SetKey(ikey.user_key);
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
@ -3170,7 +3179,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
ColumnFamilyData* cfd,
SuperVersion* super_version) {
SuperVersion* super_version,
Arena* arena) {
Iterator* internal_iter;
if (arena != nullptr) {
// Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(options, false, arena));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&merge_iter_builder);
internal_iter = merge_iter_builder.Finish();
} else {
// Need to create internal iterator using malloc.
std::vector<Iterator*> iterator_list;
// Collect iterator for mutable mem
iterator_list.push_back(super_version->mem->NewIterator(options));
@ -3179,9 +3204,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect iterators for files in L0 - Ln
super_version->current->AddIterators(options, storage_options_,
&iterator_list);
Iterator* internal_iter = NewMergingIterator(
&cfd->internal_comparator(), &iterator_list[0], iterator_list.size());
internal_iter = NewMergingIterator(&cfd->internal_comparator(),
&iterator_list[0], iterator_list.size());
}
IterState* cleanup = new IterState(this, &mutex_, super_version);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
@ -3532,30 +3557,77 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
Iterator* iter;
if (options.tailing) {
#ifdef ROCKSDB_LITE
// not supported in lite version
return nullptr;
#else
iter = new TailingIterator(env_, this, options, cfd);
// TODO(ljin): remove tailing iterator
auto iter = new ForwardIterator(this, options, cfd);
return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber);
// return new TailingIterator(env_, this, options, cfd);
#endif
} else {
SequenceNumber latest_snapshot = versions_->LastSequence();
SuperVersion* sv = nullptr;
sv = cfd->GetReferencedSuperVersion(&mutex_);
iter = NewInternalIterator(options, cfd, sv);
auto snapshot =
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot;
iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot);
}
return iter;
// Try to generate a DB iterator tree in a contiguous memory area to be
// cache friendly. Here is an example of the result:
// +-------------------------------+
// | |
// | ArenaWrappedDBIter |
// | + |
// | +---> Inner Iterator ------------+
// | | | |
// | | +-- -- -- -- -- -- -- --+ |
// | +--- | Arena | |
// | | | |
// | Allocated Memory: | |
// | | +-------------------+ |
// | | | DBIter | <---+
// | | + |
// | | | +-> iter_ ------------+
// | | | | |
// | | +-------------------+ |
// | | | MergingIterator | <---+
// | | + |
// | | | +->child iter1 ------------+
// | | | | | |
// | | +->child iter2 ----------+ |
// | | | | | | |
// | | | +->child iter3 --------+ | |
// | | | | | |
// | | +-------------------+ | | |
// | | | Iterator1 | <--------+
// | | +-------------------+ | |
// | | | Iterator2 | <------+
// | | +-------------------+ |
// | | | Iterator3 | <----+
// | | +-------------------+
// | | |
// +-------+-----------------------+
//
// ArenaWrappedDBIter inlines an arena area, where all the iterators in
// the iterator tree are allocated in the order of being accessed when
// querying.
// Laying out the iterators in the order of being accessed makes it more
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(), snapshot);
Iterator* internal_iter =
NewInternalIterator(options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
}
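
The pattern above relies on placement new paired with an explicit destructor call: the arena owns the bytes, so an arena-allocated iterator must never be freed with delete. A minimal standalone sketch of the idiom; SimpleArena and DemoIter are illustrative stand-ins, not the RocksDB classes:

#include <cstddef>
#include <new>
#include <vector>

// Stand-in for rocksdb::Arena: hands out chunks, frees them all at once.
class SimpleArena {
 public:
  void* AllocateAligned(size_t bytes) {
    blocks_.push_back(new char[bytes]);
    return blocks_.back();
  }
  ~SimpleArena() {
    for (char* b : blocks_) delete[] b;
  }

 private:
  std::vector<char*> blocks_;
};

struct DemoIter {
  virtual ~DemoIter() {}
};

int main() {
  SimpleArena arena;
  void* mem = arena.AllocateAligned(sizeof(DemoIter));
  DemoIter* it = new (mem) DemoIter();  // construct inline in arena memory
  it->~DemoIter();  // arena mode: run the destructor, never call delete
  return 0;         // ~SimpleArena reclaims the bytes
}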
Status DBImpl::NewIterators(
@ -3679,15 +3751,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
uint64_t flush_column_family_if_log_file = 0;
uint64_t max_total_wal_size = (options_.max_total_wal_size == 0)
? 2 * max_total_in_memory_state_
? 4 * max_total_in_memory_state_
: options_.max_total_wal_size;
if (alive_log_files_.begin()->getting_flushed == false &&
if (versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1 &&
alive_log_files_.begin()->getting_flushed == false &&
total_log_size_ > max_total_wal_size) {
flush_column_family_if_log_file = alive_log_files_.begin()->number;
alive_log_files_.begin()->getting_flushed = true;
Log(options_.info_log,
"Flushing all column families with data in WAL number %" PRIu64,
flush_column_family_if_log_file);
"Flushing all column families with data in WAL number %" PRIu64
". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
}
Status status;
@ -3923,6 +3997,10 @@ Status DBImpl::MakeRoomForWrite(
uint64_t rate_limit_delay_millis = 0;
Status s;
double score;
// Once we schedule background work, we shouldn't schedule it again, since it
// might generate a tight feedback loop, constantly scheduling more background
// work, even if additional background work is not needed
bool schedule_background_work = true;
while (true) {
if (!bg_error_.ok()) {
@ -3966,7 +4044,10 @@ Status DBImpl::MakeRoomForWrite(
DelayLoggingAndReset();
Log(options_.info_log, "[%s] wait for memtable flush...\n",
cfd->GetName().c_str());
if (schedule_background_work) {
MaybeScheduleFlushOrCompaction();
schedule_background_work = false;
}
uint64_t stall;
{
StopWatch sw(env_, options_.statistics.get(),

@ -39,6 +39,7 @@ class Version;
class VersionEdit;
class VersionSet;
class CompactionFilterV2;
class Arena;
class DBImpl : public DB {
public:
@ -278,13 +279,15 @@ class DBImpl : public DB {
const DBOptions options_;
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version);
SuperVersion* super_version,
Arena* arena = nullptr);
private:
friend class DB;
friend class InternalStats;
#ifndef ROCKSDB_LITE
friend class TailingIterator;
friend class ForwardIterator;
#endif
friend struct SuperVersion;
struct CompactionState;
@ -449,7 +452,16 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
// * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't
// made any progress
// * whenever a compaction made any progress
// * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is
// done, even if it didn't make any progress)
// * whenever there is an error in background flush or compaction
// * whenever bg_logstats_scheduled_ becomes false
port::CondVar bg_cv_;
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
bool log_empty_;

@ -18,6 +18,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h"
#include "util/arena.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
@ -37,8 +38,6 @@ static void DumpInternalIter(Iterator* iter) {
}
#endif
namespace {
// Memtables and sstables that make up the DB representation contain
// (userkey,seq,type) => uservalue entries. DBIter
// combines multiple entries for the same userkey found in the DB
@ -57,9 +56,10 @@ class DBIter: public Iterator {
kReverse
};
DBIter(Env* env, const Options& options,
const Comparator* cmp, Iterator* iter, SequenceNumber s)
: env_(env),
DBIter(Env* env, const Options& options, const Comparator* cmp,
Iterator* iter, SequenceNumber s, bool arena_mode)
: arena_mode_(arena_mode),
env_(env),
logger_(options.info_log.get()),
user_comparator_(cmp),
user_merge_operator_(options.merge_operator.get()),
@ -74,7 +74,15 @@ class DBIter: public Iterator {
}
virtual ~DBIter() {
RecordTick(statistics_, NO_ITERATORS, -1);
if (!arena_mode_) {
delete iter_;
} else {
iter_->~Iterator();
}
}
virtual void SetIter(Iterator* iter) {
assert(iter_ == nullptr);
iter_ = iter;
}
virtual bool Valid() const { return valid_; }
virtual Slice key() const {
@ -116,11 +124,12 @@ class DBIter: public Iterator {
}
}
bool arena_mode_;
Env* const env_;
Logger* logger_;
const Comparator* const user_comparator_;
const MergeOperator* const user_merge_operator_;
Iterator* const iter_;
Iterator* iter_;
SequenceNumber const sequence_;
Status status_;
@ -211,18 +220,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetUserKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key);
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
saved_key_.SetUserKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key);
return;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
saved_key_.SetUserKey(ikey.user_key);
saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
@ -331,7 +340,7 @@ void DBIter::Prev() {
// iter_ is pointing at the current entry. Scan backwards until
// the key changes so we can use the normal reverse scanning code.
assert(iter_->Valid()); // Otherwise valid_ would have been false
saved_key_.SetUserKey(ExtractUserKey(iter_->key()));
saved_key_.SetKey(ExtractUserKey(iter_->key()));
while (true) {
iter_->Prev();
if (!iter_->Valid()) {
@ -377,7 +386,7 @@ void DBIter::FindPrevUserEntry() {
std::string empty;
swap(empty, saved_value_);
}
saved_key_.SetUserKey(ExtractUserKey(iter_->key()));
saved_key_.SetKey(ExtractUserKey(iter_->key()));
saved_value_.assign(raw_value.data(), raw_value.size());
}
} else {
@ -461,16 +470,48 @@ void DBIter::SeekToLast() {
FindPrevUserEntry();
}
} // anonymous namespace
Iterator* NewDBIterator(
Env* env,
const Options& options,
Iterator* NewDBIterator(Env* env, const Options& options,
const Comparator* user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(env, options, user_key_comparator,
internal_iter, sequence);
return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
false);
}
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
void ArenaWrappedDBIter::SetDBIter(DBIter* iter) { db_iter_ = iter; }
void ArenaWrappedDBIter::SetIterUnderDBIter(Iterator* iter) {
static_cast<DBIter*>(db_iter_)->SetIter(iter);
}
inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); }
inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); }
inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); }
inline void ArenaWrappedDBIter::Seek(const Slice& target) {
db_iter_->Seek(target);
}
inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); }
inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); }
inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); }
inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); }
inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); }
void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
void* arg2) {
db_iter_->RegisterCleanup(function, arg1, arg2);
}
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem)
DBIter(env, options, user_key_comparator, nullptr, sequence, true);
iter->SetDBIter(db_iter);
return iter;
}
} // namespace rocksdb

@ -11,9 +11,14 @@
#include <stdint.h>
#include "rocksdb/db.h"
#include "db/dbformat.h"
#include "util/arena.h"
#include "util/autovector.h"
namespace rocksdb {
class Arena;
class DBIter;
// Return a new iterator that converts internal keys (yielded by
// "*internal_iter") that were live at the specified "sequence" number
// into appropriate user keys.
@ -24,4 +29,45 @@ extern Iterator* NewDBIterator(
Iterator* internal_iter,
const SequenceNumber& sequence);
// A wrapper iterator which wraps the DB iterator and the arena from which
// the DB iterator is supposed to be allocated. This class is used as an
// entry point of an iterator hierarchy whose memory can be allocated
// inline. In that way, accessing the iterator tree can be more cache
// friendly. It is also faster to allocate.
class ArenaWrappedDBIter : public Iterator {
public:
virtual ~ArenaWrappedDBIter();
// Get the arena to be used to allocate memory for DBIter to be wrapped,
// as well as child iterators in it.
virtual Arena* GetArena() { return &arena_; }
// Set the DB Iterator to be wrapped
virtual void SetDBIter(DBIter* iter);
// Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator.
virtual void SetIterUnderDBIter(Iterator* iter);
virtual bool Valid() const override;
virtual void SeekToFirst() override;
virtual void SeekToLast() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual void Prev() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);
private:
DBIter* db_iter_;
Arena arena_;
};
// Create an arena-wrapped DB iterator.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence);
} // namespace rocksdb

@ -256,10 +256,10 @@ class IterKey {
void Clear() { key_size_ = 0; }
void SetUserKey(const Slice& user_key) {
size_t size = user_key.size();
void SetKey(const Slice& key) {
size_t size = key.size();
EnlargeBufferIfNeeded(size);
memcpy(key_, user_key.data(), size);
memcpy(key_, key.data(), size);
key_size_ = size;
}

@ -0,0 +1,383 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "db/forward_iterator.h"
#include <string>
#include <utility>
#include <limits>
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "db/dbformat.h"
namespace rocksdb {
// Usage:
// LevelIterator iter;
// iter.SetFileIndex(file_index);
// iter.Seek(target);
// iter.Next()
class LevelIterator : public Iterator {
public:
LevelIterator(const ColumnFamilyData* const cfd,
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files)
: cfd_(cfd), read_options_(read_options), files_(files), valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()) {}
void SetFileIndex(uint32_t file_index) {
assert(file_index < files_.size());
if (file_index != file_index_) {
file_index_ = file_index;
file_iter_.reset(cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
*(files_[file_index_]), nullptr /* table_reader_ptr */, false));
}
valid_ = false;
}
void SeekToLast() override {
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
valid_ = false;
}
void Prev() override {
status_ = Status::NotSupported("LevelIterator::Prev()");
valid_ = false;
}
bool Valid() const override {
return valid_;
}
void SeekToFirst() override {
SetFileIndex(0);
file_iter_->SeekToFirst();
valid_ = file_iter_->Valid();
}
void Seek(const Slice& internal_key) override {
assert(file_iter_ != nullptr);
file_iter_->Seek(internal_key);
valid_ = file_iter_->Valid();
assert(valid_);
}
void Next() override {
assert(valid_);
file_iter_->Next();
while (!file_iter_->Valid()) {
if (file_index_ + 1 >= files_.size()) {
valid_ = false;
return;
}
SetFileIndex(file_index_ + 1);
file_iter_->SeekToFirst();
}
valid_ = file_iter_->Valid();
}
Slice key() const override {
assert(valid_);
return file_iter_->key();
}
Slice value() const override {
assert(valid_);
return file_iter_->value();
}
Status status() const override {
return status_;
}
private:
const ColumnFamilyData* const cfd_;
const ReadOptions& read_options_;
const std::vector<FileMetaData*>& files_;
bool valid_;
uint32_t file_index_;
Status status_;
std::unique_ptr<Iterator> file_iter_;
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd)
: db_(db),
read_options_(read_options),
cfd_(cfd),
prefix_extractor_(cfd->options()->prefix_extractor.get()),
user_comparator_(cfd->user_comparator()),
immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
sv_(nullptr),
mutable_iter_(nullptr),
current_(nullptr),
valid_(false),
is_prev_set_(false) {}
ForwardIterator::~ForwardIterator() {
Cleanup();
}
void ForwardIterator::Cleanup() {
delete mutable_iter_;
for (auto* m : imm_iters_) {
delete m;
}
imm_iters_.clear();
for (auto* f : l0_iters_) {
delete f;
}
l0_iters_.clear();
for (auto* l : level_iters_) {
delete l;
}
level_iters_.clear();
if (sv_ != nullptr && sv_->Unref()) {
DBImpl::DeletionState deletion_state;
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->mutex_.Unlock();
delete sv_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
}
}
}
bool ForwardIterator::Valid() const {
return valid_;
}
void ForwardIterator::SeekToFirst() {
if (sv_ == nullptr ||
sv_->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
}
SeekInternal(Slice(), true);
}
void ForwardIterator::Seek(const Slice& internal_key) {
if (sv_ == nullptr ||
sv_->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
}
SeekInternal(internal_key, false);
}
void ForwardIterator::SeekInternal(const Slice& internal_key,
bool seek_to_first) {
// mutable
seek_to_first ? mutable_iter_->SeekToFirst() :
mutable_iter_->Seek(internal_key);
// immutable
// TODO(ljin): NeedToSeekImmutable has negative impact on performance
// if it turns to need to seek immutable often. We probably want to have
// an option to turn it off.
if (seek_to_first || NeedToSeekImmutable(internal_key)) {
{
auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
immutable_min_heap_.swap(tmp);
}
for (auto* m : imm_iters_) {
seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
if (m->Valid()) {
immutable_min_heap_.push(m);
}
}
auto* files = sv_->current->files_;
for (uint32_t i = 0; i < files[0].size(); ++i) {
if (seek_to_first) {
l0_iters_[i]->SeekToFirst();
} else {
// If the target key passes over the largest key, we are sure Next()
// won't go over this file.
if (user_comparator_->Compare(ExtractUserKey(internal_key),
files[0][i]->largest.user_key()) > 0) {
continue;
}
l0_iters_[i]->Seek(internal_key);
}
if (l0_iters_[i]->Valid()) {
immutable_min_heap_.push(l0_iters_[i]);
}
}
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
if (files[level].empty()) {
continue;
}
assert(level_iters_[level - 1] != nullptr);
uint32_t f_idx = 0;
if (!seek_to_first) {
f_idx = FindFileInRange(
files[level], internal_key, 0, files[level].size());
}
if (f_idx < files[level].size()) {
level_iters_[level - 1]->SetFileIndex(f_idx);
seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
level_iters_[level - 1]->Seek(internal_key);
if (level_iters_[level - 1]->Valid()) {
immutable_min_heap_.push(level_iters_[level - 1]);
}
}
}
if (seek_to_first || immutable_min_heap_.empty()) {
is_prev_set_ = false;
} else {
prev_key_.SetKey(internal_key);
is_prev_set_ = true;
}
}
UpdateCurrent();
}
void ForwardIterator::Next() {
assert(valid_);
if (sv_ == nullptr ||
sv_->version_number != cfd_->GetSuperVersionNumber()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), current_key.size());
RebuildIterators();
SeekInternal(old_key, false);
if (!valid_ || key().compare(old_key) != 0) {
return;
}
} else if (current_ != mutable_iter_) {
// We are going to advance the immutable iterator
prev_key_.SetKey(current_->key());
is_prev_set_ = true;
}
current_->Next();
if (current_->Valid() && current_ != mutable_iter_) {
immutable_min_heap_.push(current_);
}
UpdateCurrent();
}
Slice ForwardIterator::key() const {
assert(valid_);
return current_->key();
}
Slice ForwardIterator::value() const {
assert(valid_);
return current_->value();
}
Status ForwardIterator::status() const {
if (!status_.ok()) {
return status_;
} else if (!mutable_iter_->status().ok()) {
return mutable_iter_->status();
}
return Status::OK();
}
void ForwardIterator::RebuildIterators() {
// Clean up
Cleanup();
// New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
mutable_iter_ = sv_->mem->NewIterator(read_options_);
sv_->imm->AddIterators(read_options_, &imm_iters_);
const auto& l0_files = sv_->current->files_[0];
l0_iters_.reserve(l0_files.size());
for (const auto* l0 : l0_files) {
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0));
}
level_iters_.reserve(sv_->current->NumberLevels() - 1);
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
if (sv_->current->files_[level].empty()) {
level_iters_.push_back(nullptr);
} else {
level_iters_.push_back(new LevelIterator(cfd_, read_options_,
sv_->current->files_[level]));
}
}
current_ = nullptr;
is_prev_set_ = false;
}
void ForwardIterator::UpdateCurrent() {
if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
current_ = nullptr;
} else if (immutable_min_heap_.empty()) {
current_ = mutable_iter_;
} else if (!mutable_iter_->Valid()) {
current_ = immutable_min_heap_.top();
immutable_min_heap_.pop();
} else {
current_ = immutable_min_heap_.top();
assert(current_ != nullptr);
assert(current_->Valid());
int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
mutable_iter_->key(), current_->key());
assert(cmp != 0);
if (cmp > 0) {
immutable_min_heap_.pop();
} else {
current_ = mutable_iter_;
}
}
valid_ = (current_ != nullptr);
if (!status_.ok()) {
status_ = Status::OK();
}
}
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
if (!is_prev_set_) {
return true;
}
Slice prev_key = prev_key_.GetKey();
if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
prefix_extractor_->Transform(prev_key)) != 0) {
return true;
}
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key, target) >= 0) {
return true;
}
if (immutable_min_heap_.empty() ||
cfd_->internal_comparator().InternalKeyComparator::Compare(
target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
: current_->key()) > 0) {
return true;
}
return false;
}
uint32_t ForwardIterator::FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right) {
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
f->largest.Encode(), internal_key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,105 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include <queue>
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
namespace rocksdb {
class DBImpl;
class Env;
struct SuperVersion;
class ColumnFamilyData;
class LevelIterator;
struct FileMetaData;
class MinIterComparator {
public:
explicit MinIterComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(Iterator* a, Iterator* b) {
return comparator_->Compare(a->key(), b->key()) > 0;
}
private:
const Comparator* comparator_;
};
typedef std::priority_queue<Iterator*,
std::vector<Iterator*>,
MinIterComparator> MinIterHeap;
/**
* ForwardIterator is a special type of iterator that only supports Seek()
* and Next(). It is expected to perform better than TailingIterator by
* removing the encapsulation and making all information accessible within
* the iterator. In the current implementation, the snapshot is taken at the
* time Seek() is called. Subsequent Next() calls do not see values written
* after that.
*/
class ForwardIterator : public Iterator {
public:
ForwardIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd);
virtual ~ForwardIterator();
void SeekToLast() override {
status_ = Status::NotSupported("ForwardIterator::SeekToLast()");
valid_ = false;
}
void Prev() override {
status_ = Status::NotSupported("ForwardIterator::Prev()");
valid_ = false;
}
virtual bool Valid() const override;
void SeekToFirst() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
private:
void Cleanup();
void RebuildIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();
bool NeedToSeekImmutable(const Slice& internal_key);
uint32_t FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right);
DBImpl* const db_;
const ReadOptions read_options_;
ColumnFamilyData* const cfd_;
const SliceTransform* const prefix_extractor_;
const Comparator* user_comparator_;
MinIterHeap immutable_min_heap_;
SuperVersion* sv_;
Iterator* mutable_iter_;
std::vector<Iterator*> imm_iters_;
std::vector<Iterator*> l0_iters_;
std::vector<LevelIterator*> level_iters_;
Iterator* current_;
// internal iterator status
Status status_;
bool valid_;
IterKey prev_key_;
bool is_prev_set_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE
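
Since only ReadOptions::tailing routes DBImpl::NewIterator to this class (see the db_impl.cc hunk above), application code never names ForwardIterator directly. A hedged sketch of a tailing, forward-only scan; db, start_key, and Process() are placeholders for whatever the caller holds:

ReadOptions ro;
ro.tailing = true;  // selects the ForwardIterator path in DBImpl::NewIterator
std::unique_ptr<Iterator> it(db->NewIterator(ro));
for (it->Seek(start_key); it->Valid(); it->Next()) {
  // Forward movement only: SeekToLast()/Prev() would set
  // Status::NotSupported, as declared above.
  Process(it->key(), it->value());
}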

@ -20,6 +20,7 @@
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/arena.h"
#include "util/coding.h"
#include "util/murmurhash.h"
@ -173,15 +174,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
MemTableIterator(const MemTable& mem, const ReadOptions& options,
bool enforce_total_order)
bool enforce_total_order, Arena* arena)
: bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
valid_(false) {
valid_(false),
arena_mode_(arena != nullptr) {
if (prefix_extractor_ != nullptr && !enforce_total_order) {
bloom_ = mem.prefix_bloom_.get();
iter_.reset(mem.table_->GetDynamicPrefixIterator());
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
iter_.reset(mem.table_->GetIterator());
iter_ = mem.table_->GetIterator(arena);
}
}
~MemTableIterator() {
if (arena_mode_) {
iter_->~Iterator();
} else {
delete iter_;
}
}
@ -228,8 +238,9 @@ class MemTableIterator: public Iterator {
private:
DynamicBloom* bloom_;
const SliceTransform* const prefix_extractor_;
std::unique_ptr<MemTableRep::Iterator> iter_;
MemTableRep::Iterator* iter_;
bool valid_;
bool arena_mode_;
// No copying allowed
MemTableIterator(const MemTableIterator&);
@ -237,8 +248,14 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator(const ReadOptions& options,
bool enforce_total_order) {
return new MemTableIterator(*this, options, enforce_total_order);
bool enforce_total_order, Arena* arena) {
if (arena == nullptr) {
return new MemTableIterator(*this, options, enforce_total_order, nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem)
MemTableIterator(*this, options, enforce_total_order, arena);
}
}
port::RWMutex* MemTable::GetLock(const Slice& key) {

@ -21,6 +21,7 @@
namespace rocksdb {
class Arena;
class Mutex;
class MemTableIterator;
class MergeContext;
@ -77,8 +78,12 @@ class MemTable {
//
// By default, it returns an iterator for prefix seek if prefix_extractor
// is configured in Options.
// arena: If not null, the arena needs to be used to allocate the Iterator.
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
Iterator* NewIterator(const ReadOptions& options,
bool enforce_total_order = false);
bool enforce_total_order = false,
Arena* arena = nullptr);
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.

@ -11,6 +11,7 @@
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/log_buffer.h"
@ -78,6 +79,14 @@ void MemTableListVersion::AddIterators(const ReadOptions& options,
}
}
void MemTableListVersion::AddIterators(
const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(
m->NewIterator(options, merge_iter_builder->GetArena()));
}
}
uint64_t MemTableListVersion::GetTotalNumEntries() const {
uint64_t total_num = 0;
for (auto& m : memlist_) {

@ -28,6 +28,7 @@ namespace rocksdb {
class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;
class MergeIteratorBuilder;
// keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and
@ -49,6 +50,9 @@ class MemTableListVersion {
void AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iterator_list);
void AddIterators(const ReadOptions& options,
MergeIteratorBuilder* merge_iter_builder);
uint64_t GetTotalNumEntries() const;
private:

@ -83,7 +83,7 @@ public:
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const ParsedInternalKey& k,
@ -218,8 +218,14 @@ std::shared_ptr<const TableProperties> SimpleTableReader::GetTableProperties()
return rep_->table_properties;
}
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) {
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new SimpleTableIterator(this);
} else {
auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator));
return new (mem) SimpleTableIterator(this);
}
}
Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {

@ -13,6 +13,7 @@
#include "db/version_edit.h"
#include "rocksdb/statistics.h"
#include "table/iterator_wrapper.h"
#include "table/table_reader.h"
#include "util/coding.h"
#include "util/stop_watch.h"
@ -102,7 +103,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
const InternalKeyComparator& icomparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr,
bool for_compaction) {
bool for_compaction, Arena* arena) {
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
@ -113,12 +114,12 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size,
&handle, nullptr, options.read_tier == kBlockCacheTier);
if (!s.ok()) {
return NewErrorIterator(s);
return NewErrorIterator(s, arena);
}
table_reader = GetTableReaderFromHandle(handle);
}
Iterator* result = table_reader->NewIterator(options);
Iterator* result = table_reader->NewIterator(options, arena);
if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}

@ -23,6 +23,7 @@
namespace rocksdb {
class Env;
class Arena;
struct FileMetaData;
// TODO(sdong): try to come up with a better API to pass the file information
@ -44,7 +45,7 @@ class TableCache {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr = nullptr,
bool for_compaction = false);
bool for_compaction = false, Arena* arena = nullptr);
// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until

@ -332,6 +332,32 @@ void Version::AddIterators(const ReadOptions& read_options,
}
}
void Version::AddIterators(const ReadOptions& read_options,
const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder) {
// Merge all level zero files together since they may overlap
for (const FileMetaData* file : files_[0]) {
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), *file, nullptr,
false, merge_iter_builder->GetArena()));
}
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < num_levels_; level++) {
if (!files_[level].empty()) {
merge_iter_builder->AddIterator(NewTwoLevelIterator(
new LevelFileIteratorState(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), false /* for_compaction */,
cfd_->options()->prefix_extractor != nullptr),
new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]),
merge_iter_builder->GetArena()));
}
}
}
// Callback from TableCache::Get()
namespace {
enum SaverState {

@ -51,6 +51,7 @@ class MergeContext;
class ColumnFamilyData;
class ColumnFamilySet;
class TableCache;
class MergeIteratorBuilder;
// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
@ -80,6 +81,9 @@ class Version {
void AddIterators(const ReadOptions&, const EnvOptions& soptions,
std::vector<Iterator*>* iters);
void AddIterators(const ReadOptions&, const EnvOptions& soptions,
MergeIteratorBuilder* merger_iter_builder);
// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. Fills *stats.
// Uses *operands to store merge_operator operations to apply later
@ -218,6 +222,7 @@ class Version {
friend class LevelCompactionPicker;
friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
friend class ForwardIterator;
class LevelFileNumIterator;
class LevelFileIteratorState;

@ -142,16 +142,28 @@ class MemTableRep {
};
// Return an iterator over the keys in this representation.
virtual Iterator* GetIterator() = 0;
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* GetIterator(Arena* arena = nullptr) = 0;
// Return an iterator over at least the keys with the specified user key. The
// iterator may also allow access to other keys, but doesn't have to. Default:
// GetIterator().
virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); }
virtual Iterator* GetIterator(const Slice& user_key) {
return GetIterator(nullptr);
}
// Return an iterator that has a special Seek semantics. The result of
// a Seek might only include keys with the same prefix as the target key.
virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); }
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* GetDynamicPrefixIterator(Arena* arena = nullptr) {
return GetIterator(arena);
}
// Return true if the current MemTableRep supports merge operator.
// Default: true

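With the default-argument change above, every MemTableRep implementation needs an arena-aware GetIterator(). A hedged sketch of what an implementation might look like, assuming a concrete MyIterator type (not part of this diff); it mirrors the nullptr-check-then-placement-new shape used by the table readers elsewhere in this commit:

virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
  if (arena == nullptr) {
    return new MyIterator(this);  // plain heap allocation; freed with delete
  } else {
    auto mem = arena->AllocateAligned(sizeof(MyIterator));
    return new (mem) MyIterator(this);  // caller destroys via ~Iterator()
  }
}
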
@ -547,12 +547,9 @@ struct ColumnFamilyOptions {
// Control locality of bloom filter probes to improve cache miss rate.
// This option only applies to memtable prefix bloom and plaintable
// prefix bloom. It essentially limits the max number of cache lines each
// bloom filter check can touch.
// This optimization is turned off when set to 0. The number should never
// be greater than number of probes. This option can boost performance
// for in-memory workload but should use with care since it can cause
// higher false positive rate.
// prefix bloom. It essentially limits every bloom filter check to one
// cache line. This optimization is turned off when set to 0, and any
// positive number turns it on.
// Default: 0
uint32_t bloom_locality;

@ -980,10 +980,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
return may_match;
}
Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) {
return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options,
nullptr),
NewIndexIterator(read_options));
Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
Arena* arena) {
return NewTwoLevelIterator(
new BlockEntryIteratorState(this, read_options, nullptr),
NewIndexIterator(read_options), arena);
}
Status BlockBasedTable::Get(

@ -68,7 +68,7 @@ class BlockBasedTable : public TableReader {
// Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
Iterator* NewIterator(const ReadOptions&) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions& readOptions, const Slice& key,
void* handle_context,

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/iterator.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
namespace rocksdb {
@ -65,8 +67,26 @@ Iterator* NewEmptyIterator() {
return new EmptyIterator(Status::OK());
}
Iterator* NewEmptyIterator(Arena* arena) {
if (arena == nullptr) {
return NewEmptyIterator();
} else {
auto mem = arena->AllocateAligned(sizeof(EmptyIterator));
return new (mem) EmptyIterator(Status::OK());
}
}
Iterator* NewErrorIterator(const Status& status) {
return new EmptyIterator(status);
}
Iterator* NewErrorIterator(const Status& status, Arena* arena) {
if (arena == nullptr) {
return NewErrorIterator(status);
} else {
auto mem = arena->AllocateAligned(sizeof(EmptyIterator));
return new (mem) EmptyIterator(status);
}
}
} // namespace rocksdb

@ -8,6 +8,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/iterator.h"
namespace rocksdb {
// An internal wrapper class with an interface similar to Iterator that
@ -20,14 +23,7 @@ class IteratorWrapper {
explicit IteratorWrapper(Iterator* iter): iter_(nullptr) {
Set(iter);
}
IteratorWrapper(const IteratorWrapper&) {
// Iterator wrapper exclusively owns iter_ so it cannot be copied.
// Didn't delete the function because vector<IteratorWrapper> requires
// this function to compile.
assert(false);
}
void operator=(const IteratorWrapper&) = delete;
~IteratorWrapper() { delete iter_; }
~IteratorWrapper() {}
Iterator* iter() const { return iter_; }
// Takes ownership of "iter" and will delete it when destroyed, or
@ -42,6 +38,14 @@ class IteratorWrapper {
}
}
void DeleteIter(bool is_arena_mode) {
if (!is_arena_mode) {
delete iter_;
} else {
iter_->~Iterator();
}
}
// Iterator interface methods
bool Valid() const { return valid_; }
Slice key() const { assert(Valid()); return key_; }
@ -67,4 +71,11 @@ class IteratorWrapper {
Slice key_;
};
class Arena;
// Return an empty iterator (yields nothing) allocated from arena.
extern Iterator* NewEmptyIterator(Arena* arena);
// Return an empty iterator with the specified status, allocated from the arena.
extern Iterator* NewErrorIterator(const Status& status, Arena* arena);
} // namespace rocksdb

@ -17,13 +17,13 @@
#include "rocksdb/options.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
#include "util/stop_watch.h"
#include "util/perf_context_imp.h"
#include "util/autovector.h"
namespace rocksdb {
namespace {
typedef std::priority_queue<
IteratorWrapper*,
std::vector<IteratorWrapper*>,
@ -43,13 +43,16 @@ MaxIterHeap NewMaxIterHeap(const Comparator* comparator) {
MinIterHeap NewMinIterHeap(const Comparator* comparator) {
return MinIterHeap(MinIteratorComparator(comparator));
}
} // namespace
const size_t kNumIterReserve = 4;
class MergingIterator : public Iterator {
public:
MergingIterator(const Comparator* comparator, Iterator** children, int n)
: comparator_(comparator),
MergingIterator(const Comparator* comparator, Iterator** children, int n,
bool is_arena_mode)
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
use_heap_(true),
direction_(kForward),
@ -66,7 +69,20 @@ class MergingIterator : public Iterator {
}
}
virtual ~MergingIterator() { }
virtual void AddIterator(Iterator* iter) {
assert(direction_ == kForward);
children_.emplace_back(iter);
auto& new_wrapper = children_.back();
if (new_wrapper.Valid()) {
minHeap_.push(&new_wrapper);
}
}
virtual ~MergingIterator() {
for (auto& child : children_) {
child.DeleteIter(is_arena_mode_);
}
}
virtual bool Valid() const {
return (current_ != nullptr);
@ -242,6 +258,7 @@ class MergingIterator : public Iterator {
void FindLargest();
void ClearHeaps();
bool is_arena_mode_;
const Comparator* comparator_;
autovector<IteratorWrapper, kNumIterReserve> children_;
IteratorWrapper* current_;
@ -288,16 +305,51 @@ void MergingIterator::ClearHeaps() {
maxHeap_ = NewMaxIterHeap(comparator_);
minHeap_ = NewMinIterHeap(comparator_);
}
} // namespace
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) {
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
Arena* arena) {
assert(n >= 0);
if (n == 0) {
return NewEmptyIterator();
return NewEmptyIterator(arena);
} else if (n == 1) {
return list[0];
} else {
return new MergingIterator(cmp, list, n);
if (arena == nullptr) {
return new MergingIterator(cmp, list, n, false);
} else {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
return new (mem) MergingIterator(cmp, list, n, true);
}
}
}
MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
Arena* a)
: first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator));
merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true);
}
void MergeIteratorBuilder::AddIterator(Iterator* iter) {
if (!use_merging_iter && first_iter != nullptr) {
merge_iter->AddIterator(first_iter);
use_merging_iter = true;
}
if (use_merging_iter) {
merge_iter->AddIterator(iter);
} else {
first_iter = iter;
}
}
Iterator* MergeIteratorBuilder::Finish() {
if (!use_merging_iter) {
return first_iter;
} else {
auto ret = merge_iter;
merge_iter = nullptr;
return ret;
}
}

@ -9,11 +9,14 @@
#pragma once
#include "rocksdb/types.h"
namespace rocksdb {
class Comparator;
class Iterator;
class Env;
class Arena;
// Return an iterator that provided the union of the data in
// children[0,n-1]. Takes ownership of the child iterators and
@ -24,6 +27,34 @@ class Env;
//
// REQUIRES: n >= 0
extern Iterator* NewMergingIterator(const Comparator* comparator,
Iterator** children, int n);
Iterator** children, int n,
Arena* arena = nullptr);
class MergingIterator;
// A builder class to build a merging iterator by adding iterators one by one.
class MergeIteratorBuilder {
public:
// comparator: the comparator used by the merging iterator
// arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena);
~MergeIteratorBuilder() {}
// Add iter to the merging iterator.
void AddIterator(Iterator* iter);
// Get the arena used to build the merging iterator. It is called when a
// child iterator needs to be allocated.
Arena* GetArena() { return arena; }
// Return the resulting merging iterator.
Iterator* Finish();
private:
MergingIterator* merge_iter;
Iterator* first_iter;
bool use_merging_iter;
Arena* arena;
};
} // namespace rocksdb
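
A hedged sketch of how a caller is expected to drive MergeIteratorBuilder; the comparator and the child iterators are assumed to exist already, and the children are assumed to be arena-allocated so the explicit destructor call at the end is legal:

Arena arena;
MergeIteratorBuilder builder(comparator, &arena);
builder.AddIterator(mem_iter);   // mutable memtable iterator
builder.AddIterator(imm_iter);   // immutable memtable iterator
builder.AddIterator(sst_iter);   // table file iterator
Iterator* merged = builder.Finish();  // the lone child itself if only one was added
// ... iterate ...
merged->~Iterator();  // arena mode: no delete; the arena reclaims the memory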

@ -156,8 +156,15 @@ Status PlainTableReader::Open(const Options& options,
void PlainTableReader::SetupForCompaction() {
}
Iterator* PlainTableReader::NewIterator(const ReadOptions& options) {
Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new PlainTableIterator(this, options_.prefix_extractor != nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(PlainTableIterator));
return new (mem)
PlainTableIterator(this, options_.prefix_extractor != nullptr);
}
}
struct PlainTableReader::IndexRecord {

@ -55,7 +55,7 @@ class PlainTableReader: public TableReader {
const int bloom_bits_per_key, double hash_table_ratio,
size_t index_sparseness, size_t huge_page_tlb_size);
Iterator* NewIterator(const ReadOptions&);
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,

@ -15,6 +15,7 @@ namespace rocksdb {
class Iterator;
struct ParsedInternalKey;
class Slice;
class Arena;
struct ReadOptions;
struct TableProperties;
@ -28,7 +29,11 @@ class TableReader {
// Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
virtual Iterator* NewIterator(const ReadOptions&) = 0;
// arena: If not null, the arena needs to be used to allocate the Iterator.
// When destroying the iterator, the caller will not call "delete"
// but Iterator::~Iterator() directly. The destructor needs to destroy
// all the states but those allocated in arena.
virtual Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) = 0;
// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were

@ -13,6 +13,7 @@
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/format.h"
#include "util/arena.h"
namespace rocksdb {
@ -23,7 +24,10 @@ class TwoLevelIterator: public Iterator {
explicit TwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter);
virtual ~TwoLevelIterator() {}
virtual ~TwoLevelIterator() {
first_level_iter_.DeleteIter(false);
second_level_iter_.DeleteIter(false);
}
virtual void Seek(const Slice& target);
virtual void SeekToFirst();
@ -183,8 +187,13 @@ void TwoLevelIterator::InitDataBlock() {
} // namespace
Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter) {
Iterator* first_level_iter, Arena* arena) {
if (arena == nullptr) {
return new TwoLevelIterator(state, first_level_iter);
} else {
auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator));
return new (mem) TwoLevelIterator(state, first_level_iter);
}
}
} // namespace rocksdb

@ -16,6 +16,7 @@ namespace rocksdb {
struct ReadOptions;
class InternalKeyComparator;
class Arena;
struct TwoLevelIteratorState {
explicit TwoLevelIteratorState(bool check_prefix_may_match)
@ -39,7 +40,11 @@ struct TwoLevelIteratorState {
//
// Uses a supplied function to convert an index_iter value into
// an iterator over the contents of the corresponding block.
// arena: If not null, the arena is used to allocate the Iterator.
// When destroying the iterator, the destructor will destroy
// all the states but those allocated in arena.
extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* first_level_iter);
Iterator* first_level_iter,
Arena* arena = nullptr);
} // namespace rocksdb

@ -162,6 +162,14 @@ DEFINE_int32(max_background_compactions,
"The maximum number of concurrent background compactions "
"that can occur in parallel.");
DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
"The interval (in milliseconds) at which to adjust the compaction thread "
"pool size. The pool size is not adjusted if the value is 0.");
DEFINE_int32(compaction_thread_pool_varations, 2,
"Range of background thread pool size variations when adjusted "
"periodically.");
DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
"The maximum number of concurrent background flushes "
"that can occur in parallel.");
@ -567,6 +575,8 @@ class SharedState {
num_done_(0),
start_(false),
start_verify_(false),
should_stop_bg_thread_(false),
bg_thread_finished_(false),
stress_test_(stress_test),
verification_failure_(false) {
if (FLAGS_test_batches_snapshots) {
@ -694,6 +704,14 @@ class SharedState {
uint32_t GetSeed() const { return seed_; }
void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
bool ShoudStopBgThread() { return should_stop_bg_thread_; }
void SetBgThreadFinish() { bg_thread_finished_ = true; }
bool BgThreadFinished() const { return bg_thread_finished_; }
private:
port::Mutex mu_;
port::CondVar cv_;
@ -707,6 +725,8 @@ class SharedState {
long num_done_;
bool start_;
bool start_verify_;
bool should_stop_bg_thread_;
bool bg_thread_finished_;
StressTest* stress_test_;
std::atomic<bool> verification_failure_;
@ -777,6 +797,11 @@ class StressTest {
threads[i] = new ThreadState(i, &shared);
FLAGS_env->StartThread(ThreadBody, threads[i]);
}
ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
@ -829,6 +854,14 @@ class StressTest {
}
PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread();
while (!shared.BgThreadFinished()) {
shared.GetCondVar()->Wait();
}
}
if (shared.HasVerificationFailedYet()) {
printf("Verification failed :(\n");
return false;
@ -879,6 +912,38 @@ class StressTest {
}
static void PoolSizeChangeThread(void* v) {
assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
while (true) {
{
MutexLock l(shared->GetMutex());
if (shared->ShouldStopBgThread()) {
shared->SetBgThreadFinish();
shared->GetCondVar()->SignalAll();
return;
}
}
auto thread_pool_size_base = FLAGS_max_background_compactions;
auto thread_pool_size_var = FLAGS_compaction_thread_pool_varations;
int new_thread_pool_size =
thread_pool_size_base - thread_pool_size_var +
thread->rand.Next() % (thread_pool_size_var * 2 + 1);
if (new_thread_pool_size < 1) {
new_thread_pool_size = 1;
}
FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
// Sleep for a random duration of up to
// FLAGS_compaction_thread_pool_adjust_interval milliseconds
FLAGS_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
1000 +
1);
}
}
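The size computation above draws uniformly from [base - var, base + var] and clamps at 1. A worked example with hypothetical flag values (base 4, variation 2):

#include <cstdlib>

// std::rand() % (2 * 2 + 1) yields 0..4, so the result ranges over
// {2, 3, 4, 5, 6}; values below 1 would be clamped, though they cannot
// occur with these inputs.
int SampleNewPoolSize() {
  const int base = 4;  // stand-in for FLAGS_max_background_compactions
  const int var = 2;   // stand-in for FLAGS_compaction_thread_pool_varations
  int size = base - var + std::rand() % (var * 2 + 1);
  return size < 1 ? 1 : size;
}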
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in the DB atomically, i.e., in a single batch.
// See also MultiGet.

@ -23,6 +23,8 @@ namespace rocksdb {
class Logger;
const size_t kInlineSize = 2048;
class Arena {
public:
// No copying allowed

@ -17,32 +17,39 @@ namespace {
static uint32_t BloomHash(const Slice& key) {
return Hash(key.data(), key.size(), 0xbc9f1d34);
}
// Returns total_bits rounded up to a whole number of cache-line-sized
// blocks, expressed in bits. The block count is forced to be odd so that
// `hash % kNumBlocks` depends on every bit of the hash (an even modulus
// would tie the result's low bit directly to the hash's low bit).
uint32_t GetNumBlocks(uint32_t total_bits) {
  uint32_t num_blocks =
      (total_bits + CACHE_LINE_SIZE * 8 - 1) / (CACHE_LINE_SIZE * 8);
  if (num_blocks % 2 == 0) {
    num_blocks++;
  }
  return num_blocks * (CACHE_LINE_SIZE * 8);
}
}
DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t cl_per_block,
DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t locality,
uint32_t num_probes,
uint32_t (*hash_func)(const Slice& key),
size_t huge_page_tlb_size, Logger* logger)
: kBlocked(cl_per_block > 0),
kBitsPerBlock(std::min(cl_per_block, num_probes) * CACHE_LINE_SIZE * 8),
kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock *
kBitsPerBlock
: total_bits + 7) /
: kTotalBits(((locality > 0) ? GetNumBlocks(total_bits) : total_bits + 7) /
8 * 8),
kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1),
kNumBlocks((locality > 0) ? kTotalBits / (CACHE_LINE_SIZE * 8) : 0),
kNumProbes(num_probes),
hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {
assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock);
assert(kNumBlocks > 0 || kTotalBits > 0);
assert(kNumProbes > 0);
uint32_t sz = kTotalBits / 8;
if (kBlocked) {
if (kNumBlocks > 0) {
sz += CACHE_LINE_SIZE - 1;
}
raw_ = reinterpret_cast<unsigned char*>(
arena_.AllocateAligned(sz, huge_page_tlb_size, logger));
memset(raw_, 0, sz);
if (kBlocked && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
if (kNumBlocks > 0 && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
data_ = raw_ + CACHE_LINE_SIZE -
reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE;
} else {

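For concreteness, a worked pass through the sizing logic above, assuming CACHE_LINE_SIZE is 64 (so a block holds 512 bits); the numbers are illustrative only:

#include <cstdint>

// total_bits = 1000 -> ceil(1000 / 512) = 2 blocks; 2 is even, so bump
// to 3; the filter is sized at 3 * 512 = 1536 bits.
uint32_t ExampleTotalBits() {
  const uint32_t kBlockBits = 64 * 8;  // 512, assuming 64-byte cache lines
  uint32_t num_blocks = (1000 + kBlockBits - 1) / kBlockBits;  // 2
  if (num_blocks % 2 == 0) num_blocks++;                       // 3
  return num_blocks * kBlockBits;                              // 1536
}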
@ -8,6 +8,7 @@
#include <atomic>
#include <memory>
#include "port/port.h"
#include "util/arena.h"
namespace rocksdb {
@ -19,15 +20,14 @@ class DynamicBloom {
public:
// total_bits: fixed total bits for the bloom
// num_probes: number of hash probes for a single key
// cl_per_block: block size in cache lines. When this is non-zero, a
// query/set is done within a block to improve cache locality.
// locality: If positive, optimize for cache line locality, 0 otherwise.
// hash_func: customized hash function
// huge_page_tlb_size: if >0, try to allocate bloom bytes from huge page TLB
// with this page size. Need to reserve huge pages for
// it to be allocated, like:
// sysctl -w vm.nr_hugepages=20
// See linux doc Documentation/vm/hugetlbpage.txt
explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0,
explicit DynamicBloom(uint32_t total_bits, uint32_t locality = 0,
uint32_t num_probes = 6,
uint32_t (*hash_func)(const Slice& key) = nullptr,
size_t huge_page_tlb_size = 0,
@ -48,8 +48,6 @@ class DynamicBloom {
bool MayContainHash(uint32_t hash);
private:
const bool kBlocked;
const uint32_t kBitsPerBlock;
const uint32_t kTotalBits;
const uint32_t kNumBlocks;
const uint32_t kNumProbes;
@ -69,13 +67,18 @@ inline bool DynamicBloom::MayContain(const Slice& key) {
inline bool DynamicBloom::MayContainHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
if (kBlocked) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
if (kNumBlocks != 0) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * (CACHE_LINE_SIZE * 8);
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = b + h % kBitsPerBlock;
// Since CACHE_LINE_SIZE is a power of two, the compiler optimizes this
// modulo into a simple AND operation.
const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) {
return false;
}
// Rotate h so that we don't reuse the same bytes.
h = h / (CACHE_LINE_SIZE * 8) +
(h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE);
h += delta;
}
} else {
@ -92,11 +95,16 @@ inline bool DynamicBloom::MayContainHash(uint32_t h) {
inline void DynamicBloom::AddHash(uint32_t h) {
const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits
if (kBlocked) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock;
if (kNumBlocks != 0) {
uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * (CACHE_LINE_SIZE * 8);
for (uint32_t i = 0; i < kNumProbes; ++i) {
const uint32_t bitpos = b + h % kBitsPerBlock;
// Since CACHE_LINE_SIZE is a power of two, the compiler optimizes this
// modulo into a simple AND operation.
const uint32_t bitpos = b + (h % (CACHE_LINE_SIZE * 8));
data_[bitpos / 8] |= (1 << (bitpos % 8));
// Rotate h so that we don't reuse the same bytes.
h = h / (CACHE_LINE_SIZE * 8) +
(h % (CACHE_LINE_SIZE * 8)) * (0x20000000U / CACHE_LINE_SIZE);
h += delta;
}
} else {

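A standalone sketch of one locality-aware probe step, again assuming a 64-byte cache line (512 bits per block); this mirrors, but is not, the member code above:

#include <cstdint>

// Computes the bit position probed within one cache-line block and
// rotates h right by 9 bits (512 = 2^9; 0x20000000U / 64 == 2^23) so the
// next probe is driven by fresh hash bits.
inline uint32_t NextProbe(uint32_t h, uint32_t num_blocks,
                          uint32_t* h_out) {
  const uint32_t kBlockBits = 64 * 8;  // 512
  const uint32_t b = ((h >> 11) | (h << 21)) % num_blocks * kBlockBits;
  const uint32_t bitpos = b + (h % kBlockBits);
  *h_out = h / kBlockBits + (h % kBlockBits) * (0x20000000U / 64);
  return bitpos;
}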
@ -91,17 +91,16 @@ TEST(DynamicBloomTest, VaryingLengths) {
fprintf(stderr, "bits_per_key: %d num_probes: %d\n",
FLAGS_bits_per_key, num_probes);
for (uint32_t cl_per_block = 0; cl_per_block < num_probes;
++cl_per_block) {
for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) {
for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
uint32_t bloom_bits = 0;
if (cl_per_block == 0) {
if (enable_locality == 0) {
bloom_bits = std::max(num * FLAGS_bits_per_key, 64U);
} else {
bloom_bits = std::max(num * FLAGS_bits_per_key,
cl_per_block * CACHE_LINE_SIZE * 8);
enable_locality * CACHE_LINE_SIZE * 8);
}
DynamicBloom bloom(bloom_bits, cl_per_block, num_probes);
DynamicBloom bloom(bloom_bits, enable_locality, num_probes);
for (uint64_t i = 0; i < num; i++) {
bloom.Add(Key(i, buffer));
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
@ -123,8 +122,10 @@ TEST(DynamicBloomTest, VaryingLengths) {
}
double rate = result / 10000.0;
fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, "
"cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block);
fprintf(stderr,
"False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, "
"enable locality?%u\n",
rate * 100.0, num, bloom_bits, enable_locality);
if (rate > 0.0125)
mediocre_filters++; // Allowed, but not too often
@ -173,20 +174,20 @@ TEST(DynamicBloomTest, perf) {
elapsed / count);
ASSERT_TRUE(count == num_keys);
for (uint32_t cl_per_block = 1; cl_per_block <= num_probes;
++cl_per_block) {
DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, num_probes);
// Locality enabled version
DynamicBloom blocked_bloom(num_keys * 10, 1, num_probes);
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
blocked_bloom.Add(Slice(reinterpret_cast<const char*>(&i), 8));
}
uint64_t elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg add latency %" PRIu64 "\n",
cl_per_block, elapsed / num_keys);
elapsed = timer.ElapsedNanos();
fprintf(stderr,
"blocked bloom(enable locality), avg add latency %" PRIu64 "\n",
elapsed / num_keys);
uint64_t count = 0;
count = 0;
timer.Start();
for (uint64_t i = 1; i <= num_keys; ++i) {
if (blocked_bloom.MayContain(
@ -196,12 +197,12 @@ TEST(DynamicBloomTest, perf) {
}
elapsed = timer.ElapsedNanos();
fprintf(stderr, "blocked bloom(%d), avg query latency %" PRIu64 "\n",
cl_per_block, elapsed / count);
fprintf(stderr,
"blocked bloom(enable locality), avg query latency %" PRIu64 "\n",
elapsed / count);
ASSERT_TRUE(count == num_keys);
}
}
}
} // namespace rocksdb

@ -250,7 +250,7 @@ class HashCuckooRep : public MemTableRep {
// are sorted according to the user specified KeyComparator. Note that
// any insert after this function call may affect the sorted nature of
// the returned iterator.
virtual MemTableRep::Iterator* GetIterator() override {
virtual MemTableRep::Iterator* GetIterator(Arena* arena) override {
std::vector<const char*> compact_buckets;
for (unsigned int bid = 0; bid < bucket_count_; ++bid) {
const char* bucket = cuckoo_array_[bid].load(std::memory_order_relaxed);
@ -265,10 +265,18 @@ class HashCuckooRep : public MemTableRep {
compact_buckets.push_back(iter->key());
}
}
if (arena == nullptr) {
return new Iterator(
std::shared_ptr<std::vector<const char*>>(
new std::vector<const char*>(std::move(compact_buckets))),
compare_);
} else {
auto mem = arena->AllocateAligned(sizeof(Iterator));
return new (mem) Iterator(
std::shared_ptr<std::vector<const char*>>(
new std::vector<const char*>(std::move(compact_buckets))),
compare_);
}
}
};
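On the calling side, the new overload implies the same two disposal paths as table iterators. A hypothetical sketch (the rep pointer and its contents are assumptions, not code from the commit):

#include "rocksdb/memtablerep.h"
#include "util/arena.h"

// Arena path: destroy with an explicit destructor call; the iterator's
// storage is reclaimed when `arena` itself is destroyed.
void ScanRep(rocksdb::MemTableRep* rep) {
  rocksdb::Arena arena;
  auto* iter = rep->GetIterator(&arena);
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    // ... consume iter->key() ...
  }
  iter->~Iterator();
}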

@ -70,11 +70,12 @@ class HashLinkListRep : public MemTableRep {
virtual ~HashLinkListRep();
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
private:
friend class DynamicIterator;
@ -411,7 +412,7 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator() {
MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
// allocate a new arena of similar size to the one currently in use
Arena* new_arena = new Arena(arena_->BlockSize());
auto list = new FullList(compare_, new_arena);
@ -424,7 +425,12 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator() {
}
}
}
if (alloc_arena == nullptr) {
return new FullListIterator(list, new_arena);
} else {
auto mem = alloc_arena->AllocateAligned(sizeof(FullListIterator));
return new (mem) FullListIterator(list, new_arena);
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
@ -435,8 +441,14 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
return new Iterator(this, bucket);
}
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() {
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
Arena* alloc_arena) {
if (alloc_arena == nullptr) {
return new DynamicIterator(*this);
} else {
auto mem = alloc_arena->AllocateAligned(sizeof(DynamicIterator));
return new (mem) DynamicIterator(*this);
}
}
bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const {

@ -38,11 +38,12 @@ class HashSkipListRep : public MemTableRep {
virtual ~HashSkipListRep();
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
private:
friend class DynamicIterator;
@ -288,7 +289,7 @@ void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator() {
MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) {
// allocate a new arena of similar size to the one currently in use
Arena* new_arena = new Arena(arena_->BlockSize());
auto list = new Bucket(compare_, new_arena);
@ -301,7 +302,12 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator() {
}
}
}
if (arena == nullptr) {
return new Iterator(list, true, new_arena);
} else {
auto mem = arena->AllocateAligned(sizeof(Iterator));
return new (mem) Iterator(list, true, new_arena);
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
@ -312,8 +318,13 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
return new Iterator(bucket, false);
}
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() {
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
if (arena == nullptr) {
return new DynamicIterator(*this);
} else {
auto mem = arena->AllocateAligned(sizeof(DynamicIterator));
return new (mem) DynamicIterator(*this);
}
}
} // anon namespace

@ -490,7 +490,9 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForPointLookup() {
BlockBasedTableOptions block_based_options;
block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
table_factory.reset(new BlockBasedTableFactory(block_based_options));
#ifndef ROCKSDB_LITE
memtable_factory.reset(NewHashLinkListRepFactory());
#endif
return this;
}
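A minimal usage sketch of the preset touched above; under ROCKSDB_LITE the hash-linklist memtable line compiles out, so the default memtable rep remains:

#include "rocksdb/options.h"

rocksdb::ColumnFamilyOptions MakePointLookupOptions() {
  rocksdb::ColumnFamilyOptions cf_options;
  cf_options.OptimizeForPointLookup();  // binary-search index plus,
                                        // outside LITE, hash-linklist rep
  return cf_options;
}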

@ -6,6 +6,7 @@
#include "rocksdb/memtablerep.h"
#include "db/memtable.h"
#include "db/skiplist.h"
#include "util/arena.h"
namespace rocksdb {
namespace {
@ -108,8 +109,13 @@ public:
// Unhide default implementations of GetIterator
using MemTableRep::GetIterator;
virtual MemTableRep::Iterator* GetIterator() override {
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
if (arena == nullptr) {
return new SkipListRep::Iterator(&skip_list_);
} else {
auto mem = arena->AllocateAligned(sizeof(SkipListRep::Iterator));
return new (mem) SkipListRep::Iterator(&skip_list_);
}
}
};
}

@ -95,7 +95,7 @@ class VectorRep : public MemTableRep {
using MemTableRep::GetIterator;
// Return an iterator over the keys in this representation.
virtual MemTableRep::Iterator* GetIterator() override;
virtual MemTableRep::Iterator* GetIterator(Arena* arena) override;
private:
friend class Iterator;
@ -259,16 +259,28 @@ void VectorRep::Get(const LookupKey& k, void* callback_args,
}
}
MemTableRep::Iterator* VectorRep::GetIterator() {
MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
char* mem = nullptr;
if (arena != nullptr) {
mem = arena->AllocateAligned(sizeof(Iterator));
}
ReadLock l(&rwlock_);
// Do not sort here. The sorting is done lazily, the first time
// a Seek is performed on the iterator.
if (immutable_) {
if (arena == nullptr) {
return new Iterator(this, bucket_, compare_);
} else {
return new (mem) Iterator(this, bucket_, compare_);
}
} else {
std::shared_ptr<Bucket> tmp;
tmp.reset(new Bucket(*bucket_)); // make a copy
if (arena == nullptr) {
return new Iterator(nullptr, tmp, compare_);
} else {
return new (mem) Iterator(nullptr, tmp, compare_);
}
}
}
} // anon namespace
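The "do not sort here" comment defers the O(n log n) cost to the first Seek. A simplified illustration of that idea (an assumed sketch, not the committed Iterator):

#include <algorithm>
#include <string>
#include <vector>

// The snapshot is copied unsorted under the read lock; sorting happens at
// most once, and only if the iterator is actually positioned.
struct LazySortedSnapshot {
  std::vector<std::string> keys;
  bool sorted = false;
  void SeekToFirst() {
    if (!sorted) {
      std::sort(keys.begin(), keys.end());
      sorted = true;
    }
  }
};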

@ -918,7 +918,6 @@ TEST(BackupableDBTest, RateLimiting) {
auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
backupable_options_->backup_rate_limit;
ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time);
ASSERT_LT(backup_time, 2.5 * rate_limited_backup_time);
CloseBackupableDB();
