Update WriteBatch::AssignTimestamp() and Add (#9205)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9205

Update WriteBatch::AssignTimestamp() APIs so that they take an
additional argument, i.e. a function object called `checker` indicating the user-specified logic of performing
checks on timestamp sizes.

WriteBatch is a building block used by multiple other RocksDB components, each of which may track
timestamp information in different data structures. For example, transaction can either write to
`WriteBatchWithIndex` which is a `WriteBatch` with index, or write directly to raw `WriteBatch` if
`Transaction::DisableIndexing()` is called.
`WriteBatchWithIndex` keeps mapping from column family id to comparator, and transaction needs
to keep similar information for the `WriteBatch` if user calls `Transaction::DisableIndexing()` (dynamically)
so that we will know the size of each timestamp later. The bookkeeping info maintained by `WriteBatchWithIndex`
and `Transaction` should not overlap.
When we later call `WriteBatch::AssignTimestamp()`, we need to use these data structures to guarantee
that we do not accidentally assign timestamps for keys from column families that disable timestamp.

Reviewed By: ltamasi

Differential Revision: D31735186

fbshipit-source-id: 8b1709ed880ac72f995aa9e012e5873b290840a7
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 29954b8b57
commit 924616526a
  1. 1
      HISTORY.md
  2. 112
      db/write_batch.cc
  3. 174
      db/write_batch_internal.h
  4. 164
      db/write_batch_test.cc
  5. 38
      include/rocksdb/write_batch.h

@ -4,6 +4,7 @@
### Bug Fixes
### Behavior Changes
### Public API change
* Extend WriteBatch::AssignTimestamp and AssignTimestamps API so that both functions can accept an optional `checker` argument that performs additional checking on timestamp sizes.
## 6.27.0 (2021-11-19)
### New Features

@ -140,108 +140,6 @@ struct BatchContentClassifier : public WriteBatch::Handler {
}
};
class TimestampAssigner : public WriteBatch::Handler {
public:
explicit TimestampAssigner(const Slice& ts,
WriteBatch::ProtectionInfo* prot_info)
: timestamp_(ts),
timestamps_(kEmptyTimestampList),
prot_info_(prot_info) {}
explicit TimestampAssigner(const std::vector<Slice>& ts_list,
WriteBatch::ProtectionInfo* prot_info)
: timestamps_(ts_list), prot_info_(prot_info) {}
~TimestampAssigner() override {}
Status PutCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice& key) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status DeleteRangeCF(uint32_t, const Slice& begin_key,
const Slice& /* end_key */) override {
AssignTimestamp(begin_key);
++idx_;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice& key, const Slice&) override {
AssignTimestamp(key);
++idx_;
return Status::OK();
}
Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override {
// TODO (yanqin): support blob db in the future.
return Status::OK();
}
Status MarkBeginPrepare(bool) override {
// TODO (yanqin): support in the future.
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}
Status MarkCommit(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}
Status MarkRollback(const Slice&) override {
// TODO (yanqin): support in the future.
return Status::OK();
}
private:
void AssignTimestamp(const Slice& key) {
assert(timestamps_.empty() || idx_ < timestamps_.size());
const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
size_t ts_sz = ts.size();
if (ts_sz == 0) {
// This key does not have timestamp, so skip.
return;
}
if (prot_info_ != nullptr) {
SliceParts old_key(&key, 1);
Slice key_no_ts(key.data(), key.size() - ts_sz);
std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
SliceParts new_key(new_key_cmpts.data(), 2);
prot_info_->entries_[idx_].UpdateK(old_key, new_key);
}
char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
memcpy(ptr, ts.data(), ts_sz);
}
static const std::vector<Slice> kEmptyTimestampList;
const Slice timestamp_;
const std::vector<Slice>& timestamps_;
WriteBatch::ProtectionInfo* const prot_info_;
size_t idx_ = 0;
// No copy or move.
TimestampAssigner(const TimestampAssigner&) = delete;
TimestampAssigner(TimestampAssigner&&) = delete;
TimestampAssigner& operator=(const TimestampAssigner&) = delete;
TimestampAssigner&& operator=(TimestampAssigner&&) = delete;
};
const std::vector<Slice> TimestampAssigner::kEmptyTimestampList;
} // anon namespace
struct SavePoints {
@ -1292,16 +1190,6 @@ Status WriteBatch::PopSavePoint() {
return Status::OK();
}
Status WriteBatch::AssignTimestamp(const Slice& ts) {
TimestampAssigner ts_assigner(ts, prot_info_.get());
return Iterate(&ts_assigner);
}
Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list) {
TimestampAssigner ts_assigner(ts_list, prot_info_.get());
return Iterate(&ts_assigner);
}
class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <array>
#include <vector>
#include "db/flush_scheduler.h"
@ -19,6 +20,7 @@
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"
#include "util/cast_util.h"
namespace ROCKSDB_NAMESPACE {
@ -260,4 +262,176 @@ class LocalSavePoint {
#endif
};
template <typename Derived, typename Checker>
class TimestampAssignerBase : public WriteBatch::Handler {
public:
explicit TimestampAssignerBase(WriteBatch::ProtectionInfo* prot_info,
Checker&& checker)
: prot_info_(prot_info), checker_(std::move(checker)) {}
~TimestampAssignerBase() override {}
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
return AssignTimestamp(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return AssignTimestamp(cf, key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return AssignTimestamp(cf, key);
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice&) override {
return AssignTimestamp(cf, begin_key);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
return AssignTimestamp(cf, key);
}
Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override {
return AssignTimestamp(cf, key);
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
Status MarkCommit(const Slice&) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
protected:
Status AssignTimestamp(uint32_t cf, const Slice& key) {
Status s = static_cast_with_check<Derived>(this)->AssignTimestampImpl(
cf, key, idx_);
++idx_;
return s;
}
Status CheckTimestampSize(uint32_t cf, size_t& ts_sz) {
return checker_(cf, ts_sz);
}
Status UpdateTimestampIfNeeded(size_t ts_sz, const Slice& key,
const Slice& ts) {
if (ts_sz > 0) {
assert(ts_sz == ts.size());
UpdateProtectionInformationIfNeeded(key, ts);
UpdateTimestamp(key, ts);
}
return Status::OK();
}
void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) {
if (prot_info_ != nullptr) {
const size_t ts_sz = ts.size();
SliceParts old_key(&key, 1);
Slice key_no_ts(key.data(), key.size() - ts_sz);
std::array<Slice, 2> new_key_cmpts{{key_no_ts, ts}};
SliceParts new_key(new_key_cmpts.data(), 2);
prot_info_->entries_[idx_].UpdateK(old_key, new_key);
}
}
void UpdateTimestamp(const Slice& key, const Slice& ts) {
const size_t ts_sz = ts.size();
char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
assert(ptr);
memcpy(ptr, ts.data(), ts_sz);
}
// No copy or move.
TimestampAssignerBase(const TimestampAssignerBase&) = delete;
TimestampAssignerBase(TimestampAssignerBase&&) = delete;
TimestampAssignerBase& operator=(const TimestampAssignerBase&) = delete;
TimestampAssignerBase& operator=(TimestampAssignerBase&&) = delete;
WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
const Checker checker_{};
size_t idx_ = 0;
};
template <typename Checker>
class SimpleListTimestampAssigner
: public TimestampAssignerBase<SimpleListTimestampAssigner<Checker>,
Checker> {
public:
explicit SimpleListTimestampAssigner(WriteBatch::ProtectionInfo* prot_info,
Checker checker,
const std::vector<Slice>& timestamps)
: TimestampAssignerBase<SimpleListTimestampAssigner<Checker>, Checker>(
prot_info, std::move(checker)),
timestamps_(timestamps) {}
~SimpleListTimestampAssigner() override {}
private:
friend class TimestampAssignerBase<SimpleListTimestampAssigner<Checker>,
Checker>;
Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx) {
if (idx >= timestamps_.size()) {
return Status::InvalidArgument("Need more timestamps for the assignment");
}
const Slice& ts = timestamps_[idx];
size_t ts_sz = ts.size();
const Status s = this->CheckTimestampSize(cf, ts_sz);
if (!s.ok()) {
return s;
}
return this->UpdateTimestampIfNeeded(ts_sz, key, ts);
}
const std::vector<Slice>& timestamps_;
};
template <typename Checker>
class TimestampAssigner
: public TimestampAssignerBase<TimestampAssigner<Checker>, Checker> {
public:
explicit TimestampAssigner(WriteBatch::ProtectionInfo* prot_info,
Checker checker, const Slice& ts)
: TimestampAssignerBase<TimestampAssigner<Checker>, Checker>(
prot_info, std::move(checker)),
timestamp_(ts) {
assert(!timestamp_.empty());
}
~TimestampAssigner() override {}
private:
friend class TimestampAssignerBase<TimestampAssigner<Checker>, Checker>;
Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) {
if (timestamp_.empty()) {
return Status::InvalidArgument("Timestamp is empty");
}
size_t ts_sz = timestamp_.size();
const Status s = this->CheckTimestampSize(cf, ts_sz);
if (!s.ok()) {
return s;
}
return this->UpdateTimestampIfNeeded(ts_sz, key, timestamp_);
}
const Slice timestamp_;
};
template <typename Checker>
Status WriteBatch::AssignTimestamp(const Slice& ts, Checker checker) {
TimestampAssigner<Checker> ts_assigner(prot_info_.get(), checker, ts);
return Iterate(&ts_assigner);
}
template <typename Checker>
Status WriteBatch::AssignTimestamps(const std::vector<Slice>& ts_list,
Checker checker) {
SimpleListTimestampAssigner<Checker> ts_assigner(prot_info_.get(), checker,
ts_list);
return Iterate(&ts_assigner);
}
} // namespace ROCKSDB_NAMESPACE

