Add WriteOptions::protection_bytes_per_key (#10037)

Summary:
Added an option, `WriteOptions::protection_bytes_per_key`, that controls how many bytes per key we use for integrity protection in `WriteBatch`. It takes effect when `WriteBatch::GetProtectionBytesPerKey() == 0`.

Currently the only supported value is eight. Invoking a user API with it set to any other nonzero value will result in `Status::NotSupported` returned to the user.

There is also a bug fix for integrity protection with `inplace_callback`, where we forgot to take into account the possible change in varint length when calculating KV checksum for the final encoded buffer.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10037

Test Plan:
- Manual
  - Set default value of `WriteOptions::protection_bytes_per_key` to eight and ran `make check -j24`
  - Enabled in MyShadow for 1+ week
- Automated
  - Unit tests have a `WriteMode` that enables the integrity protection via `WriteOptions`
  - Crash test - in most cases, use `WriteOptions::protection_bytes_per_key` to enable integrity protection

Reviewed By: cbi42

Differential Revision: D36614569

Pulled By: ajkr

fbshipit-source-id: 8650087ceac9b61b560f1e5fafe5e1baf9c725fb
main
Andrew Kryczka 2 years ago committed by Facebook GitHub Bot
parent f62c1e1e56
commit 5d6005c780
  1. 2
      HISTORY.md
  2. 7
      db/db_impl/db_impl_open.cc
  3. 57
      db/db_impl/db_impl_write.cc
  4. 145
      db/db_kv_checksum_test.cc
  5. 4
      db/db_test.cc
  6. 1
      db/memtable.cc
  7. 32
      db/write_batch.cc
  8. 2
      db/write_batch_internal.h
  9. 4
      db/write_callback_test.cc
  10. 1
      db_stress_tool/db_stress_test_base.cc
  11. 7
      db_stress_tool/db_stress_tool.cc
  12. 10
      include/rocksdb/options.h
  13. 2
      include/rocksdb/utilities/write_batch_with_index.h
  14. 3
      include/rocksdb/write_batch.h
  15. 2
      tools/db_crashtest.py
  16. 35
      utilities/transactions/pessimistic_transaction_db.h
  17. 11
      utilities/transactions/transaction_base.cc
  18. 4
      utilities/transactions/write_prepared_txn.cc
  19. 9
      utilities/transactions/write_prepared_txn_db.cc
  20. 5
      utilities/transactions/write_unprepared_txn.cc
  21. 4
      utilities/transactions/write_unprepared_txn_db.cc
  22. 9
      utilities/write_batch_with_index/write_batch_with_index.cc

@ -1,6 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Bug Fixes ### Bug Fixes
* Fixed a bug in calculating key-value integrity protection for users of in-place memtable updates. In particular, the affected users would be those who configure `protection_bytes_per_key > 0` on `WriteBatch` or `WriteOptions`, and configure `inplace_callback != nullptr`.
* Fixed a bug where a snapshot taken during SST file ingestion would be unstable. * Fixed a bug where a snapshot taken during SST file ingestion would be unstable.
* Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error. * Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error.
* Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful. * Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful.
@ -26,6 +27,7 @@
* The contract for implementations of Comparator::IsSameLengthImmediateSuccessor has been updated to work around a design bug in `auto_prefix_mode`. * The contract for implementations of Comparator::IsSameLengthImmediateSuccessor has been updated to work around a design bug in `auto_prefix_mode`.
* The API documentation for `auto_prefix_mode` now notes some corner cases in which it returns different results than `total_order_seek`, due to design bugs that are not easily fixed. Users using built-in comparators and keys at least the size of a fixed prefix length are not affected. * The API documentation for `auto_prefix_mode` now notes some corner cases in which it returns different results than `total_order_seek`, due to design bugs that are not easily fixed. Users using built-in comparators and keys at least the size of a fixed prefix length are not affected.
* Obsoleted the NUM_DATA_BLOCKS_READ_PER_LEVEL stat and introduced the NUM_LEVEL_READ_PER_MULTIGET and MULTIGET_COROUTINE_COUNT stats * Obsoleted the NUM_DATA_BLOCKS_READ_PER_LEVEL stat and introduced the NUM_LEVEL_READ_PER_MULTIGET and MULTIGET_COROUTINE_COUNT stats
* Introduced `WriteOptions::protection_bytes_per_key`, which can be used to enable key-value integrity protection for live updates.
### New Features ### New Features
* Add FileSystem::ReadAsync API in io_tracing * Add FileSystem::ReadAsync API in io_tracing

@ -1003,12 +1003,17 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// We create a new batch and initialize with a valid prot_info_ to store // We create a new batch and initialize with a valid prot_info_ to store
// the data checksums // the data checksums
WriteBatch batch(0, 0, 8, 0); WriteBatch batch;
status = WriteBatchInternal::SetContents(&batch, record); status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
status = WriteBatchInternal::UpdateProtectionInfo(&batch,
8 /* bytes_per_key */);
if (!status.ok()) {
return status;
}
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

@ -106,15 +106,31 @@ void DBImpl::SetRecoverableStatePreReleaseCallback(
} }
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, /*callback=*/nullptr, Status s;
/*log_used=*/nullptr); if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
/*log_used=*/nullptr);
}
return s;
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options, Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch, WriteBatch* my_batch,
WriteCallback* callback) { WriteCallback* callback) {
return WriteImpl(write_options, my_batch, callback, nullptr); Status s;
if (write_options.protection_bytes_per_key > 0) {
s = WriteBatchInternal::UpdateProtectionInfo(
my_batch, write_options.protection_bytes_per_key);
}
if (s.ok()) {
s = WriteImpl(write_options, my_batch, callback, nullptr);
}
return s;
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
@ -129,6 +145,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PreReleaseCallback* pre_release_callback, PreReleaseCallback* pre_release_callback,
PostMemTableCallback* post_memtable_callback) { PostMemTableCallback* post_memtable_callback) {
assert(!seq_per_batch_ || batch_cnt != 0); assert(!seq_per_batch_ || batch_cnt != 0);
assert(my_batch == nullptr || my_batch->Count() == 0 ||
write_options.protection_bytes_per_key == 0 ||
write_options.protection_bytes_per_key ==
my_batch->GetProtectionBytesPerKey());
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::InvalidArgument("Batch is nullptr!"); return Status::InvalidArgument("Batch is nullptr!");
} else if (!disable_memtable && } else if (!disable_memtable &&
@ -156,6 +176,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
"rate-limiting automatic WAL flush, which requires " "rate-limiting automatic WAL flush, which requires "
"`WriteOptions::disableWAL` and " "`WriteOptions::disableWAL` and "
"`DBOptions::manual_wal_flush` both set to false"); "`DBOptions::manual_wal_flush` both set to false");
} else if (write_options.protection_bytes_per_key != 0 &&
write_options.protection_bytes_per_key != 8) {
return Status::InvalidArgument(
"`WriteOptions::protection_bytes_per_key` must be zero or eight");
} }
// TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock
// grabs but does not seem thread-safe. // grabs but does not seem thread-safe.
@ -2188,7 +2212,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
// Pre-allocate size of write batch conservatively. // Pre-allocate size of write batch conservatively.
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
// and we allocate 11 extra bytes for key length, as well as value length. // and we allocate 11 extra bytes for key length, as well as value length.
WriteBatch batch(key.size() + value.size() + 24); WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.Put(column_family, key, value); Status s = batch.Put(column_family, key, value);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2202,7 +2227,9 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
assert(default_cf); assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator(); const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp); assert(default_cf_ucmp);
WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.Put(column_family, key, ts, value); Status s = batch.Put(column_family, key, ts, value);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2212,7 +2239,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
WriteBatch batch; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.Delete(column_family, key); Status s = batch.Delete(column_family, key);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2226,7 +2254,9 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
assert(default_cf); assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator(); const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp); assert(default_cf_ucmp);
WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.Delete(column_family, key, ts); Status s = batch.Delete(column_family, key, ts);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2236,7 +2266,8 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
Status DB::SingleDelete(const WriteOptions& opt, Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key) { ColumnFamilyHandle* column_family, const Slice& key) {
WriteBatch batch; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.SingleDelete(column_family, key); Status s = batch.SingleDelete(column_family, key);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2251,7 +2282,9 @@ Status DB::SingleDelete(const WriteOptions& opt,
assert(default_cf); assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator(); const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp); assert(default_cf_ucmp);
WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
Status s = batch.SingleDelete(column_family, key, ts); Status s = batch.SingleDelete(column_family, key, ts);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2262,7 +2295,8 @@ Status DB::SingleDelete(const WriteOptions& opt,
Status DB::DeleteRange(const WriteOptions& opt, Status DB::DeleteRange(const WriteOptions& opt,
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) { const Slice& begin_key, const Slice& end_key) {
WriteBatch batch; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.DeleteRange(column_family, begin_key, end_key); Status s = batch.DeleteRange(column_family, begin_key, end_key);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -2272,7 +2306,8 @@ Status DB::DeleteRange(const WriteOptions& opt,
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) { const Slice& key, const Slice& value) {
WriteBatch batch; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */);
Status s = batch.Merge(column_family, key, value); Status s = batch.Merge(column_family, key, value);
if (!s.ok()) { if (!s.ok()) {
return s; return s;

@ -15,7 +15,6 @@ enum class WriteBatchOpType {
kSingleDelete, kSingleDelete,
kDeleteRange, kDeleteRange,
kMerge, kMerge,
kBlobIndex,
kNum, kNum,
}; };
@ -25,11 +24,28 @@ WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) {
return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs); return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs);
} }
enum class WriteMode {
// `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`.
kWriteProtectedBatch = 0,
// `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`.
// Protection is enabled via `WriteOptions::protection_bytes_per_key > 0`.
kWriteUnprotectedBatch,
// TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`.
kNum,
};
// Integer addition is needed for `::testing::Range()` to take the enum type.
WriteMode operator+(WriteMode lhs, const int rhs) {
using T = std::underlying_type<WriteMode>::type;
return static_cast<WriteMode>(static_cast<T>(lhs) + rhs);
}
std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle, std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
size_t protection_bytes_per_key,
WriteBatchOpType op_type) { WriteBatchOpType op_type) {
Status s; Status s;
WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */); protection_bytes_per_key, 0 /* default_cf_ts_sz */);
switch (op_type) { switch (op_type) {
case WriteBatchOpType::kPut: case WriteBatchOpType::kPut:
s = wb.Put(cf_handle, "key", "val"); s = wb.Put(cf_handle, "key", "val");
@ -46,36 +62,44 @@ std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
case WriteBatchOpType::kMerge: case WriteBatchOpType::kMerge:
s = wb.Merge(cf_handle, "key", "val"); s = wb.Merge(cf_handle, "key", "val");
break; break;
case WriteBatchOpType::kBlobIndex: {
// TODO(ajkr): use public API once available.
uint32_t cf_id;
if (cf_handle == nullptr) {
cf_id = 0;
} else {
cf_id = cf_handle->GetID();
}
std::string blob_index;
BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210,
"val");
s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index);
break;
}
case WriteBatchOpType::kNum: case WriteBatchOpType::kNum:
assert(false); assert(false);
} }
return {std::move(wb), std::move(s)}; return {std::move(wb), std::move(s)};
} }
class DbKvChecksumTest class DbKvChecksumTest : public DBTestBase,
: public DBTestBase, public ::testing::WithParamInterface<
public ::testing::WithParamInterface<std::tuple<WriteBatchOpType, char>> { std::tuple<WriteBatchOpType, char, WriteMode>> {
public: public:
DbKvChecksumTest() DbKvChecksumTest()
: DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
op_type_ = std::get<0>(GetParam()); op_type_ = std::get<0>(GetParam());
corrupt_byte_addend_ = std::get<1>(GetParam()); corrupt_byte_addend_ = std::get<1>(GetParam());
write_mode_ = std::get<2>(GetParam());
}
Status ExecuteWrite(ColumnFamilyHandle* cf_handle) {
switch (write_mode_) {
case WriteMode::kWriteProtectedBatch: {
auto batch_and_status = GetWriteBatch(
cf_handle, 8 /* protection_bytes_per_key */, op_type_);
assert(batch_and_status.second.ok());
return db_->Write(WriteOptions(), &batch_and_status.first);
}
case WriteMode::kWriteUnprotectedBatch: {
auto batch_and_status = GetWriteBatch(
cf_handle, 0 /* protection_bytes_per_key */, op_type_);
assert(batch_and_status.second.ok());
WriteOptions write_opts;
write_opts.protection_bytes_per_key = 8;
return db_->Write(write_opts, &batch_and_status.first);
}
case WriteMode::kNum:
assert(false);
}
return Status::NotSupported("WriteMode " +
std::to_string(static_cast<int>(write_mode_)));
} }
void CorruptNextByteCallBack(void* arg) { void CorruptNextByteCallBack(void* arg) {
@ -96,6 +120,7 @@ class DbKvChecksumTest
protected: protected:
WriteBatchOpType op_type_; WriteBatchOpType op_type_;
char corrupt_byte_addend_; char corrupt_byte_addend_;
WriteMode write_mode_;
size_t corrupt_byte_offset_ = 0; size_t corrupt_byte_offset_ = 0;
size_t entry_len_ = std::numeric_limits<size_t>::max(); size_t entry_len_ = std::numeric_limits<size_t>::max();
}; };
@ -114,9 +139,6 @@ std::string GetOpTypeString(const WriteBatchOpType& op_type) {
case WriteBatchOpType::kMerge: case WriteBatchOpType::kMerge:
return "Merge"; return "Merge";
break; break;
case WriteBatchOpType::kBlobIndex:
return "BlobIndex";
break;
case WriteBatchOpType::kNum: case WriteBatchOpType::kNum:
assert(false); assert(false);
} }
@ -128,15 +150,31 @@ INSTANTIATE_TEST_CASE_P(
DbKvChecksumTest, DbKvChecksumTest, DbKvChecksumTest, DbKvChecksumTest,
::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0), ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
WriteBatchOpType::kNum), WriteBatchOpType::kNum),
::testing::Values(2, 103, 251)), ::testing::Values(2, 103, 251),
[](const testing::TestParamInfo<std::tuple<WriteBatchOpType, char>>& args) { ::testing::Range(static_cast<WriteMode>(0),
WriteMode::kNum)),
[](const testing::TestParamInfo<
std::tuple<WriteBatchOpType, char, WriteMode>>& args) {
std::ostringstream oss; std::ostringstream oss;
oss << GetOpTypeString(std::get<0>(args.param)) << "Add" oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
<< static_cast<int>( << static_cast<int>(
static_cast<unsigned char>(std::get<1>(args.param))); static_cast<unsigned char>(std::get<1>(args.param)));
switch (std::get<2>(args.param)) {
case WriteMode::kWriteProtectedBatch:
oss << "WriteProtectedBatch";
break;
case WriteMode::kWriteUnprotectedBatch:
oss << "WriteUnprotectedBatch";
break;
case WriteMode::kNum:
assert(false);
}
return oss.str(); return oss.str();
}); });
// TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such
// corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`.
TEST_P(DbKvChecksumTest, MemTableAddCorrupted) { TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
// This test repeatedly attempts to write `WriteBatch`es containing a single // This test repeatedly attempts to write `WriteBatch`es containing a single
// entry of type `op_type_`. Each attempt has one byte corrupted in its // entry of type `op_type_`. Each attempt has one byte corrupted in its
@ -158,10 +196,7 @@ TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
Reopen(options); Reopen(options);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_); ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
// In case the above callback is not invoked, this test will run // In case the above callback is not invoked, this test will run
@ -194,10 +229,7 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status = GetWriteBatch(handles_[1], op_type_); ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption());
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
// In case the above callback is not invoked, this test will run // In case the above callback is not invoked, this test will run
@ -209,7 +241,8 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
TEST_P(DbKvChecksumTest, NoCorruptionCase) { TEST_P(DbKvChecksumTest, NoCorruptionCase) {
// If this test fails, we may have found a piece of malfunctioned hardware // If this test fails, we may have found a piece of malfunctioned hardware
auto batch_and_status = GetWriteBatch(nullptr, op_type_); auto batch_and_status =
GetWriteBatch(nullptr, 8 /* protection_bytes_per_key */, op_type_);
ASSERT_OK(batch_and_status.second); ASSERT_OK(batch_and_status.second);
ASSERT_OK(batch_and_status.first.VerifyChecksum()); ASSERT_OK(batch_and_status.first.VerifyChecksum());
} }
@ -238,10 +271,7 @@ TEST_P(DbKvChecksumTest, WriteToWALCorrupted) {
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_); ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
// Confirm that nothing was written to WAL // Confirm that nothing was written to WAL
ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
@ -279,10 +309,7 @@ TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto batch_and_status = GetWriteBatch(handles_[1], op_type_); ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
ASSERT_OK(batch_and_status.second);
ASSERT_TRUE(
db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
// Confirm that nothing was written to WAL // Confirm that nothing was written to WAL
ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
@ -322,9 +349,11 @@ void CorruptWriteBatch(Slice* content, size_t offset,
TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) { TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
// Veirfy write batch checksum after write batch append // Veirfy write batch checksum after write batch append
auto batch1 = GetWriteBatch(nullptr /* cf_handle */, op_type1_); auto batch1 = GetWriteBatch(nullptr /* cf_handle */,
8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(batch1.second); ASSERT_OK(batch1.second);
auto batch2 = GetWriteBatch(nullptr /* cf_handle */, op_type2_); auto batch2 = GetWriteBatch(nullptr /* cf_handle */,
8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(batch2.second); ASSERT_OK(batch2.second);
ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first)); ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
ASSERT_OK(batch1.first.VerifyChecksum()); ASSERT_OK(batch1.first.VerifyChecksum());
@ -345,11 +374,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.merge_operator = MergeOperators::CreateStringAppendOperator();
} }
auto leader_batch_and_status = auto leader_batch_and_status = GetWriteBatch(
GetWriteBatch(nullptr /* cf_handle */, op_type1_); nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
auto follower_batch_and_status = auto follower_batch_and_status = GetWriteBatch(
GetWriteBatch(nullptr /* cf_handle */, op_type2_); nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type2_);
size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
size_t total_bytes = size_t total_bytes =
leader_batch_size + follower_batch_and_status.first.GetDataSize(); leader_batch_size + follower_batch_and_status.first.GetDataSize();
@ -390,7 +419,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
// follower // follower
follower_thread = port::Thread([&]() { follower_thread = port::Thread([&]() {
follower_batch_and_status = follower_batch_and_status =
GetWriteBatch(nullptr /* cf_handle */, op_type2_); GetWriteBatch(nullptr /* cf_handle */,
8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(follower_batch_and_status.second); ASSERT_OK(follower_batch_and_status.second);
ASSERT_TRUE( ASSERT_TRUE(
db_->Write(WriteOptions(), &follower_batch_and_status.first) db_->Write(WriteOptions(), &follower_batch_and_status.first)
@ -413,7 +443,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
Reopen(options); Reopen(options);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
leader_batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type1_); leader_batch_and_status = GetWriteBatch(
nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
.IsCorruption()); .IsCorruption());
@ -452,9 +483,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
} }
CreateAndReopenWithCF({"ramen"}, options); CreateAndReopenWithCF({"ramen"}, options);
auto leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_); auto leader_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
auto follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_); auto follower_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type2_);
size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
size_t total_bytes = size_t total_bytes =
leader_batch_size + follower_batch_and_status.first.GetDataSize(); leader_batch_size + follower_batch_and_status.first.GetDataSize();
@ -494,7 +527,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
// Start the other writer thread which will join the write group as // Start the other writer thread which will join the write group as
// follower // follower
follower_thread = port::Thread([&]() { follower_thread = port::Thread([&]() {
follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_); follower_batch_and_status = GetWriteBatch(
handles_[1], 8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(follower_batch_and_status.second); ASSERT_OK(follower_batch_and_status.second);
ASSERT_TRUE( ASSERT_TRUE(
db_->Write(WriteOptions(), &follower_batch_and_status.first) db_->Write(WriteOptions(), &follower_batch_and_status.first)
@ -518,7 +552,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_); leader_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
.IsCorruption()); .IsCorruption());

@ -4271,7 +4271,9 @@ TEST_F(DBTest, ConcurrentFlushWAL) {
threads.emplace_back([&] { threads.emplace_back([&] {
for (size_t i = cnt; i < 2 * cnt; i++) { for (size_t i = cnt; i < 2 * cnt; i++) {
auto istr = std::to_string(i); auto istr = std::to_string(i);
WriteBatch batch; WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
wopt.protection_bytes_per_key,
0 /* default_cf_ts_sz */);
ASSERT_OK(batch.Put("a" + istr, "b" + istr)); ASSERT_OK(batch.Put("a" + istr, "b" + istr));
ASSERT_OK( ASSERT_OK(
dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true)); dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true));

@ -1159,6 +1159,7 @@ Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
if (VarintLength(new_prev_size) < VarintLength(prev_size)) { if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
// shift the value buffer as well. // shift the value buffer as well.
memcpy(p, prev_buffer, new_prev_size); memcpy(p, prev_buffer, new_prev_size);
prev_buffer = p;
} }
} }
RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);

@ -2844,16 +2844,10 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader); assert(contents.size() >= WriteBatchInternal::kHeader);
assert(b->prot_info_ == nullptr);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
// If we have a prot_info_, update protection info entries for the batch.
if (b->prot_info_) {
ProtectionInfoUpdater prot_info_updater(b->prot_info_.get());
return b->Iterate(&prot_info_updater);
}
return Status::OK(); return Status::OK();
} }
@ -2910,4 +2904,28 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
} }
} }
Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb,
size_t bytes_per_key) {
if (bytes_per_key == 0) {
if (wb->prot_info_ != nullptr) {
wb->prot_info_.reset();
return Status::OK();
} else {
// Already not protected.
return Status::OK();
}
} else if (bytes_per_key == 8) {
if (wb->prot_info_ == nullptr) {
wb->prot_info_.reset(new WriteBatch::ProtectionInfo());
ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get());
return wb->Iterate(&prot_info_updater);
} else {
// Already protected.
return Status::OK();
}
}
return Status::NotSupported(
"WriteBatch protection info must be zero or eight bytes/key");
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -236,6 +236,8 @@ class WriteBatchInternal {
static bool HasKeyWithTimestamp(const WriteBatch& wb) { static bool HasKeyWithTimestamp(const WriteBatch& wb) {
return wb.has_key_with_ts_; return wb.has_key_with_ts_;
} }
static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key);
}; };
// LocalSavePoint is similar to a scope guard // LocalSavePoint is similar to a scope guard

