Limit maximum memory used in the WriteBatch representation

Summary:
Extend TransactionOptions to include max_write_batch_size, which determines the maximum size of the WriteBatch representation. If the memory limit is exceeded, the operation aborts with subcode kMemoryLimit.
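
A minimal sketch of how a caller might exercise the new option, mirroring the transaction test added below (the DB path and the exact byte budget are illustrative, not part of this change):

```cpp
#include <cassert>

#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::TransactionDBOptions txn_db_options;

  rocksdb::TransactionDB* db = nullptr;
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/txn_memlimit_example", &db);
  assert(s.ok());

  // Cap this transaction's write batch at 29 bytes:
  // 12-byte header + 1-byte noop marker + two 8-byte Puts.
  rocksdb::TransactionOptions txn_options;
  txn_options.max_write_batch_size = 29;

  rocksdb::Transaction* txn =
      db->BeginTransaction(rocksdb::WriteOptions(), txn_options);
  assert(txn->Put("a", "....").ok());
  assert(txn->Put("b", "....").ok());

  s = txn->Put("c", "....");   // a third Put would exceed the limit
  assert(s.IsMemoryLimit());   // Aborted with subcode kMemoryLimit
  txn->Rollback();

  delete txn;
  delete db;
  return 0;
}
```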
Closes https://github.com/facebook/rocksdb/pull/2124

Differential Revision: D4861842

Pulled By: lth

fbshipit-source-id: 46fd172ea67cc90bbba829bf0d70cfab2261c161
Manuel Ung, committed by Facebook Github Bot
parent 97ec8a1349
commit 1f8b119ed6
Files changed (lines modified):
  1. db/write_batch.cc (173)
  2. db/write_batch_base.cc (55)
  3. db/write_batch_internal.h (95)
  4. db/write_batch_test.cc (12)
  5. include/rocksdb/status.h (13)
  6. include/rocksdb/utilities/transaction_db.h (3)
  7. include/rocksdb/utilities/write_batch_with_index.h (37)
  8. include/rocksdb/write_batch.h (88)
  9. include/rocksdb/write_batch_base.h (69)
  10. util/status_message.cc (3)
  11. utilities/transactions/transaction_base.cc (74)
  12. utilities/transactions/transaction_base.h (2)
  13. utilities/transactions/transaction_impl.cc (1)
  14. utilities/transactions/transaction_test.cc (29)
  15. utilities/write_batch_with_index/write_batch_with_index.cc (133)
  16. utilities/write_batch_with_index/write_batch_with_index_internal.h (4)

@ -124,8 +124,8 @@ struct SavePoints {
std::stack<SavePoint> stack; std::stack<SavePoint> stack;
}; };
WriteBatch::WriteBatch(size_t reserved_bytes) WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: save_points_(nullptr), content_flags_(0), rep_() { : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ? rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
reserved_bytes : WriteBatchInternal::kHeader); reserved_bytes : WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader); rep_.resize(WriteBatchInternal::kHeader);
@ -134,18 +134,21 @@ WriteBatch::WriteBatch(size_t reserved_bytes)
WriteBatch::WriteBatch(const std::string& rep) WriteBatch::WriteBatch(const std::string& rep)
: save_points_(nullptr), : save_points_(nullptr),
content_flags_(ContentFlags::DEFERRED), content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
rep_(rep) {} rep_(rep) {}
WriteBatch::WriteBatch(const WriteBatch& src) WriteBatch::WriteBatch(const WriteBatch& src)
: save_points_(src.save_points_), : save_points_(src.save_points_),
wal_term_point_(src.wal_term_point_), wal_term_point_(src.wal_term_point_),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(src.rep_) {} rep_(src.rep_) {}
WriteBatch::WriteBatch(WriteBatch&& src) WriteBatch::WriteBatch(WriteBatch&& src)
: save_points_(std::move(src.save_points_)), : save_points_(std::move(src.save_points_)),
wal_term_point_(std::move(src.wal_term_point_)), wal_term_point_(std::move(src.wal_term_point_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)), content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(std::move(src.rep_)) {} rep_(std::move(src.rep_)) {}
WriteBatch& WriteBatch::operator=(const WriteBatch& src) { WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
@ -470,8 +473,9 @@ size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
return WriteBatchInternal::kHeader; return WriteBatchInternal::kHeader;
} }
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue)); b->rep_.push_back(static_cast<char>(kTypeValue));
@ -484,15 +488,18 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store( b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) { const Slice& value) {
WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
value);
} }
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value) { const SliceParts& key, const SliceParts& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue)); b->rep_.push_back(static_cast<char>(kTypeValue));
@ -505,18 +512,21 @@ void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store( b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT, b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) { const SliceParts& value) {
WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
value);
} }
void WriteBatchInternal::InsertNoop(WriteBatch* b) { Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
b->rep_.push_back(static_cast<char>(kTypeNoop)); b->rep_.push_back(static_cast<char>(kTypeNoop));
return Status::OK();
} }
void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) { Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
// a manually constructed batch can only contain one prepare section // a manually constructed batch can only contain one prepare section
assert(b->rep_[12] == static_cast<char>(kTypeNoop)); assert(b->rep_[12] == static_cast<char>(kTypeNoop));
@ -535,26 +545,30 @@ void WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
ContentFlags::HAS_END_PREPARE | ContentFlags::HAS_END_PREPARE |
ContentFlags::HAS_BEGIN_PREPARE, ContentFlags::HAS_BEGIN_PREPARE,
std::memory_order_relaxed); std::memory_order_relaxed);
return Status::OK();
} }
void WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) { Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeCommitXID)); b->rep_.push_back(static_cast<char>(kTypeCommitXID));
PutLengthPrefixedSlice(&b->rep_, xid); PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_COMMIT, ContentFlags::HAS_COMMIT,
std::memory_order_relaxed); std::memory_order_relaxed);
return Status::OK();
} }
void WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) { Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeRollbackXID)); b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
PutLengthPrefixedSlice(&b->rep_, xid); PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_ROLLBACK, ContentFlags::HAS_ROLLBACK,
std::memory_order_relaxed); std::memory_order_relaxed);
return Status::OK();
} }
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const Slice& key) { const Slice& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeDeletion)); b->rep_.push_back(static_cast<char>(kTypeDeletion));
@ -566,14 +580,17 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE, ContentFlags::HAS_DELETE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
key);
} }
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key) { const SliceParts& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeDeletion)); b->rep_.push_back(static_cast<char>(kTypeDeletion));
@ -585,15 +602,19 @@ void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE, ContentFlags::HAS_DELETE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Delete(ColumnFamilyHandle* column_family, Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) { const SliceParts& key) {
WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
key);
} }
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::SingleDelete(WriteBatch* b,
const Slice& key) { uint32_t column_family_id,
const Slice& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeSingleDeletion)); b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
@ -605,15 +626,19 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE, ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); return WriteBatchInternal::SingleDelete(
this, GetColumnFamilyID(column_family), key);
} }
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::SingleDelete(WriteBatch* b,
const SliceParts& key) { uint32_t column_family_id,
const SliceParts& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeSingleDeletion)); b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
@ -625,16 +650,19 @@ void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE, ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) { const SliceParts& key) {
WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key); return WriteBatchInternal::SingleDelete(
this, GetColumnFamilyID(column_family), key);
} }
void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key, const Slice& begin_key,
const Slice& end_key) { const Slice& end_key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion)); b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
@ -647,17 +675,19 @@ void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE, ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) { const Slice& begin_key, const Slice& end_key) {
WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key); begin_key, end_key);
} }
void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key) { const SliceParts& end_key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion)); b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
@ -670,17 +700,19 @@ void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE, ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family, Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key) { const SliceParts& end_key) {
WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family), return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key); begin_key, end_key);
} }
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeMerge)); b->rep_.push_back(static_cast<char>(kTypeMerge));
@ -693,16 +725,19 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE, ContentFlags::HAS_MERGE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) { const Slice& value) {
WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value); return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
value);
} }
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key, const SliceParts& key,
const SliceParts& value) { const SliceParts& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) { if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeMerge)); b->rep_.push_back(static_cast<char>(kTypeMerge));
@ -715,18 +750,20 @@ void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE, ContentFlags::HAS_MERGE,
std::memory_order_relaxed); std::memory_order_relaxed);
return save.commit();
} }
void WriteBatch::Merge(ColumnFamilyHandle* column_family, Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& key, const SliceParts& value) {
const SliceParts& value) { return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), value);
key, value);
} }
void WriteBatch::PutLogData(const Slice& blob) { Status WriteBatch::PutLogData(const Slice& blob) {
LocalSavePoint save(this);
rep_.push_back(static_cast<char>(kTypeLogData)); rep_.push_back(static_cast<char>(kTypeLogData));
PutLengthPrefixedSlice(&rep_, blob); PutLengthPrefixedSlice(&rep_, blob);
return save.commit();
} }
void WriteBatch::SetSavePoint() { void WriteBatch::SetSavePoint() {
@ -1300,14 +1337,15 @@ Status WriteBatchInternal::InsertInto(
return s; return s;
} }
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader); assert(contents.size() >= WriteBatchInternal::kHeader);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
return Status::OK();
} }
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
const bool wal_only) { const bool wal_only) {
size_t src_len; size_t src_len;
int src_count; int src_count;
uint32_t src_flags; uint32_t src_flags;
@ -1330,6 +1368,7 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
dst->content_flags_.store( dst->content_flags_.store(
dst->content_flags_.load(std::memory_order_relaxed) | src_flags, dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
std::memory_order_relaxed); std::memory_order_relaxed);
return Status::OK();
} }
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,

@ -8,86 +8,87 @@
#include <string> #include <string>
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h"
namespace rocksdb { namespace rocksdb {
// Simple implementation of SlicePart variants of Put(). Child classes // Simple implementation of SlicePart variants of Put(). Child classes
// can override these method with more performant solutions if they choose. // can override these method with more performant solutions if they choose.
void WriteBatchBase::Put(ColumnFamilyHandle* column_family, Status WriteBatchBase::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) { const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf; std::string key_buf, value_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf); Slice value_slice(value, &value_buf);
Put(column_family, key_slice, value_slice); return Put(column_family, key_slice, value_slice);
} }
void WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) { Status WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf; std::string key_buf, value_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf); Slice value_slice(value, &value_buf);
Put(key_slice, value_slice); return Put(key_slice, value_slice);
} }
void WriteBatchBase::Delete(ColumnFamilyHandle* column_family, Status WriteBatchBase::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) { const SliceParts& key) {
std::string key_buf; std::string key_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Delete(column_family, key_slice); return Delete(column_family, key_slice);
} }
void WriteBatchBase::Delete(const SliceParts& key) { Status WriteBatchBase::Delete(const SliceParts& key) {
std::string key_buf; std::string key_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Delete(key_slice); return Delete(key_slice);
} }
void WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family, Status WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) { const SliceParts& key) {
std::string key_buf; std::string key_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
SingleDelete(column_family, key_slice); return SingleDelete(column_family, key_slice);
} }
void WriteBatchBase::SingleDelete(const SliceParts& key) { Status WriteBatchBase::SingleDelete(const SliceParts& key) {
std::string key_buf; std::string key_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
SingleDelete(key_slice); return SingleDelete(key_slice);
} }
void WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family, Status WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key) { const SliceParts& end_key) {
std::string begin_key_buf, end_key_buf; std::string begin_key_buf, end_key_buf;
Slice begin_key_slice(begin_key, &begin_key_buf); Slice begin_key_slice(begin_key, &begin_key_buf);
Slice end_key_slice(end_key, &end_key_buf); Slice end_key_slice(end_key, &end_key_buf);
DeleteRange(column_family, begin_key_slice, end_key_slice); return DeleteRange(column_family, begin_key_slice, end_key_slice);
} }
void WriteBatchBase::DeleteRange(const SliceParts& begin_key, Status WriteBatchBase::DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key) { const SliceParts& end_key) {
std::string begin_key_buf, end_key_buf; std::string begin_key_buf, end_key_buf;
Slice begin_key_slice(begin_key, &begin_key_buf); Slice begin_key_slice(begin_key, &begin_key_buf);
Slice end_key_slice(end_key, &end_key_buf); Slice end_key_slice(end_key, &end_key_buf);
DeleteRange(begin_key_slice, end_key_slice); return DeleteRange(begin_key_slice, end_key_slice);
} }
void WriteBatchBase::Merge(ColumnFamilyHandle* column_family, Status WriteBatchBase::Merge(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) { const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf; std::string key_buf, value_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf); Slice value_slice(value, &value_buf);
Merge(column_family, key_slice, value_slice); return Merge(column_family, key_slice, value_slice);
} }
void WriteBatchBase::Merge(const SliceParts& key, const SliceParts& value) { Status WriteBatchBase::Merge(const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf; std::string key_buf, value_buf;
Slice key_slice(key, &key_buf); Slice key_slice(key, &key_buf);
Slice value_slice(value, &value_buf); Slice value_slice(value, &value_buf);
Merge(key_slice, value_slice); return Merge(key_slice, value_slice);
} }
} // namespace rocksdb } // namespace rocksdb

@ -68,44 +68,44 @@ class WriteBatchInternal {
static const size_t kHeader = 12; static const size_t kHeader = 12;
// WriteBatch methods with column_family_id instead of ColumnFamilyHandle* // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
static void Put(WriteBatch* batch, uint32_t column_family_id, static Status Put(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value); const Slice& key, const Slice& value);
static void Put(WriteBatch* batch, uint32_t column_family_id, static Status Put(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value); const SliceParts& key, const SliceParts& value);
static void Delete(WriteBatch* batch, uint32_t column_family_id, static Status Delete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key); const SliceParts& key);
static void Delete(WriteBatch* batch, uint32_t column_family_id, static Status Delete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key); const Slice& key);
static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key); const SliceParts& key);
static void SingleDelete(WriteBatch* batch, uint32_t column_family_id, static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key); const Slice& key);
static void DeleteRange(WriteBatch* b, uint32_t column_family_id, static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key); const Slice& begin_key, const Slice& end_key);
static void DeleteRange(WriteBatch* b, uint32_t column_family_id, static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key); const SliceParts& end_key);
static void Merge(WriteBatch* batch, uint32_t column_family_id, static Status Merge(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value); const Slice& key, const Slice& value);
static void Merge(WriteBatch* batch, uint32_t column_family_id, static Status Merge(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value); const SliceParts& key, const SliceParts& value);
static void MarkEndPrepare(WriteBatch* batch, const Slice& xid); static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid);
static void MarkRollback(WriteBatch* batch, const Slice& xid); static Status MarkRollback(WriteBatch* batch, const Slice& xid);
static void MarkCommit(WriteBatch* batch, const Slice& xid); static Status MarkCommit(WriteBatch* batch, const Slice& xid);
static void InsertNoop(WriteBatch* batch); static Status InsertNoop(WriteBatch* batch);
// Return the number of entries in the batch. // Return the number of entries in the batch.
static int Count(const WriteBatch* batch); static int Count(const WriteBatch* batch);
@ -132,7 +132,7 @@ class WriteBatchInternal {
return batch->rep_.size(); return batch->rep_.size();
} }
static void SetContents(WriteBatch* batch, const Slice& contents); static Status SetContents(WriteBatch* batch, const Slice& contents);
// Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
// //
@ -177,12 +177,51 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr, uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false); bool concurrent_memtable_writes = false);
static void Append(WriteBatch* dst, const WriteBatch* src, static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false); const bool WAL_only = false);
// Returns the byte size of appending a WriteBatch with ByteSize // Returns the byte size of appending a WriteBatch with ByteSize
// leftByteSize and a WriteBatch with ByteSize rightByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
}; };
// LocalSavePoint is similar to a scope guard
class LocalSavePoint {
public:
explicit LocalSavePoint(WriteBatch* batch)
: batch_(batch),
savepoint_(batch->GetDataSize(), batch->Count(),
batch->content_flags_.load(std::memory_order_relaxed))
#ifndef NDEBUG
,
committed_(false)
#endif
{
}
#ifndef NDEBUG
~LocalSavePoint() { assert(committed_); }
#endif
Status commit() {
#ifndef NDEBUG
committed_ = true;
#endif
if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
batch_->rep_.resize(savepoint_.size);
WriteBatchInternal::SetCount(batch_, savepoint_.count);
batch_->content_flags_.store(savepoint_.content_flags,
std::memory_order_relaxed);
return Status::MemoryLimit();
}
return Status::OK();
}
private:
WriteBatch* batch_;
SavePoint savepoint_;
#ifndef NDEBUG
bool committed_;
#endif
};
} // namespace rocksdb } // namespace rocksdb
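
The LocalSavePoint guard above is what enforces the limit: every write records the batch's size, count, and content flags on entry, and commit() rolls them back and returns Status::MemoryLimit() when rep_ has grown past max_bytes_. Below is a standalone sketch of the same scope-guard idea, deliberately simplified and using illustrative names rather than the RocksDB types:

```cpp
#include <cassert>
#include <string>

// Simplified analogue of LocalSavePoint: remember the buffer size on entry,
// and roll back in Commit() if a configured limit was exceeded.
struct SizeLimitedBuffer {
  std::string rep;
  size_t max_bytes = 0;  // 0 means no limit
};

class LocalGuard {
 public:
  explicit LocalGuard(SizeLimitedBuffer* buf)
      : buf_(buf), saved_size_(buf->rep.size()) {}

  // Returns false (and rolls back) when the write pushed rep past the limit.
  bool Commit() {
    if (buf_->max_bytes != 0 && buf_->rep.size() > buf_->max_bytes) {
      buf_->rep.resize(saved_size_);  // undo the partial write
      return false;
    }
    return true;
  }

 private:
  SizeLimitedBuffer* buf_;
  size_t saved_size_;
};

bool Append(SizeLimitedBuffer* buf, const std::string& record) {
  LocalGuard guard(buf);
  buf->rep += record;     // tentative write
  return guard.Commit();  // keep it only if the limit still holds
}

int main() {
  SizeLimitedBuffer buf;
  buf.max_bytes = 8;
  assert(Append(&buf, "12345"));    // 5 bytes, within limit
  assert(!Append(&buf, "678910"));  // would be 11 bytes, rolled back
  assert(buf.rep.size() == 5);      // buffer unchanged after the rejection
  return 0;
}
```

The real guard also restores the entry count and the content flags, which this toy version omits.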

@ -861,6 +861,18 @@ TEST_F(WriteBatchTest, SavePointTest) {
ASSERT_EQ("", PrintContents(&batch2)); ASSERT_EQ("", PrintContents(&batch2));
} }
TEST_F(WriteBatchTest, MemoryLimitTest) {
Status s;
// The header size is 12 bytes. Each of the two Puts takes 8 bytes, which
// gives a total of 12 + 8 * 2 = 28 bytes.
WriteBatch batch(0, 28);
ASSERT_OK(batch.Put("a", "...."));
ASSERT_OK(batch.Put("b", "...."));
s = batch.Put("c", "....");
ASSERT_TRUE(s.IsMemoryLimit());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -71,6 +71,7 @@ class Status {
kNoSpace = 4, kNoSpace = 4,
kDeadlock = 5, kDeadlock = 5,
kStaleFile = 6, kStaleFile = 6,
kMemoryLimit = 7,
kMaxSubCode kMaxSubCode
}; };
@ -166,6 +167,11 @@ class Status {
return Status(kIOError, kNoSpace, msg, msg2); return Status(kIOError, kNoSpace, msg, msg2);
} }
static Status MemoryLimit() { return Status(kAborted, kMemoryLimit); }
static Status MemoryLimit(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kAborted, kMemoryLimit, msg, msg2);
}
// Returns true iff the status indicates success. // Returns true iff the status indicates success.
bool ok() const { return code() == kOk; } bool ok() const { return code() == kOk; }
@ -224,6 +230,13 @@ class Status {
return (code() == kIOError) && (subcode() == kNoSpace); return (code() == kIOError) && (subcode() == kNoSpace);
} }
// Returns true iff the status indicates a memory limit error. There may be
// cases where we limit the memory used in certain operations (e.g. the size
// of a write batch) in order to avoid out-of-memory exceptions.
bool IsMemoryLimit() const {
return (code() == kAborted) && (subcode() == kMemoryLimit);
}
// Return a string representation of this status suitable for printing. // Return a string representation of this status suitable for printing.
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;
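
Since the new subcode rides on the existing kAborted code, callers can keep using coarse-grained checks and only narrow down when they care about the cause. A small sketch, assuming the pre-existing IsAborted() accessor on Status:

```cpp
#include <cassert>

#include "rocksdb/status.h"

int main() {
  rocksdb::Status s = rocksdb::Status::MemoryLimit("write batch too large");
  assert(!s.ok());
  assert(s.IsAborted());      // generic handling still sees an aborted op
  assert(s.IsMemoryLimit());  // new, more specific check
  return 0;
}
```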

@ -100,6 +100,9 @@ struct TransactionOptions {
// The number of traversals to make during deadlock detection. // The number of traversals to make during deadlock detection.
int64_t deadlock_detect_depth = 50; int64_t deadlock_detect_depth = 50;
// The maximum number of bytes used for the write batch. 0 means no limit.
size_t max_write_batch_size = 0;
}; };
struct KeyLockInfo { struct KeyLockInfo {

@ -88,42 +88,45 @@ class WriteBatchWithIndex : public WriteBatchBase {
// interface, or we can't find a column family from the column family handle // interface, or we can't find a column family from the column family handle
// passed in, backup_index_comparator will be used for the column family. // passed in, backup_index_comparator will be used for the column family.
// reserved_bytes: reserved bytes in underlying WriteBatch // reserved_bytes: reserved bytes in underlying WriteBatch
// max_bytes: maximum size of underlying WriteBatch in bytes
// overwrite_key: if true, overwrite the key in the index when inserting // overwrite_key: if true, overwrite the key in the index when inserting
// the same key as previously, so iterator will never // the same key as previously, so iterator will never
// show two entries with the same key. // show two entries with the same key.
explicit WriteBatchWithIndex( explicit WriteBatchWithIndex(
const Comparator* backup_index_comparator = BytewiseComparator(), const Comparator* backup_index_comparator = BytewiseComparator(),
size_t reserved_bytes = 0, bool overwrite_key = false); size_t reserved_bytes = 0, bool overwrite_key = false,
size_t max_bytes = 0);
virtual ~WriteBatchWithIndex(); virtual ~WriteBatchWithIndex();
using WriteBatchBase::Put; using WriteBatchBase::Put;
void Put(ColumnFamilyHandle* column_family, const Slice& key, Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;
void Put(const Slice& key, const Slice& value) override; Status Put(const Slice& key, const Slice& value) override;
using WriteBatchBase::Merge; using WriteBatchBase::Merge;
void Merge(ColumnFamilyHandle* column_family, const Slice& key, Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;
void Merge(const Slice& key, const Slice& value) override; Status Merge(const Slice& key, const Slice& value) override;
using WriteBatchBase::Delete; using WriteBatchBase::Delete;
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override; Status Delete(const Slice& key) override;
using WriteBatchBase::SingleDelete; using WriteBatchBase::SingleDelete;
void SingleDelete(ColumnFamilyHandle* column_family, Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override; const Slice& key) override;
void SingleDelete(const Slice& key) override; Status SingleDelete(const Slice& key) override;
using WriteBatchBase::DeleteRange; using WriteBatchBase::DeleteRange;
void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override; const Slice& end_key) override;
void DeleteRange(const Slice& begin_key, const Slice& end_key) override; Status DeleteRange(const Slice& begin_key, const Slice& end_key) override;
using WriteBatchBase::PutLogData; using WriteBatchBase::PutLogData;
void PutLogData(const Slice& blob) override; Status PutLogData(const Slice& blob) override;
using WriteBatchBase::Clear; using WriteBatchBase::Clear;
void Clear() override; void Clear() override;
@ -204,6 +207,8 @@ class WriteBatchWithIndex : public WriteBatchBase {
// or other Status on corruption. // or other Status on corruption.
Status RollbackToSavePoint() override; Status RollbackToSavePoint() override;
void SetMaxBytes(size_t max_bytes) override;
private: private:
struct Rep; struct Rep;
std::unique_ptr<Rep> rep; std::unique_ptr<Rep> rep;

@ -60,80 +60,82 @@ struct SavePoint {
class WriteBatch : public WriteBatchBase { class WriteBatch : public WriteBatchBase {
public: public:
explicit WriteBatch(size_t reserved_bytes = 0); explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
~WriteBatch(); ~WriteBatch();
using WriteBatchBase::Put; using WriteBatchBase::Put;
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
void Put(ColumnFamilyHandle* column_family, const Slice& key, Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;
void Put(const Slice& key, const Slice& value) override { Status Put(const Slice& key, const Slice& value) override {
Put(nullptr, key, value); return Put(nullptr, key, value);
} }
// Variant of Put() that gathers output like writev(2). The key and value // Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatentations of arrays of // that will be written to the database are concatentations of arrays of
// slices. // slices.
void Put(ColumnFamilyHandle* column_family, const SliceParts& key, Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override; const SliceParts& value) override;
void Put(const SliceParts& key, const SliceParts& value) override { Status Put(const SliceParts& key, const SliceParts& value) override {
Put(nullptr, key, value); return Put(nullptr, key, value);
} }
using WriteBatchBase::Delete; using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override { Delete(nullptr, key); } Status Delete(const Slice& key) override { return Delete(nullptr, key); }
// variant that takes SliceParts // variant that takes SliceParts
void Delete(ColumnFamilyHandle* column_family, Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override; const SliceParts& key) override;
void Delete(const SliceParts& key) override { Delete(nullptr, key); } Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
using WriteBatchBase::SingleDelete; using WriteBatchBase::SingleDelete;
// WriteBatch implementation of DB::SingleDelete(). See db.h. // WriteBatch implementation of DB::SingleDelete(). See db.h.
void SingleDelete(ColumnFamilyHandle* column_family, Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override; const Slice& key) override;
void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); } Status SingleDelete(const Slice& key) override {
return SingleDelete(nullptr, key);
}
// variant that takes SliceParts // variant that takes SliceParts
void SingleDelete(ColumnFamilyHandle* column_family, Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) override; const SliceParts& key) override;
void SingleDelete(const SliceParts& key) override { Status SingleDelete(const SliceParts& key) override {
SingleDelete(nullptr, key); return SingleDelete(nullptr, key);
} }
using WriteBatchBase::DeleteRange; using WriteBatchBase::DeleteRange;
// WriteBatch implementation of DB::DeleteRange(). See db.h. // WriteBatch implementation of DB::DeleteRange(). See db.h.
void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key, Status DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override; const Slice& end_key) override;
void DeleteRange(const Slice& begin_key, const Slice& end_key) override { Status DeleteRange(const Slice& begin_key, const Slice& end_key) override {
DeleteRange(nullptr, begin_key, end_key); return DeleteRange(nullptr, begin_key, end_key);
} }
// variant that takes SliceParts // variant that takes SliceParts
void DeleteRange(ColumnFamilyHandle* column_family, Status DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key) override; const SliceParts& end_key) override;
void DeleteRange(const SliceParts& begin_key, Status DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key) override { const SliceParts& end_key) override {
DeleteRange(nullptr, begin_key, end_key); return DeleteRange(nullptr, begin_key, end_key);
} }
using WriteBatchBase::Merge; using WriteBatchBase::Merge;
// Merge "value" with the existing value of "key" in the database. // Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)" // "key->merge(existing, value)"
void Merge(ColumnFamilyHandle* column_family, const Slice& key, Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;
void Merge(const Slice& key, const Slice& value) override { Status Merge(const Slice& key, const Slice& value) override {
Merge(nullptr, key, value); return Merge(nullptr, key, value);
} }
// variant that takes SliceParts // variant that takes SliceParts
void Merge(ColumnFamilyHandle* column_family, const SliceParts& key, Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override; const SliceParts& value) override;
void Merge(const SliceParts& key, const SliceParts& value) override { Status Merge(const SliceParts& key, const SliceParts& value) override {
Merge(nullptr, key, value); return Merge(nullptr, key, value);
} }
using WriteBatchBase::PutLogData; using WriteBatchBase::PutLogData;
@ -147,7 +149,7 @@ class WriteBatch : public WriteBatchBase {
// //
// Example application: add timestamps to the transaction log for use in // Example application: add timestamps to the transaction log for use in
// replication. // replication.
void PutLogData(const Slice& blob) override; Status PutLogData(const Slice& blob) override;
using WriteBatchBase::Clear; using WriteBatchBase::Clear;
// Clear all updates buffered in this batch. // Clear all updates buffered in this batch.
@ -304,8 +306,11 @@ class WriteBatch : public WriteBatchBase {
void MarkWalTerminationPoint(); void MarkWalTerminationPoint();
const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; } const SavePoint& GetWalTerminationPoint() const { return wal_term_point_; }
void SetMaxBytes(size_t max_bytes) override { max_bytes_ = max_bytes; }
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;
friend class LocalSavePoint;
SavePoints* save_points_; SavePoints* save_points_;
// When sending a WriteBatch through WriteImpl we might want to // When sending a WriteBatch through WriteImpl we might want to
@ -319,6 +324,9 @@ class WriteBatch : public WriteBatchBase {
// Performs deferred computation of content_flags if necessary // Performs deferred computation of content_flags if necessary
uint32_t ComputeContentFlags() const; uint32_t ComputeContentFlags() const;
// Maximum size of rep_.
size_t max_bytes_;
protected: protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_ std::string rep_; // See comment in write_batch.cc for the format of rep_

@ -8,6 +8,8 @@
#pragma once #pragma once
#include <cstddef>
namespace rocksdb { namespace rocksdb {
class Slice; class Slice;
@ -24,59 +26,61 @@ class WriteBatchBase {
virtual ~WriteBatchBase() {} virtual ~WriteBatchBase() {}
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
virtual void Put(ColumnFamilyHandle* column_family, const Slice& key, virtual Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0; const Slice& value) = 0;
virtual void Put(const Slice& key, const Slice& value) = 0; virtual Status Put(const Slice& key, const Slice& value) = 0;
// Variant of Put() that gathers output like writev(2). The key and value // Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatentations of arrays of // that will be written to the database are concatentations of arrays of
// slices. // slices.
virtual void Put(ColumnFamilyHandle* column_family, const SliceParts& key, virtual Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value); const SliceParts& value);
virtual void Put(const SliceParts& key, const SliceParts& value); virtual Status Put(const SliceParts& key, const SliceParts& value);
// Merge "value" with the existing value of "key" in the database. // Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)" // "key->merge(existing, value)"
virtual void Merge(ColumnFamilyHandle* column_family, const Slice& key, virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) = 0; const Slice& value) = 0;
virtual void Merge(const Slice& key, const Slice& value) = 0; virtual Status Merge(const Slice& key, const Slice& value) = 0;
// variant that takes SliceParts // variant that takes SliceParts
virtual void Merge(ColumnFamilyHandle* column_family, const SliceParts& key, virtual Status Merge(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value); const SliceParts& value);
virtual void Merge(const SliceParts& key, const SliceParts& value); virtual Status Merge(const SliceParts& key, const SliceParts& value);
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key) = 0; virtual Status Delete(ColumnFamilyHandle* column_family,
virtual void Delete(const Slice& key) = 0; const Slice& key) = 0;
virtual Status Delete(const Slice& key) = 0;
// variant that takes SliceParts // variant that takes SliceParts
virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); virtual Status Delete(ColumnFamilyHandle* column_family,
virtual void Delete(const SliceParts& key); const SliceParts& key);
virtual Status Delete(const SliceParts& key);
// If the database contains a mapping for "key", erase it. Expects that the // If the database contains a mapping for "key", erase it. Expects that the
// key was not overwritten. Else do nothing. // key was not overwritten. Else do nothing.
virtual void SingleDelete(ColumnFamilyHandle* column_family, virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) = 0; const Slice& key) = 0;
virtual void SingleDelete(const Slice& key) = 0; virtual Status SingleDelete(const Slice& key) = 0;
// variant that takes SliceParts // variant that takes SliceParts
virtual void SingleDelete(ColumnFamilyHandle* column_family, virtual Status SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key); const SliceParts& key);
virtual void SingleDelete(const SliceParts& key); virtual Status SingleDelete(const SliceParts& key);
// If the database contains mappings in the range ["begin_key", "end_key"], // If the database contains mappings in the range ["begin_key", "end_key"],
// erase them. Else do nothing. // erase them. Else do nothing.
virtual void DeleteRange(ColumnFamilyHandle* column_family, virtual Status DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) = 0; const Slice& begin_key, const Slice& end_key) = 0;
virtual void DeleteRange(const Slice& begin_key, const Slice& end_key) = 0; virtual Status DeleteRange(const Slice& begin_key, const Slice& end_key) = 0;
// variant that takes SliceParts // variant that takes SliceParts
virtual void DeleteRange(ColumnFamilyHandle* column_family, virtual Status DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key, const SliceParts& begin_key,
const SliceParts& end_key); const SliceParts& end_key);
virtual void DeleteRange(const SliceParts& begin_key, virtual Status DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key); const SliceParts& end_key);
// Append a blob of arbitrary size to the records in this batch. The blob will // Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular, // be stored in the transaction log but not in any other file. In particular,
@ -88,7 +92,7 @@ class WriteBatchBase {
// //
// Example application: add timestamps to the transaction log for use in // Example application: add timestamps to the transaction log for use in
// replication. // replication.
virtual void PutLogData(const Slice& blob) = 0; virtual Status PutLogData(const Slice& blob) = 0;
// Clear all updates buffered in this batch. // Clear all updates buffered in this batch.
virtual void Clear() = 0; virtual void Clear() = 0;
@ -107,6 +111,9 @@ class WriteBatchBase {
// If there is no previous call to SetSavePoint(), behaves the same as // If there is no previous call to SetSavePoint(), behaves the same as
// Clear(). // Clear().
virtual Status RollbackToSavePoint() = 0; virtual Status RollbackToSavePoint() = 0;
// Sets the maximum size of the write batch in bytes. 0 means no limit.
virtual void SetMaxBytes(size_t max_bytes) = 0;
}; };
} // namespace rocksdb } // namespace rocksdb
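
The same limit is available on a plain WriteBatch through the new constructor argument and the SetMaxBytes() override declared above. A short standalone sketch, reusing the 12-byte header plus 8-bytes-per-Put arithmetic from the new write_batch_test case:

```cpp
#include <cassert>

#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"

int main() {
  // 12-byte header + two 8-byte Puts = 28 bytes; a third Put must fail.
  rocksdb::WriteBatch batch(0 /* reserved_bytes */, 28 /* max_bytes */);
  assert(batch.Put("a", "....").ok());
  assert(batch.Put("b", "....").ok());

  rocksdb::Status s = batch.Put("c", "....");
  assert(s.IsMemoryLimit());  // the failed Put was rolled back

  batch.SetMaxBytes(0);       // 0 removes the limit again
  assert(batch.Put("c", "....").ok());
  return 0;
}
```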

@ -14,7 +14,8 @@ const char* Status::msgs[] = {
"Failed to acquire lock due to max_num_locks limit", // kLockLimit "Failed to acquire lock due to max_num_locks limit", // kLockLimit
"No space left on device", // kNoSpace "No space left on device", // kNoSpace
"Deadlock", // kDeadlock "Deadlock", // kDeadlock
"Stale file handle" // kStaleFile "Stale file handle", // kStaleFile
"Memory limit reached" // kMemoryLimit
}; };
} // namespace rocksdb } // namespace rocksdb

@ -23,7 +23,7 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
write_options_(write_options), write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
start_time_(db_->GetEnv()->NowMicros()), start_time_(db_->GetEnv()->NowMicros()),
write_batch_(cmp_, 0, true), write_batch_(cmp_, 0, true, 0),
indexing_enabled_(true) { indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr); assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0; log_number_ = 0;
@ -262,8 +262,10 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value); s = GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; if (s.ok()) {
num_puts_++;
}
} }
return s; return s;
@ -276,8 +278,10 @@ Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value); s = GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; if (s.ok()) {
num_puts_++;
}
} }
return s; return s;
@ -289,8 +293,10 @@ Status TransactionBaseImpl::Merge(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value); s = GetBatchForWrite()->Merge(column_family, key, value);
num_merges_++; if (s.ok()) {
num_merges_++;
}
} }
return s; return s;
@ -302,8 +308,10 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key); s = GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;
@ -315,8 +323,10 @@ Status TransactionBaseImpl::Delete(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key); s = GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;
@ -328,8 +338,10 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key); s = GetBatchForWrite()->SingleDelete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;
@ -341,8 +353,10 @@ Status TransactionBaseImpl::SingleDelete(ColumnFamilyHandle* column_family,
TryLock(column_family, key, false /* read_only */, true /* exclusive */); TryLock(column_family, key, false /* read_only */, true /* exclusive */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->SingleDelete(column_family, key); s = GetBatchForWrite()->SingleDelete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;
@ -354,8 +368,10 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
true /* exclusive */, true /* untracked */); true /* exclusive */, true /* untracked */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value); s = GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; if (s.ok()) {
num_puts_++;
}
} }
return s; return s;
@ -368,8 +384,10 @@ Status TransactionBaseImpl::PutUntracked(ColumnFamilyHandle* column_family,
true /* exclusive */, true /* untracked */); true /* exclusive */, true /* untracked */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Put(column_family, key, value); s = GetBatchForWrite()->Put(column_family, key, value);
num_puts_++; if (s.ok()) {
num_puts_++;
}
} }
return s; return s;
@ -382,8 +400,10 @@ Status TransactionBaseImpl::MergeUntracked(ColumnFamilyHandle* column_family,
true /* exclusive */, true /* untracked */); true /* exclusive */, true /* untracked */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Merge(column_family, key, value); s = GetBatchForWrite()->Merge(column_family, key, value);
num_merges_++; if (s.ok()) {
num_merges_++;
}
} }
return s; return s;
@ -395,8 +415,10 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
true /* exclusive */, true /* untracked */); true /* exclusive */, true /* untracked */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key); s = GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;
@ -408,8 +430,10 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
true /* exclusive */, true /* untracked */); true /* exclusive */, true /* untracked */);
if (s.ok()) { if (s.ok()) {
GetBatchForWrite()->Delete(column_family, key); s = GetBatchForWrite()->Delete(column_family, key);
num_deletes_++; if (s.ok()) {
num_deletes_++;
}
} }
return s; return s;

@ -284,10 +284,10 @@ class TransactionBaseImpl : public Transaction {
num_merges_(num_merges) {} num_merges_(num_merges) {}
}; };
private:
// Records writes pending in this transaction // Records writes pending in this transaction
WriteBatchWithIndex write_batch_; WriteBatchWithIndex write_batch_;
private:
// batch to be written at commit time // batch to be written at commit time
WriteBatch commit_time_batch_; WriteBatch commit_time_batch_;

@ -60,6 +60,7 @@ void TransactionImpl::Initialize(const TransactionOptions& txn_options) {
deadlock_detect_ = txn_options.deadlock_detect; deadlock_detect_ = txn_options.deadlock_detect;
deadlock_detect_depth_ = txn_options.deadlock_detect_depth; deadlock_detect_depth_ = txn_options.deadlock_detect_depth;
write_batch_.SetMaxBytes(txn_options.max_write_batch_size);
lock_timeout_ = txn_options.lock_timeout * 1000; lock_timeout_ = txn_options.lock_timeout * 1000;
if (lock_timeout_ < 0) { if (lock_timeout_ < 0) {
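
This is the point where the new option reaches the transaction's write batch: TransactionImpl::Initialize() forwards TransactionOptions::max_write_batch_size to the underlying WriteBatchWithIndex. A minimal caller-side sketch of wiring the option up, assuming a hypothetical database path and an illustrative 1 MB cap (neither is taken from this change):

#include <cassert>
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::TransactionDBOptions txn_db_options;
  rocksdb::TransactionDB* txn_db = nullptr;
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/txn_memlimit_example", &txn_db);
  assert(s.ok());

  rocksdb::TransactionOptions txn_options;
  // Cap the in-memory WriteBatch representation of this transaction; a write
  // that would push the batch past the cap fails with the kMemoryLimit subcode.
  txn_options.max_write_batch_size = 1 << 20;  // 1 MB, illustrative

  rocksdb::Transaction* txn =
      txn_db->BeginTransaction(rocksdb::WriteOptions(), txn_options);
  assert(txn != nullptr);
  // ... buffer writes and Commit()/Rollback() as usual ...
  delete txn;
  delete txn_db;
  return 0;
}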

@ -4471,6 +4471,35 @@ TEST_P(TransactionTest, TransactionStressTest) {
ASSERT_OK(s); ASSERT_OK(s);
} }
TEST_P(TransactionTest, MemoryLimitTest) {
TransactionOptions txn_options;
// Header (12 bytes) + NOOP (1 byte) + 2 * 8 bytes for data.
txn_options.max_write_batch_size = 29;
string value;
Status s;
Transaction* txn = db->BeginTransaction(WriteOptions(), txn_options);
ASSERT_TRUE(txn);
ASSERT_EQ(0, txn->GetNumPuts());
ASSERT_LE(0, txn->GetID());
s = txn->Put(Slice("a"), Slice("...."));
ASSERT_OK(s);
ASSERT_EQ(1, txn->GetNumPuts());
s = txn->Put(Slice("b"), Slice("...."));
ASSERT_OK(s);
ASSERT_EQ(2, txn->GetNumPuts());
s = txn->Put(Slice("b"), Slice("...."));
ASSERT_TRUE(s.IsMemoryLimit());
ASSERT_EQ(2, txn->GetNumPuts());
txn->Rollback();
delete txn;
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
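
The test above also shows the recovery contract: the Put that trips the limit is rejected, GetNumPuts() is unchanged, and the transaction can still be rolled back. A hedged sketch of how an application might react, assuming txn was obtained from BeginTransaction with max_write_batch_size set as in the test (key and value names are illustrative):

rocksdb::Status s = txn->Put("some_key", "some_value");
if (s.IsMemoryLimit()) {
  // The WriteBatch representation would have grown past
  // TransactionOptions::max_write_batch_size, so this Put did not take
  // effect. Roll back, as the test does; committing the writes buffered
  // before the failed Put should presumably also be possible.
  txn->Rollback();
} else if (!s.ok()) {
  txn->Rollback();  // some other failure, e.g. a lock timeout
}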

@ -386,8 +386,8 @@ class WBWIIteratorImpl : public WBWIIterator {
struct WriteBatchWithIndex::Rep { struct WriteBatchWithIndex::Rep {
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
bool _overwrite_key = false) size_t max_bytes = 0, bool _overwrite_key = false)
: write_batch(reserved_bytes), : write_batch(reserved_bytes, max_bytes),
comparator(index_comparator, &write_batch), comparator(index_comparator, &write_batch),
skip_list(comparator, &arena), skip_list(comparator, &arena),
overwrite_key(_overwrite_key), overwrite_key(_overwrite_key),
@ -565,17 +565,18 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
return s; return s;
} }
WriteBatchWithIndex::WriteBatchWithIndex( WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes, const Comparator* default_index_comparator, size_t reserved_bytes,
bool overwrite_key) bool overwrite_key, size_t max_bytes)
: rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {} : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
overwrite_key)) {}
WriteBatchWithIndex::~WriteBatchWithIndex() {} WriteBatchWithIndex::~WriteBatchWithIndex() {}
WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
WBWIIterator* WriteBatchWithIndex::NewIterator() { WBWIIterator* WriteBatchWithIndex::NewIterator() {
return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
} }
WBWIIterator* WriteBatchWithIndex::NewIterator( WBWIIterator* WriteBatchWithIndex::NewIterator(
@ -604,75 +605,105 @@ Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) {
rep->comparator.default_comparator()); rep->comparator.default_comparator());
} }
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Put(column_family, key, value); auto s = rep->write_batch.Put(column_family, key, value);
rep->AddOrUpdateIndex(column_family, key); if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
}
return s;
} }
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Put(key, value); auto s = rep->write_batch.Put(key, value);
rep->AddOrUpdateIndex(key); if (s.ok()) {
rep->AddOrUpdateIndex(key);
}
return s;
} }
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Delete(column_family, key); auto s = rep->write_batch.Delete(column_family, key);
rep->AddOrUpdateIndex(column_family, key); if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
}
return s;
} }
void WriteBatchWithIndex::Delete(const Slice& key) { Status WriteBatchWithIndex::Delete(const Slice& key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Delete(key); auto s = rep->write_batch.Delete(key);
rep->AddOrUpdateIndex(key); if (s.ok()) {
rep->AddOrUpdateIndex(key);
}
return s;
} }
void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.SingleDelete(column_family, key); auto s = rep->write_batch.SingleDelete(column_family, key);
rep->AddOrUpdateIndex(column_family, key); if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
}
return s;
} }
void WriteBatchWithIndex::SingleDelete(const Slice& key) { Status WriteBatchWithIndex::SingleDelete(const Slice& key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.SingleDelete(key); auto s = rep->write_batch.SingleDelete(key);
rep->AddOrUpdateIndex(key); if (s.ok()) {
rep->AddOrUpdateIndex(key);
}
return s;
} }
void WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& begin_key,
const Slice& end_key) { const Slice& end_key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.DeleteRange(column_family, begin_key, end_key); auto s = rep->write_batch.DeleteRange(column_family, begin_key, end_key);
rep->AddOrUpdateIndex(column_family, begin_key); if (s.ok()) {
rep->AddOrUpdateIndex(column_family, begin_key);
}
return s;
} }
void WriteBatchWithIndex::DeleteRange(const Slice& begin_key, Status WriteBatchWithIndex::DeleteRange(const Slice& begin_key,
const Slice& end_key) { const Slice& end_key) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.DeleteRange(begin_key, end_key); auto s = rep->write_batch.DeleteRange(begin_key, end_key);
rep->AddOrUpdateIndex(begin_key); if (s.ok()) {
rep->AddOrUpdateIndex(begin_key);
}
return s;
} }
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Merge(column_family, key, value); auto s = rep->write_batch.Merge(column_family, key, value);
rep->AddOrUpdateIndex(column_family, key); if (s.ok()) {
rep->AddOrUpdateIndex(column_family, key);
}
return s;
} }
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset(); rep->SetLastEntryOffset();
rep->write_batch.Merge(key, value); auto s = rep->write_batch.Merge(key, value);
rep->AddOrUpdateIndex(key); if (s.ok()) {
rep->AddOrUpdateIndex(key);
}
return s;
} }
void WriteBatchWithIndex::PutLogData(const Slice& blob) { Status WriteBatchWithIndex::PutLogData(const Slice& blob) {
rep->write_batch.PutLogData(blob); return rep->write_batch.PutLogData(blob);
} }
void WriteBatchWithIndex::Clear() { rep->Clear(); } void WriteBatchWithIndex::Clear() { rep->Clear(); }
@ -799,5 +830,9 @@ Status WriteBatchWithIndex::RollbackToSavePoint() {
return s; return s;
} }
void WriteBatchWithIndex::SetMaxBytes(size_t max_bytes) {
rep->write_batch.SetMaxBytes(max_bytes);
}
} // namespace rocksdb } // namespace rocksdb
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
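
For code that uses WriteBatchWithIndex directly, the cap is exposed through the new max_bytes constructor argument and SetMaxBytes(), and every mutator now reports rejection through its Status return value. A small sketch; the 4 KB figure and the keys are illustrative, and reading 0 as "no limit" is inferred from the default arguments in this change:

#include "rocksdb/comparator.h"
#include "rocksdb/utilities/write_batch_with_index.h"

void CappedBatchSketch() {
  rocksdb::WriteBatchWithIndex batch(rocksdb::BytewiseComparator(),
                                     /*reserved_bytes=*/0,
                                     /*overwrite_key=*/true,
                                     /*max_bytes=*/4096);  // illustrative cap

  rocksdb::Status s = batch.Put("key", "value");
  if (s.IsMemoryLimit()) {
    // The underlying WriteBatch rejected the write, and the index update is
    // skipped as well: AddOrUpdateIndex() only runs when s.ok().
  }

  // The cap can also be adjusted later; 0 appears to mean "no limit",
  // matching the defaults above.
  batch.SetMaxBytes(0);
}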

@ -54,8 +54,8 @@ struct WriteBatchIndexEntry {
class ReadableWriteBatch : public WriteBatch { class ReadableWriteBatch : public WriteBatch {
public: public:
explicit ReadableWriteBatch(size_t reserved_bytes = 0) explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0)
: WriteBatch(reserved_bytes) {} : WriteBatch(reserved_bytes, max_bytes) {}
// Retrieve some information from a write entry in the write batch, given // Retrieve some information from a write entry in the write batch, given
// the start offset of the write entry. // the start offset of the write entry.
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key,
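
ReadableWriteBatch is the index-aware wrapper around the plain WriteBatch, so it simply forwards the new max_bytes argument. The same cap behaviour can be observed on a bare WriteBatch; a short sketch, assuming the public constructor now takes an optional max_bytes argument and using an illustrative 1 KB cap:

#include "rocksdb/write_batch.h"

void FillUntilLimit() {
  // Assumes WriteBatch(reserved_bytes, max_bytes); 1024 is illustrative.
  rocksdb::WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/1024);
  rocksdb::Status s;
  while (s.ok()) {
    s = batch.Put("some_key", "some_value");  // mutators now return Status
  }
  // The loop exits once a Put would push the representation past the cap;
  // that Put is rejected and s.IsMemoryLimit() is expected to hold.
}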
