@ -55,8 +55,7 @@ Status WritePreparedTxnDB::Initialize(
virtual Status Callback ( SequenceNumber commit_seq ,
virtual Status Callback ( SequenceNumber commit_seq ,
bool is_mem_disabled ) override {
bool is_mem_disabled ) override {
assert ( ! is_mem_disabled ) ;
assert ( ! is_mem_disabled ) ;
const bool PREPARE_SKIPPED = true ;
db_ - > AddCommitted ( commit_seq , commit_seq ) ;
db_ - > AddCommitted ( commit_seq , commit_seq , PREPARE_SKIPPED ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
@ -167,7 +166,7 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref , ! DISABLE_MEMTABLE , & seq_used ,
no_log_ref , ! DISABLE_MEMTABLE , & seq_used ,
batch_cnt , pre_release_callback ) ;
batch_cnt , pre_release_callback ) ;
assert ( ! s . ok ( ) | | seq_used ! = kMaxSequenceNumber ) ;
assert ( ! s . ok ( ) | | seq_used ! = kMaxSequenceNumber ) ;
uint64_t & prepare_seq = seq_used ;
uint64_t prepare_seq = seq_used ;
if ( txn ! = nullptr ) {
if ( txn ! = nullptr ) {
txn - > SetId ( prepare_seq ) ;
txn - > SetId ( prepare_seq ) ;
}
}
@ -185,9 +184,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
// Commit the batch by writing an empty batch to the 2nd queue that will
// Commit the batch by writing an empty batch to the 2nd queue that will
// release the commit sequence number to readers.
// release the commit sequence number to readers.
const size_t ZERO_COMMITS = 0 ;
const size_t ZERO_COMMITS = 0 ;
const bool PREP_HEAP_SKIPPED = true ;
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare (
WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare (
this , db_impl_ , prepare_seq , batch_cnt , ZERO_COMMITS , ! PREP_HEAP_SKIPPED ) ;
this , db_impl_ , prepare_seq , batch_cnt , ZERO_COMMITS ) ;
WriteBatch empty_batch ;
WriteBatch empty_batch ;
empty_batch . PutLogData ( Slice ( ) ) ;
empty_batch . PutLogData ( Slice ( ) ) ;
const size_t ONE_BATCH = 1 ;
const size_t ONE_BATCH = 1 ;
@ -197,6 +195,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
no_log_ref , DISABLE_MEMTABLE , & seq_used , ONE_BATCH ,
no_log_ref , DISABLE_MEMTABLE , & seq_used , ONE_BATCH ,
& update_commit_map_with_prepare ) ;
& update_commit_map_with_prepare ) ;
assert ( ! s . ok ( ) | | seq_used ! = kMaxSequenceNumber ) ;
assert ( ! s . ok ( ) | | seq_used ! = kMaxSequenceNumber ) ;
// Note RemovePrepared should be called after WriteImpl that publishsed the
// seq. Otherwise SmallestUnCommittedSeq optimization breaks.
RemovePrepared ( prepare_seq , batch_cnt ) ;
return s ;
return s ;
}
}
@ -392,24 +393,13 @@ void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
throw std : : runtime_error (
throw std : : runtime_error (
" Rollback reqeust while there are live snapshots. " ) ;
" Rollback reqeust while there are live snapshots. " ) ;
}
}
WriteLock wl ( & prepared_mutex_ ) ;
RemovePrepared ( prep_seq ) ;
prepared_txns_ . erase ( prep_seq ) ;
bool was_empty = delayed_prepared_ . empty ( ) ;
if ( ! was_empty ) {
delayed_prepared_ . erase ( prep_seq ) ;
bool is_empty = delayed_prepared_ . empty ( ) ;
if ( was_empty ! = is_empty ) {
delayed_prepared_empty_ . store ( is_empty , std : : memory_order_release ) ;
}
}
}
}
void WritePreparedTxnDB : : AddCommitted ( uint64_t prepare_seq , uint64_t commit_seq ,
void WritePreparedTxnDB : : AddCommitted ( uint64_t prepare_seq , uint64_t commit_seq ,
bool prepare_skipped , uint8_t loop_cnt ) {
uint8_t loop_cnt ) {
ROCKS_LOG_DETAILS ( info_log_ ,
ROCKS_LOG_DETAILS ( info_log_ , " Txn % " PRIu64 " Committing with % " PRIu64 ,
" Txn % " PRIu64 " Committing with % " PRIu64
prepare_seq , commit_seq ) ;
" (prepare_skipped=%d) " ,
prepare_seq , commit_seq , prepare_skipped ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:start " ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:start " ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:start:pause " ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:start:pause " ) ;
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE ;
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE ;
@ -443,23 +433,27 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
if ( loop_cnt > 100 ) {
if ( loop_cnt > 100 ) {
throw std : : runtime_error ( " Infinite loop in AddCommitted! " ) ;
throw std : : runtime_error ( " Infinite loop in AddCommitted! " ) ;
}
}
AddCommitted ( prepare_seq , commit_seq , prepare_skipped , + + loop_cnt ) ;
AddCommitted ( prepare_seq , commit_seq , + + loop_cnt ) ;
return ;
return ;
}
}
if ( ! prepare_skipped ) {
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:end " ) ;
WriteLock wl ( & prepared_mutex_ ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:end:pause " ) ;
prepared_txns_ . erase ( prepare_seq ) ;
}
void WritePreparedTxnDB : : RemovePrepared ( const uint64_t prepare_seq ,
const size_t batch_cnt ) {
WriteLock wl ( & prepared_mutex_ ) ;
for ( size_t i = 0 ; i < batch_cnt ; i + + ) {
prepared_txns_ . erase ( prepare_seq + i ) ;
bool was_empty = delayed_prepared_ . empty ( ) ;
bool was_empty = delayed_prepared_ . empty ( ) ;
if ( ! was_empty ) {
if ( ! was_empty ) {
delayed_prepared_ . erase ( prepare_seq ) ;
delayed_prepared_ . erase ( prepare_seq + i ) ;
bool is_empty = delayed_prepared_ . empty ( ) ;
bool is_empty = delayed_prepared_ . empty ( ) ;
if ( was_empty ! = is_empty ) {
if ( was_empty ! = is_empty ) {
delayed_prepared_empty_ . store ( is_empty , std : : memory_order_release ) ;
delayed_prepared_empty_ . store ( is_empty , std : : memory_order_release ) ;
}
}
}
}
}
}
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:end " ) ;
TEST_SYNC_POINT ( " WritePreparedTxnDB::AddCommitted:end:pause " ) ;
}
}
bool WritePreparedTxnDB : : GetCommitEntry ( const uint64_t indexed_seq ,
bool WritePreparedTxnDB : : GetCommitEntry ( const uint64_t indexed_seq ,
@ -542,10 +536,13 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
}
}
const Snapshot * WritePreparedTxnDB : : GetSnapshot ( ) {
const Snapshot * WritePreparedTxnDB : : GetSnapshot ( ) {
// Note: SmallestUnCommittedSeq must be called before GetSnapshotImpl. Refer
// to WritePreparedTxn::SetSnapshot for more explanation.
auto min_uncommitted = WritePreparedTxnDB : : SmallestUnCommittedSeq ( ) ;
const bool FOR_WW_CONFLICT_CHECK = true ;
const bool FOR_WW_CONFLICT_CHECK = true ;
SnapshotImpl * snap_impl = db_impl_ - > GetSnapshotImpl ( ! FOR_WW_CONFLICT_CHECK ) ;
SnapshotImpl * snap_impl = db_impl_ - > GetSnapshotImpl ( ! FOR_WW_CONFLICT_CHECK ) ;
assert ( snap_impl ) ;
assert ( snap_impl ) ;
EnhanceSnapshot ( snap_impl ) ;
EnhanceSnapshot ( snap_impl , min_uncommitted ) ;
return snap_impl ;
return snap_impl ;
}
}