diff --git a/CMakeLists.txt b/CMakeLists.txt index 9cf1bbaa3..2a05192da 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -285,6 +285,7 @@ target_link_libraries(rocksdb${ARTIFACT_SUFFIX} ${LIBS}) set(APPS db/db_bench.cc + db/forward_iterator_bench.cc db/memtablerep_bench.cc table/table_reader_bench.cc tools/db_stress.cc diff --git a/Makefile b/Makefile index 02ffd5653..0fd1dae7f 100644 --- a/Makefile +++ b/Makefile @@ -337,7 +337,7 @@ TOOLS = \ rocksdb_dump \ rocksdb_undump -BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench +BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench forward_iterator_bench # if user didn't config LIBNAME, set the default ifeq ($(LIBNAME),) @@ -978,6 +978,9 @@ ldb_cmd_test: tools/ldb_cmd_test.o $(LIBOBJECTS) $(TESTHARNESS) ldb: tools/ldb.o $(LIBOBJECTS) $(AM_LINK) +forward_iterator_bench: db/forward_iterator_bench.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/db/db_tailing_iter_test.cc b/db/db_tailing_iter_test.cc index 3be5e953c..d83c5eec3 100644 --- a/db/db_tailing_iter_test.cc +++ b/db/db_tailing_iter_test.cc @@ -140,6 +140,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { std::unique_ptr iterh(db_->NewIterator(read_options, handles_[1])); std::string value(1024, 'a'); bool file_iters_deleted = false; + bool file_iters_renewed_null = false; + bool file_iters_renewed_copy = false; rocksdb::SyncPoint::GetInstance()->SetCallBack( "ForwardIterator::SeekInternal:Return", [&](void* arg) { ForwardIterator* fiter = reinterpret_cast(arg); @@ -152,6 +154,12 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { ASSERT_TRUE(!file_iters_deleted || fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters)); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ForwardIterator::RenewIterators:Null", + [&](void* arg) { file_iters_renewed_null = true; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "ForwardIterator::RenewIterators:Copy", + [&](void* arg) { file_iters_renewed_copy = true; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); const int num_records = 1000; for (int i = 1; i < num_records; ++i) { @@ -203,6 +211,8 @@ TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) { ASSERT_LE(num_iters, 1); file_iters_deleted = false; } + ASSERT_TRUE(file_iters_renewed_null); + ASSERT_TRUE(file_iters_renewed_copy); iter = 0; itern = 0; iterh = 0; diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 2d68368ea..b6437c837 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -144,6 +144,23 @@ ForwardIterator::~ForwardIterator() { Cleanup(true); } +void ForwardIterator::SVCleanup() { + if (sv_ != nullptr && sv_->Unref()) { + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); + db_->mutex_.Lock(); + sv_->Cleanup(); + db_->FindObsoleteFiles(&job_context, false, true); + db_->mutex_.Unlock(); + delete sv_; + if (job_context.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + } +} + void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { mutable_iter_->~InternalIterator(); @@ -162,20 +179,7 @@ void ForwardIterator::Cleanup(bool release_sv) { level_iters_.clear(); if (release_sv) { - if (sv_ != nullptr && sv_->Unref()) { - // Job id == 0 means that this is not our background process, but rather - // user thread - JobContext job_context(0); - db_->mutex_.Lock(); - sv_->Cleanup(); - db_->FindObsoleteFiles(&job_context, false, true); - db_->mutex_.Unlock(); - delete sv_; - if (job_context.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(job_context); - } - job_context.Clean(); - } + SVCleanup(); } } @@ -185,9 +189,10 @@ bool ForwardIterator::Valid() const { } void ForwardIterator::SeekToFirst() { - if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + if (sv_ == nullptr) { RebuildIterators(true); + } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) { + RenewIterators(); } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -205,9 +210,10 @@ void ForwardIterator::Seek(const Slice& internal_key) { if (IsOverUpperBound(internal_key)) { valid_ = false; } - if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + if (sv_ == nullptr) { RebuildIterators(true); + } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) { + RenewIterators(); } else if (immutable_status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -227,7 +233,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, // an option to turn it off. if (seek_to_first || NeedToSeekImmutable(internal_key)) { immutable_status_ = Status::OK(); - if (has_iter_trimmed_for_upper_bound_) { + if ((has_iter_trimmed_for_upper_bound_) && + (cfd_->internal_comparator().InternalKeyComparator::Compare( + prev_key_.GetKey(), internal_key) > 0)) { // Some iterators are trimmed. Need to rebuild. RebuildIterators(true); // Already seeked mutable iter, so seek again @@ -393,7 +401,11 @@ void ForwardIterator::Next() { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); - RebuildIterators(true); + if (sv_ == nullptr) { + RebuildIterators(true); + } else { + RenewIterators(); + } SeekInternal(old_key, false); if (!valid_ || key().compare(old_key) != 0) { return; @@ -485,26 +497,93 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd)); } level_iters_.reserve(vstorage->num_levels() - 1); + BuildLevelIterators(vstorage); + current_ = nullptr; + is_prev_set_ = false; +} + +void ForwardIterator::RenewIterators() { + SuperVersion* svnew; + assert(sv_); + svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + + if (mutable_iter_ != nullptr) { + mutable_iter_->~InternalIterator(); + } + for (auto* m : imm_iters_) { + m->~InternalIterator(); + } + imm_iters_.clear(); + + mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); + svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); + + const auto* vstorage = sv_->current->storage_info(); + const auto& l0_files = vstorage->LevelFiles(0); + const auto* vstorage_new = svnew->current->storage_info(); + const auto& l0_files_new = vstorage_new->LevelFiles(0); + uint32_t iold, inew; + bool found; + std::vector l0_iters_new; + l0_iters_new.reserve(l0_files_new.size()); + + for (inew = 0; inew < l0_files_new.size(); inew++) { + found = false; + for (iold = 0; iold < l0_files.size(); iold++) { + if (l0_files[iold] == l0_files_new[inew]) { + found = true; + break; + } + } + if (found) { + if (l0_iters_[iold] == nullptr) { + l0_iters_new.push_back(nullptr); + TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this); + } else { + l0_iters_new.push_back(l0_iters_[iold]); + l0_iters_[iold] = nullptr; + TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this); + } + continue; + } + l0_iters_new.push_back(cfd_->table_cache()->NewIterator( + read_options_, *cfd_->soptions(), cfd_->internal_comparator(), + l0_files_new[inew]->fd)); + } + + for (auto* f : l0_iters_) { + delete f; + } + l0_iters_.clear(); + l0_iters_ = l0_iters_new; + + for (auto* l : level_iters_) { + delete l; + } + BuildLevelIterators(vstorage_new); + current_ = nullptr; + is_prev_set_ = false; + SVCleanup(); + sv_ = svnew; +} + +void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) { for (int32_t level = 1; level < vstorage->num_levels(); ++level) { const auto& level_files = vstorage->LevelFiles(level); - if ((level_files.empty()) || ((read_options_.iterate_upper_bound != nullptr) && (user_comparator_->Compare(*read_options_.iterate_upper_bound, level_files[0]->smallest.user_key()) < 0))) { - level_iters_.push_back(nullptr); + level_iters_[level - 1] = nullptr; if (!level_files.empty()) { has_iter_trimmed_for_upper_bound_ = true; } } else { - level_iters_.push_back( - new LevelIterator(cfd_, read_options_, level_files)); + level_iters_[level - 1] = + new LevelIterator(cfd_, read_options_, level_files); } } - - current_ = nullptr; - is_prev_set_ = false; } void ForwardIterator::ResetIncompleteIterators() { diff --git a/db/forward_iterator.h b/db/forward_iterator.h index a159a6101..1c4d4975e 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -24,6 +24,7 @@ class Env; struct SuperVersion; class ColumnFamilyData; class LevelIterator; +class VersionStorageInfo; struct FileMetaData; class MinIterComparator { @@ -74,7 +75,10 @@ class ForwardIterator : public InternalIterator { private: void Cleanup(bool release_sv); + void SVCleanup(); void RebuildIterators(bool refresh_sv); + void RenewIterators(); + void BuildLevelIterators(const VersionStorageInfo* vstorage); void ResetIncompleteIterators(); void SeekInternal(const Slice& internal_key, bool seek_to_first); void UpdateCurrent(); diff --git a/db/forward_iterator_bench.cc b/db/forward_iterator_bench.cc new file mode 100644 index 000000000..6d352d077 --- /dev/null +++ b/db/forward_iterator_bench.cc @@ -0,0 +1,375 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#ifndef GFLAGS +#include +int main() { + fprintf(stderr, "Please install gflags to run rocksdb tools\n"); + return 1; +} +#else + +#ifndef ROCKSDB_LITE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/db.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" +#include "util/testharness.h" + +const int MAX_SHARDS = 100000; + +DEFINE_int64(writers, 8, ""); +DEFINE_int64(readers, 8, ""); +DEFINE_int64(rate, 100000, ""); +DEFINE_int64(value_size, 300, ""); +DEFINE_int64(shards, 1000, ""); +DEFINE_int64(memtable_size, 500000000, ""); +DEFINE_int64(block_cache_size, 300000000, ""); +DEFINE_int64(block_size, 65536, ""); +DEFINE_double(runtime, 300.0, ""); +DEFINE_bool(cache_only_first, true, ""); +DEFINE_bool(iterate_upper_bound, true, ""); + +struct Stats { + char pad1[128]; + std::atomic written{0}; + char pad2[128]; + std::atomic read{0}; + std::atomic cache_misses{0}; + char pad3[128]; +} stats; + +struct Key { + Key() {} + Key(uint64_t shard_in, uint64_t seqno_in) + : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {} + + uint64_t shard() const { return be64toh(shard_be); } + uint64_t seqno() const { return be64toh(seqno_be); } + + private: + uint64_t shard_be; + uint64_t seqno_be; +} __attribute__((__packed__)); + +struct Reader; +struct Writer; + +struct ShardState { + char pad1[128]; + std::atomic last_written{0}; + Writer* writer; + Reader* reader; + char pad2[128]; + std::atomic last_read{0}; + std::unique_ptr it; + std::unique_ptr it_cacheonly; + Key upper_bound; + rocksdb::Slice upper_bound_slice; + char pad3[128]; +}; + +struct Reader { + public: + explicit Reader(std::vector* shard_states, rocksdb::DB* db) + : shard_states_(shard_states), db_(db) { + sem_init(&sem_, 0, 0); + thread_ = std::thread(&Reader::run, this); + } + + void run() { + while (1) { + sem_wait(&sem_); + if (done_.load()) { + break; + } + + uint64_t shard; + { + std::lock_guard guard(queue_mutex_); + assert(!shards_pending_queue_.empty()); + shard = shards_pending_queue_.front(); + shards_pending_queue_.pop(); + shards_pending_set_.reset(shard); + } + readOnceFromShard(shard); + } + } + + void readOnceFromShard(uint64_t shard) { + ShardState& state = (*shard_states_)[shard]; + if (!state.it) { + // Initialize iterators + rocksdb::ReadOptions options; + options.tailing = true; + if (FLAGS_iterate_upper_bound) { + state.upper_bound = Key(shard, std::numeric_limits::max()); + state.upper_bound_slice = rocksdb::Slice( + (const char*)&state.upper_bound, sizeof(state.upper_bound)); + options.iterate_upper_bound = &state.upper_bound_slice; + } + + state.it.reset(db_->NewIterator(options)); + + if (FLAGS_cache_only_first) { + options.read_tier = rocksdb::ReadTier::kBlockCacheTier; + state.it_cacheonly.reset(db_->NewIterator(options)); + } + } + + const uint64_t upto = state.last_written.load(); + for (rocksdb::Iterator* it : {state.it_cacheonly.get(), state.it.get()}) { + if (it == nullptr) { + continue; + } + if (state.last_read.load() >= upto) { + break; + } + bool need_seek = true; + for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) { + if (need_seek) { + Key from(shard, state.last_read.load() + 1); + it->Seek(rocksdb::Slice((const char*)&from, sizeof(from))); + need_seek = false; + } else { + it->Next(); + } + if (it->status().IsIncomplete()) { + ++::stats.cache_misses; + break; + } + assert(it->Valid()); + assert(it->key().size() == sizeof(Key)); + Key key; + memcpy(&key, it->key().data(), it->key().size()); + // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n", + // shard, seq, key.shard(), key.seqno()); + assert(key.shard() == shard); + assert(key.seqno() == seq); + state.last_read.store(seq); + ++::stats.read; + } + } + } + + void onWrite(uint64_t shard) { + { + std::lock_guard guard(queue_mutex_); + if (!shards_pending_set_.test(shard)) { + shards_pending_queue_.push(shard); + shards_pending_set_.set(shard); + sem_post(&sem_); + } + } + } + + ~Reader() { + done_.store(true); + sem_post(&sem_); + thread_.join(); + } + + private: + char pad1[128]; + std::vector* shard_states_; + rocksdb::DB* db_; + std::thread thread_; + sem_t sem_; + std::mutex queue_mutex_; + std::bitset shards_pending_set_; + std::queue shards_pending_queue_; + std::atomic done_{false}; + char pad2[128]; +}; + +struct Writer { + explicit Writer(std::vector* shard_states, rocksdb::DB* db) + : shard_states_(shard_states), db_(db) {} + + void start() { thread_ = std::thread(&Writer::run, this); } + + void run() { + std::queue workq; + std::chrono::steady_clock::time_point deadline( + std::chrono::steady_clock::now() + + std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime))); + std::vector my_shards; + for (int i = 1; i <= FLAGS_shards; ++i) { + if ((*shard_states_)[i].writer == this) { + my_shards.push_back(i); + } + } + + std::mt19937 rng{std::random_device()()}; + std::uniform_int_distribution shard_dist(0, my_shards.size() - 1); + std::string value(FLAGS_value_size, '*'); + + while (1) { + auto now = std::chrono::steady_clock::now(); + if (FLAGS_runtime >= 0 && now >= deadline) { + break; + } + if (workq.empty()) { + for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) { + std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate); + workq.push(now + offset); + } + } + while (!workq.empty() && workq.front() < now) { + workq.pop(); + uint64_t shard = my_shards[shard_dist(rng)]; + ShardState& state = (*shard_states_)[shard]; + uint64_t seqno = state.last_written.load() + 1; + Key key(shard, seqno); + // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno); + rocksdb::Status status = + db_->Put(rocksdb::WriteOptions(), + rocksdb::Slice((const char*)&key, sizeof(key)), + rocksdb::Slice(value)); + assert(status.ok()); + state.last_written.store(seqno); + state.reader->onWrite(shard); + ++::stats.written; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + // fprintf(stderr, "Writer done\n"); + } + + ~Writer() { thread_.join(); } + + private: + char pad1[128]; + std::vector* shard_states_; + rocksdb::DB* db_; + std::thread thread_; + char pad2[128]; +}; + +struct StatsThread { + explicit StatsThread(rocksdb::DB* db) + : db_(db), thread_(&StatsThread::run, this) {} + + void run() { + // using namespace std::chrono; + auto tstart = std::chrono::steady_clock::now(), tlast = tstart; + uint64_t wlast = 0, rlast = 0; + while (!done_.load()) { + { + std::unique_lock lock(cvm_); + cv_.wait_for(lock, std::chrono::seconds(1)); + } + auto now = std::chrono::steady_clock::now(); + double elapsed = + std::chrono::duration_cast >( + now - tlast).count(); + uint64_t w = ::stats.written.load(); + uint64_t r = ::stats.read.load(); + fprintf(stderr, + "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | " + "r/s %10.0f | cache misses %10ld\n", + db_->GetEnv()->TimeToString(time(nullptr)).c_str(), + std::chrono::duration_cast(now - tstart) + .count(), + w, (w - wlast) / elapsed, r, (r - rlast) / elapsed, + ::stats.cache_misses.load()); + wlast = w; + rlast = r; + tlast = now; + } + } + + ~StatsThread() { + { + std::lock_guard guard(cvm_); + done_.store(true); + } + cv_.notify_all(); + thread_.join(); + } + + private: + rocksdb::DB* db_; + std::mutex cvm_; + std::condition_variable cv_; + std::thread thread_; + std::atomic done_{false}; +}; + +int main(int argc, char** argv) { + GFLAGS::ParseCommandLineFlags(&argc, &argv, true); + + std::mt19937 rng{std::random_device()()}; + rocksdb::Status status; + std::string path = rocksdb::test::TmpDir() + "/forward_iterator_test"; + fprintf(stderr, "db path is %s\n", path.c_str()); + rocksdb::Options options; + options.create_if_missing = true; + options.compression = rocksdb::CompressionType::kNoCompression; + options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; + options.level0_slowdown_writes_trigger = 99999; + options.level0_stop_writes_trigger = 99999; + options.allow_os_buffer = false; + options.write_buffer_size = FLAGS_memtable_size; + rocksdb::BlockBasedTableOptions table_options; + table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size); + table_options.block_size = FLAGS_block_size; + options.table_factory.reset( + rocksdb::NewBlockBasedTableFactory(table_options)); + + status = rocksdb::DestroyDB(path, options); + assert(status.ok()); + rocksdb::DB* db_raw; + status = rocksdb::DB::Open(options, path, &db_raw); + assert(status.ok()); + std::unique_ptr db(db_raw); + + std::vector shard_states(FLAGS_shards + 1); + std::deque readers; + while (static_cast(readers.size()) < FLAGS_readers) { + readers.emplace_back(&shard_states, db_raw); + } + std::deque writers; + while (static_cast(writers.size()) < FLAGS_writers) { + writers.emplace_back(&shard_states, db_raw); + } + + // Each shard gets a random reader and random writer assigned to it + for (int i = 1; i <= FLAGS_shards; ++i) { + std::uniform_int_distribution reader_dist(0, FLAGS_readers - 1); + std::uniform_int_distribution writer_dist(0, FLAGS_writers - 1); + shard_states[i].reader = &readers[reader_dist(rng)]; + shard_states[i].writer = &writers[writer_dist(rng)]; + } + + StatsThread stats_thread(db_raw); + for (Writer& w : writers) { + w.start(); + } + + writers.clear(); + readers.clear(); +} + +#endif // ROCKSDB_LITE + +#endif // GFLAGS diff --git a/src.mk b/src.mk index b6cc81cac..d9fd5df6d 100644 --- a/src.mk +++ b/src.mk @@ -190,6 +190,7 @@ TEST_BENCH_SOURCES = \ db/db_compaction_filter_test.cc \ db/db_compaction_test.cc \ db/db_dynamic_level_test.cc \ + db/forward_iterator_bench.cc \ db/db_inplace_update_test.cc \ db/db_log_iter_test.cc \ db/db_universal_compaction_test.cc \