avoid `IterKey::UpdateInternalKey()` in `BlockIter` (#6843)

Summary:
`IterKey::UpdateInternalKey()` is an error-prone API as it's
incompatible with `IterKey::TrimAppend()`, which is used for
decoding delta-encoded internal keys. This PR stops using it in
`BlockIter`. Instead, it assigns global seqno in a separate `IterKey`'s
buffer when needed. The logic for safely getting a Slice with global
seqno properly assigned is encapsulated in `GlobalSeqnoAppliedKey`.
`BinarySeek()` is also migrated to use this API (previously it ignored
global seqno entirely).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6843

Test Plan:
benchmark setup -- single file DBs, in-memory, no compression. "normal_db"
created by regular flush; "ingestion_db" created by ingesting a file. Both
DBs have same contents.

```
$ TEST_TMPDIR=/dev/shm/normal_db/ ./db_bench -benchmarks=fillrandom,compact -write_buffer_size=10485760000 -disable_auto_compactions=true -compression_type=none -num=1000000
$ ./ldb write_extern_sst ./tmp.sst --db=/dev/shm/ingestion_db/dbbench/ --compression_type=no --hex --create_if_missing < <(./sst_dump --command=scan --output_hex --file=/dev/shm/normal_db/dbbench/000007.sst | awk 'began {print "0x" substr($1, 2, length($1) - 2), "==>", "0x" $5} ; /^Sst file format: block-based/ {began=1}')
$ ./ldb ingest_extern_sst ./tmp.sst --db=/dev/shm/ingestion_db/dbbench/
```

benchmark run command:
```
TEST_TMPDIR=/dev/shm/$DB/ ./db_bench -benchmarks=seekrandom -seek_nexts=10 -use_existing_db=true -cache_index_and_filter_blocks=false -num=1000000 -cache_size=1048576000 -threads=1 -reads=40000000
```

results:

| DB | code | throughput |
|---|---|---|
| normal_db | master |  267.9 |
| normal_db   |    PR6843 | 254.2 (-5.1%) |
| ingestion_db |   master |  259.6 |
| ingestion_db |   PR6843 | 250.5 (-3.5%) |

Reviewed By: pdillinger

Differential Revision: D21562604

Pulled By: ajkr

fbshipit-source-id: 937596f836930515da8084d11755e1f247dcb264
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent 961c7590d6
commit c5abf78bca
  1. 1
      HISTORY.md
  2. 57
      db/external_sst_file_test.cc
  3. 96
      table/block_based/block.cc
  4. 84
      table/block_based/block.h

@ -9,6 +9,7 @@
* Fix corruption caused by enabling delete triggered compaction (NewCompactOnDeletionCollectorFactory) in universal compaction mode, along with parallel compactions. The bug can result in two parallel compactions picking the same input files, resulting in the DB resurrecting older and deleted versions of some keys.
* Fix a use-after-free bug in best-efforts recovery. column_family_memtables_ needs to point to valid ColumnFamilySet.
* Let best-efforts recovery ignore corrupted files during table loading.
* Fix corrupt key read from ingested file when iterator direction switches from reverse to forward at a key that is a prefix of another key in the same file. It is only possible in files with a non-zero global seqno.
### Public API Change
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.

@ -2801,7 +2801,7 @@ TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
GenerateAndAddExternalFile(options, data);
}
TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresents) {
TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresent) {
Options options = CurrentOptions();
DestroyAndReopen(options);
constexpr size_t kValueSize = 8;
@ -2842,6 +2842,61 @@ TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresents) {
ASSERT_EQ(value, Get(key2));
}
TEST_P(ExternalSSTFileTest,
DeltaEncodingWhileGlobalSeqnoPresentIteratorSwitch) {
// Regression test for bug where global seqno corrupted the shared bytes
// buffer when switching from reverse iteration to forward iteration.
constexpr size_t kValueSize = 8;
Options options = CurrentOptions();
Random rnd(301);
std::string value(RandomString(&rnd, kValueSize));
std::string key0 = "aa";
std::string key1 = "ab";
// Make the prefix of key2 is same with key1 add zero seqno. The tail of every
// key is composed as (seqno << 8 | value_type), and here `1` represents
// ValueType::kTypeValue
std::string key2 = "ab";
PutFixed64(&key2, PackSequenceAndType(0, kTypeValue));
key2 += "cdefghijkl";
std::string key3 = key2 + "_";
// Write some key to make global seqno larger than zero
ASSERT_OK(Put(key0, value));
std::string fname = sst_files_dir_ + "test_file";
rocksdb::SstFileWriter writer(EnvOptions(), options);
ASSERT_OK(writer.Open(fname));
// key0 is a dummy to ensure the turnaround point (key1) comes from Prev
// cache rather than block (restart keys are pinned in block).
ASSERT_OK(writer.Put(key0, value));
ASSERT_OK(writer.Put(key1, value));
ASSERT_OK(writer.Put(key2, value));
ASSERT_OK(writer.Put(key3, value));
ExternalSstFileInfo info;
ASSERT_OK(writer.Finish(&info));
ASSERT_OK(dbfull()->IngestExternalFile({info.file_path},
IngestExternalFileOptions()));
ReadOptions read_opts;
// Prevents Seek() when switching directions, which circumvents the bug.
read_opts.total_order_seek = true;
Iterator* iter = db_->NewIterator(read_opts);
// Scan backwards to key2. File iterator will then be positioned at key1.
iter->Seek(key3);
ASSERT_EQ(key3, iter->key());
iter->Prev();
ASSERT_EQ(key2, iter->key());
// Scan forwards and make sure key3 is present. Previously key3 would be
// corrupted by the global seqno from key1.
iter->Next();
ASSERT_EQ(key3, iter->key());
delete iter;
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),

