From 3e49249d302076b45a9cd6b3de3306f00f60bbca Mon Sep 17 00:00:00 2001
From: anand76
Date: Tue, 11 Feb 2020 17:25:10 -0800
Subject: [PATCH] Ensure all MultiGet IO errors are propagated to user (#6403)

Summary:
Unrevert the previous fix to propagate error status, and add a further fix
to not treat a memtable lookup MergeInProgress status as an error.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6403

Test Plan:
Unit tests
Tried running stress tests but couldn't repro the stress failure

Differential Revision: D19846721

Pulled By: anand1976

fbshipit-source-id: 7db10cccbdc863d9b559497f0a46b608d2488ca4
---
 HISTORY.md                                    |   1 +
 db/db_basic_test.cc                           | 169 ++++++++++++++++++
 db/table_cache.cc                             |   1 +
 db/version_set.cc                             |  14 ++
 table/block_based/block_based_table_reader.cc |   1 +
 5 files changed, 186 insertions(+)

diff --git a/HISTORY.md b/HISTORY.md
index f40dabf6c..89404ae65 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -10,6 +10,7 @@
 * Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
 * Add DBOptions::skip_checking_sst_file_sizes_on_db_open. It disables potentially expensive checking of all sst file sizes in DB::Open().
 * BlobDB now ignores trivially moved files when updating the mapping between blob files and SSTs. This should mitigate issue #6338 where out of order flush/compaction notifications could trigger an assertion with the earlier code.
+* Fixed a bug where batched MultiGet() ignored IO errors while reading data blocks, which could cause it to continue looking for a key and return stale results.
 
 ### Performance Improvements
 * Perform readahead when reading from option files. Inside DB, options.log_readahead_size will be used as the readahead size. In other cases, a default 512KB is used.
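A minimal caller-side sketch of the behavior this patch guarantees: after the fix, an IO error hit while reading a data block for any key surfaces in that key's per-key Status instead of being swallowed. The batched MultiGet() overload shown is the real RocksDB API; the DB path, keys, and error handling are illustrative, not part of the patch.

#include <cstddef>
#include <vector>

#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Path is illustrative
  rocksdb::Status open_s =
      rocksdb::DB::Open(options, "/tmp/multiget_demo", &db);
  if (!open_s.ok()) {
    return 1;
  }

  // Keys must be sorted when sorted_input is true
  std::vector<rocksdb::Slice> keys{"k1", "k2"};
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());

  db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), keys.size(),
               keys.data(), values.data(), statuses.data(),
               /*sorted_input=*/true);

  for (size_t i = 0; i < keys.size(); ++i) {
    // With this fix, a failed block read shows up here as IOError() or
    // Corruption() rather than an OK status with a stale or missing value.
    if (statuses[i].IsIOError() || statuses[i].IsCorruption()) {
      // handle/report the error for keys[i]
    }
  }

  delete db;
  return 0;
}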
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 86f6f810d..5294c1a60 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -1405,6 +1405,91 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
   }
 }
 
+TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.merge_operator = MergeOperators::CreateStringAppendOperator();
+  BlockBasedTableOptions bbto;
+  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+  Reopen(options);
+  int num_keys = 0;
+
+  for (int i = 0; i < 128; ++i) {
+    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(2);
+
+  for (int i = 0; i < 128; i += 3) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  MoveFilesToLevel(1);
+
+  for (int i = 0; i < 128; i += 5) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
+    num_keys++;
+    if (num_keys == 8) {
+      Flush();
+      num_keys = 0;
+    }
+  }
+  if (num_keys > 0) {
+    Flush();
+    num_keys = 0;
+  }
+  ASSERT_EQ(0, num_keys);
+
+  for (int i = 0; i < 128; i += 9) {
+    ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
+  }
+
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+
+  for (int i = 32; i < 80; ++i) {
+    keys.push_back("key_" + std::to_string(i));
+  }
+
+  values = MultiGet(keys, nullptr);
+  ASSERT_EQ(values.size(), keys.size());
+  for (unsigned int j = 0; j < 48; ++j) {
+    int key = j + 32;
+    std::string value;
+    value.append("val_l2_" + std::to_string(key));
+    if (key % 3 == 0) {
+      value.append(",");
+      value.append("val_l1_" + std::to_string(key));
+    }
+    if (key % 5 == 0) {
+      value.append(",");
+      value.append("val_l0_" + std::to_string(key));
+    }
+    if (key % 9 == 0) {
+      value.append(",");
+      value.append("val_mem_" + std::to_string(key));
+    }
+    ASSERT_EQ(values[j], value);
+  }
+}
+
 // Test class for batched MultiGet with prefix extractor
 // Param bool - If true, use partitioned filters
 //              If false, use full filter block
@@ -2011,6 +2096,90 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
   }
 }
 
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set the initial size
+  // to the largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  int read_count = 0;
+  ReadOptions ro;
+  ro.fill_cache = fill_cache();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
+        Status* s = static_cast<Status*>(status);
+        read_count++;
+        if (read_count == 2) {
+          *s = Status::Corruption();
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // Look up two keys in one batch; the callback above fails the second
+  // block read
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
+  // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
+  ASSERT_EQ(statuses[0], Status::OK());
+  ASSERT_EQ(statuses[1], Status::Corruption());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
+  std::vector<std::string> key_data(10);
+  std::vector<Slice> keys;
+  // We cannot resize a PinnableSlice vector, so just set the initial size
+  // to the largest we think we will need
+  std::vector<PinnableSlice> values(10);
+  std::vector<Status> statuses;
+  ReadOptions ro;
+  ro.fill_cache = fill_cache();
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "TableCache::MultiGet:FindTable", [&](void* status) {
+        Status* s = static_cast<Status*>(status);
+        *s = Status::IOError();
+      });
+  // DB open will create table readers unless we reduce the table cache
+  // capacity. SanitizeOptions will set max_open_files to a minimum of 20,
+  // and the table cache is allocated with max_open_files - 10 as its
+  // capacity, so override max_open_files to 11 to make the table cache
+  // capacity 1. This prevents file opens during DB open and forces the
+  // file to be opened during MultiGet
+  SyncPoint::GetInstance()->SetCallBack(
+      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
+        int* max_open_files = static_cast<int*>(arg);
+        *max_open_files = 11;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Reopen(CurrentOptions());
+
+  // Look up two keys; both table opens should fail with the injected IOError
+  key_data.emplace_back(Key(0));
+  keys.emplace_back(Slice(key_data.back()));
+  key_data.emplace_back(Key(50));
+  keys.emplace_back(Slice(key_data.back()));
+  statuses.resize(keys.size());
+
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  ASSERT_EQ(statuses[0], Status::IOError());
+  ASSERT_EQ(statuses[1], Status::IOError());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(
     ParallelIO, DBBasicTestWithParallelIO,
     // Params are as follows -
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 12bd90230..5dc895d84 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -490,6 +490,7 @@ Status TableCache::MultiGet(const ReadOptions& options,
         file_options_, internal_comparator, fd, &handle, prefix_extractor,
         options.read_tier == kBlockCacheTier /* no_io */,
         true /* record_read_stats */, file_read_hist, skip_filters, level);
+    TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
     if (s.ok()) {
       t = GetTableReaderFromHandle(handle);
       assert(t);
diff --git a/db/version_set.cc b/db/version_set.cc
index a88ca2e5c..794060d8a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1923,6 +1923,11 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
         &iter->max_covering_tombstone_seq, this->env_, nullptr,
         merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
         tracing_mget_id);
+    // MergeInProgress status, if set, has been transferred to the get_context
+    // state, so we set the status to OK here. From now on, the iter status
+    // will be used for IO errors, and the get_context state will be used for
+    // any key-level errors.
+    *(iter->s) = Status::OK();
   }
   int get_ctx_index = 0;
   for (auto iter = range->begin(); iter != range->end();
@@ -1967,6 +1972,15 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
     for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
       GetContext& get_context = *iter->get_context;
       Status* status = iter->s;
+      // The Status in the KeyContext takes precedence over GetContext state.
+      // Status may be an error if there were any IO errors in the table
+      // reader. We never expect Status to be NotFound(), as that is
+      // determined by get_context.
+      assert(!status->IsNotFound());
+      if (!status->ok()) {
+        file_range.MarkKeyDone(iter);
+        continue;
+      }
 
       if (get_context.sample()) {
         sample_file_read_inc(f->file_metadata);
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 336669f1b..0a6424b73 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -2458,6 +2458,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
           s = rocksdb::VerifyChecksum(footer.checksum(),
                                       req.result.data() + req_offset,
                                       handle.size() + 1, expected);
+          TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
         }
       }
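Both new tests lean on the same fault-injection pattern: TEST_SYNC_POINT_CALLBACK compiles to a no-op in release builds, while in test builds it invokes any callback registered under that name, passing a void* to the local Status so the test can overwrite the outcome at exactly that point. A condensed sketch of that pattern under the two sync point names added by this patch follows; the helper function and its use outside a test fixture are illustrative.

#include "rocksdb/status.h"
#include "test_util/sync_point.h"

// Force MultiGet down both error paths exercised by the new tests.
void InjectMultiGetErrors() {
  // Fail the table open performed inside TableCache::MultiGet
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "TableCache::MultiGet:FindTable", [](void* arg) {
        *static_cast<rocksdb::Status*>(arg) = rocksdb::Status::IOError();
      });
  // Fail the checksum verification of a data block read by MultiGet
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "RetrieveMultipleBlocks:VerifyChecksum", [](void* arg) {
        *static_cast<rocksdb::Status*>(arg) = rocksdb::Status::Corruption();
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
}

// After the test body, undo the injection:
//   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
//   rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

Overwriting the Status through the callback, rather than mocking the filesystem, keeps the injection at exactly the code path under test, which is why the patch threads &s through both sync points.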