Add support to strip / pad timestamp when writing / reading a block (#11472)

Summary:
This patch adds support in `BlockBuilder` to strip user-defined timestamp from the `key` added via `Add(key, value)` and its equivalent APIs. The stripping logic is different when the key is either a user key or an internal key, so the `BlockBuilder` is created with a flag to indicate that. This patch also add support on the read path to APIs `NewIndexIterator`, `NewDataIterator` to support pad a min timestamp.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11472

Test Plan:
Three test modes are added to parameterize existing tests:
UserDefinedTimestampTestMode::kNone -> UDT feature is not enabled
UserDefinedTimestampTestMode::kNormal -> UDT feature enabled, write / read with min timestamp
UserDefinedTimestampTestMode::kStripUserDefinedTimestamps -> UDT feature enabled, write / read with min timestamp, set `persist_user_defined_timestamps` where it applies to false.
The tests read/write with min timestamp so that point read and range scan can correctly read values in all three test modes.

`block_test` are parameterized to run with above three test modes and some additional parameteriazation

```
make all check
./block_test --gtest_filter="P/BlockTest*"
./block_test --gtest_filter="P/IndexBlockTest*"
```

Reviewed By: ltamasi

Differential Revision: D46200539

Pulled By: jowlyzhang

fbshipit-source-id: 59f5d6b584639976b69c2943eba723bd47d9b3c0
oxigraph-main
Yu Zhang 1 year ago committed by Facebook GitHub Bot
parent dcc6fc99f9
commit d1ae7f6c41
  1. 19
      db/dbformat.cc
  2. 85
      db/dbformat.h
  3. 108
      db/dbformat_test.cc
  4. 53
      table/block_based/block.cc
  5. 72
      table/block_based/block.h
  6. 38
      table/block_based/block_builder.cc
  7. 27
      table/block_based/block_builder.h
  8. 215
      table/block_based/block_test.cc
  9. 2
      table/block_based/hash_index_reader.cc
  10. 16
      test_util/testutil.cc
  11. 16
      test_util/testutil.h

@ -101,6 +101,25 @@ void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
} }
} }
void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz) {
assert(ts_sz > 0);
size_t user_key_size = key.size() - kNumInternalBytes;
result->reserve(key.size() + ts_sz);
result->append(key.data(), user_key_size);
result->append(ts_sz, static_cast<unsigned char>(0));
result->append(key.data() + user_key_size, kNumInternalBytes);
}
void StripTimestampFromInternalKey(std::string* result, const Slice& key,
size_t ts_sz) {
assert(key.size() >= ts_sz + kNumInternalBytes);
result->reserve(key.size() - ts_sz);
result->append(key.data(), key.size() - kNumInternalBytes - ts_sz);
result->append(key.data() + key.size() - kNumInternalBytes,
kNumInternalBytes);
}
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const { std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
std::string result = "'"; std::string result = "'";
if (log_err_key) { if (log_err_key) {

@ -195,6 +195,18 @@ extern void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
extern void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key, extern void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz); size_t ts_sz);
// `key` is an internal key containing a user key without timestamp. Create a
// new key in *result by padding a min timestamp of size `ts_sz` to the user key
// and copying the remaining internal key bytes.
extern void PadInternalKeyWithMinTimestamp(std::string* result,
const Slice& key, size_t ts_sz);
// `key` is an internal key containing a user key with timestamp of size
// `ts_sz`. Create a new internal key in *result by stripping the timestamp from
// the user key and copying the remaining internal key bytes.
extern void StripTimestampFromInternalKey(std::string* result, const Slice& key,
size_t ts_sz);
// Attempt to parse an internal key from "internal_key". On success, // Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true. // stores the parsed data in "*result", and returns true.
// //
@ -504,6 +516,62 @@ class IterKey {
key_size_ = total_size; key_size_ = total_size;
} }
// A version of `TrimAppend` assuming the last bytes of length `ts_sz` in the
// user key part of `key_` is not counted towards shared bytes. And the
// decoded key needed a min timestamp of length `ts_sz` pad to the user key.
void TrimAppendWithTimestamp(const size_t shared_len,
const char* non_shared_data,
const size_t non_shared_len,
const size_t ts_sz) {
std::string kTsMin(ts_sz, static_cast<unsigned char>(0));
std::string key_with_ts;
std::vector<Slice> key_parts_with_ts;
if (IsUserKey()) {
key_parts_with_ts = {Slice(key_, shared_len),
Slice(non_shared_data, non_shared_len),
Slice(kTsMin)};
} else {
assert(shared_len + non_shared_len >= kNumInternalBytes);
// Invaraint: shared_user_key_len + shared_internal_bytes_len = shared_len
// In naming below `*_len` variables, keyword `user_key` refers to the
// user key part of the existing key in `key_` as apposed to the new key.
// Similary, `internal_bytes` refers to the footer part of the existing
// key. These bytes potentially will move between user key part and the
// footer part in the new key.
const size_t user_key_len = key_size_ - kNumInternalBytes;
const size_t sharable_user_key_len = user_key_len - ts_sz;
const size_t shared_user_key_len =
std::min(shared_len, sharable_user_key_len);
const size_t shared_internal_bytes_len = shared_len - shared_user_key_len;
// One Slice among the three Slices will get split into two Slices, plus
// a timestamp slice.
key_parts_with_ts.reserve(5);
bool ts_added = false;
// Add slice parts and find the right location to add the min timestamp.
MaybeAddKeyPartsWithTimestamp(
key_, shared_user_key_len,
shared_internal_bytes_len + non_shared_len < kNumInternalBytes,
shared_len + non_shared_len - kNumInternalBytes, kTsMin,
key_parts_with_ts, &ts_added);
MaybeAddKeyPartsWithTimestamp(
key_ + user_key_len, shared_internal_bytes_len,
non_shared_len < kNumInternalBytes,
shared_internal_bytes_len + non_shared_len - kNumInternalBytes,
kTsMin, key_parts_with_ts, &ts_added);
MaybeAddKeyPartsWithTimestamp(non_shared_data, non_shared_len,
non_shared_len >= kNumInternalBytes,
non_shared_len - kNumInternalBytes, kTsMin,
key_parts_with_ts, &ts_added);
assert(ts_added);
}
Slice new_key(SliceParts(&key_parts_with_ts.front(),
static_cast<int>(key_parts_with_ts.size())),
&key_with_ts);
SetKey(new_key);
}
Slice SetKey(const Slice& key, bool copy = true) { Slice SetKey(const Slice& key, bool copy = true) {
// is_user_key_ expected to be set already via SetIsUserKey // is_user_key_ expected to be set already via SetIsUserKey
return SetKeyImpl(key, copy); return SetKeyImpl(key, copy);
@ -661,6 +729,23 @@ class IterKey {
} }
void EnlargeBuffer(size_t key_size); void EnlargeBuffer(size_t key_size);
void MaybeAddKeyPartsWithTimestamp(const char* slice_data,
const size_t slice_sz, bool add_timestamp,
const size_t left_sz,
const std::string& min_timestamp,
std::vector<Slice>& key_parts,
bool* ts_added) {
if (add_timestamp && !*ts_added) {
assert(slice_sz >= left_sz);
key_parts.emplace_back(slice_data, left_sz);
key_parts.emplace_back(min_timestamp);
key_parts.emplace_back(slice_data + left_sz, slice_sz - left_sz);
*ts_added = true;
} else {
key_parts.emplace_back(slice_data, slice_sz);
}
}
}; };
// Convert from a SliceTransform of user keys, to a SliceTransform of // Convert from a SliceTransform of user keys, to a SliceTransform of

