Logging timestamp size record in WAL and use it during recovery (#11471)

Summary:
Start logging the timestamp size record in WAL and use the record during recovery.  Currently, user comparator cannot be different from what was used to create a column family, so the timestamp size record is just used to confirm it's consistent with the timestamp size the running user comparator indicates.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11471

Test Plan:
```
make all check
./db_secondary_test
./db_wal_test --gtest_filter="*WithTimestamp*"
./repair_test --gtest_filter="*WithTimestamp*"
```

Reviewed By: ltamasi

Differential Revision: D46236769

Pulled By: jowlyzhang

fbshipit-source-id: f6c60b5c8defdb05021c63df302ccc0be1275ad0
oxigraph-main
Yu Zhang 1 year ago committed by Facebook GitHub Bot
parent 8848ec92dd
commit 56ca9e3106
  1. 12
      db/column_family.cc
  2. 19
      db/column_family.h
  3. 15
      db/db_impl/db_impl_open.cc
  4. 11
      db/db_impl/db_impl_secondary.cc
  5. 8
      db/db_impl/db_impl_write.cc
  6. 91
      db/db_wal_test.cc
  7. 14
      db/repair.cc
  8. 76
      db/repair_test.cc
  9. 11
      db/version_set.h
  10. 1
      unreleased_history/new_features/logging_udt_sizes.md
  11. 17
      util/udt_util.cc
  12. 13
      util/udt_util.h
  13. 133
      util/udt_util_test.cc

@ -1621,6 +1621,13 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
db_id_, db_session_id_); db_id_, db_session_id_);
column_families_.insert({name, id}); column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd}); column_family_data_.insert({id, new_cfd});
auto ucmp = new_cfd->user_comparator();
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
running_ts_sz_.insert({id, ts_sz});
if (ts_sz > 0) {
ts_sz_for_record_.insert({id, ts_sz});
}
max_column_family_ = std::max(max_column_family_, id); max_column_family_ = std::max(max_column_family_, id);
// add to linked list // add to linked list
new_cfd->next_ = dummy_cfd_; new_cfd->next_ = dummy_cfd_;
@ -1636,10 +1643,13 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
// under a DB mutex AND from a write thread // under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID()); uint32_t cf_id = cfd->GetID();
auto cfd_iter = column_family_data_.find(cf_id);
assert(cfd_iter != column_family_data_.end()); assert(cfd_iter != column_family_data_.end());
column_family_data_.erase(cfd_iter); column_family_data_.erase(cfd_iter);
column_families_.erase(cfd->GetName()); column_families_.erase(cfd->GetName());
running_ts_sz_.erase(cf_id);
ts_sz_for_record_.erase(cf_id);
} }
// under a DB mutex OR from a write thread // under a DB mutex OR from a write thread

