diff --git a/HISTORY.md b/HISTORY.md index a65500b1e..ffad10f98 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Bug Fixes +* Fix a wrong result being read from an ingested file. This may happen when a key in the file happens to be a prefix of another key also in the file. The issue can further cause more data corruption. The issue has existed in RocksDB >= 5.0.0, since DB::IngestExternalFile() was introduced. + ### New Features * Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism. diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 259d8ba93..e67bebb05 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -6,7 +6,9 @@ #ifndef ROCKSDB_LITE #include + #include "db/db_test_util.h" +#include "db/dbformat.h" #include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" @@ -2799,6 +2801,47 @@ TEST_P(ExternalSSTFileTest, IngestFilesTriggerFlushingWithTwoWriteQueue) { GenerateAndAddExternalFile(options, data); } +TEST_P(ExternalSSTFileTest, DeltaEncodingWhileGlobalSeqnoPresents) { + Options options = CurrentOptions(); + DestroyAndReopen(options); + constexpr size_t kValueSize = 8; + Random rnd(301); + std::string value(RandomString(&rnd, kValueSize)); + + // Write some key to make global seqno larger than zero + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("ab" + Key(i), value)); + } + // Get a Snapshot to make RocksDB assign global seqno to ingested sst files. 
+ auto snap = dbfull()->GetSnapshot(); + + std::string fname = sst_files_dir_ + "test_file"; + rocksdb::SstFileWriter writer(EnvOptions(), options); + ASSERT_OK(writer.Open(fname)); + std::string key1 = "ab"; + std::string key2 = "ab"; + + // Make key2 start with key1 followed by a footer with zero seqno. The tail of + // every internal key is composed as (seqno << 8 | value_type); here the type + // is ValueType::kTypeValue. + + PutFixed64(&key2, PackSequenceAndType(0, kTypeValue)); + key2 += "cdefghijkl"; + + ASSERT_OK(writer.Put(key1, value)); + ASSERT_OK(writer.Put(key2, value)); + + ExternalSstFileInfo info; + ASSERT_OK(writer.Finish(&info)); + + ASSERT_OK(dbfull()->IngestExternalFile({info.file_path}, + IngestExternalFileOptions())); + dbfull()->ReleaseSnapshot(snap); + ASSERT_EQ(value, Get(key1)); + // Before the fix, this check fails + ASSERT_EQ(value, Get(key2)); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, testing::Values(std::make_tuple(false, false), std::make_tuple(false, true), diff --git a/table/block_based/block.cc b/table/block_based/block.cc index 04829faf8..8afa2cccf 100644 --- a/table/block_based/block.cc +++ b/table/block_based/block.cc @@ -525,6 +525,9 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { key_.SetKey(Slice(p, non_shared), false /* copy */); key_pinned_ = true; } else { + if (global_seqno_ != kDisableGlobalSequenceNumber) { + key_.UpdateInternalKey(stored_seqno_, stored_value_type_); + } // This key share `shared` bytes with prev key, we need to decode it key_.TrimAppend(shared, p, non_shared); key_pinned_ = false; @@ -536,11 +539,12 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { // type is kTypeValue, kTypeMerge, kTypeDeletion, or kTypeRangeDeletion. 
assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0); - ValueType value_type = ExtractValueType(key_.GetKey()); - assert(value_type == ValueType::kTypeValue || - value_type == ValueType::kTypeMerge || - value_type == ValueType::kTypeDeletion || - value_type == ValueType::kTypeRangeDeletion); + uint64_t packed = ExtractInternalKeyFooter(key_.GetKey()); + UnPackSequenceAndType(packed, &stored_seqno_, &stored_value_type_); + assert(stored_value_type_ == ValueType::kTypeValue || + stored_value_type_ == ValueType::kTypeMerge || + stored_value_type_ == ValueType::kTypeDeletion || + stored_value_type_ == ValueType::kTypeRangeDeletion); if (key_pinned_) { // TODO(tec): Investigate updating the seqno in the loaded block @@ -552,7 +556,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { key_pinned_ = false; } - key_.UpdateInternalKey(global_seqno_, value_type); + key_.UpdateInternalKey(global_seqno_, stored_value_type_); } value_ = Slice(p + non_shared, value_length); diff --git a/table/block_based/block.h b/table/block_based/block.h index 9e6dbc2f4..1e0440491 100644 --- a/table/block_based/block.h +++ b/table/block_based/block.h @@ -319,6 +319,11 @@ class BlockIter : public InternalIteratorBase { // e.g. PinnableSlice, the pointer to the bytes will still be valid. bool block_contents_pinned_; SequenceNumber global_seqno_; + // The key's actual sequence number before it is replaced by the global seqno; + // it may be part of the shared prefix used by delta encoding. + SequenceNumber stored_seqno_ = 0; + // The value type of key_. Used with stored_seqno_ to restore the stored key. + ValueType stored_value_type_ = kMaxValue; private: // Store the cache handle, if the block is cached. We need this since the