diff --git a/HISTORY.md b/HISTORY.md index a455f19b4..6405ec8a5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,7 @@ # Rocksdb Change Log ## Unreleased ### Bug Fixes +* Fixed a bug in calculating key-value integrity protection for users of in-place memtable updates. In particular, the affected users would be those who configure `protection_bytes_per_key > 0` on `WriteBatch` or `WriteOptions`, and configure `inplace_callback != nullptr`. * Fixed a bug where a snapshot taken during SST file ingestion would be unstable. * Fixed a bug for non-TransactionDB with avoid_flush_during_recovery = true and TransactionDB where in case of crash, min_log_number_to_keep may not change on recovery and persisting a new MANIFEST with advanced log_numbers for some column families, results in "column family inconsistency" error on second recovery. As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL. If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point. If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error. * Fixed a bug where RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. Now writes to MANIFEST are persisted only after recovery is successful. @@ -26,6 +27,7 @@ * The contract for implementations of Comparator::IsSameLengthImmediateSuccessor has been updated to work around a design bug in `auto_prefix_mode`. * The API documentation for `auto_prefix_mode` now notes some corner cases in which it returns different results than `total_order_seek`, due to design bugs that are not easily fixed. Users using built-in comparators and keys at least the size of a fixed prefix length are not affected. * Obsoleted the NUM_DATA_BLOCKS_READ_PER_LEVEL stat and introduced the NUM_LEVEL_READ_PER_MULTIGET and MULTIGET_COROUTINE_COUNT stats +* Introduced `WriteOptions::protection_bytes_per_key`, which can be used to enable key-value integrity protection for live updates. ### New Features * Add FileSystem::ReadAsync API in io_tracing diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 7c4852ab2..c71bd7429 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1003,12 +1003,17 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, // We create a new batch and initialize with a valid prot_info_ to store // the data checksums - WriteBatch batch(0, 0, 8, 0); + WriteBatch batch; status = WriteBatchInternal::SetContents(&batch, record); if (!status.ok()) { return status; } + status = WriteBatchInternal::UpdateProtectionInfo(&batch, + 8 /* bytes_per_key */); + if (!status.ok()) { + return status; + } SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 787006d35..09287a4ef 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -106,15 +106,31 @@ void DBImpl::SetRecoverableStatePreReleaseCallback( } Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { - return WriteImpl(write_options, my_batch, /*callback=*/nullptr, - /*log_used=*/nullptr); + Status s; + if (write_options.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + my_batch, write_options.protection_bytes_per_key); + } + if (s.ok()) { + s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, + /*log_used=*/nullptr); + } + return s; } #ifndef ROCKSDB_LITE Status DBImpl::WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback) { - return WriteImpl(write_options, my_batch, callback, nullptr); + Status s; + if (write_options.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + my_batch, write_options.protection_bytes_per_key); + } + if (s.ok()) { + s = WriteImpl(write_options, my_batch, callback, nullptr); + } + return s; } #endif // ROCKSDB_LITE @@ -129,6 +145,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PreReleaseCallback* pre_release_callback, PostMemTableCallback* post_memtable_callback) { assert(!seq_per_batch_ || batch_cnt != 0); + assert(my_batch == nullptr || my_batch->Count() == 0 || + write_options.protection_bytes_per_key == 0 || + write_options.protection_bytes_per_key == + my_batch->GetProtectionBytesPerKey()); if (my_batch == nullptr) { return Status::InvalidArgument("Batch is nullptr!"); } else if (!disable_memtable && @@ -156,6 +176,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, "rate-limiting automatic WAL flush, which requires " "`WriteOptions::disableWAL` and " "`DBOptions::manual_wal_flush` both set to false"); + } else if (write_options.protection_bytes_per_key != 0 && + write_options.protection_bytes_per_key != 8) { + return Status::InvalidArgument( + "`WriteOptions::protection_bytes_per_key` must be zero or eight"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. @@ -2188,7 +2212,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, // Pre-allocate size of write batch conservatively. // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, // and we allocate 11 extra bytes for key length, as well as value length. - WriteBatch batch(key.size() + value.size() + 24); + WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Put(column_family, key, value); if (!s.ok()) { return s; @@ -2202,7 +2227,9 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); - WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); Status s = batch.Put(column_family, key, ts, value); if (!s.ok()) { return s; @@ -2212,7 +2239,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { - WriteBatch batch; + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Delete(column_family, key); if (!s.ok()) { return s; @@ -2226,7 +2254,9 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); - WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); Status s = batch.Delete(column_family, key, ts); if (!s.ok()) { return s; @@ -2236,7 +2266,8 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::SingleDelete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { - WriteBatch batch; + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.SingleDelete(column_family, key); if (!s.ok()) { return s; @@ -2251,7 +2282,9 @@ Status DB::SingleDelete(const WriteOptions& opt, assert(default_cf); const Comparator* const default_cf_ucmp = default_cf->GetComparator(); assert(default_cf_ucmp); - WriteBatch batch(0, 0, 0, default_cf_ucmp->timestamp_size()); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); Status s = batch.SingleDelete(column_family, key, ts); if (!s.ok()) { return s; @@ -2262,7 +2295,8 @@ Status DB::SingleDelete(const WriteOptions& opt, Status DB::DeleteRange(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& begin_key, const Slice& end_key) { - WriteBatch batch; + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.DeleteRange(column_family, begin_key, end_key); if (!s.ok()) { return s; @@ -2272,7 +2306,8 @@ Status DB::DeleteRange(const WriteOptions& opt, Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - WriteBatch batch; + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); Status s = batch.Merge(column_family, key, value); if (!s.ok()) { return s; diff --git a/db/db_kv_checksum_test.cc b/db/db_kv_checksum_test.cc index 5636c9e6e..3bed9d784 100644 --- a/db/db_kv_checksum_test.cc +++ b/db/db_kv_checksum_test.cc @@ -15,7 +15,6 @@ enum class WriteBatchOpType { kSingleDelete, kDeleteRange, kMerge, - kBlobIndex, kNum, }; @@ -25,11 +24,28 @@ WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) { return static_cast(static_cast(lhs) + rhs); } +enum class WriteMode { + // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`. + kWriteProtectedBatch = 0, + // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`. + // Protection is enabled via `WriteOptions::protection_bytes_per_key > 0`. + kWriteUnprotectedBatch, + // TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`. + kNum, +}; + +// Integer addition is needed for `::testing::Range()` to take the enum type. +WriteMode operator+(WriteMode lhs, const int rhs) { + using T = std::underlying_type::type; + return static_cast(static_cast(lhs) + rhs); +} + std::pair GetWriteBatch(ColumnFamilyHandle* cf_handle, + size_t protection_bytes_per_key, WriteBatchOpType op_type) { Status s; WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, - 8 /* protection_bytes_per_entry */, 0 /* default_cf_ts_sz */); + protection_bytes_per_key, 0 /* default_cf_ts_sz */); switch (op_type) { case WriteBatchOpType::kPut: s = wb.Put(cf_handle, "key", "val"); @@ -46,36 +62,44 @@ std::pair GetWriteBatch(ColumnFamilyHandle* cf_handle, case WriteBatchOpType::kMerge: s = wb.Merge(cf_handle, "key", "val"); break; - case WriteBatchOpType::kBlobIndex: { - // TODO(ajkr): use public API once available. - uint32_t cf_id; - if (cf_handle == nullptr) { - cf_id = 0; - } else { - cf_id = cf_handle->GetID(); - } - - std::string blob_index; - BlobIndex::EncodeInlinedTTL(&blob_index, /* expiration */ 9876543210, - "val"); - - s = WriteBatchInternal::PutBlobIndex(&wb, cf_id, "key", blob_index); - break; - } case WriteBatchOpType::kNum: assert(false); } return {std::move(wb), std::move(s)}; } -class DbKvChecksumTest - : public DBTestBase, - public ::testing::WithParamInterface> { +class DbKvChecksumTest : public DBTestBase, + public ::testing::WithParamInterface< + std::tuple> { public: DbKvChecksumTest() : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { op_type_ = std::get<0>(GetParam()); corrupt_byte_addend_ = std::get<1>(GetParam()); + write_mode_ = std::get<2>(GetParam()); + } + + Status ExecuteWrite(ColumnFamilyHandle* cf_handle) { + switch (write_mode_) { + case WriteMode::kWriteProtectedBatch: { + auto batch_and_status = GetWriteBatch( + cf_handle, 8 /* protection_bytes_per_key */, op_type_); + assert(batch_and_status.second.ok()); + return db_->Write(WriteOptions(), &batch_and_status.first); + } + case WriteMode::kWriteUnprotectedBatch: { + auto batch_and_status = GetWriteBatch( + cf_handle, 0 /* protection_bytes_per_key */, op_type_); + assert(batch_and_status.second.ok()); + WriteOptions write_opts; + write_opts.protection_bytes_per_key = 8; + return db_->Write(write_opts, &batch_and_status.first); + } + case WriteMode::kNum: + assert(false); + } + return Status::NotSupported("WriteMode " + + std::to_string(static_cast(write_mode_))); } void CorruptNextByteCallBack(void* arg) { @@ -96,6 +120,7 @@ class DbKvChecksumTest protected: WriteBatchOpType op_type_; char corrupt_byte_addend_; + WriteMode write_mode_; size_t corrupt_byte_offset_ = 0; size_t entry_len_ = std::numeric_limits::max(); }; @@ -114,9 +139,6 @@ std::string GetOpTypeString(const WriteBatchOpType& op_type) { case WriteBatchOpType::kMerge: return "Merge"; break; - case WriteBatchOpType::kBlobIndex: - return "BlobIndex"; - break; case WriteBatchOpType::kNum: assert(false); } @@ -128,15 +150,31 @@ INSTANTIATE_TEST_CASE_P( DbKvChecksumTest, DbKvChecksumTest, ::testing::Combine(::testing::Range(static_cast(0), WriteBatchOpType::kNum), - ::testing::Values(2, 103, 251)), - [](const testing::TestParamInfo>& args) { + ::testing::Values(2, 103, 251), + ::testing::Range(static_cast(0), + WriteMode::kNum)), + [](const testing::TestParamInfo< + std::tuple>& args) { std::ostringstream oss; oss << GetOpTypeString(std::get<0>(args.param)) << "Add" << static_cast( static_cast(std::get<1>(args.param))); + switch (std::get<2>(args.param)) { + case WriteMode::kWriteProtectedBatch: + oss << "WriteProtectedBatch"; + break; + case WriteMode::kWriteUnprotectedBatch: + oss << "WriteUnprotectedBatch"; + break; + case WriteMode::kNum: + assert(false); + } return oss.str(); }); +// TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such +// corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`. + TEST_P(DbKvChecksumTest, MemTableAddCorrupted) { // This test repeatedly attempts to write `WriteBatch`es containing a single // entry of type `op_type_`. Each attempt has one byte corrupted in its @@ -158,10 +196,7 @@ TEST_P(DbKvChecksumTest, MemTableAddCorrupted) { Reopen(options); SyncPoint::GetInstance()->EnableProcessing(); - auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_); - ASSERT_OK(batch_and_status.second); - ASSERT_TRUE( - db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption()); + ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); SyncPoint::GetInstance()->DisableProcessing(); // In case the above callback is not invoked, this test will run @@ -194,10 +229,7 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) { ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); SyncPoint::GetInstance()->EnableProcessing(); - auto batch_and_status = GetWriteBatch(handles_[1], op_type_); - ASSERT_OK(batch_and_status.second); - ASSERT_TRUE( - db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption()); + ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption()); SyncPoint::GetInstance()->DisableProcessing(); // In case the above callback is not invoked, this test will run @@ -209,7 +241,8 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) { TEST_P(DbKvChecksumTest, NoCorruptionCase) { // If this test fails, we may have found a piece of malfunctioned hardware - auto batch_and_status = GetWriteBatch(nullptr, op_type_); + auto batch_and_status = + GetWriteBatch(nullptr, 8 /* protection_bytes_per_key */, op_type_); ASSERT_OK(batch_and_status.second); ASSERT_OK(batch_and_status.first.VerifyChecksum()); } @@ -238,10 +271,7 @@ TEST_P(DbKvChecksumTest, WriteToWALCorrupted) { auto log_size_pre_write = dbfull()->TEST_total_log_size(); SyncPoint::GetInstance()->EnableProcessing(); - auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type_); - ASSERT_OK(batch_and_status.second); - ASSERT_TRUE( - db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption()); + ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); // Confirm that nothing was written to WAL ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); @@ -279,10 +309,7 @@ TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) { auto log_size_pre_write = dbfull()->TEST_total_log_size(); SyncPoint::GetInstance()->EnableProcessing(); - auto batch_and_status = GetWriteBatch(handles_[1], op_type_); - ASSERT_OK(batch_and_status.second); - ASSERT_TRUE( - db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption()); + ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); // Confirm that nothing was written to WAL ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); @@ -322,9 +349,11 @@ void CorruptWriteBatch(Slice* content, size_t offset, TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) { // Veirfy write batch checksum after write batch append - auto batch1 = GetWriteBatch(nullptr /* cf_handle */, op_type1_); + auto batch1 = GetWriteBatch(nullptr /* cf_handle */, + 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(batch1.second); - auto batch2 = GetWriteBatch(nullptr /* cf_handle */, op_type2_); + auto batch2 = GetWriteBatch(nullptr /* cf_handle */, + 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(batch2.second); ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first)); ASSERT_OK(batch1.first.VerifyChecksum()); @@ -345,11 +374,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { options.merge_operator = MergeOperators::CreateStringAppendOperator(); } - auto leader_batch_and_status = - GetWriteBatch(nullptr /* cf_handle */, op_type1_); + auto leader_batch_and_status = GetWriteBatch( + nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); - auto follower_batch_and_status = - GetWriteBatch(nullptr /* cf_handle */, op_type2_); + auto follower_batch_and_status = GetWriteBatch( + nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type2_); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t total_bytes = leader_batch_size + follower_batch_and_status.first.GetDataSize(); @@ -390,7 +419,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { // follower follower_thread = port::Thread([&]() { follower_batch_and_status = - GetWriteBatch(nullptr /* cf_handle */, op_type2_); + GetWriteBatch(nullptr /* cf_handle */, + 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(follower_batch_and_status.second); ASSERT_TRUE( db_->Write(WriteOptions(), &follower_batch_and_status.first) @@ -413,7 +443,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { Reopen(options); SyncPoint::GetInstance()->EnableProcessing(); auto log_size_pre_write = dbfull()->TEST_total_log_size(); - leader_batch_and_status = GetWriteBatch(nullptr /* cf_handle */, op_type1_); + leader_batch_and_status = GetWriteBatch( + nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) .IsCorruption()); @@ -452,9 +483,11 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { } CreateAndReopenWithCF({"ramen"}, options); - auto leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_); + auto leader_batch_and_status = + GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); - auto follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_); + auto follower_batch_and_status = + GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type2_); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t total_bytes = leader_batch_size + follower_batch_and_status.first.GetDataSize(); @@ -494,7 +527,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { // Start the other writer thread which will join the write group as // follower follower_thread = port::Thread([&]() { - follower_batch_and_status = GetWriteBatch(handles_[1], op_type2_); + follower_batch_and_status = GetWriteBatch( + handles_[1], 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(follower_batch_and_status.second); ASSERT_TRUE( db_->Write(WriteOptions(), &follower_batch_and_status.first) @@ -518,7 +552,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options); SyncPoint::GetInstance()->EnableProcessing(); auto log_size_pre_write = dbfull()->TEST_total_log_size(); - leader_batch_and_status = GetWriteBatch(handles_[1], op_type1_); + leader_batch_and_status = + GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) .IsCorruption()); diff --git a/db/db_test.cc b/db/db_test.cc index 3fb685680..a47e8fdb4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4271,7 +4271,9 @@ TEST_F(DBTest, ConcurrentFlushWAL) { threads.emplace_back([&] { for (size_t i = cnt; i < 2 * cnt; i++) { auto istr = std::to_string(i); - WriteBatch batch; + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + wopt.protection_bytes_per_key, + 0 /* default_cf_ts_sz */); ASSERT_OK(batch.Put("a" + istr, "b" + istr)); ASSERT_OK( dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true)); diff --git a/db/memtable.cc b/db/memtable.cc index 4b609cae7..2f86d0758 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1159,6 +1159,7 @@ Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, if (VarintLength(new_prev_size) < VarintLength(prev_size)) { // shift the value buffer as well. memcpy(p, prev_buffer, new_prev_size); + prev_buffer = p; } } RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); diff --git a/db/write_batch.cc b/db/write_batch.cc index b919ea056..7a863c67f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -2844,16 +2844,10 @@ class ProtectionInfoUpdater : public WriteBatch::Handler { Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= WriteBatchInternal::kHeader); + assert(b->prot_info_ == nullptr); b->rep_.assign(contents.data(), contents.size()); b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); - - // If we have a prot_info_, update protection info entries for the batch. - if (b->prot_info_) { - ProtectionInfoUpdater prot_info_updater(b->prot_info_.get()); - return b->Iterate(&prot_info_updater); - } - return Status::OK(); } @@ -2910,4 +2904,28 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, } } +Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb, + size_t bytes_per_key) { + if (bytes_per_key == 0) { + if (wb->prot_info_ != nullptr) { + wb->prot_info_.reset(); + return Status::OK(); + } else { + // Already not protected. + return Status::OK(); + } + } else if (bytes_per_key == 8) { + if (wb->prot_info_ == nullptr) { + wb->prot_info_.reset(new WriteBatch::ProtectionInfo()); + ProtectionInfoUpdater prot_info_updater(wb->prot_info_.get()); + return wb->Iterate(&prot_info_updater); + } else { + // Already protected. + return Status::OK(); + } + } + return Status::NotSupported( + "WriteBatch protection info must be zero or eight bytes/key"); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 926acc63a..53e83a23e 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -236,6 +236,8 @@ class WriteBatchInternal { static bool HasKeyWithTimestamp(const WriteBatch& wb) { return wb.has_key_with_ts_; } + + static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key); }; // LocalSavePoint is similar to a scope guard diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 1dc659aad..423b2a2aa 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -307,6 +307,10 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) { WriteOptions woptions; woptions.disableWAL = !enable_WAL_; woptions.sync = enable_WAL_; + if (woptions.protection_bytes_per_key > 0) { + ASSERT_OK(WriteBatchInternal::UpdateProtectionInfo( + &write_op.write_batch_, woptions.protection_bytes_per_key)); + } Status s; if (seq_per_batch_) { class PublishSeqCallback : public PreReleaseCallback { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 466cef209..91036dc22 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -617,6 +617,7 @@ void StressTest::OperateDb(ThreadState* thread) { write_opts.sync = true; } write_opts.disableWAL = FLAGS_disable_wal; + write_opts.protection_bytes_per_key = FLAGS_batch_protection_bytes_per_key; const int prefix_bound = static_cast(FLAGS_readpercent) + static_cast(FLAGS_prefixpercent); const int write_bound = prefix_bound + static_cast(FLAGS_writepercent); diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 1729ee3f7..3e8490ccc 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -265,13 +265,6 @@ int db_stress_tool(int argc, char** argv) { "test_batches_snapshots must all be 0 when using compaction filter\n"); exit(1); } - if (FLAGS_batch_protection_bytes_per_key > 0 && - !FLAGS_test_batches_snapshots) { - fprintf(stderr, - "Error: test_batches_snapshots must be enabled when " - "batch_protection_bytes_per_key > 0\n"); - exit(1); - } if (FLAGS_test_multi_ops_txns) { CheckAndSetOptionsForMultiOpsTxnStressTest(); } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 19bc3cb19..72a2f7de1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1737,6 +1737,13 @@ struct WriteOptions { // Default: `Env::IO_TOTAL` Env::IOPriority rate_limiter_priority; + // `protection_bytes_per_key` is the number of bytes used to store + // protection information for each key entry. Currently supported values are + // zero (disabled) and eight. + // + // Default: zero (disabled). + size_t protection_bytes_per_key; + WriteOptions() : sync(false), disableWAL(false), @@ -1744,7 +1751,8 @@ struct WriteOptions { no_slowdown(false), low_pri(false), memtable_insert_hint_per_batch(false), - rate_limiter_priority(Env::IO_TOTAL) {} + rate_limiter_priority(Env::IO_TOTAL), + protection_bytes_per_key(0) {} }; // Options that control flush operations diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 90174abaf..21974e67a 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -98,7 +98,7 @@ class WriteBatchWithIndex : public WriteBatchBase { explicit WriteBatchWithIndex( const Comparator* backup_index_comparator = BytewiseComparator(), size_t reserved_bytes = 0, bool overwrite_key = false, - size_t max_bytes = 0); + size_t max_bytes = 0, size_t protection_bytes_per_key = 0); ~WriteBatchWithIndex() override; WriteBatchWithIndex(WriteBatchWithIndex&&); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index d8bd108ea..f4838272b 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -419,9 +419,6 @@ class WriteBatch : public WriteBatchBase { struct ProtectionInfo; size_t GetProtectionBytesPerKey() const; - // Clears prot_info_ if there are no entries. - void ClearProtectionInfoIfEmpty(); - private: friend class WriteBatchInternal; friend class LocalSavePoint; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index cea8232d5..579885fd5 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -514,8 +514,6 @@ def finalize_and_sanitize(src_params): dest_params["readpercent"] += dest_params.get("prefixpercent", 20) dest_params["prefixpercent"] = 0 dest_params["test_batches_snapshots"] = 0 - if dest_params.get("test_batches_snapshots") == 0: - dest_params["batch_protection_bytes_per_key"] = 0 if (dest_params.get("prefix_size") == -1 and dest_params.get("memtable_whole_key_filtering") == 0): dest_params["memtable_prefix_bloom_size_ratio"] = 0 diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 68b6227ef..755b94a75 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -71,20 +71,27 @@ class PessimisticTransactionDB : public TransactionDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; inline Status WriteWithConcurrencyControl(const WriteOptions& opts, WriteBatch* updates) { - // Need to lock all keys in this batch to prevent write conflicts with - // concurrent transactions. - Transaction* txn = BeginInternalTransaction(opts); - txn->DisableIndexing(); - - auto txn_impl = static_cast_with_check(txn); - - // Since commitBatch sorts the keys before locking, concurrent Write() - // operations will not cause a deadlock. - // In order to avoid a deadlock with a concurrent Transaction, Transactions - // should use a lock timeout. - Status s = txn_impl->CommitBatch(updates); - - delete txn; + Status s; + if (opts.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + updates, opts.protection_bytes_per_key); + } + if (s.ok()) { + // Need to lock all keys in this batch to prevent write conflicts with + // concurrent transactions. + Transaction* txn = BeginInternalTransaction(opts); + txn->DisableIndexing(); + + auto txn_impl = static_cast_with_check(txn); + + // Since commitBatch sorts the keys before locking, concurrent Write() + // operations will not cause a deadlock. + // In order to avoid a deadlock with a concurrent Transaction, + // Transactions should use a lock timeout. + s = txn_impl->CommitBatch(updates); + + delete txn; + } return s; } diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 53d54abfb..c98cfcbf2 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -67,8 +67,11 @@ TransactionBaseImpl::TransactionBaseImpl( cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())), lock_tracker_factory_(lock_tracker_factory), start_time_(dbimpl_->GetSystemClock()->NowMicros()), - write_batch_(cmp_, 0, true, 0), + write_batch_(cmp_, 0, true, 0, write_options.protection_bytes_per_key), tracked_locks_(lock_tracker_factory_.Create()), + commit_time_batch_(0 /* reserved_bytes */, 0 /* max_bytes */, + write_options.protection_bytes_per_key, + 0 /* default_cf_ts_sz */), indexing_enabled_(true) { assert(dynamic_cast(db_) != nullptr); log_number_ = 0; @@ -108,6 +111,12 @@ void TransactionBaseImpl::Reinitialize(DB* db, start_time_ = dbimpl_->GetSystemClock()->NowMicros(); indexing_enabled_ = true; cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily()); + WriteBatchInternal::UpdateProtectionInfo( + write_batch_.GetWriteBatch(), write_options_.protection_bytes_per_key) + .PermitUncheckedError(); + WriteBatchInternal::UpdateProtectionInfo( + &commit_time_batch_, write_options_.protection_bytes_per_key) + .PermitUncheckedError(); } void TransactionBaseImpl::SetSnapshot() { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index ce2975354..1133f903a 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -267,7 +267,9 @@ Status WritePreparedTxn::RollbackInternal() { assert(db_impl_); assert(wpt_db_); - WriteBatch rollback_batch; + WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */, + write_options_.protection_bytes_per_key, + 0 /* default_cf_ts_sz */); assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap(); diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index d70d9591d..c6661479a 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -166,6 +166,15 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, // increased for this batch. return Status::OK(); } + + if (write_options_orig.protection_bytes_per_key > 0) { + auto s = WriteBatchInternal::UpdateProtectionInfo( + batch, write_options_orig.protection_bytes_per_key); + if (!s.ok()) { + return s; + } + } + if (batch_cnt == 0) { // not provided, then compute it // TODO(myabandeh): add an option to allow user skipping this cost SubBatchCounter counter(*GetCFComparatorMap()); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 2e375d54e..6e04d3344 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -464,7 +464,7 @@ Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() { // only used if the write batch encounters an invalid cf id, and falls back to // this comparator. WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0, - true, 0); + true, 0, write_options_.protection_bytes_per_key); // Swap with write_batch_ so that wb contains the complete write batch. The // actual write batch that will be flushed to DB will be built in // write_batch_, and will be read by FlushWriteBatchToDBInternal. @@ -722,7 +722,8 @@ Status WriteUnpreparedTxn::WriteRollbackKeys( Status WriteUnpreparedTxn::RollbackInternal() { // TODO(lth): Reduce duplicate code with WritePrepared rollback logic. WriteBatchWithIndex rollback_batch( - wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0); + wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0, + write_options_.protection_bytes_per_key); assert(GetId() != kMaxSequenceNumber); assert(GetId() > 0); Status s; diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 0ef96d0a4..72a21755a 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -59,7 +59,9 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) { auto last_visible_txn = it->first - 1; const auto& batch = it->second.batch_; - WriteBatch rollback_batch; + WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */, + w_options.protection_bytes_per_key, + 0 /* default_cf_ts_sz */); struct RollbackWriteBatchBuilder : public WriteBatch::Handler { DBImpl* db_; diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index af13d901a..9f65216f7 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -25,8 +25,9 @@ namespace ROCKSDB_NAMESPACE { struct WriteBatchWithIndex::Rep { explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, - size_t max_bytes = 0, bool _overwrite_key = false) - : write_batch(reserved_bytes, max_bytes, /*protection_bytes_per_key=*/0, + size_t max_bytes = 0, bool _overwrite_key = false, + size_t protection_bytes_per_key = 0) + : write_batch(reserved_bytes, max_bytes, protection_bytes_per_key, index_comparator ? index_comparator->timestamp_size() : 0), comparator(index_comparator, &write_batch), skip_list(comparator, &arena), @@ -262,9 +263,9 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { WriteBatchWithIndex::WriteBatchWithIndex( const Comparator* default_index_comparator, size_t reserved_bytes, - bool overwrite_key, size_t max_bytes) + bool overwrite_key, size_t max_bytes, size_t protection_bytes_per_key) : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes, - overwrite_key)) {} + overwrite_key, protection_bytes_per_key)) {} WriteBatchWithIndex::~WriteBatchWithIndex() {}