@ -188,8 +188,13 @@ void DataBlockIter::Prev() {
const Slice current_key(key_ptr, current_prev_entry.key_size);
current_ = current_prev_entry.offset;
key_.SetKey(current_key, false /* copy */);
raw_key_.SetKey(current_key, false /* copy */);
value_ = current_prev_entry.value;
key_ = applied_key_.UpdateAndGetKey();
// This is kind of odd in that applied_key_ may say the key is pinned while
// key_pinned_ ends up being false. That'll only happen when the key resides
// in a transient caching buffer.
key_pinned_ = key_pinned_ && applied_key_.IsKeyPinned();
return;
}
@ -217,9 +222,9 @@ void DataBlockIter::Prev() {
if (!ParseNextDataKey<DecodeEntry>()) {
break;
}
Slice current_key = key();
Slice current_key = raw_key_.GetKey();
if (key_.IsKeyPinned()) {
if (raw_key_.IsKeyPinned()) {
// The key is not delta encoded
prev_entries_.emplace_back(current_, current_key.data(), 0,
current_key.size(), value());
@ -252,7 +257,8 @@ void DataBlockIter::Seek(const Slice& target) {
SeekToRestartPoint(index);
// Linear search (within restart block) for first key >= target
while (ParseNextDataKey<DecodeEntry>() && Compare(key_, seek_key) < 0) {
while (ParseNextDataKey<DecodeEntry>() &&
comparator_->Compare(applied_key_.UpdateAndGetKey(), seek_key) < 0) {
}
}
@ -273,8 +279,8 @@ void DataBlockIter::Seek(const Slice& target) {
//
// If the return value is TRUE, iter location has two possibilies:
// 1) If iter is valid, it is set to a location as if set by BinarySeek. In
// this case, it points to the first key_ with a larger user_key or a
// matching user_key with a seqno no greater than the seeking seqno.
// this case, it points to the first key with a larger user_key or a matching
// user_key with a seqno no greater than the seeking seqno.
// 2) If the iter is invalid, it means that either all the user_key is less
// than the seek_user_key, or the block ends with a matching user_key but
// with a smaller [ type | seqno ] (i.e. a larger seqno, or the same seqno
@ -330,7 +336,8 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
//
// TODO(fwu): check the left and write boundary of the restart interval
// to avoid linear seek a target key that is out of range.
if (!ParseNextDataKey<DecodeEntry>(limit) || Compare(key_, target) >= 0) {
if (!ParseNextDataKey<DecodeEntry>(limit) ||
comparator_->Compare(applied_key_.UpdateAndGetKey(), target) >= 0) {
// we stop at the first potential matching user key.
break;
}
@ -355,13 +362,13 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
return true;
}
if (user_comparator_->Compare(key_.GetUserKey(), target_user_key) != 0) {
if (user_comparator_->Compare(raw_key_.GetUserKey(), target_user_key) != 0) {
// the key is not in this block and cannot be at the next block either.
return false;
}
// Here we are conservative and only support a limited set of cases
ValueType value_type = ExtractValueType(key_.GetKey());
ValueType value_type = ExtractValueType(applied_key_.UpdateAndGetKey());
if (value_type != ValueType::kTypeValue &&
value_type != ValueType::kTypeDeletion &&
value_type != ValueType::kTypeSingleDeletion &&
@ -411,7 +418,8 @@ void IndexBlockIter::Seek(const Slice& target) {
SeekToRestartPoint(index);
// Linear search (within restart block) for first key >= target
while (ParseNextIndexKey() && Compare(key_, seek_key) < 0) {
while (ParseNextIndexKey() &&
comparator_->Compare(applied_key_.UpdateAndGetKey(), seek_key) < 0) {
}
}
@ -431,12 +439,14 @@ void DataBlockIter::SeekForPrev(const Slice& target) {
SeekToRestartPoint(index);
// Linear search (within restart block) for first key >= seek_key
while (ParseNextDataKey<DecodeEntry>() && Compare(key_, seek_key) < 0) {
while (ParseNextDataKey<DecodeEntry>() &&
comparator_->Compare(applied_key_.UpdateAndGetKey(), seek_key) < 0) {
}
if (!Valid()) {
SeekToLast();
} else {
while (Valid() && Compare(key_, seek_key) > 0) {
while (Valid() &&
comparator_->Compare(applied_key_.UpdateAndGetKey(), seek_key) > 0) {
Prev();
}
}
@ -493,7 +503,7 @@ void BlockIter<TValue>::CorruptionError() {
current_ = restarts_;
restart_index_ = num_restarts_;
status_ = Status::Corruption("bad entry in block");
key_.Clear();
raw_key_.Clear();
value_.clear();
}
@ -515,49 +525,37 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntryFunc()(p, limit, &shared, &non_shared, &value_length);
if (p == nullptr || key_.Size() < shared) {
if (p == nullptr || raw_key_.Size() < shared) {
CorruptionError();
return false;
} else {
if (shared == 0) {
// If this key doesn't share any bytes with prev key then we don't need
// to decode it and can use its address in the block directly.
key_.SetKey(Slice(p, non_shared), false /* copy */);
key_pinned_ = true;
raw_key_.SetKey(Slice(p, non_shared), false /* copy */);
} else {
if (global_seqno_ != kDisableGlobalSequenceNumber) {
key_.UpdateInternalKey(stored_seqno_, stored_value_type_);
}
// This key share `shared` bytes with prev key, we need to decode it
key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false;
raw_key_.TrimAppend(shared, p, non_shared);
}
key_ = applied_key_.UpdateAndGetKey();
key_pinned_ = applied_key_.IsKeyPinned();
#ifndef NDEBUG
if (global_seqno_ != kDisableGlobalSequenceNumber) {
// If we are reading a file with a global sequence number we should
// expect that all encoded sequence numbers are zeros and any value
// type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion.
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
uint64_t packed = ExtractInternalKeyFooter(key_.GetKey());
UnPackSequenceAndType(packed, &stored_seqno_, &stored_value_type_);
assert(stored_value_type_ == ValueType::kTypeValue ||
stored_value_type_ == ValueType::kTypeMerge ||
stored_value_type_ == ValueType::kTypeDeletion ||
stored_value_type_ == ValueType::kTypeRangeDeletion);
if (key_pinned_) {
// TODO(tec): Investigate updating the seqno in the loaded block
// directly instead of doing a copy and update.
// We cannot use the key address in the block directly because
// we have a global_seqno_ that will overwrite the encoded one.
key_.OwnKey();
key_pinned_ = false;
}
key_.UpdateInternalKey(global_seqno_, stored_value_type_);
uint64_t packed = ExtractInternalKeyFooter(raw_key_.GetKey());
SequenceNumber seqno;
ValueType value_type;
UnPackSequenceAndType(packed, &seqno, &value_type);
assert(value_type == ValueType::kTypeValue ||
value_type == ValueType::kTypeMerge ||
value_type == ValueType::kTypeDeletion ||
value_type == ValueType::kTypeRangeDeletion);
assert(seqno == 0);
}
#endif // NDEBUG
value_ = Slice(p + non_shared, value_length);
if (shared == 0) {
@ -591,20 +589,20 @@ bool IndexBlockIter::ParseNextIndexKey() {
} else {
p = DecodeEntry()(p, limit, &shared, &non_shared, &value_length);
}
if (p == nullptr || key_.Size() < shared) {
if (p == nullptr || raw_key_.Size() < shared) {
CorruptionError();
return false;
}
if (shared == 0) {
// If this key doesn't share any bytes with prev key then we don't need
// to decode it and can use its address in the block directly.
key_.SetKey(Slice(p, non_shared), false /* copy */);
key_pinned_ = true;
raw_key_.SetKey(Slice(p, non_shared), false /* copy */);
} else {
// This key share `shared` bytes with prev key, we need to decode it
key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false;
raw_key_.TrimAppend(shared, p, non_shared);
}
key_ = applied_key_.UpdateAndGetKey();
key_pinned_ = applied_key_.IsKeyPinned();
value_ = Slice(p + non_shared, value_length);
if (shared == 0) {
while (restart_index_ + 1 < num_restarts_ &&
@ -683,7 +681,8 @@ bool BlockIter<TValue>::BinarySeek(const Slice& target, uint32_t left,
return false;
}
Slice mid_key(key_ptr, non_shared);
int cmp = comp->Compare(mid_key, target);
raw_key_.SetKey(mid_key, false /* copy */);
int cmp = comp->Compare(applied_key_.UpdateAndGetKey(), target);
if (cmp < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
@ -717,7 +716,8 @@ int IndexBlockIter::CompareBlockKey(uint32_t block_index, const Slice& target) {
return 1; // Return target is smaller
}
Slice block_key(key_ptr, non_shared);
return Compare(block_key, target);
raw_key_.SetKey(block_key, false /* copy */);
return comparator_->Compare(applied_key_.UpdateAndGetKey(), target);
}
// Binary search in block_ids to find the first block

@ -228,6 +228,49 @@ class Block {
DataBlockHashIndex data_block_hash_index_;
};
// A GlobalSeqnoAppliedKey exposes a key with global sequence number applied
// if configured with `global_seqno != kDisableGlobalSequenceNumber`. It may
// hold a user key or an internal key since `format_version>=3` index blocks
// contain user keys. In case it holds user keys, it must be configured with
// `global_seqno == kDisableGlobalSequenceNumber`.
class GlobalSeqnoAppliedKey {
public:
void Initialize(IterKey* key, SequenceNumber global_seqno) {
key_ = key;
global_seqno_ = global_seqno;
#ifndef NDEBUG
init_ = true;
#endif // NDEBUG
}
Slice UpdateAndGetKey() {
assert(init_);
if (global_seqno_ == kDisableGlobalSequenceNumber) {
return key_->GetKey();
}
ParsedInternalKey parsed(Slice(), 0, kTypeValue);
if (!ParseInternalKey(key_->GetInternalKey(), &parsed)) {
assert(false); // error not handled in optimized builds
return Slice();
}
parsed.sequence = global_seqno_;
scratch_.SetInternalKey(parsed);
return scratch_.GetInternalKey();
}
bool IsKeyPinned() const {
return global_seqno_ == kDisableGlobalSequenceNumber && key_->IsKeyPinned();
}
private:
const IterKey* key_;
SequenceNumber global_seqno_;
IterKey scratch_;
#ifndef NDEBUG
bool init_ = false;
#endif // NDEBUG
};
template <class TValue>
class BlockIter : public InternalIteratorBase<TValue> {
public:
@ -237,6 +280,8 @@ class BlockIter : public InternalIteratorBase<TValue> {
assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid
applied_key_.Initialize(&raw_key_, global_seqno);
comparator_ = comparator;
data_ = data;
restarts_ = restarts;
@ -267,7 +312,7 @@ class BlockIter : public InternalIteratorBase<TValue> {
Status status() const override { return status_; }
Slice key() const override {
assert(Valid());
return key_.GetKey();
return key_;
}
#ifndef NDEBUG
@ -310,7 +355,13 @@ class BlockIter : public InternalIteratorBase<TValue> {
uint32_t restarts_; // Offset of restart array (list of fixed32)
// current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_;
IterKey key_;
// Raw key from block.
IterKey raw_key_;
// raw_key_ with global seqno applied if necessary. Use this one for
// comparisons.
GlobalSeqnoAppliedKey applied_key_;
// Key to be exposed to users.
Slice key_;
Slice value_;
Status status_;
bool key_pinned_;
@ -319,11 +370,6 @@ class BlockIter : public InternalIteratorBase<TValue> {
// e.g. PinnableSlice, the pointer to the bytes will still be valid.
bool block_contents_pinned_;
SequenceNumber global_seqno_;
// Save the actual sequence before replaced by global seqno, which potentially
// is used as part of prefix of delta encoding.
SequenceNumber stored_seqno_ = 0;
// Save the value type of key_. Used to restore stored_seqno_.
ValueType stored_value_type_ = kMaxValue;
private:
// Store the cache handle, if the block is cached. We need this since the
@ -346,7 +392,7 @@ class BlockIter : public InternalIteratorBase<TValue> {
}
void SeekToRestartPoint(uint32_t index) {
key_.Clear();
raw_key_.Clear();
restart_index_ = index;
// current_ will be fixed by ParseNextKey();
@ -386,7 +432,7 @@ class DataBlockIter final : public BlockIter<Slice> {
InitializeBase(comparator, data, restarts, num_restarts, global_seqno,
block_contents_pinned);
user_comparator_ = user_comparator;
key_.SetIsUserKey(false);
raw_key_.SetIsUserKey(false);
read_amp_bitmap_ = read_amp_bitmap;
last_bitmap_offset_ = current_ + 1;
data_block_hash_index_ = data_block_hash_index;
@ -477,10 +523,6 @@ class DataBlockIter final : public BlockIter<Slice> {
template <typename DecodeEntryFunc>
inline bool ParseNextDataKey(const char* limit = nullptr);
inline int Compare(const IterKey& ikey, const Slice& b) const {
return comparator_->Compare(ikey.GetInternalKey(), b);
}
bool SeekForGetImpl(const Slice& target);
};
@ -488,10 +530,6 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
public:
IndexBlockIter() : BlockIter(), prefix_index_(nullptr) {}
Slice key() const override {
assert(Valid());
return key_.GetKey();
}
// key_includes_seq, default true, means that the keys are in internal key
// format.
// value_is_full, default true, means that no delta encoding is
@ -511,7 +549,7 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
restarts, num_restarts, kDisableGlobalSequenceNumber,
block_contents_pinned);
key_includes_seq_ = key_includes_seq;
key_.SetIsUserKey(!key_includes_seq_);
raw_key_.SetIsUserKey(!key_includes_seq_);
prefix_index_ = prefix_index;
value_delta_encoded_ = !value_is_full;
have_first_key_ = have_first_key;
@ -559,7 +597,7 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
status_ = Status::InvalidArgument(
"RocksDB internal error: should never call SeekForPrev() on index "
"blocks");
key_.Clear();
raw_key_.Clear();
value_.clear();
}
@ -619,14 +657,6 @@ class IndexBlockIter final : public BlockIter<IndexValue> {
bool* prefix_may_exist);
inline int CompareBlockKey(uint32_t block_index, const Slice& target);
inline int Compare(const Slice& a, const Slice& b) const {
return comparator_->Compare(a, b);
}
inline int Compare(const IterKey& ikey, const Slice& b) const {
return comparator_->Compare(ikey.GetKey(), b);
}
inline bool ParseNextIndexKey();
// When value_delta_encoded_ is enabled it decodes the value which is assumed

Loading…
Cancel
Save