@ -705,6 +705,16 @@ class ColumnFamilySet {
Version* dummy_version, Version* dummy_version,
const ColumnFamilyOptions& options); const ColumnFamilyOptions& options);
const std::unordered_map<uint32_t, size_t>&
GetRunningColumnFamiliesTimestampSize() const {
return running_ts_sz_;
}
const std::unordered_map<uint32_t, size_t>&
GetColumnFamiliesTimestampSizeForRecord() const {
return ts_sz_for_record_;
}
iterator begin() { return iterator(dummy_cfd_->next_); } iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); } iterator end() { return iterator(dummy_cfd_); }
@ -730,6 +740,15 @@ class ColumnFamilySet {
UnorderedMap<std::string, uint32_t> column_families_; UnorderedMap<std::string, uint32_t> column_families_;
UnorderedMap<uint32_t, ColumnFamilyData*> column_family_data_; UnorderedMap<uint32_t, ColumnFamilyData*> column_family_data_;
// Mutating / reading `running_ts_sz_` and `ts_sz_for_record_` follow
// the same requirements as `column_families_` and `column_family_data_`.
// Mapping from column family id to user-defined timestamp size for all
// running column families.
std::unordered_map<uint32_t, size_t> running_ts_sz_;
// Mapping from column family id to user-defined timestamp size for
// column families with non-zero user-defined timestamp size.
std::unordered_map<uint32_t, size_t> ts_sz_for_record_;
uint32_t max_column_family_; uint32_t max_column_family_;
const FileOptions file_options_; const FileOptions file_options_;

@ -25,6 +25,7 @@
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/rate_limiter_impl.h" #include "util/rate_limiter_impl.h"
#include "util/udt_util.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Options SanitizeOptions(const std::string& dbname, const Options& src, Options SanitizeOptions(const std::string& dbname, const Options& src,
@ -1186,6 +1187,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
std::string scratch; std::string scratch;
Slice record; Slice record;
const std::unordered_map<uint32_t, size_t>& running_ts_sz =
versions_->GetRunningColumnFamiliesTimestampSize();
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal", TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr); /*arg=*/nullptr);
uint64_t record_checksum; uint64_t record_checksum;
@ -1208,6 +1212,17 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
const std::unordered_map<uint32_t, size_t>& record_ts_sz =
reader.GetRecordedTimestampSize();
// TODO(yuzhangyu): update mode to kReconcileInconsistency when user
// comparator can be changed.
status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency);
if (!status.ok()) {
return status;
}
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch); "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch);
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(

@ -198,6 +198,9 @@ Status DBImplSecondary::RecoverLogFiles(
} }
assert(reader != nullptr); assert(reader != nullptr);
} }
const std::unordered_map<uint32_t, size_t>& running_ts_sz =
versions_->GetRunningColumnFamiliesTimestampSize();
for (auto log_number : log_numbers) { for (auto log_number : log_numbers) {
auto it = log_readers_.find(log_number); auto it = log_readers_.find(log_number);
assert(it != log_readers_.end()); assert(it != log_readers_.end());
@ -225,6 +228,14 @@ Status DBImplSecondary::RecoverLogFiles(
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
const std::unordered_map<uint32_t, size_t>& record_ts_sz =
reader->GetRecordedTimestampSize();
status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency);
if (!status.ok()) {
break;
}
SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
std::vector<uint32_t> column_family_ids; std::vector<uint32_t> column_family_ids;
status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);

@ -1328,7 +1328,13 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
if (UNLIKELY(needs_locking)) { if (UNLIKELY(needs_locking)) {
log_write_mutex_.Lock(); log_write_mutex_.Lock();
} }
IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); IOStatus io_s = log_writer->MaybeAddUserDefinedTimestampSizeRecord(
versions_->GetColumnFamiliesTimestampSizeForRecord(),
rate_limiter_priority);
if (!io_s.ok()) {
return io_s;
}
io_s = log_writer->AddRecord(log_entry, rate_limiter_priority);
if (UNLIKELY(needs_locking)) { if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/db_with_timestamp_test_util.h"
#include "options/options_helper.h" #include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
@ -301,6 +302,96 @@ TEST_F(DBWALTest, Recover) {
} while (ChangeWalOptions()); } while (ChangeWalOptions());
} }
class DBWALTestWithTimestamp : public DBBasicTestWithTimestampBase {
public:
DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}
Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
const Options& options) {
CreateColumnFamilies(cfs, options);
return ReopenColumnFamiliesWithTs(cfs, options);
}
Status ReopenColumnFamiliesWithTs(const std::vector<std::string>& cfs,
Options ts_options) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
ts_options.create_if_missing = false;
std::vector<Options> cf_options(cfs.size(), ts_options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
cf_options.insert(cf_options.begin(), default_options);
Close();
return TryReopenWithColumnFamilies(cfs_plus_default, cf_options);
}
Status Put(uint32_t cf, const Slice& key, const Slice& ts,
const Slice& value) {
WriteOptions write_opts;
return db_->Put(write_opts, handles_[cf], key, ts, value);
}
void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key,
const std::string& expected_value) {
std::string actual_value;
ASSERT_OK(db_->Get(read_opts, handles_[cf], key, &actual_value));
ASSERT_EQ(expected_value, actual_value);
}
};
TEST_F(DBWALTestWithTimestamp, Recover) {
// Set up the option that enables user defined timestmp size.
std::string ts = Timestamp(1, 0);
const size_t kTimestampSize = ts.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ReadOptions read_opts;
Slice ts_slice = ts;
read_opts.timestamp = &ts_slice;
do {
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
CheckGet(read_opts, 1, "foo", "v1");
CheckGet(read_opts, 1, "baz", "v5");
ASSERT_OK(Put(1, "bar", ts, "v2"));
ASSERT_OK(Put(1, "foo", ts, "v3"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
CheckGet(read_opts, 1, "foo", "v3");
ASSERT_OK(Put(1, "foo", ts, "v4"));
CheckGet(read_opts, 1, "foo", "v4");
CheckGet(read_opts, 1, "bar", "v2");
CheckGet(read_opts, 1, "baz", "v5");
} while (ChangeWalOptions());
}
TEST_F(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// Set up the option that enables user defined timestmp size.
std::string ts = Timestamp(1, 0);
const size_t kTimestampSize = ts.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, "foo", ts, "v1"));
ASSERT_OK(Put(1, "baz", ts, "v5"));
TestComparator diff_test_cmp(kTimestampSize + 1);
ts_options.comparator = &diff_test_cmp;
ASSERT_TRUE(
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
}
TEST_F(DBWALTest, RecoverWithTableHandle) { TEST_F(DBWALTest, RecoverWithTableHandle) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();

@ -394,9 +394,12 @@ class Repairer {
auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
// Read all the records and add to a memtable // Read all the records and add to a memtable
const std::unordered_map<uint32_t, size_t>& running_ts_sz =
vset_.GetRunningColumnFamiliesTimestampSize();
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
int counter = 0; int counter = 0;
while (reader.ReadRecord(&record, &scratch)) { while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < WriteBatchInternal::kHeader) { if (record.size() < WriteBatchInternal::kHeader) {
@ -406,8 +409,15 @@ class Repairer {
} }
Status record_status = WriteBatchInternal::SetContents(&batch, record); Status record_status = WriteBatchInternal::SetContents(&batch, record);
if (record_status.ok()) { if (record_status.ok()) {
record_status = const std::unordered_map<uint32_t, size_t>& record_ts_sz =
WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr); reader.GetRecordedTimestampSize();
record_status = HandleWriteBatchTimestampSizeDifference(
&batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency);
if (record_status.ok()) {
record_status =
WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
}
} }
if (record_status.ok()) { if (record_status.ok()) {
counter += WriteBatchInternal::Count(&batch); counter += WriteBatchInternal::Count(&batch);

@ -3,17 +3,17 @@
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/options.h"
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <vector> #include <vector>
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/db_with_timestamp_test_util.h"
#include "file/file_util.h" #include "file/file_util.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "table/unique_id_impl.h" #include "table/unique_id_impl.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -315,6 +315,78 @@ TEST_F(RepairTest, UnflushedSst) {
ASSERT_EQ(Get("key"), "val"); ASSERT_EQ(Get("key"), "val");
} }
class RepairTestWithTimestamp : public DBBasicTestWithTimestampBase {
public:
RepairTestWithTimestamp()
: DBBasicTestWithTimestampBase("repair_test_with_timestamp") {}
Status Put(const Slice& key, const Slice& ts, const Slice& value) {
WriteOptions write_opts;
return db_->Put(write_opts, handles_[0], key, ts, value);
}
void CheckGet(const ReadOptions& read_opts, const Slice& key,
const std::string& expected_value) {
std::string actual_value;
ASSERT_OK(db_->Get(read_opts, handles_[0], key, &actual_value));
ASSERT_EQ(expected_value, actual_value);
}
};
TEST_F(RepairTestWithTimestamp, UnflushedSst) {
Destroy(last_options_);
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
std::string ts = Timestamp(0, 0);
const size_t kTimestampSize = ts.size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_));
ASSERT_OK(Put("key", ts, "val"));
VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1);
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_EQ(total_ssts_size, 0);
}
// Need to get path before Close() deletes db_, but delete it after Close() to
// ensure Close() didn't change the manifest.
std::string manifest_path =
DescriptorFileName(dbname_, dbfull()->TEST_Current_Manifest_FileNo());
Close();
ASSERT_OK(env_->FileExists(manifest_path));
ASSERT_OK(env_->DeleteFile(manifest_path));
ASSERT_OK(RepairDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_));
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 0);
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_GT(total_ssts_size, 0);
}
ReadOptions read_opts;
Slice read_ts_slice = ts;
read_opts.timestamp = &read_ts_slice;
CheckGet(read_opts, "key", "val");
}
TEST_F(RepairTest, SeparateWalDir) { TEST_F(RepairTest, SeparateWalDir) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();

@ -1467,6 +1467,17 @@ class VersionSet {
uint64_t min_pending_output); uint64_t min_pending_output);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
const std::unordered_map<uint32_t, size_t>&
GetRunningColumnFamiliesTimestampSize() const {
return column_family_set_->GetRunningColumnFamiliesTimestampSize();
}
const std::unordered_map<uint32_t, size_t>&
GetColumnFamiliesTimestampSizeForRecord() const {
return column_family_set_->GetColumnFamiliesTimestampSizeForRecord();
}
RefedColumnFamilySet GetRefedColumnFamilySet() { RefedColumnFamilySet GetRefedColumnFamilySet() {
return RefedColumnFamilySet(GetColumnFamilySet()); return RefedColumnFamilySet(GetColumnFamilySet());
} }

