// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
#include <cstdio>
int main() {
  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
  return 1;
}
#elif defined(OS_MACOSX) || defined(OS_WIN)
// Block forward_iterator_bench under MAC and Windows
int main() { return 0; }
#else
#include <semaphore.h>

#include <atomic>
#include <bitset>
#include <cassert>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <deque>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "test_util/testharness.h"
#include "util/gflags_compat.h"

const int MAX_SHARDS = 100000;

DEFINE_int32(writers, 8, "Number of writer threads");
DEFINE_int32(readers, 8, "Number of reader threads");
DEFINE_int64(rate, 100000, "Target aggregate write rate, in keys per second");
DEFINE_int64(value_size, 300, "Value size in bytes");
DEFINE_int64(shards, 1000, "Number of key shards");
DEFINE_int64(memtable_size, 500000000, "Memtable (write buffer) size in bytes");
DEFINE_int64(block_cache_size, 300000000, "Block cache size in bytes");
DEFINE_int64(block_size, 65536, "Table block size in bytes");
DEFINE_double(runtime, 300.0, "Benchmark duration in seconds");
DEFINE_bool(cache_only_first, true,
            "Try a block-cache-only iterator before the regular one");
DEFINE_bool(iterate_upper_bound, true,
            "Set iterate_upper_bound to the end of each shard");

// Global counters, padded so each hot atomic sits on its own cache line.
struct Stats {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> written{0};
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> read{0};
  std::atomic<uint64_t> cache_misses{0};
  char pad3[128] __attribute__((__unused__));
} stats;

// A key is a (shard, seqno) pair stored big-endian, so the default bytewise
// comparator orders keys first by shard, then by sequence number.
struct Key {
  Key() {}
  Key(uint64_t shard_in, uint64_t seqno_in)
      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}

  uint64_t shard() const { return be64toh(shard_be); }
  uint64_t seqno() const { return be64toh(seqno_be); }

 private:
  uint64_t shard_be;
  uint64_t seqno_be;
} __attribute__((__packed__));

struct Reader;
struct Writer;

struct ShardState {
  char pad1[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_written{0};
  Writer* writer;
  Reader* reader;
  char pad2[128] __attribute__((__unused__));
  std::atomic<uint64_t> last_read{0};
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it;
  std::unique_ptr<ROCKSDB_NAMESPACE::Iterator> it_cacheonly;
  Key upper_bound;
  ROCKSDB_NAMESPACE::Slice upper_bound_slice;
  char pad3[128] __attribute__((__unused__));
};

struct Reader {
 public:
  explicit Reader(std::vector<ShardState>* shard_states,
                  ROCKSDB_NAMESPACE::DB* db)
      : shard_states_(shard_states), db_(db) {
    sem_init(&sem_, 0, 0);
    thread_ = port::Thread(&Reader::run, this);
  }

  void run() {
    while (1) {
      sem_wait(&sem_);
      if (done_.load()) {
        break;
      }

      uint64_t shard;
      {
        std::lock_guard<std::mutex> guard(queue_mutex_);
        assert(!shards_pending_queue_.empty());
        shard = shards_pending_queue_.front();
        shards_pending_queue_.pop();
        shards_pending_set_.reset(shard);
      }

      readOnceFromShard(shard);
    }
  }

  void readOnceFromShard(uint64_t shard) {
    ShardState& state = (*shard_states_)[shard];
    if (!state.it) {
      // Initialize iterators
      ROCKSDB_NAMESPACE::ReadOptions options;
      options.tailing = true;
      if (FLAGS_iterate_upper_bound) {
        state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
        state.upper_bound_slice = ROCKSDB_NAMESPACE::Slice(
            (const char*)&state.upper_bound, sizeof(state.upper_bound));
        options.iterate_upper_bound = &state.upper_bound_slice;
      }

      state.it.reset(db_->NewIterator(options));

      if (FLAGS_cache_only_first) {
        options.read_tier = ROCKSDB_NAMESPACE::ReadTier::kBlockCacheTier;
        state.it_cacheonly.reset(db_->NewIterator(options));
      }
    }
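    // Two-pass read strategy: when --cache_only_first is set, the
    // kBlockCacheTier iterator runs first and never touches disk; if it
    // reports an Incomplete status, we count a cache miss below and retry
    // the same range with the regular tailing iterator.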
    const uint64_t upto = state.last_written.load();
    for (ROCKSDB_NAMESPACE::Iterator* it :
         {state.it_cacheonly.get(), state.it.get()}) {
      if (it == nullptr) {
        continue;
      }
      if (state.last_read.load() >= upto) {
        break;
      }
      bool need_seek = true;
      for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
        if (need_seek) {
          Key from(shard, state.last_read.load() + 1);
          it->Seek(ROCKSDB_NAMESPACE::Slice((const char*)&from, sizeof(from)));
          need_seek = false;
        } else {
          it->Next();
        }
        if (it->status().IsIncomplete()) {
          ++::stats.cache_misses;
          break;
        }
        assert(it->Valid());
        assert(it->key().size() == sizeof(Key));
        Key key;
        memcpy(&key, it->key().data(), it->key().size());
        // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
        //         shard, seq, key.shard(), key.seqno());
        assert(key.shard() == shard);
        assert(key.seqno() == seq);
        state.last_read.store(seq);
        ++::stats.read;
      }
    }
  }

  void onWrite(uint64_t shard) {
    {
      std::lock_guard<std::mutex> guard(queue_mutex_);
      if (!shards_pending_set_.test(shard)) {
        shards_pending_queue_.push(shard);
        shards_pending_set_.set(shard);
        sem_post(&sem_);
      }
    }
  }

  ~Reader() {
    done_.store(true);
    sem_post(&sem_);
    thread_.join();
  }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  ROCKSDB_NAMESPACE::DB* db_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  sem_t sem_;
  std::mutex queue_mutex_;
  std::bitset<MAX_SHARDS> shards_pending_set_;
  std::queue<uint64_t> shards_pending_queue_;
  std::atomic<bool> done_{false};
  char pad2[128] __attribute__((__unused__));
};

struct Writer {
  explicit Writer(std::vector<ShardState>* shard_states,
                  ROCKSDB_NAMESPACE::DB* db)
      : shard_states_(shard_states), db_(db) {}

  void start() { thread_ = port::Thread(&Writer::run, this); }
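  // run() paces writes against a queue of precomputed target timestamps:
  // each refill schedules this writer's share of FLAGS_rate keys spread
  // across the next second, and a Put is issued only once its timestamp
  // has passed.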
  void run() {
    std::queue<std::chrono::steady_clock::time_point> workq;
    std::chrono::steady_clock::time_point deadline(
        std::chrono::steady_clock::now() +
        std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
    std::vector<uint64_t> my_shards;
    for (int i = 1; i <= FLAGS_shards; ++i) {
      if ((*shard_states_)[i].writer == this) {
        my_shards.push_back(i);
      }
    }

    std::mt19937 rng{std::random_device()()};
    std::uniform_int_distribution<int> shard_dist(
        0, static_cast<int>(my_shards.size()) - 1);
    std::string value(FLAGS_value_size, '*');

    while (1) {
      auto now = std::chrono::steady_clock::now();
      if (FLAGS_runtime >= 0 && now >= deadline) {
        break;
      }
      if (workq.empty()) {
        for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
          std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
          workq.push(now + offset);
        }
      }
      while (!workq.empty() && workq.front() < now) {
        workq.pop();
        uint64_t shard = my_shards[shard_dist(rng)];
        ShardState& state = (*shard_states_)[shard];
        uint64_t seqno = state.last_written.load() + 1;
        Key key(shard, seqno);
        // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
        ROCKSDB_NAMESPACE::Status status =
            db_->Put(ROCKSDB_NAMESPACE::WriteOptions(),
                     ROCKSDB_NAMESPACE::Slice((const char*)&key, sizeof(key)),
                     ROCKSDB_NAMESPACE::Slice(value));
        assert(status.ok());
        state.last_written.store(seqno);
        state.reader->onWrite(shard);
        ++::stats.written;
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // fprintf(stderr, "Writer done\n");
  }

  ~Writer() { thread_.join(); }

 private:
  char pad1[128] __attribute__((__unused__));
  std::vector<ShardState>* shard_states_;
  ROCKSDB_NAMESPACE::DB* db_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  char pad2[128] __attribute__((__unused__));
};

// Prints write/read throughput and cache-miss counts roughly once a second.
struct StatsThread {
  explicit StatsThread(ROCKSDB_NAMESPACE::DB* db)
      : db_(db), thread_(&StatsThread::run, this) {}

  void run() {
    auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
    uint64_t wlast = 0, rlast = 0;
    while (!done_.load()) {
      {
        std::unique_lock<std::mutex> lock(cvm_);
        cv_.wait_for(lock, std::chrono::seconds(1));
      }
      auto now = std::chrono::steady_clock::now();
      double elapsed =
          std::chrono::duration_cast<std::chrono::duration<double>>(now - tlast)
              .count();
      uint64_t w = ::stats.written.load();
      uint64_t r = ::stats.read.load();
      fprintf(stderr,
              "%s elapsed %4lds | written %10ld | w/s %10.0f | read %10ld | "
              "r/s %10.0f | cache misses %10ld\n",
              db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
              std::chrono::duration_cast<std::chrono::seconds>(now - tstart)
                  .count(),
              w, (w - wlast) / elapsed, r, (r - rlast) / elapsed,
              ::stats.cache_misses.load());
      wlast = w;
      rlast = r;
      tlast = now;
    }
  }

  ~StatsThread() {
    {
      std::lock_guard<std::mutex> guard(cvm_);
      done_.store(true);
    }
    cv_.notify_all();
    thread_.join();
  }

 private:
  ROCKSDB_NAMESPACE::DB* db_;
  std::mutex cvm_;
  std::condition_variable cv_;
  ROCKSDB_NAMESPACE::port::Thread thread_;
  std::atomic<bool> done_{false};
};

int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);

  std::mt19937 rng{std::random_device()()};
  ROCKSDB_NAMESPACE::Status status;
  std::string path =
      ROCKSDB_NAMESPACE::test::PerThreadDBPath("forward_iterator_test");
  fprintf(stderr, "db path is %s\n", path.c_str());
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.compression = ROCKSDB_NAMESPACE::CompressionType::kNoCompression;
  options.compaction_style =
      ROCKSDB_NAMESPACE::CompactionStyle::kCompactionStyleNone;
  options.level0_slowdown_writes_trigger = 99999;
  options.level0_stop_writes_trigger = 99999;
  options.use_direct_io_for_flush_and_compaction = true;
  options.write_buffer_size = FLAGS_memtable_size;
  ROCKSDB_NAMESPACE::BlockBasedTableOptions table_options;
  table_options.block_cache =
      ROCKSDB_NAMESPACE::NewLRUCache(FLAGS_block_cache_size);
  table_options.block_size = FLAGS_block_size;
  options.table_factory.reset(
      ROCKSDB_NAMESPACE::NewBlockBasedTableFactory(table_options));

  status = ROCKSDB_NAMESPACE::DestroyDB(path, options);
  assert(status.ok());
  ROCKSDB_NAMESPACE::DB* db_raw;
  status = ROCKSDB_NAMESPACE::DB::Open(options, path, &db_raw);
  assert(status.ok());
  std::unique_ptr<ROCKSDB_NAMESPACE::DB> db(db_raw);

  std::vector<ShardState> shard_states(FLAGS_shards + 1);
  std::deque<Reader> readers;
  while (static_cast<int>(readers.size()) < FLAGS_readers) {
    readers.emplace_back(&shard_states, db_raw);
  }
  std::deque<Writer> writers;
  while (static_cast<int>(writers.size()) < FLAGS_writers) {
    writers.emplace_back(&shard_states, db_raw);
  }

  // Each shard gets a random reader and random writer assigned to it
  for (int i = 1; i <= FLAGS_shards; ++i) {
    std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
    std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
    shard_states[i].reader = &readers[reader_dist(rng)];
    shard_states[i].writer = &writers[writer_dist(rng)];
  }

  StatsThread stats_thread(db_raw);
  for (Writer& w : writers) {
    w.start();
  }

  // Destroying the deques joins all writer and reader threads.
  writers.clear();
  readers.clear();
}
#endif  // !defined(GFLAGS) || defined(ROCKSDB_LITE)
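// Example invocation (flag values are illustrative, and the binary name
// depends on the build setup):
//   ./forward_iterator_bench --writers=8 --readers=8 --rate=100000 \
//       --shards=1000 --value_size=300 --runtime=60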