Add facility to write only a portion of WriteBatch to WAL

Summary:
When constructing a write batch, a client may now call MarkWalTerminationPoint() on that batch. No batch operations after this call will be written to the WAL, but they will still be inserted into the Memtable. This facility is used to remove one of the three WriteImpl calls in 2PC transactions, producing a ~1% perf improvement.
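
A minimal client-side sketch of the new API (the wrapper function and key names are illustrative only, not part of this change):

```
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"

// Operations added before MarkWalTerminationPoint() are written to both the
// WAL and the Memtable; operations added after it are inserted into the
// Memtable only and are skipped when the batch is appended to the WAL.
void WriteWithWalTermination(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Put("durable_key", "durable_value");  // WAL + Memtable
  batch.MarkWalTerminationPoint();
  batch.Put("memtable_only_key", "value");    // Memtable only

  rocksdb::WriteOptions wo;
  wo.sync = true;
  rocksdb::Status s = db->Write(wo, &batch);
  assert(s.ok());
}
```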

```
RocksDB - unoptimized 2pc, sync_binlog=1, disable_2pc=off
INFO 2016-08-31 14:30:38,814 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2619 seconds. Requests/second = 28628

RocksDB - optimized 2pc , sync_binlog=1, disable_2pc=off
INFO 2016-08-31 16:26:59,442 [main]: REQUEST PHASE COMPLETED. 75000000 requests done in 2581 seconds. Requests/second = 29054
```

Test Plan: Two unit tests added.

Reviewers: sdong, yiwu, IslamAbdelRahman

Reviewed By: yiwu

Subscribers: hermanlee4, dhruba, andrewkr

Differential Revision: https://reviews.facebook.net/D64599
Author: Reid Horuff
Branch: main
Commit: 2c1f95291d (parent 043cb62d63)
Changed files:

1. db/db_impl.cc (9)
2. db/db_wal_test.cc (23)
3. db/fault_injection_test.cc (24)
4. db/write_batch.cc (48)
5. db/write_batch_internal.h (3)
6. db/write_batch_test.cc (21)
7. include/rocksdb/write_batch.h (29)
8. utilities/transactions/transaction_impl.cc (28)

db/db_impl.cc:

```
@@ -4768,7 +4768,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       PERF_TIMER_GUARD(write_wal_time);

       WriteBatch* merged_batch = nullptr;
-      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL()) {
+      if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
+          write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
+        // we simply write the first WriteBatch to WAL if the group only
+        // contains one batch, that batch should be written to the WAL,
+        // and the batch is not wanting to be truncated
         merged_batch = write_group[0]->batch;
         write_group[0]->log_used = logfile_number_;
       } else {

@@ -4778,7 +4782,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         merged_batch = &tmp_batch_;
         for (auto writer : write_group) {
           if (writer->ShouldWriteToWAL()) {
-            WriteBatchInternal::Append(merged_batch, writer->batch);
+            WriteBatchInternal::Append(merged_batch, writer->batch,
+                                       /*WAL_only*/ true);
           }
           writer->log_used = logfile_number_;
         }
```

db/db_wal_test.cc:

```
@@ -1130,6 +1130,29 @@ TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
 #endif  // ROCKSDB_LITE

+TEST_F(DBWALTest, WalTermTest) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  ASSERT_OK(Put(1, "foo", "bar"));
+
+  WriteOptions wo;
+  wo.sync = true;
+  wo.disableWAL = false;
+
+  WriteBatch batch;
+  batch.Put("foo", "bar");
+  batch.MarkWalTerminationPoint();
+  batch.Put("foo2", "bar2");
+
+  ASSERT_OK(dbfull()->Write(wo, &batch));
+
+  // make sure we can re-open it.
+  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
+  ASSERT_EQ("bar", Get(1, "foo"));
+  ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
```

db/fault_injection_test.cc:

```
@@ -503,6 +503,30 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) {
   ASSERT_EQ(value_space, val);
 }

+TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) {
+  ReadOptions ro;
+  Options options = CurrentOptions();
+  options.env = env_;
+
+  WriteOptions wo;
+  wo.sync = true;
+  wo.disableWAL = false;
+
+  WriteBatch batch;
+  batch.Put("cats", "dogs");
+  batch.MarkWalTerminationPoint();
+  batch.Put("boys", "girls");
+  ASSERT_OK(db_->Write(wo, &batch));
+
+  env_->SetFilesystemActive(false);
+  NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
+  ASSERT_OK(OpenDB());
+
+  std::string val;
+  ASSERT_OK(db_->Get(ro, "cats", &val));
+  ASSERT_EQ("dogs", val);
+  ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound());
+}
+
 INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool());

 }  // namespace rocksdb
```

db/write_batch.cc:

```
@@ -118,13 +118,6 @@ struct BatchContentClassifier : public WriteBatch::Handler {

 }  // anon namespace

-struct SavePoint {
-  size_t size;  // size of rep_
-  int count;    // count of elements in rep_
-  uint32_t content_flags;
-};
-
 struct SavePoints {
   std::stack<SavePoint> stack;
 };

@@ -143,11 +136,13 @@ WriteBatch::WriteBatch(const std::string& rep)

 WriteBatch::WriteBatch(const WriteBatch& src)
     : save_points_(src.save_points_),
+      wal_term_point_(src.wal_term_point_),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       rep_(src.rep_) {}

 WriteBatch::WriteBatch(WriteBatch&& src)
     : save_points_(std::move(src.save_points_)),
+      wal_term_point_(std::move(src.wal_term_point_)),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       rep_(std::move(src.rep_)) {}

@@ -191,6 +186,8 @@ void WriteBatch::Clear() {
       save_points_->stack.pop();
     }
   }
+
+  wal_term_point_.clear();
 }

 int WriteBatch::Count() const {

@@ -213,6 +210,12 @@ uint32_t WriteBatch::ComputeContentFlags() const {
   return rv;
 }

+void WriteBatch::MarkWalTerminationPoint() {
+  wal_term_point_.size = GetDataSize();
+  wal_term_point_.count = Count();
+  wal_term_point_.content_flags = content_flags_;
+}
+
 bool WriteBatch::HasPut() const {
   return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
 }

@@ -729,8 +732,8 @@ void WriteBatch::SetSavePoint() {
     save_points_ = new SavePoints();
   }
   // Record length and count of current batch of writes.
-  save_points_->stack.push(SavePoint{
-      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)});
+  save_points_->stack.push(SavePoint(
+      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
 }

 Status WriteBatch::RollbackToSavePoint() {

@@ -1252,14 +1255,29 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
 }

-void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
-  SetCount(dst, Count(dst) + Count(src));
+void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
+                                const bool wal_only) {
+  size_t src_len;
+  int src_count;
+  uint32_t src_flags;
+
+  const SavePoint& batch_end = src->GetWalTerminationPoint();
+
+  if (wal_only && !batch_end.is_cleared()) {
+    src_len = batch_end.size - WriteBatchInternal::kHeader;
+    src_count = batch_end.count;
+    src_flags = batch_end.content_flags;
+  } else {
+    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
+    src_count = Count(src);
+    src_flags = src->content_flags_.load(std::memory_order_relaxed);
+  }
+
+  SetCount(dst, Count(dst) + src_count);
   assert(src->rep_.size() >= WriteBatchInternal::kHeader);
-  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader,
-                   src->rep_.size() - WriteBatchInternal::kHeader);
+  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
   dst->content_flags_.store(
-      dst->content_flags_.load(std::memory_order_relaxed) |
-          src->content_flags_.load(std::memory_order_relaxed),
+      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
       std::memory_order_relaxed);
 }
```

db/write_batch_internal.h:

```
@@ -177,7 +177,8 @@ class WriteBatchInternal {
                      uint64_t log_number = 0, DB* db = nullptr,
                      bool concurrent_memtable_writes = false);

-  static void Append(WriteBatch* dst, const WriteBatch* src);
+  static void Append(WriteBatch* dst, const WriteBatch* src,
+                     const bool WAL_only = false);

   // Returns the byte size of appending a WriteBatch with ByteSize
   // leftByteSize and a WriteBatch with ByteSize rightByteSize
```

db/write_batch_test.cc:

```
@@ -185,6 +185,27 @@ TEST_F(WriteBatchTest, Append) {
       "Delete(foo)@203",
       PrintContents(&b1));
   ASSERT_EQ(4, b1.Count());
+  b2.Clear();
+  b2.Put("c", "cc");
+  b2.Put("d", "dd");
+  b2.MarkWalTerminationPoint();
+  b2.Put("e", "ee");
+  WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true);
+  ASSERT_EQ(
+      "Put(a, va)@200"
+      "Put(b, vb)@202"
+      "Put(b, vb)@201"
+      "Put(c, cc)@204"
+      "Put(d, dd)@205"
+      "Delete(foo)@203",
+      PrintContents(&b1));
+  ASSERT_EQ(6, b1.Count());
+  ASSERT_EQ(
+      "Put(c, cc)@0"
+      "Put(d, dd)@1"
+      "Put(e, ee)@2",
+      PrintContents(&b2));
+  ASSERT_EQ(3, b2.Count());
 }

 TEST_F(WriteBatchTest, SingleDeletion) {
```

include/rocksdb/write_batch.h:

```
@@ -39,6 +39,25 @@ class ColumnFamilyHandle;
 struct SavePoints;
 struct SliceParts;

+struct SavePoint {
+  size_t size;  // size of rep_
+  int count;    // count of elements in rep_
+  uint32_t content_flags;
+
+  SavePoint() : size(0), count(0), content_flags(0) {}
+
+  SavePoint(size_t _size, int _count, uint32_t _flags)
+      : size(_size), count(_count), content_flags(_flags) {}
+
+  void clear() {
+    size = 0;
+    count = 0;
+    content_flags = 0;
+  }
+
+  bool is_cleared() const { return (size | count | content_flags) == 0; }
+};
+
 class WriteBatch : public WriteBatchBase {
  public:
   explicit WriteBatch(size_t reserved_bytes = 0);

@@ -280,10 +299,20 @@ class WriteBatch : public WriteBatchBase {
   WriteBatch& operator=(const WriteBatch& src);
   WriteBatch& operator=(WriteBatch&& src);

+  // marks this point in the WriteBatch as the last record to
+  // be inserted into the WAL, provided the WAL is enabled
+  void MarkWalTerminationPoint();
+  const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; }
+
  private:
   friend class WriteBatchInternal;
   SavePoints* save_points_;
+
+  // When sending a WriteBatch through WriteImpl we might want to
+  // specify that only the first x records of the batch be written to
+  // the WAL.
+  SavePoint wal_term_point_;

   // For HasXYZ.  Mutable to allow lazy computation of results
   mutable std::atomic<uint32_t> content_flags_;
```

utilities/transactions/transaction_impl.cc:

```
@@ -250,25 +250,23 @@ Status TransactionImpl::Commit() {
     }
   } else if (commit_prepared) {
     exec_status_.store(AWAITING_COMMIT);
-    WriteOptions write_options = write_options_;
-
-    // insert prepared batch into Memtable only.
-    // Memtable will ignore BeginPrepare/EndPrepare markers
-    // in non recovery mode and simply insert the values
-    write_options.disableWAL = true;
-    assert(log_number_ > 0);
-    s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
-                            nullptr, nullptr, log_number_);
-    if (!s.ok()) {
-      return s;
-    }

     // We take the commit-time batch and append the Commit marker.
+    // We then write this batch to both WAL and Memtable.
     // The Memtable will ignore the Commit marker in non-recovery mode
-    write_options.disableWAL = false;
-    WriteBatchInternal::MarkCommit(GetCommitTimeWriteBatch(), name_);
-    s = db_impl_->WriteImpl(write_options, GetCommitTimeWriteBatch());
+    WriteBatch* working_batch = GetCommitTimeWriteBatch();
+    WriteBatchInternal::MarkCommit(working_batch, name_);
+
+    // any operations appended to this working_batch will be ignored from WAL
+    working_batch->MarkWalTerminationPoint();
+
+    // insert prepared batch into Memtable only skipping WAL.
+    // Memtable will ignore BeginPrepare/EndPrepare markers
+    // in non recovery mode and simply insert the values
+    WriteBatchInternal::Append(working_batch, GetWriteBatch()->GetWriteBatch());
+
+    s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
+                            log_number_);
     if (!s.ok()) {
       return s;
     }
```
