diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index e58e6cd82..bd82e49e0 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -3273,6 +3273,76 @@ TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) { Close(); } +// Tests the effect of flag `persist_user_defined_timestamps` on the file +// boundaries contained in the Manifest, a.k.a FileMetaData.smallest, +// FileMetaData.largest. +class HandleFileBoundariesTest + : public DBBasicTestWithTimestampBase, + public testing::WithParamInterface { + public: + HandleFileBoundariesTest() + : DBBasicTestWithTimestampBase("/handle_file_boundaries") {} +}; + +TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) { + Options options = CurrentOptions(); + options.env = env_; + // Write a timestamp that is not the min timestamp to help test the behavior + // of flag `persist_user_defined_timestamps`. + std::string write_ts = Timestamp(1, 0); + std::string min_ts = Timestamp(0, 0); + std::string smallest_ukey_without_ts = "bar"; + std::string largest_ukey_without_ts = "foo"; + const size_t kTimestampSize = write_ts.size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + bool persist_udt = test::ShouldPersistUDT(GetParam()); + options.persist_user_defined_timestamps = persist_udt; + DestroyAndReopen(options); + + ASSERT_OK( + db_->Put(WriteOptions(), smallest_ukey_without_ts, write_ts, "val1")); + ASSERT_OK( + db_->Put(WriteOptions(), largest_ukey_without_ts, write_ts, "val2")); + + // Create a L0 SST file and its record is added to the Manfiest. + ASSERT_OK(Flush()); + Close(); + + options.create_if_missing = false; + // Reopen the DB and process manifest file. + Reopen(options); + + std::vector> level_to_files; + dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), + &level_to_files); + ASSERT_GT(level_to_files.size(), 1); + // L0 only has one SST file. + ASSERT_EQ(level_to_files[0].size(), 1); + auto file_meta = level_to_files[0][0]; + if (persist_udt) { + ASSERT_EQ(smallest_ukey_without_ts + write_ts, + file_meta.smallest.user_key()); + ASSERT_EQ(largest_ukey_without_ts + write_ts, file_meta.largest.user_key()); + } else { + // If `persist_user_defined_timestamps` is false, the file boundaries should + // have the min timestamp. Behind the scenes, when file boundaries in + // FileMetaData is persisted to Manifest, the original user-defined + // timestamps in user key are stripped. When manifest is read and processed + // during DB open, a min timestamp is padded to the file boundaries. This + // test's writes contain non min timestamp to verify this logic end-to-end. + ASSERT_EQ(smallest_ukey_without_ts + min_ts, file_meta.smallest.user_key()); + ASSERT_EQ(largest_ukey_without_ts + min_ts, file_meta.largest.user_key()); + } + Close(); +} + +INSTANTIATE_TEST_CASE_P( + ConfigurePersistUdt, HandleFileBoundariesTest, + ::testing::Values( + test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp, + test::UserDefinedTimestampTestMode::kNormal)); + TEST_F(DBBasicTestWithTimestamp, GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) { Options options = CurrentOptions(); diff --git a/db/version_edit.cc b/db/version_edit.cc index a9ef6aced..f5783eacd 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -93,7 +93,8 @@ void VersionEdit::Clear() { full_history_ts_low_.clear(); } -bool VersionEdit::EncodeTo(std::string* dst) const { +bool VersionEdit::EncodeTo(std::string* dst, + std::optional ts_sz) const { if (has_db_id_) { PutVarint32(dst, kDbId); PutLengthPrefixedSlice(dst, db_id_); @@ -133,6 +134,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const { } bool min_log_num_written = false; + + assert(new_files_.empty() || ts_sz.has_value()); for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; if (!f.smallest.Valid() || !f.largest.Valid() || @@ -142,8 +145,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kNewFile4); PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber()); PutVarint64(dst, f.fd.GetFileSize()); - PutLengthPrefixedSlice(dst, f.smallest.Encode()); - PutLengthPrefixedSlice(dst, f.largest.Encode()); + EncodeFileBoundaries(dst, f, ts_sz.value()); PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno); // Customized fields' format: // +-----------------------------+ @@ -458,6 +460,23 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { return nullptr; } +void VersionEdit::EncodeFileBoundaries(std::string* dst, + const FileMetaData& meta, + size_t ts_sz) const { + if (ts_sz == 0 || meta.user_defined_timestamps_persisted) { + PutLengthPrefixedSlice(dst, meta.smallest.Encode()); + PutLengthPrefixedSlice(dst, meta.largest.Encode()); + return; + } + std::string smallest_buf; + std::string largest_buf; + StripTimestampFromInternalKey(&smallest_buf, meta.smallest.Encode(), ts_sz); + StripTimestampFromInternalKey(&largest_buf, meta.largest.Encode(), ts_sz); + PutLengthPrefixedSlice(dst, smallest_buf); + PutLengthPrefixedSlice(dst, largest_buf); + return; +}; + Status VersionEdit::DecodeFrom(const Slice& src) { Clear(); #ifndef NDEBUG diff --git a/db/version_edit.h b/db/version_edit.h index 01c013b6c..cedccb3a2 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include #include @@ -489,6 +490,8 @@ class VersionEdit { using NewFiles = std::vector>; const NewFiles& GetNewFiles() const { return new_files_; } + NewFiles& GetMutableNewFiles() { return new_files_; } + // Retrieve all the compact cursors using CompactCursors = std::vector>; const CompactCursors& GetCompactCursors() const { return compact_cursors_; } @@ -639,7 +642,17 @@ class VersionEdit { } // return true on success. - bool EncodeTo(std::string* dst) const; + // `ts_sz` is the size in bytes for the user-defined timestamp contained in + // a user key. This argument is optional because it's only required for + // encoding a `VersionEdit` with new SST files to add. It's used to handle the + // file boundaries: `smallest`, `largest` when + // `FileMetaData.user_defined_timestamps_persisted` is false. When reading + // the Manifest file, a mirroring change needed to handle + // file boundaries are not added to the `VersionEdit.DecodeFrom` function + // because timestamp size is not available at `VersionEdit` decoding time, + // it's instead added to `VersionEditHandler::OnNonCfOperation`. + bool EncodeTo(std::string* dst, + std::optional ts_sz = std::nullopt) const; Status DecodeFrom(const Slice& src); std::string DebugString(bool hex_key = false) const; @@ -660,6 +673,12 @@ class VersionEdit { const char* DecodeNewFile4From(Slice* input); + // Encode file boundaries `FileMetaData.smallest` and `FileMetaData.largest`. + // User-defined timestamps in the user key will be stripped if they shouldn't + // be persisted. + void EncodeFileBoundaries(std::string* dst, const FileMetaData& meta, + size_t ts_sz) const; + int max_level_ = 0; std::string db_id_; std::string comparator_; diff --git a/db/version_edit_handler.cc b/db/version_edit_handler.cc index d507c4b0c..965213b58 100644 --- a/db/version_edit_handler.cc +++ b/db/version_edit_handler.cc @@ -308,6 +308,17 @@ Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit, tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( edit.column_family_); assert(tmp_cfd != nullptr); + // It's important to handle file boundaries before `MaybeCreateVersion` + // because `VersionEditHandlerPointInTime::MaybeCreateVersion` does + // `FileMetaData` verification that involves the file boundaries. + // All `VersionEditHandlerBase` subclasses that need to deal with + // `FileMetaData` for new files are also subclasses of + // `VersionEditHandler`, so it's sufficient to do the file boundaries + // handling in this method. + s = MaybeHandleFileBoundariesForNewFiles(edit, tmp_cfd); + if (!s.ok()) { + return s; + } s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false); if (s.ok()) { s = builder_iter->second->version_builder()->Apply(&edit); @@ -647,6 +658,47 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, return s; } +Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles( + VersionEdit& edit, const ColumnFamilyData* cfd) { + if (edit.GetNewFiles().empty()) { + return Status::OK(); + } + auto ucmp = cfd->user_comparator(); + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz == 0) { + return Status::OK(); + } + + VersionEdit::NewFiles& new_files = edit.GetMutableNewFiles(); + assert(!new_files.empty()); + bool file_boundaries_need_handling = false; + for (auto& new_file : new_files) { + FileMetaData& meta = new_file.second; + if (meta.user_defined_timestamps_persisted) { + // `FileMetaData.user_defined_timestamps_persisted` field is the value of + // the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` + // at the time when the SST file was created. As a result, all added SST + // files in one `VersionEdit` should have the same value for it. + if (file_boundaries_need_handling) { + return Status::Corruption( + "New files in one VersionEdit has different " + "user_defined_timestamps_persisted value."); + } + break; + } + file_boundaries_need_handling = true; + std::string smallest_buf; + std::string largest_buf; + PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(), + ts_sz); + PadInternalKeyWithMinTimestamp(&largest_buf, meta.largest.Encode(), ts_sz); + meta.smallest.DecodeFrom(smallest_buf); + meta.largest.DecodeFrom(largest_buf); + } + return Status::OK(); +} + VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( bool read_only, std::vector column_families, VersionSet* version_set, const std::shared_ptr& io_tracer, diff --git a/db/version_edit_handler.h b/db/version_edit_handler.h index 4b9f19542..54454cf70 100644 --- a/db/version_edit_handler.h +++ b/db/version_edit_handler.h @@ -206,6 +206,17 @@ class VersionEditHandler : public VersionEditHandlerBase { private: Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, const VersionEdit& edit); + + // When `FileMetaData.user_defined_timestamps_persisted` is false and + // user-defined timestamp size is non-zero. User-defined timestamps are + // stripped from file boundaries: `smallest`, `largest` in + // `VersionEdit.DecodeFrom` before they were written to Manifest. + // This is the mirroring change to handle file boundaries on the Manifest read + // path for this scenario: to pad a minimum timestamp to the user key in + // `smallest` and `largest` so their format are consistent with the running + // user comparator. + Status MaybeHandleFileBoundariesForNewFiles(VersionEdit& edit, + const ColumnFamilyData* cfd); }; // A class similar to its base class, i.e. VersionEditHandler. diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index a3bf6beff..f3473b476 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -21,12 +21,20 @@ namespace ROCKSDB_NAMESPACE { static void TestEncodeDecode(const VersionEdit& edit) { + // Encoding one `VersionEdit` and decoding it again should result in the + // exact same `VersionEdit`. However, a special handling is applied to file + // boundaries: `FileMetaData.smallest`, `FileMetaData.largest` when + // user-defined timestamps should not be persisted. In that scenario, this + // invariant does not hold. We disable this scenario in this util method to + // enable all other test cases continue to verify this invariant, while the + // special case is separately covered in test + // `EncodeDecodeNewFile4HandleFileBoundary`. std::string encoded, encoded2; - edit.EncodeTo(&encoded); + edit.EncodeTo(&encoded, 0 /* ts_sz */); VersionEdit parsed; Status s = parsed.DecodeFrom(encoded); ASSERT_TRUE(s.ok()) << s.ToString(); - parsed.EncodeTo(&encoded2); + parsed.EncodeTo(&encoded2, 0 /* ts_sz */); ASSERT_EQ(encoded, encoded2); } @@ -93,7 +101,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) { TestEncodeDecode(edit); std::string encoded, encoded2; - edit.EncodeTo(&encoded); + edit.EncodeTo(&encoded, 0 /* ts_sz */); VersionEdit parsed; Status s = parsed.DecodeFrom(encoded); ASSERT_TRUE(s.ok()) << s.ToString(); @@ -119,6 +127,57 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) { ASSERT_TRUE(new_files[3].second.user_defined_timestamps_persisted); } +TEST_F(VersionEditTest, EncodeDecodeNewFile4HandleFileBoundary) { + static const uint64_t kBig = 1ull << 50; + size_t ts_sz = 16; + static std::string min_ts(ts_sz, static_cast(0)); + VersionEdit edit; + std::string smallest = "foo"; + std::string largest = "zoo"; + // In real manifest writing scenarios, one `VersionEdit` should not contain + // files with different `user_defined_timestamps_persisted` flag value. + // This is just for testing file boundaries handling w.r.t persisting user + // defined timestamps during `VersionEdit` encoding. + edit.AddFile( + 3, 300, 3, 100, InternalKey(smallest + min_ts, kBig + 500, kTypeValue), + InternalKey(largest + min_ts, kBig + 600, kTypeDeletion), kBig + 500, + kBig + 600, true, Temperature::kUnknown, kInvalidBlobFileNumber, + kUnknownOldestAncesterTime, kUnknownFileCreationTime, + 300 /* epoch_number */, kUnknownFileChecksum, + kUnknownFileChecksumFuncName, kNullUniqueId64x2, + 0 /* compensated_range_deletion_size */, 0 /* tail_size */, + false /* user_defined_timestamps_persisted */); + edit.AddFile(3, 300, 3, 100, + InternalKey(smallest + min_ts, kBig + 500, kTypeValue), + InternalKey(largest + min_ts, kBig + 600, kTypeDeletion), + kBig + 500, kBig + 600, true, Temperature::kUnknown, + kInvalidBlobFileNumber, kUnknownOldestAncesterTime, + kUnknownFileCreationTime, 300 /* epoch_number */, + kUnknownFileChecksum, kUnknownFileChecksumFuncName, + kNullUniqueId64x2, 0 /* compensated_range_deletion_size */, + 0 /* tail_size */, true /* user_defined_timestamps_persisted */); + + std::string encoded; + edit.EncodeTo(&encoded, ts_sz); + VersionEdit parsed; + Status s = parsed.DecodeFrom(encoded); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto& new_files = parsed.GetNewFiles(); + ASSERT_TRUE(new_files.size() == 2); + ASSERT_FALSE(new_files[0].second.user_defined_timestamps_persisted); + // First file's boundaries do not contain user-defined timestamps. + ASSERT_EQ(InternalKey(smallest, kBig + 500, kTypeValue).Encode(), + new_files[0].second.smallest.Encode()); + ASSERT_EQ(InternalKey(largest, kBig + 600, kTypeDeletion).Encode(), + new_files[0].second.largest.Encode()); + ASSERT_TRUE(new_files[1].second.user_defined_timestamps_persisted); + // Second file's boundaries contain user-defined timestamps. + ASSERT_EQ(InternalKey(smallest + min_ts, kBig + 500, kTypeValue).Encode(), + new_files[1].second.smallest.Encode()); + ASSERT_EQ(InternalKey(largest + min_ts, kBig + 600, kTypeDeletion).Encode(), + new_files[1].second.largest.Encode()); +} + TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { static const uint64_t kBig = 1ull << 50; VersionEdit edit; @@ -158,7 +217,7 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - edit.EncodeTo(&encoded); + edit.EncodeTo(&encoded, 0 /* ts_sz */); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); VersionEdit parsed; @@ -198,7 +257,7 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) { PutLengthPrefixedSlice(str, str1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); - edit.EncodeTo(&encoded); + edit.EncodeTo(&encoded, 0 /* ts_sz */); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); VersionEdit parsed; @@ -214,7 +273,7 @@ TEST_F(VersionEditTest, EncodeEmptyFile) { 1 /*epoch_number*/, kUnknownFileChecksum, kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0, true); std::string buffer; - ASSERT_TRUE(!edit.EncodeTo(&buffer)); + ASSERT_TRUE(!edit.EncodeTo(&buffer, 0 /* ts_sz */)); } TEST_F(VersionEditTest, ColumnFamilyTest) { @@ -583,7 +642,7 @@ TEST_F(VersionEditTest, IgnorableTags) { edit.SetColumnFamily(kColumnFamilyId); std::string encoded; - ASSERT_TRUE(edit.EncodeTo(&encoded)); + ASSERT_TRUE(edit.EncodeTo(&encoded, 0 /* ts_sz */)); VersionEdit decoded; ASSERT_OK(decoded.DecodeFrom(encoded)); diff --git a/db/version_set.cc b/db/version_set.cc index 8086b01e6..139043ae9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5118,6 +5118,12 @@ Status VersionSet::ProcessManifestWrites( assert(manifest_writers_.front() == &first_writer); autovector batch_edits; + // This vector keeps track of the corresponding user-defined timestamp size + // for `batch_edits` side by side, which is only needed for encoding a + // `VersionEdit` that adds new SST files. + // Note that anytime `batch_edits` has new element added or get existing + // element removed, `batch_edits_ts_sz` should be updated too. + autovector> batch_edits_ts_sz; autovector versions; autovector mutable_cf_options_ptrs; std::vector> builder_guards; @@ -5133,6 +5139,7 @@ Status VersionSet::ProcessManifestWrites( // No group commits for column family add or drop LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence); batch_edits.push_back(first_writer.edit_list.front()); + batch_edits_ts_sz.push_back(std::nullopt); } else { auto it = manifest_writers_.cbegin(); size_t group_start = std::numeric_limits::max(); @@ -5209,6 +5216,9 @@ Status VersionSet::ProcessManifestWrites( TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion", version); } + const Comparator* ucmp = last_writer->cfd->user_comparator(); + assert(ucmp); + std::optional edit_ts_sz = ucmp->timestamp_size(); for (const auto& e : last_writer->edit_list) { if (e->is_in_atomic_group_) { if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ || @@ -5229,6 +5239,7 @@ Status VersionSet::ProcessManifestWrites( return s; } batch_edits.push_back(e); + batch_edits_ts_sz.push_back(edit_ts_sz); } } for (int i = 0; i < static_cast(versions.size()); ++i) { @@ -5394,9 +5405,11 @@ Status VersionSet::ProcessManifestWrites( #ifndef NDEBUG size_t idx = 0; #endif - for (auto& e : batch_edits) { + assert(batch_edits.size() == batch_edits_ts_sz.size()); + for (size_t bidx = 0; bidx < batch_edits.size(); bidx++) { + auto& e = batch_edits[bidx]; std::string record; - if (!e->EncodeTo(&record)) { + if (!e->EncodeTo(&record, batch_edits_ts_sz[bidx])) { s = Status::Corruption("Unable to encode VersionEdit:" + e->DebugString(true)); break; @@ -6505,8 +6518,10 @@ Status VersionSet::WriteCurrentStateToManifest( edit.SetLastSequence(descriptor_last_sequence_); + const Comparator* ucmp = cfd->user_comparator(); + assert(ucmp); std::string record; - if (!edit.EncodeTo(&record)) { + if (!edit.EncodeTo(&record, ucmp->timestamp_size())) { return Status::Corruption("Unable to Encode VersionEdit:" + edit.DebugString(true)); } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index a16cdc4c1..1d5971c59 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3320,7 +3320,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, ++last_seqno_; assert(log_writer_.get() != nullptr); std::string record; - ASSERT_TRUE(edit.EncodeTo(&record)); + ASSERT_TRUE(edit.EncodeTo(&record, 0 /* ts_sz */)); Status s = log_writer_->AddRecord(record); ASSERT_OK(s); }