Allow WriteBatch to have keys with different timestamp sizes (#8725)

Summary:
In the past, we unnecessarily required all keys in the same write batch
to be from column families whose timestamps' formats are the same for
simplicity. Specifically, we cannot use the same write batch to write to
two column families, one of which enables timestamp while the other
disables it.

The limitation is due to the member `timestamp_size_` that used to exist
in each `WriteBatch` object. We pass a timestamp_size to the constructor
of `WriteBatch`. Therefore, users can simply use the old
`WriteBatch::Put()`, `WriteBatch::Delete()`, etc. APIs for writes, while
the internal implementation of `WriteBatch` will take care of memory
allocation for timestamps.

The above is not necessary.
On the one hand, users can set up a memory buffer to store the user key
and then contiguously append the timestamp to the user key. Then the user
can pass this buffer to the `WriteBatch::Put(Slice&)` API.
On the other hand, users can set up a SliceParts object, which is an
array of Slices, and let the last Slice point to the memory buffer
storing the timestamp. Then the user can pass the SliceParts object to the
`WriteBatch::Put(SliceParts&)` API.
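
The two key layouts described above can be sketched without RocksDB itself.
This is only an illustration: `SliceView`, `MakeContiguousKey`,
`MakeKeyParts`, and `Gather` are hypothetical stand-ins introduced here, not
RocksDB types or functions.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for rocksdb::Slice: a non-owning (pointer, length) view.
struct SliceView {
  const char* data;
  std::size_t size;
};

// Layout 1: a single contiguous buffer holding the user key followed
// immediately by the timestamp bytes.
std::string MakeContiguousKey(const std::string& user_key,
                              const std::string& ts) {
  std::string buf;
  buf.reserve(user_key.size() + ts.size());
  buf.append(user_key);
  buf.append(ts);
  return buf;
}

// Layout 2: an array of views whose last element points at the timestamp.
// The views borrow from the caller's strings, which must outlive them.
std::array<SliceView, 2> MakeKeyParts(const std::string& user_key,
                                      const std::string& ts) {
  return {{{user_key.data(), user_key.size()}, {ts.data(), ts.size()}}};
}

// Concatenates the parts in order, the way a gather-style write would.
std::string Gather(const std::array<SliceView, 2>& parts) {
  std::string out;
  for (const auto& p : parts) {
    out.append(p.data, p.size);
  }
  return out;
}
```

Both layouts describe the same bytes. With RocksDB, the first buffer would be
passed to `WriteBatch::Put(Slice&)` and the second to
`WriteBatch::Put(SliceParts&)`, as described above.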

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8725

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D30654499

Pulled By: riversand963

fbshipit-source-id: 9d848c77ad3c9dd629aa5fc4e2bc16fb0687b4a2
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent 5f40b05c98
commit 2a2b3e03a5
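  1. HISTORY.md (1 changed line)
  2. db/db_impl/db_impl_write.cc (39 changed lines)
  3. db/db_kv_checksum_test.cc (10 changed lines)
  4. db/db_with_timestamp_basic_test.cc (102 changed lines)
  5. db/kv_checksum.h (23 changed lines)
  6. db/write_batch.cc (122 changed lines)
  7. db_stress_tool/batched_ops_stress.cc (4 changed lines)
  8. include/rocksdb/write_batch.h (31 changed lines)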

@@ -17,6 +17,7 @@
 * Added properties for BlobDB: `rocksdb.num-blob-files`, `rocksdb.blob-stats`, `rocksdb.total-blob-file-size`, and `rocksdb.live-blob-file-size`. The existing property `rocksdb.estimate_live-data-size` was also extended to include live bytes residing in blob files.
 * Added two new RateLimiter IOPriorities: `Env::IO_USER`,`Env::IO_MID`. `Env::IO_USER` will have superior priority over all other RateLimiter IOPriorities without being subject to fair scheduling constraint.
 * `SstFileWriter` now supports `Put`s and `Delete`s with user-defined timestamps. Note that the ingestion logic itself is not timestamp-aware yet.
+* Allow a single write batch to include keys from multiple column families whose timestamps' formats can differ. For example, some column families may disable timestamp, while others enable timestamp.
 ### Public API change
 * Remove obsolete implementation details FullKey and ParseFullKey from public API

@@ -1998,13 +1998,18 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   size_t ts_sz = ts->size();
   assert(column_family->GetComparator());
   assert(ts_sz == column_family->GetComparator()->timestamp_size());
-  WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
-                   ts_sz);
-  Status s = batch.Put(column_family, key, value);
-  if (!s.ok()) {
-    return s;
+  WriteBatch batch;
+  Status s;
+  if (key.data() + key.size() == ts->data()) {
+    Slice key_with_ts = Slice(key.data(), key.size() + ts_sz);
+    s = batch.Put(column_family, key_with_ts, value);
+  } else {
+    std::array<Slice, 2> key_with_ts_slices{{key, *ts}};
+    SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+    std::array<Slice, 1> value_slices{{value}};
+    SliceParts values(value_slices.data(), 1);
+    s = batch.Put(column_family, key_with_ts, values);
   }
-  s = batch.AssignTimestamp(*ts);
   if (!s.ok()) {
     return s;
   }
@@ -2023,17 +2028,19 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   }
   const Slice* ts = opt.timestamp;
   assert(ts != nullptr);
-  const size_t ts_sz = ts->size();
-  constexpr size_t kKeyAndValueLenSize = 11;
-  constexpr size_t kWriteBatchOverhead =
-      WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
-  WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
-                   ts_sz);
-  Status s = batch.Delete(column_family, key);
-  if (!s.ok()) {
-    return s;
+  size_t ts_sz = ts->size();
+  assert(column_family->GetComparator());
+  assert(ts_sz == column_family->GetComparator()->timestamp_size());
+  WriteBatch batch;
+  Status s;
+  if (key.data() + key.size() == ts->data()) {
+    Slice key_with_ts = Slice(key.data(), key.size() + ts_sz);
+    s = batch.Delete(column_family, key_with_ts);
+  } else {
+    std::array<Slice, 2> key_with_ts_slices{{key, *ts}};
+    SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+    s = batch.Delete(column_family, key_with_ts);
   }
-  s = batch.AssignTimestamp(*ts);
   if (!s.ok()) {
     return s;
   }
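
The contiguity test used by the new `DB::Put()` and `DB::Delete()` bodies
(`key.data() + key.size() == ts->data()`) can be looked at in isolation. The
sketch below assumes a hypothetical `SliceView` stand-in for `rocksdb::Slice`;
`TimestampFollowsKey` and `ExtendOverTimestamp` are illustrative names, not
RocksDB functions.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for rocksdb::Slice: a non-owning (pointer, length) view.
struct SliceView {
  const char* data;
  std::size_t size;
};

// True when the timestamp starts at the byte right after the key ends, i.e.
// the caller already laid out key and timestamp back to back.
bool TimestampFollowsKey(const SliceView& key, const SliceView& ts) {
  return key.data + key.size == ts.data;
}

// Fast path: widen the key view so that it also covers the adjacent
// timestamp bytes. No bytes are copied.
SliceView ExtendOverTimestamp(const SliceView& key, const SliceView& ts) {
  assert(TimestampFollowsKey(key, ts));
  return SliceView{key.data, key.size + ts.size};
}
```

When the check fails, the diff above falls back to a two-element `SliceParts`,
so the key and timestamp are gathered from separate buffers instead.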

@@ -34,10 +34,9 @@ class DbKvChecksumTest
     corrupt_byte_addend_ = std::get<1>(GetParam());
   }
-  std::pair<WriteBatch, Status> GetWriteBatch(size_t ts_sz,
-                                              ColumnFamilyHandle* cf_handle) {
+  std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle) {
     Status s;
-    WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, ts_sz,
+    WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
                   8 /* protection_bytes_per_entry */);
     switch (op_type_) {
       case WriteBatchOpType::kPut:
@@ -151,8 +150,7 @@ TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
     Reopen(options);
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status =
-        GetWriteBatch(0 /* ts_sz */, nullptr /* cf_handle */);
+    auto batch_and_status = GetWriteBatch(nullptr /* cf_handle */);
     ASSERT_OK(batch_and_status.second);
     ASSERT_TRUE(
         db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());
@@ -183,7 +181,7 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
     ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
     SyncPoint::GetInstance()->EnableProcessing();
-    auto batch_and_status = GetWriteBatch(0 /* ts_sz */, handles_[1]);
+    auto batch_and_status = GetWriteBatch(handles_[1]);
     ASSERT_OK(batch_and_status.second);
     ASSERT_TRUE(
         db_->Write(WriteOptions(), &batch_and_status.first).IsCorruption());

@@ -196,6 +196,69 @@ class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
       : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
 };
+TEST_F(DBBasicTestWithTimestamp, MixedCfs) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.avoid_flush_during_shutdown = true;
+  DestroyAndReopen(options);
+  Options options1 = CurrentOptions();
+  options1.env = env_;
+  const size_t kTimestampSize = Timestamp(0, 0).size();
+  TestComparator test_cmp(kTimestampSize);
+  options1.comparator = &test_cmp;
+  ColumnFamilyHandle* handle = nullptr;
+  Status s = db_->CreateColumnFamily(options1, "data", &handle);
+  ASSERT_OK(s);
+  WriteBatch wb;
+  ASSERT_OK(wb.Put("a", "value"));
+  {
+    std::string key("a");
+    std::string ts(kTimestampSize, '\0');
+    std::array<Slice, 2> key_with_ts_slices{{key, ts}};
+    SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+    std::string value_str("value");
+    Slice value_slice(value_str.data(), value_str.size());
+    SliceParts value(&value_slice, 1);
+    ASSERT_OK(wb.Put(handle, key_with_ts, value));
+  }
+  {
+    std::string ts = Timestamp(1, 0);
+    std::vector<Slice> ts_list({Slice(), ts});
+    ASSERT_OK(wb.AssignTimestamps(ts_list));
+    ASSERT_OK(db_->Write(WriteOptions(), &wb));
+  }
+  const auto verify_db = [this](ColumnFamilyHandle* h) {
+    ASSERT_EQ("value", Get("a"));
+    std::string ts = Timestamp(1, 0);
+    Slice read_ts_slice(ts);
+    ReadOptions read_opts;
+    read_opts.timestamp = &read_ts_slice;
+    std::string value;
+    ASSERT_OK(db_->Get(read_opts, h, "a", &value));
+    ASSERT_EQ("value", value);
+  };
+  verify_db(handle);
+  delete handle;
+  Close();
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back("data", options1);
+  options.create_if_missing = false;
+  s = DB::Open(options, dbname_, cf_descs, &handles_, &db_);
+  ASSERT_OK(s);
+  verify_db(handles_[1]);
+  Close();
+}
 TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
   Options options = CurrentOptions();
   options.env = env_;
@@ -766,11 +829,15 @@ TEST_F(DBBasicTestWithTimestamp, ChangeIterationDirection) {
   const std::vector<std::tuple<std::string, std::string>> kvs = {
       std::make_tuple("aa", "value1"), std::make_tuple("ab", "value2")};
   for (const auto& ts : timestamps) {
-    WriteBatch wb(0, 0, kTimestampSize);
+    WriteBatch wb;
     for (const auto& kv : kvs) {
       const std::string& key = std::get<0>(kv);
       const std::string& value = std::get<1>(kv);
-      ASSERT_OK(wb.Put(key, value));
+      std::array<Slice, 2> key_with_ts_slices{{Slice(key), Slice(ts)}};
+      SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+      std::array<Slice, 1> value_slices{{Slice(value)}};
+      SliceParts values(value_slices.data(), 1);
+      ASSERT_OK(wb.Put(key_with_ts, values));
     }
     ASSERT_OK(wb.AssignTimestamp(ts));
@@ -1072,9 +1139,20 @@ TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
   }
   {
     std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
-    WriteBatch batch(0, 0, kTimestampSize);
-    ASSERT_OK(batch.Put("a", "new_value"));
-    ASSERT_OK(batch.Put("b", "new_value"));
+    WriteBatch batch;
+    const std::string dummy_ts(kTimestampSize, '\0');
+    {
+      std::array<Slice, 2> key_with_ts_slices{{"a", dummy_ts}};
+      SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+      std::array<Slice, 1> value_slices{{"new_value"}};
+      SliceParts values(value_slices.data(), 1);
+      ASSERT_OK(batch.Put(key_with_ts, values));
+    }
+    {
+      std::string key_with_ts("b");
+      key_with_ts.append(dummy_ts);
+      ASSERT_OK(batch.Put(key_with_ts, "new_value"));
+    }
     s = batch.AssignTimestamp(ts_str);
     ASSERT_OK(s);
     s = db_->Write(write_opts, &batch);
@@ -2615,17 +2693,23 @@ TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
     }
   };
+  const std::string dummy_ts(ts_sz, '\0');
   for (size_t i = 0; i != kNumTimestamps; ++i) {
     write_ts_list.push_back(Timestamp(i * 2, 0));
     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
     const Slice& write_ts = write_ts_list.back();
     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
       WriteOptions wopts;
-      WriteBatch batch(0, 0, ts_sz);
+      WriteBatch batch;
       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
-        ASSERT_OK(
-            batch.Put(handles_[cf], Key1(j),
-                      "value_" + std::to_string(j) + "_" + std::to_string(i)));
+        const std::string key = Key1(j);
+        const std::string value =
+            "value_" + std::to_string(j) + "_" + std::to_string(i);
+        std::array<Slice, 2> key_with_ts_slices{{key, dummy_ts}};
+        SliceParts key_with_ts(key_with_ts_slices.data(), 2);
+        std::array<Slice, 1> value_slices{{value}};
+        SliceParts values(value_slices.data(), 1);
+        ASSERT_OK(batch.Put(handles_[cf], key_with_ts, values));
       }
       ASSERT_OK(batch.AssignTimestamp(write_ts));
       ASSERT_OK(db_->Write(wopts, &batch));

@@ -61,11 +61,10 @@ class ProtectionInfo {
   Status GetStatus() const;
   ProtectionInfoKVOT<T> ProtectKVOT(const Slice& key, const Slice& value,
-                                    ValueType op_type,
-                                    const Slice& timestamp) const;
+                                    ValueType op_type) const;
   ProtectionInfoKVOT<T> ProtectKVOT(const SliceParts& key,
-                                    const SliceParts& value, ValueType op_type,
-                                    const Slice& timestamp) const;
+                                    const SliceParts& value,
+                                    ValueType op_type) const;
  private:
   friend class ProtectionInfoKVOT<T>;
@@ -222,9 +221,9 @@ Status ProtectionInfo<T>::GetStatus() const {
 }
 template <typename T>
-ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
-    const Slice& key, const Slice& value, ValueType op_type,
-    const Slice& timestamp) const {
+ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(const Slice& key,
+                                                     const Slice& value,
+                                                     ValueType op_type) const {
   T val = GetVal();
   val = val ^ static_cast<T>(GetSliceNPHash64(key, ProtectionInfo<T>::kSeedK));
   val =
@@ -232,15 +231,13 @@ ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
   val = val ^
         static_cast<T>(NPHash64(reinterpret_cast<char*>(&op_type),
                                 sizeof(op_type), ProtectionInfo<T>::kSeedO));
-  val = val ^
-        static_cast<T>(GetSliceNPHash64(timestamp, ProtectionInfo<T>::kSeedT));
   return ProtectionInfoKVOT<T>(val);
 }
 template <typename T>
-ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
-    const SliceParts& key, const SliceParts& value, ValueType op_type,
-    const Slice& timestamp) const {
+ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(const SliceParts& key,
+                                                     const SliceParts& value,
+                                                     ValueType op_type) const {
   T val = GetVal();
   val = val ^
         static_cast<T>(GetSlicePartsNPHash64(key, ProtectionInfo<T>::kSeedK));
@@ -249,8 +246,6 @@ ProtectionInfoKVOT<T> ProtectionInfo<T>::ProtectKVOT(
   val = val ^
         static_cast<T>(NPHash64(reinterpret_cast<char*>(&op_type),
                                 sizeof(op_type), ProtectionInfo<T>::kSeedO));
-  val = val ^
-        static_cast<T>(GetSliceNPHash64(timestamp, ProtectionInfo<T>::kSeedT));
   return ProtectionInfoKVOT<T>(val);
 }

@@ -149,9 +149,7 @@ class TimestampAssigner : public WriteBatch::Handler {
         prot_info_(prot_info) {}
   explicit TimestampAssigner(const std::vector<Slice>& ts_list,
                              WriteBatch::ProtectionInfo* prot_info)
-      : timestamps_(ts_list), prot_info_(prot_info) {
-    SanityCheck();
-  }
+      : timestamps_(ts_list), prot_info_(prot_info) {}
   ~TimestampAssigner() override {}
   Status PutCF(uint32_t, const Slice& key, const Slice&) override {
@@ -211,20 +209,14 @@ class TimestampAssigner : public WriteBatch::Handler {
   }
  private:
-  void SanityCheck() const {
-    assert(!timestamps_.empty());
-#ifndef NDEBUG
-    const size_t ts_sz = timestamps_[0].size();
-    for (size_t i = 1; i != timestamps_.size(); ++i) {
-      assert(ts_sz == timestamps_[i].size());
-    }
-#endif  // !NDEBUG
-  }
   void AssignTimestamp(const Slice& key) {
     assert(timestamps_.empty() || idx_ < timestamps_.size());
     const Slice& ts = timestamps_.empty() ? timestamp_ : timestamps_[idx_];
     size_t ts_sz = ts.size();
+    if (ts_sz == 0) {
+      // This key does not have timestamp, so skip.
+      return;
+    }
     char* ptr = const_cast<char*>(key.data() + key.size() - ts_sz);
     if (prot_info_ != nullptr) {
       Slice old_ts(ptr, ts_sz), new_ts(ts.data(), ts_sz);
@@ -254,23 +246,16 @@ struct SavePoints {
 };
 WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
-    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(0) {
+    : content_flags_(0), max_bytes_(max_bytes), rep_() {
   rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
                    ? reserved_bytes
                    : WriteBatchInternal::kHeader);
   rep_.resize(WriteBatchInternal::kHeader);
 }
-WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz)
-    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
-  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
-    reserved_bytes : WriteBatchInternal::kHeader);
-  rep_.resize(WriteBatchInternal::kHeader);
-}
-WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz,
+WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
                        size_t protection_bytes_per_key)
-    : content_flags_(0), max_bytes_(max_bytes), rep_(), timestamp_size_(ts_sz) {
+    : content_flags_(0), max_bytes_(max_bytes), rep_() {
   // Currently `protection_bytes_per_key` can only be enabled at 8 bytes per
   // entry.
   assert(protection_bytes_per_key == 0 || protection_bytes_per_key == 8);
@@ -284,23 +269,18 @@ WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz,
 }
 WriteBatch::WriteBatch(const std::string& rep)
-    : content_flags_(ContentFlags::DEFERRED),
-      max_bytes_(0),
-      rep_(rep),
-      timestamp_size_(0) {}
+    : content_flags_(ContentFlags::DEFERRED), max_bytes_(0), rep_(rep) {}
 WriteBatch::WriteBatch(std::string&& rep)
     : content_flags_(ContentFlags::DEFERRED),
       max_bytes_(0),
-      rep_(std::move(rep)),
-      timestamp_size_(0) {}
+      rep_(std::move(rep)) {}
 WriteBatch::WriteBatch(const WriteBatch& src)
     : wal_term_point_(src.wal_term_point_),
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       max_bytes_(src.max_bytes_),
-      rep_(src.rep_),
-      timestamp_size_(src.timestamp_size_) {
+      rep_(src.rep_) {
   if (src.save_points_ != nullptr) {
     save_points_.reset(new SavePoints());
     save_points_->stack = src.save_points_->stack;
@@ -317,8 +297,7 @@ WriteBatch::WriteBatch(WriteBatch&& src) noexcept
       content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
       max_bytes_(src.max_bytes_),
       prot_info_(std::move(src.prot_info_)),
-      rep_(std::move(src.rep_)),
-      timestamp_size_(src.timestamp_size_) {}
+      rep_(std::move(src.rep_)) {}
 WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
   if (&src != this) {
@@ -817,15 +796,7 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
     PutVarint32(&b->rep_, column_family_id);
   }
-  std::string timestamp(b->timestamp_size_, '\0');
-  if (0 == b->timestamp_size_) {
-    PutLengthPrefixedSlice(&b->rep_, key);
-  } else {
-    PutVarint32(&b->rep_,
-                static_cast<uint32_t>(key.size() + b->timestamp_size_));
-    b->rep_.append(key.data(), key.size());
-    b->rep_.append(timestamp);
-  }
+  PutLengthPrefixedSlice(&b->rep_, key);
   PutLengthPrefixedSlice(&b->rep_, value);
   b->content_flags_.store(
       b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
@@ -839,7 +810,7 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     // inserted into memtable.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, value, kTypeValue, timestamp)
+            .ProtectKVOT(key, value, kTypeValue)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -886,12 +857,7 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
     PutVarint32(&b->rep_, column_family_id);
   }
-  std::string timestamp(b->timestamp_size_, '\0');
-  if (0 == b->timestamp_size_) {
-    PutLengthPrefixedSliceParts(&b->rep_, key);
-  } else {
-    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
-  }
+  PutLengthPrefixedSliceParts(&b->rep_, key);
   PutLengthPrefixedSliceParts(&b->rep_, value);
   b->content_flags_.store(
       b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
@@ -901,7 +867,7 @@ Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
     // `ValueType` argument passed to `ProtectKVOT()`.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, value, kTypeValue, timestamp)
+            .ProtectKVOT(key, value, kTypeValue)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -978,15 +944,7 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  std::string timestamp(b->timestamp_size_, '\0');
-  if (0 == b->timestamp_size_) {
-    PutLengthPrefixedSlice(&b->rep_, key);
-  } else {
-    PutVarint32(&b->rep_,
-                static_cast<uint32_t>(key.size() + b->timestamp_size_));
-    b->rep_.append(key.data(), key.size());
-    b->rep_.append(timestamp);
-  }
+  PutLengthPrefixedSlice(&b->rep_, key);
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -995,7 +953,7 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     // `ValueType` argument passed to `ProtectKVOT()`.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, "" /* value */, kTypeDeletion, timestamp)
+            .ProtectKVOT(key, "" /* value */, kTypeDeletion)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -1016,12 +974,7 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
     b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
     PutVarint32(&b->rep_, column_family_id);
   }
-  std::string timestamp(b->timestamp_size_, '\0');
-  if (0 == b->timestamp_size_) {
-    PutLengthPrefixedSliceParts(&b->rep_, key);
-  } else {
-    PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
-  }
+  PutLengthPrefixedSliceParts(&b->rep_, key);
   b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                               ContentFlags::HAS_DELETE,
                           std::memory_order_relaxed);
@@ -1032,7 +985,7 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
         ProtectionInfo64()
             .ProtectKVOT(key,
                          SliceParts(nullptr /* _parts */, 0 /* _num_parts */),
-                         kTypeDeletion, timestamp)
+                         kTypeDeletion)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -1062,11 +1015,10 @@ Status WriteBatchInternal::SingleDelete(WriteBatch* b,
   if (b->prot_info_ != nullptr) {
     // See comment in first `WriteBatchInternal::Put()` overload concerning the
     // `ValueType` argument passed to `ProtectKVOT()`.
-    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
-                                             .ProtectKVOT(key, "" /* value */,
-                                                          kTypeSingleDeletion,
-                                                          "" /* timestamp */)
-                                             .ProtectC(column_family_id));
+    b->prot_info_->entries_.emplace_back(
+        ProtectionInfo64()
+            .ProtectKVOT(key, "" /* value */, kTypeSingleDeletion)
+            .ProtectC(column_family_id));
   }
   return save.commit();
 }
@@ -1100,7 +1052,7 @@ Status WriteBatchInternal::SingleDelete(WriteBatch* b,
             .ProtectKVOT(key,
                          SliceParts(nullptr /* _parts */,
                                     0 /* _num_parts */) /* value */,
-                         kTypeSingleDeletion, "" /* timestamp */)
+                         kTypeSingleDeletion)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -1132,11 +1084,10 @@ Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
     // See comment in first `WriteBatchInternal::Put()` overload concerning the
     // `ValueType` argument passed to `ProtectKVOT()`.
     // In `DeleteRange()`, the end key is treated as the value.
-    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
-                                             .ProtectKVOT(begin_key, end_key,
-                                                          kTypeRangeDeletion,
-                                                          "" /* timestamp */)
-                                             .ProtectC(column_family_id));
+    b->prot_info_->entries_.emplace_back(
+        ProtectionInfo64()
+            .ProtectKVOT(begin_key, end_key, kTypeRangeDeletion)
+            .ProtectC(column_family_id));
   }
   return save.commit();
 }
@@ -1167,11 +1118,10 @@ Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
     // See comment in first `WriteBatchInternal::Put()` overload concerning the
     // `ValueType` argument passed to `ProtectKVOT()`.
     // In `DeleteRange()`, the end key is treated as the value.
-    b->prot_info_->entries_.emplace_back(ProtectionInfo64()
-                                             .ProtectKVOT(begin_key, end_key,
-                                                          kTypeRangeDeletion,
-                                                          "" /* timestamp */)
-                                             .ProtectC(column_family_id));
+    b->prot_info_->entries_.emplace_back(
+        ProtectionInfo64()
+            .ProtectKVOT(begin_key, end_key, kTypeRangeDeletion)
+            .ProtectC(column_family_id));
   }
   return save.commit();
 }
@@ -1210,7 +1160,7 @@ Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
     // `ValueType` argument passed to `ProtectKVOT()`.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
+            .ProtectKVOT(key, value, kTypeMerge)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -1248,7 +1198,7 @@ Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
     // `ValueType` argument passed to `ProtectKVOT()`.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, value, kTypeMerge, "" /* timestamp */)
+            .ProtectKVOT(key, value, kTypeMerge)
             .ProtectC(column_family_id));
   }
   return save.commit();
@@ -1281,7 +1231,7 @@ Status WriteBatchInternal::PutBlobIndex(WriteBatch* b,
     // `ValueType` argument passed to `ProtectKVOT()`.
     b->prot_info_->entries_.emplace_back(
         ProtectionInfo64()
-            .ProtectKVOT(key, value, kTypeBlobIndex, "" /* timestamp */)
+            .ProtectKVOT(key, value, kTypeBlobIndex)
             .ProtectC(column_family_id));
   }
   return save.commit();
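
The timestamp-assignment step in `db/write_batch.cc` (overwrite the trailing
timestamp bytes of each key, and skip keys whose timestamp is empty) can be
sketched on plain strings. `AssignTrailingTimestamps` is a hypothetical helper
written for this illustration; it is not part of RocksDB and does not model
protection info.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical helper (not a RocksDB API): for each key, overwrite its last
// ts.size() bytes with the matching timestamp. An empty timestamp means the
// key belongs to a column family without timestamps and is left untouched.
void AssignTrailingTimestamps(std::vector<std::string>& keys,
                              const std::vector<std::string>& ts_list) {
  assert(keys.size() == ts_list.size());
  for (std::size_t i = 0; i < keys.size(); ++i) {
    const std::string& ts = ts_list[i];
    if (ts.empty()) {
      continue;  // This key does not have a timestamp, so skip it.
    }
    assert(keys[i].size() >= ts.size());
    std::memcpy(&keys[i][keys[i].size() - ts.size()], ts.data(), ts.size());
  }
}
```

This mirrors the `MixedCfs` test above, where the timestamp list is
`{Slice(), ts}`: the first key (default column family) is skipped and the
second key has its placeholder timestamp replaced.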

@@ -31,7 +31,7 @@ class BatchedOpsStressTest : public StressTest {
     std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
     std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
     Slice value_slices[10];
-    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, 0 /* ts_sz */,
+    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                      FLAGS_batch_protection_bytes_per_key);
     Status s;
     auto cfh = column_families_[rand_column_families[0]];
@@ -67,7 +67,7 @@ class BatchedOpsStressTest : public StressTest {
                     std::unique_ptr<MutexLock>& /* lock */) override {
     std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
-    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, 0 /* ts_sz */,
+    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                      FLAGS_batch_protection_bytes_per_key);
     Status s;
     auto cfh = column_families_[rand_column_families[0]];

@ -61,16 +61,19 @@ struct SavePoint {
class WriteBatch : public WriteBatchBase { class WriteBatch : public WriteBatchBase {
public: public:
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0); explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz);
// `protection_bytes_per_key` is the number of bytes used to store // `protection_bytes_per_key` is the number of bytes used to store
// protection information for each key entry. Currently supported values are // protection information for each key entry. Currently supported values are
// zero (disabled) and eight. // zero (disabled) and eight.
explicit WriteBatch(size_t reserved_bytes, size_t max_bytes, size_t ts_sz, explicit WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key); size_t protection_bytes_per_key);
~WriteBatch() override; ~WriteBatch() override;
using WriteBatchBase::Put; using WriteBatchBase::Put;
// Store the mapping "key->value" in the database. // Store the mapping "key->value" in the database.
// The following Put(..., const Slice& key, ...) API can also be used when
// user-defined timestamp is enabled as long as `key` points to a contiguous
// buffer with timestamp appended after user key. The caller is responsible
// for setting up the memory buffer pointed to by `key`.
Status Put(ColumnFamilyHandle* column_family, const Slice& key, Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status Put(const Slice& key, const Slice& value) override { Status Put(const Slice& key, const Slice& value) override {
@ -80,6 +83,10 @@ class WriteBatch : public WriteBatchBase {
// Variant of Put() that gathers output like writev(2). The key and value // Variant of Put() that gathers output like writev(2). The key and value
// that will be written to the database are concatenations of arrays of // that will be written to the database are concatenations of arrays of
// slices. // slices.
// The following Put(..., const SliceParts& key, ...) API can be used when
// user-defined timestamp is enabled as long as the timestamp is the last
// Slice in `key`, a SliceParts (array of Slices). The caller is responsible
// for setting up the `key` SliceParts object.
Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) override; const SliceParts& value) override;
Status Put(const SliceParts& key, const SliceParts& value) override { Status Put(const SliceParts& key, const SliceParts& value) override {
@ -88,10 +95,18 @@ class WriteBatch : public WriteBatchBase {
using WriteBatchBase::Delete; using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
// The following Delete(..., const Slice& key) can be used when user-defined
// timestamp is enabled as long as `key` points to a contiguous buffer with
// timestamp appended after user key. The caller is responsible for setting
// up the memory buffer pointed to by `key`.
Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override; Status Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
Status Delete(const Slice& key) override { return Delete(nullptr, key); } Status Delete(const Slice& key) override { return Delete(nullptr, key); }
// variant that takes SliceParts // variant that takes SliceParts
// These two variants of Delete(..., const SliceParts& key) can be used when
// user-defined timestamp is enabled as long as the timestamp is the last
// Slice in `key`, a SliceParts (array of Slices). The caller is responsible
// for setting up the `key` SliceParts object.
Status Delete(ColumnFamilyHandle* column_family, Status Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override; const SliceParts& key) override;
Status Delete(const SliceParts& key) override { return Delete(nullptr, key); } Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
@ -318,10 +333,17 @@ class WriteBatch : public WriteBatchBase {
// Returns true if MarkRollback will be called during Iterate // Returns true if MarkRollback will be called during Iterate
bool HasRollback() const; bool HasRollback() const;
// Assign timestamp to write batch // Assign timestamp to write batch.
// This requires that all keys (possibly from multiple column families) in
// the write batch have timestamps of the same format.
Status AssignTimestamp(const Slice& ts); Status AssignTimestamp(const Slice& ts);
// Assign timestamps to write batch // Assign timestamps to write batch.
// This API allows the write batch to include keys from multiple column
// families whose timestamps' formats can differ. For example, some column
// families can enable timestamp, while others disable the feature.
// If key does not have timestamp, then put an empty Slice in ts_list as
// a placeholder.
Status AssignTimestamps(const std::vector<Slice>& ts_list); Status AssignTimestamps(const std::vector<Slice>& ts_list);
using WriteBatchBase::GetWriteBatch; using WriteBatchBase::GetWriteBatch;
@ -379,7 +401,6 @@ class WriteBatch : public WriteBatchBase {
protected: protected:
std::string rep_; // See comment in write_batch.cc for the format of rep_ std::string rep_; // See comment in write_batch.cc for the format of rep_
const size_t timestamp_size_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
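The header comments above describe two ways for a caller to supply a timestamped key (one contiguous buffer, or a SliceParts whose last Slice is the timestamp), plus the empty-Slice placeholder that `AssignTimestamps()` expects for keys without a timestamp. The following is a minimal sketch of those buffer layouts only, and it makes no RocksDB calls. `std::string` stands in for `Slice` and `std::vector<std::string>` for `SliceParts`. The helper names `EncodeU64Ts`, `KeyWithTimestamp`, `ConcatParts`, and `BuildTsList` are hypothetical, and the 8-byte little-endian timestamp encoding is an assumption; the real format is whatever the column family's comparator defines.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper (not a RocksDB API): encode a 64-bit timestamp as
// 8 fixed-width bytes, little-endian. The real timestamp format is an
// assumption here; it depends on the column family's comparator.
std::string EncodeU64Ts(uint64_t ts) {
  std::string out(sizeof(ts), '\0');
  for (size_t i = 0; i < sizeof(ts); ++i) {
    out[i] = static_cast<char>((ts >> (8 * i)) & 0xff);
  }
  return out;
}

// Option 1: one contiguous buffer holding the user key immediately followed
// by the timestamp. A Slice over this buffer is what the comments on
// Put(..., const Slice& key, ...) ask the caller to set up.
std::string KeyWithTimestamp(const std::string& user_key,
                             const std::string& ts) {
  std::string buf;
  buf.reserve(user_key.size() + ts.size());
  buf.append(user_key);
  buf.append(ts);
  return buf;
}

// Option 2: gather-style input, standing in for SliceParts. The parts are
// concatenated in order, so the timestamp must be the last part.
std::string ConcatParts(const std::vector<std::string>& parts) {
  std::string buf;
  for (const std::string& part : parts) {
    buf.append(part);
  }
  return buf;
}

// Build a per-key timestamp list in the shape AssignTimestamps() is
// documented to take: one entry per key, with an empty entry as the
// placeholder for keys whose column family has timestamps disabled.
// `ts_sizes[i]` is the timestamp size expected for the i-th key.
std::vector<std::string> BuildTsList(const std::vector<size_t>& ts_sizes,
                                     const std::string& ts) {
  std::vector<std::string> ts_list;
  ts_list.reserve(ts_sizes.size());
  for (size_t sz : ts_sizes) {
    // This sketch only handles "disabled" (0) or "same size as ts".
    assert(sz == 0 || sz == ts.size());
    ts_list.push_back(sz == 0 ? std::string() : ts);
  }
  return ts_list;
}
```

Both options yield the same bytes for the key; the difference is only whether the caller copies the timestamp into the key buffer or keeps it in a separate buffer referenced by the last part.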
