From 631863c63b7c992c19e9209f25ec1a16964dd3d5 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Fri, 6 Nov 2015 07:03:30 -0800 Subject: [PATCH] track WriteBatch contents Summary: Parallel writes will only be possible for certain combinations of flags and WriteBatch contents. Traversing the WriteBatch at write time to check these conditions would be expensive, but it is very cheap to keep track of when building WriteBatch-es. When loading WriteBatch-es during recovery, a deferred computation state is used so that the flags never need to be computed. Test Plan: 1. add asserts and EXPECT_EQ-s 2. make check Reviewers: sdong, igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D50337 --- db/write_batch.cc | 151 +++++++++++++++++++++++++++++++++- db/write_batch_test.cc | 12 +++ include/rocksdb/write_batch.h | 27 +++++- 3 files changed, 184 insertions(+), 6 deletions(-) diff --git a/db/write_batch.cc b/db/write_batch.cc index 53431b92a..925a05efd 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -42,22 +42,91 @@ namespace rocksdb { +// anon namespace for file-local types +namespace { + +enum ContentFlags : uint32_t { + DEFERRED = 1, + HAS_PUT = 2, + HAS_DELETE = 4, + HAS_SINGLE_DELETE = 8, + HAS_MERGE = 16, +}; + +struct BatchContentClassifier : public WriteBatch::Handler { + uint32_t content_flags = 0; + + Status PutCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_PUT; + return Status::OK(); + } + + Status DeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_DELETE; + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t, const Slice&) override { + content_flags |= ContentFlags::HAS_SINGLE_DELETE; + return Status::OK(); + } + + Status MergeCF(uint32_t, const Slice&, const Slice&) override { + content_flags |= ContentFlags::HAS_MERGE; + return Status::OK(); + } +}; + +} // anon namespace + // WriteBatch header has an 8-byte sequence number 
followed by a 4-byte count. static const size_t kHeader = 12; struct SavePoint { size_t size; // size of rep_ int count; // count of elements in rep_ - SavePoint(size_t s, int c) : size(s), count(c) {} + uint32_t content_flags; }; struct SavePoints { std::stack stack; }; -WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) { +WriteBatch::WriteBatch(size_t reserved_bytes) + : save_points_(nullptr), content_flags_(0), rep_() { rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); - Clear(); + rep_.resize(kHeader); +} + +WriteBatch::WriteBatch(const std::string& rep) + : save_points_(nullptr), + content_flags_(ContentFlags::DEFERRED), + rep_(rep) {} + +WriteBatch::WriteBatch(const WriteBatch& src) + : save_points_(src.save_points_), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + rep_(src.rep_) {} + +WriteBatch::WriteBatch(WriteBatch&& src) + : save_points_(std::move(src.save_points_)), + content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + rep_(std::move(src.rep_)) {} + +WriteBatch& WriteBatch::operator=(const WriteBatch& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(src); + } + return *this; +} + +WriteBatch& WriteBatch::operator=(WriteBatch&& src) { + if (&src != this) { + this->~WriteBatch(); + new (this) WriteBatch(std::move(src)); + } + return *this; } WriteBatch::~WriteBatch() { @@ -81,6 +150,8 @@ void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); + content_flags_.store(0, std::memory_order_relaxed); + if (save_points_ != nullptr) { while (!save_points_->stack.empty()) { save_points_->stack.pop(); @@ -92,6 +163,38 @@ int WriteBatch::Count() const { return WriteBatchInternal::Count(this); } +uint32_t WriteBatch::ComputeContentFlags() const { + auto rv = content_flags_.load(std::memory_order_relaxed); + if ((rv & ContentFlags::DEFERRED) != 0) { + BatchContentClassifier classifier; + Iterate(&classifier); + rv = classifier.content_flags; + + // this 
method is conceptually const, because it is performing a lazy + // computation that doesn't affect the abstract state of the batch. + // content_flags_ is marked mutable so that we can perform the + // following assignment + content_flags_.store(rv, std::memory_order_relaxed); + } + return rv; +} + +bool WriteBatch::HasPut() const { + return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; +} + +bool WriteBatch::HasDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0; +} + +bool WriteBatch::HasSingleDelete() const { + return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0; +} + +bool WriteBatch::HasMerge() const { + return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0; +} + Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob) { @@ -169,21 +272,29 @@ Status WriteBatch::Iterate(Handler* handler) const { switch (tag) { case kTypeColumnFamilyValue: case kTypeValue: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_PUT)); s = handler->PutCF(column_family, key, value); found++; break; case kTypeColumnFamilyDeletion: case kTypeDeletion: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE)); s = handler->DeleteCF(column_family, key); found++; break; case kTypeColumnFamilySingleDeletion: case kTypeSingleDeletion: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE)); s = handler->SingleDeleteCF(column_family, key); found++; break; case kTypeColumnFamilyMerge: case kTypeMerge: + assert(content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE)); s = handler->MergeCF(column_family, key, value); found++; break; @@ -233,6 +344,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSlice(&b->rep_, 
key); PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); } void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, @@ -251,6 +365,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store( + b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, + std::memory_order_relaxed); } void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, @@ -268,6 +385,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); } void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { @@ -284,6 +404,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSliceParts(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_DELETE, + std::memory_order_relaxed); } void WriteBatch::Delete(ColumnFamilyHandle* column_family, @@ -301,6 +424,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSlice(&b->rep_, key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); } void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, @@ -318,6 +444,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, PutVarint32(&b->rep_, column_family_id); } PutLengthPrefixedSliceParts(&b->rep_, 
key); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_SINGLE_DELETE, + std::memory_order_relaxed); } void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, @@ -336,6 +465,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSlice(&b->rep_, key); PutLengthPrefixedSlice(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); } void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, @@ -355,6 +487,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, } PutLengthPrefixedSliceParts(&b->rep_, key); PutLengthPrefixedSliceParts(&b->rep_, value); + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_MERGE, + std::memory_order_relaxed); } void WriteBatch::Merge(ColumnFamilyHandle* column_family, @@ -374,7 +509,8 @@ void WriteBatch::SetSavePoint() { save_points_ = new SavePoints(); } // Record length and count of current batch of writes. 
- save_points_->stack.push(SavePoint(GetDataSize(), Count())); + save_points_->stack.push(SavePoint{ + GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)}); } Status WriteBatch::RollbackToSavePoint() { @@ -387,6 +523,7 @@ Status WriteBatch::RollbackToSavePoint() { save_points_->stack.pop(); assert(savepoint.size <= rep_.size()); + assert(savepoint.count <= Count()); if (savepoint.size == rep_.size()) { // No changes to rollback @@ -396,6 +533,7 @@ Status WriteBatch::RollbackToSavePoint() { } else { rep_.resize(savepoint.size); WriteBatchInternal::SetCount(this, savepoint.count); + content_flags_.store(savepoint.content_flags, std::memory_order_relaxed); } return Status::OK(); @@ -670,12 +808,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); + b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); } void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { SetCount(dst, Count(dst) + Count(src)); assert(src->rep_.size() >= kHeader); dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); + dst->content_flags_.store( + dst->content_flags_.load(std::memory_order_relaxed) | + src->content_flags_.load(std::memory_order_relaxed), + std::memory_order_relaxed); } } // namespace rocksdb diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 4f73c82c8..62830da48 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -39,6 +39,10 @@ static std::string PrintContents(WriteBatch* b) { ColumnFamilyMemTablesDefault cf_mems_default(mem); Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default); int count = 0; + int put_count = 0; + int delete_count = 0; + int single_delete_count = 0; + int merge_count = 0; Arena arena; ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena)); for 
(iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -53,18 +57,21 @@ static std::string PrintContents(WriteBatch* b) { state.append(iter->value().ToString()); state.append(")"); count++; + put_count++; break; case kTypeDeletion: state.append("Delete("); state.append(ikey.user_key.ToString()); state.append(")"); count++; + delete_count++; break; case kTypeSingleDeletion: state.append("SingleDelete("); state.append(ikey.user_key.ToString()); state.append(")"); count++; + single_delete_count++; break; case kTypeMerge: state.append("Merge("); @@ -73,6 +80,7 @@ static std::string PrintContents(WriteBatch* b) { state.append(iter->value().ToString()); state.append(")"); count++; + merge_count++; break; default: assert(false); @@ -81,6 +89,10 @@ static std::string PrintContents(WriteBatch* b) { state.append("@"); state.append(NumberToString(ikey.sequence)); } + EXPECT_EQ(b->HasPut(), put_count > 0); + EXPECT_EQ(b->HasDelete(), delete_count > 0); + EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0); + EXPECT_EQ(b->HasMerge(), merge_count > 0); if (!s.ok()) { state.append(s.ToString()); } else if (count != WriteBatchInternal::Count(b)) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 584e7347a..f4a7ac06e 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -25,6 +25,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ +#include <atomic> #include #include #include @@ -201,17 +202,39 @@ class WriteBatch : public WriteBatchBase { // Returns the number of updates in the batch int Count() const; + // Returns true if PutCF will be called during Iterate + bool HasPut() const; + + // Returns true if DeleteCF will be called during Iterate + bool HasDelete() const; + + // Returns true if SingleDeleteCF will be called during Iterate + bool HasSingleDelete() const; + + // Returns true if MergeCF will be called during Iterate + bool HasMerge() const; + using 
WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; } // Constructor with a serialized string object - explicit WriteBatch(const std::string& rep) - : save_points_(nullptr), rep_(rep) {} + explicit WriteBatch(const std::string& rep); + + WriteBatch(const WriteBatch& src); + WriteBatch(WriteBatch&& src); + WriteBatch& operator=(const WriteBatch& src); + WriteBatch& operator=(WriteBatch&& src); private: friend class WriteBatchInternal; SavePoints* save_points_; + // For HasXYZ. Mutable to allow lazy computation of results + mutable std::atomic<uint32_t> content_flags_; + + // Performs deferred computation of content_flags if necessary + uint32_t ComputeContentFlags() const; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_