diff --git a/HISTORY.md b/HISTORY.md index 59811c189..e2f274bed 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ ### Bug Fixes ### Behavior Changes ### Public API change +* Extend WriteBatch::AssignTimestamp and AssignTimestamps API so that both functions can accept an optional `checker` argument that performs additional checking on timestamp sizes. ## 6.27.0 (2021-11-19) ### New Features diff --git a/db/write_batch.cc b/db/write_batch.cc index 3343cca06..d96dbf111 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -140,108 +140,6 @@ struct BatchContentClassifier : public WriteBatch::Handler { } }; -class TimestampAssigner : public WriteBatch::Handler { - public: - explicit TimestampAssigner(const Slice& ts, - WriteBatch::ProtectionInfo* prot_info) - : timestamp_(ts), - timestamps_(kEmptyTimestampList), - prot_info_(prot_info) {} - explicit TimestampAssigner(const std::vector& ts_list, - WriteBatch::ProtectionInfo* prot_info) - : timestamps_(ts_list), prot_info_(prot_info) {} - ~TimestampAssigner() override {} - - Status PutCF(uint32_t, const Slice& key, const Slice&) override { - AssignTimestamp(key); - ++idx_; - return Status::OK(); - } - - Status DeleteCF(uint32_t, const Slice& key) override { - AssignTimestamp(key); - ++idx_; - return Status::OK(); - } - - Status SingleDeleteCF(uint32_t, const Slice& key) override { - AssignTimestamp(key); - ++idx_; - return Status::OK(); - } - - Status DeleteRangeCF(uint32_t, const Slice& begin_key, - const Slice& /* end_key */) override { - AssignTimestamp(begin_key); - ++idx_; - return Status::OK(); - } - - Status MergeCF(uint32_t, const Slice& key, const Slice&) override { - AssignTimestamp(key); - ++idx_; - return Status::OK(); - } - - Status PutBlobIndexCF(uint32_t, const Slice&, const Slice&) override { - // TODO (yanqin): support blob db in the future. - return Status::OK(); - } - - Status MarkBeginPrepare(bool) override { - // TODO (yanqin): support in the future. - return Status::OK(); - } - - Status MarkEndPrepare(const Slice&) override { - // TODO (yanqin): support in the future. - return Status::OK(); - } - - Status MarkCommit(const Slice&) override { - // TODO (yanqin): support in the future. - return Status::OK(); - } - - Status MarkRollback(const Slice&) override { - // TODO (yanqin): support in the future. - return Status::OK(); - } - - private: - void AssignTimestamp(const Slice& key) { - assert(timestamps_.empty() || idx_ < timestamps_.size()); - const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_]; - size_t ts_sz = ts.size(); - if (ts_sz == 0) { - // This key does not have timestamp, so skip. - return; - } - if (prot_info_ != nullptr) { - SliceParts old_key(&key, 1); - Slice key_no_ts(key.data(), key.size() - ts_sz); - std::array new_key_cmpts{{key_no_ts, ts}}; - SliceParts new_key(new_key_cmpts.data(), 2); - prot_info_->entries_[idx_].UpdateK(old_key, new_key); - } - char* ptr = const_cast(key.data() + key.size() - ts_sz); - memcpy(ptr, ts.data(), ts_sz); - } - - static const std::vector kEmptyTimestampList; - const Slice timestamp_; - const std::vector& timestamps_; - WriteBatch::ProtectionInfo* const prot_info_; - size_t idx_ = 0; - - // No copy or move. - TimestampAssigner(const TimestampAssigner&) = delete; - TimestampAssigner(TimestampAssigner&&) = delete; - TimestampAssigner& operator=(const TimestampAssigner&) = delete; - TimestampAssigner&& operator=(TimestampAssigner&&) = delete; -}; -const std::vector TimestampAssigner::kEmptyTimestampList; - } // anon namespace struct SavePoints { @@ -1292,16 +1190,6 @@ Status WriteBatch::PopSavePoint() { return Status::OK(); } -Status WriteBatch::AssignTimestamp(const Slice& ts) { - TimestampAssigner ts_assigner(ts, prot_info_.get()); - return Iterate(&ts_assigner); -} - -Status WriteBatch::AssignTimestamps(const std::vector& ts_list) { - TimestampAssigner ts_assigner(ts_list, prot_info_.get()); - return Iterate(&ts_assigner); -} - class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 8e1d0e3e2..13afa822b 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include "db/flush_scheduler.h" @@ -19,6 +20,7 @@ #include "rocksdb/types.h" #include "rocksdb/write_batch.h" #include "util/autovector.h" +#include "util/cast_util.h" namespace ROCKSDB_NAMESPACE { @@ -260,4 +262,176 @@ class LocalSavePoint { #endif }; +template +class TimestampAssignerBase : public WriteBatch::Handler { + public: + explicit TimestampAssignerBase(WriteBatch::ProtectionInfo* prot_info, + Checker&& checker) + : prot_info_(prot_info), checker_(std::move(checker)) {} + + ~TimestampAssignerBase() override {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { + return AssignTimestamp(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return AssignTimestamp(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return AssignTimestamp(cf, key); + } + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice&) override { + return AssignTimestamp(cf, begin_key); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { + return AssignTimestamp(cf, key); + } + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override { + return AssignTimestamp(cf, key); + } + + Status MarkBeginPrepare(bool) override { return Status::OK(); } + + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + + Status MarkCommit(const Slice&) override { return Status::OK(); } + + Status MarkRollback(const Slice&) override { return Status::OK(); } + + protected: + Status AssignTimestamp(uint32_t cf, const Slice& key) { + Status s = static_cast_with_check(this)->AssignTimestampImpl( + cf, key, idx_); + ++idx_; + return s; + } + + Status CheckTimestampSize(uint32_t cf, size_t& ts_sz) { + return checker_(cf, ts_sz); + } + + Status UpdateTimestampIfNeeded(size_t ts_sz, const Slice& key, + const Slice& ts) { + if (ts_sz > 0) { + assert(ts_sz == ts.size()); + UpdateProtectionInformationIfNeeded(key, ts); + UpdateTimestamp(key, ts); + } + return Status::OK(); + } + + void UpdateProtectionInformationIfNeeded(const Slice& key, const Slice& ts) { + if (prot_info_ != nullptr) { + const size_t ts_sz = ts.size(); + SliceParts old_key(&key, 1); + Slice key_no_ts(key.data(), key.size() - ts_sz); + std::array new_key_cmpts{{key_no_ts, ts}}; + SliceParts new_key(new_key_cmpts.data(), 2); + prot_info_->entries_[idx_].UpdateK(old_key, new_key); + } + } + + void UpdateTimestamp(const Slice& key, const Slice& ts) { + const size_t ts_sz = ts.size(); + char* ptr = const_cast(key.data() + key.size() - ts_sz); + assert(ptr); + memcpy(ptr, ts.data(), ts_sz); + } + + // No copy or move. + TimestampAssignerBase(const TimestampAssignerBase&) = delete; + TimestampAssignerBase(TimestampAssignerBase&&) = delete; + TimestampAssignerBase& operator=(const TimestampAssignerBase&) = delete; + TimestampAssignerBase& operator=(TimestampAssignerBase&&) = delete; + + WriteBatch::ProtectionInfo* const prot_info_ = nullptr; + const Checker checker_{}; + size_t idx_ = 0; +}; + +template +class SimpleListTimestampAssigner + : public TimestampAssignerBase, + Checker> { + public: + explicit SimpleListTimestampAssigner(WriteBatch::ProtectionInfo* prot_info, + Checker checker, + const std::vector& timestamps) + : TimestampAssignerBase, Checker>( + prot_info, std::move(checker)), + timestamps_(timestamps) {} + + ~SimpleListTimestampAssigner() override {} + + private: + friend class TimestampAssignerBase, + Checker>; + + Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t idx) { + if (idx >= timestamps_.size()) { + return Status::InvalidArgument("Need more timestamps for the assignment"); + } + const Slice& ts = timestamps_[idx]; + size_t ts_sz = ts.size(); + const Status s = this->CheckTimestampSize(cf, ts_sz); + if (!s.ok()) { + return s; + } + return this->UpdateTimestampIfNeeded(ts_sz, key, ts); + } + + const std::vector& timestamps_; +}; + +template +class TimestampAssigner + : public TimestampAssignerBase, Checker> { + public: + explicit TimestampAssigner(WriteBatch::ProtectionInfo* prot_info, + Checker checker, const Slice& ts) + : TimestampAssignerBase, Checker>( + prot_info, std::move(checker)), + timestamp_(ts) { + assert(!timestamp_.empty()); + } + ~TimestampAssigner() override {} + + private: + friend class TimestampAssignerBase, Checker>; + + Status AssignTimestampImpl(uint32_t cf, const Slice& key, size_t /*idx*/) { + if (timestamp_.empty()) { + return Status::InvalidArgument("Timestamp is empty"); + } + size_t ts_sz = timestamp_.size(); + const Status s = this->CheckTimestampSize(cf, ts_sz); + if (!s.ok()) { + return s; + } + return this->UpdateTimestampIfNeeded(ts_sz, key, timestamp_); + } + + const Slice timestamp_; +}; + +template +Status WriteBatch::AssignTimestamp(const Slice& ts, Checker checker) { + TimestampAssigner ts_assigner(prot_info_.get(), checker, ts); + return Iterate(&ts_assigner); +} + +template +Status WriteBatch::AssignTimestamps(const std::vector& ts_list, + Checker checker) { + SimpleListTimestampAssigner ts_assigner(prot_info_.get(), checker, + ts_list); + return Iterate(&ts_assigner); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ecadc297b..ea74e79a3 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -13,6 +13,7 @@ #include "db/db_test_util.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" @@ -20,6 +21,7 @@ #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" #include "test_util/testharness.h" +#include "test_util/testutil.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { @@ -627,13 +629,16 @@ class ColumnFamilyHandleImplDummy : public ColumnFamilyHandleImpl { public: explicit ColumnFamilyHandleImplDummy(int id) : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), id_(id) {} + explicit ColumnFamilyHandleImplDummy(int id, const Comparator* ucmp) + : ColumnFamilyHandleImpl(nullptr, nullptr, nullptr), + id_(id), + ucmp_(ucmp) {} uint32_t GetID() const override { return id_; } - const Comparator* GetComparator() const override { - return BytewiseComparator(); - } + const Comparator* GetComparator() const override { return ucmp_; } private: uint32_t id_; + const Comparator* const ucmp_ = BytewiseComparator(); }; } // namespace anonymous @@ -899,6 +904,159 @@ TEST_F(WriteBatchTest, MemoryLimitTest) { ASSERT_TRUE(s.IsMemoryLimit()); } +namespace { +class TimestampChecker : public WriteBatch::Handler { + public: + explicit TimestampChecker( + std::unordered_map cf_to_ucmps, Slice ts) + : cf_to_ucmps_(std::move(cf_to_ucmps)), timestamp_(std::move(ts)) {} + Status PutCF(uint32_t cf, const Slice& key, const Slice& /*value*/) override { + auto cf_iter = cf_to_ucmps_.find(cf); + if (cf_iter == cf_to_ucmps_.end()) { + return Status::Corruption(); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz == 0) { + return Status::OK(); + } + if (key.size() < ts_sz) { + return Status::Corruption(); + } + Slice ts = ExtractTimestampFromUserKey(key, ts_sz); + if (ts.compare(timestamp_) != 0) { + return Status::Corruption(); + } + return Status::OK(); + } + + private: + std::unordered_map cf_to_ucmps_; + Slice timestamp_; +}; + +Status CheckTimestampsInWriteBatch( + WriteBatch& wb, Slice timestamp, + std::unordered_map cf_to_ucmps) { + TimestampChecker ts_checker(cf_to_ucmps, timestamp); + return wb.Iterate(&ts_checker); +} +} // namespace + +TEST_F(WriteBatchTest, AssignTimestamps) { + // We assume the last eight bytes of each key is reserved for timestamps. + // Therefore, we must make sure each key is longer than eight bytes. + constexpr size_t key_size = 16; + constexpr size_t num_of_keys = 10; + std::vector key_strs(num_of_keys, std::string(key_size, '\0')); + + ColumnFamilyHandleImplDummy cf0(0); + ColumnFamilyHandleImplDummy cf4(4, test::ComparatorWithU64Ts()); + ColumnFamilyHandleImplDummy cf5(5, test::ComparatorWithU64Ts()); + + const std::unordered_map cf_to_ucmps = { + {0, cf0.GetComparator()}, + {4, cf4.GetComparator()}, + {5, cf5.GetComparator()}}; + + WriteBatch batch; + // Write to the batch. We will assign timestamps later. + for (const auto& key_str : key_strs) { + ASSERT_OK(batch.Put(&cf0, key_str, "value")); + ASSERT_OK(batch.Put(&cf4, key_str, "value")); + ASSERT_OK(batch.Put(&cf5, key_str, "value")); + } + + static constexpr size_t timestamp_size = sizeof(uint64_t); + const auto checker1 = [](uint32_t cf, size_t& ts_sz) { + if (cf == 4 || cf == 5) { + if (ts_sz != timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + } else if (cf == 0) { + ts_sz = 0; + return Status::OK(); + } else { + return Status::Corruption("Invalid cf"); + } + return Status::OK(); + }; + ASSERT_OK( + batch.AssignTimestamp(std::string(timestamp_size, '\xfe'), checker1)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xfe'), cf_to_ucmps)); + + // We use indexed_cf_to_ucmps, non_indexed_cfs_with_ts and timestamp_size to + // simulate the case in which a transaction enables indexing for some writes + // while disables indexing for other writes. A transaction uses a + // WriteBatchWithIndex object to buffer writes (we consider Write-committed + // policy only). If indexing is enabled, then writes go through + // WriteBatchWithIndex API populating a WBWI internal data structure, i.e. a + // mapping from cf to user comparators. If indexing is disabled, a transaction + // writes directly to the underlying raw WriteBatch. We will need to track the + // comparator information for the column families to which un-indexed writes + // are performed. When calling AssignTimestamp(s) API of WriteBatch, we need + // indexed_cf_to_ucmps, non_indexed_cfs_with_ts, and timestamp_size to perform + // checking. + std::unordered_map indexed_cf_to_ucmps = { + {0, cf0.GetComparator()}, {4, cf4.GetComparator()}}; + std::unordered_set non_indexed_cfs_with_ts = {cf5.GetID()}; + const auto checker2 = [&indexed_cf_to_ucmps, &non_indexed_cfs_with_ts]( + uint32_t cf, size_t& ts_sz) { + if (non_indexed_cfs_with_ts.count(cf) > 0) { + if (ts_sz != timestamp_size) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + } + auto cf_iter = indexed_cf_to_ucmps.find(cf); + if (cf_iter == indexed_cf_to_ucmps.end()) { + return Status::Corruption("Unknown cf"); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + if (ucmp->timestamp_size() == 0) { + ts_sz = 0; + } else if (ts_sz != ucmp->timestamp_size()) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + }; + ASSERT_OK( + batch.AssignTimestamp(std::string(timestamp_size, '\xef'), checker2)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xef'), cf_to_ucmps)); + + std::vector ts_strs; + for (size_t i = 0; i < 3 * key_strs.size(); ++i) { + if (0 == (i % 3)) { + ts_strs.emplace_back(); + } else { + ts_strs.emplace_back(std::string(timestamp_size, '\xee')); + } + } + std::vector ts_vec(ts_strs.size()); + for (size_t i = 0; i < ts_vec.size(); ++i) { + ts_vec[i] = ts_strs[i]; + } + const auto checker3 = [&cf_to_ucmps](uint32_t cf, size_t& ts_sz) { + auto cf_iter = cf_to_ucmps.find(cf); + if (cf_iter == cf_to_ucmps.end()) { + return Status::Corruption("Invalid cf"); + } + const Comparator* const ucmp = cf_iter->second; + assert(ucmp); + if (ucmp->timestamp_size() != ts_sz) { + return Status::InvalidArgument("Timestamp size mismatch"); + } + return Status::OK(); + }; + ASSERT_OK(batch.AssignTimestamps(ts_vec, checker3)); + ASSERT_OK(CheckTimestampsInWriteBatch( + batch, std::string(timestamp_size, '\xee'), cf_to_ucmps)); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index e7dcc7ba2..8d03a74e3 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -25,10 +25,13 @@ #pragma once #include + #include +#include #include #include #include + #include "rocksdb/status.h" #include "rocksdb/write_batch_base.h" @@ -333,18 +336,43 @@ class WriteBatch : public WriteBatchBase { // Returns true if MarkRollback will be called during Iterate bool HasRollback() const; - // Assign timestamp to write batch. - // This requires that all keys (possibly from multiple column families) in - // the write batch have timestamps of the same format. - Status AssignTimestamp(const Slice& ts); + struct TimestampChecker final { + Status operator()(uint32_t /*cf*/, size_t& /*ts_sz*/) const { + return Status::OK(); + } + }; + // Experimental. + // Assign timestamp to write batch. + // This requires that all keys, if enable timestamp, (possibly from multiple + // column families) in the write batch have timestamps of the same format. + // checker: callable object to check the timestamp sizes of column families. + // User can call checker(uint32_t cf, size_t& ts_sz) which does the + // following: + // 1. find out the timestamp size of cf. + // 2. if cf's timestamp size is 0, then set ts_sz to 0 and return OK. + // 3. otherwise, compare ts_sz with cf's timestamp size and return + // Status::InvalidArgument() if different. + template + Status AssignTimestamp(const Slice& ts, Checker checker = Checker()); + + // Experimental. // Assign timestamps to write batch. // This API allows the write batch to include keys from multiple column // families whose timestamps' formats can differ. For example, some column // families can enable timestamp, while others disable the feature. // If key does not have timestamp, then put an empty Slice in ts_list as // a placeholder. - Status AssignTimestamps(const std::vector& ts_list); + // checker: callable object specified by caller to check the timestamp sizes + // of column families. + // User can call checker(uint32_t cf, size_t& ts_sz) which does the + // following: + // 1. find out the timestamp size of cf. + // 2. compare ts_sz with cf's timestamp size and return + // Status::InvalidArgument() if different. + template + Status AssignTimestamps(const std::vector& ts_list, + Checker checker = Checker()); using WriteBatchBase::GetWriteBatch; WriteBatch* GetWriteBatch() override { return this; }