Use optimized folly DistributedMutex in LRUCache when available (#10179)

Summary:
folly DistributedMutex is faster than standard mutexes though
imposes some static obligations on usage. See
https://github.com/facebook/folly/blob/main/folly/synchronization/DistributedMutex.h
for details. Here we use this alternative for our Cache implementations
(especially LRUCache) for better locking performance, when RocksDB is
compiled with folly.

Also added information about which distributed mutex implementation is
being used to cache_bench output and to DB LOG.

Intended follow-up:
* Use DMutex in more places, perhaps improving API to support non-scoped
locking
* Fix linking with fbcode compiler (needs ROCKSDB_NO_FBCODE=1 currently)

Credit: Thanks Siying for reminding me about this line of work that was previously
left unfinished.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10179

Test Plan:
for correctness, existing tests. CircleCI config updated.
Also Meta-internal buck build updated.

For performance, ran simultaneous before & after cache_bench. Out of three
comparison runs, the middle improvement to ops/sec was +21%:

Baseline: USE_CLANG=1 DEBUG_LEVEL=0 make -j24 cache_bench (fbcode
compiler)

```
Complete in 20.201 s; Rough parallel ops/sec = 1584062
Thread ops/sec = 107176

Operation latency (ns):
Count: 32000000 Average: 9257.9421  StdDev: 122412.04
Min: 134  Median: 3623.0493  Max: 56918500
Percentiles: P50: 3623.05 P75: 10288.02 P99: 30219.35 P99.9: 683522.04 P99.99: 7302791.63
```

New: (add USE_FOLLY=1)

```
Complete in 16.674 s; Rough parallel ops/sec = 1919135  (+21%)
Thread ops/sec = 135487

Operation latency (ns):
Count: 32000000 Average: 7304.9294  StdDev: 108530.28
Min: 132  Median: 3777.6012  Max: 91030902
Percentiles: P50: 3777.60 P75: 10169.89 P99: 24504.51 P99.9: 59721.59 P99.99: 1861151.83
```

Reviewed By: anand1976

Differential Revision: D37182983

Pulled By: pdillinger

fbshipit-source-id: a17eb05f25b832b6a2c1356f5c657e831a5af8d1
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent f87adcfb3f
commit 1aac814578
  1. 16
      .circleci/config.yml
  2. 7
      CMakeLists.txt
  3. 3
      HISTORY.md
  4. 8
      Makefile
  5. 2
      TARGETS
  6. 6
      buckifier/buckify_rocksdb.py
  7. 9
      cache/cache_bench_tool.cc
  8. 16
      cache/clock_cache.cc
  9. 24
      cache/fast_lru_cache.cc
  10. 3
      cache/fast_lru_cache.h
  11. 38
      cache/lru_cache.cc
  12. 3
      cache/lru_cache.h
  13. 3
      db/db_impl/db_impl.cc
  14. 7
      port/port_posix.h
  15. 12
      port/win/port_win.h
  16. 4
      src.mk
  17. 48
      util/distributed_mutex.h

@ -168,6 +168,16 @@ commands:
command: | command: |
echo "export PKG_CONFIG_PATH=/usr/local/OFF/:~/libprotobuf-mutator/build/external.protobuf/lib/pkgconfig/" >> $BASH_ENV echo "export PKG_CONFIG_PATH=/usr/local/OFF/:~/libprotobuf-mutator/build/external.protobuf/lib/pkgconfig/" >> $BASH_ENV
echo "export PROTOC_BIN=~/libprotobuf-mutator/build/external.protobuf/bin/protoc" >> $BASH_ENV echo "export PROTOC_BIN=~/libprotobuf-mutator/build/external.protobuf/bin/protoc" >> $BASH_ENV
setup-folly:
steps:
- run:
name: Install folly dependencies
command: |
sudo apt-get install libgoogle-glog-dev
- run:
name: Checkout folly sources
command: |
make checkout_folly
build-for-benchmarks: build-for-benchmarks:
steps: steps:
@ -442,7 +452,7 @@ jobs:
- pre-steps - pre-steps
- install-gflags - install-gflags
- upgrade-cmake - upgrade-cmake
- run: make checkout_folly - setup-folly
- run: (mkdir build && cd build && cmake -DUSE_FOLLY=1 -DWITH_GFLAGS=1 .. && make V=1 -j20 && ctest -j20) - run: (mkdir build && cd build && cmake -DUSE_FOLLY=1 -DWITH_GFLAGS=1 .. && make V=1 -j20 && ctest -j20)
- post-steps - post-steps
@ -477,7 +487,7 @@ jobs:
steps: steps:
- pre-steps - pre-steps
- run: sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test && sudo apt-get update -y && sudo apt-get install gcc-7 g++-7 libgflags-dev - run: sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test && sudo apt-get update -y && sudo apt-get install gcc-7 g++-7 libgflags-dev
- run: make checkout_folly - setup-folly
- run: USE_FOLLY=1 CC=gcc-7 CXX=g++-7 V=1 make -j32 check - run: USE_FOLLY=1 CC=gcc-7 CXX=g++-7 V=1 make -j32 check
- post-steps - post-steps
@ -532,7 +542,7 @@ jobs:
- pre-steps - pre-steps
- install-clang-13 - install-clang-13
- install-gflags - install-gflags
- run: make checkout_folly - setup-folly
- run: CC=clang-13 CXX=clang++-13 USE_CLANG=1 USE_FOLLY=1 COMPILE_WITH_UBSAN=1 COMPILE_WITH_ASAN=1 make -j32 check - run: CC=clang-13 CXX=clang++-13 USE_CLANG=1 USE_FOLLY=1 COMPILE_WITH_UBSAN=1 COMPILE_WITH_ASAN=1 make -j32 check
- post-steps - post-steps

@ -587,6 +587,7 @@ include_directories(${PROJECT_SOURCE_DIR}/include)
if(USE_FOLLY) if(USE_FOLLY)
include_directories(${PROJECT_SOURCE_DIR}/third-party/folly) include_directories(${PROJECT_SOURCE_DIR}/third-party/folly)
add_definitions(-DUSE_FOLLY -DFOLLY_NO_CONFIG) add_definitions(-DUSE_FOLLY -DFOLLY_NO_CONFIG)
list(APPEND THIRDPARTY_LIBS glog)
endif() endif()
find_package(Threads REQUIRED) find_package(Threads REQUIRED)
@ -975,9 +976,13 @@ endif()
if(USE_FOLLY) if(USE_FOLLY)
list(APPEND SOURCES list(APPEND SOURCES
third-party/folly/folly/container/detail/F14Table.cpp third-party/folly/folly/container/detail/F14Table.cpp
third-party/folly/folly/detail/Futex.cpp
third-party/folly/folly/lang/SafeAssert.cpp third-party/folly/folly/lang/SafeAssert.cpp
third-party/folly/folly/lang/ToAscii.cpp third-party/folly/folly/lang/ToAscii.cpp
third-party/folly/folly/ScopeGuard.cpp) third-party/folly/folly/ScopeGuard.cpp
third-party/folly/folly/synchronization/AtomicNotification.cpp
third-party/folly/folly/synchronization/DistributedMutex.cpp
third-party/folly/folly/synchronization/ParkingLot.cpp)
endif() endif()
set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX}) set(ROCKSDB_STATIC_LIB rocksdb${ARTIFACT_SUFFIX})

