Improve memory efficiency of many OptimisticTransactionDBs (#11439)

Summary:
Currently it's easy to use a ton of memory with many small OptimisticTransactionDB instances, because each one by default allocates a million mutexes (40 bytes each on my compiler) for validating transactions. It even puts a lot of pressure on the allocator by allocating each one individually!

In this change:
* Create a new object and option that enables sharing these buckets of mutexes between instances. This is generally good for load balancing potential contention as various DBs become hotter or colder with txn writes. About the only cases where this sharing wouldn't make sense (e.g. each DB usually written by one thread) are cases that would be better off with OccValidationPolicy::kValidateSerial which doesn't use the buckets anyway.
* Allocate the mutexes in a contiguous array, for efficiency
* Add an option to ensure the mutexes are cache-aligned. In several other places we use cache-aligned mutexes but OptimisticTransactionDB historically does not. It should be a space-time trade-off the user can choose.
* Provide some visibility into the memory used by the mutex buckets with an ApproximateMemoryUsage() function (also used in unit testing)
* Share code with other users of "striped" mutexes, appropriate refactoring for customization & efficiency (e.g. using FastRange instead of modulus)
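
The striping idea in the list above can be sketched outside RocksDB as follows. This is a minimal illustration, not the code from this change: `StripedMutexesSketch`, `Mix64Sketch`, and `FastRange32Sketch` are hypothetical names, `std::mutex` and `std::hash` stand in for `port::Mutex` and RocksDB's own hash, and the multiply-and-shift range reduction is only assumed to be in the spirit of FastRange.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical mixer (splitmix64 finalizer) so that key hash and seed
// influence all output bits.
inline uint64_t Mix64Sketch(uint64_t x) {
  x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
  x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
  return x ^ (x >> 31);
}

// Hypothetical range reduction: maps a 32-bit hash onto [0, range) with a
// multiply and a shift instead of a modulus.
inline uint32_t FastRange32Sketch(uint32_t hash, uint32_t range) {
  return static_cast<uint32_t>((uint64_t{hash} * range) >> 32);
}

// All mutexes live in one contiguous array (one allocation), and a key plus
// a seed selects a stripe.
class StripedMutexesSketch {
 public:
  explicit StripedMutexesSketch(uint32_t stripe_count)
      : stripe_count_(stripe_count), data_(new std::mutex[stripe_count]) {}

  std::mutex& Get(const std::string& key, uint64_t seed = 0) {
    uint64_t h = Mix64Sketch(std::hash<std::string>{}(key) + seed);
    return data_[FastRange32Sketch(static_cast<uint32_t>(h >> 32),
                                   stripe_count_)];
  }

  // Counts the object itself plus the array payload (no allocator overhead).
  size_t ApproximateMemoryUsage() const {
    return sizeof(*this) + stripe_count_ * sizeof(std::mutex);
  }

 private:
  uint32_t stripe_count_;
  std::unique_ptr<std::mutex[]> data_;
};
```

Several owners can share one pool by holding the same `std::shared_ptr<StripedMutexesSketch>`; passing a different `seed` per owner makes the same key map to unrelated stripes.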

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11439

Test Plan: unit tests added. Ran sized-up versions of stress test in unit test, including a before-and-after performance test showing no consistent difference. (NOTE: OptimisticTransactionDB not currently covered by db_stress!)

Reviewed By: ltamasi

Differential Revision: D45796393

Pulled By: pdillinger

fbshipit-source-id: ae2b3a26ad91ceeec15debcdc63ff48df6736a54
Peter Dillinger 2 years ago committed by Facebook GitHub Bot
parent 93e0715fad
commit 17bc27741f
  1. HISTORY.md (2 lines changed)
  2. db/blob/blob_file_cache.cc (4 lines changed)
  3. db/blob/blob_file_cache.h (2 lines changed)
  4. db/memtable.h (2 lines changed)
  5. db/table_cache.cc (4 lines changed)
  6. db/table_cache.h (2 lines changed)
  7. include/rocksdb/utilities/optimistic_transaction_db.h (35 lines changed)
  8. util/cast_util.h (14 lines changed)
  9. util/hash.h (10 lines changed)
  10. util/mutexlock.h (62 lines changed)
  11. utilities/transactions/optimistic_transaction.cc (31 lines changed)
  12. utilities/transactions/optimistic_transaction_db_impl.cc (15 lines changed)
  13. utilities/transactions/optimistic_transaction_db_impl.h (49 lines changed)
  14. utilities/transactions/optimistic_transaction_test.cc (220 lines changed)

--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,5 +1,7 @@
 # Rocksdb Change Log
 ## Unreleased
+### New Features
+* Add a new option OptimisticTransactionDBOptions::shared_lock_buckets that enables sharing mutexes for validating transactions between DB instances, for better balancing memory efficiency and validation contention across DB instances. Different column families and DBs also now use different hash seeds in this validation, so that the same set of key names will not contend across DBs or column families.
 ## 8.3.0 (05/19/2023)
 ### New Features
 * Introduced a new option `block_protection_bytes_per_key`, which can be used to enable per key-value integrity protection for in-memory blocks in block cache (#11287).

--- a/db/blob/blob_file_cache.cc
+++ b/db/blob/blob_file_cache.cc
@@ -25,7 +25,7 @@ BlobFileCache::BlobFileCache(Cache* cache,
                              HistogramImpl* blob_file_read_hist,
                              const std::shared_ptr<IOTracer>& io_tracer)
     : cache_(cache),
-      mutex_(kNumberOfMutexStripes, kGetSliceNPHash64UnseededFnPtr),
+      mutex_(kNumberOfMutexStripes),
       immutable_options_(immutable_options),
       file_options_(file_options),
       column_family_id_(column_family_id),
@@ -55,7 +55,7 @@ Status BlobFileCache::GetBlobFileReader(
   TEST_SYNC_POINT("BlobFileCache::GetBlobFileReader:DoubleCheck");
   // Check again while holding mutex
-  MutexLock lock(mutex_.get(key));
+  MutexLock lock(&mutex_.Get(key));
   handle = cache_.Lookup(key);
   if (handle) {

--- a/db/blob/blob_file_cache.h
+++ b/db/blob/blob_file_cache.h
@@ -43,7 +43,7 @@ class BlobFileCache {
   CacheInterface cache_;
   // Note: mutex_ below is used to guard against multiple threads racing to open
   // the same file.
-  Striped<port::Mutex, Slice> mutex_;
+  Striped<CacheAlignedWrapper<port::Mutex>> mutex_;
   const ImmutableOptions* immutable_options_;
   const FileOptions* file_options_;
   uint32_t column_family_id_;

--- a/db/memtable.h
+++ b/db/memtable.h
@@ -596,7 +596,7 @@ class MemTable {
   const SliceTransform* insert_with_hint_prefix_extractor_;
   // Insert hints for each prefix.
-  UnorderedMapH<Slice, void*, SliceHasher> insert_hints_;
+  UnorderedMapH<Slice, void*, SliceHasher32> insert_hints_;
   // Timestamp of oldest key
   std::atomic<uint64_t> oldest_key_time_;

--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -75,7 +75,7 @@ TableCache::TableCache(const ImmutableOptions& ioptions,
       cache_(cache),
       immortal_tables_(false),
       block_cache_tracer_(block_cache_tracer),
-      loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr),
+      loader_mutex_(kLoadConcurency),
       io_tracer_(io_tracer),
       db_session_id_(db_session_id) {
   if (ioptions_.row_cache) {
@@ -174,7 +174,7 @@ Status TableCache::FindTable(
     if (no_io) {
       return Status::Incomplete("Table not found in table_cache, no_io is set");
     }
-    MutexLock load_lock(loader_mutex_.get(key));
+    MutexLock load_lock(&loader_mutex_.Get(key));
     // We check the cache again under loading mutex
     *handle = cache_.Lookup(key);
     if (*handle != nullptr) {

--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -275,7 +275,7 @@ class TableCache {
   std::string row_cache_id_;
   bool immortal_tables_;
   BlockCacheTracer* const block_cache_tracer_;
-  Striped<port::Mutex, Slice> loader_mutex_;
+  Striped<CacheAlignedWrapper<port::Mutex>> loader_mutex_;
   std::shared_ptr<IOTracer> io_tracer_;
   std::string db_session_id_;
 };

--- a/include/rocksdb/utilities/optimistic_transaction_db.h
+++ b/include/rocksdb/utilities/optimistic_transaction_db.h
@@ -5,6 +5,7 @@
 #pragma once
+#include <memory>
 #include <string>
 #include <vector>
@@ -43,11 +44,42 @@ enum class OccValidationPolicy {
   kValidateParallel = 1
 };
+class OccLockBuckets {
+ public:
+  // Most details in internal derived class.
+  // Users should not derive from this class.
+  virtual ~OccLockBuckets() {}
+  virtual size_t ApproximateMemoryUsage() const = 0;
+ private:
+  friend class OccLockBucketsImplBase;
+  OccLockBuckets() {}
+};
+// An object for sharing a pool of locks across DB instances.
+//
+// Making the locks cache-aligned avoids potential false sharing, at the
+// potential cost of extra memory. The implementation has historically
+// used cache_aligned = false.
+std::shared_ptr<OccLockBuckets> MakeSharedOccLockBuckets(
+    size_t bucket_count, bool cache_aligned = false);
 struct OptimisticTransactionDBOptions {
   OccValidationPolicy validate_policy = OccValidationPolicy::kValidateParallel;
-  // works only if validate_policy == OccValidationPolicy::kValidateParallel
+  // Number of striped/bucketed mutex locks for validating transactions.
+  // Used on only if validate_policy == OccValidationPolicy::kValidateParallel
+  // and shared_lock_buckets (below) is empty. Larger number potentially
+  // reduces contention but uses more memory.
   uint32_t occ_lock_buckets = (1 << 20);
+  // A pool of mutex locks for validating transactions. Can be shared among
+  // DBs. Ignored if validate_policy != OccValidationPolicy::kValidateParallel.
+  // If empty and validate_policy == OccValidationPolicy::kValidateParallel,
+  // an OccLockBuckets will be created using the count in occ_lock_buckets.
+  // See MakeSharedOccLockBuckets()
+  std::shared_ptr<OccLockBuckets> shared_lock_buckets;
 };
 // Range deletions (including those in `WriteBatch`es passed to `Write()`) are
@@ -95,4 +127,3 @@ class OptimisticTransactionDB : public StackableDB {
 };
 } // namespace ROCKSDB_NAMESPACE
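
The option comments in the header above describe a small resolution rule: a shared pool wins if one is supplied, otherwise a private pool is sized from `occ_lock_buckets`, and neither is used under serial validation. The following standalone sketch restates that rule under stated assumptions. All type and function names here (`PolicySketch`, `BucketsSketch`, `OptionsSketch`, `ResolveBucketsSketch`) are hypothetical stand-ins, not RocksDB types; the minimum of 16 buckets mirrors the `std::max(16u, ...)` visible in the implementation diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical stand-ins for the real option and bucket types.
enum class PolicySketch { kSerial, kParallel };

struct BucketsSketch {
  explicit BucketsSketch(uint32_t n) : count(n) {}
  uint32_t count;
};

struct OptionsSketch {
  PolicySketch validate_policy = PolicySketch::kParallel;
  uint32_t occ_lock_buckets = (1u << 20);
  std::shared_ptr<BucketsSketch> shared_lock_buckets;  // empty by default
};

// Returns the pool a DB would use: none for serial validation, the shared
// pool if provided, otherwise a fresh private pool with at least 16 buckets.
std::shared_ptr<BucketsSketch> ResolveBucketsSketch(const OptionsSketch& o) {
  if (o.validate_policy != PolicySketch::kParallel) {
    return nullptr;
  }
  if (o.shared_lock_buckets) {
    return o.shared_lock_buckets;
  }
  return std::make_shared<BucketsSketch>(
      std::max<uint32_t>(16, o.occ_lock_buckets));
}
```

Two option structs that carry the same `shared_lock_buckets` pointer resolve to the same pool, which is what lets many small DB instances avoid allocating a million mutexes each.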

--- a/util/cast_util.h
+++ b/util/cast_util.h
@@ -5,6 +5,7 @@
 #pragma once
+#include <memory>
 #include <type_traits>
 #include "rocksdb/rocksdb_namespace.h"
@@ -23,6 +24,19 @@ inline DestClass* static_cast_with_check(SrcClass* x) {
   return ret;
 }
+template <class DestClass, class SrcClass>
+inline std::shared_ptr<DestClass> static_cast_with_check(
+    std::shared_ptr<SrcClass>&& x) {
+#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
+  auto orig_raw = x.get();
+#endif
+  auto ret = std::static_pointer_cast<DestClass>(std::move(x));
+#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
+  assert(ret.get() == dynamic_cast<DestClass*>(orig_raw));
+#endif
+  return ret;
+}
 // A wrapper around static_cast for lossless conversion between integral
 // types, including enum types. For example, this can be used for converting
 // between signed/unsigned or enum type and underlying type without fear of
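
The new `shared_ptr` overload above follows a common pattern: do the cheap `static_pointer_cast`, and in debug builds cross-check it against `dynamic_cast`. A minimal standalone sketch of the same pattern is shown here. `CheckedPointerCastSketch`, `BaseSketch`, and `DerivedSketch` are hypothetical names, and unlike the original this version always assumes RTTI is available.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Downcast a shared_ptr with static_pointer_cast, verifying in debug builds
// (when assert is active) that dynamic_cast agrees with the result.
template <class Dest, class Src>
std::shared_ptr<Dest> CheckedPointerCastSketch(std::shared_ptr<Src>&& x) {
  Src* orig_raw = x.get();
  auto ret = std::static_pointer_cast<Dest>(std::move(x));
  assert(ret.get() == dynamic_cast<Dest*>(orig_raw));
  (void)orig_raw;  // silence unused-variable warnings when NDEBUG is defined
  return ret;
}

// Hypothetical polymorphic hierarchy used only for illustration.
struct BaseSketch {
  virtual ~BaseSketch() = default;
};

struct DerivedSketch : BaseSketch {
  int Value() const { return 42; }
};
```

The check costs nothing in release builds, while a wrong downcast trips the assertion in debug builds instead of silently producing a bad pointer.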

--- a/util/hash.h
+++ b/util/hash.h
@@ -128,10 +128,14 @@ inline uint32_t Upper32of64(uint64_t v) {
 }
 inline uint32_t Lower32of64(uint64_t v) { return static_cast<uint32_t>(v); }
-// std::hash compatible interface.
-// TODO: consider rename to SliceHasher32
-struct SliceHasher {
+// std::hash-like interface.
+struct SliceHasher32 {
   uint32_t operator()(const Slice& s) const { return GetSliceHash(s); }
 };
+struct SliceNPHasher64 {
+  uint64_t operator()(const Slice& s, uint64_t seed = 0) const {
+    return GetSliceNPHash64(s, seed);
+  }
+};
 } // namespace ROCKSDB_NAMESPACE

--- a/util/mutexlock.h
+++ b/util/mutexlock.h
@@ -12,10 +12,13 @@
 #include <atomic>
 #include <functional>
+#include <memory>
 #include <mutex>
 #include <thread>
 #include "port/port.h"
+#include "util/fastrange.h"
+#include "util/hash.h"
 namespace ROCKSDB_NAMESPACE {
@@ -129,10 +132,25 @@ class SpinMutex {
   std::atomic<bool> locked_;
 };
-// We want to prevent false sharing
+// For preventing false sharing, especially for mutexes.
+// NOTE: if a mutex is less than half the size of a cache line, it would
+// make more sense for Striped structure below to pack more than one mutex
+// into each cache line, as this would only reduce contention for the same
+// amount of space and cache sharing. However, a mutex is often 40 bytes out
+// of a 64 byte cache line.
 template <class T>
-struct ALIGN_AS(CACHE_LINE_SIZE) LockData {
-  T lock_;
+struct ALIGN_AS(CACHE_LINE_SIZE) CacheAlignedWrapper {
+  T obj_;
+};
+template <class T>
+struct Unwrap {
+  using type = T;
+  static type &Go(T &t) { return t; }
+};
+template <class T>
+struct Unwrap<CacheAlignedWrapper<T>> {
+  using type = T;
+  static type &Go(CacheAlignedWrapper<T> &t) { return t.obj_; }
 };
 //
@@ -144,38 +162,28 @@ struct ALIGN_AS(CACHE_LINE_SIZE) LockData {
 // single lock and allowing independent operations to lock different stripes and
 // proceed concurrently, instead of creating contention for a single lock.
 //
-template <class T, class P>
+template <class T, class Key = Slice, class Hash = SliceNPHasher64>
 class Striped {
  public:
-  Striped(size_t stripes, std::function<uint64_t(const P &)> hash)
-      : stripes_(stripes), hash_(hash) {
-    locks_ = reinterpret_cast<LockData<T> *>(
-        port::cacheline_aligned_alloc(sizeof(LockData<T>) * stripes));
-    for (size_t i = 0; i < stripes; i++) {
-      new (&locks_[i]) LockData<T>();
-    }
-  }
+  explicit Striped(size_t stripe_count)
+      : stripe_count_(stripe_count), data_(new T[stripe_count]) {}
-  virtual ~Striped() {
-    if (locks_ != nullptr) {
-      assert(stripes_ > 0);
-      for (size_t i = 0; i < stripes_; i++) {
-        locks_[i].~LockData<T>();
-      }
-      port::cacheline_aligned_free(locks_);
-    }
+  using Unwrapped = typename Unwrap<T>::type;
+  Unwrapped &Get(const Key &key, uint64_t seed = 0) {
+    size_t index = FastRangeGeneric(hash_(key, seed), stripe_count_);
+    return Unwrap<T>::Go(data_[index]);
   }
-  T *get(const P &key) {
-    uint64_t h = hash_(key);
-    size_t index = h % stripes_;
-    return &reinterpret_cast<LockData<T> *>(&locks_[index])->lock_;
+  size_t ApproximateMemoryUsage() const {
+    // NOTE: could use malloc_usable_size() here, but that could count unmapped
+    // pages and could mess up unit test OccLockBucketsTest::CacheAligned
+    return sizeof(*this) + stripe_count_ * sizeof(T);
   }
  private:
-  size_t stripes_;
-  LockData<T> *locks_;
-  std::function<uint64_t(const P &)> hash_;
+  size_t stripe_count_;
+  std::unique_ptr<T[]> data_;
+  Hash hash_;
 };
 } // namespace ROCKSDB_NAMESPACE
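
The wrapper-plus-unwrap pattern in the diff above can be tried in isolation. The sketch below is a rough standalone analogue, not the RocksDB code: `AlignedWrapperSketch`, `UnwrapSketch`, and `ArrayBytesSketch` are hypothetical names, and the 64-byte cache line is an assumption rather than a value queried from the platform.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Assumption: 64-byte cache lines (typical for x86_64, not universal).
constexpr size_t kAssumedCacheLineSketch = 64;

// Pads and aligns T so that each instance starts on its own cache line.
template <class T>
struct alignas(kAssumedCacheLineSketch) AlignedWrapperSketch {
  T obj_;
};

// Trait that gives uniform access to the payload whether or not it is wrapped.
template <class T>
struct UnwrapSketch {
  using type = T;
  static type& Go(T& t) { return t; }
};

template <class T>
struct UnwrapSketch<AlignedWrapperSketch<T>> {
  using type = T;
  static type& Go(AlignedWrapperSketch<T>& w) { return w.obj_; }
};

// Payload bytes of a contiguous array of `count` elements of type T.
template <class T>
constexpr size_t ArrayBytesSketch(size_t count) {
  return count * sizeof(T);
}
```

Comparing `ArrayBytesSketch<std::mutex>(n)` with `ArrayBytesSketch<AlignedWrapperSketch<std::mutex>>(n)` shows the space side of the trade-off: where a mutex is smaller than a cache line, the aligned array is larger by the padding per element.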

--- a/utilities/transactions/optimistic_transaction.cc
+++ b/utilities/transactions/optimistic_transaction.cc
@@ -6,6 +6,7 @@
 #include "utilities/transactions/optimistic_transaction.h"
+#include <cstdint>
 #include <string>
 #include "db/column_family.h"
@@ -15,6 +16,7 @@
 #include "rocksdb/status.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
 #include "util/cast_util.h"
+#include "util/defer.h"
 #include "util/string_util.h"
 #include "utilities/transactions/lock/point/point_lock_tracker.h"
 #include "utilities/transactions/optimistic_transaction.h"
@@ -96,28 +98,42 @@ Status OptimisticTransaction::CommitWithParallelValidate() {
   assert(txn_db_impl);
   DBImpl* db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
   assert(db_impl);
-  const size_t space = txn_db_impl->GetLockBucketsSize();
-  std::set<size_t> lk_idxes;
-  std::vector<std::unique_lock<std::mutex>> lks;
+  std::set<port::Mutex*> lk_ptrs;
   std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
       tracked_locks_->GetColumnFamilyIterator());
   assert(cf_it != nullptr);
   while (cf_it->HasNext()) {
     ColumnFamilyId cf = cf_it->Next();
+    // To avoid the same key(s) contending across CFs or DBs, seed the
+    // hash independently.
+    uint64_t seed = reinterpret_cast<uintptr_t>(db_impl) +
+                    uint64_t{0xb83c07fbc6ced699} /*random prime*/ * cf;
     std::unique_ptr<LockTracker::KeyIterator> key_it(
         tracked_locks_->GetKeyIterator(cf));
     assert(key_it != nullptr);
     while (key_it->HasNext()) {
-      const std::string& key = key_it->Next();
-      lk_idxes.insert(FastRange64(GetSliceNPHash64(key), space));
+      auto lock_bucket_ptr = &txn_db_impl->GetLockBucket(key_it->Next(), seed);
+      TEST_SYNC_POINT_CALLBACK(
+          "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr",
+          lock_bucket_ptr);
+      lk_ptrs.insert(lock_bucket_ptr);
     }
   }
   // NOTE: in a single txn, all bucket-locks are taken in ascending order.
   // In this way, txns from different threads all obey this rule so that
   // deadlock can be avoided.
-  for (auto v : lk_idxes) {
-    lks.emplace_back(txn_db_impl->LockBucket(v));
+  for (auto v : lk_ptrs) {
+    // WART: if an exception is thrown during a Lock(), previously locked will
+    // not be Unlock()ed. But a vector of MutexLock is likely inefficient.
+    v->Lock();
   }
+  Defer unlocks([&]() {
+    for (auto v : lk_ptrs) {
+      v->Unlock();
+    }
+  });
   Status s = TransactionUtil::CheckKeysForConflicts(db_impl, *tracked_locks_,
                                                     true /* cache_only */);
@@ -191,4 +207,3 @@ Status OptimisticTransaction::SetName(const TransactionName& /* unused */) {
 }
 } // namespace ROCKSDB_NAMESPACE
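
The commit path above avoids deadlock by collecting the needed bucket mutexes in a `std::set` of pointers, locking them in ascending order, and releasing them through a deferred callback. A minimal standalone sketch of that technique follows. `DeferSketch` and `WithAllLockedSketch` are hypothetical names, and `std::mutex` stands in for `port::Mutex`; like the original, it does not unlock already-held mutexes if a `lock()` call throws.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <set>
#include <utility>
#include <vector>

// Hypothetical scope guard: runs the stored callback when it goes out of scope.
class DeferSketch {
 public:
  explicit DeferSketch(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~DeferSketch() { fn_(); }
  DeferSketch(const DeferSketch&) = delete;
  DeferSketch& operator=(const DeferSketch&) = delete;

 private:
  std::function<void()> fn_;
};

// Locks every distinct mutex in `wanted` in ascending address order, runs
// `critical_section`, then unlocks them all. Because every caller uses the
// same global order, two callers cannot wait on each other in a cycle.
template <class Fn>
void WithAllLockedSketch(const std::vector<std::mutex*>& wanted,
                         Fn critical_section) {
  // std::set removes duplicates and iterates in std::less<T*> order, which
  // is a strict total order over pointers.
  std::set<std::mutex*> ordered(wanted.begin(), wanted.end());
  for (std::mutex* m : ordered) {
    m->lock();
  }
  DeferSketch unlocks([&]() {
    for (std::mutex* m : ordered) {
      m->unlock();
    }
  });
  critical_section();
}
```

Deduplicating first matters: two keys that hash to the same bucket would otherwise make a thread lock the same non-recursive mutex twice.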

--- a/utilities/transactions/optimistic_transaction_db_impl.cc
+++ b/utilities/transactions/optimistic_transaction_db_impl.cc
@@ -17,6 +17,15 @@
 namespace ROCKSDB_NAMESPACE {
+std::shared_ptr<OccLockBuckets> MakeSharedOccLockBuckets(size_t bucket_count,
+                                                         bool cache_aligned) {
+  if (cache_aligned) {
+    return std::make_shared<OccLockBucketsImpl<true>>(bucket_count);
+  } else {
+    return std::make_shared<OccLockBucketsImpl<false>>(bucket_count);
+  }
+}
 Transaction* OptimisticTransactionDBImpl::BeginTransaction(
     const WriteOptions& write_options,
     const OptimisticTransactionOptions& txn_options, Transaction* old_txn) {
@@ -28,12 +37,6 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction(
   }
 }
-std::unique_lock<std::mutex> OptimisticTransactionDBImpl::LockBucket(
-    size_t idx) {
-  assert(idx < bucketed_locks_.size());
-  return std::unique_lock<std::mutex>(*bucketed_locks_[idx]);
-}
 Status OptimisticTransactionDB::Open(const Options& options,
                                      const std::string& dbname,
                                      OptimisticTransactionDB** dbptr) {

--- a/utilities/transactions/optimistic_transaction_db_impl.h
+++ b/utilities/transactions/optimistic_transaction_db_impl.h
@@ -6,15 +6,41 @@
 #pragma once
 #include <algorithm>
-#include <mutex>
+#include <cstdint>
+#include <memory>
 #include <vector>
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
+#include "util/cast_util.h"
+#include "util/mutexlock.h"
 namespace ROCKSDB_NAMESPACE {
+class OccLockBucketsImplBase : public OccLockBuckets {
+ public:
+  virtual port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) = 0;
+};
+template <bool cache_aligned>
+class OccLockBucketsImpl : public OccLockBucketsImplBase {
+ public:
+  explicit OccLockBucketsImpl(size_t bucket_count) : locks_(bucket_count) {}
+  port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) override {
+    return locks_.Get(key, seed);
+  }
+  size_t ApproximateMemoryUsage() const override {
+    return locks_.ApproximateMemoryUsage();
+  }
+ private:
+  // TODO: investigate optionally using folly::MicroLock to majorly save space
+  using M = std::conditional_t<cache_aligned, CacheAlignedWrapper<port::Mutex>,
+                               port::Mutex>;
+  Striped<M> locks_;
+};
 class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
  public:
   explicit OptimisticTransactionDBImpl(
@@ -24,12 +50,13 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
         db_owner_(take_ownership),
         validate_policy_(occ_options.validate_policy) {
     if (validate_policy_ == OccValidationPolicy::kValidateParallel) {
-      uint32_t bucket_size = std::max(16u, occ_options.occ_lock_buckets);
-      bucketed_locks_.reserve(bucket_size);
-      for (size_t i = 0; i < bucket_size; ++i) {
-        bucketed_locks_.emplace_back(
-            std::unique_ptr<std::mutex>(new std::mutex));
+      auto bucketed_locks = occ_options.shared_lock_buckets;
+      if (!bucketed_locks) {
+        uint32_t bucket_count = std::max(16u, occ_options.occ_lock_buckets);
+        bucketed_locks = MakeSharedOccLockBuckets(bucket_count);
       }
+      bucketed_locks_ = static_cast_with_check<OccLockBucketsImplBase>(
+          std::move(bucketed_locks));
     }
   }
@@ -62,16 +89,14 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
     return OptimisticTransactionDB::Write(write_opts, batch);
   }
-  size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
   OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
-  std::unique_lock<std::mutex> LockBucket(size_t idx);
+  port::Mutex& GetLockBucket(const Slice& key, uint64_t seed) {
+    return bucketed_locks_->GetLockBucket(key, seed);
+  }
  private:
-  // NOTE: used in validation phase. Each key is hashed into some
-  // bucket. We then take the lock in the hash value order to avoid deadlock.
-  std::vector<std::unique_ptr<std::mutex>> bucketed_locks_;
+  std::shared_ptr<OccLockBucketsImplBase> bucketed_locks_;
   bool db_owner_;

--- a/utilities/transactions/optimistic_transaction_test.cc
+++ b/utilities/transactions/optimistic_transaction_test.cc
@@ -3,8 +3,9 @@
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).
+#include <cstdint>
 #include <functional>
+#include <memory>
 #include <string>
 #include <thread>
@@ -27,49 +28,54 @@ class OptimisticTransactionTest
     : public testing::Test,
       public testing::WithParamInterface<OccValidationPolicy> {
  public:
-  OptimisticTransactionDB* txn_db;
+  std::unique_ptr<OptimisticTransactionDB> txn_db;
   std::string dbname;
   Options options;
+  OptimisticTransactionDBOptions occ_opts;
   OptimisticTransactionTest() {
     options.create_if_missing = true;
     options.max_write_buffer_number = 2;
     options.max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
     options.merge_operator.reset(new TestPutOperator());
+    occ_opts.validate_policy = GetParam();
     dbname = test::PerThreadDBPath("optimistic_transaction_testdb");
     EXPECT_OK(DestroyDB(dbname, options));
     Open();
   }
   ~OptimisticTransactionTest() override {
-    delete txn_db;
+    EXPECT_OK(txn_db->Close());
+    txn_db.reset();
     EXPECT_OK(DestroyDB(dbname, options));
   }
   void Reopen() {
-    delete txn_db;
-    txn_db = nullptr;
+    txn_db.reset();
     Open();
   }
- private:
-  void Open() {
+  static void OpenImpl(const Options& options,
+                       const OptimisticTransactionDBOptions& occ_opts,
+                       const std::string& dbname,
+                       std::unique_ptr<OptimisticTransactionDB>* txn_db) {
     ColumnFamilyOptions cf_options(options);
-    OptimisticTransactionDBOptions occ_opts;
-    occ_opts.validate_policy = GetParam();
     std::vector<ColumnFamilyDescriptor> column_families;
     std::vector<ColumnFamilyHandle*> handles;
     column_families.push_back(
         ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
-    Status s =
-        OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname,
-                                      column_families, &handles, &txn_db);
+    OptimisticTransactionDB* raw_txn_db = nullptr;
+    Status s = OptimisticTransactionDB::Open(
+        options, occ_opts, dbname, column_families, &handles, &raw_txn_db);
     ASSERT_OK(s);
-    ASSERT_NE(txn_db, nullptr);
+    ASSERT_NE(raw_txn_db, nullptr);
+    txn_db->reset(raw_txn_db);
     ASSERT_EQ(handles.size(), 1);
     delete handles[0];
   }
+ private:
+  void Open() { OpenImpl(options, occ_opts, dbname, &txn_db); }
 };
 TEST_P(OptimisticTransactionTest, SuccessTest) {
@@ -616,8 +622,11 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
   delete cfa;
   delete cfb;
-  delete txn_db;
-  txn_db = nullptr;
+  txn_db.reset();
+  OptimisticTransactionDBOptions my_occ_opts = occ_opts;
+  const size_t bucket_count = 500;
+  my_occ_opts.shared_lock_buckets = MakeSharedOccLockBuckets(bucket_count);
   // open DB with three column families
   std::vector<ColumnFamilyDescriptor> column_families;
@@ -630,10 +639,11 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
   column_families.push_back(
       ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
   std::vector<ColumnFamilyHandle*> handles;
-  ASSERT_OK(OptimisticTransactionDB::Open(options, dbname, column_families,
-                                          &handles, &txn_db));
-  assert(txn_db != nullptr);
-  ASSERT_NE(txn_db, nullptr);
+  OptimisticTransactionDB* raw_txn_db = nullptr;
+  ASSERT_OK(OptimisticTransactionDB::Open(
+      options, my_occ_opts, dbname, column_families, &handles, &raw_txn_db));
+  ASSERT_NE(raw_txn_db, nullptr);
+  txn_db.reset(raw_txn_db);
   Transaction* txn = txn_db->BeginTransaction(write_options);
   ASSERT_NE(txn, nullptr);
@@ -694,6 +704,7 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
   delete txn;
   delete txn2;
+  // ** MultiGet **
   txn = txn_db->BeginTransaction(write_options, txn_options);
   snapshot_read_options.snapshot = txn->GetSnapshot();
@@ -745,11 +756,162 @@ TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
   s = txn2->Commit();
   ASSERT_TRUE(s.IsBusy());
+  delete txn;
+  delete txn2;
+  // ** Test independence and/or sharing of lock buckets across CFs and DBs **
+  if (my_occ_opts.validate_policy == OccValidationPolicy::kValidateParallel) {
+    struct SeenStat {
+      uint64_t rolling_hash = 0;
+      uintptr_t min = 0;
+      uintptr_t max = 0;
+    };
+    SeenStat cur_seen;
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        "OptimisticTransaction::CommitWithParallelValidate::lock_bucket_ptr",
+        [&](void* arg) {
+          // Hash the pointer
+          cur_seen.rolling_hash = Hash64(reinterpret_cast<char*>(&arg),
+                                         sizeof(arg), cur_seen.rolling_hash);
+          uintptr_t val = reinterpret_cast<uintptr_t>(arg);
+          if (cur_seen.min == 0 || val < cur_seen.min) {
+            cur_seen.min = val;
+          }
+          if (cur_seen.max == 0 || val > cur_seen.max) {
+            cur_seen.max = val;
+          }
+        });
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+    // Another db sharing lock buckets
+    auto shared_dbname =
+        test::PerThreadDBPath("optimistic_transaction_testdb_shared");
+    std::unique_ptr<OptimisticTransactionDB> shared_txn_db = nullptr;
+    OpenImpl(options, my_occ_opts, shared_dbname, &shared_txn_db);
+    // Another db not sharing lock buckets
+    auto nonshared_dbname =
+        test::PerThreadDBPath("optimistic_transaction_testdb_nonshared");
+    std::unique_ptr<OptimisticTransactionDB> nonshared_txn_db = nullptr;
+    my_occ_opts.occ_lock_buckets = bucket_count;
+    my_occ_opts.shared_lock_buckets = nullptr;
+    OpenImpl(options, my_occ_opts, nonshared_dbname, &nonshared_txn_db);
+    // Plenty of keys to avoid randomly hitting the same hash sequence
+    std::array<std::string, 30> keys;
+    for (size_t i = 0; i < keys.size(); ++i) {
+      keys[i] = std::to_string(i);
+    }
+    // Get a baseline pattern of bucket accesses
+    cur_seen = {};
+    txn = txn_db->BeginTransaction(write_options, txn_options);
+    for (const auto& key : keys) {
+      txn->Put(handles[0], key, "blah");
+    }
+    ASSERT_OK(txn->Commit());
+    // Sufficiently large hash coverage of the space
+    const uintptr_t min_span_bytes = sizeof(port::Mutex) * bucket_count / 2;
+    ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
+    // Save
+    SeenStat base_seen = cur_seen;
+    // Verify it is repeatable
+    cur_seen = {};
+    txn = txn_db->BeginTransaction(write_options, txn_options, txn);
+    for (const auto& key : keys) {
+      txn->Put(handles[0], key, "moo");
+    }
+    ASSERT_OK(txn->Commit());
+    ASSERT_EQ(cur_seen.rolling_hash, base_seen.rolling_hash);
+    ASSERT_EQ(cur_seen.min, base_seen.min);
+    ASSERT_EQ(cur_seen.max, base_seen.max);
+    // Try another CF
+    cur_seen = {};
+    txn = txn_db->BeginTransaction(write_options, txn_options, txn);
+    for (const auto& key : keys) {
+      txn->Put(handles[1], key, "blah");
+    }
+    ASSERT_OK(txn->Commit());
+    // Different access pattern (different hash seed)
+    ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
+    // Same pointer space
+    ASSERT_LT(cur_seen.min, base_seen.max);
+    ASSERT_GT(cur_seen.max, base_seen.min);
+    // Sufficiently large hash coverage of the space
+    ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
+    // Save
+    SeenStat cf1_seen = cur_seen;
+    // And another CF
+    cur_seen = {};
+    txn = txn_db->BeginTransaction(write_options, txn_options, txn);
+    for (const auto& key : keys) {
+      txn->Put(handles[2], key, "blah");
+    }
+    ASSERT_OK(txn->Commit());
+    // Different access pattern (different hash seed)
+    ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
+    ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
+    // Same pointer space
+    ASSERT_LT(cur_seen.min, base_seen.max);
+    ASSERT_GT(cur_seen.max, base_seen.min);
+    // Sufficiently large hash coverage of the space
+    ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
+    // And DB with shared lock buckets
+    cur_seen = {};
+    delete txn;
+    txn = shared_txn_db->BeginTransaction(write_options, txn_options);
+    for (const auto& key : keys) {
+      txn->Put(key, "blah");
+    }
+    ASSERT_OK(txn->Commit());
+    // Different access pattern (different hash seed)
+    ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
+    ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
+    // Same pointer space
+    ASSERT_LT(cur_seen.min, base_seen.max);
+    ASSERT_GT(cur_seen.max, base_seen.min);
+    // Sufficiently large hash coverage of the space
+    ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
+    // And DB with distinct lock buckets
+    cur_seen = {};
+    delete txn;
+    txn = nonshared_txn_db->BeginTransaction(write_options, txn_options);
+    for (const auto& key : keys) {
+      txn->Put(key, "blah");
+    }
+    ASSERT_OK(txn->Commit());
+    // Different access pattern (different hash seed)
+    ASSERT_NE(cur_seen.rolling_hash, base_seen.rolling_hash);
+    ASSERT_NE(cur_seen.rolling_hash, cf1_seen.rolling_hash);
+    // Different pointer space
+    ASSERT_TRUE(cur_seen.min > base_seen.max || cur_seen.max < base_seen.min);
+    // Sufficiently large hash coverage of the space
+    ASSERT_GT(cur_seen.max - cur_seen.min, min_span_bytes);
+    delete txn;
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  }
+  // ** Test dropping column family before committing, or even creating txn **
+  txn = txn_db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn->Delete(handles[1], "AAA"));
   s = txn_db->DropColumnFamily(handles[1]);
   ASSERT_OK(s);
   s = txn_db->DropColumnFamily(handles[2]);
   ASSERT_OK(s);
+  ASSERT_NOK(txn->Commit());
+  txn2 = txn_db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn2->Delete(handles[2], "AAA"));
+  ASSERT_NOK(txn2->Commit());
   delete txn;
   delete txn2;
@@ -1402,7 +1564,7 @@ TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
   std::function<void()> call_inserter = [&] {
     ASSERT_OK(OptimisticTransactionStressTestInserter(
-        txn_db, num_transactions_per_thread, num_sets, num_keys_per_set));
+        txn_db.get(), num_transactions_per_thread, num_sets, num_keys_per_set));
   };
   // Create N threads that use RandomTransactionInserter to write
@@ -1417,7 +1579,7 @@ TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
   }
   // Verify that data is consistent
-  Status s = RandomTransactionInserter::Verify(txn_db, num_sets);
+  Status s = RandomTransactionInserter::Verify(txn_db.get(), num_sets);
   ASSERT_OK(s);
 }
@@ -1469,6 +1631,19 @@ INSTANTIATE_TEST_CASE_P(
     testing::Values(OccValidationPolicy::kValidateSerial,
                     OccValidationPolicy::kValidateParallel));
+TEST(OccLockBucketsTest, CacheAligned) {
+  // Typical x86_64 is 40 byte mutex, 64 byte cache line
+  if (sizeof(port::Mutex) >= sizeof(CacheAlignedWrapper<port::Mutex>)) {
+    ROCKSDB_GTEST_BYPASS("Test requires mutex smaller than cache line");
+    return;
+  }
+  auto buckets_unaligned = MakeSharedOccLockBuckets(100, false);
+  auto buckets_aligned = MakeSharedOccLockBuckets(100, true);
+  // Save at least one byte per bucket
+  ASSERT_LE(buckets_unaligned->ApproximateMemoryUsage() + 100,
+            buckets_aligned->ApproximateMemoryUsage());
+}
 } // namespace ROCKSDB_NAMESPACE
 int main(int argc, char** argv) {
@@ -1476,4 +1651,3 @@ int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }
