From 1aac81457846ad2bb533859ad6a95df798c6cde4 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Fri, 17 Jun 2022 13:08:45 -0700 Subject: [PATCH] Use optimized folly DistributedMutex in LRUCache when available (#10179) Summary: folly DistributedMutex is faster than standard mutexes though imposes some static obligations on usage. See https://github.com/facebook/folly/blob/main/folly/synchronization/DistributedMutex.h for details. Here we use this alternative for our Cache implementations (especially LRUCache) for better locking performance, when RocksDB is compiled with folly. Also added information about which distributed mutex implementation is being used to cache_bench output and to DB LOG. Intended follow-up: * Use DMutex in more places, perhaps improving API to support non-scoped locking * Fix linking with fbcode compiler (needs ROCKSDB_NO_FBCODE=1 currently) Credit: Thanks Siying for reminding me about this line of work that was previously left unfinished. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10179 Test Plan: for correctness, existing tests. CircleCI config updated. Also Meta-internal buck build updated. For performance, ran simultaneous before & after cache_bench. Out of three comparison runs, the middle improvement to ops/sec was +21%: Baseline: USE_CLANG=1 DEBUG_LEVEL=0 make -j24 cache_bench (fbcode compiler) ``` Complete in 20.201 s; Rough parallel ops/sec = 1584062 Thread ops/sec = 107176 Operation latency (ns): Count: 32000000 Average: 9257.9421 StdDev: 122412.04 Min: 134 Median: 3623.0493 Max: 56918500 Percentiles: P50: 3623.05 P75: 10288.02 P99: 30219.35 P99.9: 683522.04 P99.99: 7302791.63 ``` New: (add USE_FOLLY=1) ``` Complete in 16.674 s; Rough parallel ops/sec = 1919135 (+21%) Thread ops/sec = 135487 Operation latency (ns): Count: 32000000 Average: 7304.9294 StdDev: 108530.28 Min: 132 Median: 3777.6012 Max: 91030902 Percentiles: P50: 3777.60 P75: 10169.89 P99: 24504.51 P99.9: 59721.59 P99.99: 1861151.83 ``` Reviewed By: anand1976 Differential Revision: D37182983 Pulled By: pdillinger fbshipit-source-id: a17eb05f25b832b6a2c1356f5c657e831a5af8d1 --- .circleci/config.yml | 16 +++++++++--- CMakeLists.txt | 7 +++++- HISTORY.md | 3 +++ Makefile | 8 +++++- TARGETS | 2 ++ buckifier/buckify_rocksdb.py | 6 +++-- cache/cache_bench_tool.cc | 9 +++++++ cache/clock_cache.cc | 16 ++++++------ cache/fast_lru_cache.cc | 24 +++++++++--------- cache/fast_lru_cache.h | 3 ++- cache/lru_cache.cc | 38 ++++++++++++++-------------- cache/lru_cache.h | 3 ++- db/db_impl/db_impl.cc | 3 +++ port/port_posix.h | 7 ++++++ port/win/port_win.h | 14 ++++++++--- src.mk | 4 +++ util/distributed_mutex.h | 48 ++++++++++++++++++++++++++++++++++++ 17 files changed, 160 insertions(+), 51 deletions(-) create mode 100644 util/distributed_mutex.h diff --git a/.circleci/config.yml b/.circleci/config.yml index 80ae91a11..c7c3ff056 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -168,6 +168,16 @@ commands: command: | echo "export PKG_CONFIG_PATH=/usr/local/OFF/:~/libprotobuf-mutator/build/external.protobuf/lib/pkgconfig/" >> $BASH_ENV echo "export PROTOC_BIN=~/libprotobuf-mutator/build/external.protobuf/bin/protoc" >> $BASH_ENV + setup-folly: + steps: + - run: + name: Install folly dependencies + command: | + sudo apt-get install libgoogle-glog-dev + - run: + name: Checkout folly sources + command: | + make checkout_folly build-for-benchmarks: steps: @@ -442,7 +452,7 @@ jobs: - pre-steps - install-gflags - upgrade-cmake - - run: make checkout_folly + - setup-folly - run: (mkdir build && cd build && cmake -DUSE_FOLLY=1 -DWITH_GFLAGS=1 .. && make V=1 -j20 && ctest -j20) - post-steps @@ -477,7 +487,7 @@ jobs: steps: - pre-steps - run: sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test && sudo apt-get update -y && sudo apt-get install gcc-7 g++-7 libgflags-dev - - run: make checkout_folly + - setup-folly - run: USE_FOLLY=1 CC=gcc-7 CXX=g++-7 V=1 make -j32 check - post-steps @@ -532,7 +542,7 @@ jobs: - pre-steps - install-clang-13 - install-gflags - - run: make checkout_folly + - setup-folly - run: CC=clang-13 CXX=clang++-13 USE_CLANG=1 USE_FOLLY=1 COMPILE_WITH_UBSAN=1 COMPILE_WITH_ASAN=1 make -j32 check - post-steps diff --git a/CMakeLists.txt b/CMakeLists.txt index 91fbade1e..b5fed942d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -587,6 +587,7 @@ include_directories(${PROJECT_SOURCE_DIR}/include) if(USE_FOLLY) include_directories(${PROJECT_SOURCE_DIR}/third-party/folly) add_definitions(-DUSE_FOLLY -DFOLLY_NO_CONFIG) + list(APPEND THIRDPARTY_LIBS glog) endif() find_package(Threads REQUIRED) @@ -975,9 +976,13 @@ endif() if(USE_FOLLY) list(APPEND SOURCES third-party/folly/folly/container/detail/F14Table.cpp + third-party/folly/folly/detail/Futex.cpp third-party/folly/folly/lang/SafeAssert.cpp third-party/folly/folly/lang/ToAscii.cpp - third-party/folly/folly/ScopeGuard.cpp) + third-party/folly/folly/ScopeGuard.cpp + third-party/folly/folly/synchronization/AtomicNotification.cpp + third-party/folly/folly/synchronization/DistributedMutex.cpp + third-party/folly/folly/synchronization/ParkingLot.cpp) endif() set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX}) diff --git a/HISTORY.md b/HISTORY.md index 6405ec8a5..b5cb8cd25 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -42,6 +42,9 @@ * Removed support for reading Bloom filters using obsolete block-based filter format. (Support for writing such filters was dropped in 7.0.) For good read performance on old DBs using these filters, a full compaction is required. * Per KV checksum in write batch is verified before a write batch is written to WAL to detect any corruption to the write batch (#10114). +### Performance Improvements +* When compiled with folly (Meta-internal integration; experimental in open source build), improve the locking performance (CPU efficiency) of LRUCache by using folly DistributedMutex in place of standard mutex. + ## 7.3.0 (05/20/2022) ### Bug Fixes * Fixed a bug where manual flush would block forever even though flush options had wait=false. diff --git a/Makefile b/Makefile index 187abe6a1..278ad8c29 100644 --- a/Makefile +++ b/Makefile @@ -461,6 +461,8 @@ ifeq ($(USE_FOLLY),1) endif PLATFORM_CCFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG PLATFORM_CXXFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG +# TODO: fix linking with fbcode compiler config + PLATFORM_LDFLAGS += -lglog endif ifdef TEST_CACHE_LINE_SIZE @@ -2354,10 +2356,14 @@ checkout_folly: fi @# Pin to a particular version for public CI, so that PR authors don't @# need to worry about folly breaking our integration. Update periodically - cd third-party/folly && git reset --hard 98b9b2c1124e99f50f9085ddee74ce32afffc665 + cd third-party/folly && git reset --hard beacd86d63cd71c904632262e6c36f60874d78ba @# A hack to remove boost dependency. @# NOTE: this hack is not needed if using FBCODE compiler config perl -pi -e 's/^(#include .)/__cpp_rtti && $$1/' third-party/folly/folly/memory/MemoryResource.h # --------------------------------------------------------------------------- # Build size testing diff --git a/TARGETS b/TARGETS index d19f5eb6d..a6556d901 100644 --- a/TARGETS +++ b/TARGETS @@ -334,6 +334,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "//folly/experimental/coro:collect", "//folly/experimental/coro:coroutine", "//folly/experimental/coro:task", + "//folly/synchronization:distributed_mutex", ], headers=None, link_whole=False, extra_test_libs=False) cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ @@ -662,6 +663,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "//folly/experimental/coro:collect", "//folly/experimental/coro:coroutine", "//folly/experimental/coro:task", + "//folly/synchronization:distributed_mutex", ], headers=None, link_whole=True, extra_test_libs=False) cpp_library_wrapper(name="rocksdb_test_lib", srcs=[ diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index f285d49d1..46514146d 100755 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -150,7 +150,8 @@ def generate_targets(repo_path, deps_map): "//folly/experimental/coro:blocking_wait", "//folly/experimental/coro:collect", "//folly/experimental/coro:coroutine", - "//folly/experimental/coro:task"]) + "//folly/experimental/coro:task", + "//folly/synchronization:distributed_mutex"]) # rocksdb_whole_archive_lib TARGETS.add_library( "rocksdb_whole_archive_lib", @@ -163,7 +164,8 @@ def generate_targets(repo_path, deps_map): "//folly/experimental/coro:blocking_wait", "//folly/experimental/coro:collect", "//folly/experimental/coro:coroutine", - "//folly/experimental/coro:task"], + "//folly/experimental/coro:task", + "//folly/synchronization:distributed_mutex"], headers=None, extra_external_deps="", link_whole=True) diff --git a/cache/cache_bench_tool.cc b/cache/cache_bench_tool.cc index 504f8f77f..2d5c35340 100644 --- a/cache/cache_bench_tool.cc +++ b/cache/cache_bench_tool.cc @@ -26,6 +26,7 @@ #include "table/block_based/block_based_table_reader.h" #include "table/block_based/cachable_entry.h" #include "util/coding.h" +#include "util/distributed_mutex.h" #include "util/gflags_compat.h" #include "util/hash.h" #include "util/mutexlock.h" @@ -587,7 +588,15 @@ class CacheBench { } void PrintEnv() const { +#if defined(__GNUC__) && !defined(__OPTIMIZE__) + printf( + "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"); +#endif +#ifndef NDEBUG + printf("WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); +#endif printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); + printf("DMutex impl name : %s\n", DMutex::kName()); printf("Number of threads : %u\n", FLAGS_threads); printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); printf("Cache size : %s\n", diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc index 76bf37ada..20b9f0a63 100644 --- a/cache/clock_cache.cc +++ b/cache/clock_cache.cc @@ -39,7 +39,7 @@ std::shared_ptr NewClockCache( #include "port/port.h" #include "tbb/concurrent_hash_map.h" #include "util/autovector.h" -#include "util/mutexlock.h" +#include "util/distributed_mutex.h" namespace ROCKSDB_NAMESPACE { @@ -368,7 +368,7 @@ class ClockCacheShard final : public CacheShard { // Guards list_, head_, and recycle_. In addition, updating table_ also has // to hold the mutex, to avoid the cache being in inconsistent state. - mutable port::Mutex mutex_; + mutable DMutex mutex_; // The circular list of cache handles. Initially the list is empty. Once a // handle is needed by insertion, and no more handles are available in @@ -431,7 +431,7 @@ void ClockCacheShard::ApplyToSomeEntries( DeleterFn deleter)>& callback, uint32_t average_entries_per_lock, uint32_t* state) { assert(average_entries_per_lock > 0); - MutexLock lock(&mutex_); + DMutexLock l(mutex_); // Figure out the range to iterate, update `state` size_t list_size = list_.size(); @@ -532,7 +532,7 @@ bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage, pinned_usage_.fetch_sub(total_charge, std::memory_order_relaxed); // Cleanup if it is the last reference. if (!InCache(flags)) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); RecycleHandle(handle, context); } } @@ -598,7 +598,7 @@ bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) { void ClockCacheShard::SetCapacity(size_t capacity) { CleanupContext context; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); capacity_.store(capacity, std::memory_order_relaxed); EvictFromCache(0, &context); } @@ -618,7 +618,7 @@ CacheHandle* ClockCacheShard::Insert( uint32_t meta_charge = CacheHandle::CalcMetadataCharge(key, metadata_charge_policy_); size_t total_charge = charge + meta_charge; - MutexLock l(&mutex_); + DMutexLock l(mutex_); bool success = EvictFromCache(total_charge, context); bool strict = strict_capacity_limit_.load(std::memory_order_relaxed); if (!success && (strict || !hold_reference)) { @@ -744,7 +744,7 @@ void ClockCacheShard::Erase(const Slice& key, uint32_t hash) { bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash, CleanupContext* context) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); HashTable::accessor accessor; bool erased = false; if (table_.find(accessor, ClockCacheKey(key, hash))) { @@ -758,7 +758,7 @@ bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash, void ClockCacheShard::EraseUnRefEntries() { CleanupContext context; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); table_.clear(); for (auto& handle : list_) { UnsetInCache(&handle, &context); diff --git a/cache/fast_lru_cache.cc b/cache/fast_lru_cache.cc index ba4c0e1e1..aaeb24ad8 100644 --- a/cache/fast_lru_cache.cc +++ b/cache/fast_lru_cache.cc @@ -16,7 +16,7 @@ #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "port/lang.h" -#include "util/mutexlock.h" +#include "util/distributed_mutex.h" #define KEY_LENGTH \ 16 // TODO(guido) Make use of this symbol in other parts of the source code @@ -93,7 +93,7 @@ LRUCacheShard::LRUCacheShard(size_t capacity, size_t estimated_value_size, void LRUCacheShard::EraseUnRefEntries() { autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); while (lru_.next != &lru_) { LRUHandle* old = lru_.next; // LRU list contains only elements which can be evicted. @@ -120,7 +120,7 @@ void LRUCacheShard::ApplyToSomeEntries( // The state is essentially going to be the starting hash, which works // nicely even if we resize between calls because we use upper-most // hash bits for table indexes. - MutexLock l(&mutex_); + DMutexLock l(mutex_); uint32_t length_bits = table_.GetLengthBits(); uint32_t length = uint32_t{1} << length_bits; @@ -208,7 +208,7 @@ int LRUCacheShard::GetHashBits( void LRUCacheShard::SetCapacity(size_t capacity) { autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); capacity_ = capacity; EvictFromLRU(0, &last_reference_list); } @@ -220,7 +220,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) { } void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); strict_capacity_limit_ = strict_capacity_limit; } @@ -229,7 +229,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle, Status s = Status::OK(); autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); // Free the space following strict LRU policy until enough space // is freed or the lru list is empty. @@ -289,7 +289,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle, Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) { LRUHandle* e = nullptr; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); e = table_.Lookup(key, hash); if (e != nullptr) { assert(e->InCache()); @@ -305,7 +305,7 @@ Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) { bool LRUCacheShard::Ref(Cache::Handle* h) { LRUHandle* e = reinterpret_cast(h); - MutexLock l(&mutex_); + DMutexLock l(mutex_); // To create another reference - entry must be already externally referenced. assert(e->HasRefs()); e->Ref(); @@ -319,7 +319,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool erase_if_last_ref) { LRUHandle* e = reinterpret_cast(handle); bool last_reference = false; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); last_reference = e->Unref(); if (last_reference && e->InCache()) { // The item is still in cache, and nobody else holds a reference to it. @@ -382,7 +382,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { LRUHandle* e; bool last_reference = false; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); e = table_.Remove(key, hash); if (e != nullptr) { assert(e->InCache()); @@ -405,12 +405,12 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { } size_t LRUCacheShard::GetUsage() const { - MutexLock l(&mutex_); + DMutexLock l(mutex_); return usage_; } size_t LRUCacheShard::GetPinnedUsage() const { - MutexLock l(&mutex_); + DMutexLock l(mutex_); assert(usage_ >= lru_usage_); return usage_ - lru_usage_; } diff --git a/cache/fast_lru_cache.h b/cache/fast_lru_cache.h index 00e2de9c7..7810af918 100644 --- a/cache/fast_lru_cache.h +++ b/cache/fast_lru_cache.h @@ -17,6 +17,7 @@ #include "port/port.h" #include "rocksdb/secondary_cache.h" #include "util/autovector.h" +#include "util/distributed_mutex.h" namespace ROCKSDB_NAMESPACE { namespace fast_lru_cache { @@ -273,7 +274,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { // mutex_ protects the following state. // We don't count mutex_ as the cache's internal state so semantically we // don't mind mutex_ invoking the non-const actions. - mutable port::Mutex mutex_; + mutable DMutex mutex_; }; class LRUCache diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index eb978f4f0..c3f50f7b9 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -16,7 +16,7 @@ #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" #include "port/lang.h" -#include "util/mutexlock.h" +#include "util/distributed_mutex.h" namespace ROCKSDB_NAMESPACE { namespace lru_cache { @@ -135,7 +135,7 @@ LRUCacheShard::LRUCacheShard( void LRUCacheShard::EraseUnRefEntries() { autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); while (lru_.next != &lru_) { LRUHandle* old = lru_.next; // LRU list contains only elements which can be evicted. @@ -161,7 +161,7 @@ void LRUCacheShard::ApplyToSomeEntries( // The state is essentially going to be the starting hash, which works // nicely even if we resize between calls because we use upper-most // hash bits for table indexes. - MutexLock l(&mutex_); + DMutexLock l(mutex_); uint32_t length_bits = table_.GetLengthBits(); uint32_t length = uint32_t{1} << length_bits; @@ -193,13 +193,13 @@ void LRUCacheShard::ApplyToSomeEntries( } void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); *lru = &lru_; *lru_low_pri = lru_low_pri_; } size_t LRUCacheShard::TEST_GetLRUSize() { - MutexLock l(&mutex_); + DMutexLock l(mutex_); LRUHandle* lru_handle = lru_.next; size_t lru_size = 0; while (lru_handle != &lru_) { @@ -210,7 +210,7 @@ size_t LRUCacheShard::TEST_GetLRUSize() { } double LRUCacheShard::GetHighPriPoolRatio() { - MutexLock l(&mutex_); + DMutexLock l(mutex_); return high_pri_pool_ratio_; } @@ -285,7 +285,7 @@ void LRUCacheShard::EvictFromLRU(size_t charge, void LRUCacheShard::SetCapacity(size_t capacity) { autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); capacity_ = capacity; high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; EvictFromLRU(0, &last_reference_list); @@ -304,7 +304,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) { } void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); strict_capacity_limit_ = strict_capacity_limit; } @@ -314,7 +314,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle, autovector last_reference_list; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); // Free the space following strict LRU policy until enough space // is freed or the lru list is empty. @@ -402,7 +402,7 @@ void LRUCacheShard::Promote(LRUHandle* e) { } else { // Since the secondary cache lookup failed, mark the item as not in cache // Don't charge the cache as its only metadata that'll shortly be released - MutexLock l(&mutex_); + DMutexLock l(mutex_); // TODO e->CalcTotalCharge(0, metadata_charge_policy_); e->SetInCache(false); @@ -416,7 +416,7 @@ Cache::Handle* LRUCacheShard::Lookup( bool wait, Statistics* stats) { LRUHandle* e = nullptr; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); e = table_.Lookup(key, hash); if (e != nullptr) { assert(e->InCache()); @@ -489,7 +489,7 @@ Cache::Handle* LRUCacheShard::Lookup( bool LRUCacheShard::Ref(Cache::Handle* h) { LRUHandle* e = reinterpret_cast(h); - MutexLock l(&mutex_); + DMutexLock l(mutex_); // To create another reference - entry must be already externally referenced. assert(e->HasRefs()); e->Ref(); @@ -497,7 +497,7 @@ bool LRUCacheShard::Ref(Cache::Handle* h) { } void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) { - MutexLock l(&mutex_); + DMutexLock l(mutex_); high_pri_pool_ratio_ = high_pri_pool_ratio; high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; MaintainPoolSize(); @@ -510,7 +510,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool erase_if_last_ref) { LRUHandle* e = reinterpret_cast(handle); bool last_reference = false; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); last_reference = e->Unref(); if (last_reference && e->InCache()) { // The item is still in cache, and nobody else holds a reference to it. @@ -582,7 +582,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { LRUHandle* e; bool last_reference = false; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); e = table_.Remove(key, hash); if (e != nullptr) { assert(e->InCache()); @@ -606,7 +606,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) { bool LRUCacheShard::IsReady(Cache::Handle* handle) { LRUHandle* e = reinterpret_cast(handle); - MutexLock l(&mutex_); + DMutexLock l(mutex_); bool ready = true; if (e->IsPending()) { assert(secondary_cache_); @@ -617,12 +617,12 @@ bool LRUCacheShard::IsReady(Cache::Handle* handle) { } size_t LRUCacheShard::GetUsage() const { - MutexLock l(&mutex_); + DMutexLock l(mutex_); return usage_; } size_t LRUCacheShard::GetPinnedUsage() const { - MutexLock l(&mutex_); + DMutexLock l(mutex_); assert(usage_ >= lru_usage_); return usage_ - lru_usage_; } @@ -631,7 +631,7 @@ std::string LRUCacheShard::GetPrintableOptions() const { const int kBufferSize = 200; char buffer[kBufferSize]; { - MutexLock l(&mutex_); + DMutexLock l(mutex_); snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n", high_pri_pool_ratio_); } diff --git a/cache/lru_cache.h b/cache/lru_cache.h index c718add77..5c1081a82 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -17,6 +17,7 @@ #include "port/port.h" #include "rocksdb/secondary_cache.h" #include "util/autovector.h" +#include "util/distributed_mutex.h" namespace ROCKSDB_NAMESPACE { namespace lru_cache { @@ -453,7 +454,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { // mutex_ protects the following state. // We don't count mutex_ as the cache's internal state so semantically we // don't mind mutex_ invoking the non-const actions. - mutable port::Mutex mutex_; + mutable DMutex mutex_; std::shared_ptr secondary_cache_; }; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7c0f6dbc1..d3e0ff0c2 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -101,6 +101,7 @@ #include "util/compression.h" #include "util/crc32c.h" #include "util/defer.h" +#include "util/distributed_mutex.h" #include "util/hash_containers.h" #include "util/mutexlock.h" #include "util/stop_watch.h" @@ -145,6 +146,8 @@ void DumpSupportInfo(Logger* logger) { } ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s", crc32c::IsFastCrc32Supported().c_str()); + + ROCKS_LOG_HEADER(logger, "DMutex implementation: %s", DMutex::kName()); } } // namespace diff --git a/port/port_posix.h b/port/port_posix.h index d153c5817..cd7bc1a6b 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -95,6 +95,8 @@ class CondVar; class Mutex { public: + static const char* kName() { return "pthread_mutex_t"; } + explicit Mutex(bool adaptive = kDefaultToAdaptiveMutex); // No copying Mutex(const Mutex&) = delete; @@ -111,6 +113,11 @@ class Mutex { // it does NOT verify that mutex is held by a calling thread void AssertHeld(); + // Also implement std Lockable + inline void lock() { Lock(); } + inline void unlock() { Unlock(); } + inline bool try_lock() { return TryLock(); } + private: friend class CondVar; pthread_mutex_t mu_; diff --git a/port/win/port_win.h b/port/win/port_win.h index 55ecc71f9..5a8f66051 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -79,12 +79,15 @@ class CondVar; class Mutex { public: + static const char* kName() { return "std::mutex"; } - /* implicit */ Mutex(bool adaptive = kDefaultToAdaptiveMutex) + explicit Mutex(bool IGNORED_adaptive = kDefaultToAdaptiveMutex) #ifndef NDEBUG - : locked_(false) + : locked_(false) #endif - { } + { + (void)IGNORED_adaptive; + } ~Mutex(); @@ -120,6 +123,11 @@ class Mutex { #endif } + // Also implement std Lockable + inline void lock() { Lock(); } + inline void unlock() { Unlock(); } + inline bool try_lock() { return TryLock(); } + // Mutex is move only with lock ownership transfer Mutex(const Mutex&) = delete; void operator=(const Mutex&) = delete; diff --git a/src.mk b/src.mk index 882d6fa48..d8cf58296 100644 --- a/src.mk +++ b/src.mk @@ -374,9 +374,13 @@ TEST_LIB_SOURCES = \ FOLLY_SOURCES = \ $(FOLLY_DIR)/folly/container/detail/F14Table.cpp \ + $(FOLLY_DIR)/folly/detail/Futex.cpp \ $(FOLLY_DIR)/folly/lang/SafeAssert.cpp \ $(FOLLY_DIR)/folly/lang/ToAscii.cpp \ $(FOLLY_DIR)/folly/ScopeGuard.cpp \ + $(FOLLY_DIR)/folly/synchronization/AtomicNotification.cpp \ + $(FOLLY_DIR)/folly/synchronization/DistributedMutex.cpp \ + $(FOLLY_DIR)/folly/synchronization/ParkingLot.cpp \ TOOLS_MAIN_SOURCES = \ db_stress_tool/db_stress.cc \ diff --git a/util/distributed_mutex.h b/util/distributed_mutex.h new file mode 100644 index 000000000..9675a1e2d --- /dev/null +++ b/util/distributed_mutex.h @@ -0,0 +1,48 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/rocksdb_namespace.h" + +// This file declares a wrapper around the efficient folly DistributedMutex +// that falls back on a standard mutex when not available. See +// https://github.com/facebook/folly/blob/main/folly/synchronization/DistributedMutex.h +// for benefits and limitations. + +// At the moment, only scoped locking is supported using DMutexLock +// RAII wrapper, because lock/unlock APIs will vary. + +#ifdef USE_FOLLY + +#include + +namespace ROCKSDB_NAMESPACE { + +class DMutex : public folly::DistributedMutex { + public: + static const char* kName() { return "folly::DistributedMutex"; } + + explicit DMutex(bool IGNORED_adaptive = false) { (void)IGNORED_adaptive; } + + // currently no-op + void AssertHeld() {} +}; +using DMutexLock = std::lock_guard; + +} // namespace ROCKSDB_NAMESPACE + +#else + +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +using DMutex = port::Mutex; +using DMutexLock = std::lock_guard; + +} // namespace ROCKSDB_NAMESPACE + +#endif