Revert "Modification of WriteBatch to support two phase commit"

Summary: Revert D54093 and D57453

Test Plan: running make check

Reviewers: horuff, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57819
main
Andrew Kryczka 9 years ago
parent 04dec2a359
commit 269f6b2e2d
  1. 7
      db/dbformat.h
  2. 140
      db/write_batch.cc
  3. 8
      db/write_batch_internal.h
  4. 40
      db/write_batch_test.cc
  5. 3
      include/rocksdb/utilities/write_batch_with_index.h
  6. 29
      include/rocksdb/write_batch.h
  7. 8
      tools/ldb_cmd.cc
  8. 15
      utilities/write_batch_with_index/write_batch_with_index.cc
  9. 19
      utilities/write_batch_with_index/write_batch_with_index_internal.cc
  10. 2
      utilities/write_batch_with_index/write_batch_with_index_internal.h

@ -39,11 +39,6 @@ enum ValueType : unsigned char {
kTypeColumnFamilyMerge = 0x6, // WAL only.
kTypeSingleDeletion = 0x7,
kTypeColumnFamilySingleDeletion = 0x8, // WAL only.
kTypeBeginPrepareXID = 0x9, // WAL only.
kTypeEndPrepareXID = 0xA, // WAL only.
kTypeCommitXID = 0xB, // WAL only.
kTypeRollbackXID = 0xC, // WAL only.
kTypeNoop = 0xD, // WAL only.
kMaxValue = 0x7F // Not used for storing records.
};
@ -483,5 +478,5 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
// input will be advanced to after the record.
extern Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid);
Slice* value, Slice* blob);
} // namespace rocksdb

@ -20,11 +20,6 @@
// kTypeColumnFamilyDeletion varint32 varstring varstring
// kTypeColumnFamilySingleDeletion varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeBeginPrepareXID varstring
// kTypeEndPrepareXID
// kTypeCommitXID varstring
// kTypeRollbackXID varstring
// kTypeNoop
// varstring :=
// len: varint32
// data: uint8[len]
@ -53,15 +48,11 @@ namespace rocksdb {
namespace {
enum ContentFlags : uint32_t {
DEFERRED = 1 << 0,
HAS_PUT = 1 << 1,
HAS_DELETE = 1 << 2,
HAS_SINGLE_DELETE = 1 << 3,
HAS_MERGE = 1 << 4,
HAS_BEGIN_PREPARE = 1 << 5,
HAS_END_PREPARE = 1 << 6,
HAS_COMMIT = 1 << 7,
HAS_ROLLBACK = 1 << 8,
DEFERRED = 1,
HAS_PUT = 2,
HAS_DELETE = 4,
HAS_SINGLE_DELETE = 8,
HAS_MERGE = 16,
};
struct BatchContentClassifier : public WriteBatch::Handler {
@ -86,26 +77,6 @@ struct BatchContentClassifier : public WriteBatch::Handler {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
}
Status MarkBeginPrepare() override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
content_flags |= ContentFlags::HAS_END_PREPARE;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
content_flags |= ContentFlags::HAS_COMMIT;
return Status::OK();
}
Status MarkRollback(const Slice&) override {
content_flags |= ContentFlags::HAS_ROLLBACK;
return Status::OK();
}
};
} // anon namespace
@ -126,7 +97,6 @@ WriteBatch::WriteBatch(size_t reserved_bytes)
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
reserved_bytes : WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
rep_.push_back(static_cast<char>(kTypeNoop));
}
WriteBatch::WriteBatch(const std::string& rep)
@ -176,7 +146,6 @@ bool WriteBatch::Handler::Continue() {
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(WriteBatchInternal::kHeader);
rep_.push_back(static_cast<char>(kTypeNoop));
content_flags_.store(0, std::memory_order_relaxed);
@ -240,25 +209,9 @@ bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
return GetLengthPrefixedSlice(input, key);
}
bool WriteBatch::HasBeginPrepare() const {
return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}
bool WriteBatch::HasEndPrepare() const {
return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}
bool WriteBatch::HasCommit() const {
return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}
bool WriteBatch::HasRollback() const {
return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid) {
Slice* value, Slice* blob) {
assert(key != nullptr && value != nullptr);
*tag = (*input)[0];
input->remove_prefix(1);
@ -304,24 +257,6 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad WriteBatch Blob");
}
break;
case kTypeNoop:
case kTypeBeginPrepareXID:
break;
case kTypeEndPrepareXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad EndPrepare XID");
}
break;
case kTypeCommitXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad Commit XID");
}
break;
case kTypeRollbackXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad Rollback XID");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
@ -335,7 +270,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
input.remove_prefix(WriteBatchInternal::kHeader);
Slice key, value, blob, xid;
Slice key, value, blob;
int found = 0;
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
@ -343,7 +278,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
uint32_t column_family = 0; // default
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
&blob);
if (!s.ok()) {
return s;
}
@ -380,28 +315,6 @@ Status WriteBatch::Iterate(Handler* handler) const {
case kTypeLogData:
handler->LogData(blob);
break;
case kTypeBeginPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
break;
case kTypeEndPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid);
break;
case kTypeCommitXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid);
break;
case kTypeRollbackXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid);
break;
case kTypeNoop:
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
@ -478,43 +391,6 @@ void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}
void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
// a manually constructed batch can only contain one prepare section
assert(b->rep_[12] == static_cast<char>(kTypeNoop));
// all savepoints up to this point are cleared
if (b->save_points_ != nullptr) {
while (!b->save_points_->stack.empty()) {
b->save_points_->stack.pop();
}
}
// rewrite noop as begin marker
b->rep_[12] = static_cast<char>(kTypeBeginPrepareXID);
b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_END_PREPARE |
ContentFlags::HAS_BEGIN_PREPARE,
std::memory_order_relaxed);
}
void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeCommitXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_COMMIT,
std::memory_order_relaxed);
}
void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_ROLLBACK,
std::memory_order_relaxed);
}
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const Slice& key) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