@ -13,6 +13,7 @@
#include "db/db_test_util.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
@ -20,6 +21,7 @@
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
@ -627,13 +629,16 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl {
public:
explicit ColumnFamilyHandleImplDummy(int id)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {}
explicit ColumnFamilyHandleImplDummy(int id, const Comparator* ucmp)
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr),
id_(id),
ucmp_(ucmp) {}
uint32_t GetID() const override { return id_; }
const Comparator* GetComparator() const override {
return BytewiseComparator();
}
const Comparator* GetComparator() const override { return ucmp_; }
private:
uint32_t id_;
const Comparator* const ucmp_ = BytewiseComparator();
};
} // namespace anonymous
@ -899,6 +904,159 @@ TEST_F(WriteBatchTest, MemoryLimitTest) {
ASSERT_TRUE(s.IsMemoryLimit());
}
namespace {
class TimestampChecker : public WriteBatch::Handler {
public:
explicit TimestampChecker(
std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps, Slice ts)
: cf_to_ucmps_(std::move(cf_to_ucmps)), timestamp_(std::move(ts)) {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& /*value*/) override {
auto cf_iter = cf_to_ucmps_.find(cf);
if (cf_iter == cf_to_ucmps_.end()) {
return Status::Corruption();
}
const Comparator* const ucmp = cf_iter->second;
assert(ucmp);
size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0) {
return Status::OK();
}
if (key.size() < ts_sz) {
return Status::Corruption();
}
Slice ts = ExtractTimestampFromUserKey(key, ts_sz);
if (ts.compare(timestamp_) != 0) {
return Status::Corruption();
}
return Status::OK();
}
private:
std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps_;
Slice timestamp_;
};
Status CheckTimestampsInWriteBatch(
WriteBatch& wb, Slice timestamp,
std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps) {
TimestampChecker ts_checker(cf_to_ucmps, timestamp);
return wb.Iterate(&ts_checker);
}
} // namespace
TEST_F(WriteBatchTest, AssignTimestamps) {
// We assume the last eight bytes of each key is reserved for timestamps.
// Therefore, we must make sure each key is longer than eight bytes.
constexpr size_t key_size = 16;
constexpr size_t num_of_keys = 10;
std::vector<std::string> key_strs(num_of_keys, std::string(key_size, '\0'));
ColumnFamilyHandleImplDummy cf0(0);
ColumnFamilyHandleImplDummy cf4(4, test::ComparatorWithU64Ts());
ColumnFamilyHandleImplDummy cf5(5, test::ComparatorWithU64Ts());
const std::unordered_map<uint32_t, const Comparator*> cf_to_ucmps = {
{0, cf0.GetComparator()},
{4, cf4.GetComparator()},
{5, cf5.GetComparator()}};
WriteBatch batch;
// Write to the batch. We will assign timestamps later.
for (const auto& key_str : key_strs) {
ASSERT_OK(batch.Put(&cf0, key_str, "value"));
ASSERT_OK(batch.Put(&cf4, key_str, "value"));
ASSERT_OK(batch.Put(&cf5, key_str, "value"));
}
static constexpr size_t timestamp_size = sizeof(uint64_t);
const auto checker1 = [](uint32_t cf, size_t& ts_sz) {
if (cf == 4 || cf == 5) {
if (ts_sz != timestamp_size) {
return Status::InvalidArgument("Timestamp size mismatch");
}
} else if (cf == 0) {
ts_sz = 0;
return Status::OK();
} else {
return Status::Corruption("Invalid cf");
}
return Status::OK();
};
ASSERT_OK(
batch.AssignTimestamp(std::string(timestamp_size, '\xfe'), checker1));
ASSERT_OK(CheckTimestampsInWriteBatch(
batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps));
// We use indexed_cf_to_ucmps, non_indexed_cfs_with_ts and timestamp_size to
// simulate the case in which a transaction enables indexing for some writes
// while disables indexing for other writes. A transaction uses a
// WriteBatchWithIndex object to buffer writes (we consider Write-committed
// policy only). If indexing is enabled, then writes go through
// WriteBatchWithIndex API populating a WBWI internal data structure, i.e. a
// mapping from cf to user comparators. If indexing is disabled, a transaction
// writes directly to the underlying raw WriteBatch. We will need to track the
// comparator information for the column families to which un-indexed writes
// are performed. When calling AssignTimestamp(s) API of WriteBatch, we need
// indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform
// checking.
std::unordered_map<uint32_t, const Comparator*> indexed_cf_to_ucmps = {
{0, cf0.GetComparator()}, {4, cf4.GetComparator()}};
std::unordered_set<uint32_t> non_indexed_cfs_with_ts = {cf5.GetID()};
const auto checker2 = [&indexed_cf_to_ucmps, &non_indexed_cfs_with_ts](
uint32_t cf, size_t& ts_sz) {
if (non_indexed_cfs_with_ts.count(cf) > 0) {
if (ts_sz != timestamp_size) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return Status::OK();
}
auto cf_iter = indexed_cf_to_ucmps.find(cf);
if (cf_iter == indexed_cf_to_ucmps.end()) {
return Status::Corruption("Unknown cf");
}
const Comparator* const ucmp = cf_iter->second;
assert(ucmp);
if (ucmp->timestamp_size() == 0) {
ts_sz = 0;
} else if (ts_sz != ucmp->timestamp_size()) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return Status::OK();
};
ASSERT_OK(
batch.AssignTimestamp(std::string(timestamp_size, '\xef'), checker2));
ASSERT_OK(CheckTimestampsInWriteBatch(
batch, std::string(timestamp_size, '\xef'), cf_to_ucmps));
std::vector<std::string> ts_strs;
for (size_t i = 0; i < 3 * key_strs.size(); ++i) {
if (0 == (i % 3)) {
ts_strs.emplace_back();
} else {
ts_strs.emplace_back(std::string(timestamp_size, '\xee'));
}
}
std::vector<Slice> ts_vec(ts_strs.size());
for (size_t i = 0; i < ts_vec.size(); ++i) {
ts_vec[i] = ts_strs[i];
}
const auto checker3 = [&cf_to_ucmps](uint32_t cf, size_t& ts_sz) {
auto cf_iter = cf_to_ucmps.find(cf);
if (cf_iter == cf_to_ucmps.end()) {
return Status::Corruption("Invalid cf");
}
const Comparator* const ucmp = cf_iter->second;
assert(ucmp);
if (ucmp->timestamp_size() != ts_sz) {
return Status::InvalidArgument("Timestamp size mismatch");
}
return Status::OK();
};
ASSERT_OK(batch.AssignTimestamps(ts_vec, checker3));
ASSERT_OK(CheckTimestampsInWriteBatch(
batch, std::string(timestamp_size, '\xee'), cf_to_ucmps));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -25,10 +25,13 @@
#pragma once
#include <stdint.h>
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/status.h"
#include "rocksdb/write_batch_base.h"
@ -333,18 +336,43 @@ class WriteBatch : public WriteBatchBase {
// Returns true if MarkRollback will be called during Iterate
bool HasRollback() const;
// Assign timestamp to write batch.
// This requires that all keys (possibly from multiple column families) in
// the write batch have timestamps of the same format.
Status AssignTimestamp(const Slice& ts);
struct TimestampChecker final {
Status operator()(uint32_t /*cf*/, size_t& /*ts_sz*/) const {
return Status::OK();
}
};
// Experimental.
// Assign timestamp to write batch.
// This requires that all keys, if enable timestamp, (possibly from multiple
// column families) in the write batch have timestamps of the same format.
// checker: callable object to check the timestamp sizes of column families.
// User can call checker(uint32_t cf, size_t& ts_sz) which does the
// following:
// 1. find out the timestamp size of cf.
// 2. if cf's timestamp size is 0, then set ts_sz to 0 and return OK.
// 3. otherwise, compare ts_sz with cf's timestamp size and return
// Status::InvalidArgument() if different.
template <typename Checker = TimestampChecker>
Status AssignTimestamp(const Slice& ts, Checker checker = Checker());
// Experimental.
// Assign timestamps to write batch.
// This API allows the write batch to include keys from multiple column
// families whose timestamps' formats can differ. For example, some column
// families can enable timestamp, while others disable the feature.
// If key does not have timestamp, then put an empty Slice in ts_list as
// a placeholder.
Status AssignTimestamps(const std::vector<Slice>& ts_list);
// checker: callable object specified by caller to check the timestamp sizes
// of column families.
// User can call checker(uint32_t cf, size_t& ts_sz) which does the
// following:
// 1. find out the timestamp size of cf.
// 2. compare ts_sz with cf's timestamp size and return
// Status::InvalidArgument() if different.
template <typename Checker = TimestampChecker>
Status AssignTimestamps(const std::vector<Slice>& ts_list,
Checker checker = Checker());
using WriteBatchBase::GetWriteBatch;
WriteBatch* GetWriteBatch() override { return this; }

Loading…
Cancel
Save