@ -0,0 +1 @@
Start logging non-zero user-defined timestamp sizes in WAL to signal user key format in subsequent records and use it during recovery. This change will break recovery from WAL files written by early versions that contain user-defined timestamps. The workaround is to ensure there are no WAL files to recover (i.e. by flushing before close) before upgrade.

@ -108,7 +108,8 @@ TimestampRecoveryHandler::TimestampRecoveryHandler(
: running_ts_sz_(running_ts_sz), : running_ts_sz_(running_ts_sz),
record_ts_sz_(record_ts_sz), record_ts_sz_(record_ts_sz),
new_batch_(new WriteBatch()), new_batch_(new WriteBatch()),
handler_valid_(true) {} handler_valid_(true),
new_batch_diff_from_orig_batch_(false) {}
Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key, Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key,
const Slice& value) { const Slice& value) {
@ -214,10 +215,12 @@ Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
case RecoveryType::kStripTimestamp: case RecoveryType::kStripTimestamp:
assert(record_ts_sz.has_value()); assert(record_ts_sz.has_value());
*new_key = StripTimestampFromUserKey(key, record_ts_sz.value()); *new_key = StripTimestampFromUserKey(key, record_ts_sz.value());
new_batch_diff_from_orig_batch_ = true;
break; break;
case RecoveryType::kPadTimestamp: case RecoveryType::kPadTimestamp:
AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz); AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz);
*new_key = *new_key_buf; *new_key = *new_key_buf;
new_batch_diff_from_orig_batch_ = true;
break; break;
case RecoveryType::kUnrecoverable: case RecoveryType::kUnrecoverable:
return Status::InvalidArgument( return Status::InvalidArgument(
@ -230,28 +233,30 @@ Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
} }
Status HandleWriteBatchTimestampSizeDifference( Status HandleWriteBatchTimestampSizeDifference(
const WriteBatch* batch,
const std::unordered_map<uint32_t, size_t>& running_ts_sz, const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz, const std::unordered_map<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode, TimestampSizeConsistencyMode check_mode,
std::unique_ptr<WriteBatch>& batch) { std::unique_ptr<WriteBatch>* new_batch) {
// Quick path to bypass checking the WriteBatch. // Quick path to bypass checking the WriteBatch.
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
return Status::OK(); return Status::OK();
} }
bool need_recovery = false; bool need_recovery = false;
Status status = CheckWriteBatchTimestampSizeConsistency( Status status = CheckWriteBatchTimestampSizeConsistency(
batch.get(), running_ts_sz, record_ts_sz, check_mode, &need_recovery); batch, running_ts_sz, record_ts_sz, check_mode, &need_recovery);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} else if (need_recovery) { } else if (need_recovery) {
SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get()); assert(new_batch);
SequenceNumber sequence = WriteBatchInternal::Sequence(batch);
TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz);
status = batch->Iterate(&recovery_handler); status = batch->Iterate(&recovery_handler);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} else { } else {
batch = recovery_handler.TransferNewBatch(); *new_batch = recovery_handler.TransferNewBatch();
WriteBatchInternal::SetSequence(batch.get(), sequence); WriteBatchInternal::SetSequence(new_batch->get(), sequence);
} }
} }
return Status::OK(); return Status::OK();

