From 7824444bfc0140b4dbaf9cabe489f919e0430e93 Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Fri, 13 Nov 2015 15:50:59 -0800 Subject: [PATCH] Reuse file iterators in tailing iterator when memtable is flushed Summary: Under a tailing workload, there were increased block cache misses when a memtable was flushed because we were rebuilding iterators in that case since the version set changed. This was exacerbated in the case of iterate_upper_bound, since file iterators which were over the iterate_upper_bound would have been deleted and are now brought back as part of the Rebuild, only to be deleted again. We now renew the iterators and only build iterators for files which are added and delete file iterators for files which are deleted. Refer to https://reviews.facebook.net/D50463 for previous version Test Plan: DBTestTailingIterator.TailingIteratorTrimSeekToNext Reviewers: anthony, IslamAbdelRahman, igor, tnovak, yhchiang, sdong Reviewed By: sdong Subscribers: yhchiang, march, dhruba, leveldb, lovro Differential Revision: https://reviews.facebook.net/D50679 --- CMakeLists.txt | 1 + Makefile | 5 +- db/db_tailing_iter_test.cc | 10 + db/forward_iterator.cc | 133 ++++++++++--- db/forward_iterator.h | 4 + db/forward_iterator_bench.cc | 375 +++++++++++++++++++++++++++++++++++ src.mk | 1 + 7 files changed, 501 insertions(+), 28 deletions(-) create mode 100644 db/forward_iterator_bench.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 9cf1bbaa3..2a05192da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -285,6 +285,7 @@ target_link_libraries(rocksdb${ARTIFACT_SUFFIX} ${LIBS}) set(APPS db/db_bench.cc + db/forward_iterator_bench.cc db/memtablerep_bench.cc table/table_reader_bench.cc tools/db_stress.cc diff --git a/Makefile b/Makefile index 02ffd5653..0fd1dae7f 100644 --- a/Makefile +++ b/Makefile @@ -337,7 +337,7 @@ TOOLS = \ rocksdb_dump \ rocksdb_undump -BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench +BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench forward_iterator_bench # if user didn't config LIBNAME, set the default ifeq ($(LIBNAME),) @@ -978,6 +978,9 @@ ldb_cmd_test: tools/ldb_cmd_test.o $(LIBOBJECTS) $(TESTHARNESS) ldb: tools/ldb.o $(LIBOBJECTS) $(AM_LINK) +forward_iterator_bench: db/forward_iterator_bench.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc index 3be5e953c..d83c5eec3 100644 --- a/db/db_tailing_iter_test.cc +++ b/db/db_tailing_iter_test.cc @@ -140,6 +140,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { std::unique_ptr iterh(db_->NewIterator(read_options, handles_[1])); std::string value(1024, 'a'); bool file_iters_deleted = false; + bool file_iters_renewed_null = false; + bool file_iters_renewed_copy = false; rocksdb::SyncPoint::GetInstance()->SetCallBack( "ForwardIterator::SeekInternal:Return", [&](void* arg) { ForwardIterator* fiter = reinterpret_cast(arg); @@ -152,6 +154,12 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { ASSERT_TRUE(!file_iters_deleted || fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters)); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ForwardIterator::RenewIterators:Null", + [&](void* arg) { file_iters_renewed_null = true; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ForwardIterator::RenewIterators:Copy", + [&](void* arg) { file_iters_renewed_copy = true; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); const int num_records = 1000; for (int i = 1; i < num_records; ++i) { @@ -203,6 +211,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { ASSERT_LE(num_iters, 1); file_iters_deleted = false; } + ASSERT_TRUE(file_iters_renewed_null); + ASSERT_TRUE(file_iters_renewed_copy); iter = 0; itern = 0; iterh = 0; diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 2d68368ea..b6437c837 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -144,6 +144,23 @@ ForwardIterator::~ForwardIterator() { Cleanup(true); } +void ForwardIterator::SVCleanup() { + if (sv_ != nullptr && sv_->Unref()) { + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); + db_->mutex_.Lock(); + sv_->Cleanup(); + db_->FindObsoleteFiles(&job_context, false, true); + db_->mutex_.Unlock(); + delete sv_; + if (job_context.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + } +} + void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { mutable_iter_->~InternalIterator(); @@ -162,20 +179,7 @@ void ForwardIterator::Cleanup(bool release_sv) { level_iters_.clear(); if (release_sv) { - if (sv_ != nullptr && sv_->Unref()) { - // Job id == 0 means that this is not our background process, but rather - // user thread - JobContext job_context(0); - db_->mutex_.Lock(); - sv_->Cleanup(); - db_->FindObsoleteFiles(&job_context, false, true); - db_->mutex_.Unlock(); - delete sv_; - if (job_context.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(job_context); - } - job_context.Clean(); - } + SVCleanup(); } } @@ -185,9 +189,10 @@ bool ForwardIterator::Valid() const { } void ForwardIterator::SeekToFirst() { - if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + if (sv_ == nullptr) { RebuildIterators(true); + } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) { + RenewIterators(); } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -205,9 +210,10 @@ void ForwardIterator::Seek(const Slice& internal_key) { if (IsOverUpperBound(internal_key)) { valid_ = false; } - if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + if (sv_ == nullptr) { RebuildIterators(true); + } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) { + RenewIterators(); } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -227,7 +233,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, // an option to turn it off. if (seek_to_first || NeedToSeekImmutable(internal_key)) { immutable_status_ = Status::OK(); - if (has_iter_trimmed_for_upper_bound_) { + if ((has_iter_trimmed_for_upper_bound_) && + (cfd_->internal_comparator().InternalKeyComparator::Compare( + prev_key_.GetKey(), internal_key) > 0)) { // Some iterators are trimmed. Need to rebuild. RebuildIterators(true); // Already seeked mutable iter, so seek again @@ -393,7 +401,11 @@ void ForwardIterator::Next() { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); - RebuildIterators(true); + if (sv_ == nullptr) { + RebuildIterators(true); + } else { + RenewIterators(); + } SeekInternal(old_key, false); if (!valid_ || key().compare(old_key) != 0) { return; @@ -485,26 +497,93 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd)); } level_iters_.reserve(vstorage->num_levels() - 1); + BuildLevelIterators(vstorage); + current_ = nullptr; + is_prev_set_ = false; +} + +void ForwardIterator::RenewIterators() { + SuperVersion* svnew; + assert(sv_); + svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + + if (mutable_iter_ != nullptr) { + mutable_iter_->~InternalIterator(); + } + for (auto* m : imm_iters_) { + m->~InternalIterator(); + } + imm_iters_.clear(); + + mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); + svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); + + const auto* vstorage = sv_->current->storage_info(); + const auto& l0_files = vstorage->LevelFiles(0); + const auto* vstorage_new = svnew->current->storage_info(); + const auto& l0_files_new = vstorage_new->LevelFiles(0); + uint32_t iold, inew; + bool found; + std::vector l0_iters_new; + l0_iters_new.reserve(l0_files_new.size()); + + for (inew = 0; inew < l0_files_new.size(); inew++) { + found = false; + for (iold = 0; iold < l0_files.size(); iold++) { + if (l0_files[iold] == l0_files_new[inew]) { + found = true; + break; + } + } + if (found) { + if (l0_iters_[iold] == nullptr) { + l0_iters_new.push_back(nullptr); + TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this); + } else { + l0_iters_new.push_back(l0_iters_[iold]); + l0_iters_[iold] = nullptr; + TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this); + } + continue; + } + l0_iters_new.push_back(cfd_->table_cache()->NewIterator( + read_options_, *cfd_->soptions(), cfd_->internal_comparator(), + l0_files_new[inew]->fd)); + } + + for (auto* f : l0_iters_) { + delete f; + } + l0_iters_.clear(); + l0_iters_ = l0_iters_new; + + for (auto* l : level_iters_) { + delete l; + } + BuildLevelIterators(vstorage_new); + current_ = nullptr; + is_prev_set_ = false; + SVCleanup(); + sv_ = svnew; +} + +void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) { for (int32_t level = 1; level < vstorage->num_levels(); ++level) { const auto& level_files = vstorage->LevelFiles(level); - if ((level_files.empty()) || ((read_options_.iterate_upper_bound != nullptr) && (user_comparator_->Compare(*read_options_.iterate_upper_bound, level_files[0]->smallest.user_key()) < 0))) { - level_iters_.push_back(nullptr); + level_iters_[level - 1] = nullptr; if (!level_files.empty()) { has_iter_trimmed_for_upper_bound_ = true; } } else { - level_iters_.push_back( - new LevelIterator(cfd_, read_options_, level_files)); + level_iters_[level - 1] = + new LevelIterator(cfd_, read_options_, level_files); } } - - current_ = nullptr; - is_prev_set_ = false; } void ForwardIterator::ResetIncompleteIterators() { diff --git a/db/forward_iterator.h b/db/forward_iterator.h index a159a6101..1c4d4975e 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -24,6 +24,7 @@ class Env; struct SuperVersion; class ColumnFamilyData; class LevelIterator; +class VersionStorageInfo; struct FileMetaData; class MinIterComparator { @@ -74,7 +75,10 @@ class ForwardIterator : public InternalIterator { private: void Cleanup(bool release_sv); + void SVCleanup(); void RebuildIterators(bool refresh_sv); + void RenewIterators(); + void BuildLevelIterators(const VersionStorageInfo* vstorage); void ResetIncompleteIterators(); void SeekInternal(const Slice& internal_key, bool seek_to_first); void UpdateCurrent(); diff --git a/db/forward_iterator_bench.cc b/db/forward_iterator_bench.cc new file mode 100644 index 000000000..6d352d077 --- /dev/null +++ b/db/forward_iterator_bench.cc @@ -0,0 +1,375 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#ifndef GFLAGS +#include +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#ifndef ROCKSDB_LITE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/db.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" +#include "util/testharness.h" + +const int MAX_SHARDS = 100000; + +DEFINE_int64(writers, 8, ""); +DEFINE_int64(readers, 8, ""); +DEFINE_int64(rate, 100000, ""); +DEFINE_int64(value_size, 300, ""); +DEFINE_int64(shards, 1000, ""); +DEFINE_int64(memtable_size, 500000000, ""); +DEFINE_int64(block_cache_size, 300000000, ""); +DEFINE_int64(block_size, 65536, ""); +DEFINE_double(runtime, 300.0, ""); +DEFINE_bool(cache_only_first, true, ""); +DEFINE_bool(iterate_upper_bound, true, ""); + +struct Stats { + char pad1[128]; + std::atomic written{0}; + char pad2[128]; + std::atomic read{0}; + std::atomic cache_misses{0}; + char pad3[128]; +} stats; + +struct Key { + Key() {} + Key(uint64_t shard_in, uint64_t seqno_in) + : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {} + + uint64_t shard() const { return be64toh(shard_be); } + uint64_t seqno() const { return be64toh(seqno_be); } + + private: + uint64_t shard_be; + uint64_t seqno_be; +} __attribute__((__packed__)); + +struct Reader; +struct Writer; + +struct ShardState { + char pad1[128]; + std::atomic last_written{0}; + Writer* writer; + Reader* reader; + char pad2[128]; + std::atomic last_read{0}; + std::unique_ptr it; + std::unique_ptr it_cacheonly; + Key upper_bound; + rocksdb::Slice upper_bound_slice; + char pad3[128]; +}; + +struct Reader { + public: + explicit Reader(std::vector* shard_states, rocksdb::DB* db) + : shard_states_(shard_states), db_(db) { + sem_init(&sem_, 0, 0); + thread_ = std::thread(&Reader::run, this); + } + + void run() { + while (1) { + sem_wait(&sem_); + if (done_.load()) { + break; + } + + uint64_t shard; + { + std::lock_guard guard(queue_mutex_); + assert(!shards_pending_queue_.empty()); + shard = shards_pending_queue_.front(); + shards_pending_queue_.pop(); + shards_pending_set_.reset(shard); + } + readOnceFromShard(shard); + } + } + + void readOnceFromShard(uint64_t shard) { + ShardState& state = (*shard_states_)[shard]; + if (!state.it) { + // Initialize iterators + rocksdb::ReadOptions options; + options.tailing = true; + if (FLAGS_iterate_upper_bound) { + state.upper_bound = Key(shard, std::numeric_limits::max()); + state.upper_bound_slice = rocksdb::Slice( + (const char*)&state.upper_bound, sizeof(state.upper_bound)); + options.iterate_upper_bound = &state.upper_bound_slice; + } + + state.it.reset(db_->NewIterator(options)); + + if (FLAGS_cache_only_first) { + options.read_tier = rocksdb::ReadTier::kBlockCacheTier; + state.it_cacheonly.reset(db_->NewIterator(options)); + } + } + + const uint64_t upto = state.last_written.load(); + for (rocksdb::Iterator* it : {state.it_cacheonly.get(), state.it.get()}) { + if (it == nullptr) { + continue; + } + if (state.last_read.load() >= upto) { + break; + } + bool need_seek = true; + for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) { + if (need_seek) { + Key from(shard, state.last_read.load() + 1); + it->Seek(rocksdb::Slice((const char*)&from, sizeof(from))); + need_seek = false; + } else { + it->Next(); + } + if (it->status().IsIncomplete()) { + ++::stats.cache_misses; + break; + } + assert(it->Valid()); + assert(it->key().size() == sizeof(Key)); + Key key; + memcpy(&key, it->key().data(), it->key().size()); + // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n", + // shard, seq, key.shard(), key.seqno()); + assert(key.shard() == shard); + assert(key.seqno() == seq); + state.last_read.store(seq); + ++::stats.read; + } + } + } + + void onWrite(uint64_t shard) { + { + std::lock_guard guard(queue_mutex_); + if (!shards_pending_set_.test(shard)) { + shards_pending_queue_.push(shard); + shards_pending_set_.set(shard); + sem_post(&sem_); + } + } + } + + ~Reader() { + done_.store(true); + sem_post(&sem_); + thread_.join(); + } + + private: + char pad1[128]; + std::vector* shard_states_; + rocksdb::DB* db_; + std::thread thread_; + sem_t sem_; + std::mutex queue_mutex_; + std::bitset shards_pending_set_; + std::queue shards_pending_queue_; + std::atomic done_{false}; + char pad2[128]; +}; + +struct Writer { + explicit Writer(std::vector* shard_states, rocksdb::DB* db) + : shard_states_(shard_states), db_(db) {} + + void start() { thread_ = std::thread(&Writer::run, this); } + + void run() { + std::queue workq; + std::chrono::steady_clock::time_point deadline( + std::chrono::steady_clock::now() + + std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime))); + std::vector my_shards; + for (int i = 1; i <= FLAGS_shards; ++i) { + if ((*shard_states_)[i].writer == this) { + my_shards.push_back(i); + } + } + + std::mt19937 rng{std::random_device()()}; + std::uniform_int_distribution shard_dist(0, my_shards.size() - 1); + std::string value(FLAGS_value_size, '*'); + + while (1) { + auto now = std::chrono::steady_clock::now(); + if (FLAGS_runtime >= 0 && now >= deadline) { + break; + } + if (workq.empty()) { + for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) { + std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate); + workq.push(now + offset); + } + } + while (!workq.empty() && workq.front() < now) { + workq.pop(); + uint64_t shard = my_shards[shard_dist(rng)]; + ShardState& state = (*shard_states_)[shard]; + uint64_t seqno = state.last_written.load() + 1; + Key key(shard, seqno); + // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno); + rocksdb::Status status = + db_->Put(rocksdb::WriteOptions(), + rocksdb::Slice((const char*)&key, sizeof(key)), + rocksdb::Slice(value)); + assert(status.ok()); + state.last_written.store(seqno); + state.reader->onWrite(shard); + ++::stats.written; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + // fprintf(stderr, "Writer done\n"); + } + + ~Writer() { thread_.join(); } + + private: + char pad1[128]; + std::vector* shard_states_; + rocksdb::DB* db_; + std::thread thread_; + char pad2[128]; +}; + +struct StatsThread { + explicit StatsThread(rocksdb::DB* db) + : db_(db), thread_(&StatsThread::run, this) {} + + void run() { + // using namespace std::chrono; + auto tstart = std::chrono::steady_clock::now(), tlast = tstart; + uint64_t wlast = 0, rlast = 0; + while (!done_.load()) { + { + std::unique_lock lock(cvm_); + cv_.wait_for(lock, std::chrono::seconds(1)); + } + auto now = std::chrono::steady_clock::now(); + double elapsed = + std::chrono::duration_cast >( + now - tlast).count(); + uint64_t w = ::stats.written.load(); + uint64_t r = ::stats.read.load(); + fprintf(stderr, + "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | " + "r/s %10.0f | cache misses %10ld\n", + db_->GetEnv()->TimeToString(time(nullptr)).c_str(), + std::chrono::duration_cast(now - tstart) + .count(), + w, (w - wlast) / elapsed, r, (r - rlast) / elapsed, + ::stats.cache_misses.load()); + wlast = w; + rlast = r; + tlast = now; + } + } + + ~StatsThread() { + { + std::lock_guard guard(cvm_); + done_.store(true); + } + cv_.notify_all(); + thread_.join(); + } + + private: + rocksdb::DB* db_; + std::mutex cvm_; + std::condition_variable cv_; + std::thread thread_; + std::atomic done_{false}; +}; + +int main(int argc, char** argv) { + GFLAGS::ParseCommandLineFlags(&argc, &argv, true); + + std::mt19937 rng{std::random_device()()}; + rocksdb::Status status; + std::string path = rocksdb::test::TmpDir() + "/forward_iterator_test"; + fprintf(stderr, "db path is %s\n", path.c_str()); + rocksdb::Options options; + options.create_if_missing = true; + options.compression = rocksdb::CompressionType::kNoCompression; + options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; + options.level0_slowdown_writes_trigger = 99999; + options.level0_stop_writes_trigger = 99999; + options.allow_os_buffer = false; + options.write_buffer_size = FLAGS_memtable_size; + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size); + table_options.block_size = FLAGS_block_size; + options.table_factory.reset( + rocksdb::NewBlockBasedTableFactory(table_options)); + + status = rocksdb::DestroyDB(path, options); + assert(status.ok()); + rocksdb::DB* db_raw; + status = rocksdb::DB::Open(options, path, &db_raw); + assert(status.ok()); + std::unique_ptr db(db_raw); + + std::vector shard_states(FLAGS_shards + 1); + std::deque readers; + while (static_cast(readers.size()) < FLAGS_readers) { + readers.emplace_back(&shard_states, db_raw); + } + std::deque writers; + while (static_cast(writers.size()) < FLAGS_writers) { + writers.emplace_back(&shard_states, db_raw); + } + + // Each shard gets a random reader and random writer assigned to it + for (int i = 1; i <= FLAGS_shards; ++i) { + std::uniform_int_distribution reader_dist(0, FLAGS_readers - 1); + std::uniform_int_distribution writer_dist(0, FLAGS_writers - 1); + shard_states[i].reader = &readers[reader_dist(rng)]; + shard_states[i].writer = &writers[writer_dist(rng)]; + } + + StatsThread stats_thread(db_raw); + for (Writer& w : writers) { + w.start(); + } + + writers.clear(); + readers.clear(); +} + +#endif // ROCKSDB_LITE + +#endif // GFLAGS diff --git a/src.mk b/src.mk index b6cc81cac..d9fd5df6d 100644 --- a/src.mk +++ b/src.mk @@ -190,6 +190,7 @@ TEST_BENCH_SOURCES = \ db/db_compaction_filter_test.cc \ db/db_compaction_test.cc \ db/db_dynamic_level_test.cc \ + db/forward_iterator_bench.cc \ db/db_inplace_update_test.cc \ db/db_log_iter_test.cc \ db/db_universal_compaction_test.cc \