diff --git a/db/write_batch.cc b/db/write_batch.cc index cc295791d..dacd17ed4 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -124,8 +124,8 @@ struct SavePoints { std::stack stack; }; -WriteBatch::WriteBatch(size_t reserved_bytes) - : save_points_(nullptr), content_flags_(0), rep_() { +WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes) + : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() { rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? reserved_bytes : WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader); @@ -134,18 +134,21 @@ WriteBatch::WriteBatch(size_t reserved_bytes) WriteBatch::WriteBatch(const std::string& rep) : save_points_(nullptr), content_flags_(ContentFlags::DEFERRED), + max_bytes_(0), rep_(rep) {} WriteBatch::WriteBatch(const WriteBatch& src) : save_points_(src.save_points_), wal_term_point_(src.wal_term_point_), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + max_bytes_(src.max_bytes_), rep_(src.rep_) {} WriteBatch::WriteBatch(WriteBatch&& src) : save_points_(std::move(src.save_points_)), wal_term_point_(std::move(src.wal_term_point_)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)), + max_bytes_(src.max_bytes_), rep_(std::move(src.rep_)) {} WriteBatch& WriteBatch::operator=(const WriteBatch& src) { @@ -470,8 +473,9 @@ size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return WriteBatchInternal::kHeader; } -void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, - const Slice& key, const Slice& value) { +Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeValue)); @@ -484,15 +488,18 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store( b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { - WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, + value); } -void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, - const SliceParts& key, const SliceParts& value) { +Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeValue)); @@ -505,18 +512,21 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store( b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) { - WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) { + return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, + value); } -void WriteBatchInternal::InsertNoop(WriteBatch* b) { +Status WriteBatchInternal::InsertNoop(WriteBatch* b) { b->rep_.push_back(static_cast(kTypeNoop)); + return Status::OK(); } -void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { +Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { // a manually constructed batch can only contain one prepare section assert(b->rep_[12] == static_cast(kTypeNoop)); @@ -535,26 +545,30 @@ void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { ContentFlags::HAS_END_PREPARE | ContentFlags::HAS_BEGIN_PREPARE, std::memory_order_relaxed); + return Status::OK(); } -void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { +Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { b->rep_.push_back(static_cast(kTypeCommitXID)); PutLengthPrefixedSlice(&b->rep_, xid); b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_COMMIT, std::memory_order_relaxed); + return Status::OK(); } -void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { +Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { b->rep_.push_back(static_cast(kTypeRollbackXID)); PutLengthPrefixedSlice(&b->rep_, xid); b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_ROLLBACK, std::memory_order_relaxed); + return Status::OK(); } -void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, - const Slice& key) { +Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const Slice& key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeDeletion)); @@ -566,14 +580,17 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { + return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), + key); } -void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, - const SliceParts& key) { +Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeDeletion)); @@ -585,15 +602,19 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +Status WriteBatch::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), + key); } -void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, - const Slice& key) { +Status WriteBatchInternal::SingleDelete(WriteBatch* b, + uint32_t column_family_id, + const Slice& key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeSingleDeletion)); @@ -605,15 +626,19 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_SINGLE_DELETE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) { - WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { + return WriteBatchInternal::SingleDelete( + this, GetColumnFamilyID(column_family), key); } -void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, - const SliceParts& key) { +Status WriteBatchInternal::SingleDelete(WriteBatch* b, + uint32_t column_family_id, + const SliceParts& key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeSingleDeletion)); @@ -625,16 +650,19 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_SINGLE_DELETE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); +Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + return WriteBatchInternal::SingleDelete( + this, GetColumnFamilyID(column_family), key); } -void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, - const Slice& begin_key, - const Slice& end_key) { +Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, + const Slice& begin_key, + const Slice& end_key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeRangeDeletion)); @@ -647,17 +675,19 @@ void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE_RANGE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, - const Slice& begin_key, const Slice& end_key) { - WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), - begin_key, end_key); +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) { + return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), + begin_key, end_key); } -void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, - const SliceParts& begin_key, - const SliceParts& end_key) { +Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, + const SliceParts& begin_key, + const SliceParts& end_key) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeRangeDeletion)); @@ -670,17 +700,19 @@ void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_DELETE_RANGE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, - const SliceParts& begin_key, - const SliceParts& end_key) { - WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), - begin_key, end_key); +Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, + const SliceParts& begin_key, + const SliceParts& end_key) { + return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), + begin_key, end_key); } -void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, - const Slice& key, const Slice& value) { +Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeMerge)); @@ -693,16 +725,19 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_MERGE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { - WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value); +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, + value); } -void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, - const SliceParts& key, - const SliceParts& value) { +Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, + const SliceParts& value) { + LocalSavePoint save(b); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { b->rep_.push_back(static_cast(kTypeMerge)); @@ -715,18 +750,20 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_MERGE, std::memory_order_relaxed); + return save.commit(); } -void WriteBatch::Merge(ColumnFamilyHandle* column_family, - const SliceParts& key, - const SliceParts& value) { - WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), - key, value); +Status WriteBatch::Merge(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { + return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, + value); } -void WriteBatch::PutLogData(const Slice& blob) { +Status WriteBatch::PutLogData(const Slice& blob) { + LocalSavePoint save(this); rep_.push_back(static_cast(kTypeLogData)); PutLengthPrefixedSlice(&rep_, blob); + return save.commit(); } void WriteBatch::SetSavePoint() { @@ -1300,14 +1337,15 @@ Status WriteBatchInternal::InsertInto( return s; } -void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { +Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= WriteBatchInternal::kHeader); b->rep_.assign(contents.data(), contents.size()); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); + return Status::OK(); } -void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, - const bool wal_only) { +Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, + const bool wal_only) { size_t src_len; int src_count; uint32_t src_flags; @@ -1330,6 +1368,7 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, dst->content_flags_.store( dst->content_flags_.load(std::memory_order_relaxed) | src_flags, std::memory_order_relaxed); + return Status::OK(); } size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, diff --git a/db/write_batch_base.cc b/db/write_batch_base.cc index 159912619..a272675c9 100644 --- a/db/write_batch_base.cc +++ b/db/write_batch_base.cc @@ -8,86 +8,87 @@ #include #include "rocksdb/slice.h" +#include "rocksdb/status.h" namespace rocksdb { // Simple implementation of SlicePart variants of Put(). Child classes // can override these method with more performant solutions if they choose. -void WriteBatchBase::Put(ColumnFamilyHandle* column_family, - const SliceParts& key, const SliceParts& value) { +Status WriteBatchBase::Put(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { std::string key_buf, value_buf; Slice key_slice(key, &key_buf); Slice value_slice(value, &value_buf); - Put(column_family, key_slice, value_slice); + return Put(column_family, key_slice, value_slice); } -void WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) { +Status WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) { std::string key_buf, value_buf; Slice key_slice(key, &key_buf); Slice value_slice(value, &value_buf); - Put(key_slice, value_slice); + return Put(key_slice, value_slice); } -void WriteBatchBase::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { +Status WriteBatchBase::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { std::string key_buf; Slice key_slice(key, &key_buf); - Delete(column_family, key_slice); + return Delete(column_family, key_slice); } -void WriteBatchBase::Delete(const SliceParts& key) { +Status WriteBatchBase::Delete(const SliceParts& key) { std::string key_buf; Slice key_slice(key, &key_buf); - Delete(key_slice); + return Delete(key_slice); } -void WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) { +Status WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) { std::string key_buf; Slice key_slice(key, &key_buf); - SingleDelete(column_family, key_slice); + return SingleDelete(column_family, key_slice); } -void WriteBatchBase::SingleDelete(const SliceParts& key) { +Status WriteBatchBase::SingleDelete(const SliceParts& key) { std::string key_buf; Slice key_slice(key, &key_buf); - SingleDelete(key_slice); + return SingleDelete(key_slice); } -void WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family, - const SliceParts& begin_key, - const SliceParts& end_key) { +Status WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family, + const SliceParts& begin_key, + const SliceParts& end_key) { std::string begin_key_buf, end_key_buf; Slice begin_key_slice(begin_key, &begin_key_buf); Slice end_key_slice(end_key, &end_key_buf); - DeleteRange(column_family, begin_key_slice, end_key_slice); + return DeleteRange(column_family, begin_key_slice, end_key_slice); } -void WriteBatchBase::DeleteRange(const SliceParts& begin_key, - const SliceParts& end_key) { +Status WriteBatchBase::DeleteRange(const SliceParts& begin_key, + const SliceParts& end_key) { std::string begin_key_buf, end_key_buf; Slice begin_key_slice(begin_key, &begin_key_buf); Slice end_key_slice(end_key, &end_key_buf); - DeleteRange(begin_key_slice, end_key_slice); + return DeleteRange(begin_key_slice, end_key_slice); } -void WriteBatchBase::Merge(ColumnFamilyHandle* column_family, - const SliceParts& key, const SliceParts& value) { +Status WriteBatchBase::Merge(ColumnFamilyHandle* column_family, + const SliceParts& key, const SliceParts& value) { std::string key_buf, value_buf; Slice key_slice(key, &key_buf); Slice value_slice(value, &value_buf); - Merge(column_family, key_slice, value_slice); + return Merge(column_family, key_slice, value_slice); } -void WriteBatchBase::Merge(const SliceParts& key, const SliceParts& value) { +Status WriteBatchBase::Merge(const SliceParts& key, const SliceParts& value) { std::string key_buf, value_buf; Slice key_slice(key, &key_buf); Slice value_slice(value, &value_buf); - Merge(key_slice, value_slice); + return Merge(key_slice, value_slice); } } // namespace rocksdb diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 77e46ecff..16d671cee 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -68,44 +68,44 @@ class WriteBatchInternal { static const size_t kHeader = 12; // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* - static void Put(WriteBatch* batch, uint32_t column_family_id, - const Slice& key, const Slice& value); + static Status Put(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); - static void Put(WriteBatch* batch, uint32_t column_family_id, - const SliceParts& key, const SliceParts& value); + static Status Put(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); - static void Delete(WriteBatch* batch, uint32_t column_family_id, - const SliceParts& key); + static Status Delete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); - static void Delete(WriteBatch* batch, uint32_t column_family_id, - const Slice& key); + static Status Delete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); - static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, - const SliceParts& key); + static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); - static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, - const Slice& key); + static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); - static void DeleteRange(WriteBatch* b, uint32_t column_family_id, - const Slice& begin_key, const Slice& end_key); + static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, + const Slice& begin_key, const Slice& end_key); - static void DeleteRange(WriteBatch* b, uint32_t column_family_id, - const SliceParts& begin_key, - const SliceParts& end_key); + static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, + const SliceParts& begin_key, + const SliceParts& end_key); - static void Merge(WriteBatch* batch, uint32_t column_family_id, - const Slice& key, const Slice& value); + static Status Merge(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); - static void Merge(WriteBatch* batch, uint32_t column_family_id, - const SliceParts& key, const SliceParts& value); + static Status Merge(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); - static void MarkEndPrepare(WriteBatch* batch, const Slice& xid); + static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid); - static void MarkRollback(WriteBatch* batch, const Slice& xid); + static Status MarkRollback(WriteBatch* batch, const Slice& xid); - static void MarkCommit(WriteBatch* batch, const Slice& xid); + static Status MarkCommit(WriteBatch* batch, const Slice& xid); - static void InsertNoop(WriteBatch* batch); + static Status InsertNoop(WriteBatch* batch); // Return the number of entries in the batch. static int Count(const WriteBatch* batch); @@ -132,7 +132,7 @@ class WriteBatchInternal { return batch->rep_.size(); } - static void SetContents(WriteBatch* batch, const Slice& contents); + static Status SetContents(WriteBatch* batch, const Slice& contents); // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. // @@ -177,12 +177,51 @@ class WriteBatchInternal { uint64_t log_number = 0, DB* db = nullptr, bool concurrent_memtable_writes = false); - static void Append(WriteBatch* dst, const WriteBatch* src, - const bool WAL_only = false); + static Status Append(WriteBatch* dst, const WriteBatch* src, + const bool WAL_only = false); // Returns the byte size of appending a WriteBatch with ByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); }; +// LocalSavePoint is similar to a scope guard +class LocalSavePoint { + public: + explicit LocalSavePoint(WriteBatch* batch) + : batch_(batch), + savepoint_(batch->GetDataSize(), batch->Count(), + batch->content_flags_.load(std::memory_order_relaxed)) +#ifndef NDEBUG + , + committed_(false) +#endif + { + } + +#ifndef NDEBUG + ~LocalSavePoint() { assert(committed_); } +#endif + Status commit() { +#ifndef NDEBUG + committed_ = true; +#endif + if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) { + batch_->rep_.resize(savepoint_.size); + WriteBatchInternal::SetCount(batch_, savepoint_.count); + batch_->content_flags_.store(savepoint_.content_flags, + std::memory_order_relaxed); + return Status::MemoryLimit(); + } + return Status::OK(); + } + + private: + WriteBatch* batch_; + SavePoint savepoint_; +#ifndef NDEBUG + bool committed_; +#endif +}; + } // namespace rocksdb diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 3006d19f2..02596799a 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -861,6 +861,18 @@ TEST_F(WriteBatchTest, SavePointTest) { ASSERT_EQ("", PrintContents(&batch2)); } +TEST_F(WriteBatchTest, MemoryLimitTest) { + Status s; + // The header size is 12 bytes. The two Puts take 8 bytes which gives total + // of 12 + 8 * 2 = 28 bytes. + WriteBatch batch(0, 28); + + ASSERT_OK(batch.Put("a", "....")); + ASSERT_OK(batch.Put("b", "....")); + s = batch.Put("c", "...."); + ASSERT_TRUE(s.IsMemoryLimit()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 40f3eecd2..c30abf191 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -71,6 +71,7 @@ class Status { kNoSpace = 4, kDeadlock = 5, kStaleFile = 6, + kMemoryLimit = 7, kMaxSubCode }; @@ -166,6 +167,11 @@ class Status { return Status(kIOError, kNoSpace, msg, msg2); } + static Status MemoryLimit() { return Status(kAborted, kMemoryLimit); } + static Status MemoryLimit(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kAborted, kMemoryLimit, msg, msg2); + } + // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -224,6 +230,13 @@ class Status { return (code() == kIOError) && (subcode() == kNoSpace); } + // Returns true iff the status indicates a memory limit error. There may be + // cases where we limit the memory used in certain operations (eg. the size + // of a write batch) in order to avoid out of memory exceptions. + bool IsMemoryLimit() const { + return (code() == kAborted) && (subcode() == kMemoryLimit); + } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index dd495e08f..33b762913 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -100,6 +100,9 @@ struct TransactionOptions { // The number of traversals to make during deadlock detection. int64_t deadlock_detect_depth = 50; + + // The maximum number of bytes used for the write batch. 0 means no limit. + size_t max_write_batch_size = 0; }; struct KeyLockInfo { diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 8c519fe5c..3e9381ab5 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -88,42 +88,45 @@ class WriteBatchWithIndex : public WriteBatchBase { // interface, or we can't find a column family from the column family handle // passed in, backup_index_comparator will be used for the column family. // reserved_bytes: reserved bytes in underlying WriteBatch + // max_bytes: maximum size of underlying WriteBatch in bytes // overwrite_key: if true, overwrite the key in the index when inserting // the same key as previously, so iterator will never // show two entries with the same key. explicit WriteBatchWithIndex( const Comparator* backup_index_comparator = BytewiseComparator(), - size_t reserved_bytes = 0, bool overwrite_key = false); + size_t reserved_bytes = 0, bool overwrite_key = false, + size_t max_bytes = 0); + virtual ~WriteBatchWithIndex(); using WriteBatchBase::Put; - void Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; - void Put(const Slice& key, const Slice& value) override; + Status Put(const Slice& key, const Slice& value) override; using WriteBatchBase::Merge; - void Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; - void Merge(const Slice& key, const Slice& value) override; + Status Merge(const Slice& key, const Slice& value) override; using WriteBatchBase::Delete; - void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - void Delete(const Slice& key) override; + Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + Status Delete(const Slice& key) override; using WriteBatchBase::SingleDelete; - void SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) override; - void SingleDelete(const Slice& key) override; + Status SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status SingleDelete(const Slice& key) override; using WriteBatchBase::DeleteRange; - void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, - const Slice& end_key) override; - void DeleteRange(const Slice& begin_key, const Slice& end_key) override; + Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, + const Slice& end_key) override; + Status DeleteRange(const Slice& begin_key, const Slice& end_key) override; using WriteBatchBase::PutLogData; - void PutLogData(const Slice& blob) override; + Status PutLogData(const Slice& blob) override; using WriteBatchBase::Clear; void Clear() override; @@ -204,6 +207,8 @@ class WriteBatchWithIndex : public WriteBatchBase { // or other Status on corruption. Status RollbackToSavePoint() override; + void SetMaxBytes(size_t max_bytes) override; + private: struct Rep; std::unique_ptr rep; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 89f9e5017..c60ac1796 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -60,80 +60,82 @@ struct SavePoint { class WriteBatch : public WriteBatchBase { public: - explicit WriteBatch(size_t reserved_bytes = 0); + explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); ~WriteBatch(); using WriteBatchBase::Put; // Store the mapping "key->value" in the database. - void Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - void Put(const Slice& key, const Slice& value) override { - Put(nullptr, key, value); + Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Put(const Slice& key, const Slice& value) override { + return Put(nullptr, key, value); } // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatentations of arrays of // slices. - void Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - void Put(const SliceParts& key, const SliceParts& value) override { - Put(nullptr, key, value); + Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status Put(const SliceParts& key, const SliceParts& value) override { + return Put(nullptr, key, value); } using WriteBatchBase::Delete; // If the database contains a mapping for "key", erase it. Else do nothing. - void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; - void Delete(const Slice& key) override { Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; + Status Delete(const Slice& key) override { return Delete(nullptr, key); } // variant that takes SliceParts - void Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - void Delete(const SliceParts& key) override { Delete(nullptr, key); } + Status Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } using WriteBatchBase::SingleDelete; // WriteBatch implementation of DB::SingleDelete(). See db.h. - void SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) override; - void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); } + Status SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) override; + Status SingleDelete(const Slice& key) override { + return SingleDelete(nullptr, key); + } // variant that takes SliceParts - void SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key) override; - void SingleDelete(const SliceParts& key) override { - SingleDelete(nullptr, key); + Status SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key) override; + Status SingleDelete(const SliceParts& key) override { + return SingleDelete(nullptr, key); } using WriteBatchBase::DeleteRange; // WriteBatch implementation of DB::DeleteRange(). See db.h. - void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, - const Slice& end_key) override; - void DeleteRange(const Slice& begin_key, const Slice& end_key) override { - DeleteRange(nullptr, begin_key, end_key); + Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, + const Slice& end_key) override; + Status DeleteRange(const Slice& begin_key, const Slice& end_key) override { + return DeleteRange(nullptr, begin_key, end_key); } // variant that takes SliceParts - void DeleteRange(ColumnFamilyHandle* column_family, - const SliceParts& begin_key, - const SliceParts& end_key) override; - void DeleteRange(const SliceParts& begin_key, - const SliceParts& end_key) override { - DeleteRange(nullptr, begin_key, end_key); + Status DeleteRange(ColumnFamilyHandle* column_family, + const SliceParts& begin_key, + const SliceParts& end_key) override; + Status DeleteRange(const SliceParts& begin_key, + const SliceParts& end_key) override { + return DeleteRange(nullptr, begin_key, end_key); } using WriteBatchBase::Merge; // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" - void Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override; - void Merge(const Slice& key, const Slice& value) override { - Merge(nullptr, key, value); + Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) override; + Status Merge(const Slice& key, const Slice& value) override { + return Merge(nullptr, key, value); } // variant that takes SliceParts - void Merge(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value) override; - void Merge(const SliceParts& key, const SliceParts& value) override { - Merge(nullptr, key, value); + Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value) override; + Status Merge(const SliceParts& key, const SliceParts& value) override { + return Merge(nullptr, key, value); } using WriteBatchBase::PutLogData; @@ -147,7 +149,7 @@ class WriteBatch : public WriteBatchBase { // // Example application: add timestamps to the transaction log for use in // replication. - void PutLogData(const Slice& blob) override; + Status PutLogData(const Slice& blob) override; using WriteBatchBase::Clear; // Clear all updates buffered in this batch. @@ -304,8 +306,11 @@ class WriteBatch : public WriteBatchBase { void MarkWalTerminationPoint(); const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; } + void SetMaxBytes(size_t max_bytes) override { max_bytes_ = max_bytes; } + private: friend class WriteBatchInternal; + friend class LocalSavePoint; SavePoints* save_points_; // When sending a WriteBatch through WriteImpl we might want to @@ -319,6 +324,9 @@ class WriteBatch : public WriteBatchBase { // Performs deferred computation of content_flags if necessary uint32_t ComputeContentFlags() const; + // Maximum size of rep_. + size_t max_bytes_; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index a05dff0d9..9278f8670 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -8,6 +8,8 @@ #pragma once +#include + namespace rocksdb { class Slice; @@ -24,59 +26,61 @@ class WriteBatchBase { virtual ~WriteBatchBase() {} // Store the mapping "key->value" in the database. - virtual void Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) = 0; - virtual void Put(const Slice& key, const Slice& value) = 0; + virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + virtual Status Put(const Slice& key, const Slice& value) = 0; // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatentations of arrays of // slices. - virtual void Put(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value); - virtual void Put(const SliceParts& key, const SliceParts& value); + virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value); + virtual Status Put(const SliceParts& key, const SliceParts& value); // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" - virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) = 0; - virtual void Merge(const Slice& key, const Slice& value) = 0; + virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) = 0; + virtual Status Merge(const Slice& key, const Slice& value) = 0; // variant that takes SliceParts - virtual void Merge(ColumnFamilyHandle* column_family, const SliceParts& key, - const SliceParts& value); - virtual void Merge(const SliceParts& key, const SliceParts& value); + virtual Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key, + const SliceParts& value); + virtual Status Merge(const SliceParts& key, const SliceParts& value); // If the database contains a mapping for "key", erase it. Else do nothing. - virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0; - virtual void Delete(const Slice& key) = 0; + virtual Status Delete(ColumnFamilyHandle* column_family, + const Slice& key) = 0; + virtual Status Delete(const Slice& key) = 0; // variant that takes SliceParts - virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); - virtual void Delete(const SliceParts& key); + virtual Status Delete(ColumnFamilyHandle* column_family, + const SliceParts& key); + virtual Status Delete(const SliceParts& key); // If the database contains a mapping for "key", erase it. Expects that the // key was not overwritten. Else do nothing. - virtual void SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) = 0; - virtual void SingleDelete(const Slice& key) = 0; + virtual Status SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) = 0; + virtual Status SingleDelete(const Slice& key) = 0; // variant that takes SliceParts - virtual void SingleDelete(ColumnFamilyHandle* column_family, - const SliceParts& key); - virtual void SingleDelete(const SliceParts& key); + virtual Status SingleDelete(ColumnFamilyHandle* column_family, + const SliceParts& key); + virtual Status SingleDelete(const SliceParts& key); // If the database contains mappings in the range ["begin_key", "end_key"], // erase them. Else do nothing. - virtual void DeleteRange(ColumnFamilyHandle* column_family, - const Slice& begin_key, const Slice& end_key) = 0; - virtual void DeleteRange(const Slice& begin_key, const Slice& end_key) = 0; + virtual Status DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) = 0; + virtual Status DeleteRange(const Slice& begin_key, const Slice& end_key) = 0; // variant that takes SliceParts - virtual void DeleteRange(ColumnFamilyHandle* column_family, - const SliceParts& begin_key, - const SliceParts& end_key); - virtual void DeleteRange(const SliceParts& begin_key, - const SliceParts& end_key); + virtual Status DeleteRange(ColumnFamilyHandle* column_family, + const SliceParts& begin_key, + const SliceParts& end_key); + virtual Status DeleteRange(const SliceParts& begin_key, + const SliceParts& end_key); // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, @@ -88,7 +92,7 @@ class WriteBatchBase { // // Example application: add timestamps to the transaction log for use in // replication. - virtual void PutLogData(const Slice& blob) = 0; + virtual Status PutLogData(const Slice& blob) = 0; // Clear all updates buffered in this batch. virtual void Clear() = 0; @@ -107,6 +111,9 @@ class WriteBatchBase { // If there is no previous call to SetSavePoint(), behaves the same as // Clear(). virtual Status RollbackToSavePoint() = 0; + + // Sets the maximum size of the write batch in bytes. 0 means no limit. + virtual void SetMaxBytes(size_t max_bytes) = 0; }; } // namespace rocksdb diff --git a/util/status_message.cc b/util/status_message.cc index 01956f26c..89b89096d 100644 --- a/util/status_message.cc +++ b/util/status_message.cc @@ -14,7 +14,8 @@ const char* Status::msgs[] = { "Failed to acquire lock due to max_num_locks limit", // kLockLimit "No space left on device", // kNoSpace "Deadlock", // kDeadlock - "Stale file handle" // kStaleFile + "Stale file handle", // kStaleFile + "Memory limit reached" // kMemoryLimit }; } // namespace rocksdb diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 2312ced9e..ba4c3766a 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -23,7 +23,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db, write_options_(write_options), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), start_time_(db_->GetEnv()->NowMicros()), - write_batch_(cmp_, 0, true), + write_batch_(cmp_, 0, true, 0), indexing_enabled_(true) { assert(dynamic_cast(db_) != nullptr); log_number_ = 0; @@ -262,8 +262,10 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->Put(column_family, key, value); - num_puts_++; + s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + num_puts_++; + } } return s; @@ -276,8 +278,10 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->Put(column_family, key, value); - num_puts_++; + s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + num_puts_++; + } } return s; @@ -289,8 +293,10 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->Merge(column_family, key, value); - num_merges_++; + s = GetBatchForWrite()->Merge(column_family, key, value); + if (s.ok()) { + num_merges_++; + } } return s; @@ -302,8 +308,10 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->Delete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; @@ -315,8 +323,10 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->Delete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; @@ -328,8 +338,10 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->SingleDelete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; @@ -341,8 +353,10 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family, TryLock(column_family, key, false /* read_only */, true /* exclusive */); if (s.ok()) { - GetBatchForWrite()->SingleDelete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->SingleDelete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; @@ -354,8 +368,10 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, true /* exclusive */, true /* untracked */); if (s.ok()) { - GetBatchForWrite()->Put(column_family, key, value); - num_puts_++; + s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + num_puts_++; + } } return s; @@ -368,8 +384,10 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family, true /* exclusive */, true /* untracked */); if (s.ok()) { - GetBatchForWrite()->Put(column_family, key, value); - num_puts_++; + s = GetBatchForWrite()->Put(column_family, key, value); + if (s.ok()) { + num_puts_++; + } } return s; @@ -382,8 +400,10 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family, true /* exclusive */, true /* untracked */); if (s.ok()) { - GetBatchForWrite()->Merge(column_family, key, value); - num_merges_++; + s = GetBatchForWrite()->Merge(column_family, key, value); + if (s.ok()) { + num_merges_++; + } } return s; @@ -395,8 +415,10 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, true /* exclusive */, true /* untracked */); if (s.ok()) { - GetBatchForWrite()->Delete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; @@ -408,8 +430,10 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family, true /* exclusive */, true /* untracked */); if (s.ok()) { - GetBatchForWrite()->Delete(column_family, key); - num_deletes_++; + s = GetBatchForWrite()->Delete(column_family, key); + if (s.ok()) { + num_deletes_++; + } } return s; diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 29fd26af8..dd045218e 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -284,10 +284,10 @@ class TransactionBaseImpl : public Transaction { num_merges_(num_merges) {} }; - private: // Records writes pending in this transaction WriteBatchWithIndex write_batch_; + private: // batch to be written at commit time WriteBatch commit_time_batch_; diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 2ee775306..8a57c9684 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -60,6 +60,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) { deadlock_detect_ = txn_options.deadlock_detect; deadlock_detect_depth_ = txn_options.deadlock_detect_depth; + write_batch_.SetMaxBytes(txn_options.max_write_batch_size); lock_timeout_ = txn_options.lock_timeout * 1000; if (lock_timeout_ < 0) { diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 2138d48a4..626b8acb4 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -4471,6 +4471,35 @@ TEST_P(TransactionTest, TransactionStressTest) { ASSERT_OK(s); } +TEST_P(TransactionTest, MemoryLimitTest) { + TransactionOptions txn_options; + // Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data. + txn_options.max_write_batch_size = 29; + string value; + Status s; + + Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options); + ASSERT_TRUE(txn); + + ASSERT_EQ(0, txn->GetNumPuts()); + ASSERT_LE(0, txn->GetID()); + + s = txn->Put(Slice("a"), Slice("....")); + ASSERT_OK(s); + ASSERT_EQ(1, txn->GetNumPuts()); + + s = txn->Put(Slice("b"), Slice("....")); + ASSERT_OK(s); + ASSERT_EQ(2, txn->GetNumPuts()); + + s = txn->Put(Slice("b"), Slice("....")); + ASSERT_TRUE(s.IsMemoryLimit()); + ASSERT_EQ(2, txn->GetNumPuts()); + + txn->Rollback(); + delete txn; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 43aee5bb9..0926c8468 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -386,8 +386,8 @@ class WBWIIteratorImpl : public WBWIIterator { struct WriteBatchWithIndex::Rep { Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, - bool _overwrite_key = false) - : write_batch(reserved_bytes), + size_t max_bytes = 0, bool _overwrite_key = false) + : write_batch(reserved_bytes, max_bytes), comparator(index_comparator, &write_batch), skip_list(comparator, &arena), overwrite_key(_overwrite_key), @@ -565,17 +565,18 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { return s; } -WriteBatchWithIndex::WriteBatchWithIndex( - const Comparator* default_index_comparator, size_t reserved_bytes, - bool overwrite_key) - : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {} + WriteBatchWithIndex::WriteBatchWithIndex( + const Comparator* default_index_comparator, size_t reserved_bytes, + bool overwrite_key, size_t max_bytes) + : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes, + overwrite_key)) {} -WriteBatchWithIndex::~WriteBatchWithIndex() {} + WriteBatchWithIndex::~WriteBatchWithIndex() {} -WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } + WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } -WBWIIterator* WriteBatchWithIndex::NewIterator() { - return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); + WBWIIterator* WriteBatchWithIndex::NewIterator() { + return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); } WBWIIterator* WriteBatchWithIndex::NewIterator( @@ -604,75 +605,105 @@ Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { rep->comparator.default_comparator()); } -void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { +Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); - rep->write_batch.Put(column_family, key, value); - rep->AddOrUpdateIndex(column_family, key); + auto s = rep->write_batch.Put(column_family, key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; } -void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { +Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); - rep->write_batch.Put(key, value); - rep->AddOrUpdateIndex(key); + auto s = rep->write_batch.Put(key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; } -void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, - const Slice& key) { +Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, + const Slice& key) { rep->SetLastEntryOffset(); - rep->write_batch.Delete(column_family, key); - rep->AddOrUpdateIndex(column_family, key); + auto s = rep->write_batch.Delete(column_family, key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; } -void WriteBatchWithIndex::Delete(const Slice& key) { +Status WriteBatchWithIndex::Delete(const Slice& key) { rep->SetLastEntryOffset(); - rep->write_batch.Delete(key); - rep->AddOrUpdateIndex(key); + auto s = rep->write_batch.Delete(key); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; } -void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, - const Slice& key) { +Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, + const Slice& key) { rep->SetLastEntryOffset(); - rep->write_batch.SingleDelete(column_family, key); - rep->AddOrUpdateIndex(column_family, key); + auto s = rep->write_batch.SingleDelete(column_family, key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; } -void WriteBatchWithIndex::SingleDelete(const Slice& key) { +Status WriteBatchWithIndex::SingleDelete(const Slice& key) { rep->SetLastEntryOffset(); - rep->write_batch.SingleDelete(key); - rep->AddOrUpdateIndex(key); + auto s = rep->write_batch.SingleDelete(key); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; } -void WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, - const Slice& begin_key, - const Slice& end_key) { +Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, + const Slice& begin_key, + const Slice& end_key) { rep->SetLastEntryOffset(); - rep->write_batch.DeleteRange(column_family, begin_key, end_key); - rep->AddOrUpdateIndex(column_family, begin_key); + auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, begin_key); + } + return s; } -void WriteBatchWithIndex::DeleteRange(const Slice& begin_key, - const Slice& end_key) { +Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key, + const Slice& end_key) { rep->SetLastEntryOffset(); - rep->write_batch.DeleteRange(begin_key, end_key); - rep->AddOrUpdateIndex(begin_key); + auto s = rep->write_batch.DeleteRange(begin_key, end_key); + if (s.ok()) { + rep->AddOrUpdateIndex(begin_key); + } + return s; } -void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) { +Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); - rep->write_batch.Merge(column_family, key, value); - rep->AddOrUpdateIndex(column_family, key); + auto s = rep->write_batch.Merge(column_family, key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(column_family, key); + } + return s; } -void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { +Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); - rep->write_batch.Merge(key, value); - rep->AddOrUpdateIndex(key); + auto s = rep->write_batch.Merge(key, value); + if (s.ok()) { + rep->AddOrUpdateIndex(key); + } + return s; } -void WriteBatchWithIndex::PutLogData(const Slice& blob) { - rep->write_batch.PutLogData(blob); +Status WriteBatchWithIndex::PutLogData(const Slice& blob) { + return rep->write_batch.PutLogData(blob); } void WriteBatchWithIndex::Clear() { rep->Clear(); } @@ -799,5 +830,9 @@ Status WriteBatchWithIndex::RollbackToSavePoint() { return s; } +void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) { + rep->write_batch.SetMaxBytes(max_bytes); +} + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 069d3655c..f8bc5869a 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -54,8 +54,8 @@ struct WriteBatchIndexEntry { class ReadableWriteBatch : public WriteBatch { public: - explicit ReadableWriteBatch(size_t reserved_bytes = 0) - : WriteBatch(reserved_bytes) {} + explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0) + : WriteBatch(reserved_bytes, max_bytes) {} // Retrieve some information from a write entry in the write batch, given // the start offset of the write entry. Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,