diff --git a/CMakeLists.txt b/CMakeLists.txt index b2e600923..7eccd754d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -620,6 +620,7 @@ set(SOURCES db/memtable_list.cc db/merge_helper.cc db/merge_operator.cc + db/output_validator.cc db/range_del_aggregator.cc db/range_tombstone_fragmenter.cc db/repair.cc diff --git a/HISTORY.md b/HISTORY.md index 1ff4c895d..8001464d5 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,6 +14,7 @@ ### New Features * Methods to configure serialize, and compare -- such as TableFactory -- are exposed directly through the Configurable base class (from which these objects inherit). This change will allow for better and more thorough configuration management and retrieval in the future. The options for a Configurable object can be set via the ConfigureFromMap, ConfigureFromString, or ConfigureOption method. The serialized version of the options of an object can be retrieved via the GetOptionString, ToString, or GetOption methods. The list of options supported by an object can be obtained via the GetOptionNames method. The "raw" object (such as the BlockBasedTableOption) for an option may be retrieved via the GetOptions method. Configurable options can be compared via the AreEquivalent method. The settings within a Configurable object may be validated via the ValidateOptions method. The object may be intialized (at which point only mutable options may be updated) via the PrepareOptions method. +* Introduce options.check_flush_compaction_key_order with default value to be true. With this option, during flush and compaction, key order will be checked when writing to each SST file. If the order is violated, the flush or compaction will fail. ## 6.13 (09/12/2020) ### Bug fixes diff --git a/TARGETS b/TARGETS index 0de860173..ee66ee94c 100644 --- a/TARGETS +++ b/TARGETS @@ -184,6 +184,7 @@ cpp_library( "db/memtable_list.cc", "db/merge_helper.cc", "db/merge_operator.cc", + "db/output_validator.cc", "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", diff --git a/db/builder.cc b/db/builder.cc index f4fd717a4..fe73b5ef2 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -19,6 +19,7 @@ #include "db/event_helpers.h" #include "db/internal_stats.h" #include "db/merge_helper.h" +#include "db/output_validator.h" #include "db/range_del_aggregator.h" #include "db/table_cache.h" #include "db/version_edit.h" @@ -96,7 +97,11 @@ Status BuildTable( column_family_name.empty()); // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; - uint64_t paranoid_hash = 0; + OutputValidator output_validator( + internal_comparator, + /*enable_order_check=*/ + mutable_cf_options.check_flush_compaction_key_order, + /*enable_hash=*/paranoid_file_checks); Status s; meta->fd.file_size = 0; iter->SeekToFirst(); @@ -187,10 +192,10 @@ Status BuildTable( const Slice& key = c_iter.key(); const Slice& value = c_iter.value(); const ParsedInternalKey& ikey = c_iter.ikey(); - if (paranoid_file_checks) { - // Generate a rolling 64-bit hash of the key and values - paranoid_hash = Hash64(key.data(), key.size(), paranoid_hash); - paranoid_hash = Hash64(value.data(), value.size(), paranoid_hash); + // Generate a rolling 64-bit hash of the key and values + s = output_validator.Add(key, value); + if (!s.ok()) { + break; } builder->Add(key, value); meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type); @@ -202,23 +207,24 @@ Status BuildTable( ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); } } + if (s.ok()) { + auto range_del_it = range_del_agg->NewIterator(); + for (range_del_it->SeekToFirst(); range_del_it->Valid(); + range_del_it->Next()) { + auto tombstone = range_del_it->Tombstone(); + auto kv = tombstone.Serialize(); + builder->Add(kv.first.Encode(), kv.second); + meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), + tombstone.seq_, internal_comparator); + } - auto range_del_it = range_del_agg->NewIterator(); - for (range_del_it->SeekToFirst(); range_del_it->Valid(); - range_del_it->Next()) { - auto tombstone = range_del_it->Tombstone(); - auto kv = tombstone.Serialize(); - builder->Add(kv.first.Encode(), kv.second); - meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(), - tombstone.seq_, internal_comparator); - } - - // Finish and check for builder errors - s = c_iter.status(); + // Finish and check for builder errors + s = c_iter.status(); - if (blob_file_builder) { - if (s.ok()) { - s = blob_file_builder->Finish(); + if (blob_file_builder) { + if (s.ok()) { + s = blob_file_builder->Finish(); + } } } @@ -291,15 +297,15 @@ Status BuildTable( /*allow_unprepared_value*/ false)); s = it->status(); if (s.ok() && paranoid_file_checks) { - uint64_t check_hash = 0; + OutputValidator file_validator(internal_comparator, + /*enable_order_check=*/true, + /*enable_hash=*/true); for (it->SeekToFirst(); it->Valid(); it->Next()) { // Generate a rolling 64-bit hash of the key and values - check_hash = Hash64(it->key().data(), it->key().size(), check_hash); - check_hash = - Hash64(it->value().data(), it->value().size(), check_hash); + file_validator.Add(it->key(), it->value()).PermitUncheckedError(); } s = it->status(); - if (s.ok() && check_hash != paranoid_hash) { + if (s.ok() && !output_validator.CompareValidator(file_validator)) { s = Status::Corruption("Paranoid checksums do not match"); } } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 457e57ebb..185127c86 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -32,6 +32,7 @@ #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" +#include "db/output_validator.h" #include "db/range_del_aggregator.h" #include "db/version_set.h" #include "file/filename.h" @@ -124,9 +125,14 @@ struct CompactionJob::SubcompactionState { // Files produced by this subcompaction struct Output { + Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp, + bool _enable_order_check, bool _enable_hash) + : meta(std::move(_meta)), + validator(_icmp, _enable_order_check, _enable_hash), + finished(false) {} FileMetaData meta; + OutputValidator validator; bool finished; - uint64_t paranoid_hash; std::shared_ptr table_properties; }; @@ -170,17 +176,16 @@ struct CompactionJob::SubcompactionState { // Adds the key and value to the builder // If paranoid is true, adds the key-value to the paranoid hash - void AddToBuilder(const Slice& key, const Slice& value, bool paranoid) { + Status AddToBuilder(const Slice& key, const Slice& value) { auto curr = current_output(); assert(builder != nullptr); assert(curr != nullptr); - if (paranoid) { - // Generate a rolling 64-bit hash of the key and values - curr->paranoid_hash = Hash64(key.data(), key.size(), curr->paranoid_hash); - curr->paranoid_hash = - Hash64(value.data(), value.size(), curr->paranoid_hash); + Status s = curr->validator.Add(key, value); + if (!s.ok()) { + return s; } builder->Add(key, value); + return Status::OK(); } // Returns true iff we should stop building the current output @@ -662,14 +667,20 @@ Status CompactionJob::Run() { auto s = iter->status(); if (s.ok() && paranoid_file_checks_) { - uint64_t hash = 0; + OutputValidator validator(cfd->internal_comparator(), + /*_enable_order_check=*/true, + /*_enable_hash=*/true); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - // Generate a rolling 64-bit hash of the key and values, using the - hash = Hash64(iter->key().data(), iter->key().size(), hash); - hash = Hash64(iter->value().data(), iter->value().size(), hash); + s = validator.Add(iter->key(), iter->value()); + if (!s.ok()) { + break; + } } - s = iter->status(); - if (s.ok() && hash != files_output[file_idx]->paranoid_hash) { + if (s.ok()) { + s = iter->status(); + } + if (s.ok() && + !validator.CompareValidator(files_output[file_idx]->validator)) { s = Status::Corruption("Paranoid checksums do not match"); } } @@ -961,7 +972,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { break; } } - sub_compact->AddToBuilder(key, value, paranoid_file_checks_); + status = sub_compact->AddToBuilder(key, value); + if (!status.ok()) { + break; + } sub_compact->current_output_file_size = sub_compact->builder->EstimatedFileSize(); @@ -1276,8 +1290,8 @@ Status CompactionJob::FinishCompactionOutputFile( auto kv = tombstone.Serialize(); assert(lower_bound == nullptr || ucmp->Compare(*lower_bound, kv.second) < 0); - sub_compact->AddToBuilder(kv.first.Encode(), kv.second, - paranoid_file_checks_); + // Range tombstone is not supported by output validator yet. + sub_compact->builder->Add(kv.first.Encode(), kv.second); InternalKey smallest_candidate = std::move(kv.first); if (lower_bound != nullptr && ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) { @@ -1594,14 +1608,17 @@ Status CompactionJob::OpenCompactionOutputFile( // Initialize a SubcompactionState::Output and add it to sub_compact->outputs { - SubcompactionState::Output out; - out.meta.fd = FileDescriptor(file_number, - sub_compact->compaction->output_path_id(), 0); - out.meta.oldest_ancester_time = oldest_ancester_time; - out.meta.file_creation_time = current_time; - out.finished = false; - out.paranoid_hash = 0; - sub_compact->outputs.push_back(out); + FileMetaData meta; + meta.fd = FileDescriptor(file_number, + sub_compact->compaction->output_path_id(), 0); + meta.oldest_ancester_time = oldest_ancester_time; + meta.file_creation_time = current_time; + sub_compact->outputs.emplace_back( + std::move(meta), cfd->internal_comparator(), + /*enable_order_check=*/ + sub_compact->compaction->mutable_cf_options() + ->check_flush_compaction_key_order, + /*enable_hash=*/paranoid_file_checks_); } writable_file->SetIOPriority(Env::IOPriority::IO_LOW); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index a94e3a2b8..210042ca0 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -130,7 +130,7 @@ class CompactionJobTest : public testing::Test { return blob_index; } - void AddMockFile(const stl_wrappers::KVMap& contents, int level = 0) { + void AddMockFile(const mock::KVVector& contents, int level = 0) { assert(contents.size() > 0); bool first_key = true; @@ -205,8 +205,8 @@ class CompactionJobTest : public testing::Test { } // returns expected result after compaction - stl_wrappers::KVMap CreateTwoFiles(bool gen_corrupted_keys) { - auto expected_results = mock::MakeMockFile(); + mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) { + stl_wrappers::KVMap expected_results; const int kKeysPerFile = 10000; const int kCorruptKeysPerFile = 200; const int kMatchingKeys = kKeysPerFile / 2; @@ -232,19 +232,25 @@ class CompactionJobTest : public testing::Test { test::CorruptKeyType(&internal_key); test::CorruptKeyType(&bottommost_internal_key); } - contents.insert({ internal_key.Encode().ToString(), value }); + contents.push_back({internal_key.Encode().ToString(), value}); if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) { expected_results.insert( - { bottommost_internal_key.Encode().ToString(), value }); + {bottommost_internal_key.Encode().ToString(), value}); } } + mock::SortKVVector(&contents); AddMockFile(contents); } SetLastSequence(sequence_number); - return expected_results; + mock::KVVector expected_results_kvvector; + for (auto& kv : expected_results) { + expected_results_kvvector.push_back({kv.first, kv.second}); + } + + return expected_results_kvvector; } void NewDB() { @@ -299,7 +305,7 @@ class CompactionJobTest : public testing::Test { void RunCompaction( const std::vector>& input_files, - const stl_wrappers::KVMap& expected_results, + const mock::KVVector& expected_results, const std::vector& snapshots = {}, SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, @@ -644,7 +650,7 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) { SetLastSequence(11U); auto files = cfd_->current()->storage_info()->LevelFiles(0); - stl_wrappers::KVMap empty_map; + mock::KVVector empty_map; RunCompaction({files}, empty_map); } diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 0633e23cf..bcce7e0ee 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -562,12 +562,14 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) { } } -static const auto& corruption_modes = {mock::MockTableFactory::kCorruptNone, - mock::MockTableFactory::kCorruptKey, - mock::MockTableFactory::kCorruptValue}; +static const auto& corruption_modes = { + mock::MockTableFactory::kCorruptNone, mock::MockTableFactory::kCorruptKey, + mock::MockTableFactory::kCorruptValue, + mock::MockTableFactory::kCorruptReorderKey}; TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) { Options options; + options.check_flush_compaction_key_order = false; options.paranoid_file_checks = true; options.create_if_missing = true; Status s; @@ -595,6 +597,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) { Options options; options.paranoid_file_checks = true; options.create_if_missing = true; + options.check_flush_compaction_key_order = false; Status s; for (const auto& mode : corruption_modes) { delete db_; @@ -644,6 +647,78 @@ TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) { ASSERT_TRUE(s.IsCorruption()); } +TEST_F(CorruptionTest, CompactionKeyOrderCheck) { + Options options; + options.paranoid_file_checks = false; + options.create_if_missing = true; + options.check_flush_compaction_key_order = false; + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + std::shared_ptr mock = + std::make_shared(); + options.table_factory = mock; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + assert(db_ != nullptr); + mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey); + Build(100, 2); + DBImpl* dbi = static_cast_with_check(db_); + ASSERT_OK(dbi->TEST_FlushMemTable()); + + mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone); + ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}})); + ASSERT_NOK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); +} + +TEST_F(CorruptionTest, FlushKeyOrderCheck) { + Options options; + options.paranoid_file_checks = false; + options.create_if_missing = true; + ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}})); + + ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1")); + + int cnt = 0; + // Generate some out of order keys from the memtable + SyncPoint::GetInstance()->SetCallBack( + "MemTableIterator::Next:0", [&](void* arg) { + MemTableRep::Iterator* mem_iter = + static_cast(arg); + if (++cnt == 3) { + mem_iter->Prev(); + mem_iter->Prev(); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + Status s = static_cast_with_check(db_)->TEST_FlushMemTable(); + ASSERT_NOK(s); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(CorruptionTest, DisableKeyOrderCheck) { + Options options; + ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "false"}})); + DBImpl* dbi = static_cast_with_check(db_); + + SyncPoint::GetInstance()->SetCallBack( + "OutputValidator::Add:order_check", + [&](void* /*arg*/) { ASSERT_TRUE(false); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1")); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1")); + ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1")); + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true)); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_test.cc b/db/db_test.cc index c624f1b9e..7291999e2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5291,6 +5291,13 @@ TEST_F(DBTest, DynamicMiscOptions) { ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], &mutable_cf_options)); ASSERT_TRUE(mutable_cf_options.report_bg_io_stats); + ASSERT_TRUE(mutable_cf_options.check_flush_compaction_key_order); + + ASSERT_OK(dbfull()->SetOptions( + handles_[1], {{"check_flush_compaction_key_order", "false"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], + &mutable_cf_options)); + ASSERT_FALSE(mutable_cf_options.check_flush_compaction_key_order); } #endif // ROCKSDB_LITE diff --git a/db/dbformat.h b/db/dbformat.h index 158d9c5dd..81c852ac4 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -96,6 +96,8 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1); static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64; +constexpr uint64_t kNumInternalBytes = 8; + // The data structure that represents an internal key in the way that user_key, // sequence number and type are stored in separated forms. struct ParsedInternalKey { @@ -121,7 +123,7 @@ struct ParsedInternalKey { // Return the length of the encoding of "key". inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { - return key.user_key.size() + 8; + return key.user_key.size() + kNumInternalBytes; } // Pack a sequence number and a ValueType into a uint64_t @@ -168,14 +170,15 @@ extern Status ParseInternalKey(const Slice& internal_key, // Returns the user key portion of an internal key. inline Slice ExtractUserKey(const Slice& internal_key) { - assert(internal_key.size() >= 8); - return Slice(internal_key.data(), internal_key.size() - 8); + assert(internal_key.size() >= kNumInternalBytes); + return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes); } inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key, size_t ts_sz) { - assert(internal_key.size() >= 8 + ts_sz); - return Slice(internal_key.data(), internal_key.size() - 8 - ts_sz); + assert(internal_key.size() >= kNumInternalBytes + ts_sz); + return Slice(internal_key.data(), + internal_key.size() - kNumInternalBytes - ts_sz); } inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) { @@ -189,9 +192,9 @@ inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) { } inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) { - assert(internal_key.size() >= 8); + assert(internal_key.size() >= kNumInternalBytes); const size_t n = internal_key.size(); - return DecodeFixed64(internal_key.data() + n - 8); + return DecodeFixed64(internal_key.data() + n - kNumInternalBytes); } inline ValueType ExtractValueType(const Slice& internal_key) { @@ -327,13 +330,15 @@ inline int InternalKeyComparator::Compare(const InternalKey& a, inline Status ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) { const size_t n = internal_key.size(); - if (n < 8) return Status::Corruption("Internal Key too small"); - uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + if (n < kNumInternalBytes) { + return Status::Corruption("Internal Key too small"); + } + uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes); unsigned char c = num & 0xff; result->sequence = num >> 8; result->type = static_cast(c); assert(result->type <= ValueType::kMaxValue); - result->user_key = Slice(internal_key.data(), n - 8); + result->user_key = Slice(internal_key.data(), n - kNumInternalBytes); return IsExtendedValueType(result->type) ? Status::OK() : Status::Corruption("Invalid Key Type"); @@ -343,19 +348,19 @@ inline Status ParseInternalKey(const Slice& internal_key, // Guarantees not to invalidate ikey.data(). inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) { size_t ikey_sz = ikey->size(); - assert(ikey_sz >= 8); + assert(ikey_sz >= kNumInternalBytes); uint64_t newval = (seq << 8) | t; // Note: Since C++11, strings are guaranteed to be stored contiguously and // string::operator[]() is guaranteed not to change ikey.data(). - EncodeFixed64(&(*ikey)[ikey_sz - 8], newval); + EncodeFixed64(&(*ikey)[ikey_sz - kNumInternalBytes], newval); } // Get the sequence number from the internal key inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { const size_t n = internal_key.size(); - assert(n >= 8); - uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + assert(n >= kNumInternalBytes); + uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes); return num >> 8; } @@ -394,8 +399,8 @@ class IterKey { if (IsUserKey()) { return Slice(key_, key_size_); } else { - assert(key_size_ >= 8); - return Slice(key_, key_size_ - 8); + assert(key_size_ >= kNumInternalBytes); + return Slice(key_, key_size_ - kNumInternalBytes); } } @@ -453,9 +458,9 @@ class IterKey { // and returns a Slice referencing the new copy. Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) { size_t key_n = key.size(); - assert(key_n >= 8); + assert(key_n >= kNumInternalBytes); SetInternalKey(key); - ikey->user_key = Slice(key_, key_n - 8); + ikey->user_key = Slice(key_, key_n - kNumInternalBytes); return Slice(key_, key_n); } @@ -472,9 +477,9 @@ class IterKey { // invalidate slices to the key (and the user key). void UpdateInternalKey(uint64_t seq, ValueType t) { assert(!IsKeyPinned()); - assert(key_size_ >= 8); + assert(key_size_ >= kNumInternalBytes); uint64_t newval = (seq << 8) | t; - EncodeFixed64(&buf_[key_size_ - 8], newval); + EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval); } bool IsKeyPinned() const { return (key_ != buf_); } @@ -679,8 +684,10 @@ inline int InternalKeyComparator::Compare(const Slice& akey, // decreasing type (though sequence# should be enough to disambiguate) int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { - const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); - const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8); + const uint64_t anum = + DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes); + const uint64_t bnum = + DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes); if (anum > bnum) { r = -1; } else if (anum < bnum) { @@ -698,8 +705,10 @@ inline int InternalKeyComparator::CompareKeySeq(const Slice& akey, int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { // Shift the number to exclude the last byte which contains the value type - const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8; - const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8; + const uint64_t anum = + DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes) >> 8; + const uint64_t bnum = + DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes) >> 8; if (anum > bnum) { r = -1; } else if (anum < bnum) { diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 70abdca75..618594b2d 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -170,14 +170,14 @@ TEST_F(FlushJobTest, NonEmpty) { new_mem->Add(SequenceNumber(i), kTypeValue, key, value); if ((i + 1000) % 10000 < 9995) { InternalKey internal_key(key, SequenceNumber(i), kTypeValue); - inserted_keys.insert({internal_key.Encode().ToString(), value}); + inserted_keys.push_back({internal_key.Encode().ToString(), value}); } } { new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a"); InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); - inserted_keys.insert({internal_key.Encode().ToString(), "9999a"}); + inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"}); } // Note: the first two blob references will not be considered when resolving @@ -205,9 +205,9 @@ TEST_F(FlushJobTest, NonEmpty) { new_mem->Add(seq, kTypeBlobIndex, key, blob_index); InternalKey internal_key(key, seq, kTypeBlobIndex); - inserted_keys.emplace_hint(inserted_keys.end(), - internal_key.Encode().ToString(), blob_index); + inserted_keys.push_back({internal_key.Encode().ToString(), blob_index}); } + mock::SortKVVector(&inserted_keys); autovector to_delete; cfd->imm()->Add(new_mem, &to_delete); @@ -454,10 +454,11 @@ TEST_F(FlushJobTest, Snapshots) { (snapshots_set.find(seqno) != snapshots_set.end()); if (visible) { InternalKey internal_key(key, seqno, kTypeValue); - inserted_keys.insert({internal_key.Encode().ToString(), value}); + inserted_keys.push_back({internal_key.Encode().ToString(), value}); } } } + mock::SortKVVector(&inserted_keys); autovector to_delete; cfd->imm()->Add(new_mem, &to_delete); diff --git a/db/memtable.cc b/db/memtable.cc index 6aa7faea5..53be973e7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -376,6 +376,7 @@ class MemTableIterator : public InternalIterator { PERF_COUNTER_ADD(next_on_memtable_count, 1); assert(Valid()); iter_->Next(); + TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_); valid_ = iter_->Valid(); } bool NextAndGetResult(IterateResult* result) override { diff --git a/db/output_validator.cc b/db/output_validator.cc new file mode 100644 index 000000000..56b8fe59e --- /dev/null +++ b/db/output_validator.cc @@ -0,0 +1,30 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/output_validator.h" + +namespace ROCKSDB_NAMESPACE { +Status OutputValidator::Add(const Slice& key, const Slice& value) { + if (enable_hash_) { + // Generate a rolling 64-bit hash of the key and values + paranoid_hash_ = Hash64(key.data(), key.size(), paranoid_hash_); + paranoid_hash_ = Hash64(value.data(), value.size(), paranoid_hash_); + } + if (enable_order_check_) { + TEST_SYNC_POINT_CALLBACK("OutputValidator::Add:order_check", + /*arg=*/nullptr); + if (key.size() < kNumInternalBytes) { + return Status::Corruption( + "Compaction tries to write a key without internal bytes."); + } + // prev_key_ starts with empty. + if (!prev_key_.empty() && icmp_.Compare(key, prev_key_) < 0) { + return Status::Corruption("Compaction sees out-of-order keys."); + } + prev_key_.assign(key.data(), key.size()); + } + return Status::OK(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/db/output_validator.h b/db/output_validator.h new file mode 100644 index 000000000..167b25e06 --- /dev/null +++ b/db/output_validator.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include "db/dbformat.h" +#include "rocksdb/status.h" +#include "test_util/sync_point.h" +#include "util/hash.h" + +namespace ROCKSDB_NAMESPACE { +// A class that validates key/value that is inserted to an SST file. +// Pass every key/value of the file using OutputValidator::Add() +// and the class validates key order and optionally calculate a hash +// of all the key and value. +class OutputValidator { + public: + explicit OutputValidator(const InternalKeyComparator& icmp, + bool enable_order_check, bool enable_hash) + : icmp_(icmp), + enable_order_check_(enable_order_check), + enable_hash_(enable_hash) {} + + // Add a key to the KV sequence, and return whether the key follows + // criteria, e.g. key is ordered. + Status Add(const Slice& key, const Slice& value); + + // Compare result of two key orders are the same. It can be used + // to compare the keys inserted into a file, and what is read back. + // Return true if the validation passes. + bool CompareValidator(const OutputValidator& other_validator) { + return GetHash() == other_validator.GetHash(); + } + + private: + uint64_t GetHash() const { return paranoid_hash_; } + + const InternalKeyComparator& icmp_; + std::string prev_key_; + uint64_t paranoid_hash_ = 0; + bool enable_order_check_; + bool enable_hash_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index caacce3f0..c76c60416 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -642,6 +642,14 @@ struct AdvancedColumnFamilyOptions { // Default: false bool optimize_filters_for_hits = false; + // During flush or compaction, check whether keys inserted to output files + // are in order. + // + // Default: true + // + // Dynamically changeable through SetOptions() API + bool check_flush_compaction_key_order = true; + // After writing every SST file, reopen it and read all the keys. // Checks the hash of all of the keys and values written versus the // keys in the file and signals a corruption if they do not match diff --git a/options/cf_options.cc b/options/cf_options.cc index 76eca44df..fb56f2388 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -223,6 +223,10 @@ static std::unordered_map {"filter_deletes", {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, OptionTypeFlags::kMutable}}, + {"check_flush_compaction_key_order", + {offsetof(struct MutableCFOptions, check_flush_compaction_key_order), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"paranoid_file_checks", {offsetof(struct MutableCFOptions, paranoid_file_checks), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -974,6 +978,8 @@ void MutableCFOptions::Dump(Logger* log) const { result.c_str()); ROCKS_LOG_INFO(log, " max_sequential_skip_in_iterations: %" PRIu64, max_sequential_skip_in_iterations); + ROCKS_LOG_INFO(log, " check_flush_compaction_key_order: %d", + check_flush_compaction_key_order); ROCKS_LOG_INFO(log, " paranoid_file_checks: %d", paranoid_file_checks); ROCKS_LOG_INFO(log, " report_bg_io_stats: %d", diff --git a/options/cf_options.h b/options/cf_options.h index 8d5b649e8..ca086b5c8 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -165,6 +165,8 @@ struct MutableCFOptions { blob_compression_type(options.blob_compression_type), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), + check_flush_compaction_key_order( + options.check_flush_compaction_key_order), paranoid_file_checks(options.paranoid_file_checks), report_bg_io_stats(options.report_bg_io_stats), compression(options.compression), @@ -205,6 +207,7 @@ struct MutableCFOptions { blob_file_size(0), blob_compression_type(kNoCompression), max_sequential_skip_in_iterations(0), + check_flush_compaction_key_order(true), paranoid_file_checks(false), report_bg_io_stats(false), compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), @@ -267,6 +270,7 @@ struct MutableCFOptions { // Misc options uint64_t max_sequential_skip_in_iterations; + bool check_flush_compaction_key_order; bool paranoid_file_checks; bool report_bg_io_stats; CompressionType compression; diff --git a/options/options_helper.cc b/options/options_helper.cc index 70b980b22..5240c4b94 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -230,6 +230,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions( // Misc options cf_opts.max_sequential_skip_in_iterations = mutable_cf_options.max_sequential_skip_in_iterations; + cf_opts.check_flush_compaction_key_order = + mutable_cf_options.check_flush_compaction_key_order; cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks; cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats; cf_opts.compression = mutable_cf_options.compression; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index bf0958610..28e25b576 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -479,6 +479,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "memtable_prefix_bloom_size_ratio=0.4642;" "memtable_whole_key_filtering=true;" "memtable_insert_with_hint_prefix_extractor=rocksdb.CappedPrefix.13;" + "check_flush_compaction_key_order=false;" "paranoid_file_checks=true;" "force_consistency_checks=true;" "inplace_update_num_locks=7429;" diff --git a/src.mk b/src.mk index 037fa0c17..344915b37 100644 --- a/src.mk +++ b/src.mk @@ -56,6 +56,7 @@ LIB_SOURCES = \ db/memtable_list.cc \ db/merge_helper.cc \ db/merge_operator.cc \ + db/output_validator.cc \ db/range_del_aggregator.cc \ db/range_tombstone_fragmenter.cc \ db/repair.cc \ diff --git a/table/mock_table.cc b/table/mock_table.cc index fc867fa97..757fdb963 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -16,20 +16,19 @@ namespace ROCKSDB_NAMESPACE { namespace mock { -namespace { - -const InternalKeyComparator icmp_(BytewiseComparator()); - -} // namespace - -stl_wrappers::KVMap MakeMockFile( - std::initializer_list> l) { - return stl_wrappers::KVMap(l, stl_wrappers::LessOfComparator(&icmp_)); +KVVector MakeMockFile(std::initializer_list l) { return KVVector(l); } + +void SortKVVector(KVVector* kv_vector) { + InternalKeyComparator icmp(BytewiseComparator()); + std::sort(kv_vector->begin(), kv_vector->end(), + [icmp](KVPair a, KVPair b) -> bool { + return icmp.Compare(a.first, b.first) < 0; + }); } class MockTableReader : public TableReader { public: - explicit MockTableReader(const stl_wrappers::KVMap& table) : table_(table) {} + explicit MockTableReader(const KVVector& table) : table_(table) {} InternalIterator* NewIterator(const ReadOptions&, const SliceTransform* prefix_extractor, @@ -61,12 +60,12 @@ class MockTableReader : public TableReader { ~MockTableReader() {} private: - const stl_wrappers::KVMap& table_; + const KVVector& table_; }; class MockTableIterator : public InternalIterator { public: - explicit MockTableIterator(const stl_wrappers::KVMap& table) : table_(table) { + explicit MockTableIterator(const KVVector& table) : table_(table) { itr_ = table_.end(); } @@ -80,13 +79,21 @@ class MockTableIterator : public InternalIterator { } void Seek(const Slice& target) override { - std::string str_target(target.data(), target.size()); - itr_ = table_.lower_bound(str_target); + KVPair target_pair(target.ToString(), ""); + InternalKeyComparator icmp(BytewiseComparator()); + itr_ = std::lower_bound(table_.begin(), table_.end(), target_pair, + [icmp](KVPair a, KVPair b) -> bool { + return icmp.Compare(a.first, b.first) < 0; + }); } void SeekForPrev(const Slice& target) override { - std::string str_target(target.data(), target.size()); - itr_ = table_.upper_bound(str_target); + KVPair target_pair(target.ToString(), ""); + InternalKeyComparator icmp(BytewiseComparator()); + itr_ = std::upper_bound(table_.begin(), table_.end(), target_pair, + [icmp](KVPair a, KVPair b) -> bool { + return icmp.Compare(a.first, b.first) < 0; + }); Prev(); } @@ -107,8 +114,8 @@ class MockTableIterator : public InternalIterator { Status status() const override { return Status::OK(); } private: - const stl_wrappers::KVMap& table_; - stl_wrappers::KVMap::const_iterator itr_; + const KVVector& table_; + KVVector::const_iterator itr_; }; class MockTableBuilder : public TableBuilder { @@ -129,13 +136,22 @@ class MockTableBuilder : public TableBuilder { void Add(const Slice& key, const Slice& value) override { if (corrupt_mode_ == MockTableFactory::kCorruptValue) { // Corrupt the value - table_.insert({key.ToString(), value.ToString() + " "}); + table_.push_back({key.ToString(), value.ToString() + " "}); corrupt_mode_ = MockTableFactory::kCorruptNone; } else if (corrupt_mode_ == MockTableFactory::kCorruptKey) { - table_.insert({key.ToString() + " ", value.ToString()}); + table_.push_back({key.ToString() + " ", value.ToString()}); corrupt_mode_ = MockTableFactory::kCorruptNone; + } else if (corrupt_mode_ == MockTableFactory::kCorruptReorderKey) { + if (prev_key_.empty()) { + prev_key_ = key.ToString(); + prev_value_ = value.ToString(); + } else { + table_.push_back({key.ToString(), value.ToString()}); + table_.push_back({prev_key_, prev_value_}); + corrupt_mode_ = MockTableFactory::kCorruptNone; + } } else { - table_.insert({key.ToString(), value.ToString()}); + table_.push_back({key.ToString(), value.ToString()}); } } @@ -170,9 +186,11 @@ class MockTableBuilder : public TableBuilder { private: uint32_t id_; + std::string prev_key_; + std::string prev_value_; MockTableFileSystem* file_system_; int corrupt_mode_; - stl_wrappers::KVMap table_; + KVVector table_; }; InternalIterator* MockTableReader::NewIterator( @@ -238,7 +256,7 @@ TableBuilder* MockTableFactory::NewTableBuilder( } Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, - stl_wrappers::KVMap file_contents) { + KVVector file_contents) { std::unique_ptr file; auto s = env->NewWritableFile(fname, &file, EnvOptions()); if (!s.ok()) { @@ -269,14 +287,12 @@ uint32_t MockTableFactory::GetIDFromFile(RandomAccessFileReader* file) const { return DecodeFixed32(buf); } -void MockTableFactory::AssertSingleFile( - const stl_wrappers::KVMap& file_contents) { +void MockTableFactory::AssertSingleFile(const KVVector& file_contents) { ASSERT_EQ(file_system_.files.size(), 1U); ASSERT_EQ(file_contents, file_system_.files.begin()->second); } -void MockTableFactory::AssertLatestFile( - const stl_wrappers::KVMap& file_contents) { +void MockTableFactory::AssertLatestFile(const KVVector& file_contents) { ASSERT_GE(file_system_.files.size(), 1U); auto latest = file_system_.files.end(); --latest; diff --git a/table/mock_table.h b/table/mock_table.h index 6f8232240..0ab9674d6 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -27,13 +27,15 @@ namespace ROCKSDB_NAMESPACE { namespace mock { +using KVPair = std::pair; +using KVVector = std::vector; -stl_wrappers::KVMap MakeMockFile( - std::initializer_list> l = {}); +KVVector MakeMockFile(std::initializer_list l = {}); +void SortKVVector(KVVector* kv_vector); struct MockTableFileSystem { port::Mutex mutex; - std::map files; + std::map files; }; class MockTableFactory : public TableFactory { @@ -42,6 +44,7 @@ class MockTableFactory : public TableFactory { kCorruptNone, kCorruptKey, kCorruptValue, + kCorruptReorderKey, }; MockTableFactory(); @@ -60,7 +63,7 @@ class MockTableFactory : public TableFactory { // MockTableBuilder. file_contents has to have a format of . Those key-value pairs will then be inserted into the mock table. Status CreateMockTable(Env* env, const std::string& fname, - stl_wrappers::KVMap file_contents); + KVVector file_contents); virtual std::string GetPrintableOptions() const override { return std::string(); @@ -69,8 +72,8 @@ class MockTableFactory : public TableFactory { void SetCorruptionMode(MockCorruptionMode mode) { corrupt_mode_ = mode; } // This function will assert that only a single file exists and that the // contents are equal to file_contents - void AssertSingleFile(const stl_wrappers::KVMap& file_contents); - void AssertLatestFile(const stl_wrappers::KVMap& file_contents); + void AssertSingleFile(const KVVector& file_contents); + void AssertLatestFile(const KVVector& file_contents); private: uint32_t GetAndWriteNextID(WritableFileWriter* file) const;