From 4848bd0c4e98713bf5ae72a36057e188c53206f8 Mon Sep 17 00:00:00 2001 From: Shrikanth Shankar <11669147+shrikanthshankar@users.noreply.github.com> Date: Fri, 24 Aug 2018 14:57:37 -0700 Subject: [PATCH] Drop unnecessary deletion markers during compaction (issue - 3842) (#4289) Summary: This PR fixes issue 3842. We drop deletion markers iff 1. We are the bottom most level AND 2. All other occurrences of the key are in the same snapshot range as the delete I've also enhanced db_stress_test to add an option that does a full compare of the keys. This is done by a single thread (thread # 0). For tests I've run (so far) make check -j64 db_stress db_stress --acquire_snapshot_one_in=1000 --ops_per_thread=100000 /* to verify that new code doesnt break existing tests */ ./db_stress --compare_full_db_state_snapshot=true --acquire_snapshot_one_in=1000 --ops_per_thread=100000 /* to verify new test code */ Pull Request resolved: https://github.com/facebook/rocksdb/pull/4289 Differential Revision: D9491165 Pulled By: shrikanthshankar fbshipit-source-id: ce144834f31736c189aaca81bed356ba990331e2 --- db/compaction_iterator.cc | 25 +++++++++++++++++ db/compaction_iterator_test.cc | 25 ++++++++++++++--- tools/db_stress.cc | 51 +++++++++++++++++++++++++++++++++- 3 files changed, 96 insertions(+), 5 deletions(-) diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 5f1a5c3da..ac5b70d8d 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -505,6 +505,31 @@ void CompactionIterator::NextFromInput() { ++iter_stats_.num_optimized_del_drop_obsolete; } input_->Next(); + } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ && + ikeyNotNeededForIncrementalSnapshot()) { + // Handle the case where we have a delete key at the bottom most level + // We can skip outputting the key iff there are no subsequent puts for this + // key + ParsedInternalKey next_ikey; + input_->Next(); + // Skip over all versions of this key that happen to occur in the same snapshot + // range as the delete + while (input_->Valid() && + ParseInternalKey(input_->key(), &next_ikey) && + cmp_->Equal(ikey_.user_key, next_ikey.user_key) && + (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot || + (snapshot_checker_ != nullptr && + UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence, + prev_snapshot))))) { + input_->Next(); + } + // If you find you still need to output a row with this key, we need to output the + // delete too + if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) && + cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { + valid_ = true; + at_next_ = true; + } } else if (ikey_.type == kTypeMerge) { if (!merge_helper_->HasOperator()) { status_ = Status::InvalidArgument( diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 75eba90b6..03c5a9c62 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -671,8 +671,12 @@ TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) { TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) { AddSnapshot(1); RunTest({test::KeyStr("a", 1, kTypeDeletion), - test::KeyStr("b", 2, kTypeDeletion)}, - {"", ""}, {test::KeyStr("b", 2, kTypeDeletion)}, {""}, + test::KeyStr("b", 3, kTypeDeletion), + test::KeyStr("b", 1, kTypeValue)}, + {"", "", ""}, + {test::KeyStr("b", 3, kTypeDeletion), + test::KeyStr("b", 0, kTypeValue)}, + {"", ""}, kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/, nullptr /*compaction_filter*/, true /*bottommost_level*/); } @@ -841,13 +845,26 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion), test::KeyStr("c", 3, kTypeDeletion)}, {"", "", ""}, - {test::KeyStr("b", 2, kTypeDeletion), - test::KeyStr("c", 3, kTypeDeletion)}, + {}, {"", ""}, kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/, nullptr /*compaction_filter*/, true /*bottommost_level*/); } +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + NotRemoveDeletionIfValuePresentToEarlierSnapshot) { + AddSnapshot(2,1); + RunTest( + {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue), + test::KeyStr("b", 3, kTypeValue)}, + {"", "", ""}, + {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue), + test::KeyStr("b", 3, kTypeValue)}, + {"", "", ""}, kMaxSequenceNumber /*last_commited_seq*/, + nullptr /*merge_operator*/, nullptr /*compaction_filter*/, + true /*bottommost_level*/); +} + TEST_F(CompactionIteratorWithSnapshotCheckerTest, NotRemoveSingleDeletionIfNotVisibleToEarliestSnapshot) { AddSnapshot(2, 1); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 0eae94491..dfd8d7f41 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -419,6 +419,10 @@ DEFINE_int32(acquire_snapshot_one_in, 0, "If non-zero, then acquires a snapshot once every N operations on " "average."); +DEFINE_bool(compare_full_db_state_snapshot, false, + "If set we compare state of entire db (in one of the threads) with" + "each snapshot."); + DEFINE_uint64(snapshot_hold_ops, 0, "If non-zero, then releases snapshots N operations after they're " "acquired."); @@ -651,6 +655,18 @@ static std::string Key(int64_t val) { return big_endian_key; } +static bool GetIntVal(std::string big_endian_key, uint64_t *key_p) { + unsigned int size_key = sizeof(*key_p); + assert(big_endian_key.size() == size_key); + std::string little_endian_key; + little_endian_key.resize(size_key); + for (size_t i = 0 ; i < size_key; ++i) { + little_endian_key[i] = big_endian_key[size_key - 1 - i]; + } + Slice little_endian_slice = Slice(little_endian_key); + return GetFixed64(&little_endian_slice, key_p); +} + static std::string StringToHex(const std::string& str) { std::string result = "0x"; result.append(Slice(str).ToString(true)); @@ -1207,6 +1223,8 @@ struct ThreadState { Status status; // The value of the Get std::string value; + // optional state of all keys in the db + std::vector *key_vec; }; std::queue > snapshot_queue; @@ -1713,6 +1731,21 @@ class StressTest { ")"); } } + if (snap_state.key_vec != nullptr) { + std::unique_ptr iterator(db->NewIterator(ropt)); + std::unique_ptr> tmp_bitvec(new std::vector(FLAGS_max_key)); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + uint64_t key_val; + if (GetIntVal(iterator->key().ToString(), &key_val)) { + (*tmp_bitvec.get())[key_val] = true; + } + } + if (!std::equal(snap_state.key_vec->begin(), + snap_state.key_vec->end(), + tmp_bitvec.get()->begin())) { + return Status::Corruption("Found inconsistent keys at this snapshot"); + } + } return Status::OK(); } @@ -1796,6 +1829,7 @@ class StressTest { while (!thread->snapshot_queue.empty()) { db_->ReleaseSnapshot( thread->snapshot_queue.front().second.snapshot); + delete thread->snapshot_queue.front().second.key_vec; thread->snapshot_queue.pop(); } thread->shared->IncVotedReopen(); @@ -1970,9 +2004,23 @@ class StressTest { // will later read the same key before releasing the snapshot and verify // that the results are the same. auto status_at = db_->Get(ropt, column_family, key, &value_at); + std::vector *key_vec = nullptr; + + if (FLAGS_compare_full_db_state_snapshot && + (thread->tid == 0)) { + key_vec = new std::vector(FLAGS_max_key); + std::unique_ptr iterator(db_->NewIterator(ropt)); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + uint64_t key_val; + if (GetIntVal(iterator->key().ToString(), &key_val)) { + (*key_vec)[key_val] = true; + } + } + } + ThreadState::SnapshotState snap_state = { snapshot, rand_column_family, column_family->GetName(), - keystr, status_at, value_at}; + keystr, status_at, value_at, key_vec}; thread->snapshot_queue.emplace( std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), snap_state); @@ -1990,6 +2038,7 @@ class StressTest { VerificationAbort(shared, "Snapshot gave inconsistent state", s); } db_->ReleaseSnapshot(snap_state.snapshot); + delete snap_state.key_vec; thread->snapshot_queue.pop(); }