From 13232e11d4bbb4c923a49f395de1487108cf08b4 Mon Sep 17 00:00:00 2001
From: anand76
Date: Wed, 19 May 2021 15:24:37 -0700
Subject: [PATCH] Allow cache_bench/db_bench to use a custom secondary cache
 (#8312)

Summary:
This PR adds a ```-secondary_cache_uri``` option to the cache_bench and
db_bench tools to allow the user to specify a custom secondary cache URI.
The object registry is used to create an instance of the ```SecondaryCache```
object of the type specified in the URI. The main cache_bench code is
packaged into a separate library, similar to db_bench.

An example invocation of db_bench with a secondary cache URI -
```db_bench --env_uri=ws://ws.flash_sandbox.vll1_2/ -db=anand/nvm_cache_2 -use_existing_db=true -benchmarks=readrandom -num=30000000 -key_size=32 -value_size=256 -use_direct_reads=true -cache_size=67108864 -cache_index_and_filter_blocks=true -secondary_cache_uri='cachelibwrapper://filename=/home/anand76/nvm_cache/cache_file;size=2147483648;regionSize=16777216;admPolicy=random;admProbability=1.0;volatileSize=8388608;bktPower=20;lockPower=12' -partition_index_and_filters=true -duration=1800```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8312

Reviewed By: zhichao-cao

Differential Revision: D28544325

Pulled By: anand1976

fbshipit-source-id: 8f209b9af900c459dc42daa7a610d5f00176eeed
---
 CMakeLists.txt                     |   3 +-
 Makefile                           |   5 +-
 TARGETS                            |  15 +
 buckifier/buckify_rocksdb.py       |   5 +
 cache/cache_bench.cc               | 524 +--------------------------
 cache/cache_bench_tool.cc          | 573 +++++++++++++++++++++++++++++
 include/rocksdb/cache_bench_tool.h |  14 +
 include/rocksdb/secondary_cache.h  |   2 +
 src.mk                             |   3 +
 tools/db_bench_tool.cc             |  45 ++-
 10 files changed, 660 insertions(+), 529 deletions(-)
 create mode 100644 cache/cache_bench_tool.cc
 create mode 100644 include/rocksdb/cache_bench_tool.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8f67a1a36..6a2a98e42 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1349,7 +1349,8 @@ if(WITH_BENCHMARK_TOOLS)
     ${ROCKSDB_LIB} ${THIRDPARTY_LIBS})
 
   add_executable(cache_bench${ARTIFACT_SUFFIX}
-    cache/cache_bench.cc)
+    cache/cache_bench.cc
+    cache/cache_bench_tool.cc)
   target_link_libraries(cache_bench${ARTIFACT_SUFFIX}
     ${ROCKSDB_LIB} ${GFLAGS_LIB})
 
diff --git a/Makefile b/Makefile
index 4e5191a47..8bf806f38 100644
--- a/Makefile
+++ b/Makefile
@@ -491,13 +491,14 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
 
 TEST_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES)) $(GTEST)
 BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(BENCH_LIB_SOURCES))
+CACHE_BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(CACHE_BENCH_LIB_SOURCES))
 TOOL_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TOOL_LIB_SOURCES))
 ANALYZE_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(ANALYZER_LIB_SOURCES))
 STRESS_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(STRESS_LIB_SOURCES))
 
 # Exclude build_version.cc -- a generated source file -- from all sources. Not needed for dependencies
 ALL_SOURCES = $(filter-out util/build_version.cc, $(LIB_SOURCES)) $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES) $(GTEST_DIR)/gtest/gtest-all.cc
-ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES)
+ALL_SOURCES += $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(CACHE_BENCH_LIB_SOURCES) $(ANALYZER_LIB_SOURCES) $(STRESS_LIB_SOURCES)
 ALL_SOURCES += $(TEST_MAIN_SOURCES) $(TOOL_MAIN_SOURCES) $(BENCH_MAIN_SOURCES)
 
 TESTS = $(patsubst %.cc, %, $(notdir $(TEST_MAIN_SOURCES)))
@@ -1252,7 +1253,7 @@ folly_synchronization_distributed_mutex_test: $(OBJ_DIR)/third-party/folly/folly
 	$(AM_LINK)
 endif
 
-cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(LIBRARY)
+cache_bench: $(OBJ_DIR)/cache/cache_bench.o $(CACHE_BENCH_OBJECTS) $(LIBRARY)
 	$(AM_LINK)
 
 persistent_cache_bench: $(OBJ_DIR)/utilities/persistent_cache/persistent_cache_bench.o $(LIBRARY)
diff --git a/TARGETS b/TARGETS
index d6b14caa2..427af17ed 100644
--- a/TARGETS
+++ b/TARGETS
@@ -795,6 +795,21 @@ cpp_library(
     link_whole = False,
 )
 
+cpp_library(
+    name = "rocksdb_cache_bench_tools_lib",
+    srcs = ["cache/cache_bench_tool.cc"],
+    auto_headers = AutoHeaders.RECURSIVE_GLOB,
+    arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
+    compiler_flags = ROCKSDB_COMPILER_FLAGS,
+    os_deps = ROCKSDB_OS_DEPS,
+    os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
+    preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
+    include_paths = ROCKSDB_INCLUDE_PATHS,
+    deps = [":rocksdb_lib"],
+    external_deps = ROCKSDB_EXTERNAL_DEPS,
+    link_whole = False,
+)
+
 cpp_library(
     name = "rocksdb_stress_lib",
     srcs = [
diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py
index 0fc7fc0c9..993dd4d1b 100644
--- a/buckifier/buckify_rocksdb.py
+++ b/buckifier/buckify_rocksdb.py
@@ -173,6 +173,11 @@ def generate_targets(repo_path, deps_map):
         src_mk.get("ANALYZER_LIB_SOURCES", []) +
         ["test_util/testutil.cc"],
         [":rocksdb_lib"])
+    # rocksdb_cache_bench_tools_lib
+    TARGETS.add_library(
+        "rocksdb_cache_bench_tools_lib",
+        src_mk.get("CACHE_BENCH_LIB_SOURCES", []),
+        [":rocksdb_lib"])
     # rocksdb_stress_lib
     TARGETS.add_rocksdb_library(
         "rocksdb_stress_lib",
diff --git a/cache/cache_bench.cc b/cache/cache_bench.cc
index b7de5ae86..0669354ae 100644
--- a/cache/cache_bench.cc
+++ b/cache/cache_bench.cc
@@ -1,528 +1,20 @@
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// Copyright (c) 2013-present, Facebook, Inc. All rights reserved.
 // This source code is licensed under both the GPLv2 (found in the
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).
-
-#include <cinttypes>
-#include <cstdio>
-#include <limits>
-#include <set>
-#include <sstream>
-
-#include "monitoring/histogram.h"
-#include "port/port.h"
-#include "rocksdb/cache.h"
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
-#include "rocksdb/system_clock.h"
-#include "table/block_based/cachable_entry.h"
-#include "util/coding.h"
-#include "util/hash.h"
-#include "util/mutexlock.h"
-#include "util/random.h"
-#include "util/stop_watch.h"
-#include "util/string_util.h"
-
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #ifndef GFLAGS
+#include <cstdio>
 int main() {
   fprintf(stderr, "Please install gflags to run rocksdb tools\n");
   return 1;
 }
 #else
-
-#include "util/gflags_compat.h"
-
-using GFLAGS_NAMESPACE::ParseCommandLineFlags;
-
-static constexpr uint32_t KiB = uint32_t{1} << 10;
-static constexpr uint32_t MiB = KiB << 10;
-static constexpr uint64_t GiB = MiB << 10;
-
-DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
-DEFINE_uint64(cache_size, 1 * GiB,
-              "Number of bytes to use as a cache of uncompressed data.");
-DEFINE_uint32(num_shard_bits, 6, "shard_bits.");
-
-DEFINE_double(resident_ratio, 0.25,
-              "Ratio of keys fitting in cache to keyspace.");
-DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
-DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");
-
-DEFINE_uint32(skew, 5, "Degree of skew in key selection");
-DEFINE_bool(populate_cache, true, "Populate cache before operations");
-
-DEFINE_uint32(lookup_insert_percent, 87,
-              "Ratio of lookup (+ insert on not found) to total workload "
-              "(expressed as a percentage)");
-DEFINE_uint32(insert_percent, 2,
-              "Ratio of insert to total workload (expressed as a percentage)");
-DEFINE_uint32(lookup_percent, 10,
-              "Ratio of lookup to total workload (expressed as a percentage)");
-DEFINE_uint32(erase_percent, 1,
-              "Ratio of erase to total workload (expressed as a percentage)");
-DEFINE_bool(gather_stats, false,
-            "Whether to periodically simulate gathering block cache stats, "
-            "using one more thread.");
-DEFINE_uint32(
-    gather_stats_sleep_ms, 1000,
-    "How many milliseconds to sleep between each gathering of stats.");
-
-DEFINE_uint32(gather_stats_entries_per_lock, 256,
-              "For Cache::ApplyToAllEntries");
-
-DEFINE_bool(use_clock_cache, false, "");
-
-namespace ROCKSDB_NAMESPACE {
-
-class CacheBench;
-namespace {
-// State shared by all concurrent executions of the same benchmark.
-class SharedState {
- public:
-  explicit SharedState(CacheBench* cache_bench)
-      : cv_(&mu_),
-        num_initialized_(0),
-        start_(false),
-        num_done_(0),
-        cache_bench_(cache_bench) {}
-
-  ~SharedState() {}
-
-  port::Mutex* GetMutex() {
-    return &mu_;
-  }
-
-  port::CondVar* GetCondVar() {
-    return &cv_;
-  }
-
-  CacheBench* GetCacheBench() const {
-    return cache_bench_;
-  }
-
-  void IncInitialized() {
-    num_initialized_++;
-  }
-
-  void IncDone() {
-    num_done_++;
-  }
-
-  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }
-
-  bool AllDone() const { return num_done_ >= FLAGS_threads; }
-
-  void SetStart() {
-    start_ = true;
-  }
-
-  bool Started() const {
-    return start_;
-  }
-
- private:
-  port::Mutex mu_;
-  port::CondVar cv_;
-
-  uint64_t num_initialized_;
-  bool start_;
-  uint64_t num_done_;
-
-  CacheBench* cache_bench_;
-};
-
-// Per-thread state for concurrent executions of the same benchmark.
-struct ThreadState {
-  uint32_t tid;
-  Random64 rnd;
-  SharedState* shared;
-  HistogramImpl latency_ns_hist;
-  uint64_t duration_us = 0;
-
-  ThreadState(uint32_t index, SharedState* _shared)
-      : tid(index), rnd(1000 + index), shared(_shared) {}
-};
-
-struct KeyGen {
-  char key_data[27];
-
-  Slice GetRand(Random64& rnd, uint64_t max_key) {
-    uint64_t raw = rnd.Next();
-    // Skew according to setting
-    for (uint32_t i = 0; i < FLAGS_skew; ++i) {
-      raw = std::min(raw, rnd.Next());
-    }
-    uint64_t key = FastRange64(raw, max_key);
-    // Variable size and alignment
-    size_t off = key % 8;
-    key_data[0] = char{42};
-    EncodeFixed64(key_data + 1, key);
-    key_data[9] = char{11};
-    EncodeFixed64(key_data + 10, key);
-    key_data[18] = char{4};
-    EncodeFixed64(key_data + 19, key);
-    return Slice(&key_data[off], sizeof(key_data) - off);
-  }
-};
-
-char* createValue(Random64& rnd) {
-  char* rv = new char[FLAGS_value_bytes];
-  // Fill with some filler data, and take some CPU time
-  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
-    EncodeFixed64(rv + i, rnd.Next());
-  }
-  return rv;
-}
-
-// Different deleters to simulate using deleter to gather
-// stats on the code origin and kind of cache entries.
-void deleter1(const Slice& /*key*/, void* value) {
-  delete[] static_cast<char*>(value);
-}
-void deleter2(const Slice& /*key*/, void* value) {
-  delete[] static_cast<char*>(value);
-}
-void deleter3(const Slice& /*key*/, void* value) {
-  delete[] static_cast<char*>(value);
-}
-}  // namespace
-
-class CacheBench {
-  static constexpr uint64_t kHundredthUint64 =
-      std::numeric_limits<uint64_t>::max() / 100U;
-
- public:
-  CacheBench()
-      : max_key_(static_cast<uint64_t>(FLAGS_cache_size /
-                                       FLAGS_resident_ratio /
-                                       FLAGS_value_bytes)),
-        lookup_insert_threshold_(kHundredthUint64 *
-                                 FLAGS_lookup_insert_percent),
-        insert_threshold_(lookup_insert_threshold_ +
-                          kHundredthUint64 * FLAGS_insert_percent),
-        lookup_threshold_(insert_threshold_ +
-                          kHundredthUint64 * FLAGS_lookup_percent),
-        erase_threshold_(lookup_threshold_ +
-                         kHundredthUint64 * FLAGS_erase_percent) {
-    if (erase_threshold_ != 100U * kHundredthUint64) {
-      fprintf(stderr, "Percentages must add to 100.\n");
-      exit(1);
-    }
-    if (FLAGS_use_clock_cache) {
-      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
-      if (!cache_) {
-        fprintf(stderr, "Clock cache not supported.\n");
-        exit(1);
-      }
-    } else {
-      cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits);
-    }
-  }
-
-  ~CacheBench() {}
-
-  void PopulateCache() {
-    Random64 rnd(1);
-    KeyGen keygen;
-    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
-      cache_->Insert(keygen.GetRand(rnd, max_key_), createValue(rnd),
-                     FLAGS_value_bytes, &deleter1);
-    }
-  }
-
-  bool Run() {
-    const auto clock = SystemClock::Default().get();
-
-    PrintEnv();
-    SharedState shared(this);
-    std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
-    for (uint32_t i = 0; i < FLAGS_threads; i++) {
-      threads[i].reset(new ThreadState(i, &shared));
-      std::thread(ThreadBody, threads[i].get()).detach();
-    }
-
-    HistogramImpl stats_hist;
-    std::string stats_report;
-    std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);
-
-    uint64_t start_time;
-    {
-      MutexLock l(shared.GetMutex());
-      while (!shared.AllInitialized()) {
-        shared.GetCondVar()->Wait();
-      }
-      // Record start time
-      start_time = clock->NowMicros();
-
-      // Start all threads
-      shared.SetStart();
-      shared.GetCondVar()->SignalAll();
-
-      // Wait threads to complete
-      while (!shared.AllDone()) {
-        shared.GetCondVar()->Wait();
-      }
-    }
-
-    // Stats gathering is considered background work. This time measurement
-    // is for foreground work, and not really ideal for that. See below.
-    uint64_t end_time = clock->NowMicros();
-    stats_thread.join();
-
-    // Wall clock time - includes idle time if threads
-    // finish at different times (not ideal).
-    double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
-    uint32_t ops_per_sec = static_cast<uint32_t>(
-        1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
-    printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
-           ops_per_sec);
-
-    // Total time in each thread (more accurate throughput measure)
-    elapsed_secs = 0;
-    for (uint32_t i = 0; i < FLAGS_threads; i++) {
-      elapsed_secs += threads[i]->duration_us * 1e-6;
-    }
-    ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
-                                        FLAGS_ops_per_thread / elapsed_secs);
-    printf("Thread ops/sec = %u\n", ops_per_sec);
-
-    printf("\nOperation latency (ns):\n");
-    HistogramImpl combined;
-    for (uint32_t i = 0; i < FLAGS_threads; i++) {
-      combined.Merge(threads[i]->latency_ns_hist);
-    }
-    printf("%s", combined.ToString().c_str());
-
-    if (FLAGS_gather_stats) {
-      printf("\nGather stats latency (us):\n");
-      printf("%s", stats_hist.ToString().c_str());
-    }
-
-    printf("\n%s", stats_report.c_str());
-
-    return true;
-  }
-
- private:
-  std::shared_ptr<Cache> cache_;
-  const uint64_t max_key_;
-  // Cumulative thresholds in the space of a random uint64_t
-  const uint64_t lookup_insert_threshold_;
-  const uint64_t insert_threshold_;
-  const uint64_t lookup_threshold_;
-  const uint64_t erase_threshold_;
-
-  // A benchmark version of gathering stats on an active block cache by
-  // iterating over it. The primary purpose is to measure the impact of
-  // gathering stats with ApplyToAllEntries on throughput- and
-  // latency-sensitive Cache users. Performance of stats gathering is
-  // also reported. The last set of gathered stats is also reported, for
-  // manual sanity checking for logical errors or other unexpected
-  // behavior of cache_bench or the underlying Cache.
-  static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
-                        std::string* stats_report) {
-    if (!FLAGS_gather_stats) {
-      return;
-    }
-    const auto clock = SystemClock::Default().get();
-    uint64_t total_key_size = 0;
-    uint64_t total_charge = 0;
-    uint64_t total_entry_count = 0;
-    std::set<Cache::DeleterFn> deleters;
-    StopWatchNano timer(clock);
-
-    for (;;) {
-      uint64_t time;
-      time = clock->NowMicros();
-      uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;
-
-      {
-        MutexLock l(shared->GetMutex());
-        for (;;) {
-          if (shared->AllDone()) {
-            std::ostringstream ostr;
-            ostr << "Most recent cache entry stats:\n"
-                 << "Number of entries: " << total_entry_count << "\n"
-                 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
-                 << "Average key size: "
-                 << (1.0 * total_key_size / total_entry_count) << "\n"
-                 << "Average charge: "
-                 << BytesToHumanString(1.0 * total_charge / total_entry_count)
-                 << "\n"
-                 << "Unique deleters: " << deleters.size() << "\n";
-            *stats_report = ostr.str();
-            return;
-          }
-          if (clock->NowMicros() >= deadline) {
-            break;
-          }
-          uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
-          shared->GetCondVar()->TimedWait(diff + 1);
-        }
-      }
-
-      // Now gather stats, outside of mutex
-      total_key_size = 0;
-      total_charge = 0;
-      total_entry_count = 0;
-      deleters.clear();
-      auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
-                    Cache::DeleterFn deleter) {
-        total_key_size += key.size();
-        total_charge += charge;
-        ++total_entry_count;
-        // Something slightly more expensive as in (future) stats by category
-        deleters.insert(deleter);
-      };
-      timer.Start();
-      Cache::ApplyToAllEntriesOptions opts;
-      opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
-      shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
-      stats_hist->Add(timer.ElapsedNanos() / 1000);
-    }
-  }
-
-  static void ThreadBody(ThreadState* thread) {
-    SharedState* shared = thread->shared;
-
-    {
-      MutexLock l(shared->GetMutex());
-      shared->IncInitialized();
-      if (shared->AllInitialized()) {
-        shared->GetCondVar()->SignalAll();
-      }
-      while (!shared->Started()) {
-        shared->GetCondVar()->Wait();
-      }
-    }
-    thread->shared->GetCacheBench()->OperateCache(thread);
-
-    {
-      MutexLock l(shared->GetMutex());
-      shared->IncDone();
-      if (shared->AllDone()) {
-        shared->GetCondVar()->SignalAll();
-      }
-    }
-  }
-
-  void OperateCache(ThreadState* thread) {
-    // To use looked-up values
-    uint64_t result = 0;
-    // To hold handles for a non-trivial amount of time
-    Cache::Handle* handle = nullptr;
-    KeyGen gen;
-    const auto clock = SystemClock::Default().get();
-    uint64_t start_time = clock->NowMicros();
-    StopWatchNano timer(clock);
-
-    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
-      timer.Start();
-      Slice key = gen.GetRand(thread->rnd, max_key_);
-      uint64_t random_op = thread->rnd.Next();
-      if (random_op < lookup_insert_threshold_) {
-        if (handle) {
-          cache_->Release(handle);
-          handle = nullptr;
-        }
-        // do lookup
-        handle = cache_->Lookup(key);
-        if (handle) {
-          // do something with the data
-          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
-                             FLAGS_value_bytes);
-        } else {
-          // do insert
-          cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes,
-                         &deleter2, &handle);
-        }
-      } else if (random_op < insert_threshold_) {
-        if (handle) {
-          cache_->Release(handle);
-          handle = nullptr;
-        }
-        // do insert
-        cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes,
-                       &deleter3, &handle);
-      } else if (random_op < lookup_threshold_) {
-        if (handle) {
-          cache_->Release(handle);
-          handle = nullptr;
-        }
-        // do lookup
-        handle = cache_->Lookup(key);
-        if (handle) {
-          // do something with the data
-          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
-                             FLAGS_value_bytes);
-        }
-      } else if (random_op < erase_threshold_) {
-        // do erase
-        cache_->Erase(key);
-      } else {
-        // Should be extremely unlikely (noop)
-        assert(random_op >= kHundredthUint64 * 100U);
-      }
-      thread->latency_ns_hist.Add(timer.ElapsedNanos());
-    }
-    if (handle) {
-      cache_->Release(handle);
-      handle = nullptr;
-    }
-    // Ensure computations on `result` are not optimized away.
-    if (result == 1) {
-      printf("You are extremely unlucky(2). Try again.\n");
-      exit(1);
-    }
-    thread->duration_us = clock->NowMicros() - start_time;
-  }
-
-  void PrintEnv() const {
-    printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
-    printf("Number of threads : %u\n", FLAGS_threads);
-    printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
-    printf("Cache size : %s\n",
-           BytesToHumanString(FLAGS_cache_size).c_str());
-    printf("Num shard bits : %u\n", FLAGS_num_shard_bits);
-    printf("Max key : %" PRIu64 "\n", max_key_);
-    printf("Resident ratio : %g\n", FLAGS_resident_ratio);
-    printf("Skew degree : %u\n", FLAGS_skew);
-    printf("Populate cache : %d\n", int{FLAGS_populate_cache});
-    printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent);
-    printf("Insert percentage : %u%%\n", FLAGS_insert_percent);
-    printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent);
-    printf("Erase percentage : %u%%\n", FLAGS_erase_percent);
-    std::ostringstream stats;
-    if (FLAGS_gather_stats) {
-      stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
-            << FLAGS_gather_stats_entries_per_lock << "/lock)";
-    } else {
-      stats << "disabled";
-    }
-    printf("Gather stats : %s\n", stats.str().c_str());
-    printf("----------------------------\n");
-  }
-};
-}  // namespace ROCKSDB_NAMESPACE
-
+#include "rocksdb/cache_bench_tool.h"
 int main(int argc, char** argv) {
-  ParseCommandLineFlags(&argc, &argv, true);
-
-  if (FLAGS_threads <= 0) {
-    fprintf(stderr, "threads number <= 0\n");
-    exit(1);
-  }
-
-  ROCKSDB_NAMESPACE::CacheBench bench;
-  if (FLAGS_populate_cache) {
-    bench.PopulateCache();
-    printf("Population complete\n");
-    printf("----------------------------\n");
-  }
-  if (bench.Run()) {
-    return 0;
-  } else {
-    return 1;
-  }
+  return ROCKSDB_NAMESPACE::cache_bench_tool(argc, argv);
 }
-
 #endif  // GFLAGS
diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc
new file mode 100644
index 000000000..9fadf85a9
--- /dev/null
+++ b/cache/cache_bench_tool.cc
@@ -0,0 +1,573 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifdef GFLAGS
+#include <cinttypes>
+#include <cstdio>
+#include <limits>
+#include <set>
+#include <sstream>
+
+#include "monitoring/histogram.h"
+#include "port/port.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/secondary_cache.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "table/block_based/cachable_entry.h"
+#include "util/coding.h"
+#include "util/gflags_compat.h"
+#include "util/hash.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "util/stop_watch.h"
+#include "util/string_util.h"
+
+using GFLAGS_NAMESPACE::ParseCommandLineFlags;
+
+static constexpr uint32_t KiB = uint32_t{1} << 10;
+static constexpr uint32_t MiB = KiB << 10;
+static constexpr uint64_t GiB = MiB << 10;
+
+DEFINE_uint32(threads, 16, "Number of concurrent threads to run.");
+DEFINE_uint64(cache_size, 1 * GiB,
+              "Number of bytes to use as a cache of uncompressed data.");
+DEFINE_uint32(num_shard_bits, 6, "shard_bits.");
+
+DEFINE_double(resident_ratio, 0.25,
+              "Ratio of keys fitting in cache to keyspace.");
+DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread.");
+DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added.");
+
+DEFINE_uint32(skew, 5, "Degree of skew in key selection");
+DEFINE_bool(populate_cache, true, "Populate cache before operations");
+
+DEFINE_uint32(lookup_insert_percent, 87,
+              "Ratio of lookup (+ insert on not found) to total workload "
+              "(expressed as a percentage)");
+DEFINE_uint32(insert_percent, 2,
+              "Ratio of insert to total workload (expressed as a percentage)");
+DEFINE_uint32(lookup_percent, 10,
+              "Ratio of lookup to total workload (expressed as a percentage)");
+DEFINE_uint32(erase_percent, 1,
+              "Ratio of erase to total workload (expressed as a percentage)");
+DEFINE_bool(gather_stats, false,
+            "Whether to periodically simulate gathering block cache stats, "
+            "using one more thread.");
+DEFINE_uint32(
+    gather_stats_sleep_ms, 1000,
+    "How many milliseconds to sleep between each gathering of stats.");
+
+DEFINE_uint32(gather_stats_entries_per_lock, 256,
+              "For Cache::ApplyToAllEntries");
+DEFINE_bool(skewed, false, "If true, skew the key access distribution");
+#ifndef ROCKSDB_LITE
+DEFINE_string(secondary_cache_uri, "",
+              "Full URI for creating a custom secondary cache object");
+static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
+#endif  // ROCKSDB_LITE
+
+DEFINE_bool(use_clock_cache, false, "");
+
+namespace ROCKSDB_NAMESPACE {
+
+class CacheBench;
+namespace {
+// State shared by all concurrent executions of the same benchmark.
+class SharedState {
+ public:
+  explicit SharedState(CacheBench* cache_bench)
+      : cv_(&mu_),
+        num_initialized_(0),
+        start_(false),
+        num_done_(0),
+        cache_bench_(cache_bench) {}
+
+  ~SharedState() {}
+
+  port::Mutex* GetMutex() { return &mu_; }
+
+  port::CondVar* GetCondVar() { return &cv_; }
+
+  CacheBench* GetCacheBench() const { return cache_bench_; }
+
+  void IncInitialized() { num_initialized_++; }
+
+  void IncDone() { num_done_++; }
+
+  bool AllInitialized() const { return num_initialized_ >= FLAGS_threads; }
+
+  bool AllDone() const { return num_done_ >= FLAGS_threads; }
+
+  void SetStart() { start_ = true; }
+
+  bool Started() const { return start_; }
+
+ private:
+  port::Mutex mu_;
+  port::CondVar cv_;
+
+  uint64_t num_initialized_;
+  bool start_;
+  uint64_t num_done_;
+
+  CacheBench* cache_bench_;
+};
+
+// Per-thread state for concurrent executions of the same benchmark.
+struct ThreadState {
+  uint32_t tid;
+  Random64 rnd;
+  SharedState* shared;
+  HistogramImpl latency_ns_hist;
+  uint64_t duration_us = 0;
+
+  ThreadState(uint32_t index, SharedState* _shared)
+      : tid(index), rnd(1000 + index), shared(_shared) {}
+};
+
+struct KeyGen {
+  char key_data[27];
+
+  Slice GetRand(Random64& rnd, uint64_t max_key, int max_log) {
+    uint64_t key = 0;
+    if (!FLAGS_skewed) {
+      uint64_t raw = rnd.Next();
+      // Skew according to setting
+      for (uint32_t i = 0; i < FLAGS_skew; ++i) {
+        raw = std::min(raw, rnd.Next());
+      }
+      key = FastRange64(raw, max_key);
+    } else {
+      key = rnd.Skewed(max_log);
+      if (key > max_key) {
+        key -= max_key;
+      }
+    }
+    // Variable size and alignment
+    size_t off = key % 8;
+    key_data[0] = char{42};
+    EncodeFixed64(key_data + 1, key);
+    key_data[9] = char{11};
+    EncodeFixed64(key_data + 10, key);
+    key_data[18] = char{4};
+    EncodeFixed64(key_data + 19, key);
+    return Slice(&key_data[off], sizeof(key_data) - off);
+  }
+};
+
+char* createValue(Random64& rnd) {
+  char* rv = new char[FLAGS_value_bytes];
+  // Fill with some filler data, and take some CPU time
+  for (uint32_t i = 0; i < FLAGS_value_bytes; i += 8) {
+    EncodeFixed64(rv + i, rnd.Next());
+  }
+  return rv;
+}
+
+// Callbacks for secondary cache
+size_t SizeFn(void* /*obj*/) { return FLAGS_value_bytes; }
+
+Status SaveToFn(void* obj, size_t /*offset*/, size_t size, void* out) {
+  memcpy(out, obj, size);
+  return Status::OK();
+}
+
+// Different deleters to simulate using deleter to gather
+// stats on the code origin and kind of cache entries.
+void deleter1(const Slice& /*key*/, void* value) {
+  delete[] static_cast<char*>(value);
+}
+void deleter2(const Slice& /*key*/, void* value) {
+  delete[] static_cast<char*>(value);
+}
+void deleter3(const Slice& /*key*/, void* value) {
+  delete[] static_cast<char*>(value);
+}
+
+Cache::CacheItemHelper helper1(SizeFn, SaveToFn, deleter1);
+Cache::CacheItemHelper helper2(SizeFn, SaveToFn, deleter2);
+Cache::CacheItemHelper helper3(SizeFn, SaveToFn, deleter3);
+}  // namespace
+
+class CacheBench {
+  static constexpr uint64_t kHundredthUint64 =
+      std::numeric_limits<uint64_t>::max() / 100U;
+
+ public:
+  CacheBench()
+      : max_key_(static_cast<uint64_t>(FLAGS_cache_size /
+                                       FLAGS_resident_ratio /
+                                       FLAGS_value_bytes)),
+        lookup_insert_threshold_(kHundredthUint64 *
+                                 FLAGS_lookup_insert_percent),
+        insert_threshold_(lookup_insert_threshold_ +
+                          kHundredthUint64 * FLAGS_insert_percent),
+        lookup_threshold_(insert_threshold_ +
+                          kHundredthUint64 * FLAGS_lookup_percent),
+        erase_threshold_(lookup_threshold_ +
+                         kHundredthUint64 * FLAGS_erase_percent),
+        skewed_(FLAGS_skewed) {
+    if (erase_threshold_ != 100U * kHundredthUint64) {
+      fprintf(stderr, "Percentages must add to 100.\n");
+      exit(1);
+    }
+
+    max_log_ = 0;
+    if (skewed_) {
+      uint64_t max_key = max_key_;
+      while (max_key >>= 1) max_log_++;
+      if (max_key > (1u << max_log_)) max_log_++;
+    }
+
+    if (FLAGS_use_clock_cache) {
+      cache_ = NewClockCache(FLAGS_cache_size, FLAGS_num_shard_bits);
+      if (!cache_) {
+        fprintf(stderr, "Clock cache not supported.\n");
+        exit(1);
+      }
+    } else {
+      LRUCacheOptions opts(FLAGS_cache_size, FLAGS_num_shard_bits, false, 0.5);
+#ifndef ROCKSDB_LITE
+      if (!FLAGS_secondary_cache_uri.empty()) {
+        Status s =
+            ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
+                FLAGS_secondary_cache_uri, &secondary_cache);
+        if (secondary_cache == nullptr) {
+          fprintf(
+              stderr,
+              "No secondary cache registered matching string: %s status=%s\n",
+              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
+          exit(1);
+        }
+        opts.secondary_cache = secondary_cache;
+      }
+#endif  // ROCKSDB_LITE
+
+      cache_ = NewLRUCache(opts);
+    }
+  }
+
+  ~CacheBench() {}
+
+  void PopulateCache() {
+    Random64 rnd(1);
+    KeyGen keygen;
+    for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) {
+      cache_->Insert(keygen.GetRand(rnd, max_key_, max_log_), createValue(rnd),
+                     &helper1, FLAGS_value_bytes);
+    }
+  }
+
+  bool Run() {
+    const auto clock = SystemClock::Default().get();
+
+    PrintEnv();
+    SharedState shared(this);
+    std::vector<std::unique_ptr<ThreadState> > threads(FLAGS_threads);
+    for (uint32_t i = 0; i < FLAGS_threads; i++) {
+      threads[i].reset(new ThreadState(i, &shared));
+      std::thread(ThreadBody, threads[i].get()).detach();
+    }
+
+    HistogramImpl stats_hist;
+    std::string stats_report;
+    std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report);
+
+    uint64_t start_time;
+    {
+      MutexLock l(shared.GetMutex());
+      while (!shared.AllInitialized()) {
+        shared.GetCondVar()->Wait();
+      }
+      // Record start time
+      start_time = clock->NowMicros();
+
+      // Start all threads
+      shared.SetStart();
+      shared.GetCondVar()->SignalAll();
+
+      // Wait threads to complete
+      while (!shared.AllDone()) {
+        shared.GetCondVar()->Wait();
+      }
+    }
+
+    // Stats gathering is considered background work. This time measurement
+    // is for foreground work, and not really ideal for that. See below.
+    uint64_t end_time = clock->NowMicros();
+    stats_thread.join();
+
+    // Wall clock time - includes idle time if threads
+    // finish at different times (not ideal).
+    double elapsed_secs = static_cast<double>(end_time - start_time) * 1e-6;
+    uint32_t ops_per_sec = static_cast<uint32_t>(
+        1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs);
+    printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs,
+           ops_per_sec);
+
+    // Total time in each thread (more accurate throughput measure)
+    elapsed_secs = 0;
+    for (uint32_t i = 0; i < FLAGS_threads; i++) {
+      elapsed_secs += threads[i]->duration_us * 1e-6;
+    }
+    ops_per_sec = static_cast<uint32_t>(1.0 * FLAGS_threads *
+                                        FLAGS_ops_per_thread / elapsed_secs);
+    printf("Thread ops/sec = %u\n", ops_per_sec);
+
+    printf("\nOperation latency (ns):\n");
+    HistogramImpl combined;
+    for (uint32_t i = 0; i < FLAGS_threads; i++) {
+      combined.Merge(threads[i]->latency_ns_hist);
+    }
+    printf("%s", combined.ToString().c_str());
+
+    if (FLAGS_gather_stats) {
+      printf("\nGather stats latency (us):\n");
+      printf("%s", stats_hist.ToString().c_str());
+    }
+
+    printf("\n%s", stats_report.c_str());
+
+    return true;
+  }
+
+ private:
+  std::shared_ptr<Cache> cache_;
+  const uint64_t max_key_;
+  // Cumulative thresholds in the space of a random uint64_t
+  const uint64_t lookup_insert_threshold_;
+  const uint64_t insert_threshold_;
+  const uint64_t lookup_threshold_;
+  const uint64_t erase_threshold_;
+  const bool skewed_;
+  int max_log_;
+
+  // A benchmark version of gathering stats on an active block cache by
+  // iterating over it. The primary purpose is to measure the impact of
+  // gathering stats with ApplyToAllEntries on throughput- and
+  // latency-sensitive Cache users. Performance of stats gathering is
+  // also reported. The last set of gathered stats is also reported, for
+  // manual sanity checking for logical errors or other unexpected
+  // behavior of cache_bench or the underlying Cache.
+  static void StatsBody(SharedState* shared, HistogramImpl* stats_hist,
+                        std::string* stats_report) {
+    if (!FLAGS_gather_stats) {
+      return;
+    }
+    const auto clock = SystemClock::Default().get();
+    uint64_t total_key_size = 0;
+    uint64_t total_charge = 0;
+    uint64_t total_entry_count = 0;
+    std::set<Cache::DeleterFn> deleters;
+    StopWatchNano timer(clock);
+
+    for (;;) {
+      uint64_t time;
+      time = clock->NowMicros();
+      uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000;
+
+      {
+        MutexLock l(shared->GetMutex());
+        for (;;) {
+          if (shared->AllDone()) {
+            std::ostringstream ostr;
+            ostr << "Most recent cache entry stats:\n"
+                 << "Number of entries: " << total_entry_count << "\n"
+                 << "Total charge: " << BytesToHumanString(total_charge) << "\n"
+                 << "Average key size: "
+                 << (1.0 * total_key_size / total_entry_count) << "\n"
+                 << "Average charge: "
+                 << BytesToHumanString(1.0 * total_charge / total_entry_count)
+                 << "\n"
+                 << "Unique deleters: " << deleters.size() << "\n";
+            *stats_report = ostr.str();
+            return;
+          }
+          if (clock->NowMicros() >= deadline) {
+            break;
+          }
+          uint64_t diff = deadline - std::min(clock->NowMicros(), deadline);
+          shared->GetCondVar()->TimedWait(diff + 1);
+        }
+      }
+
+      // Now gather stats, outside of mutex
+      total_key_size = 0;
+      total_charge = 0;
+      total_entry_count = 0;
+      deleters.clear();
+      auto fn = [&](const Slice& key, void* /*value*/, size_t charge,
+                    Cache::DeleterFn deleter) {
+        total_key_size += key.size();
+        total_charge += charge;
+        ++total_entry_count;
+        // Something slightly more expensive as in (future) stats by category
+        deleters.insert(deleter);
+      };
+      timer.Start();
+      Cache::ApplyToAllEntriesOptions opts;
+      opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock;
+      shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts);
+      stats_hist->Add(timer.ElapsedNanos() / 1000);
+    }
+  }
+
+  static void ThreadBody(ThreadState* thread) {
+    SharedState* shared = thread->shared;
+
+    {
+      MutexLock l(shared->GetMutex());
+      shared->IncInitialized();
+      if (shared->AllInitialized()) {
+        shared->GetCondVar()->SignalAll();
+      }
+      while (!shared->Started()) {
+        shared->GetCondVar()->Wait();
+      }
+    }
+    thread->shared->GetCacheBench()->OperateCache(thread);
+
+    {
+      MutexLock l(shared->GetMutex());
+      shared->IncDone();
+      if (shared->AllDone()) {
+        shared->GetCondVar()->SignalAll();
+      }
+    }
+  }
+
+  void OperateCache(ThreadState* thread) {
+    // To use looked-up values
+    uint64_t result = 0;
+    // To hold handles for a non-trivial amount of time
+    Cache::Handle* handle = nullptr;
+    KeyGen gen;
+    const auto clock = SystemClock::Default().get();
+    uint64_t start_time = clock->NowMicros();
+    StopWatchNano timer(clock);
+
+    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
+      timer.Start();
+      Slice key = gen.GetRand(thread->rnd, max_key_, max_log_);
+      uint64_t random_op = thread->rnd.Next();
+      Cache::CreateCallback create_cb =
+          [](void* buf, size_t size, void** out_obj,
+             size_t* charge) -> Status {
+        *out_obj = reinterpret_cast<void*>(new char[size]);
+        memcpy(*out_obj, buf, size);
+        *charge = size;
+        return Status::OK();
+      };
+
+      if (random_op < lookup_insert_threshold_) {
+        if (handle) {
+          cache_->Release(handle);
+          handle = nullptr;
+        }
+        // do lookup
+        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
+                                true);
+        if (handle) {
+          // do something with the data
+          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
+                             FLAGS_value_bytes);
+        } else {
+          // do insert
+          cache_->Insert(key, createValue(thread->rnd), &helper2,
+                         FLAGS_value_bytes, &handle);
+        }
+      } else if (random_op < insert_threshold_) {
+        if (handle) {
+          cache_->Release(handle);
+          handle = nullptr;
+        }
+        // do insert
+        cache_->Insert(key, createValue(thread->rnd), &helper3,
+                       FLAGS_value_bytes, &handle);
+      } else if (random_op < lookup_threshold_) {
+        if (handle) {
+          cache_->Release(handle);
+          handle = nullptr;
+        }
+        // do lookup
+        handle = cache_->Lookup(key, &helper2, create_cb, Cache::Priority::LOW,
+                                true);
+        if (handle) {
+          // do something with the data
+          result += NPHash64(static_cast<char*>(cache_->Value(handle)),
+                             FLAGS_value_bytes);
+        }
+      } else if (random_op < erase_threshold_) {
+        // do erase
+        cache_->Erase(key);
+      } else {
+        // Should be extremely unlikely (noop)
+        assert(random_op >= kHundredthUint64 * 100U);
+      }
+      thread->latency_ns_hist.Add(timer.ElapsedNanos());
+    }
+    if (handle) {
+      cache_->Release(handle);
+      handle = nullptr;
+    }
+    // Ensure computations on `result` are not optimized away.
+    if (result == 1) {
+      printf("You are extremely unlucky(2). Try again.\n");
+      exit(1);
+    }
+    thread->duration_us = clock->NowMicros() - start_time;
+  }
+
+  void PrintEnv() const {
+    printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
+    printf("Number of threads : %u\n", FLAGS_threads);
+    printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
+    printf("Cache size : %s\n",
+           BytesToHumanString(FLAGS_cache_size).c_str());
+    printf("Num shard bits : %u\n", FLAGS_num_shard_bits);
+    printf("Max key : %" PRIu64 "\n", max_key_);
+    printf("Resident ratio : %g\n", FLAGS_resident_ratio);
+    printf("Skew degree : %u\n", FLAGS_skew);
+    printf("Populate cache : %d\n", int{FLAGS_populate_cache});
+    printf("Lookup+Insert pct : %u%%\n", FLAGS_lookup_insert_percent);
+    printf("Insert percentage : %u%%\n", FLAGS_insert_percent);
+    printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent);
+    printf("Erase percentage : %u%%\n", FLAGS_erase_percent);
+    std::ostringstream stats;
+    if (FLAGS_gather_stats) {
+      stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, "
+            << FLAGS_gather_stats_entries_per_lock << "/lock)";
+    } else {
+      stats << "disabled";
+    }
+    printf("Gather stats : %s\n", stats.str().c_str());
+    printf("----------------------------\n");
+  }
+};
+
+int cache_bench_tool(int argc, char** argv) {
+  ParseCommandLineFlags(&argc, &argv, true);
+
+  if (FLAGS_threads <= 0) {
+    fprintf(stderr, "threads number <= 0\n");
+    exit(1);
+  }
+
+  ROCKSDB_NAMESPACE::CacheBench bench;
+  if (FLAGS_populate_cache) {
+    bench.PopulateCache();
+    printf("Population complete\n");
+    printf("----------------------------\n");
+  }
+  if (bench.Run()) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+}  // namespace ROCKSDB_NAMESPACE
+
+#endif  // GFLAGS
diff --git a/include/rocksdb/cache_bench_tool.h b/include/rocksdb/cache_bench_tool.h
new file mode 100644
index 000000000..413ce1593
--- /dev/null
+++ b/include/rocksdb/cache_bench_tool.h
@@ -0,0 +1,14 @@
+// Copyright (c) 2013-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+int cache_bench_tool(int argc, char** argv);
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h
index 740a3c435..de5d3d043 100644
--- a/include/rocksdb/secondary_cache.h
+++ b/include/rocksdb/secondary_cache.h
@@ -48,6 +48,8 @@ class SecondaryCache {
 
   virtual std::string Name() = 0;
 
+  static const std::string Type() { return "SecondaryCache"; }
+
   // Insert the given value into this cache. The value is not written
   // directly. Rather, the SaveToCallback provided by helper_cb will be
   // used to extract the persistable data in value, which will be written
diff --git a/src.mk b/src.mk
index 7077ccc4e..06afa3c46 100644
--- a/src.mk
+++ b/src.mk
@@ -320,6 +320,9 @@ BENCH_LIB_SOURCES = \
   tools/db_bench_tool.cc \
   tools/simulated_hybrid_file_system.cc \
 
+CACHE_BENCH_LIB_SOURCES = \
+  cache/cache_bench_tool.cc \
+
 STRESS_LIB_SOURCES = \
   db_stress_tool/batched_ops_stress.cc \
   db_stress_tool/cf_consistency_stress.cc \
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 145add864..e112c87da 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -53,11 +53,13 @@
 #include "rocksdb/perf_context.h"
 #include "rocksdb/persistent_cache.h"
 #include "rocksdb/rate_limiter.h"
+#include "rocksdb/secondary_cache.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/stats_history.h"
 #include "rocksdb/utilities/object_registry.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "rocksdb/utilities/options_type.h"
 #include "rocksdb/utilities/options_util.h"
 #include "rocksdb/utilities/sim_cache.h"
 #include "rocksdb/utilities/transaction.h"
@@ -1417,6 +1419,12 @@ DEFINE_bool(read_with_latest_user_timestamp, true,
             "If true, always use the current latest timestamp for read. If "
             "false, choose a random timestamp from the past.");
 
+#ifndef ROCKSDB_LITE
+DEFINE_string(secondary_cache_uri, "",
+              "Full URI for creating a custom secondary cache object");
+static class std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> secondary_cache;
+#endif  // ROCKSDB_LITE
+
 static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
     RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
 
@@ -2778,22 +2786,39 @@ class Benchmark {
       }
       return cache;
     } else {
-      if (FLAGS_use_cache_memkind_kmem_allocator) {
+      LRUCacheOptions opts(
+          static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
+          false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio,
 #ifdef MEMKIND
-        return NewLRUCache(
-            static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
-            false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio,
-            std::make_shared<MemkindKmemAllocator>());
-
+          FLAGS_use_cache_memkind_kmem_allocator
+              ? std::make_shared<MemkindKmemAllocator>()
+              : nullptr
 #else
+          nullptr
+#endif
+      );
+      if (FLAGS_use_cache_memkind_kmem_allocator) {
+#ifndef MEMKIND
         fprintf(stderr, "Memkind library is not linked with the binary.");
         exit(1);
 #endif
-      } else {
-        return NewLRUCache(
-            static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
-            false /*strict_capacity_limit*/, FLAGS_cache_high_pri_pool_ratio);
       }
+#ifndef ROCKSDB_LITE
+      if (!FLAGS_secondary_cache_uri.empty()) {
+        Status s =
+            ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
+                FLAGS_secondary_cache_uri, &secondary_cache);
+        if (secondary_cache == nullptr) {
+          fprintf(
+              stderr,
+              "No secondary cache registered matching string: %s status=%s\n",
+              FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
+          exit(1);
+        }
+        opts.secondary_cache = secondary_cache;
+      }
+#endif  // ROCKSDB_LITE
+      return NewLRUCache(opts);
     }
   }
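
For readers tracing the new plumbing: both tools resolve the flag the same way, handing the URI to the object registry and attaching the returned object to the LRU cache options. Below is a minimal standalone sketch of that path, lifted from the patch; the 64 MiB capacity, shard count, and high-priority ratio are arbitrary illustration values, and `MakeCacheWithSecondary` is a hypothetical helper name.

```cpp
#include <memory>
#include <string>

#include "rocksdb/cache.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/utilities/object_registry.h"

using namespace ROCKSDB_NAMESPACE;

// Resolve a secondary cache URI the way the patched tools do, then attach
// the result to a block cache. Returns nullptr if no registered factory
// matches the URI (db_bench/cache_bench print the status and exit(1) here).
std::shared_ptr<Cache> MakeCacheWithSecondary(const std::string& uri) {
  std::shared_ptr<SecondaryCache> secondary;
  Status s = ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
      uri, &secondary);
  if (!s.ok() || secondary == nullptr) {
    return nullptr;
  }
  LRUCacheOptions opts(64 << 20 /*capacity*/, 6 /*num_shard_bits*/,
                       false /*strict_capacity_limit*/,
                       0.5 /*high_pri_pool_ratio*/);
  opts.secondary_cache = secondary;
  return NewLRUCache(opts);
}
```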
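The other half, supplying a cache type that the URI can name, lives in the plug-in. The following is a schematic of the registration side, assuming the regex-keyed `ObjectLibrary::Default()->Register<T>()` API available at this revision; `MySecondaryCache` and the `myseccache://` scheme are invented for the example, and the pure-virtual `SecondaryCache` overrides are elided rather than guessed.

```cpp
#include <memory>
#include <string>

#include "rocksdb/secondary_cache.h"
#include "rocksdb/utilities/object_registry.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical plug-in type; the pure-virtual methods declared in
// include/rocksdb/secondary_cache.h (Insert, Lookup, Erase, ...) would be
// overridden here and are omitted from this sketch.
class MySecondaryCache : public SecondaryCache {
 public:
  explicit MySecondaryCache(const std::string& /*uri_options*/) {}
  // ... SecondaryCache overrides ...
};

// Run once at startup, before -secondary_cache_uri is parsed. Everything
// after the scheme can carry implementation options, as in the
// cachelibwrapper:// invocation shown in the summary above.
static bool RegisterMySecondaryCache() {
  ObjectLibrary::Default()->Register<SecondaryCache>(
      "myseccache://.*",
      [](const std::string& uri, std::unique_ptr<SecondaryCache>* guard,
         std::string* /*errmsg*/) {
        guard->reset(new MySecondaryCache(uri));
        return guard->get();
      });
  return true;
}
static bool registered = RegisterMySecondaryCache();
```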