@ -42,6 +42,9 @@
* Removed support for reading Bloom filters using obsolete block-based filter format. (Support for writing such filters was dropped in 7.0.) For good read performance on old DBs using these filters, a full compaction is required. * Removed support for reading Bloom filters using obsolete block-based filter format. (Support for writing such filters was dropped in 7.0.) For good read performance on old DBs using these filters, a full compaction is required.
* Per KV checksum in write batch is verified before a write batch is written to WAL to detect any corruption to the write batch (#10114). * Per KV checksum in write batch is verified before a write batch is written to WAL to detect any corruption to the write batch (#10114).
### Performance Improvements
* When compiled with folly (Meta-internal integration; experimental in open source build), improve the locking performance (CPU efficiency) of LRUCache by using folly DistributedMutex in place of standard mutex.
## 7.3.0 (05/20/2022) ## 7.3.0 (05/20/2022)
### Bug Fixes ### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false. * Fixed a bug where manual flush would block forever even though flush options had wait=false.

@ -461,6 +461,8 @@ ifeq ($(USE_FOLLY),1)
endif endif
PLATFORM_CCFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG PLATFORM_CCFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG
PLATFORM_CXXFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG PLATFORM_CXXFLAGS += -DUSE_FOLLY -DFOLLY_NO_CONFIG
# TODO: fix linking with fbcode compiler config
PLATFORM_LDFLAGS += -lglog
endif endif
ifdef TEST_CACHE_LINE_SIZE ifdef TEST_CACHE_LINE_SIZE
@ -2354,10 +2356,14 @@ checkout_folly:
fi fi
@# Pin to a particular version for public CI, so that PR authors don't @# Pin to a particular version for public CI, so that PR authors don't
@# need to worry about folly breaking our integration. Update periodically @# need to worry about folly breaking our integration. Update periodically
cd third-party/folly && git reset --hard 98b9b2c1124e99f50f9085ddee74ce32afffc665 cd third-party/folly && git reset --hard beacd86d63cd71c904632262e6c36f60874d78ba
@# A hack to remove boost dependency. @# A hack to remove boost dependency.
@# NOTE: this hack is not needed if using FBCODE compiler config @# NOTE: this hack is not needed if using FBCODE compiler config
perl -pi -e 's/^(#include <boost)/\/\/$$1/' third-party/folly/folly/functional/Invoke.h perl -pi -e 's/^(#include <boost)/\/\/$$1/' third-party/folly/folly/functional/Invoke.h
@# NOTE: this hack is required for clang in some cases
perl -pi -e 's/int rv = syscall/int rv = (int)syscall/' third-party/folly/folly/detail/Futex.cpp
@# NOTE: this hack is required for gcc in some cases
perl -pi -e 's/(__has_include.<experimental.memory_resource>.)/__cpp_rtti && $$1/' third-party/folly/folly/memory/MemoryResource.h
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Build size testing # Build size testing

@ -334,6 +334,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"//folly/experimental/coro:collect", "//folly/experimental/coro:collect",
"//folly/experimental/coro:coroutine", "//folly/experimental/coro:coroutine",
"//folly/experimental/coro:task", "//folly/experimental/coro:task",
"//folly/synchronization:distributed_mutex",
], headers=None, link_whole=False, extra_test_libs=False) ], headers=None, link_whole=False, extra_test_libs=False)
cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
@ -662,6 +663,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"//folly/experimental/coro:collect", "//folly/experimental/coro:collect",
"//folly/experimental/coro:coroutine", "//folly/experimental/coro:coroutine",
"//folly/experimental/coro:task", "//folly/experimental/coro:task",
"//folly/synchronization:distributed_mutex",
], headers=None, link_whole=True, extra_test_libs=False) ], headers=None, link_whole=True, extra_test_libs=False)
cpp_library_wrapper(name="rocksdb_test_lib", srcs=[ cpp_library_wrapper(name="rocksdb_test_lib", srcs=[

@ -150,7 +150,8 @@ def generate_targets(repo_path, deps_map):
"//folly/experimental/coro:blocking_wait", "//folly/experimental/coro:blocking_wait",
"//folly/experimental/coro:collect", "//folly/experimental/coro:collect",
"//folly/experimental/coro:coroutine", "//folly/experimental/coro:coroutine",
"//folly/experimental/coro:task"]) "//folly/experimental/coro:task",
"//folly/synchronization:distributed_mutex"])
# rocksdb_whole_archive_lib # rocksdb_whole_archive_lib
TARGETS.add_library( TARGETS.add_library(
"rocksdb_whole_archive_lib", "rocksdb_whole_archive_lib",
@ -163,7 +164,8 @@ def generate_targets(repo_path, deps_map):
"//folly/experimental/coro:blocking_wait", "//folly/experimental/coro:blocking_wait",
"//folly/experimental/coro:collect", "//folly/experimental/coro:collect",
"//folly/experimental/coro:coroutine", "//folly/experimental/coro:coroutine",
"//folly/experimental/coro:task"], "//folly/experimental/coro:task",
"//folly/synchronization:distributed_mutex"],
headers=None, headers=None,
extra_external_deps="", extra_external_deps="",
link_whole=True) link_whole=True)

