Reuse file iterators in tailing iterator when memtable is flushed

Summary:
Under a tailing workload, there were increased block cache
misses when a memtable was flushed because we were rebuilding iterators
in that case since the version set changed. This was exacerbated in the
case of iterate_upper_bound, since file iterators which were over the
iterate_upper_bound would have been deleted and are now brought back as
part of the Rebuild, only to be deleted again. We now renew the iterators
and only build iterators for files which are added and delete file
iterators for files which are deleted.
Refer to https://reviews.facebook.net/D50463 for previous version

Test Plan: DBTestTailingIterator.TailingIteratorTrimSeekToNext

Reviewers: anthony, IslamAbdelRahman, igor, tnovak, yhchiang, sdong

Reviewed By: sdong

Subscribers: yhchiang, march, dhruba, leveldb, lovro

Differential Revision: https://reviews.facebook.net/D50679
main
Venkatesh Radhakrishnan 9 years ago
parent 2ae4d7d708
commit 7824444bfc
  1. 1
      CMakeLists.txt
  2. 5
      Makefile
  3. 10
      db/db_tailing_iter_test.cc
  4. 133
      db/forward_iterator.cc
  5. 4
      db/forward_iterator.h
  6. 375
      db/forward_iterator_bench.cc
  7. 1
      src.mk

