diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 8706b690b..f2c61aeb7 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -146,22 +146,15 @@ TEST_F(DBMemTableTest, DuplicateSeq) { kMaxSequenceNumber, 0 /* column_family_id */); // Write some keys and make sure it returns false on duplicates - bool res; - res = mem->Add(seq, kTypeValue, "key", "value2"); - ASSERT_TRUE(res); - res = mem->Add(seq, kTypeValue, "key", "value2"); - ASSERT_FALSE(res); + ASSERT_OK(mem->Add(seq, kTypeValue, "key", "value2")); + ASSERT_TRUE(mem->Add(seq, kTypeValue, "key", "value2").IsTryAgain()); // Changing the type should still cause the duplicatae key - res = mem->Add(seq, kTypeMerge, "key", "value2"); - ASSERT_FALSE(res); + ASSERT_TRUE(mem->Add(seq, kTypeMerge, "key", "value2").IsTryAgain()); // Changing the seq number will make the key fresh - res = mem->Add(seq + 1, kTypeMerge, "key", "value2"); - ASSERT_TRUE(res); + ASSERT_OK(mem->Add(seq + 1, kTypeMerge, "key", "value2")); // Test with different types for duplicate keys - res = mem->Add(seq, kTypeDeletion, "key", ""); - ASSERT_FALSE(res); - res = mem->Add(seq, kTypeSingleDeletion, "key", ""); - ASSERT_FALSE(res); + ASSERT_TRUE(mem->Add(seq, kTypeDeletion, "key", "").IsTryAgain()); + ASSERT_TRUE(mem->Add(seq, kTypeSingleDeletion, "key", "").IsTryAgain()); // Test the duplicate keys under stress for (int i = 0; i < 10000; i++) { @@ -169,11 +162,11 @@ TEST_F(DBMemTableTest, DuplicateSeq) { if (!insert_dup) { seq++; } - res = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq)); + s = mem->Add(seq, kTypeValue, "foo", "value" + ToString(seq)); if (insert_dup) { - ASSERT_FALSE(res); + ASSERT_TRUE(s.IsTryAgain()); } else { - ASSERT_TRUE(res); + ASSERT_OK(s); } } delete mem; @@ -185,10 +178,8 @@ TEST_F(DBMemTableTest, DuplicateSeq) { mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, kMaxSequenceNumber, 0 /* column_family_id */); // Insert a duplicate key with _ in it - res = mem->Add(seq, kTypeValue, "key_1", "value"); - ASSERT_TRUE(res); - res = mem->Add(seq, kTypeValue, "key_1", "value"); - ASSERT_FALSE(res); + ASSERT_OK(mem->Add(seq, kTypeValue, "key_1", "value")); + ASSERT_TRUE(mem->Add(seq, kTypeValue, "key_1", "value").IsTryAgain()); delete mem; // Test when InsertConcurrently will be invoked @@ -197,10 +188,11 @@ TEST_F(DBMemTableTest, DuplicateSeq) { mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, kMaxSequenceNumber, 0 /* column_family_id */); MemTablePostProcessInfo post_process_info; - res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); - ASSERT_TRUE(res); - res = mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info); - ASSERT_FALSE(res); + ASSERT_OK( + mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info)); + ASSERT_TRUE( + mem->Add(seq, kTypeValue, "key", "value", true, &post_process_info) + .IsTryAgain()); delete mem; } @@ -227,8 +219,7 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) { // Put 0 as the base PutFixed64(&value, static_cast(0)); - bool res = mem->Add(0, kTypeValue, "key", value); - ASSERT_TRUE(res); + ASSERT_OK(mem->Add(0, kTypeValue, "key", value)); value.clear(); // Write Merge concurrently @@ -237,9 +228,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) { std::string v1; for (int seq = 1; seq < num_ops / 2; seq++) { PutFixed64(&v1, seq); - bool res1 = - mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1); - ASSERT_TRUE(res1); + ASSERT_OK( + mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info1)); v1.clear(); } }); @@ -248,9 +238,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) { std::string v2; for (int seq = num_ops / 2; seq < num_ops; seq++) { PutFixed64(&v2, seq); - bool res2 = - mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2); - ASSERT_TRUE(res2); + ASSERT_OK( + mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info2)); v2.clear(); } }); @@ -261,8 +250,8 @@ TEST_F(DBMemTableTest, ConcurrentMergeWrite) { ReadOptions roptions; SequenceNumber max_covering_tombstone_seq = 0; LookupKey lkey("key", kMaxSequenceNumber); - res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, &merge_context, - &max_covering_tombstone_seq, roptions); + bool res = mem->Get(lkey, &value, /*timestamp=*/nullptr, &status, + &merge_context, &max_covering_tombstone_seq, roptions); ASSERT_TRUE(res); uint64_t ivalue = DecodeFixed64(Slice(value).data()); uint64_t sum = 0; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 6b879a300..6ac6a2e80 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -187,7 +187,7 @@ TEST_F(FlushJobTest, NonEmpty) { for (int i = 1; i < 10000; ++i) { std::string key(ToString((i + 1000) % 10000)); std::string value("value" + key); - new_mem->Add(SequenceNumber(i), kTypeValue, key, value); + ASSERT_OK(new_mem->Add(SequenceNumber(i), kTypeValue, key, value)); if ((i + 1000) % 10000 < 9995) { InternalKey internal_key(key, SequenceNumber(i), kTypeValue); inserted_keys.push_back({internal_key.Encode().ToString(), value}); @@ -195,7 +195,8 @@ TEST_F(FlushJobTest, NonEmpty) { } { - new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a"); + ASSERT_OK(new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", + "9999a")); InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion); inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"}); } @@ -222,7 +223,7 @@ TEST_F(FlushJobTest, NonEmpty) { } const SequenceNumber seq(i + 10001); - new_mem->Add(seq, kTypeBlobIndex, key, blob_index); + ASSERT_OK(new_mem->Add(seq, kTypeBlobIndex, key, blob_index)); InternalKey internal_key(key, seq, kTypeBlobIndex); inserted_keys.push_back({internal_key.Encode().ToString(), blob_index}); @@ -283,8 +284,8 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { for (size_t j = 0; j < num_keys_per_table; ++j) { std::string key(ToString(j + i * num_keys_per_table)); std::string value("value" + key); - mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key, - value); + ASSERT_OK(mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, + key, value)); } } @@ -356,7 +357,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { for (size_t j = 0; j != num_keys_per_memtable; ++j) { std::string key(ToString(j + i * num_keys_per_memtable)); std::string value("value" + key); - mem->Add(curr_seqno++, kTypeValue, key, value); + ASSERT_OK(mem->Add(curr_seqno++, kTypeValue, key, value)); } cfd->imm()->Add(mem, &to_delete); @@ -466,7 +467,7 @@ TEST_F(FlushJobTest, Snapshots) { for (int j = 0; j < insertions; ++j) { std::string value(rnd.HumanReadableString(10)); auto seqno = ++current_seqno; - new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value); + ASSERT_OK(new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value)); // a key is visible only if: // 1. it's the last one written (j == insertions - 1) // 2. there's a snapshot pointing at it @@ -518,7 +519,7 @@ class FlushJobTimestampTest : public FlushJobTestBase { Slice value) { std::string key_str(std::move(key)); PutFixed64(&key_str, ts); - memtable->Add(seq, value_type, key_str, value); + ASSERT_OK(memtable->Add(seq, value_type, key_str, value)); } protected: diff --git a/db/memtable.cc b/db/memtable.cc index 6a96a6fa5..cca4d45af 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -480,10 +480,10 @@ MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, return {entry_count * (data_size / n), entry_count}; } -bool MemTable::Add(SequenceNumber s, ValueType type, - const Slice& key, /* user key */ - const Slice& value, bool allow_concurrent, - MemTablePostProcessInfo* post_process_info, void** hint) { +Status MemTable::Add(SequenceNumber s, ValueType type, + const Slice& key, /* user key */ + const Slice& value, bool allow_concurrent, + MemTablePostProcessInfo* post_process_info, void** hint) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -519,12 +519,12 @@ bool MemTable::Add(SequenceNumber s, ValueType type, Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]); if (UNLIKELY(!res)) { - return res; + return Status::TryAgain("key+seq exists"); } } else { bool res = table->InsertKey(handle); if (UNLIKELY(!res)) { - return res; + return Status::TryAgain("key+seq exists"); } } @@ -566,7 +566,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type, ? table->InsertKeyConcurrently(handle) : table->InsertKeyWithHintConcurrently(handle, hint); if (UNLIKELY(!res)) { - return res; + return Status::TryAgain("key+seq exists"); } assert(post_process_info != nullptr); @@ -600,7 +600,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type, is_range_del_table_empty_.store(false, std::memory_order_relaxed); } UpdateOldestKeyTime(); - return true; + return Status::OK(); } // Callback from MemTable::Get() @@ -973,9 +973,8 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, PERF_COUNTER_ADD(get_from_memtable_count, 1); } -void MemTable::Update(SequenceNumber seq, - const Slice& key, - const Slice& value) { +Status MemTable::Update(SequenceNumber seq, const Slice& key, + const Slice& value) { LookupKey lkey(key, seq); Slice mem_key = lkey.memtable_key(); @@ -1019,22 +1018,18 @@ void MemTable::Update(SequenceNumber seq, (unsigned)(VarintLength(key_length) + key_length + VarintLength(value.size()) + value.size())); RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); - return; + return Status::OK(); } } } } - // key doesn't exist - bool add_res __attribute__((__unused__)); - add_res = Add(seq, kTypeValue, key, value); - // We already checked unused != seq above. In that case, Add should not fail. - assert(add_res); + // The latest value is not `kTypeValue` or key doesn't exist + return Add(seq, kTypeValue, key, value); } -bool MemTable::UpdateCallback(SequenceNumber seq, - const Slice& key, - const Slice& delta) { +Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, + const Slice& delta) { LookupKey lkey(key, seq); Slice memkey = lkey.memtable_key(); @@ -1088,16 +1083,17 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); UpdateFlushState(); - return true; + return Status::OK(); } else if (status == UpdateStatus::UPDATED) { - Add(seq, kTypeValue, key, Slice(str_value)); + Status s = Add(seq, kTypeValue, key, Slice(str_value)); RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); UpdateFlushState(); - return true; + return s; } else if (status == UpdateStatus::UPDATE_FAILED) { - // No action required. Return. + // `UPDATE_FAILED` is named incorrectly. It indicates no update + // happened. It does not indicate a failure happened. UpdateFlushState(); - return true; + return Status::OK(); } } default: @@ -1105,9 +1101,8 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } } } - // If the latest value is not kTypeValue - // or key doesn't exist - return false; + // The latest value is not `kTypeValue` or key doesn't exist + return Status::NotFound(); } size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { diff --git a/db/memtable.h b/db/memtable.h index d5bd4e95a..04600262b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -182,12 +182,13 @@ class MemTable { // REQUIRES: if allow_concurrent = false, external synchronization to prevent // simultaneous operations on the same MemTable. // - // Returns false if MemTableRepFactory::CanHandleDuplicatedKey() is true and - // the already exists. - bool Add(SequenceNumber seq, ValueType type, const Slice& key, - const Slice& value, bool allow_concurrent = false, - MemTablePostProcessInfo* post_process_info = nullptr, - void** hint = nullptr); + // Returns `Status::TryAgain` if the `seq`, `key` combination already exists + // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true. + // The next attempt should try a larger value for `seq`. + Status Add(SequenceNumber seq, ValueType type, const Slice& key, + const Slice& value, bool allow_concurrent = false, + MemTablePostProcessInfo* post_process_info = nullptr, + void** hint = nullptr); // Used to Get value associated with key or Get Merge Operands associated // with key. @@ -239,35 +240,34 @@ class MemTable { void MultiGet(const ReadOptions& read_options, MultiGetRange* range, ReadCallback* callback, bool* is_blob); - // Attempts to update the new_value inplace, else does normal Add - // Pseudocode - // if key exists in current memtable && prev_value is of type kTypeValue - // if new sizeof(new_value) <= sizeof(prev_value) - // update inplace - // else add(key, new_value) - // else add(key, new_value) + // If `key` exists in current memtable with type `kTypeValue` and the existing + // value is at least as large as the new value, updates it in-place. Otherwise + // adds the new value to the memtable out-of-place. + // + // Returns `Status::TryAgain` if the `seq`, `key` combination already exists + // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true. + // The next attempt should try a larger value for `seq`. // // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable. - void Update(SequenceNumber seq, - const Slice& key, - const Slice& value); - - // If prev_value for key exists, attempts to update it inplace. - // else returns false - // Pseudocode - // if key exists in current memtable && prev_value is of type kTypeValue - // new_value = delta(prev_value) - // if sizeof(new_value) <= sizeof(prev_value) - // update inplace - // else add(key, new_value) - // else return false + Status Update(SequenceNumber seq, const Slice& key, const Slice& value); + + // If `key` exists in current memtable with type `kTypeValue` and the existing + // value is at least as large as the new value, updates it in-place. Otherwise + // if `key` exists in current memtable with type `kTypeValue`, adds the new + // value to the memtable out-of-place. + // + // Returns `Status::NotFound` if `key` does not exist in current memtable or + // the latest version of `key` does not have `kTypeValue`. + // + // Returns `Status::TryAgain` if the `seq`, `key` combination already exists + // in the memtable and `MemTableRepFactory::CanHandleDuplicatedKey()` is true. + // The next attempt should try a larger value for `seq`. // // REQUIRES: external synchronization to prevent simultaneous // operations on the same MemTable. - bool UpdateCallback(SequenceNumber seq, - const Slice& key, - const Slice& delta); + Status UpdateCallback(SequenceNumber seq, const Slice& key, + const Slice& delta); // Returns the number of successive merge entries starting from the newest // entry for the key up to the last non-merge entry or last entry for the diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index cc6e566ad..6f578e7c7 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -242,10 +242,10 @@ TEST_F(MemTableListTest, GetTest) { mem->Ref(); // Write some keys to this memtable. - mem->Add(++seq, kTypeDeletion, "key1", ""); - mem->Add(++seq, kTypeValue, "key2", "value2"); - mem->Add(++seq, kTypeValue, "key1", "value1"); - mem->Add(++seq, kTypeValue, "key2", "value2.2"); + ASSERT_OK(mem->Add(++seq, kTypeDeletion, "key1", "")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2")); // Fetch the newly written keys merge_context.Clear(); @@ -283,8 +283,8 @@ TEST_F(MemTableListTest, GetTest) { kMaxSequenceNumber, 0 /* column_family_id */); mem2->Ref(); - mem2->Add(++seq, kTypeDeletion, "key1", ""); - mem2->Add(++seq, kTypeValue, "key2", "value2.3"); + ASSERT_OK(mem2->Add(++seq, kTypeDeletion, "key1", "")); + ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3")); // Add second memtable to list list.Add(mem2, &to_delete); @@ -359,9 +359,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { mem->Ref(); // Write some keys to this memtable. - mem->Add(++seq, kTypeDeletion, "key1", ""); - mem->Add(++seq, kTypeValue, "key2", "value2"); - mem->Add(++seq, kTypeValue, "key2", "value2.2"); + ASSERT_OK(mem->Add(++seq, kTypeDeletion, "key1", "")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2")); // Fetch the newly written keys merge_context.Clear(); @@ -443,8 +443,8 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { kMaxSequenceNumber, 0 /* column_family_id */); mem2->Ref(); - mem2->Add(++seq, kTypeDeletion, "key1", ""); - mem2->Add(++seq, kTypeValue, "key3", "value3"); + ASSERT_OK(mem2->Add(++seq, kTypeDeletion, "key1", "")); + ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3")); // Add second memtable to list list.Add(mem2, &to_delete); @@ -554,11 +554,11 @@ TEST_F(MemTableListTest, FlushPendingTest) { std::string value; MergeContext merge_context; - mem->Add(++seq, kTypeValue, "key1", ToString(i)); - mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); - mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); - mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); - mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", ToString(i))); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM")); + ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "")); tables.push_back(mem); } @@ -823,11 +823,11 @@ TEST_F(MemTableListTest, AtomicFlusTest) { std::string value; - mem->Add(++seq, kTypeValue, "key1", ToString(i)); - mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); - mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); - mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); - mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", ToString(i))); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value")); + ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM")); + ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "")); elem.push_back(mem); } diff --git a/db/write_batch.cc b/db/write_batch.cc index 0e1ec1b07..8600b24b3 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1425,19 +1425,21 @@ class MemTableInserter : public WriteBatch::Handler { Status ret_status; if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { - bool batch_boundry = false; - if (rebuilding_trx_ != nullptr) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { assert(!write_after_commit_); // The CF is probably flushed and hence no need for insert but we still // need to keep track of the keys for upcoming rollback/commit. ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); - assert(ret_status.ok()); - batch_boundry = IsDuplicateKeySeq(column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); } - MaybeAdvanceSeq(batch_boundry); return ret_status; } + assert(ret_status.ok()); MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetImmutableMemTableOptions(); @@ -1445,23 +1447,17 @@ class MemTableInserter : public WriteBatch::Handler { // any kind of transactions including the ones that use seq_per_batch assert(!seq_per_batch_ || !moptions->inplace_update_support); if (!moptions->inplace_update_support) { - bool mem_res = + ret_status = mem->Add(sequence_, value_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem), hint_per_batch_ ? &GetHintMap()[mem] : nullptr); - if (UNLIKELY(!mem_res)) { - assert(seq_per_batch_); - ret_status = Status::TryAgain("key+seq exists"); - const bool BATCH_BOUNDRY = true; - MaybeAdvanceSeq(BATCH_BOUNDRY); - } } else if (moptions->inplace_callback == nullptr) { assert(!concurrent_memtable_writes_); - mem->Update(sequence_, key, value); + ret_status = mem->Update(sequence_, key, value); } else { assert(!concurrent_memtable_writes_); - if (mem->UpdateCallback(sequence_, key, value)) { - } else { + ret_status = mem->UpdateCallback(sequence_, key, value); + if (ret_status.IsNotFound()) { // key not found in memtable. Do sst get, update, add SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; @@ -1475,50 +1471,69 @@ class MemTableInserter : public WriteBatch::Handler { std::string merged_value; auto cf_handle = cf_mems_->GetColumnFamilyHandle(); - Status s = Status::NotSupported(); + Status get_status = Status::NotSupported(); if (db_ != nullptr && recovering_log_number_ == 0) { if (cf_handle == nullptr) { cf_handle = db_->DefaultColumnFamily(); } - s = db_->Get(ropts, cf_handle, key, &prev_value); + get_status = db_->Get(ropts, cf_handle, key, &prev_value); + } + // Intentionally overwrites the `NotFound` in `ret_status`. + if (!get_status.ok() && !get_status.IsNotFound()) { + ret_status = get_status; + } else { + ret_status = Status::OK(); } - char* prev_buffer = const_cast(prev_value.c_str()); - uint32_t prev_size = static_cast(prev_value.size()); - auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr, - s.ok() ? &prev_size : nullptr, - value, &merged_value); - if (status == UpdateStatus::UPDATED_INPLACE) { - // prev_value is updated in-place with final value. - bool mem_res __attribute__((__unused__)); - mem_res = mem->Add( - sequence_, value_type, key, Slice(prev_buffer, prev_size)); - assert(mem_res); - RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); - } else if (status == UpdateStatus::UPDATED) { - // merged_value contains the final value. - bool mem_res __attribute__((__unused__)); - mem_res = - mem->Add(sequence_, value_type, key, Slice(merged_value)); - assert(mem_res); - RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); + if (ret_status.ok()) { + UpdateStatus update_status; + char* prev_buffer = const_cast(prev_value.c_str()); + uint32_t prev_size = static_cast(prev_value.size()); + if (get_status.ok()) { + update_status = moptions->inplace_callback(prev_buffer, &prev_size, + value, &merged_value); + } else { + update_status = moptions->inplace_callback( + nullptr /* existing_value */, nullptr /* existing_value_size */, + value, &merged_value); + } + if (update_status == UpdateStatus::UPDATED_INPLACE) { + assert(get_status.ok()); + // prev_value is updated in-place with final value. + ret_status = mem->Add(sequence_, value_type, key, + Slice(prev_buffer, prev_size)); + if (ret_status.ok()) { + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); + } + } else if (update_status == UpdateStatus::UPDATED) { + // merged_value contains the final value. + ret_status = + mem->Add(sequence_, value_type, key, Slice(merged_value)); + if (ret_status.ok()) { + RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN); + } + } } } } + if (UNLIKELY(ret_status.IsTryAgain())) { + assert(seq_per_batch_); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); + } // optimize for non-recovery mode - if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); - // If the ret_status is TryAgain then let the next try to add the ky to - // the rebuilding transaction object. ret_status = WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); - assert(ret_status.ok()); } - // Since all Puts are logged in transaction logs (if enabled), always bump - // sequence number. Even if the update eventually fails and does not result - // in memtable add/update. - MaybeAdvanceSeq(); - CheckMemtableFull(); return ret_status; } @@ -1531,18 +1546,18 @@ class MemTableInserter : public WriteBatch::Handler { const Slice& value, ValueType delete_type) { Status ret_status; MemTable* mem = cf_mems_->GetMemTable(); - bool mem_res = + ret_status = mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_, get_post_process_info(mem), hint_per_batch_ ? &GetHintMap()[mem] : nullptr); - if (UNLIKELY(!mem_res)) { + if (UNLIKELY(ret_status.IsTryAgain())) { assert(seq_per_batch_); - ret_status = Status::TryAgain("key+seq exists"); - const bool BATCH_BOUNDRY = true; - MaybeAdvanceSeq(BATCH_BOUNDRY); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); } - MaybeAdvanceSeq(); - CheckMemtableFull(); return ret_status; } @@ -1555,17 +1570,18 @@ class MemTableInserter : public WriteBatch::Handler { Status ret_status; if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { - bool batch_boundry = false; - if (rebuilding_trx_ != nullptr) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { assert(!write_after_commit_); // The CF is probably flushed and hence no need for insert but we still // need to keep track of the keys for upcoming rollback/commit. ret_status = WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); - assert(ret_status.ok()); - batch_boundry = IsDuplicateKeySeq(column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); } - MaybeAdvanceSeq(batch_boundry); return ret_status; } @@ -1578,10 +1594,12 @@ class MemTableInserter : public WriteBatch::Handler { (0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp; ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type); // optimize for non-recovery mode - if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); - // If the ret_status is TryAgain then let the next try to add the ky to - // the rebuilding transaction object. ret_status = WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); } @@ -1598,27 +1616,31 @@ class MemTableInserter : public WriteBatch::Handler { Status ret_status; if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { - bool batch_boundry = false; - if (rebuilding_trx_ != nullptr) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { assert(!write_after_commit_); // The CF is probably flushed and hence no need for insert but we still // need to keep track of the keys for upcoming rollback/commit. ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); - assert(ret_status.ok()); - batch_boundry = IsDuplicateKeySeq(column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); } - MaybeAdvanceSeq(batch_boundry); return ret_status; } + assert(ret_status.ok()); ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion); // optimize for non-recovery mode - if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); - // If the ret_status is TryAgain then let the next try to add the ky to - // the rebuilding transaction object. ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key); } @@ -1636,21 +1658,22 @@ class MemTableInserter : public WriteBatch::Handler { Status ret_status; if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { - bool batch_boundry = false; - if (rebuilding_trx_ != nullptr) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { assert(!write_after_commit_); // The CF is probably flushed and hence no need for insert but we still // need to keep track of the keys for upcoming rollback/commit. ret_status = WriteBatchInternal::DeleteRange( rebuilding_trx_, column_family_id, begin_key, end_key); - assert(ret_status.ok()); - // TODO(myabandeh): when transactional DeleteRange support is added, - // check if end_key must also be added. - batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, begin_key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); } - MaybeAdvanceSeq(batch_boundry); return ret_status; } + assert(ret_status.ok()); + if (db_ != nullptr) { auto cf_handle = cf_mems_->GetColumnFamilyHandle(); if (cf_handle == nullptr) { @@ -1659,6 +1682,8 @@ class MemTableInserter : public WriteBatch::Handler { auto* cfd = static_cast_with_check(cf_handle)->cfd(); if (!cfd->is_delete_range_supported()) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); return Status::NotSupported( std::string("DeleteRange not supported for table type ") + cfd->ioptions()->table_factory->Name() + " in CF " + @@ -1666,10 +1691,14 @@ class MemTableInserter : public WriteBatch::Handler { } int cmp = cfd->user_comparator()->Compare(begin_key, end_key); if (cmp > 0) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); // It's an empty range where endpoints appear mistaken. Don't bother // applying it to the DB, and return an error to the user. return Status::InvalidArgument("end key comes before start key"); } else if (cmp == 0) { + // TODO(ajkr): refactor `SeekToColumnFamily()` so it returns a `Status`. + ret_status.PermitUncheckedError(); // It's an empty range. Don't bother applying it to the DB. return Status::OK(); } @@ -1678,10 +1707,12 @@ class MemTableInserter : public WriteBatch::Handler { ret_status = DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion); // optimize for non-recovery mode + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); - // If the ret_status is TryAgain then let the next try to add the ky to - // the rebuilding transaction object. ret_status = WriteBatchInternal::DeleteRange( rebuilding_trx_, column_family_id, begin_key, end_key); } @@ -1699,19 +1730,21 @@ class MemTableInserter : public WriteBatch::Handler { Status ret_status; if (UNLIKELY(!SeekToColumnFamily(column_family_id, &ret_status))) { - bool batch_boundry = false; - if (rebuilding_trx_ != nullptr) { + if (ret_status.ok() && rebuilding_trx_ != nullptr) { assert(!write_after_commit_); // The CF is probably flushed and hence no need for insert but we still // need to keep track of the keys for upcoming rollback/commit. ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); - assert(ret_status.ok()); - batch_boundry = IsDuplicateKeySeq(column_family_id, key); + if (ret_status.ok()) { + MaybeAdvanceSeq(IsDuplicateKeySeq(column_family_id, key)); + } + } else if (ret_status.ok()) { + MaybeAdvanceSeq(false /* batch_boundary */); } - MaybeAdvanceSeq(batch_boundry); return ret_status; } + assert(ret_status.ok()); MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetImmutableMemTableOptions(); @@ -1756,60 +1789,60 @@ class MemTableInserter : public WriteBatch::Handler { if (cf_handle == nullptr) { cf_handle = db_->DefaultColumnFamily(); } - db_->Get(read_options, cf_handle, key, &get_value); - Slice get_value_slice = Slice(get_value); - - // 2) Apply this merge - auto merge_operator = moptions->merge_operator; - assert(merge_operator); - - std::string new_value; - - Status merge_status = MergeHelper::TimedFullMerge( - merge_operator, key, &get_value_slice, {value}, &new_value, - moptions->info_log, moptions->statistics, Env::Default()); - - if (!merge_status.ok()) { - // Failed to merge! - // Store the delta in memtable + Status get_status = db_->Get(read_options, cf_handle, key, &get_value); + if (!get_status.ok()) { + // Failed to read a key we know exists. Store the delta in memtable. perform_merge = false; } else { - // 3) Add value to memtable - assert(!concurrent_memtable_writes_); - bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value); - if (UNLIKELY(!mem_res)) { - assert(seq_per_batch_); - ret_status = Status::TryAgain("key+seq exists"); - const bool BATCH_BOUNDRY = true; - MaybeAdvanceSeq(BATCH_BOUNDRY); + Slice get_value_slice = Slice(get_value); + + // 2) Apply this merge + auto merge_operator = moptions->merge_operator; + assert(merge_operator); + + std::string new_value; + + Status merge_status = MergeHelper::TimedFullMerge( + merge_operator, key, &get_value_slice, {value}, &new_value, + moptions->info_log, moptions->statistics, Env::Default()); + + if (!merge_status.ok()) { + // Failed to merge! + // Store the delta in memtable + perform_merge = false; + } else { + // 3) Add value to memtable + assert(!concurrent_memtable_writes_); + ret_status = mem->Add(sequence_, kTypeValue, key, new_value); } } } if (!perform_merge) { - // Add merge operator to memtable - bool mem_res = + // Add merge operand to memtable + ret_status = mem->Add(sequence_, kTypeMerge, key, value, concurrent_memtable_writes_, get_post_process_info(mem)); - if (UNLIKELY(!mem_res)) { - assert(seq_per_batch_); - ret_status = Status::TryAgain("key+seq exists"); - const bool BATCH_BOUNDRY = true; - MaybeAdvanceSeq(BATCH_BOUNDRY); - } } + if (UNLIKELY(ret_status.IsTryAgain())) { + assert(seq_per_batch_); + const bool kBatchBoundary = true; + MaybeAdvanceSeq(kBatchBoundary); + } else if (ret_status.ok()) { + MaybeAdvanceSeq(); + CheckMemtableFull(); + } // optimize for non-recovery mode - if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) { + // If `ret_status` is `TryAgain` then the next (successful) try will add + // the key to the rebuilding transaction object. If `ret_status` is + // another non-OK `Status`, then the `rebuilding_trx_` will be thrown + // away. So we only need to add to it when `ret_status.ok()`. + if (UNLIKELY(ret_status.ok() && rebuilding_trx_ != nullptr)) { assert(!write_after_commit_); - // If the ret_status is TryAgain then let the next try to add the ky to - // the rebuilding transaction object. ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); - assert(ret_status.ok()); } - MaybeAdvanceSeq(); - CheckMemtableFull(); return ret_status; } diff --git a/table/table_test.cc b/table/table_test.cc index 4ff4c7fa9..592493418 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -504,7 +504,10 @@ class MemTableConstructor: public Constructor { memtable_->Ref(); int seq = 1; for (const auto& kv : kv_map) { - memtable_->Add(seq, kTypeValue, kv.first, kv.second); + Status s = memtable_->Add(seq, kTypeValue, kv.first, kv.second); + if (!s.ok()) { + return s; + } seq++; } return Status::OK();