WritePrepared Txn: PreReleaseCallback

Summary:
Add PreReleaseCallback to be called at the end of WriteImpl but before publishing the sequence number. The callback is used in WritePrepareTxn to i) update the commit map, ii) update the last published sequence number in the 2nd write queue. It also ensures that all the commits will go to the 2nd queue.
These changes will ensure that the commit map is updated before the sequence number is published and used by reading snapshots. If we use two write queues, the snapshots will use the seq number published by the 2nd queue. If we use one write queue (the default, the snapshots will use the last seq number in the memtable, which also indicates the last published seq number.
Closes https://github.com/facebook/rocksdb/pull/3205

Differential Revision: D6438959

Pulled By: maysamyabandeh

fbshipit-source-id: f8b6c434e94bc5f5ab9cb696879d4c23e2577ab9
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 3e40a5e832
commit 18dcf7f98d
  1. 1
      db/compaction_job_test.cc
  2. 47
      db/db_impl.cc
  3. 19
      db/db_impl.h
  4. 2
      db/db_impl_debug.cc
  5. 8
      db/db_impl_open.cc
  6. 60
      db/db_impl_write.cc
  7. 6
      db/db_iter.cc
  8. 1
      db/db_wal_test.cc
  9. 1
      db/external_sst_file_ingestion_job.cc
  10. 30
      db/pre_release_callback.h
  11. 1
      db/repair.cc
  12. 3
      db/version_set.cc
  13. 27
      db/version_set.h
  14. 1
      db/wal_manager_test.cc
  15. 3
      db/write_batch.cc
  16. 18
      db/write_callback_test.cc
  17. 7
      db/write_thread.h
  18. 3
      tools/ldb_cmd.cc
  19. 2
      utilities/transactions/pessimistic_transaction.cc
  20. 8
      utilities/transactions/transaction_test.h
  21. 87
      utilities/transactions/write_prepared_transaction_test.cc
  22. 93
      utilities/transactions/write_prepared_txn.cc
  23. 16
      utilities/transactions/write_prepared_txn.h
  24. 4
      utilities/transactions/write_prepared_txn_db.cc
  25. 43
      utilities/transactions/write_prepared_txn_db.h

@ -144,6 +144,7 @@ class CompactionJobTest : public testing::Test {
void SetLastSequence(const SequenceNumber sequence_number) { void SetLastSequence(const SequenceNumber sequence_number) {
versions_->SetLastAllocatedSequence(sequence_number + 1); versions_->SetLastAllocatedSequence(sequence_number + 1);
versions_->SetLastPublishedSequence(sequence_number + 1);
versions_->SetLastSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1);
} }

@ -196,16 +196,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
two_write_queues_(options.two_write_queues), two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush), manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(seq_per_batch), seq_per_batch_(seq_per_batch),
// When two_write_queues_ and seq_per_batch_ are both enabled we // last_sequencee_ is always maintained by the main queue that also writes
// sometimes allocate a seq also to indicate the commit timestmamp of a // to the memtable. When two_write_queues_ is disabled last seq in
// transaction. In such cases last_sequence_ would not indicate the last // memtable is the same as last seq published to the readers. When it is
// visible sequence number in memtable and should not be used for // enabled but seq_per_batch_ is disabled, last seq in memtable still
// snapshots. It should use last_allocated_sequence_ instaed but also // indicates last published seq since wal-only writes that go to the 2nd
// needs other mechanisms to exclude the data that after last_sequence_ // queue do not consume a sequence number. Otherwise writes performed by
// and before last_allocated_sequence_ from the snapshot. In // the 2nd queue could change what is visible to the readers. In this
// WritePreparedTxn this property is ensured since such data are not // cases, last_seq_same_as_publish_seq_==false, the 2nd queue maintains a
// committed yet. // separate variable to indicate the last published sequence.
allocate_seq_only_for_data_(!(seq_per_batch && options.two_write_queues)), last_seq_same_as_publish_seq_(
!(seq_per_batch && options.two_write_queues)),
// Since seq_per_batch_ is currently set only by WritePreparedTxn which // Since seq_per_batch_ is currently set only by WritePreparedTxn which
// requires a custom gc for compaction, we use that to set use_custom_gc_ // requires a custom gc for compaction, we use that to set use_custom_gc_
// as well. // as well.
@ -765,8 +766,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence(); return versions_->LastSequence();
} }
SequenceNumber DBImpl::IncAndFetchSequenceNumber() { void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
return versions_->FetchAddLastAllocatedSequence(1ull) + 1ull; versions_->SetLastPublishedSequence(seq);
} }
bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) { bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
@ -992,8 +993,9 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
// super versipon because a flush happening in between may compact // super versipon because a flush happening in between may compact
// away data for the snapshot, but the snapshot is earlier than the // away data for the snapshot, but the snapshot is earlier than the
// data overwriting it, so users may see wrong results. // data overwriting it, so users may see wrong results.
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence() snapshot = last_seq_same_as_publish_seq_
: versions_->LastAllocatedSequence(); ? versions_->LastSequence()
: versions_->LastPublishedSequence();
} }
TEST_SYNC_POINT("DBImpl::GetImpl:3"); TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4"); TEST_SYNC_POINT("DBImpl::GetImpl:4");
@ -1084,8 +1086,9 @@ std::vector<Status> DBImpl::MultiGet(
snapshot = reinterpret_cast<const SnapshotImpl*>( snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_; read_options.snapshot)->number_;
} else { } else {
snapshot = allocate_seq_only_for_data_ ? versions_->LastSequence() snapshot = last_seq_same_as_publish_seq_
: versions_->LastAllocatedSequence(); ? versions_->LastSequence()
: versions_->LastPublishedSequence();
} }
for (auto mgd_iter : multiget_cf_data) { for (auto mgd_iter : multiget_cf_data) {
mgd_iter.second->super_version = mgd_iter.second->super_version =
@ -1492,7 +1495,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
#endif #endif
} else { } else {
// Note: no need to consider the special case of // Note: no need to consider the special case of
// allocate_seq_only_for_data_==false since NewIterator is overridden in // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
// WritePreparedTxnDB // WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber() ? read_options.snapshot->GetSequenceNumber()
@ -1610,7 +1613,7 @@ Status DBImpl::NewIterators(
#endif #endif
} else { } else {
// Note: no need to consider the special case of // Note: no need to consider the special case of
// allocate_seq_only_for_data_==false since NewIterators is overridden in // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
// WritePreparedTxnDB // WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber() ? read_options.snapshot->GetSequenceNumber()
@ -1645,9 +1648,9 @@ const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
delete s; delete s;
return nullptr; return nullptr;
} }
auto snapshot_seq = allocate_seq_only_for_data_ auto snapshot_seq = last_seq_same_as_publish_seq_
? versions_->LastSequence() ? versions_->LastSequence()
: versions_->LastAllocatedSequence(); : versions_->LastPublishedSequence();
return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); return snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
} }
@ -1658,9 +1661,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
snapshots_.Delete(casted_s); snapshots_.Delete(casted_s);
uint64_t oldest_snapshot; uint64_t oldest_snapshot;
if (snapshots_.empty()) { if (snapshots_.empty()) {
oldest_snapshot = allocate_seq_only_for_data_ oldest_snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence() ? versions_->LastSequence()
: versions_->LastAllocatedSequence(); : versions_->LastPublishedSequence();
} else { } else {
oldest_snapshot = snapshots_.oldest()->number_; oldest_snapshot = snapshots_.oldest()->number_;
} }