@ -26,6 +26,7 @@
#include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_based_table_reader.h"
#include "table/block_based/cachable_entry.h" #include "table/block_based/cachable_entry.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/distributed_mutex.h"
#include "util/gflags_compat.h" #include "util/gflags_compat.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -587,7 +588,15 @@ class CacheBench {
} }
void PrintEnv() const { void PrintEnv() const {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
printf(
"WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
#endif
#ifndef NDEBUG
printf("WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion);
printf("DMutex impl name : %s\n", DMutex::kName());
printf("Number of threads : %u\n", FLAGS_threads); printf("Number of threads : %u\n", FLAGS_threads);
printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread);
printf("Cache size : %s\n", printf("Cache size : %s\n",

@ -39,7 +39,7 @@ std::shared_ptr<Cache> NewClockCache(
#include "port/port.h" #include "port/port.h"
#include "tbb/concurrent_hash_map.h" #include "tbb/concurrent_hash_map.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/mutexlock.h" #include "util/distributed_mutex.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -368,7 +368,7 @@ class ClockCacheShard final : public CacheShard {
// Guards list_, head_, and recycle_. In addition, updating table_ also has // Guards list_, head_, and recycle_. In addition, updating table_ also has
// to hold the mutex, to avoid the cache being in inconsistent state. // to hold the mutex, to avoid the cache being in inconsistent state.
mutable port::Mutex mutex_; mutable DMutex mutex_;
// The circular list of cache handles. Initially the list is empty. Once a // The circular list of cache handles. Initially the list is empty. Once a
// handle is needed by insertion, and no more handles are available in // handle is needed by insertion, and no more handles are available in
@ -431,7 +431,7 @@ void ClockCacheShard::ApplyToSomeEntries(
DeleterFn deleter)>& callback, DeleterFn deleter)>& callback,
uint32_t average_entries_per_lock, uint32_t* state) { uint32_t average_entries_per_lock, uint32_t* state) {
assert(average_entries_per_lock > 0); assert(average_entries_per_lock > 0);
MutexLock lock(&mutex_); DMutexLock l(mutex_);
// Figure out the range to iterate, update `state` // Figure out the range to iterate, update `state`
size_t list_size = list_.size(); size_t list_size = list_.size();
@ -532,7 +532,7 @@ bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage,
pinned_usage_.fetch_sub(total_charge, std::memory_order_relaxed); pinned_usage_.fetch_sub(total_charge, std::memory_order_relaxed);
// Cleanup if it is the last reference. // Cleanup if it is the last reference.
if (!InCache(flags)) { if (!InCache(flags)) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
RecycleHandle(handle, context); RecycleHandle(handle, context);
} }
} }
@ -598,7 +598,7 @@ bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) {
void ClockCacheShard::SetCapacity(size_t capacity) { void ClockCacheShard::SetCapacity(size_t capacity) {
CleanupContext context; CleanupContext context;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
capacity_.store(capacity, std::memory_order_relaxed); capacity_.store(capacity, std::memory_order_relaxed);
EvictFromCache(0, &context); EvictFromCache(0, &context);
} }
@ -618,7 +618,7 @@ CacheHandle* ClockCacheShard::Insert(
uint32_t meta_charge = uint32_t meta_charge =
CacheHandle::CalcMetadataCharge(key, metadata_charge_policy_); CacheHandle::CalcMetadataCharge(key, metadata_charge_policy_);
size_t total_charge = charge + meta_charge; size_t total_charge = charge + meta_charge;
MutexLock l(&mutex_); DMutexLock l(mutex_);
bool success = EvictFromCache(total_charge, context); bool success = EvictFromCache(total_charge, context);
bool strict = strict_capacity_limit_.load(std::memory_order_relaxed); bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
if (!success && (strict || !hold_reference)) { if (!success && (strict || !hold_reference)) {
@ -744,7 +744,7 @@ void ClockCacheShard::Erase(const Slice& key, uint32_t hash) {
bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash, bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash,
CleanupContext* context) { CleanupContext* context) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
HashTable::accessor accessor; HashTable::accessor accessor;
bool erased = false; bool erased = false;
if (table_.find(accessor, ClockCacheKey(key, hash))) { if (table_.find(accessor, ClockCacheKey(key, hash))) {
@ -758,7 +758,7 @@ bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash,
void ClockCacheShard::EraseUnRefEntries() { void ClockCacheShard::EraseUnRefEntries() {
CleanupContext context; CleanupContext context;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
table_.clear(); table_.clear();
for (auto& handle : list_) { for (auto& handle : list_) {
UnsetInCache(&handle, &context); UnsetInCache(&handle, &context);

@ -16,7 +16,7 @@
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/lang.h" #include "port/lang.h"
#include "util/mutexlock.h" #include "util/distributed_mutex.h"
#define KEY_LENGTH \ #define KEY_LENGTH \
16 // TODO(guido) Make use of this symbol in other parts of the source code 16 // TODO(guido) Make use of this symbol in other parts of the source code
@ -93,7 +93,7 @@ LRUCacheShard::LRUCacheShard(size_t capacity, size_t estimated_value_size,
void LRUCacheShard::EraseUnRefEntries() { void LRUCacheShard::EraseUnRefEntries() {
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
while (lru_.next != &lru_) { while (lru_.next != &lru_) {
LRUHandle* old = lru_.next; LRUHandle* old = lru_.next;
// LRU list contains only elements which can be evicted. // LRU list contains only elements which can be evicted.
@ -120,7 +120,7 @@ void LRUCacheShard::ApplyToSomeEntries(
// The state is essentially going to be the starting hash, which works // The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most // nicely even if we resize between calls because we use upper-most
// hash bits for table indexes. // hash bits for table indexes.
MutexLock l(&mutex_); DMutexLock l(mutex_);
uint32_t length_bits = table_.GetLengthBits(); uint32_t length_bits = table_.GetLengthBits();
uint32_t length = uint32_t{1} << length_bits; uint32_t length = uint32_t{1} << length_bits;
@ -208,7 +208,7 @@ int LRUCacheShard::GetHashBits(
void LRUCacheShard::SetCapacity(size_t capacity) { void LRUCacheShard::SetCapacity(size_t capacity) {
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
capacity_ = capacity; capacity_ = capacity;
EvictFromLRU(0, &last_reference_list); EvictFromLRU(0, &last_reference_list);
} }
@ -220,7 +220,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
} }
void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
strict_capacity_limit_ = strict_capacity_limit; strict_capacity_limit_ = strict_capacity_limit;
} }
@ -229,7 +229,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
Status s = Status::OK(); Status s = Status::OK();
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
// Free the space following strict LRU policy until enough space // Free the space following strict LRU policy until enough space
// is freed or the lru list is empty. // is freed or the lru list is empty.
@ -289,7 +289,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) { Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
LRUHandle* e = nullptr; LRUHandle* e = nullptr;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
e = table_.Lookup(key, hash); e = table_.Lookup(key, hash);
if (e != nullptr) { if (e != nullptr) {
assert(e->InCache()); assert(e->InCache());
@ -305,7 +305,7 @@ Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
bool LRUCacheShard::Ref(Cache::Handle* h) { bool LRUCacheShard::Ref(Cache::Handle* h) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(h); LRUHandle* e = reinterpret_cast<LRUHandle*>(h);
MutexLock l(&mutex_); DMutexLock l(mutex_);
// To create another reference - entry must be already externally referenced. // To create another reference - entry must be already externally referenced.
assert(e->HasRefs()); assert(e->HasRefs());
e->Ref(); e->Ref();
@ -319,7 +319,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool erase_if_last_ref) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool last_reference = false; bool last_reference = false;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
last_reference = e->Unref(); last_reference = e->Unref();
if (last_reference && e->InCache()) { if (last_reference && e->InCache()) {
// The item is still in cache, and nobody else holds a reference to it. // The item is still in cache, and nobody else holds a reference to it.
@ -382,7 +382,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
LRUHandle* e; LRUHandle* e;
bool last_reference = false; bool last_reference = false;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
e = table_.Remove(key, hash); e = table_.Remove(key, hash);
if (e != nullptr) { if (e != nullptr) {
assert(e->InCache()); assert(e->InCache());
@ -405,12 +405,12 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
} }
size_t LRUCacheShard::GetUsage() const { size_t LRUCacheShard::GetUsage() const {
MutexLock l(&mutex_); DMutexLock l(mutex_);
return usage_; return usage_;
} }
size_t LRUCacheShard::GetPinnedUsage() const { size_t LRUCacheShard::GetPinnedUsage() const {
MutexLock l(&mutex_); DMutexLock l(mutex_);
assert(usage_ >= lru_usage_); assert(usage_ >= lru_usage_);
return usage_ - lru_usage_; return usage_ - lru_usage_;
} }

@ -17,6 +17,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/distributed_mutex.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace fast_lru_cache { namespace fast_lru_cache {
@ -273,7 +274,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
// mutex_ protects the following state. // mutex_ protects the following state.
// We don't count mutex_ as the cache's internal state so semantically we // We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions. // don't mind mutex_ invoking the non-const actions.
mutable port::Mutex mutex_; mutable DMutex mutex_;
}; };
class LRUCache class LRUCache

38
cache/lru_cache.cc vendored

@ -16,7 +16,7 @@
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "port/lang.h" #include "port/lang.h"
#include "util/mutexlock.h" #include "util/distributed_mutex.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace lru_cache { namespace lru_cache {
@ -135,7 +135,7 @@ LRUCacheShard::LRUCacheShard(
void LRUCacheShard::EraseUnRefEntries() { void LRUCacheShard::EraseUnRefEntries() {
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
while (lru_.next != &lru_) { while (lru_.next != &lru_) {
LRUHandle* old = lru_.next; LRUHandle* old = lru_.next;
// LRU list contains only elements which can be evicted. // LRU list contains only elements which can be evicted.
@ -161,7 +161,7 @@ void LRUCacheShard::ApplyToSomeEntries(
// The state is essentially going to be the starting hash, which works // The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most // nicely even if we resize between calls because we use upper-most
// hash bits for table indexes. // hash bits for table indexes.
MutexLock l(&mutex_); DMutexLock l(mutex_);
uint32_t length_bits = table_.GetLengthBits(); uint32_t length_bits = table_.GetLengthBits();
uint32_t length = uint32_t{1} << length_bits; uint32_t length = uint32_t{1} << length_bits;
@ -193,13 +193,13 @@ void LRUCacheShard::ApplyToSomeEntries(
} }
void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) { void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
*lru = &lru_; *lru = &lru_;
*lru_low_pri = lru_low_pri_; *lru_low_pri = lru_low_pri_;
} }
size_t LRUCacheShard::TEST_GetLRUSize() { size_t LRUCacheShard::TEST_GetLRUSize() {
MutexLock l(&mutex_); DMutexLock l(mutex_);
LRUHandle* lru_handle = lru_.next; LRUHandle* lru_handle = lru_.next;
size_t lru_size = 0; size_t lru_size = 0;
while (lru_handle != &lru_) { while (lru_handle != &lru_) {
@ -210,7 +210,7 @@ size_t LRUCacheShard::TEST_GetLRUSize() {
} }
double LRUCacheShard::GetHighPriPoolRatio() { double LRUCacheShard::GetHighPriPoolRatio() {
MutexLock l(&mutex_); DMutexLock l(mutex_);
return high_pri_pool_ratio_; return high_pri_pool_ratio_;
} }
@ -285,7 +285,7 @@ void LRUCacheShard::EvictFromLRU(size_t charge,
void LRUCacheShard::SetCapacity(size_t capacity) { void LRUCacheShard::SetCapacity(size_t capacity) {
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
capacity_ = capacity; capacity_ = capacity;
high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
EvictFromLRU(0, &last_reference_list); EvictFromLRU(0, &last_reference_list);
@ -304,7 +304,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
} }
void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) { void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
strict_capacity_limit_ = strict_capacity_limit; strict_capacity_limit_ = strict_capacity_limit;
} }
@ -314,7 +314,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
autovector<LRUHandle*> last_reference_list; autovector<LRUHandle*> last_reference_list;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
// Free the space following strict LRU policy until enough space // Free the space following strict LRU policy until enough space
// is freed or the lru list is empty. // is freed or the lru list is empty.
@ -402,7 +402,7 @@ void LRUCacheShard::Promote(LRUHandle* e) {
} else { } else {
// Since the secondary cache lookup failed, mark the item as not in cache // Since the secondary cache lookup failed, mark the item as not in cache
// Don't charge the cache as its only metadata that'll shortly be released // Don't charge the cache as its only metadata that'll shortly be released
MutexLock l(&mutex_); DMutexLock l(mutex_);
// TODO // TODO
e->CalcTotalCharge(0, metadata_charge_policy_); e->CalcTotalCharge(0, metadata_charge_policy_);
e->SetInCache(false); e->SetInCache(false);
@ -416,7 +416,7 @@ Cache::Handle* LRUCacheShard::Lookup(
bool wait, Statistics* stats) { bool wait, Statistics* stats) {
LRUHandle* e = nullptr; LRUHandle* e = nullptr;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
e = table_.Lookup(key, hash); e = table_.Lookup(key, hash);
if (e != nullptr) { if (e != nullptr) {
assert(e->InCache()); assert(e->InCache());
@ -489,7 +489,7 @@ Cache::Handle* LRUCacheShard::Lookup(
bool LRUCacheShard::Ref(Cache::Handle* h) { bool LRUCacheShard::Ref(Cache::Handle* h) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(h); LRUHandle* e = reinterpret_cast<LRUHandle*>(h);
MutexLock l(&mutex_); DMutexLock l(mutex_);
// To create another reference - entry must be already externally referenced. // To create another reference - entry must be already externally referenced.
assert(e->HasRefs()); assert(e->HasRefs());
e->Ref(); e->Ref();
@ -497,7 +497,7 @@ bool LRUCacheShard::Ref(Cache::Handle* h) {
} }
void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) { void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) {
MutexLock l(&mutex_); DMutexLock l(mutex_);
high_pri_pool_ratio_ = high_pri_pool_ratio; high_pri_pool_ratio_ = high_pri_pool_ratio;
high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_; high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
MaintainPoolSize(); MaintainPoolSize();
@ -510,7 +510,7 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool erase_if_last_ref) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool last_reference = false; bool last_reference = false;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
last_reference = e->Unref(); last_reference = e->Unref();
if (last_reference && e->InCache()) { if (last_reference && e->InCache()) {
// The item is still in cache, and nobody else holds a reference to it. // The item is still in cache, and nobody else holds a reference to it.
@ -582,7 +582,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
LRUHandle* e; LRUHandle* e;
bool last_reference = false; bool last_reference = false;
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
e = table_.Remove(key, hash); e = table_.Remove(key, hash);
if (e != nullptr) { if (e != nullptr) {
assert(e->InCache()); assert(e->InCache());
@ -606,7 +606,7 @@ void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
bool LRUCacheShard::IsReady(Cache::Handle* handle) { bool LRUCacheShard::IsReady(Cache::Handle* handle) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle); LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
MutexLock l(&mutex_); DMutexLock l(mutex_);
bool ready = true; bool ready = true;
if (e->IsPending()) { if (e->IsPending()) {
assert(secondary_cache_); assert(secondary_cache_);
@ -617,12 +617,12 @@ bool LRUCacheShard::IsReady(Cache::Handle* handle) {
} }
size_t LRUCacheShard::GetUsage() const { size_t LRUCacheShard::GetUsage() const {
MutexLock l(&mutex_); DMutexLock l(mutex_);
return usage_; return usage_;
} }
size_t LRUCacheShard::GetPinnedUsage() const { size_t LRUCacheShard::GetPinnedUsage() const {
MutexLock l(&mutex_); DMutexLock l(mutex_);
assert(usage_ >= lru_usage_); assert(usage_ >= lru_usage_);
return usage_ - lru_usage_; return usage_ - lru_usage_;
} }
@ -631,7 +631,7 @@ std::string LRUCacheShard::GetPrintableOptions() const {
const int kBufferSize = 200; const int kBufferSize = 200;
char buffer[kBufferSize]; char buffer[kBufferSize];
{ {
MutexLock l(&mutex_); DMutexLock l(mutex_);
snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n", snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n",
high_pri_pool_ratio_); high_pri_pool_ratio_);
} }

3
cache/lru_cache.h vendored

@ -17,6 +17,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/secondary_cache.h" #include "rocksdb/secondary_cache.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/distributed_mutex.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace lru_cache { namespace lru_cache {
@ -453,7 +454,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
// mutex_ protects the following state. // mutex_ protects the following state.
// We don't count mutex_ as the cache's internal state so semantically we // We don't count mutex_ as the cache's internal state so semantically we
// don't mind mutex_ invoking the non-const actions. // don't mind mutex_ invoking the non-const actions.
mutable port::Mutex mutex_; mutable DMutex mutex_;
std::shared_ptr<SecondaryCache> secondary_cache_; std::shared_ptr<SecondaryCache> secondary_cache_;
}; };

@ -101,6 +101,7 @@
#include "util/compression.h" #include "util/compression.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/defer.h" #include "util/defer.h"
#include "util/distributed_mutex.h"
#include "util/hash_containers.h" #include "util/hash_containers.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
@ -145,6 +146,8 @@ void DumpSupportInfo(Logger* logger) {
} }
ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s", ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
crc32c::IsFastCrc32Supported().c_str()); crc32c::IsFastCrc32Supported().c_str());
ROCKS_LOG_HEADER(logger, "DMutex implementation: %s", DMutex::kName());
} }
} // namespace } // namespace

@ -95,6 +95,8 @@ class CondVar;
class Mutex { class Mutex {
public: public:
static const char* kName() { return "pthread_mutex_t"; }
explicit Mutex(bool adaptive = kDefaultToAdaptiveMutex); explicit Mutex(bool adaptive = kDefaultToAdaptiveMutex);
// No copying // No copying
Mutex(const Mutex&) = delete; Mutex(const Mutex&) = delete;
@ -111,6 +113,11 @@ class Mutex {
// it does NOT verify that mutex is held by a calling thread // it does NOT verify that mutex is held by a calling thread
void AssertHeld(); void AssertHeld();
// Also implement std Lockable
inline void lock() { Lock(); }
inline void unlock() { Unlock(); }
inline bool try_lock() { return TryLock(); }
private: private:
friend class CondVar; friend class CondVar;
pthread_mutex_t mu_; pthread_mutex_t mu_;

@ -79,12 +79,15 @@ class CondVar;
class Mutex { class Mutex {
public: public:
static const char* kName() { return "std::mutex"; }
/* implicit */ Mutex(bool adaptive = kDefaultToAdaptiveMutex) explicit Mutex(bool IGNORED_adaptive = kDefaultToAdaptiveMutex)
#ifndef NDEBUG #ifndef NDEBUG
: locked_(false) : locked_(false)
#endif #endif
{ } {
(void)IGNORED_adaptive;
}
~Mutex(); ~Mutex();
@ -120,6 +123,11 @@ class Mutex {
#endif #endif
} }
// Also implement std Lockable
inline void lock() { Lock(); }
inline void unlock() { Unlock(); }
inline bool try_lock() { return TryLock(); }
// Mutex is move only with lock ownership transfer // Mutex is move only with lock ownership transfer
Mutex(const Mutex&) = delete; Mutex(const Mutex&) = delete;
void operator=(const Mutex&) = delete; void operator=(const Mutex&) = delete;

@ -374,9 +374,13 @@ TEST_LIB_SOURCES = \
FOLLY_SOURCES = \ FOLLY_SOURCES = \
$(FOLLY_DIR)/folly/container/detail/F14Table.cpp \ $(FOLLY_DIR)/folly/container/detail/F14Table.cpp \
$(FOLLY_DIR)/folly/detail/Futex.cpp \
$(FOLLY_DIR)/folly/lang/SafeAssert.cpp \ $(FOLLY_DIR)/folly/lang/SafeAssert.cpp \
$(FOLLY_DIR)/folly/lang/ToAscii.cpp \ $(FOLLY_DIR)/folly/lang/ToAscii.cpp \
$(FOLLY_DIR)/folly/ScopeGuard.cpp \ $(FOLLY_DIR)/folly/ScopeGuard.cpp \
$(FOLLY_DIR)/folly/synchronization/AtomicNotification.cpp \
$(FOLLY_DIR)/folly/synchronization/DistributedMutex.cpp \
$(FOLLY_DIR)/folly/synchronization/ParkingLot.cpp \
TOOLS_MAIN_SOURCES = \ TOOLS_MAIN_SOURCES = \
db_stress_tool/db_stress.cc \ db_stress_tool/db_stress.cc \

@ -0,0 +1,48 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/rocksdb_namespace.h"
// This file declares a wrapper around the efficient folly DistributedMutex
// that falls back on a standard mutex when not available. See
// https://github.com/facebook/folly/blob/main/folly/synchronization/DistributedMutex.h
// for benefits and limitations.
// At the moment, only scoped locking is supported using DMutexLock
// RAII wrapper, because lock/unlock APIs will vary.
#ifdef USE_FOLLY
#include <folly/synchronization/DistributedMutex.h>
namespace ROCKSDB_NAMESPACE {
class DMutex : public folly::DistributedMutex {
public:
static const char* kName() { return "folly::DistributedMutex"; }
explicit DMutex(bool IGNORED_adaptive = false) { (void)IGNORED_adaptive; }
// currently no-op
void AssertHeld() {}
};
using DMutexLock = std::lock_guard<folly::DistributedMutex>;
} // namespace ROCKSDB_NAMESPACE
#else
#include "port/port.h"
namespace ROCKSDB_NAMESPACE {
using DMutex = port::Mutex;
using DMutexLock = std::lock_guard<DMutex>;
} // namespace ROCKSDB_NAMESPACE
#endif
Loading…
Cancel
Save