@ -92,14 +92,6 @@ class WriteBatchInternal {
static void Merge(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value);
static void MarkBeginPrepare(WriteBatch* batch);
static void MarkEndPrepare(WriteBatch* batch, const Slice& xid);
static void MarkRollback(WriteBatch* batch, const Slice& xid);
static void MarkCommit(WriteBatch* batch, const Slice& xid);
// Return the number of entries in the batch.
static int Count(const WriteBatch* batch);

@ -231,22 +231,6 @@ namespace {
virtual void LogData(const Slice& blob) override {
seen += "LogData(" + blob.ToString() + ")";
}
virtual Status MarkBeginPrepare() override {
seen += "MarkBeginPrepare()";
return Status::OK();
}
virtual Status MarkEndPrepare(const Slice& xid) override {
seen += "MarkEndPrepare(" + xid.ToString() + ")";
return Status::OK();
}
virtual Status MarkCommit(const Slice& xid) override {
seen += "MarkCommit(" + xid.ToString() + ")";
return Status::OK();
}
virtual Status MarkRollback(const Slice& xid) override {
seen += "MarkRollback(" + xid.ToString() + ")";
return Status::OK();
}
};
}
@ -324,30 +308,6 @@ TEST_F(WriteBatchTest, Blob) {
handler.seen);
}
TEST_F(WriteBatchTest, PrepareCommit) {
WriteBatch batch;
batch.Put(Slice("k1"), Slice("v1"));
batch.Put(Slice("k2"), Slice("v2"));
batch.SetSavePoint();
WriteBatchInternal::MarkEndPrepare(&batch, Slice("xid1"));
Status s = batch.RollbackToSavePoint();
ASSERT_EQ(s, Status::NotFound());
WriteBatchInternal::MarkCommit(&batch, Slice("xid1"));
WriteBatchInternal::MarkRollback(&batch, Slice("xid1"));
ASSERT_EQ(2, batch.Count());
TestHandler handler;
batch.Iterate(&handler);
ASSERT_EQ(
"MarkBeginPrepare()"
"Put(k1, v1)"
"Put(k2, v2)"
"MarkEndPrepare(xid1)"
"MarkCommit(xid1)"
"MarkRollback(xid1)",
handler.seen);
}
// It requires more than 30GB of memory to run the test. With single memory
// allocation of more than 30GB.
// Not all platform can run it. Also it runs a long time. So disable it.

@ -34,8 +34,7 @@ enum WriteType {
kMergeRecord,
kDeleteRecord,
kSingleDeleteRecord,
kLogDataRecord,
kXIDRecord,
kLogDataRecord
};
// an entry for Put, Merge, Delete, or SingleDelete entry for write batches.

