WritePrepared Txn: Refactoring WriteCallback

Summary:
Refactor the logic around WriteCallback in the write path to clarify when and how exactly we advance the sequence number and making sure it is consistent across the code.
Closes https://github.com/facebook/rocksdb/pull/3168

Differential Revision: D6324312

Pulled By: maysamyabandeh

fbshipit-source-id: 9a34f479561fdb2a5d01ef6d37a28908d03bbe33
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 53863b76f9
commit 54b43563be
  1. 6
      db/db_impl.h
  2. 35
      db/db_impl_write.cc
  3. 7
      db/write_batch.cc

@ -662,6 +662,12 @@ class DBImpl : public DB {
void EraseThreadStatusDbInfo() const;
// If disable_memtable is set the application logic must guarantee that the
// batch will still be skipped from memtable during the recovery. In
// WriteCommitted it is guarnateed since disable_memtable is used for prepare
// batch which will be written to memtable later during the commit, and in
// WritePrepared it is guaranteed since it will be used only for WAL markers
// which will never be written to memtable.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,

@ -195,9 +195,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
size_t total_count = 0;
size_t valid_batches = 0;
uint64_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
valid_batches++;
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
@ -207,7 +209,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
// Note about seq_per_batch_: either disableWAL is set for the entire write
// group or not. In either case we inc seq for each write batch with no
// failed callback. This means that there could be a batch with
// disalbe_memtable in between; although we do not write this batch to
// memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc
// the seq per valid written key to mem.
size_t seq_inc = seq_per_batch_ ? valid_batches : total_count;
const bool concurrent_update = two_write_queues_;
// Update stats while we are an exclusive group leader, so we know
@ -263,13 +271,21 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_memtable_time);
if (!parallel) {
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
} else {
SequenceNumber next_sequence = current_sequence;
// Note: the logic for advancing seq here must be consistent with the
// logic in WriteBatchInternal::InsertInto(write_group...) as well as
// with WriteBatchInternal::InsertInto(write_batch...) that is called on
// the merged batch during recovery from the WAL.
for (auto* writer : write_group) {
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = next_sequence;
if (seq_per_batch_) {
next_sequence++;
@ -538,14 +554,14 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
auto curr_seq = last_sequence + 1;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
writer->sequence = curr_seq;
if (writer->CallbackFailed()) {
continue;
}
writer->sequence = curr_seq;
if (seq_per_batch_) {
curr_seq++;
} else if (writer->CheckCallback(this)) {
curr_seq += WriteBatchInternal::Count(writer->batch);
}
// else seq advances only by memtable writes
}
if (status.ok() && write_options.sync) {
// Requesting sync with two_write_queues_ is expected to be very rare. We
@ -689,7 +705,8 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* merged_batch = nullptr;
*write_with_wal = 0;
auto* leader = write_group.leader;
if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
assert(!leader->disable_wal); // Same holds for all in the batch group
if (write_group.size == 1 && !leader->CallbackFailed() &&
leader->batch->GetWalTerminationPoint().is_cleared()) {
// we simply write the first WriteBatch to WAL if the group only
// contains one batch, that batch should be written to the WAL,
@ -705,7 +722,7 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
// interface
merged_batch = tmp_batch;
for (auto writer : write_group) {
if (writer->ShouldWriteToWAL()) {
if (!writer->CallbackFailed()) {
WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true);
if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
@ -745,6 +762,8 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
SequenceNumber sequence) {
Status status;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;
WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,
@ -812,6 +831,8 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
size_t seq_inc) {
Status status;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
WriteBatch tmp_batch;
size_t write_with_wal = 0;
WriteBatch* to_be_cached_state = nullptr;

@ -1471,13 +1471,16 @@ Status WriteBatchInternal::InsertInto(
db, concurrent_memtable_writes,
nullptr /*has_valid_writes*/, seq_per_batch);
for (auto w : write_group) {
if (w->CallbackFailed()) {
continue;
}
w->sequence = inserter.sequence();
if (!w->ShouldWriteToMemtable()) {
w->sequence = inserter.sequence();
// In seq_per_batch_ mode this advances the seq by one.
inserter.MaybeAdvanceSeq(true);
continue;
}
SetSequence(w->batch, inserter.sequence());
w->sequence = inserter.sequence();
inserter.set_log_number_ref(w->log_ref);
w->status = w->batch->Iterate(&inserter);
if (!w->status.ok()) {

Loading…
Cancel
Save