Handle file boundaries when timestamps should not be persisted (#11578)

Summary:
Handle file boundaries `FileMetaData.smallest`, `FileMetaData.largest` for when `persist_user_defined_timestamps` is false:
    1) on the manifest write path, the original user-defined timestamps in file boundaries are stripped. This stripping is done during `VersionEdit::Encode` to limit the effect of the stripping to only the persisted version of the file boundaries.
    2) on the manifest read path during DB open, a a min timestamp is padded to the file boundaries. Ideally, this padding should happen during `VersionEdit::Decode` so that all in memory file boundaries have a compatible user key format as the running user comparator. However, because the user-defined timestamp size information is not available at that time. This change is added to `VersionEditHandler::OnNonCfOperation`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11578

Test Plan:
```
make all check
./version_edit_test --gtest_filter="*EncodeDecodeNewFile4HandleFileBoundary*".
./db_with_timestamp_basic_test --gtest_filter="*HandleFileBoundariesTest*"
```

Reviewed By: pdillinger

Differential Revision: D47309399

Pulled By: jowlyzhang

fbshipit-source-id: 21b4d54d2089a62826b31d779094a39cb2bbbd51
oxigraph-main
Yu Zhang 1 year ago committed by Facebook GitHub Bot
parent baf37a0e81
commit f74526341d
  1. 70
      db/db_with_timestamp_basic_test.cc
  2. 25
      db/version_edit.cc
  3. 21
      db/version_edit.h
  4. 52
      db/version_edit_handler.cc
  5. 11
      db/version_edit_handler.h
  6. 73
      db/version_edit_test.cc
  7. 21
      db/version_set.cc
  8. 2
      db/version_set_test.cc