@ -285,6 +285,7 @@ target_link_libraries(rocksdb${ARTIFACT_SUFFIX} ${LIBS})
set(APPS
db/db_bench.cc
db/forward_iterator_bench.cc
db/memtablerep_bench.cc
table/table_reader_bench.cc
tools/db_stress.cc

@ -337,7 +337,7 @@ TOOLS = \
rocksdb_dump \
rocksdb_undump
BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench
BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench forward_iterator_bench
# if user didn't config LIBNAME, set the default
ifeq ($(LIBNAME),)
@ -978,6 +978,9 @@ ldb_cmd_test: tools/ldb_cmd_test.o $(LIBOBJECTS) $(TESTHARNESS)
ldb: tools/ldb.o $(LIBOBJECTS)
$(AM_LINK)
forward_iterator_bench: db/forward_iterator_bench.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -140,6 +140,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
std::string value(1024, 'a');
bool file_iters_deleted = false;
bool file_iters_renewed_null = false;
bool file_iters_renewed_copy = false;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"ForwardIterator::SeekInternal:Return", [&](void* arg) {
ForwardIterator* fiter = reinterpret_cast<ForwardIterator*>(arg);
@ -152,6 +154,12 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
ASSERT_TRUE(!file_iters_deleted ||
fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"ForwardIterator::RenewIterators:Null",
[&](void* arg) { file_iters_renewed_null = true; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"ForwardIterator::RenewIterators:Copy",
[&](void* arg) { file_iters_renewed_copy = true; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
const int num_records = 1000;
for (int i = 1; i < num_records; ++i) {
@ -203,6 +211,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
ASSERT_LE(num_iters, 1);
file_iters_deleted = false;
}
ASSERT_TRUE(file_iters_renewed_null);
ASSERT_TRUE(file_iters_renewed_copy);
iter = 0;
itern = 0;
iterh = 0;

@ -144,6 +144,23 @@ ForwardIterator::~ForwardIterator() {
Cleanup(true);
}
void ForwardIterator::SVCleanup() {
if (sv_ != nullptr && sv_->Unref()) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);
db_->mutex_.Unlock();
delete sv_;
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
}
job_context.Clean();
}
}
void ForwardIterator::Cleanup(bool release_sv) {
if (mutable_iter_ != nullptr) {
mutable_iter_->~InternalIterator();
@ -162,20 +179,7 @@ void ForwardIterator::Cleanup(bool release_sv) {
level_iters_.clear();
if (release_sv) {
if (sv_ != nullptr && sv_->Unref()) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);
db_->mutex_.Unlock();
delete sv_;
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
}
job_context.Clean();
}
SVCleanup();
}
}
@ -185,9 +189,10 @@ bool ForwardIterator::Valid() const {
}
void ForwardIterator::SeekToFirst() {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
if (sv_ == nullptr) {
RebuildIterators(true);
} else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
RenewIterators();
} else if (immutable_status_.IsIncomplete()) {
ResetIncompleteIterators();
}
@ -205,9 +210,10 @@ void ForwardIterator::Seek(const Slice& internal_key) {
if (IsOverUpperBound(internal_key)) {
valid_ = false;
}
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
if (sv_ == nullptr) {
RebuildIterators(true);
} else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
RenewIterators();
} else if (immutable_status_.IsIncomplete()) {
ResetIncompleteIterators();
}
@ -227,7 +233,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
// an option to turn it off.
if (seek_to_first || NeedToSeekImmutable(internal_key)) {
immutable_status_ = Status::OK();
if (has_iter_trimmed_for_upper_bound_) {
if ((has_iter_trimmed_for_upper_bound_) &&
(cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key_.GetKey(), internal_key) > 0)) {
// Some iterators are trimmed. Need to rebuild.
RebuildIterators(true);
// Already seeked mutable iter, so seek again
@ -393,7 +401,11 @@ void ForwardIterator::Next() {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), current_key.size());
RebuildIterators(true);
if (sv_ == nullptr) {
RebuildIterators(true);
} else {
RenewIterators();
}
SeekInternal(old_key, false);
if (!valid_ || key().compare(old_key) != 0) {
return;
@ -485,26 +497,93 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
}
level_iters_.reserve(vstorage->num_levels() - 1);
BuildLevelIterators(vstorage);
current_ = nullptr;
is_prev_set_ = false;
}
void ForwardIterator::RenewIterators() {
SuperVersion* svnew;
assert(sv_);
svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
if (mutable_iter_ != nullptr) {
mutable_iter_->~InternalIterator();
}
for (auto* m : imm_iters_) {
m->~InternalIterator();
}
imm_iters_.clear();
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
const auto* vstorage = sv_->current->storage_info();
const auto& l0_files = vstorage->LevelFiles(0);
const auto* vstorage_new = svnew->current->storage_info();
const auto& l0_files_new = vstorage_new->LevelFiles(0);
uint32_t iold, inew;
bool found;
std::vector<InternalIterator*> l0_iters_new;
l0_iters_new.reserve(l0_files_new.size());
for (inew = 0; inew < l0_files_new.size(); inew++) {
found = false;
for (iold = 0; iold < l0_files.size(); iold++) {
if (l0_files[iold] == l0_files_new[inew]) {
found = true;
break;
}
}
if (found) {
if (l0_iters_[iold] == nullptr) {
l0_iters_new.push_back(nullptr);
TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
} else {
l0_iters_new.push_back(l0_iters_[iold]);
l0_iters_[iold] = nullptr;
TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
}
continue;
}
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
l0_files_new[inew]->fd));
}
for (auto* f : l0_iters_) {
delete f;
}
l0_iters_.clear();
l0_iters_ = l0_iters_new;
for (auto* l : level_iters_) {
delete l;
}
BuildLevelIterators(vstorage_new);
current_ = nullptr;
is_prev_set_ = false;
SVCleanup();
sv_ = svnew;
}
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
const auto& level_files = vstorage->LevelFiles(level);
if ((level_files.empty()) ||
((read_options_.iterate_upper_bound != nullptr) &&
(user_comparator_->Compare(*read_options_.iterate_upper_bound,
level_files[0]->smallest.user_key()) <
0))) {
level_iters_.push_back(nullptr);
level_iters_[level - 1] = nullptr;
if (!level_files.empty()) {
has_iter_trimmed_for_upper_bound_ = true;
}
} else {
level_iters_.push_back(
new LevelIterator(cfd_, read_options_, level_files));
level_iters_[level - 1] =
new LevelIterator(cfd_, read_options_, level_files);
}
}
current_ = nullptr;
is_prev_set_ = false;
}
void ForwardIterator::ResetIncompleteIterators() {

@ -24,6 +24,7 @@ class Env;
struct SuperVersion;
class ColumnFamilyData;
class LevelIterator;
class VersionStorageInfo;
struct FileMetaData;
class MinIterComparator {
@ -74,7 +75,10 @@ class ForwardIterator : public InternalIterator {
private:
void Cleanup(bool release_sv);
void SVCleanup();
void RebuildIterators(bool refresh_sv);
void RenewIterators();
void BuildLevelIterators(const VersionStorageInfo* vstorage);
void ResetIncompleteIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();

@ -0,0 +1,375 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#ifndef GFLAGS
#include <cstdio>
int main() {
fprintf(stderr, "Please install gflags to run rocksdb tools\n");
return 1;
}
#else
#ifndef ROCKSDB_LITE
#include <gflags/gflags.h>
#include <semaphore.h>
#include <atomic>
#include <bitset>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <limits>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "util/testharness.h"
const int MAX_SHARDS = 100000;
DEFINE_int64(writers, 8, "");
DEFINE_int64(readers, 8, "");
DEFINE_int64(rate, 100000, "");
DEFINE_int64(value_size, 300, "");
DEFINE_int64(shards, 1000, "");
DEFINE_int64(memtable_size, 500000000, "");
DEFINE_int64(block_cache_size, 300000000, "");
DEFINE_int64(block_size, 65536, "");
DEFINE_double(runtime, 300.0, "");
DEFINE_bool(cache_only_first, true, "");
DEFINE_bool(iterate_upper_bound, true, "");
struct Stats {
char pad1[128];
std::atomic<uint64_t> written{0};
char pad2[128];
std::atomic<uint64_t> read{0};
std::atomic<uint64_t> cache_misses{0};
char pad3[128];
} stats;
struct Key {
Key() {}
Key(uint64_t shard_in, uint64_t seqno_in)
: shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}
uint64_t shard() const { return be64toh(shard_be); }
uint64_t seqno() const { return be64toh(seqno_be); }
private:
uint64_t shard_be;
uint64_t seqno_be;
} __attribute__((__packed__));
struct Reader;
struct Writer;
struct ShardState {
char pad1[128];
std::atomic<uint64_t> last_written{0};
Writer* writer;
Reader* reader;
char pad2[128];
std::atomic<uint64_t> last_read{0};
std::unique_ptr<rocksdb::Iterator> it;
std::unique_ptr<rocksdb::Iterator> it_cacheonly;
Key upper_bound;
rocksdb::Slice upper_bound_slice;
char pad3[128];
};
struct Reader {
public:
explicit Reader(std::vector<ShardState>* shard_states, rocksdb::DB* db)
: shard_states_(shard_states), db_(db) {
sem_init(&sem_, 0, 0);
thread_ = std::thread(&Reader::run, this);
}
void run() {
while (1) {
sem_wait(&sem_);
if (done_.load()) {
break;
}
uint64_t shard;
{
std::lock_guard<std::mutex> guard(queue_mutex_);
assert(!shards_pending_queue_.empty());
shard = shards_pending_queue_.front();
shards_pending_queue_.pop();
shards_pending_set_.reset(shard);
}
readOnceFromShard(shard);
}
}
void readOnceFromShard(uint64_t shard) {
ShardState& state = (*shard_states_)[shard];
if (!state.it) {
// Initialize iterators
rocksdb::ReadOptions options;
options.tailing = true;
if (FLAGS_iterate_upper_bound) {
state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
state.upper_bound_slice = rocksdb::Slice(
(const char*)&state.upper_bound, sizeof(state.upper_bound));
options.iterate_upper_bound = &state.upper_bound_slice;
}
state.it.reset(db_->NewIterator(options));
if (FLAGS_cache_only_first) {
options.read_tier = rocksdb::ReadTier::kBlockCacheTier;
state.it_cacheonly.reset(db_->NewIterator(options));
}
}
const uint64_t upto = state.last_written.load();
for (rocksdb::Iterator* it : {state.it_cacheonly.get(), state.it.get()}) {
if (it == nullptr) {
continue;
}
if (state.last_read.load() >= upto) {
break;
}
bool need_seek = true;
for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
if (need_seek) {
Key from(shard, state.last_read.load() + 1);
it->Seek(rocksdb::Slice((const char*)&from, sizeof(from)));
need_seek = false;
} else {
it->Next();
}
if (it->status().IsIncomplete()) {
++::stats.cache_misses;
break;
}
assert(it->Valid());
assert(it->key().size() == sizeof(Key));
Key key;
memcpy(&key, it->key().data(), it->key().size());
// fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
// shard, seq, key.shard(), key.seqno());
assert(key.shard() == shard);
assert(key.seqno() == seq);
state.last_read.store(seq);
++::stats.read;
}
}
}
void onWrite(uint64_t shard) {
{
std::lock_guard<std::mutex> guard(queue_mutex_);
if (!shards_pending_set_.test(shard)) {
shards_pending_queue_.push(shard);
shards_pending_set_.set(shard);
sem_post(&sem_);
}
}
}
~Reader() {
done_.store(true);
sem_post(&sem_);
thread_.join();
}
private:
char pad1[128];
std::vector<ShardState>* shard_states_;
rocksdb::DB* db_;
std::thread thread_;
sem_t sem_;
std::mutex queue_mutex_;
std::bitset<MAX_SHARDS + 1> shards_pending_set_;
std::queue<uint64_t> shards_pending_queue_;
std::atomic<bool> done_{false};
char pad2[128];
};
struct Writer {
explicit Writer(std::vector<ShardState>* shard_states, rocksdb::DB* db)
: shard_states_(shard_states), db_(db) {}
void start() { thread_ = std::thread(&Writer::run, this); }
void run() {
std::queue<std::chrono::steady_clock::time_point> workq;
std::chrono::steady_clock::time_point deadline(
std::chrono::steady_clock::now() +
std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
std::vector<uint64_t> my_shards;
for (int i = 1; i <= FLAGS_shards; ++i) {
if ((*shard_states_)[i].writer == this) {
my_shards.push_back(i);
}
}
std::mt19937 rng{std::random_device()()};
std::uniform_int_distribution<int> shard_dist(0, my_shards.size() - 1);
std::string value(FLAGS_value_size, '*');
while (1) {
auto now = std::chrono::steady_clock::now();
if (FLAGS_runtime >= 0 && now >= deadline) {
break;
}
if (workq.empty()) {
for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
workq.push(now + offset);
}
}
while (!workq.empty() && workq.front() < now) {
workq.pop();
uint64_t shard = my_shards[shard_dist(rng)];
ShardState& state = (*shard_states_)[shard];
uint64_t seqno = state.last_written.load() + 1;
Key key(shard, seqno);
// fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
rocksdb::Status status =
db_->Put(rocksdb::WriteOptions(),
rocksdb::Slice((const char*)&key, sizeof(key)),
rocksdb::Slice(value));
assert(status.ok());
state.last_written.store(seqno);
state.reader->onWrite(shard);
++::stats.written;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
// fprintf(stderr, "Writer done\n");
}
~Writer() { thread_.join(); }
private:
char pad1[128];
std::vector<ShardState>* shard_states_;
rocksdb::DB* db_;
std::thread thread_;
char pad2[128];
};
struct StatsThread {
explicit StatsThread(rocksdb::DB* db)
: db_(db), thread_(&StatsThread::run, this) {}
void run() {
// using namespace std::chrono;
auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
uint64_t wlast = 0, rlast = 0;
while (!done_.load()) {
{
std::unique_lock<std::mutex> lock(cvm_);
cv_.wait_for(lock, std::chrono::seconds(1));
}
auto now = std::chrono::steady_clock::now();
double elapsed =
std::chrono::duration_cast<std::chrono::duration<double> >(
now - tlast).count();
uint64_t w = ::stats.written.load();
uint64_t r = ::stats.read.load();
fprintf(stderr,
"%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
"r/s %10.0f | cache misses %10ld\n",
db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
.count(),
w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
::stats.cache_misses.load());
wlast = w;
rlast = r;
tlast = now;
}
}
~StatsThread() {
{
std::lock_guard<std::mutex> guard(cvm_);
done_.store(true);
}
cv_.notify_all();
thread_.join();
}
private:
rocksdb::DB* db_;
std::mutex cvm_;
std::condition_variable cv_;
std::thread thread_;
std::atomic<bool> done_{false};
};
int main(int argc, char** argv) {
GFLAGS::ParseCommandLineFlags(&argc, &argv, true);
std::mt19937 rng{std::random_device()()};
rocksdb::Status status;
std::string path = rocksdb::test::TmpDir() + "/forward_iterator_test";
fprintf(stderr, "db path is %s\n", path.c_str());
rocksdb::Options options;
options.create_if_missing = true;
options.compression = rocksdb::CompressionType::kNoCompression;
options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 99999;
options.level0_stop_writes_trigger = 99999;
options.allow_os_buffer = false;
options.write_buffer_size = FLAGS_memtable_size;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);
table_options.block_size = FLAGS_block_size;
options.table_factory.reset(
rocksdb::NewBlockBasedTableFactory(table_options));
status = rocksdb::DestroyDB(path, options);
assert(status.ok());
rocksdb::DB* db_raw;
status = rocksdb::DB::Open(options, path, &db_raw);
assert(status.ok());
std::unique_ptr<rocksdb::DB> db(db_raw);
std::vector<ShardState> shard_states(FLAGS_shards + 1);
std::deque<Reader> readers;
while (static_cast<int>(readers.size()) < FLAGS_readers) {
readers.emplace_back(&shard_states, db_raw);
}
std::deque<Writer> writers;
while (static_cast<int>(writers.size()) < FLAGS_writers) {
writers.emplace_back(&shard_states, db_raw);
}
// Each shard gets a random reader and random writer assigned to it
for (int i = 1; i <= FLAGS_shards; ++i) {
std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
shard_states[i].reader = &readers[reader_dist(rng)];
shard_states[i].writer = &writers[writer_dist(rng)];
}
StatsThread stats_thread(db_raw);
for (Writer& w : writers) {
w.start();
}
writers.clear();
readers.clear();
}
#endif // ROCKSDB_LITE
#endif // GFLAGS

@ -190,6 +190,7 @@ TEST_BENCH_SOURCES = \
db/db_compaction_filter_test.cc \
db/db_compaction_test.cc \
db/db_dynamic_level_test.cc \
db/forward_iterator_bench.cc \
db/db_inplace_update_test.cc \
db/db_log_iter_test.cc \
db/db_universal_compaction_test.cc \

Loading…
Cancel
Save