@ -186,23 +186,6 @@ class WriteBatch : public WriteBatchBase {
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual Status MarkBeginPrepare() {
return Status::InvalidArgument("MarkBeginPrepare() handler not defined.");
}
virtual Status MarkEndPrepare(const Slice& xid) {
return Status::InvalidArgument("MarkEndPrepare() handler not defined.");
}
virtual Status MarkRollback(const Slice& xid) {
return Status::InvalidArgument(
"MarkRollbackPrepare() handler not defined.");
}
virtual Status MarkCommit(const Slice& xid) {
return Status::InvalidArgument("MarkCommit() handler not defined.");
}
// Continue is called by WriteBatch::Iterate. If it returns false,
// iteration is halted. Otherwise, it continues iterating. The default
// implementation always returns true.
@ -231,18 +214,6 @@ class WriteBatch : public WriteBatchBase {
// Returns trie if MergeCF will be called during Iterate
bool HasMerge() const;
// Returns true if MarkBeginPrepare will be called during Iterate
bool HasBeginPrepare() const;
// Returns true if MarkEndPrepare will be called during Iterate
bool HasEndPrepare() const;
// Returns trie if MarkCommit will be called during Iterate
bool HasCommit() const;
// Returns trie if MarkRollback will be called during Iterate
bool HasRollback() const;
using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override { return this; }

@ -1782,24 +1782,24 @@ class InMemoryHandler : public WriteBatch::Handler {
return Status::OK();
}
virtual Status MarkBeginPrepare() override {
virtual Status MarkBeginPrepare() {
row_ << "BEGIN_PREARE ";
return Status::OK();
}
virtual Status MarkEndPrepare(const Slice& xid) override {
virtual Status MarkEndPrepare(const Slice& xid) {
row_ << "END_PREPARE(";
row_ << LDBCommand::StringToHex(xid.ToString()) << ") ";
return Status::OK();
}
virtual Status MarkRollback(const Slice& xid) override {
virtual Status MarkRollback(const Slice& xid) {
row_ << "ROLLBACK(";
row_ << LDBCommand::StringToHex(xid.ToString()) << ") ";
return Status::OK();
}
virtual Status MarkCommit(const Slice& xid) override {
virtual Status MarkCommit(const Slice& xid) {
row_ << "COMMIT(";
row_ << LDBCommand::StringToHex(xid.ToString()) << ") ";
return Status::OK();

@ -337,13 +337,13 @@ class WBWIIteratorImpl : public WBWIIterator {
virtual WriteEntry Entry() const override {
WriteEntry ret;
Slice blob, xid;
Slice blob;
const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key();
// this is guaranteed with Valid()
assert(iter_entry != nullptr &&
iter_entry->column_family == column_family_id_);
auto s = write_batch_->GetEntryFromDataOffset(
iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
auto s = write_batch_->GetEntryFromDataOffset(iter_entry->offset, &ret.type,
&ret.key, &ret.value, &blob);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kSingleDeleteRecord || ret.type == kMergeRecord);
@ -501,7 +501,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
// Loop through all entries in Rep and add each one to the index
int found = 0;
while (s.ok() && !input.empty()) {
Slice key, value, blob, xid;
Slice key, value, blob;
uint32_t column_family_id = 0; // default
char tag = 0;
@ -509,7 +509,7 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
last_entry_offset = input.data() - write_batch.Data().data();
s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
&value, &blob, &xid);
&value, &blob);
if (!s.ok()) {
break;
}
@ -529,11 +529,6 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
}
break;
case kTypeLogData:
case kTypeBeginPrepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
case kTypeRollbackXID:
case kTypeNoop:
break;
default:
return Status::Corruption("unknown WriteBatch tag");

@ -24,10 +24,10 @@ class Statistics;
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
WriteType* type, Slice* Key,
Slice* value, Slice* blob,
Slice* xid) const {
Slice* value,
Slice* blob) const {
if (type == nullptr || Key == nullptr || value == nullptr ||
blob == nullptr || xid == nullptr) {
blob == nullptr) {
return Status::InvalidArgument("Output parameters cannot be null");
}
@ -42,8 +42,8 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset);
char tag;
uint32_t column_family;
Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value,
blob, xid);
Status s =
ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob);
switch (tag) {
case kTypeColumnFamilyValue:
@ -65,12 +65,6 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
case kTypeLogData:
*type = kLogDataRecord;
break;
case kTypeBeginPrepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
case kTypeRollbackXID:
*type = kXIDRecord;
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
@ -189,8 +183,7 @@ WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch(
result = WriteBatchWithIndexInternal::Result::kDeleted;
break;
}
case kLogDataRecord:
case kXIDRecord: {
case kLogDataRecord: {
// ignore
break;
}

@ -58,7 +58,7 @@ class ReadableWriteBatch : public WriteBatch {
// Retrieve some information from a write entry in the write batch, given
// the start offset of the write entry.
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
Slice* value, Slice* blob, Slice* xid) const;
Slice* value, Slice* blob) const;
};
class WriteBatchEntryComparator {

Loading…
Cancel
Save