parallel occ (#6240)

Summary:
This is a continuation of https://github.com/facebook/rocksdb/pull/5320/files
I open a new mr for these purposes, half a year has past since the old mr is posted so it's almost impossible to fulfill some points below on the old mr, especially 5)
1) add validation modes for optimistic txns
2) modify unittests to test both modes
3) make format
4) refine hash functor
5) push to master
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6240

Differential Revision: D19301296

fbshipit-source-id: 5b5b3cbd39558f43947f7d2dec6cd31a06386edb
main
wolfkdy 5 years ago committed by Facebook Github Bot
parent 2fdd8087ce
commit 1ab1231acf
  1. 1
      HISTORY.md
  2. 27
      include/rocksdb/utilities/optimistic_transaction_db.h
  3. 53
      utilities/transactions/optimistic_transaction.cc
  4. 4
      utilities/transactions/optimistic_transaction.h
  5. 20
      utilities/transactions/optimistic_transaction_db_impl.cc
  6. 34
      utilities/transactions/optimistic_transaction_db_impl.h
  7. 63
      utilities/transactions/optimistic_transaction_test.cc

@ -4,6 +4,7 @@
* Added a rocksdb::FileSystem class in include/rocksdb/file_system.h to encapsulate file creation/read/write operations, and an option DBOptions::file_system to allow a user to pass in an instance of rocksdb::FileSystem. If its a non-null value, this will take precendence over DBOptions::env for file operations. A new API rocksdb::FileSystem::Default() returns a platform default object. The DBOptions::env option and Env::Default() API will continue to be used for threading and other OS related functions, and where DBOptions::file_system is not specified, for file operations. For storage developers who are accustomed to rocksdb::Env, the interface in rocksdb::FileSystem is new and will probably undergo some changes as more storage systems are ported to it from rocksdb::Env. As of now, no env other than Posix has been ported to the new interface.
* A new rocksdb::NewSstFileManager() API that allows the caller to pass in separate Env and FileSystem objects.
* Changed Java API for RocksDB.keyMayExist functions to use Holder<byte[]> instead of StringBuilder, so that retrieved values need not decode to Strings.
* A new `OptimisticTransactionDBOptions` Option that allows users to configure occ validation policy. The default policy changes from kValidateSerial to kValidateParallel to reduce mutex contention.
### Bug Fixes
* Fix a bug that can cause unnecessary bg thread to be scheduled(#6104).

@ -31,6 +31,26 @@ struct OptimisticTransactionOptions {
const Comparator* cmp = BytewiseComparator();
};
enum class OccValidationPolicy {
// Validate serially at commit stage, AFTER entering the write-group.
// Isolation validation is processed single-threaded(since in the
// write-group).
// May suffer from high mutex contention, as per this link:
// https://github.com/facebook/rocksdb/issues/4402
kValidateSerial = 0,
// Validate parallelly before commit stage, BEFORE entering the write-group to
// reduce mutex contention. Each txn acquires locks for its write-set
// records in some well-defined order.
kValidateParallel = 1
};
struct OptimisticTransactionDBOptions {
OccValidationPolicy validate_policy = OccValidationPolicy::kValidateParallel;
// works only if validate_policy == OccValidationPolicy::kValidateParallel
uint32_t occ_lock_buckets = (1 << 20);
};
class OptimisticTransactionDB : public StackableDB {
public:
// Open an OptimisticTransactionDB similar to DB::Open().
@ -42,6 +62,13 @@ class OptimisticTransactionDB : public StackableDB {
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr);
static Status Open(const DBOptions& db_options,
const OptimisticTransactionDBOptions& occ_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr);
virtual ~OptimisticTransactionDB() {}
// Starts a new Transaction.

@ -18,6 +18,8 @@
#include "util/cast_util.h"
#include "util/string_util.h"
#include "utilities/transactions/transaction_util.h"
#include "utilities/transactions/optimistic_transaction.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
namespace rocksdb {
@ -54,6 +56,22 @@ Status OptimisticTransaction::Prepare() {
}
Status OptimisticTransaction::Commit() {
auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl,
OptimisticTransactionDB>(txn_db_);
assert(txn_db_impl);
OccValidationPolicy policy = txn_db_impl->GetValidatePolicy();
if (policy == OccValidationPolicy::kValidateParallel) {
return CommitWithParallelValidate();
} else if (policy == OccValidationPolicy::kValidateSerial) {
return CommitWithSerialValidate();
} else {
assert(0);
}
// unreachable, just void compiler complain
return Status::OK();
}
Status OptimisticTransaction::CommitWithSerialValidate() {
// Set up callback which will call CheckTransactionForConflicts() to
// check whether this transaction is safe to be committed.
OptimisticTransactionCallback callback(this);
@ -70,6 +88,41 @@ Status OptimisticTransaction::Commit() {
return s;
}
Status OptimisticTransaction::CommitWithParallelValidate() {
auto txn_db_impl = static_cast_with_check<OptimisticTransactionDBImpl,
OptimisticTransactionDB>(txn_db_);
assert(txn_db_impl);
DBImpl* db_impl = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
assert(db_impl);
const size_t space = txn_db_impl->GetLockBucketsSize();
std::set<size_t> lk_idxes;
std::vector<std::unique_lock<std::mutex>> lks;
for (auto& cfit : GetTrackedKeys()) {
for (auto& keyit : cfit.second) {
lk_idxes.insert(fastrange64(GetSliceNPHash64(keyit.first), space));
}
}
// NOTE: in a single txn, all bucket-locks are taken in ascending order.
// In this way, txns from different threads all obey this rule so that
// deadlock can be avoided.
for (auto v : lk_idxes) {
lks.emplace_back(txn_db_impl->LockBucket(v));
}
Status s = TransactionUtil::CheckKeysForConflicts(db_impl, GetTrackedKeys(),
true /* cache_only */);
if (!s.ok()) {
return s;
}
s = db_impl->Write(write_options_, GetWriteBatch()->GetWriteBatch());
if (s.ok()) {
Clear();
}
return s;
}
Status OptimisticTransaction::Rollback() {
Clear();
return Status::OK();

@ -74,6 +74,10 @@ class OptimisticTransaction : public TransactionBaseImpl {
const Slice& /* unused */) override {
// Nothing to unlock.
}
Status CommitWithSerialValidate();
Status CommitWithParallelValidate();
};
// Used at commit time to trigger transaction validation

@ -29,6 +29,12 @@ Transaction* OptimisticTransactionDBImpl::BeginTransaction(
}
}
std::unique_lock<std::mutex> OptimisticTransactionDBImpl::LockBucket(
size_t idx) {
assert(idx < bucketed_locks_.size());
return std::unique_lock<std::mutex>(*bucketed_locks_[idx]);
}
Status OptimisticTransactionDB::Open(const Options& options,
const std::string& dbname,
OptimisticTransactionDB** dbptr) {
@ -54,6 +60,18 @@ Status OptimisticTransactionDB::Open(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr) {
return OptimisticTransactionDB::Open(db_options,
OptimisticTransactionDBOptions(), dbname,
column_families, handles, dbptr);
}
Status OptimisticTransactionDB::Open(
const DBOptions& db_options,
const OptimisticTransactionDBOptions& occ_options,
const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles,
OptimisticTransactionDB** dbptr) {
Status s;
DB* db;
@ -74,7 +92,7 @@ Status OptimisticTransactionDB::Open(
s = DB::Open(db_options, dbname, column_families_copy, handles, &db);
if (s.ok()) {
*dbptr = new OptimisticTransactionDBImpl(db);
*dbptr = new OptimisticTransactionDBImpl(db, occ_options);
}
return s;

@ -6,6 +6,10 @@
#pragma once
#ifndef ROCKSDB_LITE
#include <mutex>
#include <vector>
#include <algorithm>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
@ -14,8 +18,21 @@ namespace rocksdb {
class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
public:
explicit OptimisticTransactionDBImpl(DB* db, bool take_ownership = true)
: OptimisticTransactionDB(db), db_owner_(take_ownership) {}
explicit OptimisticTransactionDBImpl(
DB* db, const OptimisticTransactionDBOptions& occ_options,
bool take_ownership = true)
: OptimisticTransactionDB(db),
db_owner_(take_ownership),
validate_policy_(occ_options.validate_policy) {
if (validate_policy_ == OccValidationPolicy::kValidateParallel) {
uint32_t bucket_size = std::max(16u, occ_options.occ_lock_buckets);
bucketed_locks_.reserve(bucket_size);
for (size_t i = 0; i < bucket_size; ++i) {
bucketed_locks_.emplace_back(
std::unique_ptr<std::mutex>(new std::mutex));
}
}
}
~OptimisticTransactionDBImpl() {
// Prevent this stackable from destroying
@ -29,9 +46,20 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB {
const OptimisticTransactionOptions& txn_options,
Transaction* old_txn) override;
size_t GetLockBucketsSize() const { return bucketed_locks_.size(); }
OccValidationPolicy GetValidatePolicy() const { return validate_policy_; }
std::unique_lock<std::mutex> LockBucket(size_t idx);
private:
// NOTE: used in validation phase. Each key is hashed into some
// bucket. We then take the lock in the hash value order to avoid deadlock.
std::vector<std::unique_ptr<std::mutex>> bucketed_locks_;
bool db_owner_;
bool db_owner_;
const OccValidationPolicy validate_policy_;
void ReinitializeTransaction(Transaction* txn,
const WriteOptions& write_options,

@ -26,7 +26,9 @@ using std::string;
namespace rocksdb {
class OptimisticTransactionTest : public testing::Test {
class OptimisticTransactionTest
: public testing::Test,
public testing::WithParamInterface<OccValidationPolicy> {
public:
OptimisticTransactionDB* txn_db;
string dbname;
@ -54,13 +56,25 @@ class OptimisticTransactionTest : public testing::Test {
private:
void Open() {
Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db);
ColumnFamilyOptions cf_options(options);
OptimisticTransactionDBOptions occ_opts;
occ_opts.validate_policy = GetParam();
std::vector<ColumnFamilyDescriptor> column_families;
std::vector<ColumnFamilyHandle*> handles;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
Status s =
OptimisticTransactionDB::Open(DBOptions(options), occ_opts, dbname,
column_families, &handles, &txn_db);
assert(s.ok());
assert(txn_db != nullptr);
assert(handles.size() == 1);
delete handles[0];
}
};
TEST_F(OptimisticTransactionTest, SuccessTest) {
TEST_P(OptimisticTransactionTest, SuccessTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -89,7 +103,7 @@ TEST_F(OptimisticTransactionTest, SuccessTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, WriteConflictTest) {
TEST_P(OptimisticTransactionTest, WriteConflictTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -123,7 +137,7 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, WriteConflictTest2) {
TEST_P(OptimisticTransactionTest, WriteConflictTest2) {
WriteOptions write_options;
ReadOptions read_options;
OptimisticTransactionOptions txn_options;
@ -158,7 +172,7 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest2) {
delete txn;
}
TEST_F(OptimisticTransactionTest, ReadConflictTest) {
TEST_P(OptimisticTransactionTest, ReadConflictTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
@ -197,7 +211,7 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, TxnOnlyTest) {
TEST_P(OptimisticTransactionTest, TxnOnlyTest) {
// Test to make sure transactions work when there are no other writes in an
// empty db.
@ -217,7 +231,7 @@ TEST_F(OptimisticTransactionTest, TxnOnlyTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, FlushTest) {
TEST_P(OptimisticTransactionTest, FlushTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
@ -257,7 +271,7 @@ TEST_F(OptimisticTransactionTest, FlushTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, FlushTest2) {
TEST_P(OptimisticTransactionTest, FlushTest2) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
@ -314,7 +328,7 @@ TEST_F(OptimisticTransactionTest, FlushTest2) {
// Trigger the condition where some old memtables are skipped when doing
// TransactionUtil::CheckKey(), and make sure the result is still correct.
TEST_F(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
TEST_P(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
const int kAttemptHistoryMemtable = 0;
const int kAttemptImmMemTable = 1;
for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
@ -426,7 +440,7 @@ TEST_F(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
}
}
TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
TEST_P(OptimisticTransactionTest, NoSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -455,7 +469,7 @@ TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
TEST_P(OptimisticTransactionTest, MultipleSnapshotTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
string value;
@ -562,7 +576,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) {
delete txn2;
}
TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
TEST_P(OptimisticTransactionTest, ColumnFamiliesTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
@ -720,7 +734,7 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) {
}
}
TEST_F(OptimisticTransactionTest, EmptyTest) {
TEST_P(OptimisticTransactionTest, EmptyTest) {
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -757,7 +771,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, PredicateManyPreceders) {
TEST_P(OptimisticTransactionTest, PredicateManyPreceders) {
WriteOptions write_options;
ReadOptions read_options1, read_options2;
OptimisticTransactionOptions txn_options;
@ -821,7 +835,7 @@ TEST_F(OptimisticTransactionTest, PredicateManyPreceders) {
delete txn2;
}
TEST_F(OptimisticTransactionTest, LostUpdate) {
TEST_P(OptimisticTransactionTest, LostUpdate) {
WriteOptions write_options;
ReadOptions read_options, read_options1, read_options2;
OptimisticTransactionOptions txn_options;
@ -919,7 +933,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) {
ASSERT_EQ(value, "8");
}
TEST_F(OptimisticTransactionTest, UntrackedWrites) {
TEST_P(OptimisticTransactionTest, UntrackedWrites) {
WriteOptions write_options;
ReadOptions read_options;
string value;
@ -970,7 +984,7 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) {
delete txn;
}
TEST_F(OptimisticTransactionTest, IteratorTest) {
TEST_P(OptimisticTransactionTest, IteratorTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
@ -1085,7 +1099,7 @@ TEST_F(OptimisticTransactionTest, IteratorTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, SavepointTest) {
TEST_P(OptimisticTransactionTest, SavepointTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
@ -1249,7 +1263,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) {
delete txn;
}
TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) {
TEST_P(OptimisticTransactionTest, UndoGetForUpdateTest) {
WriteOptions write_options;
ReadOptions read_options, snapshot_read_options;
OptimisticTransactionOptions txn_options;
@ -1438,7 +1452,7 @@ Status OptimisticTransactionStressTestInserter(OptimisticTransactionDB* db,
}
} // namespace
TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) {
TEST_P(OptimisticTransactionTest, OptimisticTransactionStressTest) {
const size_t num_threads = 4;
const size_t num_transactions_per_thread = 10000;
const size_t num_sets = 3;
@ -1469,7 +1483,7 @@ TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) {
ASSERT_OK(s);
}
TEST_F(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
TEST_P(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
WriteOptions write_options;
OptimisticTransactionOptions transaction_options;
@ -1496,6 +1510,11 @@ TEST_F(OptimisticTransactionTest, SequenceNumberAfterRecoverTest) {
delete transaction;
}
INSTANTIATE_TEST_CASE_P(
InstanceOccGroup, OptimisticTransactionTest,
testing::Values(OccValidationPolicy::kValidateSerial,
OccValidationPolicy::kValidateParallel));
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save