@ -3273,6 +3273,76 @@ TEST_F(UpdateFullHistoryTsLowTest, ConcurrentUpdate) {
Close(); Close();
} }
// Tests the effect of flag `persist_user_defined_timestamps` on the file
// boundaries contained in the Manifest, a.k.a FileMetaData.smallest,
// FileMetaData.largest.
class HandleFileBoundariesTest
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
public:
HandleFileBoundariesTest()
: DBBasicTestWithTimestampBase("/handle_file_boundaries") {}
};
TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
Options options = CurrentOptions();
options.env = env_;
// Write a timestamp that is not the min timestamp to help test the behavior
// of flag `persist_user_defined_timestamps`.
std::string write_ts = Timestamp(1, 0);
std::string min_ts = Timestamp(0, 0);
std::string smallest_ukey_without_ts = "bar";
std::string largest_ukey_without_ts = "foo";
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
bool persist_udt = test::ShouldPersistUDT(GetParam());
options.persist_user_defined_timestamps = persist_udt;
DestroyAndReopen(options);
ASSERT_OK(
db_->Put(WriteOptions(), smallest_ukey_without_ts, write_ts, "val1"));
ASSERT_OK(
db_->Put(WriteOptions(), largest_ukey_without_ts, write_ts, "val2"));
// Create a L0 SST file and its record is added to the Manfiest.
ASSERT_OK(Flush());
Close();
options.create_if_missing = false;
// Reopen the DB and process manifest file.
Reopen(options);
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
&level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto file_meta = level_to_files[0][0];
if (persist_udt) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts,
file_meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, file_meta.largest.user_key());
} else {
// If `persist_user_defined_timestamps` is false, the file boundaries should
// have the min timestamp. Behind the scenes, when file boundaries in
// FileMetaData is persisted to Manifest, the original user-defined
// timestamps in user key are stripped. When manifest is read and processed
// during DB open, a min timestamp is padded to the file boundaries. This
// test's writes contain non min timestamp to verify this logic end-to-end.
ASSERT_EQ(smallest_ukey_without_ts + min_ts, file_meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, file_meta.largest.user_key());
}
Close();
}
INSTANTIATE_TEST_CASE_P(
ConfigurePersistUdt, HandleFileBoundariesTest,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));
TEST_F(DBBasicTestWithTimestamp, TEST_F(DBBasicTestWithTimestamp,
GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) { GCPreserveRangeTombstoneWhenNoOrSmallFullHistoryLow) {
Options options = CurrentOptions(); Options options = CurrentOptions();

@ -93,7 +93,8 @@ void VersionEdit::Clear() {
full_history_ts_low_.clear(); full_history_ts_low_.clear();
} }
bool VersionEdit::EncodeTo(std::string* dst) const { bool VersionEdit::EncodeTo(std::string* dst,
std::optional<size_t> ts_sz) const {
if (has_db_id_) { if (has_db_id_) {
PutVarint32(dst, kDbId); PutVarint32(dst, kDbId);
PutLengthPrefixedSlice(dst, db_id_); PutLengthPrefixedSlice(dst, db_id_);
@ -133,6 +134,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
} }
bool min_log_num_written = false; bool min_log_num_written = false;
assert(new_files_.empty() || ts_sz.has_value());
for (size_t i = 0; i < new_files_.size(); i++) { for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second; const FileMetaData& f = new_files_[i].second;
if (!f.smallest.Valid() || !f.largest.Valid() || if (!f.smallest.Valid() || !f.largest.Valid() ||
@ -142,8 +145,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kNewFile4); PutVarint32(dst, kNewFile4);
PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber()); PutVarint32Varint64(dst, new_files_[i].first /* level */, f.fd.GetNumber());
PutVarint64(dst, f.fd.GetFileSize()); PutVarint64(dst, f.fd.GetFileSize());
PutLengthPrefixedSlice(dst, f.smallest.Encode()); EncodeFileBoundaries(dst, f, ts_sz.value());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno); PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
// Customized fields' format: // Customized fields' format:
// +-----------------------------+ // +-----------------------------+
@ -458,6 +460,23 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
return nullptr; return nullptr;
} }
void VersionEdit::EncodeFileBoundaries(std::string* dst,
const FileMetaData& meta,
size_t ts_sz) const {
if (ts_sz == 0 || meta.user_defined_timestamps_persisted) {
PutLengthPrefixedSlice(dst, meta.smallest.Encode());
PutLengthPrefixedSlice(dst, meta.largest.Encode());
return;
}
std::string smallest_buf;
std::string largest_buf;
StripTimestampFromInternalKey(&smallest_buf, meta.smallest.Encode(), ts_sz);
StripTimestampFromInternalKey(&largest_buf, meta.largest.Encode(), ts_sz);
PutLengthPrefixedSlice(dst, smallest_buf);
PutLengthPrefixedSlice(dst, largest_buf);
return;
};
Status VersionEdit::DecodeFrom(const Slice& src) { Status VersionEdit::DecodeFrom(const Slice& src) {
Clear(); Clear();
#ifndef NDEBUG #ifndef NDEBUG

@ -9,6 +9,7 @@
#pragma once #pragma once
#include <algorithm> #include <algorithm>
#include <optional>
#include <set> #include <set>
#include <string> #include <string>
#include <utility> #include <utility>
@ -489,6 +490,8 @@ class VersionEdit {
using NewFiles = std::vector<std::pair<int, FileMetaData>>; using NewFiles = std::vector<std::pair<int, FileMetaData>>;
const NewFiles& GetNewFiles() const { return new_files_; } const NewFiles& GetNewFiles() const { return new_files_; }
NewFiles& GetMutableNewFiles() { return new_files_; }
// Retrieve all the compact cursors // Retrieve all the compact cursors
using CompactCursors = std::vector<std::pair<int, InternalKey>>; using CompactCursors = std::vector<std::pair<int, InternalKey>>;
const CompactCursors& GetCompactCursors() const { return compact_cursors_; } const CompactCursors& GetCompactCursors() const { return compact_cursors_; }
@ -639,7 +642,17 @@ class VersionEdit {
} }
// return true on success. // return true on success.
bool EncodeTo(std::string* dst) const; // `ts_sz` is the size in bytes for the user-defined timestamp contained in
// a user key. This argument is optional because it's only required for
// encoding a `VersionEdit` with new SST files to add. It's used to handle the
// file boundaries: `smallest`, `largest` when
// `FileMetaData.user_defined_timestamps_persisted` is false. When reading
// the Manifest file, a mirroring change needed to handle
// file boundaries are not added to the `VersionEdit.DecodeFrom` function
// because timestamp size is not available at `VersionEdit` decoding time,
// it's instead added to `VersionEditHandler::OnNonCfOperation`.
bool EncodeTo(std::string* dst,
std::optional<size_t> ts_sz = std::nullopt) const;
Status DecodeFrom(const Slice& src); Status DecodeFrom(const Slice& src);
std::string DebugString(bool hex_key = false) const; std::string DebugString(bool hex_key = false) const;
@ -660,6 +673,12 @@ class VersionEdit {
const char* DecodeNewFile4From(Slice* input); const char* DecodeNewFile4From(Slice* input);
// Encode file boundaries `FileMetaData.smallest` and `FileMetaData.largest`.
// User-defined timestamps in the user key will be stripped if they shouldn't
// be persisted.
void EncodeFileBoundaries(std::string* dst, const FileMetaData& meta,
size_t ts_sz) const;
int max_level_ = 0; int max_level_ = 0;
std::string db_id_; std::string db_id_;
std::string comparator_; std::string comparator_;

@ -308,6 +308,17 @@ Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily( tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
edit.column_family_); edit.column_family_);
assert(tmp_cfd != nullptr); assert(tmp_cfd != nullptr);
// It's important to handle file boundaries before `MaybeCreateVersion`
// because `VersionEditHandlerPointInTime::MaybeCreateVersion` does
// `FileMetaData` verification that involves the file boundaries.
// All `VersionEditHandlerBase` subclasses that need to deal with
// `FileMetaData` for new files are also subclasses of
// `VersionEditHandler`, so it's sufficient to do the file boundaries
// handling in this method.
s = MaybeHandleFileBoundariesForNewFiles(edit, tmp_cfd);
if (!s.ok()) {
return s;
}
s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false); s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false);
if (s.ok()) { if (s.ok()) {
s = builder_iter->second->version_builder()->Apply(&edit); s = builder_iter->second->version_builder()->Apply(&edit);
@ -647,6 +658,47 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
return s; return s;
} }
Status VersionEditHandler::MaybeHandleFileBoundariesForNewFiles(
VersionEdit& edit, const ColumnFamilyData* cfd) {
if (edit.GetNewFiles().empty()) {
return Status::OK();
}
auto ucmp = cfd->user_comparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0) {
return Status::OK();
}
VersionEdit::NewFiles& new_files = edit.GetMutableNewFiles();
assert(!new_files.empty());
bool file_boundaries_need_handling = false;
for (auto& new_file : new_files) {
FileMetaData& meta = new_file.second;
if (meta.user_defined_timestamps_persisted) {
// `FileMetaData.user_defined_timestamps_persisted` field is the value of
// the flag `AdvancedColumnFamilyOptions.persist_user_defined_timestamps`
// at the time when the SST file was created. As a result, all added SST
// files in one `VersionEdit` should have the same value for it.
if (file_boundaries_need_handling) {
return Status::Corruption(
"New files in one VersionEdit has different "
"user_defined_timestamps_persisted value.");
}
break;
}
file_boundaries_need_handling = true;
std::string smallest_buf;
std::string largest_buf;
PadInternalKeyWithMinTimestamp(&smallest_buf, meta.smallest.Encode(),
ts_sz);
PadInternalKeyWithMinTimestamp(&largest_buf, meta.largest.Encode(), ts_sz);
meta.smallest.DecodeFrom(smallest_buf);
meta.largest.DecodeFrom(largest_buf);
}
return Status::OK();
}
VersionEditHandlerPointInTime::VersionEditHandlerPointInTime( VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families, bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer, VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,

@ -206,6 +206,17 @@ class VersionEditHandler : public VersionEditHandlerBase {
private: private:
Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd, Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& edit); const VersionEdit& edit);
// When `FileMetaData.user_defined_timestamps_persisted` is false and
// user-defined timestamp size is non-zero. User-defined timestamps are
// stripped from file boundaries: `smallest`, `largest` in
// `VersionEdit.DecodeFrom` before they were written to Manifest.
// This is the mirroring change to handle file boundaries on the Manifest read
// path for this scenario: to pad a minimum timestamp to the user key in
// `smallest` and `largest` so their format are consistent with the running
// user comparator.
Status MaybeHandleFileBoundariesForNewFiles(VersionEdit& edit,
const ColumnFamilyData* cfd);
}; };
// A class similar to its base class, i.e. VersionEditHandler. // A class similar to its base class, i.e. VersionEditHandler.

