diff --git a/db/blob/db_blob_basic_test.cc b/db/blob/db_blob_basic_test.cc
index 711ef895b..c1ac3e79f 100644
--- a/db/blob/db_blob_basic_test.cc
+++ b/db/blob/db_blob_basic_test.cc
@@ -3,6 +3,8 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 
+#include <array>
+
 #include "db/blob/blob_index.h"
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
@@ -44,6 +46,85 @@ TEST_F(DBBlobBasicTest, GetBlob) {
                   .IsIncomplete());
 }
 
+TEST_F(DBBlobBasicTest, MultiGetBlobs) {
+  constexpr size_t min_blob_size = 6;
+
+  Options options = GetDefaultOptions();
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+
+  Reopen(options);
+
+  // Put then retrieve three key-values. The first value is below the size limit
+  // and is thus stored inline; the other two are stored separately as blobs.
+  constexpr size_t num_keys = 3;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "short";
+  static_assert(sizeof(first_value) - 1 < min_blob_size,
+                "first_value too long to be inlined");
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "long_value";
+  static_assert(sizeof(second_value) - 1 >= min_blob_size,
+                "second_value too short to be stored as blob");
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  constexpr char third_key[] = "third_key";
+  constexpr char third_value[] = "other_long_value";
+  static_assert(sizeof(third_value) - 1 >= min_blob_size,
+                "third_value too short to be stored as blob");
+
+  ASSERT_OK(Put(third_key, third_value));
+
+  ASSERT_OK(Flush());
+
+  ReadOptions read_options;
+
+  std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_OK(statuses[1]);
+    ASSERT_EQ(values[1], second_value);
+
+    ASSERT_OK(statuses[2]);
+    ASSERT_EQ(values[2], third_value);
+  }
+
+  // Try again with no I/O allowed. The table and the necessary blocks should
+  // already be in their respective caches. The first (inlined) value should be
+  // successfully read; however, the two blob values could only be read from the
+  // blob file, so for those the read should return Incomplete.
+  read_options.read_tier = kBlockCacheTier;
+
+  {
+    std::array<PinnableSlice, num_keys> values;
+    std::array<Status, num_keys> statuses;
+
+    db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
+                  &values[0], &statuses[0]);
+
+    ASSERT_OK(statuses[0]);
+    ASSERT_EQ(values[0], first_value);
+
+    ASSERT_TRUE(statuses[1].IsIncomplete());
+
+    ASSERT_TRUE(statuses[2].IsIncomplete());
+  }
+}
+
 TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
   Options options = GetDefaultOptions();
   options.enable_blob_files = true;
@@ -175,6 +256,48 @@ TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
+  Options options = GetDefaultOptions();
+  options.env = fault_injection_env_.get();
+  options.enable_blob_files = true;
+  options.min_blob_size = 0;
+
+  Reopen(options);
+
+  constexpr size_t num_keys = 2;
+
+  constexpr char first_key[] = "first_key";
+  constexpr char first_value[] = "first_value";
+
+  ASSERT_OK(Put(first_key, first_value));
+
+  constexpr char second_key[] = "second_key";
+  constexpr char second_value[] = "second_value";
+
+  ASSERT_OK(Put(second_key, second_value));
+
+  ASSERT_OK(Flush());
+
+  std::array<Slice, num_keys> keys{{first_key, second_key}};
+  std::array<PinnableSlice, num_keys> values;
+  std::array<Status, num_keys> statuses;
+
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
+    fault_injection_env_->SetFilesystemActive(false,
+                                              Status::IOError(sync_point_));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
+                &values[0], &statuses[0]);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  ASSERT_TRUE(statuses[0].IsIOError());
+  ASSERT_TRUE(statuses[1].IsIOError());
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index b43dfe23d..661811667 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -2205,7 +2205,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
   for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
     s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
                      &sorted_keys, cf_iter->super_version, consistent_seqnum,
-                     read_callback, nullptr);
+                     read_callback);
     if (!s.ok()) {
       break;
     }
