Fix write_batch_test when ASSERT_STATUS_CHECKED=1 (#7575)

Summary:
Without this PR, `ASSERT_STATUS_CHECKED=1 make   -j32 write_batch_test && ./write_batch_test` fails.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7575

Test Plan: ASSERT_STATUS_CHECKED=1 make   -j32 write_batch_test && ./write_batch_test

Reviewed By: zhichao-cao

Differential Revision: D24411442

Pulled By: cheng-chang

fbshipit-source-id: f67dc43c44d6afcc6d7e5ff15c6ae9bbf4dfc943
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 1eda625eab
commit 73dbe10bbf
  1. 82
      db/write_batch.cc
  2. 182
      db/write_batch_test.cc
  3. 3
      utilities/write_batch_with_index/write_batch_with_index_internal.cc

@ -640,7 +640,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
case kTypeBeginPrepareXID: case kTypeBeginPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare(); s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false; empty_batch = false;
if (!handler->WriteAfterCommit()) { if (!handler->WriteAfterCommit()) {
s = Status::NotSupported( s = Status::NotSupported(
@ -659,7 +660,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
case kTypeBeginPersistedPrepareXID: case kTypeBeginPersistedPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare(); s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false; empty_batch = false;
if (handler->WriteAfterCommit()) { if (handler->WriteAfterCommit()) {
s = Status::NotSupported( s = Status::NotSupported(
@ -672,7 +674,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
case kTypeBeginUnprepareXID: case kTypeBeginUnprepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_UNPREPARE));
handler->MarkBeginPrepare(true /* unprepared */); s = handler->MarkBeginPrepare(true /* unprepared */);
assert(s.ok());
empty_batch = false; empty_batch = false;
if (handler->WriteAfterCommit()) { if (handler->WriteAfterCommit()) {
s = Status::NotSupported( s = Status::NotSupported(
@ -691,23 +694,27 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
case kTypeEndPrepareXID: case kTypeEndPrepareXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE)); (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid); s = handler->MarkEndPrepare(xid);
assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
case kTypeCommitXID: case kTypeCommitXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT)); (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid); s = handler->MarkCommit(xid);
assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
case kTypeRollbackXID: case kTypeRollbackXID:
assert(wb->content_flags_.load(std::memory_order_relaxed) & assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK)); (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid); s = handler->MarkRollback(xid);
assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
case kTypeNoop: case kTypeNoop:
handler->MarkNoop(empty_batch); s = handler->MarkNoop(empty_batch);
assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
default: default:
@ -1411,8 +1418,8 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& value, ValueType value_type) { const Slice& value, ValueType value_type) {
// optimize for non-recovery mode // optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); return WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key,
return Status::OK(); value);
// else insert the values to the memtable right away // else insert the values to the memtable right away
} }
@ -1423,7 +1430,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key); batch_boundry = IsDuplicateKeySeq(column_family_id, key);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
@ -1501,7 +1510,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to // If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object. // the rebuilding transaction object.
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
} }
// Since all Puts are logged in transaction logs (if enabled), always bump // Since all Puts are logged in transaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result // sequence number. Even if the update eventually fails and does not result
@ -1538,8 +1549,7 @@ class MemTableInserter : public WriteBatch::Handler {
Status DeleteCF(uint32_t column_family_id, const Slice& key) override { Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
// optimize for non-recovery mode // optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); return WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
return Status::OK();
// else insert the values to the memtable right away // else insert the values to the memtable right away
} }
@ -1550,7 +1560,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key); batch_boundry = IsDuplicateKeySeq(column_family_id, key);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
@ -1570,7 +1582,8 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to // If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object. // the rebuilding transaction object.
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
} }
return ret_status; return ret_status;
} }
@ -1578,8 +1591,8 @@ class MemTableInserter : public WriteBatch::Handler {
Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override {
// optimize for non-recovery mode // optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); return WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
return Status::OK(); key);
// else insert the values to the memtable right away // else insert the values to the memtable right away
} }
@ -1590,8 +1603,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
key); column_family_id, key);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key); batch_boundry = IsDuplicateKeySeq(column_family_id, key);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
@ -1605,7 +1619,8 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to // If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object. // the rebuilding transaction object.
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key);
} }
return ret_status; return ret_status;
} }
@ -1614,9 +1629,8 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& end_key) override { const Slice& end_key) override {
// optimize for non-recovery mode // optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, return WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key); begin_key, end_key);
return Status::OK();
// else insert the values to the memtable right away // else insert the values to the memtable right away
} }
@ -1627,8 +1641,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, ret_status = WriteBatchInternal::DeleteRange(
begin_key, end_key); rebuilding_trx_, column_family_id, begin_key, end_key);
assert(ret_status.ok());
// TODO(myabandeh): when transactional DeleteRange support is added, // TODO(myabandeh): when transactional DeleteRange support is added,
// check if end_key must also be added. // check if end_key must also be added.
batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key); batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
@ -1667,8 +1682,8 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to // If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object. // the rebuilding transaction object.
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id, ret_status = WriteBatchInternal::DeleteRange(
begin_key, end_key); rebuilding_trx_, column_family_id, begin_key, end_key);
} }
return ret_status; return ret_status;
} }
@ -1677,8 +1692,8 @@ class MemTableInserter : public WriteBatch::Handler {
const Slice& value) override { const Slice& value) override {
// optimize for non-recovery mode // optimize for non-recovery mode
if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); return WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
return Status::OK(); value);
// else insert the values to the memtable right away // else insert the values to the memtable right away
} }
@ -1689,8 +1704,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, ret_status = WriteBatchInternal::Merge(rebuilding_trx_,
value); column_family_id, key, value);
assert(ret_status.ok());
batch_boundry = IsDuplicateKeySeq(column_family_id, key); batch_boundry = IsDuplicateKeySeq(column_family_id, key);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
@ -1784,7 +1800,9 @@ class MemTableInserter : public WriteBatch::Handler {
assert(!write_after_commit_); assert(!write_after_commit_);
// If the ret_status is TryAgain then let the next try to add the ky to // If the ret_status is TryAgain then let the next try to add the ky to
// the rebuilding transaction object. // the rebuilding transaction object.
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
key, value);
assert(ret_status.ok());
} }
MaybeAdvanceSeq(); MaybeAdvanceSeq();
CheckMemtableFull(); CheckMemtableFull();

