track WriteBatch contents

Summary:
Parallel writes will only be possible for certain combinations of
flags and WriteBatch contents.  Traversing the WriteBatch at write time
to check these conditions would be expensive, but it is very cheap to
keep track of when building WriteBatch-es.  When loading WriteBatch-es
during recovery, a deferred computation state is used so that the flags
never need to be computed.

Test Plan:
1. add asserts and EXPECT_EQ-s
2. make check

Reviewers: sdong, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D50337
main
Nathan Bronson 9 years ago
parent 505accda38
commit 631863c63b
  1. 151
      db/write_batch.cc
  2. 12
      db/write_batch_test.cc
  3. 27
      include/rocksdb/write_batch.h

@ -42,22 +42,91 @@
namespace rocksdb { namespace rocksdb {
// anon namespace for file-local types
namespace {
// Bit flags stored in WriteBatch::content_flags_ describing which record
// types a batch contains.  DEFERRED means the flags have not been computed
// for this batch yet and must be derived lazily by iterating its contents
// (see WriteBatch::ComputeContentFlags in this file).
enum ContentFlags : uint32_t {
DEFERRED = 1,  // flags unknown; compute on demand via Iterate()
HAS_PUT = 2,  // batch contains at least one Put record
HAS_DELETE = 4,  // batch contains at least one Delete record
HAS_SINGLE_DELETE = 8,  // batch contains at least one SingleDelete record
HAS_MERGE = 16,  // batch contains at least one Merge record
};
struct BatchContentClassifier : public WriteBatch::Handler {
uint32_t content_flags = 0;
Status PutCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_PUT;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_SINGLE_DELETE;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
}
};
} // anon namespace
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12; static const size_t kHeader = 12;
struct SavePoint { struct SavePoint {
size_t size; // size of rep_ size_t size; // size of rep_
int count; // count of elements in rep_ int count; // count of elements in rep_
SavePoint(size_t s, int c) : size(s), count(c) {} uint32_t content_flags;
}; };
struct SavePoints { struct SavePoints {
std::stack<SavePoint> stack; std::stack<SavePoint> stack;
}; };
WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) { WriteBatch::WriteBatch(size_t reserved_bytes)
: save_points_(nullptr), content_flags_(0), rep_() {
rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader);
Clear(); rep_.resize(kHeader);
}
WriteBatch::WriteBatch(const std::string& rep)
: save_points_(nullptr),
content_flags_(ContentFlags::DEFERRED),
rep_(rep) {}
WriteBatch::WriteBatch(const WriteBatch& src)
: save_points_(src.save_points_),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
rep_(src.rep_) {}
WriteBatch::WriteBatch(WriteBatch&& src)
: save_points_(std::move(src.save_points_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
rep_(std::move(src.rep_)) {}
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
if (&src != this) {
this->~WriteBatch();
new (this) WriteBatch(src);
}
return *this;
}
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
if (&src != this) {
this->~WriteBatch();
new (this) WriteBatch(std::move(src));
}
return *this;
} }
WriteBatch::~WriteBatch() { WriteBatch::~WriteBatch() {
@ -81,6 +150,8 @@ void WriteBatch::Clear() {
rep_.clear(); rep_.clear();
rep_.resize(kHeader); rep_.resize(kHeader);
content_flags_.store(0, std::memory_order_relaxed);
if (save_points_ != nullptr) { if (save_points_ != nullptr) {
while (!save_points_->stack.empty()) { while (!save_points_->stack.empty()) {
save_points_->stack.pop(); save_points_->stack.pop();
@ -92,6 +163,38 @@ int WriteBatch::Count() const {
return WriteBatchInternal::Count(this); return WriteBatchInternal::Count(this);
} }
// Returns the content flags for this batch, resolving the DEFERRED state
// lazily: if the flags were never computed (e.g. the batch was loaded from
// a serialized rep during recovery), iterate the batch once to classify
// its records, then cache the result.
uint32_t WriteBatch::ComputeContentFlags() const {
  uint32_t flags = content_flags_.load(std::memory_order_relaxed);
  if (flags & ContentFlags::DEFERRED) {
    BatchContentClassifier classifier;
    Iterate(&classifier);
    flags = classifier.content_flags;
    // Conceptually const: this is a lazy computation that does not change
    // the abstract state of the batch.  content_flags_ is declared mutable
    // so the cached result can be stored from a const method.
    content_flags_.store(flags, std::memory_order_relaxed);
  }
  return flags;
}
// True if PutCF will be called during Iterate.
bool WriteBatch::HasPut() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_PUT) != 0;
}

// True if DeleteCF will be called during Iterate.
bool WriteBatch::HasDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_DELETE) != 0;
}

// True if SingleDeleteCF will be called during Iterate.
bool WriteBatch::HasSingleDelete() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

// True if MergeCF will be called during Iterate.
bool WriteBatch::HasMerge() const {
  const uint32_t flags = ComputeContentFlags();
  return (flags & ContentFlags::HAS_MERGE) != 0;
}
Status ReadRecordFromWriteBatch(Slice* input, char* tag, Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key, uint32_t* column_family, Slice* key,
Slice* value, Slice* blob) { Slice* value, Slice* blob) {
@ -169,21 +272,29 @@ Status WriteBatch::Iterate(Handler* handler) const {
switch (tag) { switch (tag) {
case kTypeColumnFamilyValue: case kTypeColumnFamilyValue:
case kTypeValue: case kTypeValue:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value); s = handler->PutCF(column_family, key, value);
found++; found++;
break; break;
case kTypeColumnFamilyDeletion: case kTypeColumnFamilyDeletion:
case kTypeDeletion: case kTypeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key); s = handler->DeleteCF(column_family, key);
found++; found++;
break; break;
case kTypeColumnFamilySingleDeletion: case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion: case kTypeSingleDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key); s = handler->SingleDeleteCF(column_family, key);
found++; found++;
break; break;
case kTypeColumnFamilyMerge: case kTypeColumnFamilyMerge:
case kTypeMerge: case kTypeMerge:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value); s = handler->MergeCF(column_family, key, value);
found++; found++;
break; break;
@ -233,6 +344,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
} }
PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value); PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
} }
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
@ -251,6 +365,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
} }
PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, key);
PutLengthPrefixedSliceParts(&b->rep_, value); PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
} }
void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
@ -268,6 +385,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
PutVarint32(&b->rep_, column_family_id); PutVarint32(&b->rep_, column_family_id);
} }
PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
} }
void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
@ -284,6 +404,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
PutVarint32(&b->rep_, column_family_id); PutVarint32(&b->rep_, column_family_id);
} }
PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
} }
void WriteBatch::Delete(ColumnFamilyHandle* column_family, void WriteBatch::Delete(ColumnFamilyHandle* column_family,
@ -301,6 +424,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
PutVarint32(&b->rep_, column_family_id); PutVarint32(&b->rep_, column_family_id);
} }
PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed);
} }
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
@ -318,6 +444,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
PutVarint32(&b->rep_, column_family_id); PutVarint32(&b->rep_, column_family_id);
} }
PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed);
} }
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
@ -336,6 +465,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
} }
PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value); PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE,
std::memory_order_relaxed);
} }
void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
@ -355,6 +487,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
} }
PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, key);
PutLengthPrefixedSliceParts(&b->rep_, value); PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE,
std::memory_order_relaxed);
} }
void WriteBatch::Merge(ColumnFamilyHandle* column_family, void WriteBatch::Merge(ColumnFamilyHandle* column_family,
@ -374,7 +509,8 @@ void WriteBatch::SetSavePoint() {
save_points_ = new SavePoints(); save_points_ = new SavePoints();
} }
// Record length and count of current batch of writes. // Record length and count of current batch of writes.
save_points_->stack.push(SavePoint(GetDataSize(), Count())); save_points_->stack.push(SavePoint{
GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)});
} }
Status WriteBatch::RollbackToSavePoint() { Status WriteBatch::RollbackToSavePoint() {
@ -387,6 +523,7 @@ Status WriteBatch::RollbackToSavePoint() {
save_points_->stack.pop(); save_points_->stack.pop();
assert(savepoint.size <= rep_.size()); assert(savepoint.size <= rep_.size());
assert(savepoint.count <= Count());
if (savepoint.size == rep_.size()) { if (savepoint.size == rep_.size()) {
// No changes to rollback // No changes to rollback
@ -396,6 +533,7 @@ Status WriteBatch::RollbackToSavePoint() {
} else { } else {
rep_.resize(savepoint.size); rep_.resize(savepoint.size);
WriteBatchInternal::SetCount(this, savepoint.count); WriteBatchInternal::SetCount(this, savepoint.count);
content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
} }
return Status::OK(); return Status::OK();
@ -670,12 +808,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b,
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader); assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
} }
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
SetCount(dst, Count(dst) + Count(src)); SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader); assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
dst->content_flags_.store(
dst->content_flags_.load(std::memory_order_relaxed) |
src->content_flags_.load(std::memory_order_relaxed),
std::memory_order_relaxed);
} }
} // namespace rocksdb } // namespace rocksdb