@ -307,6 +307,10 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
WriteOptions woptions; WriteOptions woptions;
woptions.disableWAL = !enable_WAL_; woptions.disableWAL = !enable_WAL_;
woptions.sync = enable_WAL_; woptions.sync = enable_WAL_;
if (woptions.protection_bytes_per_key > 0) {
ASSERT_OK(WriteBatchInternal::UpdateProtectionInfo(
&write_op.write_batch_, woptions.protection_bytes_per_key));
}
Status s; Status s;
if (seq_per_batch_) { if (seq_per_batch_) {
class PublishSeqCallback : public PreReleaseCallback { class PublishSeqCallback : public PreReleaseCallback {

@ -617,6 +617,7 @@ void StressTest::OperateDb(ThreadState* thread) {
write_opts.sync = true; write_opts.sync = true;
} }
write_opts.disableWAL = FLAGS_disable_wal; write_opts.disableWAL = FLAGS_disable_wal;
write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key;
const int prefix_bound = static_cast<int>(FLAGS_readpercent) + const int prefix_bound = static_cast<int>(FLAGS_readpercent) +
static_cast<int>(FLAGS_prefixpercent); static_cast<int>(FLAGS_prefixpercent);
const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent); const int write_bound = prefix_bound + static_cast<int>(FLAGS_writepercent);

@ -265,13 +265,6 @@ int db_stress_tool(int argc, char** argv) {
"test_batches_snapshots must all be 0 when using compaction filter\n"); "test_batches_snapshots must all be 0 when using compaction filter\n");
exit(1); exit(1);
} }
if (FLAGS_batch_protection_bytes_per_key > 0 &&
!FLAGS_test_batches_snapshots) {
fprintf(stderr,
"Error: test_batches_snapshots must be enabled when "
"batch_protection_bytes_per_key > 0\n");
exit(1);
}
if (FLAGS_test_multi_ops_txns) { if (FLAGS_test_multi_ops_txns) {
CheckAndSetOptionsForMultiOpsTxnStressTest(); CheckAndSetOptionsForMultiOpsTxnStressTest();
} }

