Add utils to use for handling user defined timestamp size record in WAL (#11451)

Summary:
Add a util method `HandleWriteBatchTimestampSizeDifference` to handle a `WriteBatch` read from WAL log when user-defined timestamp size record is written and read. Two check modes are added: `kVerifyConsistency` that just verifies the recorded timestamp size are consistent with the running ones. This mode is to be used by `db_impl_secondary` for opening a DB as secondary instance. It will also be used by `db_impl_open` before the user comparator switch support is added to make a column switch between enabling/disable UDT feature. The other mode `kReconcileInconsistency` will be used by `db_impl_open` later when user comparator can be changed.

Another change is to extract a method `CollectColumnFamilyIdsFromWriteBatch` in db_secondary_impl.h into its standalone util file so it can be shared.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11451

Test Plan:
```
make check
./udt_util_test
```

Reviewed By: ltamasi

Differential Revision: D45894386

Pulled By: jowlyzhang

fbshipit-source-id: b96790777f154cddab6d45d9ba2e5d20ebc6fe9d
oxigraph-main
Yu Zhang 2 years ago committed by Facebook GitHub Bot
parent ffb5f1f445
commit 11ebddb1d4
  1. 3
      CMakeLists.txt
  2. 3
      Makefile
  3. 8
      TARGETS
  4. 1
      db/db_impl/db_impl_secondary.cc
  5. 79
      db/db_impl/db_impl_secondary.h
  6. 3
      src.mk
  7. 259
      util/udt_util.cc
  8. 136
      util/udt_util.h
  9. 337
      util/udt_util_test.cc
  10. 25
      util/write_batch_util.cc
  11. 80
      util/write_batch_util.h

@ -865,6 +865,8 @@ set(SOURCES
util/string_util.cc
util/thread_local.cc
util/threadpool_imp.cc
util/udt_util.cc
util/write_batch_util.cc
util/xxhash.cc
utilities/agg_merge/agg_merge.cc
utilities/backup/backup_engine.cc
@ -1421,6 +1423,7 @@ if(WITH_TESTS)
util/timer_test.cc
util/thread_list_test.cc
util/thread_local_test.cc
util/udt_util_test.cc
util/work_queue_test.cc
utilities/agg_merge/agg_merge_test.cc
utilities/backup/backup_engine_test.cc

@ -1417,6 +1417,9 @@ thread_local_test: $(OBJ_DIR)/util/thread_local_test.o $(TEST_LIBRARY) $(LIBRARY
work_queue_test: $(OBJ_DIR)/util/work_queue_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
udt_util_test: $(OBJ_DIR)/util/udt_util_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
corruption_test: $(OBJ_DIR)/db/corruption_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -264,6 +264,8 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/string_util.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/udt_util.cc",
"util/write_batch_util.cc",
"util/xxhash.cc",
"utilities/agg_merge/agg_merge.cc",
"utilities/backup/backup_engine.cc",
@ -5508,6 +5510,12 @@ cpp_unittest_wrapper(name="ttl_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="udt_util_test",
srcs=["util/udt_util_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="util_merge_operators_test",
srcs=["utilities/util_merge_operators_test.cc"],
deps=[":rocksdb_test_lib"],

@ -14,6 +14,7 @@
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {

@ -273,85 +273,6 @@ class DBImplSecondary : public DBImpl {
return Status::OK();
}
// ColumnFamilyCollector is a write batch handler which does nothing
// except recording unique column family IDs
class ColumnFamilyCollector : public WriteBatch::Handler {
std::unordered_set<uint32_t> column_family_ids_;
Status AddColumnFamilyId(uint32_t column_family_id) {
if (column_family_ids_.find(column_family_id) ==
column_family_ids_.end()) {
column_family_ids_.insert(column_family_id);
}
return Status::OK();
}
public:
explicit ColumnFamilyCollector() {}
~ColumnFamilyCollector() override {}
Status PutCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status MergeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkNoop(bool) override { return Status::OK(); }
const std::unordered_set<uint32_t>& column_families() const {
return column_family_ids_;
}
};
Status CollectColumnFamilyIdsFromWriteBatch(
const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
assert(column_family_ids != nullptr);
column_family_ids->clear();
ColumnFamilyCollector handler;
Status s = batch.Iterate(&handler);
if (s.ok()) {
for (const auto& cf : handler.column_families()) {
column_family_ids->push_back(cf);
}
}
return s;
}
bool OwnTablesAndLogs() const override {
// Currently, the secondary instance does not own the database files. It
// simply opens the files of the primary instance and tracks their file

@ -251,6 +251,8 @@ LIB_SOURCES = \
util/string_util.cc \
util/thread_local.cc \
util/threadpool_imp.cc \
util/udt_util.cc \
util/write_batch_util.cc \
util/xxhash.cc \
utilities/agg_merge/agg_merge.cc \
utilities/backup/backup_engine.cc \
@ -593,6 +595,7 @@ TEST_MAIN_SOURCES = \
util/timer_test.cc \
util/thread_list_test.cc \
util/thread_local_test.cc \
util/udt_util_test.cc \
util/work_queue_test.cc \
utilities/agg_merge/agg_merge_test.cc \
utilities/backup/backup_engine_test.cc \

@ -0,0 +1,259 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/udt_util.h"
#include "db/dbformat.h"
#include "rocksdb/types.h"
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {
enum class RecoveryType {
kNoop,
kUnrecoverable,
kStripTimestamp,
kPadTimestamp,
};
RecoveryType GetRecoveryType(const size_t running_ts_sz,
const std::optional<size_t>& recorded_ts_sz) {
if (running_ts_sz == 0) {
if (!recorded_ts_sz.has_value()) {
// A column family id not recorded is equivalent to that column family has
// zero timestamp size.
return RecoveryType::kNoop;
}
return RecoveryType::kStripTimestamp;
}
assert(running_ts_sz != 0);
if (!recorded_ts_sz.has_value()) {
return RecoveryType::kPadTimestamp;
}
if (running_ts_sz != recorded_ts_sz.value()) {
return RecoveryType::kUnrecoverable;
}
return RecoveryType::kNoop;
}
bool AllRunningColumnFamiliesConsistent(
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz) {
for (const auto& [cf_id, ts_sz] : running_ts_sz) {
auto record_it = record_ts_sz.find(cf_id);
RecoveryType recovery_type =
GetRecoveryType(ts_sz, record_it != record_ts_sz.end()
? std::optional<size_t>(record_it->second)
: std::nullopt);
if (recovery_type != RecoveryType::kNoop) {
return false;
}
}
return true;
}
Status CheckWriteBatchTimestampSizeConsistency(
const WriteBatch* batch,
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode, bool* ts_need_recovery) {
std::vector<uint32_t> column_family_ids;
Status status =
CollectColumnFamilyIdsFromWriteBatch(*batch, &column_family_ids);
if (!status.ok()) {
return status;
}
for (const auto& cf_id : column_family_ids) {
auto running_iter = running_ts_sz.find(cf_id);
if (running_iter == running_ts_sz.end()) {
// Ignore dropped column family referred to in a WriteBatch regardless of
// its consistency.
continue;
}
auto record_iter = record_ts_sz.find(cf_id);
RecoveryType recovery_type = GetRecoveryType(
running_iter->second, record_iter != record_ts_sz.end()
? std::optional<size_t>(record_iter->second)
: std::nullopt);
if (recovery_type != RecoveryType::kNoop) {
if (check_mode == TimestampSizeConsistencyMode::kVerifyConsistency) {
return Status::InvalidArgument(
"WriteBatch contains timestamp size inconsistency.");
}
if (recovery_type == RecoveryType::kUnrecoverable) {
return Status::InvalidArgument(
"WriteBatch contains unrecoverable timestamp size inconsistency.");
}
// If any column family needs reconciliation, it will mark the whole
// WriteBatch to need recovery and rebuilt.
*ts_need_recovery = true;
}
}
return Status::OK();
}
} // namespace
TimestampRecoveryHandler::TimestampRecoveryHandler(
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz)
: running_ts_sz_(running_ts_sz),
record_ts_sz_(record_ts_sz),
new_batch_(new WriteBatch()),
handler_valid_(true) {}
Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key,
const Slice& value) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::Put(new_batch_.get(), cf, new_key, value);
}
Status TimestampRecoveryHandler::DeleteCF(uint32_t cf, const Slice& key) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::Delete(new_batch_.get(), cf, new_key);
}
Status TimestampRecoveryHandler::SingleDeleteCF(uint32_t cf, const Slice& key) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::SingleDelete(new_batch_.get(), cf, new_key);
}
Status TimestampRecoveryHandler::DeleteRangeCF(uint32_t cf,
const Slice& begin_key,
const Slice& end_key) {
std::string new_begin_key_buf;
Slice new_begin_key;
std::string new_end_key_buf;
Slice new_end_key;
Status status = ReconcileTimestampDiscrepancy(
cf, begin_key, &new_begin_key_buf, &new_begin_key);
if (!status.ok()) {
return status;
}
status = ReconcileTimestampDiscrepancy(cf, end_key, &new_end_key_buf,
&new_end_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::DeleteRange(new_batch_.get(), cf, new_begin_key,
new_end_key);
}
Status TimestampRecoveryHandler::MergeCF(uint32_t cf, const Slice& key,
const Slice& value) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::Merge(new_batch_.get(), cf, new_key, value);
}
Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& value) {
std::string new_key_buf;
Slice new_key;
Status status =
ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key);
if (!status.ok()) {
return status;
}
return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value);
}
Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy(
uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) {
assert(handler_valid_);
auto running_iter = running_ts_sz_.find(cf);
if (running_iter == running_ts_sz_.end()) {
// The column family referred to by the WriteBatch is no longer running.
// Copy over the entry as is to the new WriteBatch.
*new_key = key;
return Status::OK();
}
size_t running_ts_sz = running_iter->second;
auto record_iter = record_ts_sz_.find(cf);
std::optional<size_t> record_ts_sz =
record_iter != record_ts_sz_.end()
? std::optional<size_t>(record_iter->second)
: std::nullopt;
RecoveryType recovery_type = GetRecoveryType(running_ts_sz, record_ts_sz);
switch (recovery_type) {
case RecoveryType::kNoop:
*new_key = key;
break;
case RecoveryType::kStripTimestamp:
assert(record_ts_sz.has_value());
*new_key = StripTimestampFromUserKey(key, record_ts_sz.value());
break;
case RecoveryType::kPadTimestamp:
AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz);
*new_key = *new_key_buf;
break;
case RecoveryType::kUnrecoverable:
return Status::InvalidArgument(
"Unrecoverable timestamp size inconsistency encountered by "
"TimestampRecoveryHandler.");
default:
assert(false);
}
return Status::OK();
}
Status HandleWriteBatchTimestampSizeDifference(
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode,
std::unique_ptr<WriteBatch>& batch) {
// Quick path to bypass checking the WriteBatch.
if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) {
return Status::OK();
}
bool need_recovery = false;
Status status = CheckWriteBatchTimestampSizeConsistency(
batch.get(), running_ts_sz, record_ts_sz, check_mode, &need_recovery);
if (!status.ok()) {
return status;
} else if (need_recovery) {
SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get());
TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz);
status = batch->Iterate(&recovery_handler);
if (!status.ok()) {
return status;
} else {
batch = recovery_handler.TransferNewBatch();
WriteBatchInternal::SetSequence(batch.get(), sequence);
}
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@ -1,14 +1,20 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include <optional>
#include <sstream>
#include <unordered_map>
#include <vector>
#include "db/write_batch_internal.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
@ -74,4 +80,132 @@ class UserDefinedTimestampSizeRecord {
std::vector<std::pair<uint32_t, size_t>> cf_to_ts_sz_;
};
// This handler is used to recover a WriteBatch read from WAL logs during
// recovery. It does a best-effort recovery if the column families contained in
// the WriteBatch have inconsistency between the recorded timestamp size and the
// running timestamp size. And creates a new WriteBatch that are consistent with
// the running timestamp size with entries from the original WriteBatch.
//
// Note that for a WriteBatch with no inconsistency, a new WriteBatch is created
// nonetheless, and it should be exactly the same as the original WriteBatch.
//
// To access the new WriteBatch, invoke `TransferNewBatch` after calling
// `Iterate`. The handler becomes invalid afterwards.
//
// For the user key in each entry, the best effort recovery means:
// 1) If recorded timestamp size is 0, running timestamp size is > 0, a min
// timestamp of length running timestamp size is padded to the user key.
// 2) If recorded timestamp size is > 0, running timestamp size is 0, the last
// bytes of length recorded timestamp size is stripped from user key.
// 3) If recorded timestamp size is the same as running timestamp size, no-op.
// 4) If recorded timestamp size and running timestamp size are both non-zero
// but not equal, return Status::InvalidArgument.
class TimestampRecoveryHandler : public WriteBatch::Handler {
public:
TimestampRecoveryHandler(
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz);
~TimestampRecoveryHandler() override {}
// No copy or move.
TimestampRecoveryHandler(const TimestampRecoveryHandler&) = delete;
TimestampRecoveryHandler(TimestampRecoveryHandler&&) = delete;
TimestampRecoveryHandler& operator=(const TimestampRecoveryHandler&) = delete;
TimestampRecoveryHandler& operator=(TimestampRecoveryHandler&&) = delete;
Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override;
Status DeleteCF(uint32_t cf, const Slice& key) override;
Status SingleDeleteCF(uint32_t cf, const Slice& key) override;
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override;
Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override;
Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& value) override;
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkRollback(const Slice&) override { return Status::OK(); }
Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
std::unique_ptr<WriteBatch>&& TransferNewBatch() {
handler_valid_ = false;
return std::move(new_batch_);
}
private:
Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key,
std::string* new_key_buf,
Slice* new_key);
// Mapping from column family id to user-defined timestamp size for all
// running column families including the ones with zero timestamp size.
const std::unordered_map<uint32_t, size_t>& running_ts_sz_;
// Mapping from column family id to user-defined timestamp size as recorded
// in the WAL. This only contains non-zero user-defined timestamp size.
const std::unordered_map<uint32_t, size_t>& record_ts_sz_;
std::unique_ptr<WriteBatch> new_batch_;
// Handler is valid upon creation and becomes invalid after its `new_batch_`
// is transferred.
bool handler_valid_;
};
// Mode for checking and handling timestamp size inconsistency encountered in a
// WriteBatch read from WAL log.
enum class TimestampSizeConsistencyMode {
// Verified that the recorded user-defined timestamp size is consistent with
// the running one for all the column families involved in a WriteBatch.
// Column families referred to in the WriteBatch but are dropped are ignored.
kVerifyConsistency,
// Verified that if any inconsistency exists in a WriteBatch, it's all
// tolerable by a best-effort reconciliation. And optionally creates a new
// WriteBatch from the original WriteBatch that is consistent with the running
// timestamp size. Column families referred to in the WriteBatch but are
// dropped are ignored. If a new WriteBatch is created, such entries are
// copied over as is.
kReconcileInconsistency,
};
// Handles the inconsistency between recorded timestamp sizes and running
// timestamp sizes for a WriteBatch. A non-OK `status` indicates there are
// intolerable inconsistency with the specified `check_mode`.
//
// If `check_mode` is `kVerifyConsistency`, intolerable inconsistency means any
// running column family has an inconsistent user-defined timestamp size.
//
// If `check_mode` is `kReconcileInconsistency`, intolerable inconsistency means
// any running column family has an inconsistent user-defined timestamp size
// that cannot be reconciled with a best-effort recovery. Check
// `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In
// this mode, a new WriteBatch is created on the heap and transferred to `batch`
// if there is tolerable inconsistency.
//
// An invariant that WAL logging ensures is that all timestamp size info
// is logged prior to a WriteBatch that needed this info. And zero timestamp
// size is skipped. So `record_ts_sz` only contains column family with non-zero
// timestamp size and a column family id absent from `record_ts_sz` will be
// interpreted as that column family has zero timestamp size. On the other hand,
// `running_ts_sz` should contain the timestamp size for all running column
// families including the ones with zero timestamp size.
Status HandleWriteBatchTimestampSizeDifference(
const std::unordered_map<uint32_t, size_t>& running_ts_sz,
const std::unordered_map<uint32_t, size_t>& record_ts_sz,
TimestampSizeConsistencyMode check_mode,
std::unique_ptr<WriteBatch>& batch);
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,337 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/udt_util.h"
#include <gtest/gtest.h>
#include "db/dbformat.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
namespace ROCKSDB_NAMESPACE {
namespace {
static const std::string kTestKeyWithoutTs = "key";
static const std::string kValuePlaceHolder = "value";
} // namespace
class HandleTimestampSizeDifferenceTest : public testing::Test {
public:
HandleTimestampSizeDifferenceTest() {}
// Test handler used to collect the column family id and user keys contained
// in a WriteBatch for test verification. And verifies the value part stays
// the same if it's available.
class KeyCollector : public WriteBatch::Handler {
public:
explicit KeyCollector() {}
~KeyCollector() override {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
if (value.compare(kValuePlaceHolder) != 0) {
return Status::InvalidArgument();
}
return AddKey(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return AddKey(cf, key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return AddKey(cf, key);
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override {
Status status = AddKey(cf, begin_key);
if (!status.ok()) {
return status;
}
return AddKey(cf, end_key);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
if (value.compare(kValuePlaceHolder) != 0) {
return Status::InvalidArgument();
}
return AddKey(cf, key);
}
Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& value) override {
if (value.compare(kValuePlaceHolder) != 0) {
return Status::InvalidArgument();
}
return AddKey(cf, key);
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkNoop(bool) override { return Status::OK(); }
const std::vector<std::pair<uint32_t, const Slice>>& GetKeys() const {
return keys_;
}
private:
Status AddKey(uint32_t cf, const Slice& key) {
keys_.push_back(std::make_pair(cf, key));
return Status::OK();
}
std::vector<std::pair<uint32_t, const Slice>> keys_;
};
void CreateKey(std::string* key_buf, size_t ts_sz) {
if (ts_sz > 0) {
AppendKeyWithMinTimestamp(key_buf, kTestKeyWithoutTs, ts_sz);
} else {
key_buf->assign(kTestKeyWithoutTs);
}
}
void CreateWriteBatch(
const std::unordered_map<uint32_t, size_t>& ts_sz_for_batch,
std::unique_ptr<WriteBatch>& batch) {
for (const auto& [cf_id, ts_sz] : ts_sz_for_batch) {
std::string key;
CreateKey(&key, ts_sz);
ASSERT_OK(
WriteBatchInternal::Put(batch.get(), cf_id, key, kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::Delete(batch.get(), cf_id, key));
ASSERT_OK(WriteBatchInternal::SingleDelete(batch.get(), cf_id, key));
ASSERT_OK(WriteBatchInternal::DeleteRange(batch.get(), cf_id, key, key));
ASSERT_OK(WriteBatchInternal::Merge(batch.get(), cf_id, key,
kValuePlaceHolder));
ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch.get(), cf_id, key,
kValuePlaceHolder));
}
}
void CheckSequenceEqual(const WriteBatch& orig_batch,
const WriteBatch& new_batch) {
ASSERT_EQ(WriteBatchInternal::Sequence(&orig_batch),
WriteBatchInternal::Sequence(&new_batch));
}
void CheckCountEqual(const WriteBatch& orig_batch,
const WriteBatch& new_batch) {
ASSERT_EQ(WriteBatchInternal::Count(&orig_batch),
WriteBatchInternal::Count(&new_batch));
}
void VerifyKeys(
const std::vector<std::pair<uint32_t, const Slice>>& keys_with_ts,
const std::vector<std::pair<uint32_t, const Slice>>& keys_without_ts,
size_t ts_sz, std::optional<uint32_t> dropped_cf) {
ASSERT_EQ(keys_with_ts.size(), keys_without_ts.size());
const std::string kTsMin(ts_sz, static_cast<unsigned char>(0));
for (size_t i = 0; i < keys_with_ts.size(); i++) {
// TimestampRecoveryHandler ignores dropped column family and copy it over
// as is. Check the keys stay the same.
if (dropped_cf.has_value() &&
keys_with_ts[i].first == dropped_cf.value()) {
ASSERT_EQ(keys_with_ts[i].first, keys_without_ts[i].first);
ASSERT_EQ(keys_with_ts[i].second, keys_without_ts[i].second);
continue;
}
const Slice& key_with_ts = keys_with_ts[i].second;
const Slice& key_without_ts = keys_without_ts[i].second;
ASSERT_TRUE(key_with_ts.starts_with(key_without_ts));
ASSERT_EQ(key_with_ts.size() - key_without_ts.size(), ts_sz);
ASSERT_TRUE(key_with_ts.ends_with(kTsMin));
}
}
void CheckContentsWithTimestampStripping(const WriteBatch& orig_batch,
const WriteBatch& new_batch,
size_t ts_sz,
std::optional<uint32_t> dropped_cf) {
CheckSequenceEqual(orig_batch, new_batch);
CheckCountEqual(orig_batch, new_batch);
KeyCollector collector_for_orig_batch;
ASSERT_OK(orig_batch.Iterate(&collector_for_orig_batch));
KeyCollector collector_for_new_batch;
ASSERT_OK(new_batch.Iterate(&collector_for_new_batch));
VerifyKeys(collector_for_orig_batch.GetKeys(),
collector_for_new_batch.GetKeys(), ts_sz, dropped_cf);
}
void CheckContentsWithTimestampPadding(const WriteBatch& orig_batch,
const WriteBatch& new_batch,
size_t ts_sz) {
CheckSequenceEqual(orig_batch, new_batch);
CheckCountEqual(orig_batch, new_batch);
KeyCollector collector_for_orig_batch;
ASSERT_OK(orig_batch.Iterate(&collector_for_orig_batch));
KeyCollector collector_for_new_batch;
ASSERT_OK(new_batch.Iterate(&collector_for_new_batch));
VerifyKeys(collector_for_new_batch.GetKeys(),
collector_for_orig_batch.GetKeys(), ts_sz,
std::nullopt /* dropped_cf */);
}
};
TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)},
{2, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(running_ts_sz, batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
}
TEST_F(HandleTimestampSizeDifferenceTest,
AllInconsistentColumnFamiliesDropped) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{2, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)},
{3, sizeof(char)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(record_ts_sz, batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not checked or updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
}
TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)},
{2, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(record_ts_sz, batch);
const WriteBatch* orig_batch = batch.get();
// All `check_mode` pass with OK status and `batch` not updated.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_EQ(orig_batch, batch.get());
}
TEST_F(HandleTimestampSizeDifferenceTest,
InconsistentColumnFamilyNeedsTimestampStripping) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0},
{2, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(record_ts_sz, batch);
const WriteBatch* orig_batch = batch.get();
WriteBatch orig_batch_copy(*batch);
// kVerifyConsistency doesn't tolerate inconsistency for running column
// families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)
.IsInvalidArgument());
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_NE(orig_batch, batch.get());
CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t),
std::nullopt /* dropped_cf */);
}
TEST_F(HandleTimestampSizeDifferenceTest,
InconsistentColumnFamilyNeedsTimestampPadding) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(uint64_t)}};
// Make `record_ts_sz` not contain zero timestamp size entries to follow the
// behavior of actual WAL log timestamp size record.
std::unordered_map<uint32_t, size_t> record_ts_sz;
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch({{1, 0}}, batch);
const WriteBatch* orig_batch = batch.get();
WriteBatch orig_batch_copy(*batch);
// kVerifyConsistency doesn't tolerate inconsistency for running column
// families.
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)
.IsInvalidArgument());
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_NE(orig_batch, batch.get());
CheckContentsWithTimestampPadding(orig_batch_copy, *batch, sizeof(uint64_t));
}
TEST_F(HandleTimestampSizeDifferenceTest,
InconsistencyReconcileCopyOverDroppedColumnFamily) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, 0}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)},
{2, sizeof(char)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(record_ts_sz, batch);
const WriteBatch* orig_batch = batch.get();
WriteBatch orig_batch_copy(*batch);
// kReconcileInconsistency tolerate inconsistency for dropped column family
// and all related entries copied over to the new WriteBatch.
ASSERT_OK(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch));
ASSERT_NE(orig_batch, batch.get());
CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t),
std::optional<uint32_t>(2));
}
TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) {
std::unordered_map<uint32_t, size_t> running_ts_sz = {{1, sizeof(char)}};
std::unordered_map<uint32_t, size_t> record_ts_sz = {{1, sizeof(uint64_t)}};
std::unique_ptr<WriteBatch> batch(new WriteBatch());
CreateWriteBatch(record_ts_sz, batch);
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kVerifyConsistency, batch)
.IsInvalidArgument());
ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference(
running_ts_sz, record_ts_sz,
TimestampSizeConsistencyMode::kReconcileInconsistency, batch)
.IsInvalidArgument());
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,25 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/write_batch_util.h"
namespace ROCKSDB_NAMESPACE {
Status CollectColumnFamilyIdsFromWriteBatch(
const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
assert(column_family_ids != nullptr);
column_family_ids->clear();
ColumnFamilyCollector handler;
Status s = batch.Iterate(&handler);
if (s.ok()) {
for (const auto& cf : handler.column_families()) {
column_family_ids->push_back(cf);
}
}
return s;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,80 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <unordered_set>
#include <vector>
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
namespace ROCKSDB_NAMESPACE {
// ColumnFamilyCollector is a write batch handler which does nothing
// except recording unique column family IDs
class ColumnFamilyCollector : public WriteBatch::Handler {
std::unordered_set<uint32_t> column_family_ids_;
Status AddColumnFamilyId(uint32_t column_family_id) {
column_family_ids_.insert(column_family_id);
return Status::OK();
}
public:
explicit ColumnFamilyCollector() {}
~ColumnFamilyCollector() override {}
Status PutCF(uint32_t column_family_id, const Slice&, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status MergeCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
const Slice&) override {
return AddColumnFamilyId(column_family_id);
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
return Status::OK();
}
Status MarkNoop(bool) override { return Status::OK(); }
const std::unordered_set<uint32_t>& column_families() const {
return column_family_ids_;
}
};
Status CollectColumnFamilyIdsFromWriteBatch(
const WriteBatch& batch, std::vector<uint32_t>* column_family_ids);
} // namespace ROCKSDB_NAMESPACE
Loading…
Cancel
Save