@@ -2377,7 +2377,7 @@ void DBImpl::MultiGetWithCallback(
 
   Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
                           multiget_cf_data[0].super_version, consistent_seqnum,
-                          read_callback, nullptr);
+                          read_callback);
   assert(s.ok() || s.IsTimedOut() || s.IsAborted());
   ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
                                multiget_cf_data[0].super_version);
@@ -2396,7 +2396,7 @@ Status DBImpl::MultiGetImpl(
     const ReadOptions& read_options, size_t start_key, size_t num_keys,
     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
     SuperVersion* super_version, SequenceNumber snapshot,
-    ReadCallback* callback, bool* is_blob_index) {
+    ReadCallback* callback) {
   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
   StopWatch sw(env_, stats_, DB_MULTIGET);
 
@@ -2435,11 +2435,9 @@ Status DBImpl::MultiGetImpl(
         (read_options.read_tier == kPersistedTier &&
          has_unpersisted_data_.load(std::memory_order_relaxed));
     if (!skip_memtable) {
-      super_version->mem->MultiGet(read_options, &range, callback,
-                                   is_blob_index);
+      super_version->mem->MultiGet(read_options, &range, callback);
       if (!range.empty()) {
-        super_version->imm->MultiGet(read_options, &range, callback,
-                                     is_blob_index);
+        super_version->imm->MultiGet(read_options, &range, callback);
       }
       if (!range.empty()) {
         lookup_current = true;
@@ -2449,8 +2447,7 @@ Status DBImpl::MultiGetImpl(
     }
     if (lookup_current) {
       PERF_TIMER_GUARD(get_from_output_files_time);
-      super_version->current->MultiGet(read_options, &range, callback,
-                                       is_blob_index);
+      super_version->current->MultiGet(read_options, &range, callback);
     }
     curr_value_size = range.GetValueSize();
     if (curr_value_size > read_options.value_size_soft_limit) {
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 67a7b4678..0a09aa1a4 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1866,8 +1866,7 @@ class DBImpl : public DB {
   Status MultiGetImpl(
       const ReadOptions& read_options, size_t start_key, size_t num_keys,
       autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
-      SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
-      bool* is_blob_index);
+      SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback);
 
   Status DisableFileDeletionsWithLock();
 
diff --git a/db/memtable.cc b/db/memtable.cc
index be4c5b527..49f0a4c9c 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -897,7 +897,7 @@ void MemTable::GetFromTable(const LookupKey& key,
 }
 
 void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                        ReadCallback* callback, bool* is_blob) {
+                        ReadCallback* callback) {
   // The sequence number is updated synchronously in version_set.h
   if (IsEmpty()) {
     // Avoiding recording stats for speed.
@@ -950,9 +950,9 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()));
     }
     GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
-                 callback, is_blob, iter->value->GetSelf(), iter->timestamp,
-                 iter->s, &(iter->merge_context), &seq, &found_final_value,
-                 &merge_in_progress);
+                 callback, &iter->is_blob_index, iter->value->GetSelf(),
+                 iter->timestamp, iter->s, &(iter->merge_context), &seq,
+                 &found_final_value, &merge_in_progress);
 
     if (!found_final_value && merge_in_progress) {
       *(iter->s) = Status::MergeInProgress();
diff --git a/db/memtable.h b/db/memtable.h
index 04600262b..525582698 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -238,7 +238,7 @@ class MemTable {
   }
 
   void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                ReadCallback* callback, bool* is_blob);
+                ReadCallback* callback);
 
   // If `key` exists in current memtable with type `kTypeValue` and the existing
   // value is at least as large as the new value, updates it in-place. Otherwise
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 3cf955bf8..438549c11 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -113,10 +113,10 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
 }
 
 void MemTableListVersion::MultiGet(const ReadOptions& read_options,
-                                   MultiGetRange* range, ReadCallback* callback,
-                                   bool* is_blob) {
+                                   MultiGetRange* range,
+                                   ReadCallback* callback) {
   for (auto memtable : memlist_) {
-    memtable->MultiGet(read_options, range, callback, is_blob);
+    memtable->MultiGet(read_options, range, callback);
     if (range->empty()) {
       return;
     }
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 814dbd9f9..493a54d40 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -76,7 +76,7 @@ class MemTableListVersion {
   }
 
   void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                ReadCallback* callback, bool* is_blob);
+                ReadCallback* callback);
 
   // Returns all the merge operands corresponding to the key by searching all
   // memtables starting from the most recent one.
diff --git a/db/version_set.cc b/db/version_set.cc
index 5d728ba9e..836acf0c4 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1938,6 +1938,17 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
         // TODO: update per-level perfcontext user_key_return_count for kMerge
         break;
       case GetContext::kFound:
+        if (fp.GetHitFileLevel() == 0) {
+          RecordTick(db_statistics_, GET_HIT_L0);
+        } else if (fp.GetHitFileLevel() == 1) {
+          RecordTick(db_statistics_, GET_HIT_L1);
+        } else if (fp.GetHitFileLevel() >= 2) {
+          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
+        }
+
+        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
+                                  fp.GetHitFileLevel());
+
         if (is_blob_index) {
           if (do_merge && value) {
             *status = GetBlob(read_options, user_key, *value, value);
@@ -1950,15 +1961,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
           }
         }
 
-        if (fp.GetHitFileLevel() == 0) {
-          RecordTick(db_statistics_, GET_HIT_L0);
-        } else if (fp.GetHitFileLevel() == 1) {
-          RecordTick(db_statistics_, GET_HIT_L1);
-        } else if (fp.GetHitFileLevel() >= 2) {
-          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
-        }
-        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
-                                  fp.GetHitFileLevel());
         return;
       case GetContext::kDeleted:
         // Use empty error message for speed
@@ -2008,7 +2010,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
 }
 
 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
-                       ReadCallback* callback, bool* is_blob) {
+                       ReadCallback* callback) {
   PinnedIteratorsManager pinned_iters_mgr;
 
   // Pin blocks that we read to hold merge operands
@@ -2033,7 +2035,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
         &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
         this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
-        callback, is_blob, tracing_mget_id);
+        callback, &iter->is_blob_index, tracing_mget_id);
     // MergeInProgress status, if set, has been transferred to the get_context
     // state, so we set status to ok here. From now on, the iter status will
     // be used for IO errors, and get_context state will be used for any
@@ -2135,10 +2137,27 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
           } else if (fp.GetHitFileLevel() >= 2) {
             RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
           }
+
           PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                     fp.GetHitFileLevel());
-          file_range.AddValueSize(iter->value->size());
+
           file_range.MarkKeyDone(iter);
+
+          if (iter->is_blob_index) {
+            if (iter->value) {
+              *status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
+                                iter->value);
+              if (!status->ok()) {
+                if (status->IsIncomplete()) {
+                  get_context.MarkKeyMayExist();
+                }
+
+                continue;
+              }
+            }
+          }
+
+          file_range.AddValueSize(iter->value->size());
           if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
             s = Status::Aborted();
             break;
diff --git a/db/version_set.h b/db/version_set.h
index 94e2e0c03..78b26ba4d 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -682,7 +682,7 @@ class Version {
            bool* is_blob = nullptr, bool do_merge = true);
 
   void MultiGet(const ReadOptions&, MultiGetRange* range,
-                ReadCallback* callback = nullptr, bool* is_blob = nullptr);
+                ReadCallback* callback = nullptr);
 
   // Interprets blob_index_slice as a blob reference, and (assuming the
   // corresponding blob file is part of this Version) retrieves the blob and
diff --git a/table/multiget_context.h b/table/multiget_context.h
index 796def8ea..1c9f8da94 100644
--- a/table/multiget_context.h
+++ b/table/multiget_context.h
@@ -31,6 +31,7 @@ struct KeyContext {
   MergeContext merge_context;
   SequenceNumber max_covering_tombstone_seq;
   bool key_exists;
+  bool is_blob_index;
   void* cb_arg;
   PinnableSlice* value;
   std::string* timestamp;
@@ -44,6 +45,7 @@ struct KeyContext {
         s(stat),
         max_covering_tombstone_seq(0),
         key_exists(false),
+        is_blob_index(false),
         cb_arg(nullptr),
         value(val),
         timestamp(ts),