From 813719e9525f647aaebf19ca3d4bb6f1c63e2648 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 31 Jan 2018 18:45:49 -0800 Subject: [PATCH] WritePrepared Txn: Duplicate Keys, Memtable part Summary: Currently DB does not accept duplicate keys (keys with the same user key and the same sequence number). If Memtable returns false when receiving such keys, we can benefit from this signal to properly increase the sequence number in the rare cases when we have a duplicate key in the write batch written to DB under WritePrepared transactions. Closes https://github.com/facebook/rocksdb/pull/3418 Differential Revision: D6822412 Pulled By: maysamyabandeh fbshipit-source-id: adea3ce5073131cd38ed52b16bea0673b1a19e77 --- db/db_memtable_test.cc | 87 +++++++++++++++++++++++++++++++-- db/db_test.cc | 8 +-- db/db_test_util.h | 4 +- db/dbformat.cc | 20 ++++++++ db/dbformat.h | 2 + db/memtable.cc | 39 ++++++++++----- db/memtable.h | 5 +- include/rocksdb/memtablerep.h | 27 ++++++++-- memtable/hash_cuckoo_rep.cc | 9 ++-- memtable/hash_linklist_rep.cc | 9 ++-- memtable/hash_skiplist_rep.cc | 5 +- memtable/inlineskiplist.h | 46 ++++++++++++----- memtable/inlineskiplist_test.cc | 5 +- memtable/skiplistrep.cc | 12 ++--- memtable/vectorrep.cc | 5 +- 15 files changed, 223 insertions(+), 60 deletions(-) diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 63d274f6a..f55bd6bc9 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -28,16 +28,17 @@ class MockMemTableRep : public MemTableRep { return rep_->Allocate(len, buf); } - virtual void Insert(KeyHandle handle) override { + virtual bool Insert(KeyHandle handle) override { return rep_->Insert(handle); } - virtual void InsertWithHint(KeyHandle handle, void** hint) override { + virtual bool InsertWithHint(KeyHandle handle, void** hint) override { num_insert_with_hint_++; - ASSERT_NE(nullptr, hint); + EXPECT_NE(nullptr, hint); last_hint_in_ = *hint; - rep_->InsertWithHint(handle, hint); + bool res = rep_->InsertWithHint(handle, hint); last_hint_out_ = *hint; + return res; } virtual bool Contains(const char* key) const override { @@ -129,6 +130,84 @@ class TestPrefixExtractor : public SliceTransform { } }; +// Test that ::Add properly returns false when inserting duplicate keys +TEST_F(DBMemTableTest, DuplicateSeq) { + SequenceNumber seq = 123; + std::string value; + Status s; + MergeContext merge_context; + Options options; + InternalKeyComparator ikey_cmp(options.comparator); + RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + WriteBufferManager wb(options.db_write_buffer_size); + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + + // Write some keys and make sure it returns false on duplicates + bool res; + res = mem->Add(seq, kTypeValue, "key", "value2"); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key", "value2"); + ASSERT_FALSE(res); + // Changing the type should still cause the duplicatae key + res = mem->Add(seq, kTypeMerge, "key", "value2"); + ASSERT_FALSE(res); + // Changing the seq number will make the key fresh + res = mem->Add(seq + 1, kTypeMerge, "key", "value2"); + ASSERT_TRUE(res); + // Test with different types for duplicate keys + res = mem->Add(seq, kTypeDeletion, "key", ""); + ASSERT_FALSE(res); + res = mem->Add(seq, kTypeSingleDeletion, "key", ""); + ASSERT_FALSE(res); + + // Test the duplicate keys under stress + for (int i = 0; i < 10000; i++) { + bool insert_dup = i % 10 == 1; + if (!insert_dup) { + seq++; + } + res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq)); + if (insert_dup) { + ASSERT_FALSE(res); + } else { + ASSERT_TRUE(res); + } + } + delete mem; + + // Test with InsertWithHint + options.memtable_insert_with_hint_prefix_extractor.reset( + new TestPrefixExtractor()); // which uses _ to extract the prefix + ioptions = ImmutableCFOptions(options); + mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + // Insert a duplicate key with _ in it + res = mem->Add(seq, kTypeValue, "key_1", "value"); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key_1", "value"); + ASSERT_FALSE(res); + delete mem; + + // Test when InsertConcurrently will be invoked + options.allow_concurrent_memtable_write = true; + ioptions = ImmutableCFOptions(options); + mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + MemTablePostProcessInfo post_process_info; + res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); + ASSERT_FALSE(res); + delete mem; +} + TEST_F(DBMemTableTest, InsertWithHint) { Options options; options.allow_concurrent_memtable_write = false; diff --git a/db/db_test.cc b/db/db_test.cc index 35edb4c45..61ddd7d58 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5114,10 +5114,10 @@ TEST_F(DBTest, AutomaticConflictsWithManualCompaction) { ASSERT_OK(Flush()); } std::thread manual_compaction_thread([this]() { - CompactRangeOptions croptions; - croptions.exclusive_manual_compaction = true; - ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); - }); + CompactRangeOptions croptions; + croptions.exclusive_manual_compaction = true; + ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); + }); TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts"); for (int i = 0; i < kNumL0Files; ++i) { diff --git a/db/db_test_util.h b/db/db_test_util.h index 9b79b6759..95f74190c 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -136,9 +136,9 @@ class SpecialMemTableRep : public MemTableRep { // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - virtual void Insert(KeyHandle handle) override { - memtable_->Insert(handle); + virtual bool Insert(KeyHandle handle) override { num_entries_++; + return memtable_->Insert(handle); } // Returns true iff an entry that compares equal to key is in the list. diff --git a/db/dbformat.cc b/db/dbformat.cc index e21612878..9357c3278 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -125,6 +125,26 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { return r; } +int InternalKeyComparator::CompareKeySeq(const Slice& akey, + const Slice& bkey) const { + // Order by: + // increasing user key (according to user-supplied comparator) + // decreasing sequence number + int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); + PERF_COUNTER_ADD(user_key_comparison_count, 1); + if (r == 0) { + // Shift the number to exclude the last byte which contains the value type + const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8; + const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8; + if (anum > bnum) { + r = -1; + } else if (anum < bnum) { + r = +1; + } + } + return r; +} + int InternalKeyComparator::Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const { // Order by: diff --git a/db/dbformat.h b/db/dbformat.h index 884a72d37..52e668d1d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -162,6 +162,8 @@ class InternalKeyComparator virtual const char* Name() const override; virtual int Compare(const Slice& a, const Slice& b) const override; + // Same as Compare except that it excludes the value type from comparison + virtual int CompareKeySeq(const Slice& a, const Slice& b) const; virtual void FindShortestSeparator(std::string* start, const Slice& limit) const override; virtual void FindShortSuccessor(std::string* key) const override; diff --git a/db/memtable.cc b/db/memtable.cc index 66bf55920..0d0e230cd 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -225,7 +225,7 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1, // Internal keys are encoded as length-prefixed strings. Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); - return comparator.Compare(k1, k2); + return comparator.CompareKeySeq(k1, k2); } int MemTable::KeyComparator::operator()(const char* prefix_len_key, @@ -233,16 +233,16 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key, const { // Internal keys are encoded as length-prefixed strings. Slice a = GetLengthPrefixedSlice(prefix_len_key); - return comparator.Compare(a, key); + return comparator.CompareKeySeq(a, key); } -void MemTableRep::InsertConcurrently(KeyHandle handle) { +bool MemTableRep::InsertConcurrently(KeyHandle handle) { #ifndef ROCKSDB_LITE - throw std::runtime_error("concurrent insert not supported"); + throw std::runtime_error("concurrent insert not supported"); #else - abort(); + abort(); #endif - } +} Slice MemTableRep::UserKey(const char* key) const { Slice slice = GetLengthPrefixedSlice(key); @@ -444,7 +444,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, return {entry_count * (data_size / n), entry_count}; } -void MemTable::Add(SequenceNumber s, ValueType type, +bool MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ const Slice& value, bool allow_concurrent, MemTablePostProcessInfo* post_process_info) { @@ -479,9 +479,15 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (insert_with_hint_prefix_extractor_ != nullptr && insert_with_hint_prefix_extractor_->InDomain(key_slice)) { Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); - table->InsertWithHint(handle, &insert_hints_[prefix]); + bool res = table->InsertWithHint(handle, &insert_hints_[prefix]); + if (!res) { + return res; + } } else { - table->Insert(handle); + bool res = table->Insert(handle); + if (!res) { + return res; + } } // this is a bit ugly, but is the way to avoid locked instructions @@ -514,7 +520,10 @@ void MemTable::Add(SequenceNumber s, ValueType type, assert(post_process_info == nullptr); UpdateFlushState(); } else { - table->InsertConcurrently(handle); + bool res = table->InsertConcurrently(handle); + if (!res) { + return res; + } assert(post_process_info != nullptr); post_process_info->num_entries++; @@ -544,6 +553,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, is_range_del_table_empty_ = false; } UpdateOldestKeyTime(); + return true; } // Callback from MemTable::Get() @@ -799,8 +809,9 @@ void MemTable::Update(SequenceNumber seq, // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); ValueType type; - SequenceNumber unused; - UnPackSequenceAndType(tag, &unused, &type); + SequenceNumber existing_seq; + UnPackSequenceAndType(tag, &existing_seq, &type); + assert(existing_seq != seq); if (type == kTypeValue) { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast(prev_value.size()); @@ -823,7 +834,9 @@ void MemTable::Update(SequenceNumber seq, } // key doesn't exist - Add(seq, kTypeValue, key, value); + bool add_res __attribute__((__unused__)) = Add(seq, kTypeValue, key, value); + // We already checked unused != seq above. In that case, Add should not fail. + assert(add_res); } bool MemTable::UpdateCallback(SequenceNumber seq, diff --git a/db/memtable.h b/db/memtable.h index bfafbeacc..7a04eaf77 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -167,7 +167,10 @@ class MemTable { // // REQUIRES: if allow_concurrent = false, external synchronization to prevent // simultaneous operations on the same MemTable. - void Add(SequenceNumber seq, ValueType type, const Slice& key, + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + bool Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value, bool allow_concurrent = false, MemTablePostProcessInfo* post_process_info = nullptr); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 1385a83cb..e7d675d1a 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -81,7 +81,10 @@ class MemTableRep { // single buffer and pass that in as the parameter to Insert). // REQUIRES: nothing that compares equal to key is currently in the // collection, and no concurrent modifications to the table in progress - virtual void Insert(KeyHandle handle) = 0; + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool Insert(KeyHandle handle) = 0; // Same as Insert(), but in additional pass a hint to insert location for // the key. If hint points to nullptr, a new hint will be populated. @@ -89,14 +92,20 @@ class MemTableRep { // // Currently only skip-list based memtable implement the interface. Other // implementations will fallback to Insert() by default. - virtual void InsertWithHint(KeyHandle handle, void** hint) { + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool InsertWithHint(KeyHandle handle, void** hint) { // Ignore the hint by default. - Insert(handle); + return Insert(handle); } // Like Insert(handle), but may be called concurrent with other calls - // to InsertConcurrently for other handles - virtual void InsertConcurrently(KeyHandle handle); + // to InsertConcurrently for other handles. + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool InsertConcurrently(KeyHandle handle); // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; @@ -226,6 +235,12 @@ class MemTableRepFactory { // Return true if the current MemTableRep supports concurrent inserts // Default: false virtual bool IsInsertConcurrentlySupported() const { return false; } + + // Return true if the current MemTableRep supports detecting duplicate + // at insertion time. If true, then MemTableRep::Insert* returns + // false when if the already exists. + // Default: false + virtual bool CanHandleDuplicatedKey() const { return false; } }; // This uses a skip list to store keys. It is the default. @@ -247,6 +262,8 @@ class SkipListFactory : public MemTableRepFactory { bool IsInsertConcurrentlySupported() const override { return true; } + bool CanHandleDuplicatedKey() const override { return true; } + private: const size_t lookahead_; }; diff --git a/memtable/hash_cuckoo_rep.cc b/memtable/hash_cuckoo_rep.cc index 034bf5858..bfc586bb6 100644 --- a/memtable/hash_cuckoo_rep.cc +++ b/memtable/hash_cuckoo_rep.cc @@ -97,7 +97,7 @@ class HashCuckooRep : public MemTableRep { // Insert the specified key (internal_key) into the mem-table. Assertion // fails if // the current mem-table already contains the specified key. - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; // This function returns bucket_count_ * approximate_entry_size_ when any // of the followings happen to disallow further write operations: @@ -319,7 +319,7 @@ void HashCuckooRep::Get(const LookupKey& key, void* callback_args, } } -void HashCuckooRep::Insert(KeyHandle handle) { +bool HashCuckooRep::Insert(KeyHandle handle) { static const float kMaxFullness = 0.90f; auto* key = static_cast(handle); @@ -340,7 +340,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { is_nearly_full_ = true; } backup_table_->Insert(key); - return; + return true; } // when reaching this point, means the insert can be done successfully. occupied_count_++; @@ -349,7 +349,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { } // perform kickout process if the length of cuckoo path > 1. - if (cuckoo_path_length == 0) return; + if (cuckoo_path_length == 0) return true; // the cuckoo path stores the kickout path in reverse order. // so the kickout or displacement is actually performed @@ -366,6 +366,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { } int insert_key_bid = cuckoo_path_[cuckoo_path_length - 1]; cuckoo_array_[insert_key_bid].store(key, std::memory_order_release); + return true; } bool HashCuckooRep::Contains(const char* internal_key) const { diff --git a/memtable/hash_linklist_rep.cc b/memtable/hash_linklist_rep.cc index 932b62a34..d0b98e993 100644 --- a/memtable/hash_linklist_rep.cc +++ b/memtable/hash_linklist_rep.cc @@ -170,7 +170,7 @@ class HashLinkListRep : public MemTableRep { virtual KeyHandle Allocate(const size_t len, char** buf) override; - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -571,7 +571,7 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { return nullptr; } -void HashLinkListRep::Insert(KeyHandle handle) { +bool HashLinkListRep::Insert(KeyHandle handle) { Node* x = static_cast(handle); assert(!Contains(x->key)); Slice internal_key = GetLengthPrefixedSlice(x->key); @@ -586,7 +586,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(nullptr); bucket.store(x, std::memory_order_release); - return; + return true; } BucketHeader* header = nullptr; @@ -613,7 +613,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // incremental. skip_list_bucket_header->Counting_header.IncNumEntries(); skip_list_bucket_header->skip_list.Insert(x->key); - return; + return true; } } @@ -691,6 +691,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { header->next.store(static_cast(x), std::memory_order_release); } } + return true; } bool HashLinkListRep::Contains(const char* key) const { diff --git a/memtable/hash_skiplist_rep.cc b/memtable/hash_skiplist_rep.cc index e34743eb2..01eb96c45 100644 --- a/memtable/hash_skiplist_rep.cc +++ b/memtable/hash_skiplist_rep.cc @@ -28,7 +28,7 @@ class HashSkipListRep : public MemTableRep { size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor); - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -267,12 +267,13 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( return bucket; } -void HashSkipListRep::Insert(KeyHandle handle) { +bool HashSkipListRep::Insert(KeyHandle handle) { auto* key = static_cast(handle); assert(!Contains(key)); auto transformed = transform_->Transform(UserKey(key)); auto bucket = GetInitializedBucket(transformed); bucket->Insert(key); + return true; } bool HashSkipListRep::Contains(const char* key) const { diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index 6ab79e668..702a7336d 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -45,6 +45,7 @@ #include #include #include +#include "port/likely.h" #include "port/port.h" #include "util/allocator.h" #include "util/random.h" @@ -81,7 +82,7 @@ class InlineSkipList { // // REQUIRES: nothing that compares equal to key is currently in the list. // REQUIRES: no concurrent calls to any of inserts. - void Insert(const char* key); + bool Insert(const char* key); // Inserts a key allocated by AllocateKey with a hint of last insert // position in the skip-list. If hint points to nullptr, a new hint will be @@ -93,10 +94,10 @@ class InlineSkipList { // // REQUIRES: nothing that compares equal to key is currently in the list. // REQUIRES: no concurrent calls to any of inserts. - void InsertWithHint(const char* key, void** hint); + bool InsertWithHint(const char* key, void** hint); // Like Insert, but external synchronization is not required. - void InsertConcurrently(const char* key); + bool InsertConcurrently(const char* key); // Inserts a node into the skip list. key must have been allocated by // AllocateKey and then filled in by the caller. If UseCAS is true, @@ -114,7 +115,7 @@ class InlineSkipList { // false has worse running time for the non-sequential case O(log N), // but a better constant factor. template - void Insert(const char* key, Splice* splice, bool allow_partial_splice_fix); + bool Insert(const char* key, Splice* splice, bool allow_partial_splice_fix); // Returns true iff an entry that compares equal to key is in the list. bool Contains(const char* key) const; @@ -626,29 +627,29 @@ InlineSkipList::AllocateSplice() { } template -void InlineSkipList::Insert(const char* key) { - Insert(key, seq_splice_, false); +bool InlineSkipList::Insert(const char* key) { + return Insert(key, seq_splice_, false); } template -void InlineSkipList::InsertConcurrently(const char* key) { +bool InlineSkipList::InsertConcurrently(const char* key) { Node* prev[kMaxPossibleHeight]; Node* next[kMaxPossibleHeight]; Splice splice; splice.prev_ = prev; splice.next_ = next; - Insert(key, &splice, false); + return Insert(key, &splice, false); } template -void InlineSkipList::InsertWithHint(const char* key, void** hint) { +bool InlineSkipList::InsertWithHint(const char* key, void** hint) { assert(hint != nullptr); Splice* splice = reinterpret_cast(*hint); if (splice == nullptr) { splice = AllocateSplice(); *hint = reinterpret_cast(splice); } - Insert(key, splice, true); + return Insert(key, splice, true); } template @@ -694,7 +695,7 @@ void InlineSkipList::RecomputeSpliceLevels(const char* key, template template -void InlineSkipList::Insert(const char* key, Splice* splice, +bool InlineSkipList::Insert(const char* key, Splice* splice, bool allow_partial_splice_fix) { Node* x = reinterpret_cast(const_cast(key)) - 1; int height = x->UnstashHeight(); @@ -801,6 +802,17 @@ void InlineSkipList::Insert(const char* key, Splice* splice, if (UseCAS) { for (int i = 0; i < height; ++i) { while (true) { + // Checking for duplicate keys on the level 0 is sufficient + if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && + compare_(x->Key(), splice->next_[i]->Key()) >= 0)) { + // duplicate key + return false; + } + if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && + compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) { + // duplicate key + return false; + } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || @@ -833,6 +845,17 @@ void InlineSkipList::Insert(const char* key, Splice* splice, FindSpliceForLevel(key, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]); } + // Checking for duplicate keys on the level 0 is sufficient + if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && + compare_(x->Key(), splice->next_[i]->Key()) >= 0)) { + // duplicate key + return false; + } + if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && + compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) { + // duplicate key + return false; + } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || @@ -865,6 +888,7 @@ void InlineSkipList::Insert(const char* key, Splice* splice, } else { splice->height_ = 0; } + return true; } template diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc index 5803e5b0f..70fd11a76 100644 --- a/memtable/inlineskiplist_test.cc +++ b/memtable/inlineskiplist_test.cc @@ -54,11 +54,12 @@ class InlineSkipTest : public testing::Test { keys_.insert(key); } - void InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { + bool InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { char* buf = list->AllocateKey(sizeof(Key)); memcpy(buf, &key, sizeof(Key)); - list->InsertWithHint(buf, hint); + bool res = list->InsertWithHint(buf, hint); keys_.insert(key); + return res; } void Validate(TestInlineSkipList* list) { diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index f56be5dcb..a21d5be17 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -34,16 +34,16 @@ public: // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - virtual void Insert(KeyHandle handle) override { - skip_list_.Insert(static_cast(handle)); + virtual bool Insert(KeyHandle handle) override { + return skip_list_.Insert(static_cast(handle)); } - virtual void InsertWithHint(KeyHandle handle, void** hint) override { - skip_list_.InsertWithHint(static_cast(handle), hint); + virtual bool InsertWithHint(KeyHandle handle, void** hint) override { + return skip_list_.InsertWithHint(static_cast(handle), hint); } - virtual void InsertConcurrently(KeyHandle handle) override { - skip_list_.InsertConcurrently(static_cast(handle)); + virtual bool InsertConcurrently(KeyHandle handle) override { + return skip_list_.InsertConcurrently(static_cast(handle)); } // Returns true iff an entry that compares equal to key is in the list. diff --git a/memtable/vectorrep.cc b/memtable/vectorrep.cc index e54025c2d..9b6383ffe 100644 --- a/memtable/vectorrep.cc +++ b/memtable/vectorrep.cc @@ -31,7 +31,7 @@ class VectorRep : public MemTableRep { // single buffer and pass that in as the parameter to Insert) // REQUIRES: nothing that compares equal to key is currently in the // collection. - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const override; @@ -108,11 +108,12 @@ class VectorRep : public MemTableRep { const KeyComparator& compare_; }; -void VectorRep::Insert(KeyHandle handle) { +bool VectorRep::Insert(KeyHandle handle) { auto* key = static_cast(handle); WriteLock l(&rwlock_); assert(!immutable_); bucket_->push_back(key); + return true; } // Returns true iff an entry that compares equal to key is in the collection.