diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 63d274f6a..f55bd6bc9 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -28,16 +28,17 @@ class MockMemTableRep : public MemTableRep { return rep_->Allocate(len, buf); } - virtual void Insert(KeyHandle handle) override { + virtual bool Insert(KeyHandle handle) override { return rep_->Insert(handle); } - virtual void InsertWithHint(KeyHandle handle, void** hint) override { + virtual bool InsertWithHint(KeyHandle handle, void** hint) override { num_insert_with_hint_++; - ASSERT_NE(nullptr, hint); + EXPECT_NE(nullptr, hint); last_hint_in_ = *hint; - rep_->InsertWithHint(handle, hint); + bool res = rep_->InsertWithHint(handle, hint); last_hint_out_ = *hint; + return res; } virtual bool Contains(const char* key) const override { @@ -129,6 +130,84 @@ class TestPrefixExtractor : public SliceTransform { } }; +// Test that ::Add properly returns false when inserting duplicate keys +TEST_F(DBMemTableTest, DuplicateSeq) { + SequenceNumber seq = 123; + std::string value; + Status s; + MergeContext merge_context; + Options options; + InternalKeyComparator ikey_cmp(options.comparator); + RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + WriteBufferManager wb(options.db_write_buffer_size); + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + + // Write some keys and make sure it returns false on duplicates + bool res; + res = mem->Add(seq, kTypeValue, "key", "value2"); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key", "value2"); + ASSERT_FALSE(res); + // Changing the type should still cause the duplicatae key + res = mem->Add(seq, kTypeMerge, "key", "value2"); + ASSERT_FALSE(res); + // Changing the seq number will make the key fresh + res = mem->Add(seq + 1, kTypeMerge, "key", "value2"); + ASSERT_TRUE(res); + // Test with different types for duplicate keys + res = mem->Add(seq, kTypeDeletion, "key", ""); + ASSERT_FALSE(res); + res = mem->Add(seq, kTypeSingleDeletion, "key", ""); + ASSERT_FALSE(res); + + // Test the duplicate keys under stress + for (int i = 0; i < 10000; i++) { + bool insert_dup = i % 10 == 1; + if (!insert_dup) { + seq++; + } + res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq)); + if (insert_dup) { + ASSERT_FALSE(res); + } else { + ASSERT_TRUE(res); + } + } + delete mem; + + // Test with InsertWithHint + options.memtable_insert_with_hint_prefix_extractor.reset( + new TestPrefixExtractor()); // which uses _ to extract the prefix + ioptions = ImmutableCFOptions(options); + mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + // Insert a duplicate key with _ in it + res = mem->Add(seq, kTypeValue, "key_1", "value"); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key_1", "value"); + ASSERT_FALSE(res); + delete mem; + + // Test when InsertConcurrently will be invoked + options.allow_concurrent_memtable_write = true; + ioptions = ImmutableCFOptions(options); + mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + MemTablePostProcessInfo post_process_info; + res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); + ASSERT_TRUE(res); + res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); + ASSERT_FALSE(res); + delete mem; +} + TEST_F(DBMemTableTest, InsertWithHint) { Options options; options.allow_concurrent_memtable_write = false; diff --git a/db/db_test.cc b/db/db_test.cc index 35edb4c45..61ddd7d58 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5114,10 +5114,10 @@ TEST_F(DBTest, AutomaticConflictsWithManualCompaction) { ASSERT_OK(Flush()); } std::thread manual_compaction_thread([this]() { - CompactRangeOptions croptions; - croptions.exclusive_manual_compaction = true; - ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); - }); + CompactRangeOptions croptions; + croptions.exclusive_manual_compaction = true; + ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr)); + }); TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts"); for (int i = 0; i < kNumL0Files; ++i) { diff --git a/db/db_test_util.h b/db/db_test_util.h index 9b79b6759..95f74190c 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -136,9 +136,9 @@ class SpecialMemTableRep : public MemTableRep { // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - virtual void Insert(KeyHandle handle) override { - memtable_->Insert(handle); + virtual bool Insert(KeyHandle handle) override { num_entries_++; + return memtable_->Insert(handle); } // Returns true iff an entry that compares equal to key is in the list. diff --git a/db/dbformat.cc b/db/dbformat.cc index e21612878..9357c3278 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -125,6 +125,26 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { return r; } +int InternalKeyComparator::CompareKeySeq(const Slice& akey, + const Slice& bkey) const { + // Order by: + // increasing user key (according to user-supplied comparator) + // decreasing sequence number + int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); + PERF_COUNTER_ADD(user_key_comparison_count, 1); + if (r == 0) { + // Shift the number to exclude the last byte which contains the value type + const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8; + const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8; + if (anum > bnum) { + r = -1; + } else if (anum < bnum) { + r = +1; + } + } + return r; +} + int InternalKeyComparator::Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const { // Order by: diff --git a/db/dbformat.h b/db/dbformat.h index 884a72d37..52e668d1d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -162,6 +162,8 @@ class InternalKeyComparator virtual const char* Name() const override; virtual int Compare(const Slice& a, const Slice& b) const override; + // Same as Compare except that it excludes the value type from comparison + virtual int CompareKeySeq(const Slice& a, const Slice& b) const; virtual void FindShortestSeparator(std::string* start, const Slice& limit) const override; virtual void FindShortSuccessor(std::string* key) const override; diff --git a/db/memtable.cc b/db/memtable.cc index 66bf55920..0d0e230cd 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -225,7 +225,7 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1, // Internal keys are encoded as length-prefixed strings. Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); - return comparator.Compare(k1, k2); + return comparator.CompareKeySeq(k1, k2); } int MemTable::KeyComparator::operator()(const char* prefix_len_key, @@ -233,16 +233,16 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key, const { // Internal keys are encoded as length-prefixed strings. Slice a = GetLengthPrefixedSlice(prefix_len_key); - return comparator.Compare(a, key); + return comparator.CompareKeySeq(a, key); } -void MemTableRep::InsertConcurrently(KeyHandle handle) { +bool MemTableRep::InsertConcurrently(KeyHandle handle) { #ifndef ROCKSDB_LITE - throw std::runtime_error("concurrent insert not supported"); + throw std::runtime_error("concurrent insert not supported"); #else - abort(); + abort(); #endif - } +} Slice MemTableRep::UserKey(const char* key) const { Slice slice = GetLengthPrefixedSlice(key); @@ -444,7 +444,7 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, return {entry_count * (data_size / n), entry_count}; } -void MemTable::Add(SequenceNumber s, ValueType type, +bool MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, /* user key */ const Slice& value, bool allow_concurrent, MemTablePostProcessInfo* post_process_info) { @@ -479,9 +479,15 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (insert_with_hint_prefix_extractor_ != nullptr && insert_with_hint_prefix_extractor_->InDomain(key_slice)) { Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); - table->InsertWithHint(handle, &insert_hints_[prefix]); + bool res = table->InsertWithHint(handle, &insert_hints_[prefix]); + if (!res) { + return res; + } } else { - table->Insert(handle); + bool res = table->Insert(handle); + if (!res) { + return res; + } } // this is a bit ugly, but is the way to avoid locked instructions @@ -514,7 +520,10 @@ void MemTable::Add(SequenceNumber s, ValueType type, assert(post_process_info == nullptr); UpdateFlushState(); } else { - table->InsertConcurrently(handle); + bool res = table->InsertConcurrently(handle); + if (!res) { + return res; + } assert(post_process_info != nullptr); post_process_info->num_entries++; @@ -544,6 +553,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, is_range_del_table_empty_ = false; } UpdateOldestKeyTime(); + return true; } // Callback from MemTable::Get() @@ -799,8 +809,9 @@ void MemTable::Update(SequenceNumber seq, // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); ValueType type; - SequenceNumber unused; - UnPackSequenceAndType(tag, &unused, &type); + SequenceNumber existing_seq; + UnPackSequenceAndType(tag, &existing_seq, &type); + assert(existing_seq != seq); if (type == kTypeValue) { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); uint32_t prev_size = static_cast(prev_value.size()); @@ -823,7 +834,9 @@ void MemTable::Update(SequenceNumber seq, } // key doesn't exist - Add(seq, kTypeValue, key, value); + bool add_res __attribute__((__unused__)) = Add(seq, kTypeValue, key, value); + // We already checked unused != seq above. In that case, Add should not fail. + assert(add_res); } bool MemTable::UpdateCallback(SequenceNumber seq, diff --git a/db/memtable.h b/db/memtable.h index bfafbeacc..7a04eaf77 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -167,7 +167,10 @@ class MemTable { // // REQUIRES: if allow_concurrent = false, external synchronization to prevent // simultaneous operations on the same MemTable. - void Add(SequenceNumber seq, ValueType type, const Slice& key, + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + bool Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value, bool allow_concurrent = false, MemTablePostProcessInfo* post_process_info = nullptr); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 1385a83cb..e7d675d1a 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -81,7 +81,10 @@ class MemTableRep { // single buffer and pass that in as the parameter to Insert). // REQUIRES: nothing that compares equal to key is currently in the // collection, and no concurrent modifications to the table in progress - virtual void Insert(KeyHandle handle) = 0; + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool Insert(KeyHandle handle) = 0; // Same as Insert(), but in additional pass a hint to insert location for // the key. If hint points to nullptr, a new hint will be populated. @@ -89,14 +92,20 @@ class MemTableRep { // // Currently only skip-list based memtable implement the interface. Other // implementations will fallback to Insert() by default. - virtual void InsertWithHint(KeyHandle handle, void** hint) { + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool InsertWithHint(KeyHandle handle, void** hint) { // Ignore the hint by default. - Insert(handle); + return Insert(handle); } // Like Insert(handle), but may be called concurrent with other calls - // to InsertConcurrently for other handles - virtual void InsertConcurrently(KeyHandle handle); + // to InsertConcurrently for other handles. + // + // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and + // the already exists. + virtual bool InsertConcurrently(KeyHandle handle); // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; @@ -226,6 +235,12 @@ class MemTableRepFactory { // Return true if the current MemTableRep supports concurrent inserts // Default: false virtual bool IsInsertConcurrentlySupported() const { return false; } + + // Return true if the current MemTableRep supports detecting duplicate + // at insertion time. If true, then MemTableRep::Insert* returns + // false when if the already exists. + // Default: false + virtual bool CanHandleDuplicatedKey() const { return false; } }; // This uses a skip list to store keys. It is the default. @@ -247,6 +262,8 @@ class SkipListFactory : public MemTableRepFactory { bool IsInsertConcurrentlySupported() const override { return true; } + bool CanHandleDuplicatedKey() const override { return true; } + private: const size_t lookahead_; }; diff --git a/memtable/hash_cuckoo_rep.cc b/memtable/hash_cuckoo_rep.cc index 034bf5858..bfc586bb6 100644 --- a/memtable/hash_cuckoo_rep.cc +++ b/memtable/hash_cuckoo_rep.cc @@ -97,7 +97,7 @@ class HashCuckooRep : public MemTableRep { // Insert the specified key (internal_key) into the mem-table. Assertion // fails if // the current mem-table already contains the specified key. - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; // This function returns bucket_count_ * approximate_entry_size_ when any // of the followings happen to disallow further write operations: @@ -319,7 +319,7 @@ void HashCuckooRep::Get(const LookupKey& key, void* callback_args, } } -void HashCuckooRep::Insert(KeyHandle handle) { +bool HashCuckooRep::Insert(KeyHandle handle) { static const float kMaxFullness = 0.90f; auto* key = static_cast(handle); @@ -340,7 +340,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { is_nearly_full_ = true; } backup_table_->Insert(key); - return; + return true; } // when reaching this point, means the insert can be done successfully. occupied_count_++; @@ -349,7 +349,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { } // perform kickout process if the length of cuckoo path > 1. - if (cuckoo_path_length == 0) return; + if (cuckoo_path_length == 0) return true; // the cuckoo path stores the kickout path in reverse order. // so the kickout or displacement is actually performed @@ -366,6 +366,7 @@ void HashCuckooRep::Insert(KeyHandle handle) { } int insert_key_bid = cuckoo_path_[cuckoo_path_length - 1]; cuckoo_array_[insert_key_bid].store(key, std::memory_order_release); + return true; } bool HashCuckooRep::Contains(const char* internal_key) const { diff --git a/memtable/hash_linklist_rep.cc b/memtable/hash_linklist_rep.cc index 932b62a34..d0b98e993 100644 --- a/memtable/hash_linklist_rep.cc +++ b/memtable/hash_linklist_rep.cc @@ -170,7 +170,7 @@ class HashLinkListRep : public MemTableRep { virtual KeyHandle Allocate(const size_t len, char** buf) override; - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -571,7 +571,7 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { return nullptr; } -void HashLinkListRep::Insert(KeyHandle handle) { +bool HashLinkListRep::Insert(KeyHandle handle) { Node* x = static_cast(handle); assert(!Contains(x->key)); Slice internal_key = GetLengthPrefixedSlice(x->key); @@ -586,7 +586,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(nullptr); bucket.store(x, std::memory_order_release); - return; + return true; } BucketHeader* header = nullptr; @@ -613,7 +613,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // incremental. skip_list_bucket_header->Counting_header.IncNumEntries(); skip_list_bucket_header->skip_list.Insert(x->key); - return; + return true; } } @@ -691,6 +691,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { header->next.store(static_cast(x), std::memory_order_release); } } + return true; } bool HashLinkListRep::Contains(const char* key) const { diff --git a/memtable/hash_skiplist_rep.cc b/memtable/hash_skiplist_rep.cc index e34743eb2..01eb96c45 100644 --- a/memtable/hash_skiplist_rep.cc +++ b/memtable/hash_skiplist_rep.cc @@ -28,7 +28,7 @@ class HashSkipListRep : public MemTableRep { size_t bucket_size, int32_t skiplist_height, int32_t skiplist_branching_factor); - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; virtual bool Contains(const char* key) const override; @@ -267,12 +267,13 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( return bucket; } -void HashSkipListRep::Insert(KeyHandle handle) { +bool HashSkipListRep::Insert(KeyHandle handle) { auto* key = static_cast(handle); assert(!Contains(key)); auto transformed = transform_->Transform(UserKey(key)); auto bucket = GetInitializedBucket(transformed); bucket->Insert(key); + return true; } bool HashSkipListRep::Contains(const char* key) const { diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index 6ab79e668..702a7336d 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -45,6 +45,7 @@ #include #include #include +#include "port/likely.h" #include "port/port.h" #include "util/allocator.h" #include "util/random.h" @@ -81,7 +82,7 @@ class InlineSkipList { // // REQUIRES: nothing that compares equal to key is currently in the list. // REQUIRES: no concurrent calls to any of inserts. - void Insert(const char* key); + bool Insert(const char* key); // Inserts a key allocated by AllocateKey with a hint of last insert // position in the skip-list. If hint points to nullptr, a new hint will be @@ -93,10 +94,10 @@ class InlineSkipList { // // REQUIRES: nothing that compares equal to key is currently in the list. // REQUIRES: no concurrent calls to any of inserts. - void InsertWithHint(const char* key, void** hint); + bool InsertWithHint(const char* key, void** hint); // Like Insert, but external synchronization is not required. - void InsertConcurrently(const char* key); + bool InsertConcurrently(const char* key); // Inserts a node into the skip list. key must have been allocated by // AllocateKey and then filled in by the caller. If UseCAS is true, @@ -114,7 +115,7 @@ class InlineSkipList { // false has worse running time for the non-sequential case O(log N), // but a better constant factor. template - void Insert(const char* key, Splice* splice, bool allow_partial_splice_fix); + bool Insert(const char* key, Splice* splice, bool allow_partial_splice_fix); // Returns true iff an entry that compares equal to key is in the list. bool Contains(const char* key) const; @@ -626,29 +627,29 @@ InlineSkipList::AllocateSplice() { } template -void InlineSkipList::Insert(const char* key) { - Insert(key, seq_splice_, false); +bool InlineSkipList::Insert(const char* key) { + return Insert(key, seq_splice_, false); } template -void InlineSkipList::InsertConcurrently(const char* key) { +bool InlineSkipList::InsertConcurrently(const char* key) { Node* prev[kMaxPossibleHeight]; Node* next[kMaxPossibleHeight]; Splice splice; splice.prev_ = prev; splice.next_ = next; - Insert(key, &splice, false); + return Insert(key, &splice, false); } template -void InlineSkipList::InsertWithHint(const char* key, void** hint) { +bool InlineSkipList::InsertWithHint(const char* key, void** hint) { assert(hint != nullptr); Splice* splice = reinterpret_cast(*hint); if (splice == nullptr) { splice = AllocateSplice(); *hint = reinterpret_cast(splice); } - Insert(key, splice, true); + return Insert(key, splice, true); } template @@ -694,7 +695,7 @@ void InlineSkipList::RecomputeSpliceLevels(const char* key, template template -void InlineSkipList::Insert(const char* key, Splice* splice, +bool InlineSkipList::Insert(const char* key, Splice* splice, bool allow_partial_splice_fix) { Node* x = reinterpret_cast(const_cast(key)) - 1; int height = x->UnstashHeight(); @@ -801,6 +802,17 @@ void InlineSkipList::Insert(const char* key, Splice* splice, if (UseCAS) { for (int i = 0; i < height; ++i) { while (true) { + // Checking for duplicate keys on the level 0 is sufficient + if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && + compare_(x->Key(), splice->next_[i]->Key()) >= 0)) { + // duplicate key + return false; + } + if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && + compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) { + // duplicate key + return false; + } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || @@ -833,6 +845,17 @@ void InlineSkipList::Insert(const char* key, Splice* splice, FindSpliceForLevel(key, splice->prev_[i], nullptr, i, &splice->prev_[i], &splice->next_[i]); } + // Checking for duplicate keys on the level 0 is sufficient + if (UNLIKELY(i == 0 && splice->next_[i] != nullptr && + compare_(x->Key(), splice->next_[i]->Key()) >= 0)) { + // duplicate key + return false; + } + if (UNLIKELY(i == 0 && splice->prev_[i] != head_ && + compare_(splice->prev_[i]->Key(), x->Key()) >= 0)) { + // duplicate key + return false; + } assert(splice->next_[i] == nullptr || compare_(x->Key(), splice->next_[i]->Key()) < 0); assert(splice->prev_[i] == head_ || @@ -865,6 +888,7 @@ void InlineSkipList::Insert(const char* key, Splice* splice, } else { splice->height_ = 0; } + return true; } template diff --git a/memtable/inlineskiplist_test.cc b/memtable/inlineskiplist_test.cc index 5803e5b0f..70fd11a76 100644 --- a/memtable/inlineskiplist_test.cc +++ b/memtable/inlineskiplist_test.cc @@ -54,11 +54,12 @@ class InlineSkipTest : public testing::Test { keys_.insert(key); } - void InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { + bool InsertWithHint(TestInlineSkipList* list, Key key, void** hint) { char* buf = list->AllocateKey(sizeof(Key)); memcpy(buf, &key, sizeof(Key)); - list->InsertWithHint(buf, hint); + bool res = list->InsertWithHint(buf, hint); keys_.insert(key); + return res; } void Validate(TestInlineSkipList* list) { diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index f56be5dcb..a21d5be17 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -34,16 +34,16 @@ public: // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. - virtual void Insert(KeyHandle handle) override { - skip_list_.Insert(static_cast(handle)); + virtual bool Insert(KeyHandle handle) override { + return skip_list_.Insert(static_cast(handle)); } - virtual void InsertWithHint(KeyHandle handle, void** hint) override { - skip_list_.InsertWithHint(static_cast(handle), hint); + virtual bool InsertWithHint(KeyHandle handle, void** hint) override { + return skip_list_.InsertWithHint(static_cast(handle), hint); } - virtual void InsertConcurrently(KeyHandle handle) override { - skip_list_.InsertConcurrently(static_cast(handle)); + virtual bool InsertConcurrently(KeyHandle handle) override { + return skip_list_.InsertConcurrently(static_cast(handle)); } // Returns true iff an entry that compares equal to key is in the list. diff --git a/memtable/vectorrep.cc b/memtable/vectorrep.cc index e54025c2d..9b6383ffe 100644 --- a/memtable/vectorrep.cc +++ b/memtable/vectorrep.cc @@ -31,7 +31,7 @@ class VectorRep : public MemTableRep { // single buffer and pass that in as the parameter to Insert) // REQUIRES: nothing that compares equal to key is currently in the // collection. - virtual void Insert(KeyHandle handle) override; + virtual bool Insert(KeyHandle handle) override; // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const override; @@ -108,11 +108,12 @@ class VectorRep : public MemTableRep { const KeyComparator& compare_; }; -void VectorRep::Insert(KeyHandle handle) { +bool VectorRep::Insert(KeyHandle handle) { auto* key = static_cast(handle); WriteLock l(&rwlock_); assert(!immutable_); bucket_->push_back(key); + return true; } // Returns true iff an entry that compares equal to key is in the collection.