@ -178,6 +178,79 @@ TEST_F(FormatTest, IterKeyOperation) {
"abcdefghijklmnopqrstuvwxyz")); "abcdefghijklmnopqrstuvwxyz"));
} }
TEST_F(FormatTest, IterKeyWithTimestampOperation) {
IterKey k;
k.SetUserKey("");
const char p[] = "abcdefghijklmnopqrstuvwxyz";
const char q[] = "0123456789";
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
std::string(""));
size_t ts_sz = 8;
std::string min_timestamp(ts_sz, static_cast<unsigned char>(0));
k.TrimAppendWithTimestamp(0, p, 3, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abc" + min_timestamp);
k.TrimAppendWithTimestamp(1, p, 3, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"aabc" + min_timestamp);
k.TrimAppendWithTimestamp(0, p, 26, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abcdefghijklmnopqrstuvwxyz" + min_timestamp);
k.TrimAppendWithTimestamp(26, q, 10, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abcdefghijklmnopqrstuvwxyz0123456789" + min_timestamp);
k.TrimAppendWithTimestamp(36, q, 1, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abcdefghijklmnopqrstuvwxyz01234567890" + min_timestamp);
k.TrimAppendWithTimestamp(26, q, 1, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abcdefghijklmnopqrstuvwxyz0" + min_timestamp);
k.TrimAppendWithTimestamp(27, p, 26, ts_sz);
ASSERT_EQ(std::string(k.GetUserKey().data(), k.GetUserKey().size()),
"abcdefghijklmnopqrstuvwxyz0"
"abcdefghijklmnopqrstuvwxyz" +
min_timestamp);
// IterKey holds an internal key, the last 8 bytes hold the key footer, the
// timestamp is expected to be added before the key footer.
std::string key_without_ts = "keywithoutts";
k.SetInternalKey(key_without_ts + min_timestamp + "internal");
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
key_without_ts + min_timestamp + "internal");
k.TrimAppendWithTimestamp(0, p, 10, ts_sz);
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"ab" + min_timestamp + "cdefghij");
k.TrimAppendWithTimestamp(1, p, 8, ts_sz);
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"a" + min_timestamp + "abcdefgh");
k.TrimAppendWithTimestamp(9, p, 3, ts_sz);
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"aabc" + min_timestamp + "defghabc");
k.TrimAppendWithTimestamp(10, q, 10, ts_sz);
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"aabcdefgha01" + min_timestamp + "23456789");
k.TrimAppendWithTimestamp(20, q, 1, ts_sz);
ASSERT_EQ(std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"aabcdefgha012" + min_timestamp + "34567890");
k.TrimAppendWithTimestamp(21, p, 26, ts_sz);
ASSERT_EQ(
std::string(k.GetInternalKey().data(), k.GetInternalKey().size()),
"aabcdefgha01234567890abcdefghijklmnopqr" + min_timestamp + "stuvwxyz");
}
TEST_F(FormatTest, UpdateInternalKey) { TEST_F(FormatTest, UpdateInternalKey) {
std::string user_key("abcdefghijklmnopqrstuvwxyz"); std::string user_key("abcdefghijklmnopqrstuvwxyz");
uint64_t new_seq = 0x123456; uint64_t new_seq = 0x123456;
@ -204,6 +277,41 @@ TEST_F(FormatTest, RangeTombstoneSerializeEndKey) {
ASSERT_LT(cmp.Compare(t.SerializeEndKey(), k), 0); ASSERT_LT(cmp.Compare(t.SerializeEndKey(), k), 0);
} }
TEST_F(FormatTest, PadInternalKeyWithMinTimestamp) {
std::string orig_user_key = "foo";
std::string orig_internal_key = IKey(orig_user_key, 100, kTypeValue);
size_t ts_sz = 8;
std::string key_buf;
PadInternalKeyWithMinTimestamp(&key_buf, orig_internal_key, ts_sz);
ParsedInternalKey key_with_timestamp;
Slice in(key_buf);
ASSERT_OK(ParseInternalKey(in, &key_with_timestamp, true /*log_err_key*/));
std::string min_timestamp(ts_sz, static_cast<unsigned char>(0));
ASSERT_EQ(orig_user_key + min_timestamp, key_with_timestamp.user_key);
ASSERT_EQ(100, key_with_timestamp.sequence);
ASSERT_EQ(kTypeValue, key_with_timestamp.type);
}
TEST_F(FormatTest, StripTimestampFromInternalKey) {
std::string orig_user_key = "foo";
size_t ts_sz = 8;
std::string timestamp(ts_sz, static_cast<unsigned char>(0));
orig_user_key.append(timestamp.data(), timestamp.size());
std::string orig_internal_key = IKey(orig_user_key, 100, kTypeValue);
std::string key_buf;
StripTimestampFromInternalKey(&key_buf, orig_internal_key, ts_sz);
ParsedInternalKey key_without_timestamp;
Slice in(key_buf);
ASSERT_OK(ParseInternalKey(in, &key_without_timestamp, true /*log_err_key*/));
ASSERT_EQ("foo", key_without_timestamp.user_key);
ASSERT_EQ(100, key_without_timestamp.sequence);
ASSERT_EQ(kTypeValue, key_without_timestamp.type);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -634,13 +634,22 @@ bool BlockIter<TValue>::ParseNextKey(bool* is_shared) {
} else { } else {
if (shared == 0) { if (shared == 0) {
*is_shared = false; *is_shared = false;
// If this key doesn't share any bytes with prev key then we don't need // If this key doesn't share any bytes with prev key, and no min timestamp
// to decode it and can use its address in the block directly. // needs to be padded to the key, then we don't need to decode it and
raw_key_.SetKey(Slice(p, non_shared), false /* copy */); // can use its address in the block directly (no copy).
UpdateRawKeyAndMaybePadMinTimestamp(Slice(p, non_shared));
} else { } else {
// This key share `shared` bytes with prev key, we need to decode it // This key share `shared` bytes with prev key, we need to decode it
*is_shared = true; *is_shared = true;
raw_key_.TrimAppend(shared, p, non_shared); // If user-defined timestamp is stripped from user key before keys are
// delta encoded, the decoded key consisting of the shared and non shared
// bytes do not have user-defined timestamp yet. We need to pad min
// timestamp to it.
if (pad_min_timestamp_) {
raw_key_.TrimAppendWithTimestamp(shared, p, non_shared, ts_sz_);
} else {
raw_key_.TrimAppend(shared, p, non_shared);
}
} }
value_ = Slice(p + non_shared, value_length); value_ = Slice(p + non_shared, value_length);
if (shared == 0) { if (shared == 0) {
@ -686,7 +695,8 @@ bool IndexBlockIter::ParseNextIndexKey() {
bool ok = (value_delta_encoded_) ? ParseNextKey<DecodeEntryV4>(&is_shared) bool ok = (value_delta_encoded_) ? ParseNextKey<DecodeEntryV4>(&is_shared)
: ParseNextKey<DecodeEntry>(&is_shared); : ParseNextKey<DecodeEntry>(&is_shared);
if (ok) { if (ok) {
if (value_delta_encoded_ || global_seqno_state_ != nullptr) { if (value_delta_encoded_ || global_seqno_state_ != nullptr ||
pad_min_timestamp_) {
DecodeCurrentValue(is_shared); DecodeCurrentValue(is_shared);
} }
} }
@ -732,6 +742,12 @@ void IndexBlockIter::DecodeCurrentValue(bool is_shared) {
value_type); value_type);
decoded_value_.first_internal_key = first_internal_key.GetKey(); decoded_value_.first_internal_key = first_internal_key.GetKey();
} }
if (pad_min_timestamp_ && !decoded_value_.first_internal_key.empty()) {
first_internal_key_with_ts_.clear();
PadInternalKeyWithMinTimestamp(&first_internal_key_with_ts_,
decoded_value_.first_internal_key, ts_sz_);
decoded_value_.first_internal_key = first_internal_key_with_ts_;
}
} }
template <class TValue> template <class TValue>
@ -817,7 +833,7 @@ bool BlockIter<TValue>::BinarySeek(const Slice& target, uint32_t* index,
return false; return false;
} }
Slice mid_key(key_ptr, non_shared); Slice mid_key(key_ptr, non_shared);
raw_key_.SetKey(mid_key, false /* copy */); UpdateRawKeyAndMaybePadMinTimestamp(mid_key);
int cmp = CompareCurrentKey(target); int cmp = CompareCurrentKey(target);
if (cmp < 0) { if (cmp < 0) {
// Key at "mid" is smaller than "target". Therefore all // Key at "mid" is smaller than "target". Therefore all
@ -1080,9 +1096,12 @@ void Block::InitializeDataBlockProtectionInfo(uint8_t protection_bytes_per_key,
// //
// We do not know global_seqno yet, so checksum computation and // We do not know global_seqno yet, so checksum computation and
// verification all assume global_seqno = 0. // verification all assume global_seqno = 0.
// TODO(yuzhangyu): handle the implication of padding timestamp for kv
// protection.
std::unique_ptr<DataBlockIter> iter{NewDataIterator( std::unique_ptr<DataBlockIter> iter{NewDataIterator(
raw_ucmp, kDisableGlobalSequenceNumber, nullptr /* iter */, raw_ucmp, kDisableGlobalSequenceNumber, nullptr /* iter */,
nullptr /* stats */, true /* block_contents_pinned */)}; nullptr /* stats */, true /* block_contents_pinned */,
true /* user_defined_timestamps_persisted */)};
if (iter->status().ok()) { if (iter->status().ok()) {
block_restart_interval_ = iter->GetRestartInterval(); block_restart_interval_ = iter->GetRestartInterval();
} }
@ -1123,11 +1142,14 @@ void Block::InitializeIndexBlockProtectionInfo(uint8_t protection_bytes_per_key,
// raw_key_.GetKey() returned by iter->key() as the `key` part of key-value // raw_key_.GetKey() returned by iter->key() as the `key` part of key-value
// checksum, and the content of this buffer do not change for different // checksum, and the content of this buffer do not change for different
// values of `global_seqno` or `key_includes_seq`. // values of `global_seqno` or `key_includes_seq`.
// TODO(yuzhangyu): handle the implication of padding timestamp for kv
// protection.
std::unique_ptr<IndexBlockIter> iter{NewIndexIterator( std::unique_ptr<IndexBlockIter> iter{NewIndexIterator(
raw_ucmp, kDisableGlobalSequenceNumber /* global_seqno */, nullptr, raw_ucmp, kDisableGlobalSequenceNumber /* global_seqno */, nullptr,
nullptr /* Statistics */, true /* total_order_seek */, nullptr /* Statistics */, true /* total_order_seek */,
index_has_first_key /* have_first_key */, false /* key_includes_seq */, index_has_first_key /* have_first_key */, false /* key_includes_seq */,
value_is_full, true /* block_contents_pinned */, value_is_full, true /* block_contents_pinned */,
true /* user_defined_timestamps_persisted*/,
nullptr /* prefix_index */)}; nullptr /* prefix_index */)};
if (iter->status().ok()) { if (iter->status().ok()) {
block_restart_interval_ = iter->GetRestartInterval(); block_restart_interval_ = iter->GetRestartInterval();
@ -1210,7 +1232,8 @@ MetaBlockIter* Block::NewMetaIterator(bool block_contents_pinned) {
DataBlockIter* Block::NewDataIterator(const Comparator* raw_ucmp, DataBlockIter* Block::NewDataIterator(const Comparator* raw_ucmp,
SequenceNumber global_seqno, SequenceNumber global_seqno,
DataBlockIter* iter, Statistics* stats, DataBlockIter* iter, Statistics* stats,
bool block_contents_pinned) { bool block_contents_pinned,
bool user_defined_timestamps_persisted) {
DataBlockIter* ret_iter; DataBlockIter* ret_iter;
if (iter != nullptr) { if (iter != nullptr) {
ret_iter = iter; ret_iter = iter;
@ -1229,6 +1252,7 @@ DataBlockIter* Block::NewDataIterator(const Comparator* raw_ucmp,
ret_iter->Initialize( ret_iter->Initialize(
raw_ucmp, data_, restart_offset_, num_restarts_, global_seqno, raw_ucmp, data_, restart_offset_, num_restarts_, global_seqno,
read_amp_bitmap_.get(), block_contents_pinned, read_amp_bitmap_.get(), block_contents_pinned,
user_defined_timestamps_persisted,
data_block_hash_index_.Valid() ? &data_block_hash_index_ : nullptr, data_block_hash_index_.Valid() ? &data_block_hash_index_ : nullptr,
protection_bytes_per_key_, kv_checksum_, block_restart_interval_); protection_bytes_per_key_, kv_checksum_, block_restart_interval_);
if (read_amp_bitmap_) { if (read_amp_bitmap_) {
@ -1246,7 +1270,8 @@ IndexBlockIter* Block::NewIndexIterator(
const Comparator* raw_ucmp, SequenceNumber global_seqno, const Comparator* raw_ucmp, SequenceNumber global_seqno,
IndexBlockIter* iter, Statistics* /*stats*/, bool total_order_seek, IndexBlockIter* iter, Statistics* /*stats*/, bool total_order_seek,
bool have_first_key, bool key_includes_seq, bool value_is_full, bool have_first_key, bool key_includes_seq, bool value_is_full,
bool block_contents_pinned, BlockPrefixIndex* prefix_index) { bool block_contents_pinned, bool user_defined_timestamps_persisted,
BlockPrefixIndex* prefix_index) {
IndexBlockIter* ret_iter; IndexBlockIter* ret_iter;
if (iter != nullptr) { if (iter != nullptr) {
ret_iter = iter; ret_iter = iter;
@ -1264,11 +1289,11 @@ IndexBlockIter* Block::NewIndexIterator(
} else { } else {
BlockPrefixIndex* prefix_index_ptr = BlockPrefixIndex* prefix_index_ptr =
total_order_seek ? nullptr : prefix_index; total_order_seek ? nullptr : prefix_index;
ret_iter->Initialize(raw_ucmp, data_, restart_offset_, num_restarts_, ret_iter->Initialize(
global_seqno, prefix_index_ptr, have_first_key, raw_ucmp, data_, restart_offset_, num_restarts_, global_seqno,
key_includes_seq, value_is_full, block_contents_pinned, prefix_index_ptr, have_first_key, key_includes_seq, value_is_full,
protection_bytes_per_key_, kv_checksum_, block_contents_pinned, user_defined_timestamps_persisted,
block_restart_interval_); protection_bytes_per_key_, kv_checksum_, block_restart_interval_);
} }
return ret_iter; return ret_iter;

@ -188,6 +188,9 @@ class Block {
// will not go away (for example, it's from mmapped file which will not be // will not go away (for example, it's from mmapped file which will not be
// closed). // closed).
// //
// `user_defined_timestamps_persisted` controls whether a min timestamp is
// padded while key is being parsed from the block.
//
// NOTE: for the hash based lookup, if a key prefix doesn't match any key, // NOTE: for the hash based lookup, if a key prefix doesn't match any key,
// the iterator will simply be set as "invalid", rather than returning // the iterator will simply be set as "invalid", rather than returning
// the key that is just pass the target key. // the key that is just pass the target key.
@ -195,7 +198,8 @@ class Block {
SequenceNumber global_seqno, SequenceNumber global_seqno,
DataBlockIter* iter = nullptr, DataBlockIter* iter = nullptr,
Statistics* stats = nullptr, Statistics* stats = nullptr,
bool block_contents_pinned = false); bool block_contents_pinned = false,
bool user_defined_timestamps_persisted = true);
// Returns an MetaBlockIter for iterating over blocks containing metadata // Returns an MetaBlockIter for iterating over blocks containing metadata
// (like Properties blocks). Unlike data blocks, the keys for these blocks // (like Properties blocks). Unlike data blocks, the keys for these blocks
@ -227,13 +231,15 @@ class Block {
// first_internal_key. It affects data serialization format, so the same value // first_internal_key. It affects data serialization format, so the same value
// have_first_key must be used when writing and reading index. // have_first_key must be used when writing and reading index.
// It is determined by IndexType property of the table. // It is determined by IndexType property of the table.
IndexBlockIter* NewIndexIterator(const Comparator* raw_ucmp, // `user_defined_timestamps_persisted` controls whether a min timestamp is
SequenceNumber global_seqno, // padded while key is being parsed from the block.
IndexBlockIter* iter, Statistics* stats, IndexBlockIter* NewIndexIterator(
bool total_order_seek, bool have_first_key, const Comparator* raw_ucmp, SequenceNumber global_seqno,
bool key_includes_seq, bool value_is_full, IndexBlockIter* iter, Statistics* stats, bool total_order_seek,
bool block_contents_pinned = false, bool have_first_key, bool key_includes_seq, bool value_is_full,
BlockPrefixIndex* prefix_index = nullptr); bool block_contents_pinned = false,
bool user_defined_timestamps_persisted = true,
BlockPrefixIndex* prefix_index = nullptr);
// Report an approximation of how much memory has been used. // Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const; size_t ApproximateMemoryUsage() const;
@ -440,6 +446,19 @@ class BlockIter : public InternalIteratorBase<TValue> {
// Key to be exposed to users. // Key to be exposed to users.
Slice key_; Slice key_;
SequenceNumber global_seqno_; SequenceNumber global_seqno_;
// Size of the user-defined timestamp.
size_t ts_sz_ = 0;
// If user-defined timestamp is enabled but not persisted. A min timestamp
// will be padded to the key during key parsing where it applies. Such as when
// parsing keys from data block, index block, parsing the first internal
// key from IndexValue entry. Min timestamp padding is different for when
// `raw_key_` is a user key vs is an internal key.
//
// This only applies to data block and index blocks including index block for
// data blocks, index block for partitioned filter blocks, index block for
// partitioned index blocks. In summary, this only applies to block whose key
// are real user keys or internal keys created from user keys.
bool pad_min_timestamp_;
// Per key-value checksum related states // Per key-value checksum related states
const char* kv_checksum_; const char* kv_checksum_;
@ -505,6 +524,8 @@ class BlockIter : public InternalIteratorBase<TValue> {
void InitializeBase(const Comparator* raw_ucmp, const char* data, void InitializeBase(const Comparator* raw_ucmp, const char* data,
uint32_t restarts, uint32_t num_restarts, uint32_t restarts, uint32_t num_restarts,
SequenceNumber global_seqno, bool block_contents_pinned, SequenceNumber global_seqno, bool block_contents_pinned,
bool user_defined_timestamp_persisted,
uint8_t protection_bytes_per_key, const char* kv_checksum, uint8_t protection_bytes_per_key, const char* kv_checksum,
uint32_t block_restart_interval) { uint32_t block_restart_interval) {
assert(data_ == nullptr); // Ensure it is called only once assert(data_ == nullptr); // Ensure it is called only once
@ -517,6 +538,10 @@ class BlockIter : public InternalIteratorBase<TValue> {
current_ = restarts_; current_ = restarts_;
restart_index_ = num_restarts_; restart_index_ = num_restarts_;
global_seqno_ = global_seqno; global_seqno_ = global_seqno;
if (raw_ucmp != nullptr) {
ts_sz_ = raw_ucmp->timestamp_size();
}
pad_min_timestamp_ = ts_sz_ > 0 && !user_defined_timestamp_persisted;
block_contents_pinned_ = block_contents_pinned; block_contents_pinned_ = block_contents_pinned;
cache_handle_ = nullptr; cache_handle_ = nullptr;
cur_entry_idx_ = -1; cur_entry_idx_ = -1;
@ -548,6 +573,20 @@ class BlockIter : public InternalIteratorBase<TValue> {
CorruptionError(error_msg); CorruptionError(error_msg);
} }
void UpdateRawKeyAndMaybePadMinTimestamp(const Slice& key) {
if (pad_min_timestamp_) {
std::string buf;
if (raw_key_.IsUserKey()) {
AppendKeyWithMinTimestamp(&buf, key, ts_sz_);
} else {
PadInternalKeyWithMinTimestamp(&buf, key, ts_sz_);
}
raw_key_.SetKey(buf, true /* copy */);
} else {
raw_key_.SetKey(key, false /* copy */);
}
}
// Must be called every time a key is found that needs to be returned to user, // Must be called every time a key is found that needs to be returned to user,
// and may be called when no key is found (as a no-op). Updates `key_`, // and may be called when no key is found (as a no-op). Updates `key_`,
// `key_buf_`, and `key_pinned_` with info about the found key. // `key_buf_`, and `key_pinned_` with info about the found key.
@ -658,11 +697,13 @@ class DataBlockIter final : public BlockIter<Slice> {
SequenceNumber global_seqno, SequenceNumber global_seqno,
BlockReadAmpBitmap* read_amp_bitmap, BlockReadAmpBitmap* read_amp_bitmap,
bool block_contents_pinned, bool block_contents_pinned,
bool user_defined_timestamps_persisted,
DataBlockHashIndex* data_block_hash_index, DataBlockHashIndex* data_block_hash_index,
uint8_t protection_bytes_per_key, const char* kv_checksum, uint8_t protection_bytes_per_key, const char* kv_checksum,
uint32_t block_restart_interval) { uint32_t block_restart_interval) {
InitializeBase(raw_ucmp, data, restarts, num_restarts, global_seqno, InitializeBase(raw_ucmp, data, restarts, num_restarts, global_seqno,
block_contents_pinned, protection_bytes_per_key, kv_checksum, block_contents_pinned, user_defined_timestamps_persisted,
protection_bytes_per_key, kv_checksum,
block_restart_interval); block_restart_interval);
raw_key_.SetIsUserKey(false); raw_key_.SetIsUserKey(false);
read_amp_bitmap_ = read_amp_bitmap; read_amp_bitmap_ = read_amp_bitmap;
@ -763,6 +804,7 @@ class MetaBlockIter final : public BlockIter<Slice> {
// the raw key being a user key. // the raw key being a user key.
InitializeBase(BytewiseComparator(), data, restarts, num_restarts, InitializeBase(BytewiseComparator(), data, restarts, num_restarts,
kDisableGlobalSequenceNumber, block_contents_pinned, kDisableGlobalSequenceNumber, block_contents_pinned,
/* user_defined_timestamps_persisted */ true,
protection_bytes_per_key, kv_checksum, protection_bytes_per_key, kv_checksum,
block_restart_interval); block_restart_interval);
raw_key_.SetIsUserKey(true); raw_key_.SetIsUserKey(true);
@ -800,12 +842,13 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
SequenceNumber global_seqno, BlockPrefixIndex* prefix_index, SequenceNumber global_seqno, BlockPrefixIndex* prefix_index,
bool have_first_key, bool key_includes_seq, bool have_first_key, bool key_includes_seq,
bool value_is_full, bool block_contents_pinned, bool value_is_full, bool block_contents_pinned,
bool user_defined_timestamps_persisted,
uint8_t protection_bytes_per_key, const char* kv_checksum, uint8_t protection_bytes_per_key, const char* kv_checksum,
uint32_t block_restart_interval) { uint32_t block_restart_interval) {
InitializeBase(raw_ucmp, data, restarts, num_restarts, InitializeBase(raw_ucmp, data, restarts, num_restarts,
kDisableGlobalSequenceNumber, block_contents_pinned, kDisableGlobalSequenceNumber, block_contents_pinned,
protection_bytes_per_key, kv_checksum, user_defined_timestamps_persisted, protection_bytes_per_key,
block_restart_interval); kv_checksum, block_restart_interval);
raw_key_.SetIsUserKey(!key_includes_seq); raw_key_.SetIsUserKey(!key_includes_seq);
prefix_index_ = prefix_index; prefix_index_ = prefix_index;
value_delta_encoded_ = !value_is_full; value_delta_encoded_ = !value_is_full;
@ -824,7 +867,8 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
IndexValue value() const override { IndexValue value() const override {
assert(Valid()); assert(Valid());
if (value_delta_encoded_ || global_seqno_state_ != nullptr) { if (value_delta_encoded_ || global_seqno_state_ != nullptr ||
pad_min_timestamp_) {
return decoded_value_; return decoded_value_;
} else { } else {
IndexValue entry; IndexValue entry;
@ -899,6 +943,10 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
std::unique_ptr<GlobalSeqnoState> global_seqno_state_; std::unique_ptr<GlobalSeqnoState> global_seqno_state_;
// Buffers the `first_internal_key` referred by `decoded_value_` when
// `pad_min_timestamp_` is true.
std::string first_internal_key_with_ts_;
// Set *prefix_may_exist to false if no key possibly share the same prefix // Set *prefix_may_exist to false if no key possibly share the same prefix
// as `target`. If not set, the result position should be the same as total // as `target`. If not set, the result position should be the same as total
// order Seek. // order Seek.

@ -48,10 +48,14 @@ BlockBuilder::BlockBuilder(
int block_restart_interval, bool use_delta_encoding, int block_restart_interval, bool use_delta_encoding,
bool use_value_delta_encoding, bool use_value_delta_encoding,
BlockBasedTableOptions::DataBlockIndexType index_type, BlockBasedTableOptions::DataBlockIndexType index_type,
double data_block_hash_table_util_ratio) double data_block_hash_table_util_ratio, size_t ts_sz,
bool persist_user_defined_timestamps, bool is_user_key)
: block_restart_interval_(block_restart_interval), : block_restart_interval_(block_restart_interval),
use_delta_encoding_(use_delta_encoding), use_delta_encoding_(use_delta_encoding),
use_value_delta_encoding_(use_value_delta_encoding), use_value_delta_encoding_(use_value_delta_encoding),
ts_sz_(ts_sz),
persist_user_defined_timestamps_(persist_user_defined_timestamps),
is_user_key_(is_user_key),
restarts_(1, 0), // First restart point is at offset 0 restarts_(1, 0), // First restart point is at offset 0
counter_(0), counter_(0),
finished_(false) { finished_(false) {
@ -96,6 +100,9 @@ size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key,
// Note: this is an imprecise estimate as it accounts for the whole key size // Note: this is an imprecise estimate as it accounts for the whole key size
// instead of non-shared key size. // instead of non-shared key size.
estimate += key.size(); estimate += key.size();
if (ts_sz_ > 0 && !persist_user_defined_timestamps_) {
estimate -= ts_sz_;
}
// In value delta encoding we estimate the value delta size as half the full // In value delta encoding we estimate the value delta size as half the full
// value size since only the size field of block handle is encoded. // value size since only the size field of block handle is encoded.
estimate += estimate +=
@ -187,6 +194,15 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
assert(!finished_); assert(!finished_);
assert(counter_ <= block_restart_interval_); assert(counter_ <= block_restart_interval_);
assert(!use_value_delta_encoding_ || delta_value); assert(!use_value_delta_encoding_ || delta_value);
std::string key_buf;
std::string last_key_buf;
const Slice key_to_persist = MaybeStripTimestampFromKey(&key_buf, key);
// For delta key encoding, the first key in each restart interval doesn't have
// a last key to share bytes with.
const Slice last_key_persisted =
last_key.size() == 0
? last_key
: MaybeStripTimestampFromKey(&last_key_buf, last_key);
size_t shared = 0; // number of bytes shared with prev key size_t shared = 0; // number of bytes shared with prev key
if (counter_ >= block_restart_interval_) { if (counter_ >= block_restart_interval_) {
// Restart compression // Restart compression
@ -195,10 +211,10 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
counter_ = 0; counter_ = 0;
} else if (use_delta_encoding_) { } else if (use_delta_encoding_) {
// See how much sharing to do with previous string // See how much sharing to do with previous string
shared = key.difference_offset(last_key); shared = key_to_persist.difference_offset(last_key_persisted);
} }
const size_t non_shared = key.size() - shared; const size_t non_shared = key_to_persist.size() - shared;
if (use_value_delta_encoding_) { if (use_value_delta_encoding_) {
// Add "<shared><non_shared>" to buffer_ // Add "<shared><non_shared>" to buffer_
@ -212,7 +228,7 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
} }
// Add string delta to buffer_ followed by value // Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared); buffer_.append(key_to_persist.data() + shared, non_shared);
// Use value delta encoding only when the key has shared bytes. This would // Use value delta encoding only when the key has shared bytes. This would
// simplify the decoding, where it can figure which decoding to use simply by // simplify the decoding, where it can figure which decoding to use simply by
// looking at the shared bytes size. // looking at the shared bytes size.
@ -222,6 +238,7 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
buffer_.append(value.data(), value.size()); buffer_.append(value.data(), value.size());
} }
// TODO(yuzhangyu): make user defined timestamp work with block hash index.
if (data_block_hash_index_builder_.Valid()) { if (data_block_hash_index_builder_.Valid()) {
data_block_hash_index_builder_.Add(ExtractUserKey(key), data_block_hash_index_builder_.Add(ExtractUserKey(key),
restarts_.size() - 1); restarts_.size() - 1);
@ -231,4 +248,17 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
estimate_ += buffer_.size() - buffer_size; estimate_ += buffer_.size() - buffer_size;
} }
const Slice BlockBuilder::MaybeStripTimestampFromKey(std::string* key_buf,
const Slice& key) {
Slice stripped_key = key;
if (ts_sz_ > 0 && !persist_user_defined_timestamps_) {
if (is_user_key_) {
stripped_key.remove_suffix(ts_sz_);
} else {
StripTimestampFromInternalKey(key_buf, key, ts_sz_);
stripped_key = *key_buf;
}
}
return stripped_key;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -28,7 +28,10 @@ class BlockBuilder {
bool use_value_delta_encoding = false, bool use_value_delta_encoding = false,
BlockBasedTableOptions::DataBlockIndexType index_type = BlockBasedTableOptions::DataBlockIndexType index_type =
BlockBasedTableOptions::kDataBlockBinarySearch, BlockBasedTableOptions::kDataBlockBinarySearch,
double data_block_hash_table_util_ratio = 0.75); double data_block_hash_table_util_ratio = 0.75,
size_t ts_sz = 0,
bool persist_user_defined_timestamps = true,
bool is_user_key = false);
// Reset the contents as if the BlockBuilder was just constructed. // Reset the contents as if the BlockBuilder was just constructed.
void Reset(); void Reset();
@ -83,11 +86,33 @@ class BlockBuilder {
const Slice* const delta_value, const Slice* const delta_value,
size_t buffer_size); size_t buffer_size);
inline const Slice MaybeStripTimestampFromKey(std::string* key_buf,
const Slice& key);
const int block_restart_interval_; const int block_restart_interval_;
// TODO(myabandeh): put it into a separate IndexBlockBuilder // TODO(myabandeh): put it into a separate IndexBlockBuilder
const bool use_delta_encoding_; const bool use_delta_encoding_;
// Refer to BlockIter::DecodeCurrentValue for format of delta encoded values // Refer to BlockIter::DecodeCurrentValue for format of delta encoded values
const bool use_value_delta_encoding_; const bool use_value_delta_encoding_;
// Size in bytes for the user-defined timestamp in a user key.
const size_t ts_sz_;
// Whether the user-defined timestamp part in user keys should be persisted.
// If false, it will be stripped from the key before it's encoded.
const bool persist_user_defined_timestamps_;
// Whether the keys provided to build this block are user keys. If not,
// the keys are internal keys. This will affect how timestamp stripping is
// done for the key if `persisted_user_defined_timestamps_` is false and
// `ts_sz_` is non-zero.
// The timestamp stripping only applies to the keys added to the block. If the
// value contains user defined timestamp that needed to be stripped too, such
// as the `first_internal_key` in an `IndexValue` for an index block, the
// value part for a range deletion entry, their timestamp should be stripped
// before calling `BlockBuilder::Add`.
// Timestamp stripping only applies to data block and index blocks including
// index block for data blocks, index block for partitioned filter blocks,
// index block for partitioned index blocks. In summary, this only applies to
// block whose key are real user keys or internal keys created from user keys.
const bool is_user_key_;
std::string buffer_; // Destination buffer std::string buffer_; // Destination buffer
std::vector<uint32_t> restarts_; // Restart points std::vector<uint32_t> restarts_; // Restart points

@ -34,7 +34,8 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
std::string GenerateInternalKey(int primary_key, int secondary_key, std::string GenerateInternalKey(int primary_key, int secondary_key,
int padding_size, Random *rnd) { int padding_size, Random *rnd,
size_t ts_sz = 0) {
char buf[50]; char buf[50];
char *p = &buf[0]; char *p = &buf[0];
snprintf(buf, sizeof(buf), "%6d%4d", primary_key, secondary_key); snprintf(buf, sizeof(buf), "%6d%4d", primary_key, secondary_key);
@ -43,6 +44,11 @@ std::string GenerateInternalKey(int primary_key, int secondary_key,
k += rnd->RandomString(padding_size); k += rnd->RandomString(padding_size);
} }
AppendInternalKeyFooter(&k, 0 /* seqno */, kTypeValue); AppendInternalKeyFooter(&k, 0 /* seqno */, kTypeValue);
std::string key_with_ts;
if (ts_sz > 0) {
PadInternalKeyWithMinTimestamp(&key_with_ts, k, ts_sz);
return key_with_ts;
}
return k; return k;
} }
@ -54,7 +60,7 @@ void GenerateRandomKVs(std::vector<std::string> *keys,
std::vector<std::string> *values, const int from, std::vector<std::string> *values, const int from,
const int len, const int step = 1, const int len, const int step = 1,
const int padding_size = 0, const int padding_size = 0,
const int keys_share_prefix = 1) { const int keys_share_prefix = 1, size_t ts_sz = 0) {
Random rnd(302); Random rnd(302);
// generate different prefix // generate different prefix
@ -62,7 +68,7 @@ void GenerateRandomKVs(std::vector<std::string> *keys,
// generating keys that shares the prefix // generating keys that shares the prefix
for (int j = 0; j < keys_share_prefix; ++j) { for (int j = 0; j < keys_share_prefix; ++j) {
// `DataBlockIter` assumes it reads only internal keys. // `DataBlockIter` assumes it reads only internal keys.
keys->emplace_back(GenerateInternalKey(i, j, padding_size, &rnd)); keys->emplace_back(GenerateInternalKey(i, j, padding_size, &rnd, ts_sz));
// 100 bytes values // 100 bytes values
values->emplace_back(rnd.RandomString(100)); values->emplace_back(rnd.RandomString(100));
@ -70,19 +76,39 @@ void GenerateRandomKVs(std::vector<std::string> *keys,
} }
} }
class BlockTest : public testing::Test {}; class BlockTest : public testing::Test,
public testing::WithParamInterface<
std::tuple<bool, test::UserDefinedTimestampTestMode>> {
public:
bool keyUseDeltaEncoding() const { return std::get<0>(GetParam()); }
bool isUDTEnabled() const {
return test::IsUDTEnabled(std::get<1>(GetParam()));
}
bool shouldPersistUDT() const {
return test::ShouldPersistUDT(std::get<1>(GetParam()));
}
};
// block test // block test
TEST_F(BlockTest, SimpleTest) { TEST_P(BlockTest, SimpleTest) {
Random rnd(301); Random rnd(301);
Options options = Options(); Options options = Options();
if (isUDTEnabled()) {
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
}
size_t ts_sz = options.comparator->timestamp_size();
std::vector<std::string> keys; std::vector<std::string> keys;
std::vector<std::string> values; std::vector<std::string> values;
BlockBuilder builder(16); BlockBuilder builder(16, keyUseDeltaEncoding(),
false /* use_value_delta_encoding */,
BlockBasedTableOptions::kDataBlockBinarySearch,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
shouldPersistUDT(), false /* is_user_key */);
int num_records = 100000; int num_records = 100000;
GenerateRandomKVs(&keys, &values, 0, num_records); GenerateRandomKVs(&keys, &values, 0, num_records, 1 /* step */,
0 /* padding_size */, 1 /* keys_share_prefix */, ts_sz);
// add a bunch of records to a block // add a bunch of records to a block
for (int i = 0; i < num_records; i++) { for (int i = 0; i < num_records; i++) {
builder.Add(keys[i], values[i]); builder.Add(keys[i], values[i]);
@ -98,8 +124,10 @@ TEST_F(BlockTest, SimpleTest) {
// read contents of block sequentially // read contents of block sequentially
int count = 0; int count = 0;
InternalIterator *iter = InternalIterator *iter = reader.NewDataIterator(
reader.NewDataIterator(options.comparator, kDisableGlobalSequenceNumber); options.comparator, kDisableGlobalSequenceNumber, nullptr /* iter */,
nullptr /* stats */, false /* block_contents_pinned */,
shouldPersistUDT());
for (iter->SeekToFirst(); iter->Valid(); count++, iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); count++, iter->Next()) {
// read kv from block // read kv from block
Slice k = iter->key(); Slice k = iter->key();
@ -112,8 +140,10 @@ TEST_F(BlockTest, SimpleTest) {
delete iter; delete iter;
// read block contents randomly // read block contents randomly
iter = iter = reader.NewDataIterator(
reader.NewDataIterator(options.comparator, kDisableGlobalSequenceNumber); options.comparator, kDisableGlobalSequenceNumber, nullptr /* iter */,
nullptr /* stats */, false /* block_contents_pinned */,
shouldPersistUDT());
for (int i = 0; i < num_records; i++) { for (int i = 0; i < num_records; i++) {
// find a random key in the lookaside array // find a random key in the lookaside array
int index = rnd.Uniform(num_records); int index = rnd.Uniform(num_records);
@ -132,8 +162,15 @@ TEST_F(BlockTest, SimpleTest) {
BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder, BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
const std::vector<std::string> &keys, const std::vector<std::string> &keys,
const std::vector<std::string> &values, const std::vector<std::string> &values,
bool key_use_delta_encoding, size_t ts_sz,
bool should_persist_udt,
const int /*prefix_group_size*/ = 1) { const int /*prefix_group_size*/ = 1) {
builder->reset(new BlockBuilder(1 /* restart interval */)); builder->reset(
new BlockBuilder(1 /* restart interval */, key_use_delta_encoding,
false /* use_value_delta_encoding */,
BlockBasedTableOptions::kDataBlockBinarySearch,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
should_persist_udt, false /* is_user_key */));
// Add only half of the keys // Add only half of the keys
for (size_t i = 0; i < keys.size(); ++i) { for (size_t i = 0; i < keys.size(); ++i) {
@ -149,7 +186,8 @@ BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
void CheckBlockContents(BlockContents contents, const int max_key, void CheckBlockContents(BlockContents contents, const int max_key,
const std::vector<std::string> &keys, const std::vector<std::string> &keys,
const std::vector<std::string> &values) { const std::vector<std::string> &values,
bool is_udt_enabled, bool should_persist_udt) {
const size_t prefix_size = 6; const size_t prefix_size = 6;
// create block reader // create block reader
BlockContents contents_ref(contents.data); BlockContents contents_ref(contents.data);
@ -160,7 +198,10 @@ void CheckBlockContents(BlockContents contents, const int max_key,
NewFixedPrefixTransform(prefix_size)); NewFixedPrefixTransform(prefix_size));
std::unique_ptr<InternalIterator> regular_iter(reader2.NewDataIterator( std::unique_ptr<InternalIterator> regular_iter(reader2.NewDataIterator(
BytewiseComparator(), kDisableGlobalSequenceNumber)); is_udt_enabled ? test::BytewiseComparatorWithU64TsWrapper()
: BytewiseComparator(),
kDisableGlobalSequenceNumber, nullptr /* iter */, nullptr /* stats */,
false /* block_contents_pinned */, should_persist_udt));
// Seek existent keys // Seek existent keys
for (size_t i = 0; i < keys.size(); i++) { for (size_t i = 0; i < keys.size(); i++) {
@ -178,46 +219,62 @@ void CheckBlockContents(BlockContents contents, const int max_key,
// return the one that is closest. // return the one that is closest.
for (int i = 1; i < max_key - 1; i += 2) { for (int i = 1; i < max_key - 1; i += 2) {
// `DataBlockIter` assumes its APIs receive only internal keys. // `DataBlockIter` assumes its APIs receive only internal keys.
auto key = GenerateInternalKey(i, 0, 0, nullptr); auto key = GenerateInternalKey(i, 0, 0, nullptr,
is_udt_enabled ? 8 : 0 /* ts_sz */);
regular_iter->Seek(key); regular_iter->Seek(key);
ASSERT_TRUE(regular_iter->Valid()); ASSERT_TRUE(regular_iter->Valid());
} }
} }
// In this test case, no two key share same prefix. // In this test case, no two key share same prefix.
TEST_F(BlockTest, SimpleIndexHash) { TEST_P(BlockTest, SimpleIndexHash) {
const int kMaxKey = 100000; const int kMaxKey = 100000;
size_t ts_sz = isUDTEnabled() ? 8 : 0;
std::vector<std::string> keys; std::vector<std::string> keys;
std::vector<std::string> values; std::vector<std::string> values;
GenerateRandomKVs(&keys, &values, 0 /* first key id */, GenerateRandomKVs(&keys, &values, 0 /* first key id */,
kMaxKey /* last key id */, 2 /* step */, kMaxKey /* last key id */, 2 /* step */,
8 /* padding size (8 bytes randomly generated suffix) */); 8 /* padding size (8 bytes randomly generated suffix) */,
1 /* keys_share_prefix */, ts_sz);
std::unique_ptr<BlockBuilder> builder; std::unique_ptr<BlockBuilder> builder;
auto contents = GetBlockContents(&builder, keys, values); auto contents = GetBlockContents(
&builder, keys, values, keyUseDeltaEncoding(), ts_sz, shouldPersistUDT());
CheckBlockContents(std::move(contents), kMaxKey, keys, values); CheckBlockContents(std::move(contents), kMaxKey, keys, values, isUDTEnabled(),
shouldPersistUDT());
} }
TEST_F(BlockTest, IndexHashWithSharedPrefix) { TEST_P(BlockTest, IndexHashWithSharedPrefix) {
const int kMaxKey = 100000; const int kMaxKey = 100000;
// for each prefix, there will be 5 keys starts with it. // for each prefix, there will be 5 keys starts with it.
const int kPrefixGroup = 5; const int kPrefixGroup = 5;
size_t ts_sz = isUDTEnabled() ? 8 : 0;
std::vector<std::string> keys; std::vector<std::string> keys;
std::vector<std::string> values; std::vector<std::string> values;
// Generate keys with same prefix. // Generate keys with same prefix.
GenerateRandomKVs(&keys, &values, 0, // first key id GenerateRandomKVs(&keys, &values, 0, // first key id
kMaxKey, // last key id kMaxKey, // last key id
2, // step 2 /* step */,
10, // padding size, 10 /* padding size (8 bytes randomly generated suffix) */,
kPrefixGroup); kPrefixGroup /* keys_share_prefix */, ts_sz);
std::unique_ptr<BlockBuilder> builder; std::unique_ptr<BlockBuilder> builder;
auto contents = GetBlockContents(&builder, keys, values, kPrefixGroup); auto contents =
GetBlockContents(&builder, keys, values, keyUseDeltaEncoding(),
isUDTEnabled(), shouldPersistUDT(), kPrefixGroup);
CheckBlockContents(std::move(contents), kMaxKey, keys, values); CheckBlockContents(std::move(contents), kMaxKey, keys, values, isUDTEnabled(),
shouldPersistUDT());
} }
// Param 0: key use delta encoding
// Param 1: user-defined timestamp test mode
INSTANTIATE_TEST_CASE_P(
P, BlockTest,
::testing::Combine(::testing::Bool(),
::testing::ValuesIn(test::GetUDTTestModes())));
// A slow and accurate version of BlockReadAmpBitmap that simply store // A slow and accurate version of BlockReadAmpBitmap that simply store
// all the marked ranges in a set. // all the marked ranges in a set.
class BlockReadAmpBitmapSlowAndAccurate { class BlockReadAmpBitmapSlowAndAccurate {
@ -362,7 +419,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
BlockBuilder builder(16); BlockBuilder builder(16);
int num_records = 10000; int num_records = 10000;
GenerateRandomKVs(&keys, &values, 0, num_records, 1); GenerateRandomKVs(&keys, &values, 0, num_records, 1 /* step */);
// add a bunch of records to a block // add a bunch of records to a block
for (int i = 0; i < num_records; i++) { for (int i = 0; i < num_records; i++) {
builder.Add(keys[i], values[i]); builder.Add(keys[i], values[i]);
@ -495,19 +552,28 @@ TEST_F(BlockTest, ReadAmpBitmapPow2) {
class IndexBlockTest class IndexBlockTest
: public testing::Test, : public testing::Test,
public testing::WithParamInterface<std::tuple<bool, bool>> { public testing::WithParamInterface<
std::tuple<bool, bool, bool, test::UserDefinedTimestampTestMode>> {
public: public:
IndexBlockTest() = default; IndexBlockTest() = default;
bool useValueDeltaEncoding() const { return std::get<0>(GetParam()); } bool keyIncludesSeq() const { return std::get<0>(GetParam()); }
bool includeFirstKey() const { return std::get<1>(GetParam()); } bool useValueDeltaEncoding() const { return std::get<1>(GetParam()); }
bool includeFirstKey() const { return std::get<2>(GetParam()); }
bool isUDTEnabled() const {
return test::IsUDTEnabled(std::get<3>(GetParam()));
}
bool shouldPersistUDT() const {
return test::ShouldPersistUDT(std::get<3>(GetParam()));
}
}; };
// Similar to GenerateRandomKVs but for index block contents. // Similar to GenerateRandomKVs but for index block contents.
void GenerateRandomIndexEntries(std::vector<std::string> *separators, void GenerateRandomIndexEntries(std::vector<std::string> *separators,
std::vector<BlockHandle> *block_handles, std::vector<BlockHandle> *block_handles,
std::vector<std::string> *first_keys, std::vector<std::string> *first_keys,
const int len, bool zero_seqno = false) { const int len, size_t ts_sz = 0,
bool zero_seqno = false) {
Random rnd(42); Random rnd(42);
// For each of `len` blocks, we need to generate a first and last key. // For each of `len` blocks, we need to generate a first and last key.
@ -519,7 +585,13 @@ void GenerateRandomIndexEntries(std::vector<std::string> *separators,
if (zero_seqno) { if (zero_seqno) {
AppendInternalKeyFooter(&new_key, 0 /* seqno */, kTypeValue); AppendInternalKeyFooter(&new_key, 0 /* seqno */, kTypeValue);
} }
keys.insert(std::move(new_key)); if (ts_sz > 0) {
std::string key;
PadInternalKeyWithMinTimestamp(&key, new_key, ts_sz);
keys.insert(std::move(key));
} else {
keys.insert(std::move(new_key));
}
} }
uint64_t offset = 0; uint64_t offset = 0;
@ -536,19 +608,34 @@ void GenerateRandomIndexEntries(std::vector<std::string> *separators,
TEST_P(IndexBlockTest, IndexValueEncodingTest) { TEST_P(IndexBlockTest, IndexValueEncodingTest) {
Random rnd(301); Random rnd(301);
Options options = Options(); Options options = Options();
if (isUDTEnabled()) {
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
}
size_t ts_sz = options.comparator->timestamp_size();
std::vector<std::string> separators; std::vector<std::string> separators;
std::vector<BlockHandle> block_handles; std::vector<BlockHandle> block_handles;
std::vector<std::string> first_keys; std::vector<std::string> first_keys;
const bool kUseDeltaEncoding = true; const bool kUseDeltaEncoding = true;
BlockBuilder builder(16, kUseDeltaEncoding, useValueDeltaEncoding()); BlockBuilder builder(16, kUseDeltaEncoding, useValueDeltaEncoding(),
BlockBasedTableOptions::kDataBlockBinarySearch,
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
shouldPersistUDT(), !keyIncludesSeq());
int num_records = 100; int num_records = 100;
GenerateRandomIndexEntries(&separators, &block_handles, &first_keys, GenerateRandomIndexEntries(&separators, &block_handles, &first_keys,
num_records); num_records, ts_sz, false /* zero_seqno */);
BlockHandle last_encoded_handle; BlockHandle last_encoded_handle;
for (int i = 0; i < num_records; i++) { for (int i = 0; i < num_records; i++) {
IndexValue entry(block_handles[i], first_keys[i]); std::string first_key_to_persist_buf;
Slice first_internal_key = first_keys[i];
if (ts_sz > 0 && !shouldPersistUDT()) {
StripTimestampFromInternalKey(&first_key_to_persist_buf, first_keys[i],
ts_sz);
first_internal_key = first_key_to_persist_buf;
}
IndexValue entry(block_handles[i], first_internal_key);
std::string encoded_entry; std::string encoded_entry;
std::string delta_encoded_entry; std::string delta_encoded_entry;
entry.EncodeTo(&encoded_entry, includeFirstKey(), nullptr); entry.EncodeTo(&encoded_entry, includeFirstKey(), nullptr);
@ -558,7 +645,13 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
} }
last_encoded_handle = entry.handle; last_encoded_handle = entry.handle;
const Slice delta_encoded_entry_slice(delta_encoded_entry); const Slice delta_encoded_entry_slice(delta_encoded_entry);
builder.Add(separators[i], encoded_entry, &delta_encoded_entry_slice);
if (keyIncludesSeq()) {
builder.Add(separators[i], encoded_entry, &delta_encoded_entry_slice);
} else {
const Slice user_key = ExtractUserKey(separators[i]);
builder.Add(user_key, encoded_entry, &delta_encoded_entry_slice);
}
} }
// read serialized contents of the block // read serialized contents of the block
@ -570,14 +663,14 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
Block reader(std::move(contents)); Block reader(std::move(contents));
const bool kTotalOrderSeek = true; const bool kTotalOrderSeek = true;
const bool kIncludesSeq = true;
const bool kValueIsFull = !useValueDeltaEncoding();
IndexBlockIter *kNullIter = nullptr; IndexBlockIter *kNullIter = nullptr;
Statistics *kNullStats = nullptr; Statistics *kNullStats = nullptr;
// read contents of block sequentially // read contents of block sequentially
InternalIteratorBase<IndexValue> *iter = reader.NewIndexIterator( InternalIteratorBase<IndexValue> *iter = reader.NewIndexIterator(
options.comparator, kDisableGlobalSequenceNumber, kNullIter, kNullStats, options.comparator, kDisableGlobalSequenceNumber, kNullIter, kNullStats,
kTotalOrderSeek, includeFirstKey(), kIncludesSeq, kValueIsFull); kTotalOrderSeek, includeFirstKey(), keyIncludesSeq(),
!useValueDeltaEncoding(), false /* block_contents_pinned */,
shouldPersistUDT());
iter->SeekToFirst(); iter->SeekToFirst();
for (int index = 0; index < num_records; ++index) { for (int index = 0; index < num_records; ++index) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
@ -585,7 +678,12 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
Slice k = iter->key(); Slice k = iter->key();
IndexValue v = iter->value(); IndexValue v = iter->value();
EXPECT_EQ(separators[index], k.ToString()); if (keyIncludesSeq()) {
EXPECT_EQ(separators[index], k.ToString());
} else {
const Slice user_key = ExtractUserKey(separators[index]);
EXPECT_EQ(user_key, k);
}
EXPECT_EQ(block_handles[index].offset(), v.handle.offset()); EXPECT_EQ(block_handles[index].offset(), v.handle.offset());
EXPECT_EQ(block_handles[index].size(), v.handle.size()); EXPECT_EQ(block_handles[index].size(), v.handle.size());
EXPECT_EQ(includeFirstKey() ? first_keys[index] : "", EXPECT_EQ(includeFirstKey() ? first_keys[index] : "",
@ -598,7 +696,9 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
// read block contents randomly // read block contents randomly
iter = reader.NewIndexIterator( iter = reader.NewIndexIterator(
options.comparator, kDisableGlobalSequenceNumber, kNullIter, kNullStats, options.comparator, kDisableGlobalSequenceNumber, kNullIter, kNullStats,
kTotalOrderSeek, includeFirstKey(), kIncludesSeq, kValueIsFull); kTotalOrderSeek, includeFirstKey(), keyIncludesSeq(),
!useValueDeltaEncoding(), false /* block_contents_pinned */,
shouldPersistUDT());
for (int i = 0; i < num_records * 2; i++) { for (int i = 0; i < num_records * 2; i++) {
// find a random key in the lookaside array // find a random key in the lookaside array
int index = rnd.Uniform(num_records); int index = rnd.Uniform(num_records);
@ -608,7 +708,12 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
iter->Seek(k); iter->Seek(k);
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
IndexValue v = iter->value(); IndexValue v = iter->value();
EXPECT_EQ(separators[index], iter->key().ToString()); if (keyIncludesSeq()) {
EXPECT_EQ(separators[index], iter->key().ToString());
} else {
const Slice user_key = ExtractUserKey(separators[index]);
EXPECT_EQ(user_key, iter->key());
}
EXPECT_EQ(block_handles[index].offset(), v.handle.offset()); EXPECT_EQ(block_handles[index].offset(), v.handle.offset());
EXPECT_EQ(block_handles[index].size(), v.handle.size()); EXPECT_EQ(block_handles[index].size(), v.handle.size());
EXPECT_EQ(includeFirstKey() ? first_keys[index] : "", EXPECT_EQ(includeFirstKey() ? first_keys[index] : "",
@ -617,11 +722,15 @@ TEST_P(IndexBlockTest, IndexValueEncodingTest) {
delete iter; delete iter;
} }
INSTANTIATE_TEST_CASE_P(P, IndexBlockTest, // Param 0: key includes sequence number (whether to use user key or internal
::testing::Values(std::make_tuple(false, false), // key as key entry in index block).
std::make_tuple(false, true), // Param 1: use value delta encoding
std::make_tuple(true, false), // Param 2: include first key
std::make_tuple(true, true))); // Param 3: user-defined timestamp test mode
INSTANTIATE_TEST_CASE_P(
P, IndexBlockTest,
::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Bool(),
::testing::ValuesIn(test::GetUDTTestModes())));
class BlockPerKVChecksumTest : public DBTestBase { class BlockPerKVChecksumTest : public DBTestBase {
public: public:
@ -1110,7 +1219,7 @@ TEST_P(IndexBlockKVChecksumTest, ChecksumConstructionAndVerification) {
std::vector<BlockHandle> block_handles; std::vector<BlockHandle> block_handles;
std::vector<std::string> first_keys; std::vector<std::string> first_keys;
GenerateRandomIndexEntries(&separators, &block_handles, &first_keys, GenerateRandomIndexEntries(&separators, &block_handles, &first_keys,
kNumRecords, kNumRecords, 0 /* ts_sz */,
seqno != kDisableGlobalSequenceNumber); seqno != kDisableGlobalSequenceNumber);
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
std::unique_ptr<Block_kIndex> index_block = GenerateIndexBlock( std::unique_ptr<Block_kIndex> index_block = GenerateIndexBlock(
@ -1123,7 +1232,9 @@ TEST_P(IndexBlockKVChecksumTest, ChecksumConstructionAndVerification) {
true /* total_order_seek */, IncludeFirstKey() /* have_first_key */, true /* total_order_seek */, IncludeFirstKey() /* have_first_key */,
true /* key_includes_seq */, true /* key_includes_seq */,
!UseValueDeltaEncoding() /* value_is_full */, !UseValueDeltaEncoding() /* value_is_full */,
true /* block_contents_pinned */, nullptr /* prefix_index */)}; true /* block_contents_pinned*/,
true /* user_defined_timestamps_persisted */,
nullptr /* prefix_index */)};
biter->SeekToFirst(); biter->SeekToFirst();
const char *checksum_ptr = index_block->TEST_GetKVChecksum(); const char *checksum_ptr = index_block->TEST_GetKVChecksum();
// Check checksum of correct length is generated // Check checksum of correct length is generated
@ -1364,7 +1475,9 @@ class IndexBlockKVChecksumCorruptionTest : public IndexBlockKVChecksumTest {
true /* total_order_seek */, IncludeFirstKey() /* have_first_key */, true /* total_order_seek */, IncludeFirstKey() /* have_first_key */,
true /* key_includes_seq */, true /* key_includes_seq */,
!UseValueDeltaEncoding() /* value_is_full */, !UseValueDeltaEncoding() /* value_is_full */,
true /* block_contents_pinned */, nullptr /* prefix_index */)}; true /* block_contents_pinned */,
true /* user_defined_timestamps_persisted */,
nullptr /* prefix_index */)};
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
return biter; return biter;
} }
@ -1407,7 +1520,7 @@ TEST_P(IndexBlockKVChecksumCorruptionTest, CorruptEntry) {
std::vector<BlockHandle> block_handles; std::vector<BlockHandle> block_handles;
std::vector<std::string> first_keys; std::vector<std::string> first_keys;
GenerateRandomIndexEntries(&separators, &block_handles, &first_keys, GenerateRandomIndexEntries(&separators, &block_handles, &first_keys,
kNumRecords, kNumRecords, 0 /* ts_sz */,
seqno != kDisableGlobalSequenceNumber); seqno != kDisableGlobalSequenceNumber);
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"BlockIter::UpdateKey::value", [](void *arg) { "BlockIter::UpdateKey::value", [](void *arg) {
@ -1540,4 +1653,4 @@ int main(int argc, char **argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }

@ -137,7 +137,7 @@ InternalIteratorBase<IndexValue>* HashIndexReader::NewIterator(
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, rep->get_global_seqno(BlockType::kIndex), iter, kNullStats,
total_order_seek, index_has_first_key(), index_key_includes_seq(), total_order_seek, index_has_first_key(), index_key_includes_seq(),
index_value_is_full(), false /* block_contents_pinned */, index_value_is_full(), false /* block_contents_pinned */,
prefix_index_.get()); true /* user_defined_timestamps_persisted */, prefix_index_.get());
assert(it != nullptr); assert(it != nullptr);
index_block.TransferTo(it); index_block.TransferTo(it);

@ -72,6 +72,22 @@ std::string RandomKey(Random* rnd, int len, RandomKeyType type) {
return result; return result;
} }
const std::vector<UserDefinedTimestampTestMode>& GetUDTTestModes() {
static std::vector<UserDefinedTimestampTestMode> udt_test_modes = {
UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
UserDefinedTimestampTestMode::kNormal,
UserDefinedTimestampTestMode::kNone};
return udt_test_modes;
}
bool IsUDTEnabled(const UserDefinedTimestampTestMode& test_mode) {
return test_mode != UserDefinedTimestampTestMode::kNone;
}
bool ShouldPersistUDT(const UserDefinedTimestampTestMode& test_mode) {
return test_mode != UserDefinedTimestampTestMode::kStripUserDefinedTimestamp;
}
extern Slice CompressibleString(Random* rnd, double compressed_fraction, extern Slice CompressibleString(Random* rnd, double compressed_fraction,
int len, std::string* dst) { int len, std::string* dst) {
int raw = static_cast<int>(len * compressed_fraction); int raw = static_cast<int>(len * compressed_fraction);

@ -52,6 +52,22 @@ enum RandomKeyType : char { RANDOM, LARGEST, SMALLEST, MIDDLE };
extern std::string RandomKey(Random* rnd, int len, extern std::string RandomKey(Random* rnd, int len,
RandomKeyType type = RandomKeyType::RANDOM); RandomKeyType type = RandomKeyType::RANDOM);
enum class UserDefinedTimestampTestMode {
// Test does not enable user-defined timestamp feature.
kNone,
// Test enables user-defined timestamp feature. Write/read with min timestamps
kNormal,
// Test enables user-defined timestamp feature. Write/read with min timestamps
// Set `persist_user_defined_timestamps` to false.
kStripUserDefinedTimestamp,
};
extern const std::vector<UserDefinedTimestampTestMode>& GetUDTTestModes();
extern bool IsUDTEnabled(const UserDefinedTimestampTestMode& test_mode);
extern bool ShouldPersistUDT(const UserDefinedTimestampTestMode& test_mode);
// Store in *dst a string of length "len" that will compress to // Store in *dst a string of length "len" that will compress to
// "N*compressed_fraction" bytes and return a Slice that references // "N*compressed_fraction" bytes and return a Slice that references
// the generated data. // the generated data.

Loading…
Cancel
Save