@ -39,6 +39,10 @@ static std::string PrintContents(WriteBatch* b) {
ColumnFamilyMemTablesDefault cf_mems_default(mem); ColumnFamilyMemTablesDefault cf_mems_default(mem);
Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default);
int count = 0; int count = 0;
int put_count = 0;
int delete_count = 0;
int single_delete_count = 0;
int merge_count = 0;
Arena arena; Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena)); ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@ -53,18 +57,21 @@ static std::string PrintContents(WriteBatch* b) {
state.append(iter->value().ToString()); state.append(iter->value().ToString());
state.append(")"); state.append(")");
count++; count++;
put_count++;
break; break;
case kTypeDeletion: case kTypeDeletion:
state.append("Delete("); state.append("Delete(");
state.append(ikey.user_key.ToString()); state.append(ikey.user_key.ToString());
state.append(")"); state.append(")");
count++; count++;
delete_count++;
break; break;
case kTypeSingleDeletion: case kTypeSingleDeletion:
state.append("SingleDelete("); state.append("SingleDelete(");
state.append(ikey.user_key.ToString()); state.append(ikey.user_key.ToString());
state.append(")"); state.append(")");
count++; count++;
single_delete_count++;
break; break;
case kTypeMerge: case kTypeMerge:
state.append("Merge("); state.append("Merge(");
@ -73,6 +80,7 @@ static std::string PrintContents(WriteBatch* b) {
state.append(iter->value().ToString()); state.append(iter->value().ToString());
state.append(")"); state.append(")");
count++; count++;
merge_count++;
break; break;
default: default:
assert(false); assert(false);
@ -81,6 +89,10 @@ static std::string PrintContents(WriteBatch* b) {
state.append("@"); state.append("@");
state.append(NumberToString(ikey.sequence)); state.append(NumberToString(ikey.sequence));
} }
EXPECT_EQ(b->HasPut(), put_count > 0);
EXPECT_EQ(b->HasDelete(), delete_count > 0);
EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
EXPECT_EQ(b->HasMerge(), merge_count > 0);
if (!s.ok()) { if (!s.ok()) {
state.append(s.ToString()); state.append(s.ToString());
} else if (count != WriteBatchInternal::Count(b)) { } else if (count != WriteBatchInternal::Count(b)) {

@ -25,6 +25,7 @@
#ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
#define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
#include <atomic>
#include <stack> #include <stack>
#include <string> #include <string>
#include <stdint.h> #include <stdint.h>
@ -201,17 +202,39 @@ class WriteBatch : public WriteBatchBase {
// Returns the number of updates in the batch // Returns the number of updates in the batch
int Count() const; int Count() const;
// Returns true if PutCF will be called during Iterate
bool HasPut() const;
// Returns true if DeleteCF will be called during Iterate
bool HasDelete() const;
// Returns true if SingleDeleteCF will be called during Iterate
bool HasSingleDelete() const;
// Returns true if MergeCF will be called during Iterate
bool HasMerge() const;
using WriteBatchBase::GetWriteBatch; using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override { return this; } WriteBatch* GetWriteBatch() override { return this; }
// Constructor with a serialized string object // Constructor with a serialized string object
explicit WriteBatch(const std::string& rep) explicit WriteBatch(const std::string& rep);
: save_points_(nullptr), rep_(rep) {}
WriteBatch(const WriteBatch& src);
WriteBatch(WriteBatch&& src);
WriteBatch& operator=(const WriteBatch& src);
WriteBatch& operator=(WriteBatch&& src);
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;
SavePoints* save_points_; SavePoints* save_points_;
// For HasXYZ. Mutable to allow lazy computation of results
mutable std::atomic<uint32_t> content_flags_;
// Performs deferred computation of content_flags if necessary
uint32_t ComputeContentFlags() const;
protected: protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_ std::string rep_; // See comment in write_batch.cc for the format of rep_

Loading…
Cancel
Save