@ -21,12 +21,20 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
static void TestEncodeDecode(const VersionEdit& edit) { static void TestEncodeDecode(const VersionEdit& edit) {
// Encoding one `VersionEdit` and decoding it again should result in the
// exact same `VersionEdit`. However, a special handling is applied to file
// boundaries: `FileMetaData.smallest`, `FileMetaData.largest` when
// user-defined timestamps should not be persisted. In that scenario, this
// invariant does not hold. We disable this scenario in this util method to
// enable all other test cases continue to verify this invariant, while the
// special case is separately covered in test
// `EncodeDecodeNewFile4HandleFileBoundary`.
std::string encoded, encoded2; std::string encoded, encoded2;
edit.EncodeTo(&encoded); edit.EncodeTo(&encoded, 0 /* ts_sz */);
VersionEdit parsed; VersionEdit parsed;
Status s = parsed.DecodeFrom(encoded); Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
parsed.EncodeTo(&encoded2); parsed.EncodeTo(&encoded2, 0 /* ts_sz */);
ASSERT_EQ(encoded, encoded2); ASSERT_EQ(encoded, encoded2);
} }
@ -93,7 +101,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
std::string encoded, encoded2; std::string encoded, encoded2;
edit.EncodeTo(&encoded); edit.EncodeTo(&encoded, 0 /* ts_sz */);
VersionEdit parsed; VersionEdit parsed;
Status s = parsed.DecodeFrom(encoded); Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
@ -119,6 +127,57 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
ASSERT_TRUE(new_files[3].second.user_defined_timestamps_persisted); ASSERT_TRUE(new_files[3].second.user_defined_timestamps_persisted);
} }
TEST_F(VersionEditTest, EncodeDecodeNewFile4HandleFileBoundary) {
static const uint64_t kBig = 1ull << 50;
size_t ts_sz = 16;
static std::string min_ts(ts_sz, static_cast<unsigned char>(0));
VersionEdit edit;
std::string smallest = "foo";
std::string largest = "zoo";
// In real manifest writing scenarios, one `VersionEdit` should not contain
// files with different `user_defined_timestamps_persisted` flag value.
// This is just for testing file boundaries handling w.r.t persisting user
// defined timestamps during `VersionEdit` encoding.
edit.AddFile(
3, 300, 3, 100, InternalKey(smallest + min_ts, kBig + 500, kTypeValue),
InternalKey(largest + min_ts, kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, Temperature::kUnknown, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
300 /* epoch_number */, kUnknownFileChecksum,
kUnknownFileChecksumFuncName, kNullUniqueId64x2,
0 /* compensated_range_deletion_size */, 0 /* tail_size */,
false /* user_defined_timestamps_persisted */);
edit.AddFile(3, 300, 3, 100,
InternalKey(smallest + min_ts, kBig + 500, kTypeValue),
InternalKey(largest + min_ts, kBig + 600, kTypeDeletion),
kBig + 500, kBig + 600, true, Temperature::kUnknown,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, 300 /* epoch_number */,
kUnknownFileChecksum, kUnknownFileChecksumFuncName,
kNullUniqueId64x2, 0 /* compensated_range_deletion_size */,
0 /* tail_size */, true /* user_defined_timestamps_persisted */);
std::string encoded;
edit.EncodeTo(&encoded, ts_sz);
VersionEdit parsed;
Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString();
auto& new_files = parsed.GetNewFiles();
ASSERT_TRUE(new_files.size() == 2);
ASSERT_FALSE(new_files[0].second.user_defined_timestamps_persisted);
// First file's boundaries do not contain user-defined timestamps.
ASSERT_EQ(InternalKey(smallest, kBig + 500, kTypeValue).Encode(),
new_files[0].second.smallest.Encode());
ASSERT_EQ(InternalKey(largest, kBig + 600, kTypeDeletion).Encode(),
new_files[0].second.largest.Encode());
ASSERT_TRUE(new_files[1].second.user_defined_timestamps_persisted);
// Second file's boundaries contain user-defined timestamps.
ASSERT_EQ(InternalKey(smallest + min_ts, kBig + 500, kTypeValue).Encode(),
new_files[1].second.smallest.Encode());
ASSERT_EQ(InternalKey(largest + min_ts, kBig + 600, kTypeDeletion).Encode(),
new_files[1].second.largest.Encode());
}
TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
static const uint64_t kBig = 1ull << 50; static const uint64_t kBig = 1ull << 50;
VersionEdit edit; VersionEdit edit;
@ -158,7 +217,7 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
} }
}); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
edit.EncodeTo(&encoded); edit.EncodeTo(&encoded, 0 /* ts_sz */);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
VersionEdit parsed; VersionEdit parsed;
@ -198,7 +257,7 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
PutLengthPrefixedSlice(str, str1); PutLengthPrefixedSlice(str, str1);
}); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
edit.EncodeTo(&encoded); edit.EncodeTo(&encoded, 0 /* ts_sz */);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
VersionEdit parsed; VersionEdit parsed;
@ -214,7 +273,7 @@ TEST_F(VersionEditTest, EncodeEmptyFile) {
1 /*epoch_number*/, kUnknownFileChecksum, 1 /*epoch_number*/, kUnknownFileChecksum,
kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0, true); kUnknownFileChecksumFuncName, kNullUniqueId64x2, 0, 0, true);
std::string buffer; std::string buffer;
ASSERT_TRUE(!edit.EncodeTo(&buffer)); ASSERT_TRUE(!edit.EncodeTo(&buffer, 0 /* ts_sz */));
} }
TEST_F(VersionEditTest, ColumnFamilyTest) { TEST_F(VersionEditTest, ColumnFamilyTest) {
@ -583,7 +642,7 @@ TEST_F(VersionEditTest, IgnorableTags) {
edit.SetColumnFamily(kColumnFamilyId); edit.SetColumnFamily(kColumnFamilyId);
std::string encoded; std::string encoded;
ASSERT_TRUE(edit.EncodeTo(&encoded)); ASSERT_TRUE(edit.EncodeTo(&encoded, 0 /* ts_sz */));
VersionEdit decoded; VersionEdit decoded;
ASSERT_OK(decoded.DecodeFrom(encoded)); ASSERT_OK(decoded.DecodeFrom(encoded));

@ -5118,6 +5118,12 @@ Status VersionSet::ProcessManifestWrites(
assert(manifest_writers_.front() == &first_writer); assert(manifest_writers_.front() == &first_writer);
autovector<VersionEdit*> batch_edits; autovector<VersionEdit*> batch_edits;
// This vector keeps track of the corresponding user-defined timestamp size
// for `batch_edits` side by side, which is only needed for encoding a
// `VersionEdit` that adds new SST files.
// Note that anytime `batch_edits` has new element added or get existing
// element removed, `batch_edits_ts_sz` should be updated too.
autovector<std::optional<size_t>> batch_edits_ts_sz;
autovector<Version*> versions; autovector<Version*> versions;
autovector<const MutableCFOptions*> mutable_cf_options_ptrs; autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards; std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
@ -5133,6 +5139,7 @@ Status VersionSet::ProcessManifestWrites(
// No group commits for column family add or drop // No group commits for column family add or drop
LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence); LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence);
batch_edits.push_back(first_writer.edit_list.front()); batch_edits.push_back(first_writer.edit_list.front());
batch_edits_ts_sz.push_back(std::nullopt);
} else { } else {
auto it = manifest_writers_.cbegin(); auto it = manifest_writers_.cbegin();
size_t group_start = std::numeric_limits<size_t>::max(); size_t group_start = std::numeric_limits<size_t>::max();
@ -5209,6 +5216,9 @@ Status VersionSet::ProcessManifestWrites(
TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion", TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
version); version);
} }
const Comparator* ucmp = last_writer->cfd->user_comparator();
assert(ucmp);
std::optional<size_t> edit_ts_sz = ucmp->timestamp_size();
for (const auto& e : last_writer->edit_list) { for (const auto& e : last_writer->edit_list) {
if (e->is_in_atomic_group_) { if (e->is_in_atomic_group_) {
if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ || if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
@ -5229,6 +5239,7 @@ Status VersionSet::ProcessManifestWrites(
return s; return s;
} }
batch_edits.push_back(e); batch_edits.push_back(e);
batch_edits_ts_sz.push_back(edit_ts_sz);
} }
} }
for (int i = 0; i < static_cast<int>(versions.size()); ++i) { for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
@ -5394,9 +5405,11 @@ Status VersionSet::ProcessManifestWrites(
#ifndef NDEBUG #ifndef NDEBUG
size_t idx = 0; size_t idx = 0;
#endif #endif
for (auto& e : batch_edits) { assert(batch_edits.size() == batch_edits_ts_sz.size());
for (size_t bidx = 0; bidx < batch_edits.size(); bidx++) {
auto& e = batch_edits[bidx];
std::string record; std::string record;
if (!e->EncodeTo(&record)) { if (!e->EncodeTo(&record, batch_edits_ts_sz[bidx])) {
s = Status::Corruption("Unable to encode VersionEdit:" + s = Status::Corruption("Unable to encode VersionEdit:" +
e->DebugString(true)); e->DebugString(true));
break; break;
@ -6505,8 +6518,10 @@ Status VersionSet::WriteCurrentStateToManifest(
edit.SetLastSequence(descriptor_last_sequence_); edit.SetLastSequence(descriptor_last_sequence_);
const Comparator* ucmp = cfd->user_comparator();
assert(ucmp);
std::string record; std::string record;
if (!edit.EncodeTo(&record)) { if (!edit.EncodeTo(&record, ucmp->timestamp_size())) {
return Status::Corruption("Unable to Encode VersionEdit:" + return Status::Corruption("Unable to Encode VersionEdit:" +
edit.DebugString(true)); edit.DebugString(true));
} }

@ -3320,7 +3320,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
++last_seqno_; ++last_seqno_;
assert(log_writer_.get() != nullptr); assert(log_writer_.get() != nullptr);
std::string record; std::string record;
ASSERT_TRUE(edit.EncodeTo(&record)); ASSERT_TRUE(edit.EncodeTo(&record, 0 /* ts_sz */));
Status s = log_writer_->AddRecord(record); Status s = log_writer_->AddRecord(record);
ASSERT_OK(s); ASSERT_OK(s);
} }

Loading…
Cancel
Save