@ -138,10 +138,10 @@ TEST_F(WriteBatchTest, Empty) {
TEST_F(WriteBatchTest, Multiple) { TEST_F(WriteBatchTest, Multiple) {
WriteBatch batch; WriteBatch batch;
batch.Put(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
batch.Delete(Slice("box")); ASSERT_OK(batch.Delete(Slice("box")));
batch.DeleteRange(Slice("bar"), Slice("foo")); ASSERT_OK(batch.DeleteRange(Slice("bar"), Slice("foo")));
batch.Put(Slice("baz"), Slice("boo")); ASSERT_OK(batch.Put(Slice("baz"), Slice("boo")));
WriteBatchInternal::SetSequence(&batch, 100); WriteBatchInternal::SetSequence(&batch, 100);
ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch)); ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch));
ASSERT_EQ(4u, WriteBatchInternal::Count(&batch)); ASSERT_EQ(4u, WriteBatchInternal::Count(&batch));
@ -156,12 +156,12 @@ TEST_F(WriteBatchTest, Multiple) {
TEST_F(WriteBatchTest, Corruption) { TEST_F(WriteBatchTest, Corruption) {
WriteBatch batch; WriteBatch batch;
batch.Put(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
batch.Delete(Slice("box")); ASSERT_OK(batch.Delete(Slice("box")));
WriteBatchInternal::SetSequence(&batch, 200); WriteBatchInternal::SetSequence(&batch, 200);
Slice contents = WriteBatchInternal::Contents(&batch); Slice contents = WriteBatchInternal::Contents(&batch);
WriteBatchInternal::SetContents(&batch, ASSERT_OK(WriteBatchInternal::SetContents(
Slice(contents.data(),contents.size()-1)); &batch, Slice(contents.data(), contents.size() - 1)));
ASSERT_EQ("Put(foo, bar)@200" ASSERT_EQ("Put(foo, bar)@200"
"Corruption: bad WriteBatch Delete", "Corruption: bad WriteBatch Delete",
PrintContents(&batch)); PrintContents(&batch));
@ -171,24 +171,24 @@ TEST_F(WriteBatchTest, Append) {
WriteBatch b1, b2; WriteBatch b1, b2;
WriteBatchInternal::SetSequence(&b1, 200); WriteBatchInternal::SetSequence(&b1, 200);
WriteBatchInternal::SetSequence(&b2, 300); WriteBatchInternal::SetSequence(&b2, 300);
WriteBatchInternal::Append(&b1, &b2); ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
ASSERT_EQ("", ASSERT_EQ("",
PrintContents(&b1)); PrintContents(&b1));
ASSERT_EQ(0u, b1.Count()); ASSERT_EQ(0u, b1.Count());
b2.Put("a", "va"); ASSERT_OK(b2.Put("a", "va"));
WriteBatchInternal::Append(&b1, &b2); ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
ASSERT_EQ("Put(a, va)@200", ASSERT_EQ("Put(a, va)@200",
PrintContents(&b1)); PrintContents(&b1));
ASSERT_EQ(1u, b1.Count()); ASSERT_EQ(1u, b1.Count());
b2.Clear(); b2.Clear();
b2.Put("b", "vb"); ASSERT_OK(b2.Put("b", "vb"));
WriteBatchInternal::Append(&b1, &b2); ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
ASSERT_EQ("Put(a, va)@200" ASSERT_EQ("Put(a, va)@200"
"Put(b, vb)@201", "Put(b, vb)@201",
PrintContents(&b1)); PrintContents(&b1));
ASSERT_EQ(2u, b1.Count()); ASSERT_EQ(2u, b1.Count());
b2.Delete("foo"); ASSERT_OK(b2.Delete("foo"));
WriteBatchInternal::Append(&b1, &b2); ASSERT_OK(WriteBatchInternal::Append(&b1, &b2));
ASSERT_EQ("Put(a, va)@200" ASSERT_EQ("Put(a, va)@200"
"Put(b, vb)@202" "Put(b, vb)@202"
"Put(b, vb)@201" "Put(b, vb)@201"
@ -196,11 +196,11 @@ TEST_F(WriteBatchTest, Append) {
PrintContents(&b1)); PrintContents(&b1));
ASSERT_EQ(4u, b1.Count()); ASSERT_EQ(4u, b1.Count());
b2.Clear(); b2.Clear();
b2.Put("c", "cc"); ASSERT_OK(b2.Put("c", "cc"));
b2.Put("d", "dd"); ASSERT_OK(b2.Put("d", "dd"));
b2.MarkWalTerminationPoint(); b2.MarkWalTerminationPoint();
b2.Put("e", "ee"); ASSERT_OK(b2.Put("e", "ee"));
WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true); ASSERT_OK(WriteBatchInternal::Append(&b1, &b2, /*wal only*/ true));
ASSERT_EQ( ASSERT_EQ(
"Put(a, va)@200" "Put(a, va)@200"
"Put(b, vb)@202" "Put(b, vb)@202"
@ -223,10 +223,10 @@ TEST_F(WriteBatchTest, SingleDeletion) {
WriteBatchInternal::SetSequence(&batch, 100); WriteBatchInternal::SetSequence(&batch, 100);
ASSERT_EQ("", PrintContents(&batch)); ASSERT_EQ("", PrintContents(&batch));
ASSERT_EQ(0u, batch.Count()); ASSERT_EQ(0u, batch.Count());
batch.Put("a", "va"); ASSERT_OK(batch.Put("a", "va"));
ASSERT_EQ("Put(a, va)@100", PrintContents(&batch)); ASSERT_EQ("Put(a, va)@100", PrintContents(&batch));
ASSERT_EQ(1u, batch.Count()); ASSERT_EQ(1u, batch.Count());
batch.SingleDelete("a"); ASSERT_OK(batch.SingleDelete("a"));
ASSERT_EQ( ASSERT_EQ(
"SingleDelete(a)@101" "SingleDelete(a)@101"
"Put(a, va)@100", "Put(a, va)@100",
@ -316,7 +316,7 @@ namespace {
TEST_F(WriteBatchTest, PutNotImplemented) { TEST_F(WriteBatchTest, PutNotImplemented) {
WriteBatch batch; WriteBatch batch;
batch.Put(Slice("k1"), Slice("v1")); ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
ASSERT_EQ(1u, batch.Count()); ASSERT_EQ(1u, batch.Count());
ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch)); ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
@ -326,7 +326,7 @@ TEST_F(WriteBatchTest, PutNotImplemented) {
TEST_F(WriteBatchTest, DeleteNotImplemented) { TEST_F(WriteBatchTest, DeleteNotImplemented) {
WriteBatch batch; WriteBatch batch;
batch.Delete(Slice("k2")); ASSERT_OK(batch.Delete(Slice("k2")));
ASSERT_EQ(1u, batch.Count()); ASSERT_EQ(1u, batch.Count());
ASSERT_EQ("Delete(k2)@0", PrintContents(&batch)); ASSERT_EQ("Delete(k2)@0", PrintContents(&batch));
@ -336,7 +336,7 @@ TEST_F(WriteBatchTest, DeleteNotImplemented) {
TEST_F(WriteBatchTest, SingleDeleteNotImplemented) { TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
WriteBatch batch; WriteBatch batch;
batch.SingleDelete(Slice("k2")); ASSERT_OK(batch.SingleDelete(Slice("k2")));
ASSERT_EQ(1u, batch.Count()); ASSERT_EQ(1u, batch.Count());
ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch)); ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch));
@ -346,7 +346,7 @@ TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
TEST_F(WriteBatchTest, MergeNotImplemented) { TEST_F(WriteBatchTest, MergeNotImplemented) {
WriteBatch batch; WriteBatch batch;
batch.Merge(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
ASSERT_EQ(1u, batch.Count()); ASSERT_EQ(1u, batch.Count());
ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch)); ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch));
@ -356,14 +356,14 @@ TEST_F(WriteBatchTest, MergeNotImplemented) {
TEST_F(WriteBatchTest, Blob) { TEST_F(WriteBatchTest, Blob) {
WriteBatch batch; WriteBatch batch;
batch.Put(Slice("k1"), Slice("v1")); ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
batch.Put(Slice("k2"), Slice("v2")); ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
batch.Put(Slice("k3"), Slice("v3")); ASSERT_OK(batch.Put(Slice("k3"), Slice("v3")));
batch.PutLogData(Slice("blob1")); ASSERT_OK(batch.PutLogData(Slice("blob1")));
batch.Delete(Slice("k2")); ASSERT_OK(batch.Delete(Slice("k2")));
batch.SingleDelete(Slice("k3")); ASSERT_OK(batch.SingleDelete(Slice("k3")));
batch.PutLogData(Slice("blob2")); ASSERT_OK(batch.PutLogData(Slice("blob2")));
batch.Merge(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
ASSERT_EQ(6u, batch.Count()); ASSERT_EQ(6u, batch.Count());
ASSERT_EQ( ASSERT_EQ(
"Merge(foo, bar)@5" "Merge(foo, bar)@5"
@ -375,7 +375,7 @@ TEST_F(WriteBatchTest, Blob) {
PrintContents(&batch)); PrintContents(&batch));
TestHandler handler; TestHandler handler;
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ( ASSERT_EQ(
"Put(k1, v1)" "Put(k1, v1)"
"Put(k2, v2)" "Put(k2, v2)"
@ -390,19 +390,19 @@ TEST_F(WriteBatchTest, Blob) {
TEST_F(WriteBatchTest, PrepareCommit) { TEST_F(WriteBatchTest, PrepareCommit) {
WriteBatch batch; WriteBatch batch;
WriteBatchInternal::InsertNoop(&batch); ASSERT_OK(WriteBatchInternal::InsertNoop(&batch));
batch.Put(Slice("k1"), Slice("v1")); ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
batch.Put(Slice("k2"), Slice("v2")); ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
batch.SetSavePoint(); batch.SetSavePoint();
WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1")); ASSERT_OK(WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1")));
Status s = batch.RollbackToSavePoint(); Status s = batch.RollbackToSavePoint();
ASSERT_EQ(s, Status::NotFound()); ASSERT_EQ(s, Status::NotFound());
WriteBatchInternal::MarkCommit(&batch, Slice("xid1")); ASSERT_OK(WriteBatchInternal::MarkCommit(&batch, Slice("xid1")));
WriteBatchInternal::MarkRollback(&batch, Slice("xid1")); ASSERT_OK(WriteBatchInternal::MarkRollback(&batch, Slice("xid1")));
ASSERT_EQ(2u, batch.Count()); ASSERT_EQ(2u, batch.Count());
TestHandler handler; TestHandler handler;
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ( ASSERT_EQ(
"MarkBeginPrepare(false)" "MarkBeginPrepare(false)"
"Put(k1, v1)" "Put(k1, v1)"
@ -430,7 +430,7 @@ TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
raw[0] = c; raw[0] = c;
raw[raw.length() - 1] = c; raw[raw.length() - 1] = c;
c++; c++;
batch.Put(raw, raw); ASSERT_OK(batch.Put(raw, raw));
} }
ASSERT_EQ(kNumUpdates, batch.Count()); ASSERT_EQ(kNumUpdates, batch.Count());
@ -472,7 +472,7 @@ TEST_F(WriteBatchTest, DISABLED_ManyUpdates) {
bool Continue() override { return num_seen < kNumUpdates; } bool Continue() override { return num_seen < kNumUpdates; }
} handler; } handler;
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ(kNumUpdates, handler.num_seen); ASSERT_EQ(kNumUpdates, handler.num_seen);
} }
@ -486,7 +486,7 @@ TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
for (char i = 0; i < 2; i++) { for (char i = 0; i < 2; i++) {
raw[0] = 'A' + i; raw[0] = 'A' + i;
raw[raw.length() - 1] = 'A' - i; raw[raw.length() - 1] = 'A' - i;
batch.Put(raw, raw); ASSERT_OK(batch.Put(raw, raw));
} }
ASSERT_EQ(2u, batch.Count()); ASSERT_EQ(2u, batch.Count());
@ -523,7 +523,7 @@ TEST_F(WriteBatchTest, DISABLED_LargeKeyValue) {
bool Continue() override { return num_seen < 2; } bool Continue() override { return num_seen < 2; }
} handler; } handler;
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ(2, handler.num_seen); ASSERT_EQ(2, handler.num_seen);
} }
@ -558,14 +558,14 @@ TEST_F(WriteBatchTest, Continue) {
bool Continue() override { return num_seen < 5; } bool Continue() override { return num_seen < 5; }
} handler; } handler;
batch.Put(Slice("k1"), Slice("v1")); ASSERT_OK(batch.Put(Slice("k1"), Slice("v1")));
batch.Put(Slice("k2"), Slice("v2")); ASSERT_OK(batch.Put(Slice("k2"), Slice("v2")));
batch.PutLogData(Slice("blob1")); ASSERT_OK(batch.PutLogData(Slice("blob1")));
batch.Delete(Slice("k1")); ASSERT_OK(batch.Delete(Slice("k1")));
batch.SingleDelete(Slice("k2")); ASSERT_OK(batch.SingleDelete(Slice("k2")));
batch.PutLogData(Slice("blob2")); ASSERT_OK(batch.PutLogData(Slice("blob2")));
batch.Merge(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Merge(Slice("foo"), Slice("bar")));
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ( ASSERT_EQ(
"Put(k1, v1)" "Put(k1, v1)"
"Put(k2, v2)" "Put(k2, v2)"
@ -577,22 +577,22 @@ TEST_F(WriteBatchTest, Continue) {
TEST_F(WriteBatchTest, PutGatherSlices) { TEST_F(WriteBatchTest, PutGatherSlices) {
WriteBatch batch; WriteBatch batch;
batch.Put(Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(Slice("foo"), Slice("bar")));
{ {
// Try a write where the key is one slice but the value is two // Try a write where the key is one slice but the value is two
Slice key_slice("baz"); Slice key_slice("baz");
Slice value_slices[2] = { Slice("header"), Slice("payload") }; Slice value_slices[2] = { Slice("header"), Slice("payload") };
batch.Put(SliceParts(&key_slice, 1), ASSERT_OK(
SliceParts(value_slices, 2)); batch.Put(SliceParts(&key_slice, 1), SliceParts(value_slices, 2)));
} }
{ {
// One where the key is composite but the value is a single slice // One where the key is composite but the value is a single slice
Slice key_slices[3] = { Slice("key"), Slice("part2"), Slice("part3") }; Slice key_slices[3] = { Slice("key"), Slice("part2"), Slice("part3") };
Slice value_slice("value"); Slice value_slice("value");
batch.Put(SliceParts(key_slices, 3), ASSERT_OK(
SliceParts(&value_slice, 1)); batch.Put(SliceParts(key_slices, 3), SliceParts(&value_slice, 1)));
} }
WriteBatchInternal::SetSequence(&batch, 100); WriteBatchInternal::SetSequence(&batch, 100);
@ -621,18 +621,18 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) { TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
WriteBatch batch; WriteBatch batch;
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8); ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
batch.Put(&zero, Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
batch.Put(&two, Slice("twofoo"), Slice("bar2")); ASSERT_OK(batch.Put(&two, Slice("twofoo"), Slice("bar2")));
batch.Put(&eight, Slice("eightfoo"), Slice("bar8")); ASSERT_OK(batch.Put(&eight, Slice("eightfoo"), Slice("bar8")));
batch.Delete(&eight, Slice("eightfoo")); ASSERT_OK(batch.Delete(&eight, Slice("eightfoo")));
batch.SingleDelete(&two, Slice("twofoo")); ASSERT_OK(batch.SingleDelete(&two, Slice("twofoo")));
batch.DeleteRange(&two, Slice("3foo"), Slice("4foo")); ASSERT_OK(batch.DeleteRange(&two, Slice("3foo"), Slice("4foo")));
batch.Merge(&three, Slice("threethree"), Slice("3three")); ASSERT_OK(batch.Merge(&three, Slice("threethree"), Slice("3three")));
batch.Put(&zero, Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
batch.Merge(Slice("omom"), Slice("nom")); ASSERT_OK(batch.Merge(Slice("omom"), Slice("nom")));
TestHandler handler; TestHandler handler;
batch.Iterate(&handler); ASSERT_OK(batch.Iterate(&handler));
ASSERT_EQ( ASSERT_EQ(
"Put(foo, bar)" "Put(foo, bar)"
"PutCF(2, twofoo, bar2)" "PutCF(2, twofoo, bar2)"
@ -650,14 +650,14 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
WriteBatchWithIndex batch; WriteBatchWithIndex batch;
ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8); ColumnFamilyHandleImplDummy zero(0), two(2), three(3), eight(8);
batch.Put(&zero, Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
batch.Put(&two, Slice("twofoo"), Slice("bar2")); ASSERT_OK(batch.Put(&two, Slice("twofoo"), Slice("bar2")));
batch.Put(&eight, Slice("eightfoo"), Slice("bar8")); ASSERT_OK(batch.Put(&eight, Slice("eightfoo"), Slice("bar8")));
batch.Delete(&eight, Slice("eightfoo")); ASSERT_OK(batch.Delete(&eight, Slice("eightfoo")));
batch.SingleDelete(&two, Slice("twofoo")); ASSERT_OK(batch.SingleDelete(&two, Slice("twofoo")));
batch.Merge(&three, Slice("threethree"), Slice("3three")); ASSERT_OK(batch.Merge(&three, Slice("threethree"), Slice("3three")));
batch.Put(&zero, Slice("foo"), Slice("bar")); ASSERT_OK(batch.Put(&zero, Slice("foo"), Slice("bar")));
batch.Merge(Slice("omom"), Slice("nom")); ASSERT_OK(batch.Merge(Slice("omom"), Slice("nom")));
std::unique_ptr<WBWIIterator> iter; std::unique_ptr<WBWIIterator> iter;
@ -736,7 +736,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(!iter->Valid());
TestHandler handler; TestHandler handler;
batch.GetWriteBatch()->Iterate(&handler); ASSERT_OK(batch.GetWriteBatch()->Iterate(&handler));
ASSERT_EQ( ASSERT_EQ(
"Put(foo, bar)" "Put(foo, bar)"
"PutCF(2, twofoo, bar2)" "PutCF(2, twofoo, bar2)"
@ -755,12 +755,12 @@ TEST_F(WriteBatchTest, SavePointTest) {
WriteBatch batch; WriteBatch batch;
batch.SetSavePoint(); batch.SetSavePoint();
batch.Put("A", "a"); ASSERT_OK(batch.Put("A", "a"));
batch.Put("B", "b"); ASSERT_OK(batch.Put("B", "b"));
batch.SetSavePoint(); batch.SetSavePoint();
batch.Put("C", "c"); ASSERT_OK(batch.Put("C", "c"));
batch.Delete("A"); ASSERT_OK(batch.Delete("A"));
batch.SetSavePoint(); batch.SetSavePoint();
batch.SetSavePoint(); batch.SetSavePoint();
@ -779,8 +779,8 @@ TEST_F(WriteBatchTest, SavePointTest) {
"Put(B, b)@1", "Put(B, b)@1",
PrintContents(&batch)); PrintContents(&batch));
batch.Delete("A"); ASSERT_OK(batch.Delete("A"));
batch.Put("B", "bb"); ASSERT_OK(batch.Put("B", "bb"));
ASSERT_OK(batch.RollbackToSavePoint()); ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ("", PrintContents(&batch)); ASSERT_EQ("", PrintContents(&batch));
@ -789,12 +789,12 @@ TEST_F(WriteBatchTest, SavePointTest) {
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("", PrintContents(&batch)); ASSERT_EQ("", PrintContents(&batch));
batch.Put("D", "d"); ASSERT_OK(batch.Put("D", "d"));
batch.Delete("A"); ASSERT_OK(batch.Delete("A"));
batch.SetSavePoint(); batch.SetSavePoint();
batch.Put("A", "aaa"); ASSERT_OK(batch.Put("A", "aaa"));
ASSERT_OK(batch.RollbackToSavePoint()); ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ( ASSERT_EQ(
@ -804,8 +804,8 @@ TEST_F(WriteBatchTest, SavePointTest) {
batch.SetSavePoint(); batch.SetSavePoint();
batch.Put("D", "d"); ASSERT_OK(batch.Put("D", "d"));
batch.Delete("A"); ASSERT_OK(batch.Delete("A"));
ASSERT_OK(batch.RollbackToSavePoint()); ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ( ASSERT_EQ(
@ -826,7 +826,7 @@ TEST_F(WriteBatchTest, SavePointTest) {
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
ASSERT_EQ("", PrintContents(&batch2)); ASSERT_EQ("", PrintContents(&batch2));
batch2.Delete("A"); ASSERT_OK(batch2.Delete("A"));
batch2.SetSavePoint(); batch2.SetSavePoint();
s = batch2.RollbackToSavePoint(); s = batch2.RollbackToSavePoint();
@ -838,7 +838,7 @@ TEST_F(WriteBatchTest, SavePointTest) {
batch2.SetSavePoint(); batch2.SetSavePoint();
batch2.Delete("B"); ASSERT_OK(batch2.Delete("B"));
ASSERT_EQ("Delete(B)@0", PrintContents(&batch2)); ASSERT_EQ("Delete(B)@0", PrintContents(&batch2));
batch2.SetSavePoint(); batch2.SetSavePoint();
@ -861,7 +861,7 @@ TEST_F(WriteBatchTest, SavePointTest) {
ASSERT_EQ("", PrintContents(&batch3)); ASSERT_EQ("", PrintContents(&batch3));
batch3.SetSavePoint(); batch3.SetSavePoint();
batch3.Delete("A"); ASSERT_OK(batch3.Delete("A"));
s = batch3.PopSavePoint(); s = batch3.PopSavePoint();
ASSERT_OK(s); ASSERT_OK(s);

@ -45,6 +45,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
uint32_t column_family; uint32_t column_family;
Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
blob, xid); blob, xid);
if (!s.ok()) {
return s;
}
switch (tag) { switch (tag) {
case kTypeColumnFamilyValue: case kTypeColumnFamilyValue:

Loading…
Cancel
Save