@ -143,6 +143,7 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
std::unique_ptr<WriteBatch>&& TransferNewBatch() { std::unique_ptr<WriteBatch>&& TransferNewBatch() {
assert(new_batch_diff_from_orig_batch_);
handler_valid_ = false; handler_valid_ = false;
return std::move(new_batch_); return std::move(new_batch_);
} }
@ -164,6 +165,10 @@ class TimestampRecoveryHandler : public WriteBatch::Handler {
// Handler is valid upon creation and becomes invalid after its `new_batch_` // Handler is valid upon creation and becomes invalid after its `new_batch_`
// is transferred. // is transferred.
bool handler_valid_; bool handler_valid_;
// False upon creation, and become true if at least one user key from the
// original batch is updated when creating the new batch.
bool new_batch_diff_from_orig_batch_;
}; };
// Mode for checking and handling timestamp size inconsistency encountered in a // Mode for checking and handling timestamp size inconsistency encountered in a
@ -193,8 +198,9 @@ enum class TimestampSizeConsistencyMode {
// any running column family has an inconsistent user-defined timestamp size // any running column family has an inconsistent user-defined timestamp size
// that cannot be reconciled with a best-effort recovery. Check // that cannot be reconciled with a best-effort recovery. Check
// `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In // `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In
// this mode, a new WriteBatch is created on the heap and transferred to `batch` // this mode, output argument `new_batch` should be set, a new WriteBatch is
// if there is tolerable inconsistency. // created on the heap and transferred to `new_batch` if there is tolerable
// inconsistency.
// //
// An invariant that WAL logging ensures is that all timestamp size info // An invariant that WAL logging ensures is that all timestamp size info
// is logged prior to a WriteBatch that needed this info. And zero timestamp // is logged prior to a WriteBatch that needed this info. And zero timestamp
@ -204,8 +210,9 @@ enum class TimestampSizeConsistencyMode {
// `running_ts_sz` should contain the timestamp size for all running column // `running_ts_sz` should contain the timestamp size for all running column
// families including the ones with zero timestamp size. // families including the ones with zero timestamp size.
Status HandleWriteBatchTimestampSizeDifference( Status HandleWriteBatchTimestampSizeDifference(
const WriteBatch* batch,
const std::unordered_map<uint32_t, size_t>& running_ts_sz, const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz, const std::unordered_map<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode, TimestampSizeConsistencyMode check_mode,
std::unique_ptr<WriteBatch>& batch); std::unique_ptr<WriteBatch>* new_batch = nullptr);
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -106,18 +106,17 @@ class HandleTimestampSizeDifferenceTest : public testing::Test {
void CreateWriteBatch( void CreateWriteBatch(
const std::unordered_map<uint32_t, size_t>& ts_sz_for_batch, const std::unordered_map<uint32_t, size_t>& ts_sz_for_batch,
std::unique_ptr<WriteBatch>& batch) { WriteBatch* batch) {
for (const auto& [cf_id, ts_sz] : ts_sz_for_batch) { for (const auto& [cf_id, ts_sz] : ts_sz_for_batch) {
std::string key; std::string key;
CreateKey(&key, ts_sz); CreateKey(&key, ts_sz);
ASSERT_OK(WriteBatchInternal::Put(batch, cf_id, key, kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::Delete(batch, cf_id, key));
ASSERT_OK(WriteBatchInternal::SingleDelete(batch, cf_id, key));
ASSERT_OK(WriteBatchInternal::DeleteRange(batch, cf_id, key, key));
ASSERT_OK( ASSERT_OK(
WriteBatchInternal::Put(batch.get(), cf_id, key, kValuePlaceHolder)); WriteBatchInternal::Merge(batch, cf_id, key, kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::Delete(batch.get(), cf_id, key)); ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch, cf_id, key,
ASSERT_OK(WriteBatchInternal::SingleDelete(batch.get(), cf_id, key));
ASSERT_OK(WriteBatchInternal::DeleteRange(batch.get(), cf_id, key, key));
ASSERT_OK(WriteBatchInternal::Merge(batch.get(), cf_id, key,
kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch.get(), cf_id, key,
kValuePlaceHolder)); kValuePlaceHolder));
} }
} }
@ -189,19 +188,18 @@ TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)}, std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)},
{2, 0}}; {2, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}}; std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(running_ts_sz, batch); CreateWriteBatch(running_ts_sz, &batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not checked or updated. // All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)); TimestampSizeConsistencyMode::kVerifyConsistency));
ASSERT_EQ(orig_batch, batch.get()); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_EQ(orig_batch, batch.get()); ASSERT_TRUE(new_batch.get() == nullptr);
} }
TEST_F(HandleTimestampSizeDifferenceTest, TEST_F(HandleTimestampSizeDifferenceTest,
@ -209,38 +207,36 @@ TEST_F(HandleTimestampSizeDifferenceTest,
std::unordered_map<uint32_t, size_t> running_ts_sz = {{2, 0}}; std::unordered_map<uint32_t, size_t> running_ts_sz = {{2, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}, std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)},
{3, sizeof(char)}}; {3, sizeof(char)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(record_ts_sz, batch); CreateWriteBatch(record_ts_sz, &batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not checked or updated. // All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)); TimestampSizeConsistencyMode::kVerifyConsistency));
ASSERT_EQ(orig_batch, batch.get()); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_EQ(orig_batch, batch.get()); ASSERT_TRUE(new_batch.get() == nullptr);
} }
TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)}, std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)},
{2, sizeof(char)}}; {2, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}}; std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(record_ts_sz, batch); CreateWriteBatch(record_ts_sz, &batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not updated. // All `check_mode` pass with OK status and `batch` not updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)); TimestampSizeConsistencyMode::kVerifyConsistency));
ASSERT_EQ(orig_batch, batch.get()); std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_EQ(orig_batch, batch.get()); ASSERT_TRUE(new_batch.get() == nullptr);
} }
TEST_F(HandleTimestampSizeDifferenceTest, TEST_F(HandleTimestampSizeDifferenceTest,
@ -248,23 +244,22 @@ TEST_F(HandleTimestampSizeDifferenceTest,
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0}, std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0},
{2, sizeof(char)}}; {2, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}}; std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(record_ts_sz, batch); CreateWriteBatch(record_ts_sz, &batch);
const WriteBatch* orig_batch = batch.get();
WriteBatch orig_batch_copy(*batch);
// kVerifyConsistency doesn't tolerate inconsistency for running column // kVerifyConsistency doesn't tolerate inconsistency for running column
// families. // families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch) TimestampSizeConsistencyMode::kVerifyConsistency)
.IsInvalidArgument()); .IsInvalidArgument());
std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_NE(orig_batch, batch.get()); ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t),
std::nullopt /* dropped_cf */); std::nullopt /* dropped_cf */);
} }
@ -274,22 +269,22 @@ TEST_F(HandleTimestampSizeDifferenceTest,
// Make `record_ts_sz` not contain zero timestamp size entries to follow the // Make `record_ts_sz` not contain zero timestamp size entries to follow the
// behavior of actual WAL log timestamp size record. // behavior of actual WAL log timestamp size record.
std::unordered_map<uint32_t, size_t> record_ts_sz; std::unordered_map<uint32_t, size_t> record_ts_sz;
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch({{1, 0}}, batch); CreateWriteBatch({{1, 0}}, &batch);
const WriteBatch* orig_batch = batch.get();
WriteBatch orig_batch_copy(*batch);
// kVerifyConsistency doesn't tolerate inconsistency for running column // kVerifyConsistency doesn't tolerate inconsistency for running column
// families. // families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch) TimestampSizeConsistencyMode::kVerifyConsistency)
.IsInvalidArgument()); .IsInvalidArgument());
std::unique_ptr<WriteBatch> new_batch(nullptr);
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_NE(orig_batch, batch.get()); ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampPadding(orig_batch_copy, *batch, sizeof(uint64_t)); CheckContentsWithTimestampPadding(batch, *new_batch, sizeof(uint64_t));
} }
TEST_F(HandleTimestampSizeDifferenceTest, TEST_F(HandleTimestampSizeDifferenceTest,
@ -297,35 +292,35 @@ TEST_F(HandleTimestampSizeDifferenceTest,
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0}}; std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}, std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)},
{2, sizeof(char)}}; {2, sizeof(char)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(record_ts_sz, batch); CreateWriteBatch(record_ts_sz, &batch);
const WriteBatch* orig_batch = batch.get(); std::unique_ptr<WriteBatch> new_batch(nullptr);
WriteBatch orig_batch_copy(*batch);
// kReconcileInconsistency tolerate inconsistency for dropped column family // kReconcileInconsistency tolerate inconsistency for dropped column family
// and all related entries copied over to the new WriteBatch. // and all related entries copied over to the new WriteBatch.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference( ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch));
ASSERT_NE(orig_batch, batch.get());
CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), ASSERT_TRUE(new_batch.get() != nullptr);
CheckContentsWithTimestampStripping(batch, *new_batch, sizeof(uint64_t),
std::optional<uint32_t>(2)); std::optional<uint32_t>(2));
} }
TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(char)}}; std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}}; std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch()); WriteBatch batch;
CreateWriteBatch(record_ts_sz, batch); CreateWriteBatch(record_ts_sz, &batch);
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch) TimestampSizeConsistencyMode::kVerifyConsistency)
.IsInvalidArgument()); .IsInvalidArgument());
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz, &batch, running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch) TimestampSizeConsistencyMode::kReconcileInconsistency)
.IsInvalidArgument()); .IsInvalidArgument());
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save