WritePrepared: Fix SmallestUnCommittedSeq() doesn't check delayed_prepared (#4867)

Summary:
When prepared_txns_ heap is empty, SmallestUnCommittedSeq() should check delayed_prepared_ set as well.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4867

Differential Revision: D13632134

Pulled By: maysamyabandeh

fbshipit-source-id: b0423bb0a58dc95f1e636d5ed3f6e619df801fb7
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 301da345ae
commit d50c10ed37
  1. 78
      utilities/transactions/write_prepared_transaction_test.cc
  2. 23
      utilities/transactions/write_prepared_txn_db.h

@ -13,6 +13,7 @@
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <functional>
#include <string>
#include <thread>
@ -27,6 +28,7 @@
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
@ -351,6 +353,39 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy){};
protected:
// TODO(mayabndeh): Avoid duplicating PessimisticTransaction::Open logic here.
void DestroyAndReopenWithExtraOptions(size_t snapshot_cache_bits,
size_t commit_cache_bits) {
delete db;
db = nullptr;
DestroyDB(dbname, options);
options.create_if_missing = true;
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
std::vector<ColumnFamilyHandle*> handles;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<size_t> compaction_enabled_cf_indices;
TransactionDB::PrepareWrap(&options, &column_families,
&compaction_enabled_cf_indices);
DB* base_db = nullptr;
ASSERT_OK(DBImpl::Open(options, dbname, column_families, &handles, &base_db,
true /*use_seq_per_batch*/,
false /*use_batch_for_txn*/));
// The following is equivalent of WrapDB().
txn_db_options.write_policy = WRITE_PREPARED;
auto* wp_db = new WritePreparedTxnDB(base_db, txn_db_options, snapshot_cache_bits,
commit_cache_bits);
wp_db->UpdateCFComparatorMap(handles);
ASSERT_OK(wp_db->Initialize(compaction_enabled_cf_indices, handles));
ASSERT_EQ(1, handles.size());
delete handles[0];
db = wp_db;
}
// If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggestion.
@ -2023,6 +2058,49 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
db->ReleaseSnapshot(snapshot2);
}
TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // disable commit cache
for (bool has_recent_prepare : {true, false}) {
DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits);
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Delete("key1"));
ASSERT_OK(transaction->Prepare());
// snapshot1 should get min_uncommitted from prepared_txns_ heap.
auto snapshot1 = db->GetSnapshot();
ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_);
// Add a commit to advance max_evicted_seq and move the prepared transaction
// into delayed_prepared_ set.
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
Transaction* txn2 = nullptr;
if (has_recent_prepare) {
txn2 =
db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
ASSERT_OK(txn2->SetName("txn2"));
ASSERT_OK(txn2->Put("key3", "value3"));
ASSERT_OK(txn2->Prepare());
}
// snapshot2 should get min_uncommitted from delayed_prepared_ set.
auto snapshot2 = db->GetSnapshot();
ASSERT_EQ(transaction->GetId(), ((SnapshotImpl*)snapshot1)->min_uncommitted_);
ASSERT_OK(transaction->Commit());
delete transaction;
if (has_recent_prepare) {
ASSERT_OK(txn2->Commit());
delete txn2;
}
VerifyKeys({{"key1", "NOT_FOUND"}});
VerifyKeys({{"key1", "value1"}}, snapshot1);
VerifyKeys({{"key1", "value1"}}, snapshot2);
db->ReleaseSnapshot(snapshot1);
db->ReleaseSnapshot(snapshot2);
}
}
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,

@ -143,6 +143,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
prep_seq, snapshot_seq, 0);
return false;
}
if (prep_seq < min_uncommitted) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32
" because of min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
// We should not normally reach here
WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
@ -158,17 +166,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
return false;
}
}
// Note: since min_uncommitted does not include the delayed_prepared_ we
// should check delayed_prepared_ first before applying this optimization.
// TODO(myabandeh): include delayed_prepared_ in min_uncommitted
if (prep_seq < min_uncommitted) {
ROCKS_LOG_DETAILS(info_log_,
"IsInSnapshot %" PRIu64 " in %" PRIu64
" returns %" PRId32
" because of min_uncommitted %" PRIu64,
prep_seq, snapshot_seq, 1, min_uncommitted);
return true;
}
auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE;
CommitEntry64b dont_care;
CommitEntry cached;
@ -512,6 +509,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// reflect any uncommitted data that is not added to prepared_txns_ yet.
// Otherwise, if there is no concurrent txn, this value simply reflects that
// latest value in the memtable.
if (!delayed_prepared_.empty()) {
assert(!delayed_prepared_empty_.load());
return *delayed_prepared_.begin();
}
if (prepared_txns_.empty()) {
return db_impl_->GetLatestSequenceNumber() + 1;
} else {

Loading…
Cancel
Save