diff --git a/db/write_batch.cc b/db/write_batch.cc
index 53431b92a..925a05efd 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -42,22 +42,91 @@ namespace rocksdb {
 
+// anon namespace for file-local types
+namespace {
+
+enum ContentFlags : uint32_t {
+  DEFERRED = 1,
+  HAS_PUT = 2,
+  HAS_DELETE = 4,
+  HAS_SINGLE_DELETE = 8,
+  HAS_MERGE = 16,
+};
+
+struct BatchContentClassifier : public WriteBatch::Handler {
+  uint32_t content_flags = 0;
+
+  Status PutCF(uint32_t, const Slice&, const Slice&) override {
+    content_flags |= ContentFlags::HAS_PUT;
+    return Status::OK();
+  }
+
+  Status DeleteCF(uint32_t, const Slice&) override {
+    content_flags |= ContentFlags::HAS_DELETE;
+    return Status::OK();
+  }
+
+  Status SingleDeleteCF(uint32_t, const Slice&) override {
+    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
+    return Status::OK();
+  }
+
+  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
+    content_flags |= ContentFlags::HAS_MERGE;
+    return Status::OK();
+  }
+};
+
+}  // anon namespace
+
 // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
 static const size_t kHeader = 12;
 
 struct SavePoint {
   size_t size;  // size of rep_
   int count;    // count of elements in rep_
-  SavePoint(size_t s, int c) : size(s), count(c) {}
+  uint32_t content_flags;
 };
 
 struct SavePoints {
   std::stack<SavePoint> stack;
 };
 
-WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) {
+WriteBatch::WriteBatch(size_t reserved_bytes)
+    : save_points_(nullptr), content_flags_(0), rep_() {
   rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader);
-  Clear();
+  rep_.resize(kHeader);
+}
+
+WriteBatch::WriteBatch(const std::string& rep)
+    : save_points_(nullptr),
+      content_flags_(ContentFlags::DEFERRED),
+      rep_(rep) {}
+
+WriteBatch::WriteBatch(const WriteBatch& src)
+    : save_points_(src.save_points_),
+      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
+      rep_(src.rep_) {}
+
+WriteBatch::WriteBatch(WriteBatch&& src)
+    : save_points_(std::move(src.save_points_)),
+      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
+      rep_(std::move(src.rep_)) {}
+
+WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
+  if (&src != this) {
+    this->~WriteBatch();
+    new (this) WriteBatch(src);
+  }
+  return *this;
+}
+
+WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
+  if (&src != this) {
+    this->~WriteBatch();
+    new (this) WriteBatch(std::move(src));
+  }
+  return *this;
 }
 
 WriteBatch::~WriteBatch() {
@@ -81,6 +150,8 @@ void WriteBatch::Clear() {
   rep_.clear();
   rep_.resize(kHeader);
 
+  content_flags_.store(0, std::memory_order_relaxed);
+
   if (save_points_ != nullptr) {
     while (!save_points_->stack.empty()) {
       save_points_->stack.pop();
@@ -92,6 +163,38 @@ int WriteBatch::Count() const {
   return WriteBatchInternal::Count(this);
 }
 
+uint32_t WriteBatch::ComputeContentFlags() const {
+  auto rv = content_flags_.load(std::memory_order_relaxed);
+  if ((rv & ContentFlags::DEFERRED) != 0) {
+    BatchContentClassifier classifier;
+    Iterate(&classifier);
+    rv = classifier.content_flags;
+
+    // this method is conceptually const, because it is performing a lazy
+    // computation that doesn't affect the abstract state of the batch.
+    // content_flags_ is marked mutable so that we can perform the
+    // following assignment
+    content_flags_.store(rv, std::memory_order_relaxed);
+  }
+  return rv;
+}
+
+bool WriteBatch::HasPut() const {
+  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
+}
+
+bool WriteBatch::HasDelete() const {
+  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
+}
+
+bool WriteBatch::HasSingleDelete() const {
+  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
+}
+
+bool WriteBatch::HasMerge() const {
+  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
+}
+
 Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                 uint32_t* column_family, Slice* key,
                                 Slice* value, Slice* blob) {
@@ -169,21 +272,29 @@ Status WriteBatch::Iterate(Handler* handler) const {
     switch (tag) {
       case kTypeColumnFamilyValue:
      case kTypeValue:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        found++;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
+        assert(content_flags_.load(std::memory_order_relaxed) &
+               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        found++;
        break;
@@ -233,6 +344,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
   }
   PutLengthPrefixedSlice(&b->rep_, key);
   PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(
+      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
+      std::memory_order_relaxed);
 }
 
 void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
@@ -251,6 +365,9 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
   }
   PutLengthPrefixedSliceParts(&b->rep_, key);
   PutLengthPrefixedSliceParts(&b->rep_, value);
+  b->content_flags_.store(
+      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
+      std::memory_order_relaxed);
 }
 
 void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
@@ -268,6 +385,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     PutVarint32(&b->rep_, column_family_id);
   }
   PutLengthPrefixedSlice(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_DELETE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
@@ -284,6 +404,9 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     PutVarint32(&b->rep_, column_family_id);
   }
   PutLengthPrefixedSliceParts(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_DELETE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::Delete(ColumnFamilyHandle* column_family,
@@ -301,6 +424,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
     PutVarint32(&b->rep_, column_family_id);
   }
   PutLengthPrefixedSlice(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_SINGLE_DELETE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
@@ -318,6 +444,9 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
     PutVarint32(&b->rep_, column_family_id);
   }
   PutLengthPrefixedSliceParts(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_SINGLE_DELETE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
@@ -336,6 +465,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
   }
   PutLengthPrefixedSlice(&b->rep_, key);
   PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_MERGE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
@@ -355,6 +487,9 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
   }
   PutLengthPrefixedSliceParts(&b->rep_, key);
   PutLengthPrefixedSliceParts(&b->rep_, value);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_MERGE,
+                          std::memory_order_relaxed);
 }
 
 void WriteBatch::Merge(ColumnFamilyHandle* column_family,
@@ -374,7 +509,8 @@ void WriteBatch::SetSavePoint() {
     save_points_ = new SavePoints();
   }
   // Record length and count of current batch of writes.
-  save_points_->stack.push(SavePoint(GetDataSize(), Count()));
+  save_points_->stack.push(SavePoint{
+      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)});
 }
 
 Status WriteBatch::RollbackToSavePoint() {
@@ -387,6 +523,7 @@ Status WriteBatch::RollbackToSavePoint() {
   save_points_->stack.pop();
 
   assert(savepoint.size <= rep_.size());
+  assert(savepoint.count <= Count());
 
   if (savepoint.size == rep_.size()) {
     // No changes to rollback
@@ -396,6 +533,7 @@
   } else {
     rep_.resize(savepoint.size);
     WriteBatchInternal::SetCount(this, savepoint.count);
+    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
   }
 
   return Status::OK();
@@ -670,12 +808,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b,
 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   assert(contents.size() >= kHeader);
   b->rep_.assign(contents.data(), contents.size());
+  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
 }
 
 void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
   SetCount(dst, Count(dst) + Count(src));
   assert(src->rep_.size() >= kHeader);
   dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
+  dst->content_flags_.store(
+      dst->content_flags_.load(std::memory_order_relaxed) |
+          src->content_flags_.load(std::memory_order_relaxed),
+      std::memory_order_relaxed);
 }
 
 }  // namespace rocksdb
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index 4f73c82c8..62830da48 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -39,6 +39,10 @@ static std::string PrintContents(WriteBatch* b) {
   ColumnFamilyMemTablesDefault cf_mems_default(mem);
   Status s = WriteBatchInternal::InsertInto(b, &cf_mems_default);
   int count = 0;
+  int put_count = 0;
+  int delete_count = 0;
+  int single_delete_count = 0;
+  int merge_count = 0;
   Arena arena;
   ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena));
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
@@ -53,18 +57,21 @@
         state.append(iter->value().ToString());
         state.append(")");
         count++;
+        put_count++;
         break;
       case kTypeDeletion:
         state.append("Delete(");
         state.append(ikey.user_key.ToString());
         state.append(")");
         count++;
+        delete_count++;
         break;
       case kTypeSingleDeletion:
         state.append("SingleDelete(");
         state.append(ikey.user_key.ToString());
         state.append(")");
         count++;
+        single_delete_count++;
         break;
       case kTypeMerge:
         state.append("Merge(");
@@ -73,6 +80,7 @@ static std::string PrintContents(WriteBatch* b) {
         state.append(iter->value().ToString());
         state.append(")");
         count++;
+        merge_count++;
         break;
       default:
         assert(false);
@@ -81,6 +89,10 @@
     state.append("@");
     state.append(NumberToString(ikey.sequence));
   }
+  EXPECT_EQ(b->HasPut(), put_count > 0);
+  EXPECT_EQ(b->HasDelete(), delete_count > 0);
+  EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
+  EXPECT_EQ(b->HasMerge(), merge_count > 0);
   if (!s.ok()) {
     state.append(s.ToString());
   } else if (count != WriteBatchInternal::Count(b)) {
diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h
index 584e7347a..f4a7ac06e 100644
--- a/include/rocksdb/write_batch.h
+++ b/include/rocksdb/write_batch.h
@@ -25,6 +25,7 @@
 #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
 #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_
 
+#include <atomic>
 #include <stack>
 #include <string>
 #include <stdint.h>
@@ -201,17 +202,39 @@ class WriteBatch : public WriteBatchBase {
   // Returns the number of updates in the batch
   int Count() const;
 
+  // Returns true if PutCF will be called during Iterate
+  bool HasPut() const;
+
+  // Returns true if DeleteCF will be called during Iterate
+  bool HasDelete() const;
+
+  // Returns true if SingleDeleteCF will be called during Iterate
+  bool HasSingleDelete() const;
+
+  // Returns true if MergeCF will be called during Iterate
+  bool HasMerge() const;
+
   using WriteBatchBase::GetWriteBatch;
   WriteBatch* GetWriteBatch() override { return this; }
 
   // Constructor with a serialized string object
-  explicit WriteBatch(const std::string& rep)
-      : save_points_(nullptr), rep_(rep) {}
+  explicit WriteBatch(const std::string& rep);
+
+  WriteBatch(const WriteBatch& src);
+  WriteBatch(WriteBatch&& src);
+  WriteBatch& operator=(const WriteBatch& src);
+  WriteBatch& operator=(WriteBatch&& src);
 
  private:
   friend class WriteBatchInternal;
   SavePoints* save_points_;
 
+  // For HasXYZ.  Mutable to allow lazy computation of results
+  mutable std::atomic<uint32_t> content_flags_;
+
+  // Performs deferred computation of content_flags if necessary
+  uint32_t ComputeContentFlags() const;
+
  protected:
   std::string rep_;  // See comment in write_batch.cc for the format of rep_
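
For context, here is a minimal caller-side sketch (not part of the diff above; the `main` function and the example keys are purely illustrative) of how the new `HasPut()`/`HasDelete()`/`HasSingleDelete()`/`HasMerge()` predicates might be consumed. A batch built through the write API answers from the eagerly maintained `content_flags_`, while a batch reconstructed from a serialized rep starts in the `DEFERRED` state and classifies itself on the first query by running `Iterate()` with `BatchContentClassifier`:

```cpp
// Illustrative usage sketch only, assuming the headers from this change.
#include <iostream>

#include "rocksdb/write_batch.h"

int main() {
  rocksdb::WriteBatch batch;
  batch.Put("k1", "v1");  // sets HAS_PUT eagerly in content_flags_
  batch.Delete("k2");     // sets HAS_DELETE eagerly

  // O(1) queries against the cached flags.
  std::cout << "HasPut=" << batch.HasPut()
            << " HasDelete=" << batch.HasDelete()
            << " HasMerge=" << batch.HasMerge() << "\n";

  // A batch rebuilt from a serialized rep starts DEFERRED; the first Has*()
  // call iterates the batch with BatchContentClassifier and caches the result.
  rocksdb::WriteBatch from_rep(batch.Data());
  std::cout << "from_rep.HasDelete()=" << from_rep.HasDelete() << "\n";
  return 0;
}
```

The relaxed loads and stores appear sufficient here because `content_flags_` is only a cache of information derivable from `rep_`, and a `WriteBatch` is not meant to be mutated from multiple threads concurrently.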