Fix wrong key being read on ingested file with global seqno and delta encoding (#6669)

Summary:
On reading an ingested SST file, `DataBlockIter` will replace seqno encoded in a key with global seqno. However, if the original seqno was part of the prefix used for the next key, the global seqno is by mistake used as part of the prefix to construct the next key, causing wrong result being returned. Although at this point it is only software error while data in the file is not corrupted, the issue can further cause compaction output out of order and corrupted result when the ingested SST participated in compaction. Fixing the issue by save the actual seqno and restore it before the key being used as prefix to construct next key.

The unit test is by Little-Wallace from https://github.com/facebook/rocksdb/issues/6666. Fixing https://github.com/facebook/rocksdb/issues/6666.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6669

Test Plan:
New unit test

Signed-off-by: Yi Wu <yiwu@pingcap.com>

Reviewed By: cheng-chang

Differential Revision: D20931808

Pulled By: ajkr

fbshipit-source-id: f01959c35d6a493954dca981663766c7a5a9e8ab
main
Yi Wu 5 years ago committed by Facebook GitHub Bot
parent 31759a7094
commit eb287c72d7
  1. 3
      HISTORY.md
  2. 43
      db/external_sst_file_test.cc
  3. 16
      table/block_based/block.cc
  4. 5
      table/block_based/block.h

@ -1,5 +1,8 @@
# Rocksdb Change Log
## Unreleased
### Bug Fixes
* Fix wrong result being read from ingested file. May happen when a key in the file happen to be prefix of another key also in the file. The issue can further cause more data corruption. The issue exists with rocksdb >= 5.0.0 since DB::IngestExternalFile() was introduced.
### New Features
* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.

@ -6,7 +6,9 @@
#ifndef ROCKSDB_LITE
#include <functional>
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
@ -2799,6 +2801,47 @@ TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) {
GenerateAndAddExternalFile(options, data);
}
TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresents) {
Options options = CurrentOptions();
DestroyAndReopen(options);
constexpr size_t kValueSize = 8;
Random rnd(301);
std::string value(RandomString(&rnd, kValueSize));
// Write some key to make global seqno larger than zero
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("ab" + Key(i), value));
}
// Get a Snapshot to make RocksDB assign global seqno to ingested sst files.
auto snap = dbfull()->GetSnapshot();
std::string fname = sst_files_dir_ + "test_file";
rocksdb::SstFileWriter writer(EnvOptions(), options);
ASSERT_OK(writer.Open(fname));
std::string key1 = "ab";
std::string key2 = "ab";
// Make the prefix of key2 is same with key1 add zero seqno. The tail of every
// key is composed as (seqno << 8 | value_type), and here `1` represents
// ValueType::kTypeValue
PutFixed64(&key2, PackSequenceAndType(0, kTypeValue));
key2 += "cdefghijkl";
ASSERT_OK(writer.Put(key1, value));
ASSERT_OK(writer.Put(key2, value));
ExternalSstFileInfo info;
ASSERT_OK(writer.Finish(&info));
ASSERT_OK(dbfull()->IngestExternalFile({info.file_path},
IngestExternalFileOptions()));
dbfull()->ReleaseSnapshot(snap);
ASSERT_EQ(value, Get(key1));
// You will get error here
ASSERT_EQ(value, Get(key2));
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),

@ -525,6 +525,9 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
key_.SetKey(Slice(p, non_shared), false /* copy */);
key_pinned_ = true;
} else {
if (global_seqno_ != kDisableGlobalSequenceNumber) {
key_.UpdateInternalKey(stored_seqno_, stored_value_type_);
}
// This key share `shared` bytes with prev key, we need to decode it
key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false;
@ -536,11 +539,12 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
// type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion.
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0);
ValueType value_type = ExtractValueType(key_.GetKey());
assert(value_type == ValueType::kTypeValue ||
value_type == ValueType::kTypeMerge ||
value_type == ValueType::kTypeDeletion ||
value_type == ValueType::kTypeRangeDeletion);
uint64_t packed = ExtractInternalKeyFooter(key_.GetKey());
UnPackSequenceAndType(packed, &stored_seqno_, &stored_value_type_);
assert(stored_value_type_ == ValueType::kTypeValue ||
stored_value_type_ == ValueType::kTypeMerge ||
stored_value_type_ == ValueType::kTypeDeletion ||
stored_value_type_ == ValueType::kTypeRangeDeletion);
if (key_pinned_) {
// TODO(tec): Investigate updating the seqno in the loaded block
@ -552,7 +556,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
key_pinned_ = false;
}
key_.UpdateInternalKey(global_seqno_, value_type);
key_.UpdateInternalKey(global_seqno_, stored_value_type_);
}
value_ = Slice(p + non_shared, value_length);

@ -319,6 +319,11 @@ class BlockIter : public InternalIteratorBase<TValue> {
// e.g. PinnableSlice, the pointer to the bytes will still be valid.
bool block_contents_pinned_;
SequenceNumber global_seqno_;
// Save the actual sequence before replaced by global seqno, which potentially
// is used as part of prefix of delta encoding.
SequenceNumber stored_seqno_ = 0;
// Save the value type of key_. Used to restore stored_seqno_.
ValueType stored_value_type_ = kMaxValue;
private:
// Store the cache handle, if the block is cached. We need this since the

Loading…
Cancel
Save