@ -28,6 +28,7 @@
#include "db/flush_scheduler.h" #include "db/flush_scheduler.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/snapshot_checker.h" #include "db/snapshot_checker.h"
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
@ -220,8 +221,8 @@ class DBImpl : public DB {
virtual Status SyncWAL() override; virtual Status SyncWAL() override;
virtual SequenceNumber GetLatestSequenceNumber() const override; virtual SequenceNumber GetLatestSequenceNumber() const override;
virtual SequenceNumber IncAndFetchSequenceNumber(); virtual void SetLastPublishedSequence(SequenceNumber seq);
// Returns LastSequence in allocate_seq_only_for_data_ // Returns LastSequence in last_seq_same_as_publish_seq_
// mode and LastAllocatedSequence otherwise. This is useful when visiblility // mode and LastAllocatedSequence otherwise. This is useful when visiblility
// depends also on data written to the WAL but not to the memtable. // depends also on data written to the WAL but not to the memtable.
SequenceNumber TEST_GetLastVisibleSequence() const; SequenceNumber TEST_GetLastVisibleSequence() const;
@ -671,7 +672,8 @@ class DBImpl : public DB {
Status WriteImpl(const WriteOptions& options, WriteBatch* updates, Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr, WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0, uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr); bool disable_memtable = false, uint64_t* seq_used = nullptr,
PreReleaseCallback* pre_release_callback = nullptr);
Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr, WriteCallback* callback = nullptr,
@ -682,7 +684,8 @@ class DBImpl : public DB {
Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr, WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0, uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr); uint64_t* seq_used = nullptr,
PreReleaseCallback* pre_release_callback = nullptr);
uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable(); uint64_t FindMinPrepLogReferencedByMemTable();
@ -705,6 +708,7 @@ class DBImpl : public DB {
friend class CompactedDBImpl; friend class CompactedDBImpl;
#ifndef NDEBUG #ifndef NDEBUG
friend class DBTest2_ReadCallbackTest_Test; friend class DBTest2_ReadCallbackTest_Test;
friend class WriteCallbackTest_WriteWithCallbackTest_Test;
friend class XFTransactionWriteHandler; friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest; friend class DBBlobIndexTest;
#endif #endif
@ -1344,10 +1348,9 @@ class DBImpl : public DB {
// //
// Default: false // Default: false
const bool seq_per_batch_; const bool seq_per_batch_;
// A sequence number is allocated only for data written to DB. Otherwise it // LastSequence also indicates last published sequence visibile to the
// could also be allocated for operational purposes such as commit timestamp // readers. Otherwise LastPublishedSequence should be used.
// of a transaction. const bool last_seq_same_as_publish_seq_;
const bool allocate_seq_only_for_data_;
// It indicates that a customized gc algorithm must be used for // It indicates that a customized gc algorithm must be used for
// flush/compaction and if it is not provided vis SnapshotChecker, we should // flush/compaction and if it is not provided vis SnapshotChecker, we should
// disable gc to be safe. // disable gc to be safe.

@ -210,7 +210,7 @@ int DBImpl::TEST_BGFlushesAllowed() const {
} }
SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const { SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
if (allocate_seq_only_for_data_) { if (last_seq_same_as_publish_seq_) {
return versions_->LastSequence(); return versions_->LastSequence();
} else { } else {
return versions_->LastAllocatedSequence(); return versions_->LastAllocatedSequence();

@ -591,12 +591,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// consecutive, we continue recovery despite corruption. This could // consecutive, we continue recovery despite corruption. This could
// happen when we open and write to a corrupted DB, where sequence id // happen when we open and write to a corrupted DB, where sequence id
// will start from the last sequence id we recovered. // will start from the last sequence id we recovered.
if (sequence == *next_sequence || if (sequence == *next_sequence) {
// With seq_per_batch_, if previous run was with two_write_queues_
// then allocate_seq_only_for_data_ was disabled and a gap in the
// sequence numbers in the log is expected by the commits without
// prepares.
(seq_per_batch_ && sequence >= *next_sequence)) {
stop_replay_for_corruption = false; stop_replay_for_corruption = false;
} }
if (stop_replay_for_corruption) { if (stop_replay_for_corruption) {
@ -762,6 +757,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
if ((*next_sequence != kMaxSequenceNumber) && if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) { (versions_->LastSequence() <= last_sequence)) {
versions_->SetLastAllocatedSequence(last_sequence); versions_->SetLastAllocatedSequence(last_sequence);
versions_->SetLastPublishedSequence(last_sequence);
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
} }
} }

@ -57,10 +57,14 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Status DBImpl::WriteImpl(const WriteOptions& write_options, Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback, WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref, uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) { bool disable_memtable, uint64_t* seq_used,
PreReleaseCallback* pre_release_callback) {
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} }
@ -89,7 +93,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (two_write_queues_ && disable_memtable) { if (two_write_queues_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used, return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref, seq_used); log_ref, seq_used, pre_release_callback);
} }
if (immutable_db_options_.enable_pipelined_write) { if (immutable_db_options_.enable_pipelined_write) {
@ -99,7 +103,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref, WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable); disable_memtable, pre_release_callback);
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL); RecordTick(stats_, WRITE_WITH_WAL);
@ -123,6 +127,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (write_thread_.CompleteParallelMemTableWriter(&w)) { if (write_thread_.CompleteParallelMemTableWriter(&w)) {
// we're responsible for exit batch group // we're responsible for exit batch group
for (auto* writer : *(w.write_group)) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence);
if (!ws.ok()) {
status = ws;
break;
}
}
}
// TODO(myabandeh): propagate status to write_group
auto last_sequence = w.write_group->last_sequence; auto last_sequence = w.write_group->last_sequence;
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
MemTableInsertStatusCheck(w.status); MemTableInsertStatusCheck(w.status);
@ -345,6 +360,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
if (should_exit_batch_group) { if (should_exit_batch_group) {
if (status.ok()) { if (status.ok()) {
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence);
if (!ws.ok()) {
status = ws;
break;
}
}
}
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
} }
MemTableInsertStatusCheck(w.status); MemTableInsertStatusCheck(w.status);
@ -484,14 +509,18 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
return w.FinalStatus(); return w.FinalStatus();
} }
// The 2nd write queue. If enabled it will be used only for WAL-only writes.
// This is the only queue that updates LastPublishedSequence which is only
// applicable in a two-queue setting.
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback, WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref, uint64_t* log_used, uint64_t log_ref,
uint64_t* seq_used) { uint64_t* seq_used,
PreReleaseCallback* pre_release_callback) {
Status status; Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref, WriteThread::Writer w(write_options, my_batch, callback, log_ref,
true /* disable_memtable */); true /* disable_memtable */, pre_release_callback);
if (write_options.disableWAL) { if (write_options.disableWAL) {
return status; return status;
} }
@ -577,6 +606,18 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
if (!w.CallbackFailed()) { if (!w.CallbackFailed()) {
WriteCallbackStatusCheck(status); WriteCallbackStatusCheck(status);
} }
if (status.ok()) {
for (auto* writer : write_group) {
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
Status ws = writer->pre_release_callback->Callback(writer->sequence);
if (!ws.ok()) {
status = ws;
break;
}
}
}
}
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
if (status.ok()) { if (status.ok()) {
status = w.FinalStatus(); status = w.FinalStatus();
@ -883,13 +924,18 @@ Status DBImpl::WriteRecoverableState() {
log_write_mutex_.Lock(); log_write_mutex_.Lock();
} }
SequenceNumber seq = versions_->LastSequence(); SequenceNumber seq = versions_->LastSequence();
WriteBatchInternal::SetSequence(&cached_recoverable_state_, ++seq); WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1);
auto status = WriteBatchInternal::InsertInto( auto status = WriteBatchInternal::InsertInto(
&cached_recoverable_state_, column_family_memtables_.get(), &cached_recoverable_state_, column_family_memtables_.get(),
&flush_scheduler_, true, 0 /*recovery_log_number*/, this, &flush_scheduler_, true, 0 /*recovery_log_number*/, this,
false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool, false /* concurrent_memtable_writes */, &next_seq, &dont_care_bool,
seq_per_batch_); seq_per_batch_);
versions_->SetLastSequence(--next_seq); auto last_seq = next_seq - 1;
if (two_write_queues_) {
versions_->FetchAddLastAllocatedSequence(last_seq - seq);
}
versions_->SetLastSequence(last_seq);
versions_->SetLastPublishedSequence(last_seq);
if (two_write_queues_) { if (two_write_queues_) {
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();
} }

