diff --git a/db/column_family.cc b/db/column_family.cc index 369dc6615..42e13a13a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1621,6 +1621,13 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( db_id_, db_session_id_); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); + auto ucmp = new_cfd->user_comparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + running_ts_sz_.insert({id, ts_sz}); + if (ts_sz > 0) { + ts_sz_for_record_.insert({id, ts_sz}); + } max_column_family_ = std::max(max_column_family_, id); // add to linked list new_cfd->next_ = dummy_cfd_; @@ -1636,10 +1643,13 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( // under a DB mutex AND from a write thread void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { - auto cfd_iter = column_family_data_.find(cfd->GetID()); + uint32_t cf_id = cfd->GetID(); + auto cfd_iter = column_family_data_.find(cf_id); assert(cfd_iter != column_family_data_.end()); column_family_data_.erase(cfd_iter); column_families_.erase(cfd->GetName()); + running_ts_sz_.erase(cf_id); + ts_sz_for_record_.erase(cf_id); } // under a DB mutex OR from a write thread diff --git a/db/column_family.h b/db/column_family.h index 9ec093010..5fe593a84 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -705,6 +705,16 @@ class ColumnFamilySet { Version* dummy_version, const ColumnFamilyOptions& options); + const std::unordered_map& + GetRunningColumnFamiliesTimestampSize() const { + return running_ts_sz_; + } + + const std::unordered_map& + GetColumnFamiliesTimestampSizeForRecord() const { + return ts_sz_for_record_; + } + iterator begin() { return iterator(dummy_cfd_->next_); } iterator end() { return iterator(dummy_cfd_); } @@ -730,6 +740,15 @@ class ColumnFamilySet { UnorderedMap column_families_; UnorderedMap column_family_data_; + // Mutating / reading `running_ts_sz_` and `ts_sz_for_record_` follow + // the same requirements as `column_families_` and `column_family_data_`. + // Mapping from column family id to user-defined timestamp size for all + // running column families. + std::unordered_map running_ts_sz_; + // Mapping from column family id to user-defined timestamp size for + // column families with non-zero user-defined timestamp size. + std::unordered_map ts_sz_for_record_; + uint32_t max_column_family_; const FileOptions file_options_; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 6bc8ae4c3..931062298 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -25,6 +25,7 @@ #include "rocksdb/wal_filter.h" #include "test_util/sync_point.h" #include "util/rate_limiter_impl.h" +#include "util/udt_util.h" namespace ROCKSDB_NAMESPACE { Options SanitizeOptions(const std::string& dbname, const Options& src, @@ -1186,6 +1187,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, std::string scratch; Slice record; + const std::unordered_map& running_ts_sz = + versions_->GetRunningColumnFamiliesTimestampSize(); + TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal", /*arg=*/nullptr); uint64_t record_checksum; @@ -1208,6 +1212,17 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, if (!status.ok()) { return status; } + + const std::unordered_map& record_ts_sz = + reader.GetRecordedTimestampSize(); + // TODO(yuzhangyu): update mode to kReconcileInconsistency when user + // comparator can be changed. + status = HandleWriteBatchTimestampSizeDifference( + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency); + if (!status.ok()) { + return status; + } TEST_SYNC_POINT_CALLBACK( "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch); TEST_SYNC_POINT_CALLBACK( diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 114b659f5..310342897 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -198,6 +198,9 @@ Status DBImplSecondary::RecoverLogFiles( } assert(reader != nullptr); } + + const std::unordered_map& running_ts_sz = + versions_->GetRunningColumnFamiliesTimestampSize(); for (auto log_number : log_numbers) { auto it = log_readers_.find(log_number); assert(it != log_readers_.end()); @@ -225,6 +228,14 @@ Status DBImplSecondary::RecoverLogFiles( if (!status.ok()) { break; } + const std::unordered_map& record_ts_sz = + reader->GetRecordedTimestampSize(); + status = HandleWriteBatchTimestampSizeDifference( + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency); + if (!status.ok()) { + break; + } SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); std::vector column_family_ids; status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index fb74434dd..8a1a6ce31 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1328,7 +1328,13 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } - IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); + IOStatus io_s = log_writer->MaybeAddUserDefinedTimestampSizeRecord( + versions_->GetColumnFamiliesTimestampSizeForRecord(), + rate_limiter_priority); + if (!io_s.ok()) { + return io_s; + } + io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index fad0f5d09..a96204937 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" +#include "db/db_with_timestamp_test_util.h" #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" @@ -301,6 +302,96 @@ TEST_F(DBWALTest, Recover) { } while (ChangeWalOptions()); } +class DBWALTestWithTimestamp : public DBBasicTestWithTimestampBase { + public: + DBWALTestWithTimestamp() + : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {} + + Status CreateAndReopenWithCFWithTs(const std::vector& cfs, + const Options& options) { + CreateColumnFamilies(cfs, options); + return ReopenColumnFamiliesWithTs(cfs, options); + } + + Status ReopenColumnFamiliesWithTs(const std::vector& cfs, + Options ts_options) { + Options default_options = CurrentOptions(); + default_options.create_if_missing = false; + ts_options.create_if_missing = false; + + std::vector cf_options(cfs.size(), ts_options); + std::vector cfs_plus_default = cfs; + cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); + cf_options.insert(cf_options.begin(), default_options); + Close(); + return TryReopenWithColumnFamilies(cfs_plus_default, cf_options); + } + + Status Put(uint32_t cf, const Slice& key, const Slice& ts, + const Slice& value) { + WriteOptions write_opts; + return db_->Put(write_opts, handles_[cf], key, ts, value); + } + + void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key, + const std::string& expected_value) { + std::string actual_value; + ASSERT_OK(db_->Get(read_opts, handles_[cf], key, &actual_value)); + ASSERT_EQ(expected_value, actual_value); + } +}; + +TEST_F(DBWALTestWithTimestamp, Recover) { + // Set up the option that enables user defined timestmp size. + std::string ts = Timestamp(1, 0); + const size_t kTimestampSize = ts.size(); + TestComparator test_cmp(kTimestampSize); + Options ts_options; + ts_options.create_if_missing = true; + ts_options.comparator = &test_cmp; + + ReadOptions read_opts; + Slice ts_slice = ts; + read_opts.timestamp = &ts_slice; + do { + ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options)); + ASSERT_OK(Put(1, "foo", ts, "v1")); + ASSERT_OK(Put(1, "baz", ts, "v5")); + + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options)); + CheckGet(read_opts, 1, "foo", "v1"); + CheckGet(read_opts, 1, "baz", "v5"); + ASSERT_OK(Put(1, "bar", ts, "v2")); + ASSERT_OK(Put(1, "foo", ts, "v3")); + + ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options)); + CheckGet(read_opts, 1, "foo", "v3"); + ASSERT_OK(Put(1, "foo", ts, "v4")); + CheckGet(read_opts, 1, "foo", "v4"); + CheckGet(read_opts, 1, "bar", "v2"); + CheckGet(read_opts, 1, "baz", "v5"); + } while (ChangeWalOptions()); +} + +TEST_F(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) { + // Set up the option that enables user defined timestmp size. + std::string ts = Timestamp(1, 0); + const size_t kTimestampSize = ts.size(); + TestComparator test_cmp(kTimestampSize); + Options ts_options; + ts_options.create_if_missing = true; + ts_options.comparator = &test_cmp; + + ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options)); + ASSERT_OK(Put(1, "foo", ts, "v1")); + ASSERT_OK(Put(1, "baz", ts, "v5")); + + TestComparator diff_test_cmp(kTimestampSize + 1); + ts_options.comparator = &diff_test_cmp; + ASSERT_TRUE( + ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument()); +} + TEST_F(DBWALTest, RecoverWithTableHandle) { do { Options options = CurrentOptions(); diff --git a/db/repair.cc b/db/repair.cc index 67513cacc..847e82524 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -394,9 +394,12 @@ class Repairer { auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); // Read all the records and add to a memtable + const std::unordered_map& running_ts_sz = + vset_.GetRunningColumnFamiliesTimestampSize(); std::string scratch; Slice record; WriteBatch batch; + int counter = 0; while (reader.ReadRecord(&record, &scratch)) { if (record.size() < WriteBatchInternal::kHeader) { @@ -406,8 +409,15 @@ class Repairer { } Status record_status = WriteBatchInternal::SetContents(&batch, record); if (record_status.ok()) { - record_status = - WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); + const std::unordered_map& record_ts_sz = + reader.GetRecordedTimestampSize(); + record_status = HandleWriteBatchTimestampSizeDifference( + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency); + if (record_status.ok()) { + record_status = + WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); + } } if (record_status.ok()) { counter += WriteBatchInternal::Count(&batch); diff --git a/db/repair_test.cc b/db/repair_test.cc index 47482699d..a8cf5281d 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -3,17 +3,17 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#include "rocksdb/options.h" - #include #include #include #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "db/db_with_timestamp_test_util.h" #include "file/file_util.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" +#include "rocksdb/options.h" #include "rocksdb/transaction_log.h" #include "table/unique_id_impl.h" #include "util/string_util.h" @@ -315,6 +315,78 @@ TEST_F(RepairTest, UnflushedSst) { ASSERT_EQ(Get("key"), "val"); } +class RepairTestWithTimestamp : public DBBasicTestWithTimestampBase { + public: + RepairTestWithTimestamp() + : DBBasicTestWithTimestampBase("repair_test_with_timestamp") {} + + Status Put(const Slice& key, const Slice& ts, const Slice& value) { + WriteOptions write_opts; + return db_->Put(write_opts, handles_[0], key, ts, value); + } + + void CheckGet(const ReadOptions& read_opts, const Slice& key, + const std::string& expected_value) { + std::string actual_value; + ASSERT_OK(db_->Get(read_opts, handles_[0], key, &actual_value)); + ASSERT_EQ(expected_value, actual_value); + } +}; + +TEST_F(RepairTestWithTimestamp, UnflushedSst) { + Destroy(last_options_); + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + std::string ts = Timestamp(0, 0); + const size_t kTimestampSize = ts.size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + + ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_)); + + ASSERT_OK(Put("key", ts, "val")); + VectorLogPtr wal_files; + ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); + ASSERT_EQ(wal_files.size(), 1); + { + uint64_t total_ssts_size; + std::unordered_map sst_files; + ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); + ASSERT_EQ(total_ssts_size, 0); + } + // Need to get path before Close() deletes db_, but delete it after Close() to + // ensure Close() didn't change the manifest. + std::string manifest_path = + DescriptorFileName(dbname_, dbfull()->TEST_Current_Manifest_FileNo()); + + Close(); + ASSERT_OK(env_->FileExists(manifest_path)); + ASSERT_OK(env_->DeleteFile(manifest_path)); + ASSERT_OK(RepairDB(dbname_, options)); + ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_)); + + ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); + ASSERT_EQ(wal_files.size(), 0); + { + uint64_t total_ssts_size; + std::unordered_map sst_files; + ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); + ASSERT_GT(total_ssts_size, 0); + } + + ReadOptions read_opts; + Slice read_ts_slice = ts; + read_opts.timestamp = &read_ts_slice; + CheckGet(read_opts, "key", "val"); +} + TEST_F(RepairTest, SeparateWalDir) { do { Options options = CurrentOptions(); diff --git a/db/version_set.h b/db/version_set.h index 25ad36583..3993a4a9f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1467,6 +1467,17 @@ class VersionSet { uint64_t min_pending_output); ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } + + const std::unordered_map& + GetRunningColumnFamiliesTimestampSize() const { + return column_family_set_->GetRunningColumnFamiliesTimestampSize(); + } + + const std::unordered_map& + GetColumnFamiliesTimestampSizeForRecord() const { + return column_family_set_->GetColumnFamiliesTimestampSizeForRecord(); + } + RefedColumnFamilySet GetRefedColumnFamilySet() { return RefedColumnFamilySet(GetColumnFamilySet()); } diff --git a/unreleased_history/new_features/logging_udt_sizes.md b/unreleased_history/new_features/logging_udt_sizes.md new file mode 100644 index 000000000..b87ec0dfd --- /dev/null +++ b/unreleased_history/new_features/logging_udt_sizes.md @@ -0,0 +1 @@ +Start logging non-zero user-defined timestamp sizes in WAL to signal user key format in subsequent records and use it during recovery. This change will break recovery from WAL files written by early versions that contain user-defined timestamps. The workaround is to ensure there are no WAL files to recover (i.e. by flushing before close) before upgrade. \ No newline at end of file diff --git a/util/udt_util.cc b/util/udt_util.cc index b57d49e07..6043b7755 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -108,7 +108,8 @@ TimestampRecoveryHandler::TimestampRecoveryHandler( : running_ts_sz_(running_ts_sz), record_ts_sz_(record_ts_sz), new_batch_(new WriteBatch()), - handler_valid_(true) {} + handler_valid_(true), + new_batch_diff_from_orig_batch_(false) {} Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key, const Slice& value) { @@ -214,10 +215,12 @@ Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( case RecoveryType::kStripTimestamp: assert(record_ts_sz.has_value()); *new_key = StripTimestampFromUserKey(key, record_ts_sz.value()); + new_batch_diff_from_orig_batch_ = true; break; case RecoveryType::kPadTimestamp: AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz); *new_key = *new_key_buf; + new_batch_diff_from_orig_batch_ = true; break; case RecoveryType::kUnrecoverable: return Status::InvalidArgument( @@ -230,28 +233,30 @@ Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( } Status HandleWriteBatchTimestampSizeDifference( + const WriteBatch* batch, const std::unordered_map& running_ts_sz, const std::unordered_map& record_ts_sz, TimestampSizeConsistencyMode check_mode, - std::unique_ptr& batch) { + std::unique_ptr* new_batch) { // Quick path to bypass checking the WriteBatch. if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { return Status::OK(); } bool need_recovery = false; Status status = CheckWriteBatchTimestampSizeConsistency( - batch.get(), running_ts_sz, record_ts_sz, check_mode, &need_recovery); + batch, running_ts_sz, record_ts_sz, check_mode, &need_recovery); if (!status.ok()) { return status; } else if (need_recovery) { - SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get()); + assert(new_batch); + SequenceNumber sequence = WriteBatchInternal::Sequence(batch); TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); status = batch->Iterate(&recovery_handler); if (!status.ok()) { return status; } else { - batch = recovery_handler.TransferNewBatch(); - WriteBatchInternal::SetSequence(batch.get(), sequence); + *new_batch = recovery_handler.TransferNewBatch(); + WriteBatchInternal::SetSequence(new_batch->get(), sequence); } } return Status::OK(); diff --git a/util/udt_util.h b/util/udt_util.h index 78d2efdd4..7bf8b739e 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -143,6 +143,7 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } std::unique_ptr&& TransferNewBatch() { + assert(new_batch_diff_from_orig_batch_); handler_valid_ = false; return std::move(new_batch_); } @@ -164,6 +165,10 @@ class TimestampRecoveryHandler : public WriteBatch::Handler { // Handler is valid upon creation and becomes invalid after its `new_batch_` // is transferred. bool handler_valid_; + + // False upon creation, and become true if at least one user key from the + // original batch is updated when creating the new batch. + bool new_batch_diff_from_orig_batch_; }; // Mode for checking and handling timestamp size inconsistency encountered in a @@ -193,8 +198,9 @@ enum class TimestampSizeConsistencyMode { // any running column family has an inconsistent user-defined timestamp size // that cannot be reconciled with a best-effort recovery. Check // `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In -// this mode, a new WriteBatch is created on the heap and transferred to `batch` -// if there is tolerable inconsistency. +// this mode, output argument `new_batch` should be set, a new WriteBatch is +// created on the heap and transferred to `new_batch` if there is tolerable +// inconsistency. // // An invariant that WAL logging ensures is that all timestamp size info // is logged prior to a WriteBatch that needed this info. And zero timestamp @@ -204,8 +210,9 @@ enum class TimestampSizeConsistencyMode { // `running_ts_sz` should contain the timestamp size for all running column // families including the ones with zero timestamp size. Status HandleWriteBatchTimestampSizeDifference( + const WriteBatch* batch, const std::unordered_map& running_ts_sz, const std::unordered_map& record_ts_sz, TimestampSizeConsistencyMode check_mode, - std::unique_ptr& batch); + std::unique_ptr* new_batch = nullptr); } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index d91b32e49..9c87c23ab 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -106,18 +106,17 @@ class HandleTimestampSizeDifferenceTest : public testing::Test { void CreateWriteBatch( const std::unordered_map& ts_sz_for_batch, - std::unique_ptr& batch) { + WriteBatch* batch) { for (const auto& [cf_id, ts_sz] : ts_sz_for_batch) { std::string key; CreateKey(&key, ts_sz); + ASSERT_OK(WriteBatchInternal::Put(batch, cf_id, key, kValuePlaceHolder)); + ASSERT_OK(WriteBatchInternal::Delete(batch, cf_id, key)); + ASSERT_OK(WriteBatchInternal::SingleDelete(batch, cf_id, key)); + ASSERT_OK(WriteBatchInternal::DeleteRange(batch, cf_id, key, key)); ASSERT_OK( - WriteBatchInternal::Put(batch.get(), cf_id, key, kValuePlaceHolder)); - ASSERT_OK(WriteBatchInternal::Delete(batch.get(), cf_id, key)); - ASSERT_OK(WriteBatchInternal::SingleDelete(batch.get(), cf_id, key)); - ASSERT_OK(WriteBatchInternal::DeleteRange(batch.get(), cf_id, key, key)); - ASSERT_OK(WriteBatchInternal::Merge(batch.get(), cf_id, key, - kValuePlaceHolder)); - ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch.get(), cf_id, key, + WriteBatchInternal::Merge(batch, cf_id, key, kValuePlaceHolder)); + ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch, cf_id, key, kValuePlaceHolder)); } } @@ -189,19 +188,18 @@ TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) { std::unordered_map running_ts_sz = {{1, sizeof(uint64_t)}, {2, 0}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(running_ts_sz, batch); - const WriteBatch* orig_batch = batch.get(); + WriteBatch batch; + CreateWriteBatch(running_ts_sz, &batch); // All `check_mode` pass with OK status and `batch` not checked or updated. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency)); + std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + ASSERT_TRUE(new_batch.get() == nullptr); } TEST_F(HandleTimestampSizeDifferenceTest, @@ -209,38 +207,36 @@ TEST_F(HandleTimestampSizeDifferenceTest, std::unordered_map running_ts_sz = {{2, 0}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}, {3, sizeof(char)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(record_ts_sz, batch); - const WriteBatch* orig_batch = batch.get(); + WriteBatch batch; + CreateWriteBatch(record_ts_sz, &batch); // All `check_mode` pass with OK status and `batch` not checked or updated. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency)); + std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + ASSERT_TRUE(new_batch.get() == nullptr); } TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { std::unordered_map running_ts_sz = {{1, sizeof(uint64_t)}, {2, sizeof(char)}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(record_ts_sz, batch); - const WriteBatch* orig_batch = batch.get(); + WriteBatch batch; + CreateWriteBatch(record_ts_sz, &batch); // All `check_mode` pass with OK status and `batch` not updated. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency)); + std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_EQ(orig_batch, batch.get()); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + ASSERT_TRUE(new_batch.get() == nullptr); } TEST_F(HandleTimestampSizeDifferenceTest, @@ -248,23 +244,22 @@ TEST_F(HandleTimestampSizeDifferenceTest, std::unordered_map running_ts_sz = {{1, 0}, {2, sizeof(char)}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(record_ts_sz, batch); - const WriteBatch* orig_batch = batch.get(); - WriteBatch orig_batch_copy(*batch); + WriteBatch batch; + CreateWriteBatch(record_ts_sz, &batch); // kVerifyConsistency doesn't tolerate inconsistency for running column // families. ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch) + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency) .IsInvalidArgument()); + std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_NE(orig_batch, batch.get()); - CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + ASSERT_TRUE(new_batch.get() != nullptr); + CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::nullopt /* dropped_cf */); } @@ -274,22 +269,22 @@ TEST_F(HandleTimestampSizeDifferenceTest, // Make `record_ts_sz` not contain zero timestamp size entries to follow the // behavior of actual WAL log timestamp size record. std::unordered_map record_ts_sz; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch({{1, 0}}, batch); - const WriteBatch* orig_batch = batch.get(); - WriteBatch orig_batch_copy(*batch); + WriteBatch batch; + CreateWriteBatch({{1, 0}}, &batch); // kVerifyConsistency doesn't tolerate inconsistency for running column // families. ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch) + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency) .IsInvalidArgument()); + + std::unique_ptr new_batch(nullptr); ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_NE(orig_batch, batch.get()); - CheckContentsWithTimestampPadding(orig_batch_copy, *batch, sizeof(uint64_t)); + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + ASSERT_TRUE(new_batch.get() != nullptr); + CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t)); } TEST_F(HandleTimestampSizeDifferenceTest, @@ -297,35 +292,35 @@ TEST_F(HandleTimestampSizeDifferenceTest, std::unordered_map running_ts_sz = {{1, 0}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}, {2, sizeof(char)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(record_ts_sz, batch); - const WriteBatch* orig_batch = batch.get(); - WriteBatch orig_batch_copy(*batch); + WriteBatch batch; + CreateWriteBatch(record_ts_sz, &batch); + std::unique_ptr new_batch(nullptr); // kReconcileInconsistency tolerate inconsistency for dropped column family // and all related entries copied over to the new WriteBatch. ASSERT_OK(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); - ASSERT_NE(orig_batch, batch.get()); - CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch)); + + ASSERT_TRUE(new_batch.get() != nullptr); + CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t), std::optional(2)); } TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { std::unordered_map running_ts_sz = {{1, sizeof(char)}}; std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; - std::unique_ptr batch(new WriteBatch()); - CreateWriteBatch(record_ts_sz, batch); + WriteBatch batch; + CreateWriteBatch(record_ts_sz, &batch); ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kVerifyConsistency, batch) + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency) .IsInvalidArgument()); ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( - running_ts_sz, record_ts_sz, - TimestampSizeConsistencyMode::kReconcileInconsistency, batch) + &batch, running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency) .IsInvalidArgument()); } } // namespace ROCKSDB_NAMESPACE