@ -1737,6 +1737,13 @@ struct WriteOptions {
// Default: `Env::IO_TOTAL` // Default: `Env::IO_TOTAL`
Env::IOPriority rate_limiter_priority; Env::IOPriority rate_limiter_priority;
// `protection_bytes_per_key` is the number of bytes used to store
// protection information for each key entry. Currently supported values are
// zero (disabled) and eight.
//
// Default: zero (disabled).
size_t protection_bytes_per_key;
WriteOptions() WriteOptions()
: sync(false), : sync(false),
disableWAL(false), disableWAL(false),
@ -1744,7 +1751,8 @@ struct WriteOptions {
no_slowdown(false), no_slowdown(false),
low_pri(false), low_pri(false),
memtable_insert_hint_per_batch(false), memtable_insert_hint_per_batch(false),
rate_limiter_priority(Env::IO_TOTAL) {} rate_limiter_priority(Env::IO_TOTAL),
protection_bytes_per_key(0) {}
}; };
// Options that control flush operations // Options that control flush operations

@ -98,7 +98,7 @@ class WriteBatchWithIndex : public WriteBatchBase {
explicit WriteBatchWithIndex( explicit WriteBatchWithIndex(
const Comparator* backup_index_comparator = BytewiseComparator(), const Comparator* backup_index_comparator = BytewiseComparator(),
size_t reserved_bytes = 0, bool overwrite_key = false, size_t reserved_bytes = 0, bool overwrite_key = false,
size_t max_bytes = 0); size_t max_bytes = 0, size_t protection_bytes_per_key = 0);
~WriteBatchWithIndex() override; ~WriteBatchWithIndex() override;
WriteBatchWithIndex(WriteBatchWithIndex&&); WriteBatchWithIndex(WriteBatchWithIndex&&);

@ -419,9 +419,6 @@ class WriteBatch : public WriteBatchBase {
struct ProtectionInfo; struct ProtectionInfo;
size_t GetProtectionBytesPerKey() const; size_t GetProtectionBytesPerKey() const;
// Clears prot_info_ if there are no entries.
void ClearProtectionInfoIfEmpty();
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;
friend class LocalSavePoint; friend class LocalSavePoint;

@ -514,8 +514,6 @@ def finalize_and_sanitize(src_params):
dest_params["readpercent"] += dest_params.get("prefixpercent", 20) dest_params["readpercent"] += dest_params.get("prefixpercent", 20)
dest_params["prefixpercent"] = 0 dest_params["prefixpercent"] = 0
dest_params["test_batches_snapshots"] = 0 dest_params["test_batches_snapshots"] = 0
if dest_params.get("test_batches_snapshots") == 0:
dest_params["batch_protection_bytes_per_key"] = 0
if (dest_params.get("prefix_size") == -1 and if (dest_params.get("prefix_size") == -1 and
dest_params.get("memtable_whole_key_filtering") == 0): dest_params.get("memtable_whole_key_filtering") == 0):
dest_params["memtable_prefix_bloom_size_ratio"] = 0 dest_params["memtable_prefix_bloom_size_ratio"] = 0

@ -71,20 +71,27 @@ class PessimisticTransactionDB : public TransactionDB {
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
inline Status WriteWithConcurrencyControl(const WriteOptions& opts, inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
WriteBatch* updates) { WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with Status s;
// concurrent transactions. if (opts.protection_bytes_per_key > 0) {
Transaction* txn = BeginInternalTransaction(opts); s = WriteBatchInternal::UpdateProtectionInfo(
txn->DisableIndexing(); updates, opts.protection_bytes_per_key);
}
auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); if (s.ok()) {
// Need to lock all keys in this batch to prevent write conflicts with
// Since commitBatch sorts the keys before locking, concurrent Write() // concurrent transactions.
// operations will not cause a deadlock. Transaction* txn = BeginInternalTransaction(opts);
// In order to avoid a deadlock with a concurrent Transaction, Transactions txn->DisableIndexing();
// should use a lock timeout.
Status s = txn_impl->CommitBatch(updates); auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
delete txn; // Since commitBatch sorts the keys before locking, concurrent Write()
// operations will not cause a deadlock.
// In order to avoid a deadlock with a concurrent Transaction,
// Transactions should use a lock timeout.
s = txn_impl->CommitBatch(updates);
delete txn;
}
return s; return s;
} }

