diff --git a/cache/cache_bench.cc b/cache/cache_bench.cc
index 6ff36a32d..163599546 100644
--- a/cache/cache_bench.cc
+++ b/cache/cache_bench.cc
@@ -14,34 +14,47 @@ int main() {
 #include <cinttypes>
 #include <sys/types.h>
 #include <stdio.h>
+#include <limits>
 
 #include "port/port.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "util/coding.h"
 #include "util/gflags_compat.h"
+#include "util/hash.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
 
 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
 
-static const uint32_t KB = 1024;
-
-DEFINE_int32(threads, 16, "Number of concurrent threads to run.");
-DEFINE_int64(cache_size, 8 * KB * KB,
-             "Number of bytes to use as a cache of uncompressed data.");
-DEFINE_int32(num_shard_bits, 4, "shard_bits.");
-
-DEFINE_int64(max_key, 1 * KB * KB * KB, "Max number of key to place in cache");
-DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
-
-DEFINE_bool(populate_cache, false, "Populate cache before operations");
-DEFINE_int32(insert_percent, 40,
-             "Ratio of insert to total workload (expressed as a percentage)");
-DEFINE_int32(lookup_percent, 50,
-             "Ratio of lookup to total workload (expressed as a percentage)");
-DEFINE_int32(erase_percent, 10,
-             "Ratio of erase to total workload (expressed as a percentage)");
+static constexpr uint32_t KiB = uint32_t{1} << 10;
+static constexpr uint32_t MiB = KiB << 10;
+static constexpr uint64_t GiB = MiB << 10;
+
+DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
+DEFINE_uint64(cache_size, 1 * GiB,
+              "Number of bytes to use as a cache of uncompressed data.");
+DEFINE_uint32(num_shard_bits, 6, "shard_bits.");
+
+DEFINE_double(resident_ratio, 0.25,
+              "Ratio of keys fitting in cache to keyspace.");
+DEFINE_uint64(ops_per_thread, 0,
+              "Number of operations per thread. (Default: 5 * keyspace size)");
+DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");
+
+DEFINE_uint32(skew, 5, "Degree of skew in key selection");
+DEFINE_bool(populate_cache, true, "Populate cache before operations");
+
+DEFINE_uint32(lookup_insert_percent, 87,
+              "Ratio of lookup (+ insert on not found) to total workload "
+              "(expressed as a percentage)");
+DEFINE_uint32(insert_percent, 2,
+              "Ratio of insert to total workload (expressed as a percentage)");
+DEFINE_uint32(lookup_percent, 10,
+              "Ratio of lookup to total workload (expressed as a percentage)");
+DEFINE_uint32(erase_percent, 1,
+              "Ratio of erase to total workload (expressed as a percentage)");
 
 DEFINE_bool(use_clock_cache, false, "");
 
@@ -49,21 +62,15 @@ namespace ROCKSDB_NAMESPACE {
 class CacheBench;
 namespace {
-void deleter(const Slice& /*key*/, void* value) {
-  delete reinterpret_cast<char*>(value);
-}
-
 // State shared by all concurrent executions of the same benchmark.
 class SharedState {
  public:
   explicit SharedState(CacheBench* cache_bench)
       : cv_(&mu_),
-        num_threads_(FLAGS_threads),
         num_initialized_(0),
         start_(false),
         num_done_(0),
-        cache_bench_(cache_bench) {
-  }
+        cache_bench_(cache_bench) {}
 
   ~SharedState() {}
 
@@ -87,13 +94,9 @@ class SharedState {
     num_done_++;
   }
 
-  bool AllInitialized() const {
-    return num_initialized_ >= num_threads_;
-  }
+  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }
 
-  bool AllDone() const {
-    return num_done_ >= num_threads_;
-  }
+  bool AllDone() const { return num_done_ >= FLAGS_threads; }
 
   void SetStart() {
     start_ = true;
@@ -107,7 +110,6 @@ class SharedState {
 
   port::Mutex mu_;
   port::CondVar cv_;
-  const uint64_t num_threads_;
   uint64_t num_initialized_;
   bool start_;
   uint64_t num_done_;
@@ -118,17 +120,69 @@ class SharedState {
 // Per-thread state for concurrent executions of the same benchmark.
 struct ThreadState {
   uint32_t tid;
-  Random rnd;
+  Random64 rnd;
   SharedState* shared;
 
   ThreadState(uint32_t index, SharedState* _shared)
       : tid(index), rnd(1000 + index), shared(_shared) {}
 };
+
+struct KeyGen {
+  char key_data[27];
+
+  Slice GetRand(Random64& rnd, uint64_t max_key) {
+    uint64_t raw = rnd.Next();
+    // Skew according to setting
+    for (uint32_t i = 0; i < FLAGS_skew; ++i) {
+      raw = std::min(raw, rnd.Next());
+    }
+    uint64_t key = fastrange64(raw, max_key);
+    // Variable size and alignment
+    size_t off = key % 8;
+    key_data[0] = char{42};
+    EncodeFixed64(key_data + 1, key);
+    key_data[9] = char{11};
+    EncodeFixed64(key_data + 10, key);
+    key_data[18] = char{4};
+    EncodeFixed64(key_data + 19, key);
+    return Slice(&key_data[off], sizeof(key_data) - off);
+  }
+};
+
+char* createValue(Random64& rnd) {
+  char* rv = new char[FLAGS_value_bytes];
+  // Fill with some filler data, and take some CPU time
+  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
+    EncodeFixed64(rv + i, rnd.Next());
+  }
+  return rv;
+}
+
+void deleter(const Slice& /*key*/, void* value) {
+  delete[] static_cast<char*>(value);
+}
 }  // namespace
 
 class CacheBench {
+  static constexpr uint64_t kHundredthUint64 =
+      std::numeric_limits<uint64_t>::max() / 100U;
+
  public:
-  CacheBench() : num_threads_(FLAGS_threads) {
+  CacheBench()
+      : max_key_(static_cast<uint64_t>(FLAGS_cache_size /
+                                       FLAGS_resident_ratio /
+                                       FLAGS_value_bytes)),
+        lookup_insert_threshold_(kHundredthUint64 *
+                                 FLAGS_lookup_insert_percent),
+        insert_threshold_(lookup_insert_threshold_ +
+                          kHundredthUint64 * FLAGS_insert_percent),
+        lookup_threshold_(insert_threshold_ +
+                          kHundredthUint64 * FLAGS_lookup_percent),
+        erase_threshold_(lookup_threshold_ +
+                         kHundredthUint64 * FLAGS_erase_percent) {
+    if (erase_threshold_ != 100U * kHundredthUint64) {
+      fprintf(stderr, "Percentages must add to 100.\n");
+      exit(1);
+    }
     if (FLAGS_use_clock_cache) {
       cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
       if (!cache_) {
@@ -138,18 +192,19 @@ class CacheBench {
     } else {
       cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits);
     }
+    if (FLAGS_ops_per_thread == 0) {
+      FLAGS_ops_per_thread = 5 * max_key_;
+    }
   }
 
   ~CacheBench() {}
 
   void PopulateCache() {
-    Random rnd(1);
-    for (int64_t i = 0; i < FLAGS_cache_size; i++) {
-      uint64_t rand_key = rnd.Next() % FLAGS_max_key;
-      // Cast uint64* to be char*, data would be copied to cache
-      Slice key(reinterpret_cast<char*>(&rand_key), 8);
-      // do insert
-      cache_->Insert(key, new char[10], 1, &deleter);
+    Random64 rnd(1);
+    KeyGen keygen;
+    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
+      cache_->Insert(keygen.GetRand(rnd, max_key_), createValue(rnd),
+                     FLAGS_value_bytes, &deleter);
     }
   }
 
@@ -158,10 +213,10 @@ class CacheBench {
     PrintEnv();
     SharedState shared(this);
-    std::vector<ThreadState*> threads(num_threads_);
-    for (uint32_t i = 0; i < num_threads_; i++) {
-      threads[i] = new ThreadState(i, &shared);
-      env->StartThread(ThreadBody, threads[i]);
+    std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
+    for (uint32_t i = 0; i < FLAGS_threads; i++) {
+      threads[i].reset(new ThreadState(i, &shared));
+      env->StartThread(ThreadBody, threads[i].get());
     }
     {
       MutexLock l(shared.GetMutex());
@@ -192,10 +247,15 @@ class CacheBench {
 
  private:
   std::shared_ptr<Cache> cache_;
-  uint32_t num_threads_;
+  const uint64_t max_key_;
+  // Cumulative thresholds in the space of a random uint64_t
+  const uint64_t lookup_insert_threshold_;
+  const uint64_t insert_threshold_;
+  const uint64_t lookup_threshold_;
+  const uint64_t erase_threshold_;
 
   static void ThreadBody(void* v) {
-    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
+    ThreadState* thread = static_cast<ThreadState*>(v);
     SharedState* shared = thread->shared;
 
     {
@@ -220,40 +280,78 @@ class CacheBench {
   }
 
   void OperateCache(ThreadState* thread) {
+    // To use looked-up values
+    uint64_t result = 0;
+    // To hold handles for a non-trivial amount of time
+    Cache::Handle* handle = nullptr;
+    KeyGen gen;
     for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
-      uint64_t rand_key = thread->rnd.Next() % FLAGS_max_key;
-      // Cast uint64* to be char*, data would be copied to cache
-      Slice key(reinterpret_cast<char*>(&rand_key), 8);
-      int32_t prob_op = thread->rnd.Uniform(100);
-      if (prob_op >= 0 && prob_op < FLAGS_insert_percent) {
-        // do insert
-        cache_->Insert(key, new char[10], 1, &deleter);
-      } else if (prob_op -= FLAGS_insert_percent &&
-                 prob_op < FLAGS_lookup_percent) {
+      Slice key = gen.GetRand(thread->rnd, max_key_);
+      uint64_t random_op = thread->rnd.Next();
+      if (random_op < lookup_insert_threshold_) {
+        if (handle) {
+          cache_->Release(handle);
+          handle = nullptr;
+        }
         // do lookup
-        auto handle = cache_->Lookup(key);
+        handle = cache_->Lookup(key);
+        if (handle) {
+          // do something with the data
+          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
+                             FLAGS_value_bytes);
+        } else {
+          // do insert
+          cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes,
+                         &deleter, &handle);
+        }
+      } else if (random_op < insert_threshold_) {
         if (handle) {
           cache_->Release(handle);
+          handle = nullptr;
         }
-      } else if (prob_op -= FLAGS_lookup_percent &&
-                 prob_op < FLAGS_erase_percent) {
+        // do insert
+        cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes,
+                       &deleter, &handle);
+      } else if (random_op < lookup_threshold_) {
+        if (handle) {
+          cache_->Release(handle);
+          handle = nullptr;
+        }
+        // do lookup
+        handle = cache_->Lookup(key);
+        if (handle) {
+          // do something with the data
+          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
+                             FLAGS_value_bytes);
+        }
+      } else if (random_op < erase_threshold_) {
         // do erase
         cache_->Erase(key);
+      } else {
+        // Should be extremely unlikely (noop)
+        assert(random_op >= kHundredthUint64 * 100U);
       }
     }
+    if (handle) {
+      cache_->Release(handle);
+      handle = nullptr;
+    }
   }
 
   void PrintEnv() const {
     printf("RocksDB version     : %d.%d\n", kMajorVersion, kMinorVersion);
-    printf("Number of threads   : %d\n", FLAGS_threads);
+    printf("Number of threads   : %u\n", FLAGS_threads);
     printf("Ops per thread      : %" PRIu64 "\n", FLAGS_ops_per_thread);
     printf("Cache size          : %" PRIu64 "\n", FLAGS_cache_size);
-    printf("Num shard bits      : %d\n", FLAGS_num_shard_bits);
-    printf("Max key             : %" PRIu64 "\n", FLAGS_max_key);
-    printf("Populate cache      : %d\n", FLAGS_populate_cache);
-    printf("Insert percentage   : %d%%\n", FLAGS_insert_percent);
-    printf("Lookup percentage   : %d%%\n", FLAGS_lookup_percent);
-    printf("Erase percentage    : %d%%\n", FLAGS_erase_percent);
+    printf("Num shard bits      : %u\n", FLAGS_num_shard_bits);
+    printf("Max key             : %" PRIu64 "\n", max_key_);
+    printf("Resident ratio      : %g\n", FLAGS_resident_ratio);
+    printf("Skew degree         : %u\n", FLAGS_skew);
+    printf("Populate cache      : %d\n", int{FLAGS_populate_cache});
+    printf("Lookup+Insert pct   : %u%%\n", FLAGS_lookup_insert_percent);
+    printf("Insert percentage   : %u%%\n", FLAGS_insert_percent);
+    printf("Lookup percentage   : %u%%\n", FLAGS_lookup_percent);
+    printf("Erase percentage    : %u%%\n", FLAGS_erase_percent);
     printf("----------------------------\n");
   }
 };
 
@@ -270,6 +368,8 @@ int main(int argc, char** argv) {
   ROCKSDB_NAMESPACE::CacheBench bench;
   if (FLAGS_populate_cache) {
     bench.PopulateCache();
+    printf("Population complete\n");
+    printf("----------------------------\n");
   }
   if (bench.Run()) {
     return 0;
diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc
index 78a795b4f..912baa1ba 100644
--- a/db/error_handler_fs_test.cc
+++ b/db/error_handler_fs_test.cc
@@ -498,15 +498,15 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "VersionSet::LogAndApply:WriteManifest", [&](void*) {
         if (fail_manifest.load()) {
-          fault_fs->SetFilesystemActive(false,error_msg); }
-      });
+          fault_fs->SetFilesystemActive(false, error_msg);
+        }
+      });
   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
   Put(Key(1), "val");
   s = Flush();
   ASSERT_EQ(s, Status::OK());
 
-  TEST_SYNC_POINT("CompactionManifestWriteError:0");
   TEST_SYNC_POINT("CompactionManifestWriteError:1");
diff --git a/util/work_queue.h b/util/work_queue.h
index 3d9126364..f120ca77c 100644
--- a/util/work_queue.h
+++ b/util/work_queue.h
@@ -17,7 +17,6 @@
 #include <atomic>
 #include <cassert>
 #include <condition_variable>
-#include <cstddef>
 #include <functional>
 #include <mutex>
 #include <queue>
@@ -146,4 +145,4 @@ class WorkQueue {
     }
   }
 };
-}
+}  // namespace ROCKSDB_NAMESPACE
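
A few notes on the new benchmark logic follow; the snippets are standalone sketches for illustration, not part of the patch. First, keyspace sizing: CacheBench now derives max_key_ as cache_size / resident_ratio / value_bytes, so by construction roughly resident_ratio of the keyspace fits in cache, and an ops_per_thread of 0 is replaced by 5 * max_key_. With the default flags (1 GiB cache, 8 KiB values, resident_ratio 0.25) the arithmetic works out as follows:

    #include <cstdint>
    #include <cstdio>

    int main() {
      // Default flag values from the patch
      const uint64_t cache_size = uint64_t{1} << 30;  // 1 GiB
      const double resident_ratio = 0.25;
      const uint32_t value_bytes = 8 * 1024;          // 8 KiB
      // Same expression as the CacheBench constructor
      const uint64_t max_key = static_cast<uint64_t>(
          cache_size / resident_ratio / value_bytes);
      printf("max_key        = %llu\n",
             (unsigned long long)max_key);                     // 524288
      printf("resident keys  ~ %llu\n",
             (unsigned long long)(cache_size / value_bytes));  // 131072
      printf("ops per thread = %llu\n",
             (unsigned long long)(5 * max_key));               // 2621440
      return 0;
    }

PopulateCache scales the same way: it keeps inserting until it has added about 2 * cache_size bytes of values, roughly twice what fits, so the timed run starts against a full cache.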
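
Second, the skewed key selection in KeyGen::GetRand: taking the minimum of FLAGS_skew + 1 uniform draws means the chance that a key falls in the lowest fraction p of the keyspace is 1 - (1 - p)^(skew + 1), so traffic concentrates on low-numbered keys; fastrange64 from util/hash.h then maps the draw into [0, max_key) without modulo bias. The sketch below uses std::mt19937_64 as a stand-in for the benchmark's Random64 and assumes fastrange64 is the usual multiply-shift reduction; with the default skew of 5, about 55% of ops (1 - (7/8)^6) land on the first eighth of the keyspace:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>
    #include <random>

    // Multiply-shift range reduction into [0, range), assumed to match what
    // util/hash.h's fastrange64 does (requires a 128-bit type, available as
    // unsigned __int128 on GCC/Clang).
    uint64_t FastRange64(uint64_t raw, uint64_t range) {
      return static_cast<uint64_t>(
          (static_cast<unsigned __int128>(raw) * range) >> 64);
    }

    int main() {
      std::mt19937_64 rnd(42);          // stand-in for the patch's Random64
      const uint64_t max_key = 524288;  // default sizing, see the note above
      const uint32_t skew = 5;          // default FLAGS_skew
      uint64_t hist[8] = {};            // ops per eighth of the keyspace
      for (int i = 0; i < 1000000; ++i) {
        uint64_t raw = rnd();
        // Min of skew+1 draws, as in KeyGen::GetRand
        for (uint32_t j = 0; j < skew; ++j) {
          raw = std::min<uint64_t>(raw, rnd());
        }
        ++hist[FastRange64(raw, max_key) * 8 / max_key];
      }
      for (int b = 0; b < 8; ++b) {
        printf("eighth %d: %llu ops\n", b, (unsigned long long)hist[b]);
      }
      return 0;
    }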
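
Finally, the operation mix: OperateCache draws one uniform uint64_t per operation and compares it against cumulative thresholds, with kHundredthUint64 = UINT64_MAX / 100 acting as one percentage point of the 64-bit space. This replaces the old prob_op logic, where operator precedence turned "prob_op -= X && prob_op < Y" into subtracting a boolean, skewing the intended 40/50/10 split. Because UINT64_MAX / 100 rounds down (the remainder is 15), the 16 values at the very top of the range sit at or above erase_threshold_ even when the percentages sum to 100, which is why the trailing else branch is a noop guarded only by an assert; each op hits it with probability 16 / 2^64. The same arithmetic, self-contained:

    #include <cassert>
    #include <cinttypes>
    #include <cstdint>
    #include <cstdio>
    #include <limits>

    int main() {
      constexpr uint64_t kHundredth =
          std::numeric_limits<uint64_t>::max() / 100U;
      // Cumulative layout from the CacheBench constructor, default flags
      const uint64_t lookup_insert = kHundredth * 87;          // [0%, 87%)
      const uint64_t insert = lookup_insert + kHundredth * 2;  // [87%, 89%)
      const uint64_t lookup = insert + kHundredth * 10;        // [89%, 99%)
      const uint64_t erase = lookup + kHundredth * 1;          // [99%, ~100%)
      assert(erase == 100U * kHundredth);  // "Percentages must add to 100"
      // Draws in [erase, UINT64_MAX] fall through to the noop branch.
      const uint64_t noop_draws =
          std::numeric_limits<uint64_t>::max() - erase + 1;
      printf("noop draws: %" PRIu64 " out of 2^64\n", noop_draws);  // 16
      return 0;
    }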