@ -1384,9 +1384,9 @@ Status ArenaWrappedDBIter::Refresh() {
return Status::NotSupported("Creating renew iterator is not allowed."); return Status::NotSupported("Creating renew iterator is not allowed.");
} }
assert(db_iter_ != nullptr); assert(db_iter_ != nullptr);
// TODO(yiwu): For allocate_seq_only_for_data_==false, this is not the correct // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
// behavior. Will be corrected automatically when we take a snapshot here for // correct behavior. Will be corrected automatically when we take a snapshot
// the case of WritePreparedTxnDB. // here for the case of WritePreparedTxnDB.
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
if (sv_number_ != cur_sv_number) { if (sv_number_ != cur_sv_number) {

@ -731,6 +731,7 @@ class RecoveryTestHelper {
WriteBatchInternal::SetSequence(&batch, seq); WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
versions->SetLastAllocatedSequence(seq); versions->SetLastAllocatedSequence(seq);
versions->SetLastPublishedSequence(seq);
versions->SetLastSequence(seq); versions->SetLastSequence(seq);
} }
} }

@ -200,6 +200,7 @@ Status ExternalSstFileIngestionJob::Run() {
if (consumed_seqno) { if (consumed_seqno) {
versions_->SetLastAllocatedSequence(last_seqno + 1); versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastPublishedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1);
} }

@ -0,0 +1,30 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/status.h"
namespace rocksdb {
class DB;
class PreReleaseCallback {
public:
virtual ~PreReleaseCallback() {}
// Will be called while on the write thread after the write and before the
// release of the sequence number. This is useful if any operation needs to be
// done before the write gets visible to the readers, or if we want to reduce
// the overhead of locking by updating something sequentially while we are on
// the write thread. If the callback fails, this function returns a non-OK
// status, the sequence number will not be released, and same status will be
// propagated to all the writers in the write group.
// seq is the sequence number that is used for this write and will be
// released.
virtual Status Callback(SequenceNumber seq) = 0;
};
} // namespace rocksdb

@ -549,6 +549,7 @@ class Repairer {
} }
} }
vset_.SetLastAllocatedSequence(max_sequence); vset_.SetLastAllocatedSequence(max_sequence);
vset_.SetLastPublishedSequence(max_sequence);
vset_.SetLastSequence(max_sequence); vset_.SetLastSequence(max_sequence);
for (const auto& cf_id_and_tables : cf_id_to_tables) { for (const auto& cf_id_and_tables : cf_id_to_tables) {

@ -2399,6 +2399,7 @@ VersionSet::VersionSet(const std::string& dbname,
pending_manifest_file_number_(0), pending_manifest_file_number_(0),
last_sequence_(0), last_sequence_(0),
last_allocated_sequence_(0), last_allocated_sequence_(0),
last_published_sequence_(0),
prev_log_number_(0), prev_log_number_(0),
current_version_number_(0), current_version_number_(0),
manifest_file_size_(0), manifest_file_size_(0),
@ -3058,6 +3059,7 @@ Status VersionSet::Recover(
manifest_file_size_ = current_manifest_file_size; manifest_file_size_ = current_manifest_file_size;
next_file_number_.store(next_file + 1); next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence; last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number; prev_log_number_ = previous_log_number;
@ -3429,6 +3431,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
next_file_number_.store(next_file + 1); next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence; last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number; prev_log_number_ = previous_log_number;

@ -765,6 +765,11 @@ class VersionSet {
return last_allocated_sequence_.load(std::memory_order_seq_cst); return last_allocated_sequence_.load(std::memory_order_seq_cst);
} }
// Note: memory_order_acquire must be sufficient.
uint64_t LastPublishedSequence() const {
return last_published_sequence_.load(std::memory_order_seq_cst);
}
// Set the last sequence number to s. // Set the last sequence number to s.
void SetLastSequence(uint64_t s) { void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_); assert(s >= last_sequence_);
@ -773,6 +778,12 @@ class VersionSet {
last_sequence_.store(s, std::memory_order_release); last_sequence_.store(s, std::memory_order_release);
} }
// Note: memory_order_release must be sufficient
void SetLastPublishedSequence(uint64_t s) {
assert(s >= last_published_sequence_);
last_published_sequence_.store(s, std::memory_order_seq_cst);
}
// Note: memory_order_release must be sufficient // Note: memory_order_release must be sufficient
void SetLastAllocatedSequence(uint64_t s) { void SetLastAllocatedSequence(uint64_t s) {
assert(s >= last_allocated_sequence_); assert(s >= last_allocated_sequence_);
@ -888,11 +899,21 @@ class VersionSet {
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t options_file_number_; uint64_t options_file_number_;
uint64_t pending_manifest_file_number_; uint64_t pending_manifest_file_number_;
// The last seq visible to reads // The last seq visible to reads. It normally indicates the last sequence in
// the memtable but when using two write queues it could also indicate the
// last sequence in the WAL visible to reads.
std::atomic<uint64_t> last_sequence_; std::atomic<uint64_t> last_sequence_;
// The last seq that is already allocated. The seq might or might not have // The last seq that is already allocated. It is applicable only when we have
// appreated in memtable. // two write queues. In that case seq might or might not have appreated in
// memtable but it is expected to appear in the WAL.
// We have last_sequence <= last_allocated_sequence_
std::atomic<uint64_t> last_allocated_sequence_; std::atomic<uint64_t> last_allocated_sequence_;
// The last allocated sequence that is also published to the readers. This is
// applicable only when last_seq_same_as_publish_seq_ is not set. Otherwise
// last_sequence_ also indicates the last published seq.
// We have last_sequence <= last_published_seqeunce_ <=
// last_allocated_sequence_
std::atomic<uint64_t> last_published_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily // Opened lazily

@ -68,6 +68,7 @@ class WalManagerTest : public testing::Test {
WriteBatchInternal::SetSequence(&batch, seq); WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
versions_->SetLastAllocatedSequence(seq); versions_->SetLastAllocatedSequence(seq);
versions_->SetLastPublishedSequence(seq);
versions_->SetLastSequence(seq); versions_->SetLastSequence(seq);
} }

@ -454,7 +454,8 @@ Status WriteBatch::Iterate(Handler* handler) const {
break; break;
case kTypeLogData: case kTypeLogData:
handler->LogData(blob); handler->LogData(blob);
empty_batch = true; // A batch might have nothing but LogData. It is still a batch.
empty_batch = false;
break; break;
case kTypeBeginPrepareXID: case kTypeBeginPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) & assert(content_flags_.load(std::memory_order_relaxed) &

@ -290,8 +290,24 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
WriteOptions woptions; WriteOptions woptions;
woptions.disableWAL = !enable_WAL; woptions.disableWAL = !enable_WAL;
woptions.sync = enable_WAL; woptions.sync = enable_WAL;
Status s = db_impl->WriteWithCallback( Status s;
if (seq_per_batch && two_queues) {
class PublishSeqCallback : public PreReleaseCallback {
public:
PublishSeqCallback(DBImpl* db_impl) : db_impl_(db_impl) {}
virtual Status Callback(SequenceNumber last_seq) {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}
DBImpl* db_impl_;
} publish_seq_callback(db_impl);
s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
&write_op.callback_, nullptr, 0, false,
nullptr, &publish_seq_callback);
} else {
s = db_impl->WriteWithCallback(
woptions, &write_op.write_batch_, &write_op.callback_); woptions, &write_op.write_batch_, &write_op.callback_);
}
if (write_op.callback_.should_fail_) { if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy()); ASSERT_TRUE(s.IsBusy());

@ -15,6 +15,7 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/pre_release_callback.h"
#include "db/write_callback.h" #include "db/write_callback.h"
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -117,6 +118,7 @@ class WriteThread {
bool no_slowdown; bool no_slowdown;
bool disable_wal; bool disable_wal;
bool disable_memtable; bool disable_memtable;
PreReleaseCallback* pre_release_callback;
uint64_t log_used; // log number that this batch was inserted into uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference uint64_t log_ref; // log number that memtable insert should reference
WriteCallback* callback; WriteCallback* callback;
@ -137,6 +139,7 @@ class WriteThread {
no_slowdown(false), no_slowdown(false),
disable_wal(false), disable_wal(false),
disable_memtable(false), disable_memtable(false),
pre_release_callback(nullptr),
log_used(0), log_used(0),
log_ref(0), log_ref(0),
callback(nullptr), callback(nullptr),
@ -148,12 +151,14 @@ class WriteThread {
link_newer(nullptr) {} link_newer(nullptr) {}
Writer(const WriteOptions& write_options, WriteBatch* _batch, Writer(const WriteOptions& write_options, WriteBatch* _batch,
WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable) WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
PreReleaseCallback* _pre_release_callback = nullptr)
: batch(_batch), : batch(_batch),
sync(write_options.sync), sync(write_options.sync),
no_slowdown(write_options.no_slowdown), no_slowdown(write_options.no_slowdown),
disable_wal(write_options.disableWAL), disable_wal(write_options.disableWAL),
disable_memtable(_disable_memtable), disable_memtable(_disable_memtable),
pre_release_callback(_pre_release_callback),
log_used(0), log_used(0),
log_ref(_log_ref), log_ref(_log_ref),
callback(_callback), callback(_callback),

@ -369,7 +369,8 @@ void LDBCommand::OpenDB() {
if (column_families_.empty()) { if (column_families_.empty()) {
st = DB::Open(options_, db_path_, &db_); st = DB::Open(options_, db_path_, &db_);
} else { } else {
st = DB::Open(options_, db_path_, column_families_, &handles_opened, &db_); st = DB::Open(options_, db_path_, column_families_, &handles_opened,
&db_);
} }
} }
} }

@ -512,7 +512,7 @@ Status PessimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
if (tracked_at_seq == kMaxSequenceNumber) { if (tracked_at_seq == kMaxSequenceNumber) {
// Since we haven't checked a snapshot, we only know this key has not // Since we haven't checked a snapshot, we only know this key has not
// been modified since after we locked it. // been modified since after we locked it.
// Note: when allocate_seq_only_for_data_==false this is less than the // Note: when last_seq_same_as_publish_seq_==false this is less than the
// latest allocated seq but it is ok since i) this is just a heuristic // latest allocated seq but it is ok since i) this is just a heuristic
// used only as a hint to avoid actual check for conflicts, ii) this would // used only as a hint to avoid actual check for conflicts, ii) this would
// cause a false positive only if the snapthot is taken right after the // cause a false positive only if the snapthot is taken right after the

@ -143,11 +143,9 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
} }
}
}; };
std::function<void(size_t)> txn_t0 = [&](size_t index) { std::function<void(size_t)> txn_t0 = [&](size_t index) {
return txn_t0_with_status(index, Status::OK()); return txn_t0_with_status(index, Status::OK());
@ -168,11 +166,9 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
} }
}
ASSERT_OK(s); ASSERT_OK(s);
}; };
std::function<void(size_t)> txn_t2 = [&](size_t index) { std::function<void(size_t)> txn_t2 = [&](size_t index) {
@ -196,11 +192,9 @@ class TransactionTest : public ::testing::TestWithParam<
} else { } else {
// Consume one seq per batch // Consume one seq per batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for commit // Consume one seq for commit
exp_seq++; exp_seq++;
} }
}
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db); auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
pdb->UnregisterTransaction(txn); pdb->UnregisterTransaction(txn);
delete txn; delete txn;
@ -264,11 +258,9 @@ class TransactionTest : public ::testing::TestWithParam<
exp_seq++; exp_seq++;
// Consume one seq per rollback batch // Consume one seq per rollback batch
exp_seq++; exp_seq++;
if (options.two_write_queues) {
// Consume one seq for rollback commit // Consume one seq for rollback commit
exp_seq++; exp_seq++;
} }
}
delete txn; delete txn;
}; };

@ -727,26 +727,6 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// The latest seq might be due to a commit without prepare and hence not
// persisted in the WAL. We need to discount such seqs if they are not
// continued by any seq consued by a value write.
if (options.two_write_queues) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
MutexLock l(&wp_db->seq_for_metadata_mutex_);
auto& vec = wp_db->seq_for_metadata;
std::sort(vec.begin(), vec.end());
// going backward discount any last seq consumed for metadata until we see
// a seq that is consumed for actualy key/values.
auto rit = vec.rbegin();
for (; rit != vec.rend(); ++rit) {
if (*rit == exp_seq) {
exp_seq--;
} else {
break;
}
}
}
// Check if recovery preserves the last sequence number // Check if recovery preserves the last sequence number
db_impl->FlushWAL(true); db_impl->FlushWAL(true);
ReOpenNoDelete(); ReOpenNoDelete();
@ -1084,11 +1064,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
} }
} }
void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v, void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
Slice key) { PinnableSlice& exp_v, Slice key) {
Status s; Status s;
PinnableSlice v; PinnableSlice v;
ReadOptions roptions;
s = db->Get(roptions, db->DefaultColumnFamily(), key, &v); s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
ASSERT_TRUE(exp_s == s); ASSERT_TRUE(exp_s == s);
ASSERT_TRUE(s.ok() || s.IsNotFound()); ASSERT_TRUE(s.ok() || s.IsNotFound());
@ -1110,6 +1089,11 @@ void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
} }
} }
void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
Slice key) {
ASSERT_SAME(ReadOptions(), db, exp_s, exp_v, key);
}
TEST_P(WritePreparedTransactionTest, RollbackTest) { TEST_P(WritePreparedTransactionTest, RollbackTest) {
ReadOptions roptions; ReadOptions roptions;
WriteOptions woptions; WriteOptions woptions;
@ -1296,9 +1280,7 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
VerifyKeys({{"foo", v}}); VerifyKeys({{"foo", v}});
seq++; // one for the key/value seq++; // one for the key/value
KeyVersion kv = {"foo", v, seq, kTypeValue}; KeyVersion kv = {"foo", v, seq, kTypeValue};
if (options.two_write_queues) {
seq++; // one for the commit seq++; // one for the commit
}
versions.emplace_back(kv); versions.emplace_back(kv);
} }
std::reverse(std::begin(versions), std::end(versions)); std::reverse(std::begin(versions), std::end(versions));
@ -1344,9 +1326,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
auto add_key = [&](std::function<Status()> func) { auto add_key = [&](std::function<Status()> func) {
ASSERT_OK(func()); ASSERT_OK(func());
expected_seq++; expected_seq++;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
snapshots.push_back(db->GetSnapshot()); snapshots.push_back(db->GetSnapshot());
}; };
@ -1455,16 +1435,12 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2")); ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
expected_seq++; // 1 for write expected_seq++; // 1 for write
SequenceNumber seq1 = expected_seq; SequenceNumber seq1 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2")); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
expected_seq++; // 1 for write expected_seq++; // 1 for write
SequenceNumber seq2 = expected_seq; SequenceNumber seq2 = expected_seq;
if (options.two_write_queues) {
expected_seq++; // 1 for commit expected_seq++; // 1 for commit
}
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence()); ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot1); db->ReleaseSnapshot(snapshot1);
@ -1587,7 +1563,10 @@ TEST_P(WritePreparedTransactionTest,
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
SequenceNumber seq1 = expected_seq; SequenceNumber seq1 = expected_seq;
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2")); ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber()); DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
expected_seq++; // one for data
expected_seq++; // one for commit
ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual // Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic. // compaction logic.
@ -1667,6 +1646,50 @@ TEST_P(WritePreparedTransactionTest, Iterate) {
delete transaction; delete transaction;
} }
// Test that updating the commit map will not affect the existing snapshots
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
for (bool skip_prepare : {true, false}) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::AddCommitted:start",
"AtomicCommit::GetSnapshot:start"},
{"AtomicCommit::Get:end",
"WritePreparedTxnDB::AddCommitted:start:pause"},
{"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
{"AtomicCommit::Get2:end",
"WritePreparedTxnDB::AddCommitted:end:pause:"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread write_thread([&]() {
if (skip_prepare) {
db->Put(WriteOptions(), Slice("key"), Slice("value"));
} else {
Transaction* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
ASSERT_OK(txn->Prepare());
ASSERT_OK(txn->Commit());
delete txn;
}
});
rocksdb::port::Thread read_thread([&]() {
ReadOptions roptions;
TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
roptions.snapshot = db->GetSnapshot();
PinnableSlice val;
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
TEST_SYNC_POINT("AtomicCommit::Get:end");
TEST_SYNC_POINT("AtomicCommit::Get2:start");
ASSERT_SAME(roptions, db, s, val, "key");
TEST_SYNC_POINT("AtomicCommit::Get2:end");
db->ReleaseSnapshot(roptions.snapshot);
});
read_thread.join();
write_thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
}
// Test that we can change write policy from WriteCommitted to WritePrepared // Test that we can change write policy from WriteCommitted to WritePrepared
// after a clean shutdown (which would empty the WAL) // after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) { TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {

@ -61,10 +61,10 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
Status WritePreparedTxn::PrepareInternal() { Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_; WriteOptions write_options = write_options_;
write_options.disableWAL = false; write_options.disableWAL = false;
const bool write_after_commit = true; const bool WRITE_AFTER_COMMIT = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!write_after_commit); !WRITE_AFTER_COMMIT);
const bool disable_memtable = true; const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
bool collapsed = GetWriteBatch()->Collapse(); bool collapsed = GetWriteBatch()->Collapse();
if (collapsed) { if (collapsed) {
@ -74,7 +74,7 @@ Status WritePreparedTxn::PrepareInternal() {
Status s = Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0, /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!disable_memtable, &seq_used); !DISABLE_MEMTABLE, &seq_used);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used; auto prepare_seq = seq_used;
SetId(prepare_seq); SetId(prepare_seq);
@ -91,34 +91,29 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
return CommitBatchInternal(GetWriteBatch()->GetWriteBatch()); return CommitBatchInternal(GetWriteBatch()->GetWriteBatch());
} }
SequenceNumber WritePreparedTxn::GetACommitSeqNumber(SequenceNumber prep_seq) {
if (db_impl_->immutable_db_options().two_write_queues) {
auto s = db_impl_->IncAndFetchSequenceNumber();
#ifndef NDEBUG
MutexLock l(&wpt_db_->seq_for_metadata_mutex_);
wpt_db_->seq_for_metadata.push_back(s);
#endif
return s;
} else {
return prep_seq;
}
}
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) { Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
// TODO(myabandeh): handle the duplicate keys in the batch // TODO(myabandeh): handle the duplicate keys in the batch
// In the absence of Prepare markers, use Noop as a batch separator // In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch); WriteBatchInternal::InsertNoop(batch);
const bool disable_memtable = true; const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr, auto s = db_impl_->WriteImpl(write_options_, batch, nullptr, nullptr,
no_log_ref, !disable_memtable, &seq_used); no_log_ref, !DISABLE_MEMTABLE, &seq_used);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); // Commit the batch by writing an empty batch to the queue that will release
// TODO(myabandeh): skip AddPrepared // the commit sequence number to readers.
wpt_db_->AddPrepared(prepare_seq); WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_->AddCommitted(prepare_seq, commit_seq); wpt_db_, db_impl_, prepare_seq);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map);
assert(seq_used != kMaxSequenceNumber);
return s; return s;
} }
@ -137,28 +132,22 @@ Status WritePreparedTxn::CommitInternal() {
WriteBatchInternal::SetAsLastestPersistentState(working_batch); WriteBatchInternal::SetAsLastestPersistentState(working_batch);
} }
const bool disable_memtable = true; // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
auto prepare_seq = GetId();
const bool includes_data = !empty && !for_recovery;
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, includes_data);
const bool disable_memtable = !includes_data;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is already // Since the prepared batch is directly written to memtable, there is already
// a connection between the memtable and its WAL, so there is no need to // a connection between the memtable and its WAL, so there is no need to
// redundantly reference the log that contains the prepared data. // redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull; const uint64_t zero_log_number = 0ull;
auto s = db_impl_->WriteImpl( auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
write_options_, working_batch, nullptr, nullptr, zero_log_number, zero_log_number, disable_memtable, &seq_used,
empty || for_recovery ? disable_memtable : !disable_memtable, &seq_used); &update_commit_map);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
uint64_t& commit_seq = seq_used;
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
auto prepare_seq = GetId();
wpt_db_->AddCommitted(prepare_seq, commit_seq);
if (!empty && !for_recovery) {
// Commit the data that is accompnaied with the commit marker
// TODO(myabandeh): skip AddPrepared
wpt_db_->AddPrepared(commit_seq);
uint64_t commit_seq_2 = GetACommitSeqNumber(commit_seq);
wpt_db_->AddCommitted(commit_seq, commit_seq_2);
}
return s; return s;
} }
@ -232,19 +221,31 @@ Status WritePreparedTxn::RollbackInternal() {
} }
// The Rollback marker will be used as a batch separator // The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, name_); WriteBatchInternal::MarkRollback(&rollback_batch, name_);
const bool disable_memtable = true; const bool DISABLE_MEMTABLE = true;
const uint64_t no_log_ref = 0; const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber; uint64_t seq_used = kMaxSequenceNumber;
s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
no_log_ref, !disable_memtable, &seq_used); no_log_ref, !DISABLE_MEMTABLE, &seq_used);
assert(seq_used != kMaxSequenceNumber); assert(seq_used != kMaxSequenceNumber);
if (!s.ok()) {
return s;
}
uint64_t& prepare_seq = seq_used; uint64_t& prepare_seq = seq_used;
uint64_t commit_seq = GetACommitSeqNumber(prepare_seq); // Commit the batch by writing an empty batch to the queue that will release
// TODO(myabandeh): skip AddPrepared // the commit sequence number to readers.
wpt_db_->AddPrepared(prepare_seq); WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_->AddCommitted(prepare_seq, commit_seq); wpt_db_, db_impl_, prepare_seq);
WriteBatch empty_batch;
empty_batch.PutLogData(Slice());
// In the absence of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(&empty_batch);
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
no_log_ref, DISABLE_MEMTABLE, &seq_used,
&update_commit_map);
assert(seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back // Mark the txn as rolled back
wpt_db_->RollbackPrepared(GetId(), commit_seq); uint64_t& rollback_seq = seq_used;
wpt_db_->RollbackPrepared(GetId(), rollback_seq);
return s; return s;
} }

@ -46,16 +46,16 @@ class WritePreparedTxn : public PessimisticTransaction {
virtual ~WritePreparedTxn() {} virtual ~WritePreparedTxn() {}
// To make WAL commit markers visible, the snapshot will be based on the last // To make WAL commit markers visible, the snapshot will be based on the last
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the // seq in the WAL that is also published, LastPublishedSequence, as opposed to
// memtable. // the last seq in the memtable.
using Transaction::Get; using Transaction::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override; PinnableSlice* value) override;
// To make WAL commit markers visible, the snapshot will be based on the last // To make WAL commit markers visible, the snapshot will be based on the last
// seq in the WAL, LastAllocatedSequence, as opposed to the last seq in the // seq in the WAL that is also published, LastPublishedSequence, as opposed to
// memtable. // the last seq in the memtable.
using Transaction::GetIterator; using Transaction::GetIterator;
virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options) override;
virtual Iterator* GetIterator(const ReadOptions& options, virtual Iterator* GetIterator(const ReadOptions& options,
@ -64,8 +64,6 @@ class WritePreparedTxn : public PessimisticTransaction {
private: private:
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
SequenceNumber GetACommitSeqNumber(SequenceNumber prep_seq);
Status PrepareInternal() override; Status PrepareInternal() override;
Status CommitWithoutPrepareInternal() override; Status CommitWithoutPrepareInternal() override;
@ -75,9 +73,9 @@ class WritePreparedTxn : public PessimisticTransaction {
// Since the data is already written to memtables at the Prepare phase, the // Since the data is already written to memtables at the Prepare phase, the
// commit entails writing only a commit marker in the WAL. The sequence number // commit entails writing only a commit marker in the WAL. The sequence number
// of the commit marker is then the commit timestamp of the transaction. To // of the commit marker is then the commit timestamp of the transaction. To
// make the commit timestamp visible to readers, their snapshot is based on // make WAL commit markers visible, the snapshot will be based on the last seq
// the last seq in the WAL, LastAllocatedSequence, as opposed to the last seq // in the WAL that is also published, LastPublishedSequence, as opposed to the
// in the memtable. // last seq in the memtable.
Status CommitInternal() override; Status CommitInternal() override;
Status RollbackInternal() override; Status RollbackInternal() override;

@ -283,6 +283,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
uint64_t commit_seq) { uint64_t commit_seq) {
ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64,
prepare_seq, commit_seq); prepare_seq, commit_seq);
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
CommitEntry64b evicted_64b; CommitEntry64b evicted_64b;
CommitEntry evicted; CommitEntry evicted;
@ -319,6 +321,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
} }
} }
} }
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
} }
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,

@ -14,6 +14,7 @@
#include <vector> #include <vector>
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/pre_release_callback.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/snapshot_checker.h" #include "db/snapshot_checker.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -186,12 +187,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Struct to hold ownership of snapshot and read callback for cleanup. // Struct to hold ownership of snapshot and read callback for cleanup.
struct IteratorState; struct IteratorState;
#ifndef NDEBUG
// For unit tests we can track of the seq numbers that are used for metadata as opposed to actual key/values
std::vector<uint64_t> seq_for_metadata;
mutable port::Mutex seq_for_metadata_mutex_;
#endif
private: private:
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
@ -373,5 +368,41 @@ class WritePreparedTxnReadCallback : public ReadCallback {
SequenceNumber snapshot_; SequenceNumber snapshot_;
}; };
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
public:
// includes_data indicates that the commit also writes non-empty
// CommitTimeWriteBatch to memtable, which needs to be committed separately.
WritePreparedCommitEntryPreReleaseCallback(WritePreparedTxnDB* db,
DBImpl* db_impl,
SequenceNumber prep_seq,
bool includes_data = false)
: db_(db),
db_impl_(db_impl),
prep_seq_(prep_seq),
includes_data_(includes_data) {}
virtual Status Callback(SequenceNumber commit_seq) {
db_->AddCommitted(prep_seq_, commit_seq);
if (includes_data_) {
// Commit the data that is accompnaied with the commit marker
// TODO(myabandeh): skip AddPrepared
db_->AddPrepared(commit_seq);
db_->AddCommitted(commit_seq, commit_seq);
}
// Publish the sequence number. We can do that here assuming the callback is
// invoked only from one write queue, which would guarantee that the publish
// sequence numbers will be in order, i.e., once a seq is published all the
// seq prior to that are also publishable.
db_impl_->SetLastPublishedSequence(commit_seq);
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
DBImpl* db_impl_;
SequenceNumber prep_seq_;
bool includes_data_;
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

Loading…
Cancel
Save