@ -67,8 +67,11 @@ TransactionBaseImpl::TransactionBaseImpl(
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
lock_tracker_factory_(lock_tracker_factory), lock_tracker_factory_(lock_tracker_factory),
start_time_(dbimpl_->GetSystemClock()->NowMicros()), start_time_(dbimpl_->GetSystemClock()->NowMicros()),
write_batch_(cmp_, 0, true, 0), write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key),
tracked_locks_(lock_tracker_factory_.Create()), tracked_locks_(lock_tracker_factory_.Create()),
commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */,
write_options.protection_bytes_per_key,
0 /* default_cf_ts_sz */),
indexing_enabled_(true) { indexing_enabled_(true) {
assert(dynamic_cast<DBImpl*>(db_) != nullptr); assert(dynamic_cast<DBImpl*>(db_) != nullptr);
log_number_ = 0; log_number_ = 0;
@ -108,6 +111,12 @@ void TransactionBaseImpl::Reinitialize(DB* db,
start_time_ = dbimpl_->GetSystemClock()->NowMicros(); start_time_ = dbimpl_->GetSystemClock()->NowMicros();
indexing_enabled_ = true; indexing_enabled_ = true;
cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily()); cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
WriteBatchInternal::UpdateProtectionInfo(
write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key)
.PermitUncheckedError();
WriteBatchInternal::UpdateProtectionInfo(
&commit_time_batch_, write_options_.protection_bytes_per_key)
.PermitUncheckedError();
} }
void TransactionBaseImpl::SetSnapshot() { void TransactionBaseImpl::SetSnapshot() {

@ -267,7 +267,9 @@ Status WritePreparedTxn::RollbackInternal() {
assert(db_impl_); assert(db_impl_);
assert(wpt_db_); assert(wpt_db_);
WriteBatch rollback_batch; WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
write_options_.protection_bytes_per_key,
0 /* default_cf_ts_sz */);
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap(); auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();

@ -166,6 +166,15 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
// increased for this batch. // increased for this batch.
return Status::OK(); return Status::OK();
} }
if (write_options_orig.protection_bytes_per_key > 0) {
auto s = WriteBatchInternal::UpdateProtectionInfo(
batch, write_options_orig.protection_bytes_per_key);
if (!s.ok()) {
return s;
}
}
if (batch_cnt == 0) { // not provided, then compute it if (batch_cnt == 0) { // not provided, then compute it
// TODO(myabandeh): add an option to allow user skipping this cost // TODO(myabandeh): add an option to allow user skipping this cost
SubBatchCounter counter(*GetCFComparatorMap()); SubBatchCounter counter(*GetCFComparatorMap());

@ -464,7 +464,7 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
// only used if the write batch encounters an invalid cf id, and falls back to // only used if the write batch encounters an invalid cf id, and falls back to
// this comparator. // this comparator.
WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0, WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
true, 0); true, 0, write_options_.protection_bytes_per_key);
// Swap with write_batch_ so that wb contains the complete write batch. The // Swap with write_batch_ so that wb contains the complete write batch. The
// actual write batch that will be flushed to DB will be built in // actual write batch that will be flushed to DB will be built in
// write_batch_, and will be read by FlushWriteBatchToDBInternal. // write_batch_, and will be read by FlushWriteBatchToDBInternal.
@ -722,7 +722,8 @@ Status WriteUnpreparedTxn::WriteRollbackKeys(
Status WriteUnpreparedTxn::RollbackInternal() { Status WriteUnpreparedTxn::RollbackInternal() {
// TODO(lth): Reduce duplicate code with WritePrepared rollback logic. // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
WriteBatchWithIndex rollback_batch( WriteBatchWithIndex rollback_batch(
wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0,
write_options_.protection_bytes_per_key);
assert(GetId() != kMaxSequenceNumber); assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0); assert(GetId() > 0);
Status s; Status s;

@ -59,7 +59,9 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) { for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
auto last_visible_txn = it->first - 1; auto last_visible_txn = it->first - 1;
const auto& batch = it->second.batch_; const auto& batch = it->second.batch_;
WriteBatch rollback_batch; WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
w_options.protection_bytes_per_key,
0 /* default_cf_ts_sz */);
struct RollbackWriteBatchBuilder : public WriteBatch::Handler { struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
DBImpl* db_; DBImpl* db_;

@ -25,8 +25,9 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
struct WriteBatchWithIndex::Rep { struct WriteBatchWithIndex::Rep {
explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
size_t max_bytes = 0, bool _overwrite_key = false) size_t max_bytes = 0, bool _overwrite_key = false,
: write_batch(reserved_bytes, max_bytes, /*protection_bytes_per_key=*/0, size_t protection_bytes_per_key = 0)
: write_batch(reserved_bytes, max_bytes, protection_bytes_per_key,
index_comparator ? index_comparator->timestamp_size() : 0), index_comparator ? index_comparator->timestamp_size() : 0),
comparator(index_comparator, &write_batch), comparator(index_comparator, &write_batch),
skip_list(comparator, &arena), skip_list(comparator, &arena),
@ -262,9 +263,9 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() {
WriteBatchWithIndex::WriteBatchWithIndex( WriteBatchWithIndex::WriteBatchWithIndex(
const Comparator* default_index_comparator, size_t reserved_bytes, const Comparator* default_index_comparator, size_t reserved_bytes,
bool overwrite_key, size_t max_bytes) bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key)
: rep(new Rep(default_index_comparator, reserved_bytes, max_bytes, : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
overwrite_key)) {} overwrite_key, protection_bytes_per_key)) {}
WriteBatchWithIndex::~WriteBatchWithIndex() {} WriteBatchWithIndex::~WriteBatchWithIndex() {}

Loading…
Cancel
Save