From 7259e28d915af72dd0cd6d055ab966644d83dd68 Mon Sep 17 00:00:00 2001 From: anand76 Date: Sun, 30 Jun 2019 20:52:34 -0700 Subject: [PATCH] MultiGet parallel IO (#5464) Summary: Enhancement to MultiGet batching to read data blocks required for keys in a batch in parallel from disk. It uses Env::MultiRead() API to read multiple blocks and reduce latency. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5464 Test Plan: 1. make check 2. make asan_check 3. make asan_crash Differential Revision: D15911771 Pulled By: anand1976 fbshipit-source-id: 605036b9af0f90ca0020dc87c3a86b4da6e83394 --- HISTORY.md | 1 + db/db_basic_test.cc | 289 ++++++++++ table/block_based/block_based_table_reader.cc | 519 +++++++++++++++--- table/block_based/block_based_table_reader.h | 23 +- table/format.h | 2 + util/file_reader_writer.cc | 43 ++ util/file_reader_writer.h | 2 + 7 files changed, 812 insertions(+), 67 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 79feac37c..2c8dc8c3a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -25,6 +25,7 @@ * DBIter::Next() can skip user key checking if previous entry's seqnum is 0. * Merging iterator to avoid child iterator reseek for some cases * Log Writer will flush after finishing the whole record, rather than a fragment. +* Lower MultiGet batching API latency by reading data blocks from disk in parallel ### General Improvements * Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 1aec864dd..66d3b3aff 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -10,6 +10,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" +#include "table/block_based/block_builder.h" #include "test_util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "test_util/sync_point.h" @@ -1285,6 +1286,294 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) { } } +class DBBasicTestWithParallelIO + : public DBTestBase, + public testing::WithParamInterface> { + public: + DBBasicTestWithParallelIO() + : DBTestBase("/db_basic_test_with_parallel_io") { + bool compressed_cache = std::get<0>(GetParam()); + bool uncompressed_cache = std::get<1>(GetParam()); + compression_enabled_ = std::get<2>(GetParam()); + fill_cache_ = std::get<3>(GetParam()); + + if (compressed_cache) { + std::shared_ptr cache = NewLRUCache(1048576); + compressed_cache_ = std::make_shared(cache); + } + if (uncompressed_cache) { + std::shared_ptr cache = NewLRUCache(1048576); + uncompressed_cache_ = std::make_shared(cache); + } + + env_->count_random_reads_ = true; + + Options options = CurrentOptions(); + Random rnd(301); + BlockBasedTableOptions table_options; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.block_cache = uncompressed_cache_; + table_options.block_cache_compressed = compressed_cache_; + table_options.flush_block_policy_factory.reset( + new MyFlushBlockPolicyFactory()); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + if (!compression_enabled_) { + options.compression = kNoCompression; + } + Reopen(options); + + std::string zero_str(128, '\0'); + for (int i = 0; i < 100; ++i) { + // Make the value compressible. 
A purely random string doesn't compress + // and the resultant data block will not be compressed + values_.emplace_back(RandomString(&rnd, 128) + zero_str); + assert(Put(Key(i), values_[i]) == Status::OK()); + } + Flush(); + } + + bool CheckValue(int i, const std::string& value) { + if (values_[i].compare(value) == 0) { + return true; + } + return false; + } + + int num_lookups() { return uncompressed_cache_->num_lookups(); } + int num_found() { return uncompressed_cache_->num_found(); } + int num_inserts() { return uncompressed_cache_->num_inserts(); } + + int num_lookups_compressed() { + return compressed_cache_->num_lookups(); + } + int num_found_compressed() { + return compressed_cache_->num_found(); + } + int num_inserts_compressed() { + return compressed_cache_->num_inserts(); + } + + bool fill_cache() { return fill_cache_; } + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + private: + class MyFlushBlockPolicyFactory + : public FlushBlockPolicyFactory { + public: + MyFlushBlockPolicyFactory() {} + + virtual const char* Name() const override { + return "MyFlushBlockPolicyFactory"; + } + + virtual FlushBlockPolicy* NewFlushBlockPolicy( + const BlockBasedTableOptions& /*table_options*/, + const BlockBuilder& data_block_builder) const override { + return new MyFlushBlockPolicy(data_block_builder); + } + }; + + class MyFlushBlockPolicy + : public FlushBlockPolicy { + public: + explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder) + : num_keys_(0), data_block_builder_(data_block_builder) {} + + bool Update(const Slice& /*key*/, const Slice& /*value*/) override { + if (data_block_builder_.empty()) { + // First key in this block + num_keys_ = 1; + return false; + } + // Flush every 10 keys + if (num_keys_ == 10) { + num_keys_ = 1; + return true; + } + num_keys_++; + return false; + } + + private: + int num_keys_; + const BlockBuilder& data_block_builder_; + }; + + class MyBlockCache + : public Cache { + public: + explicit MyBlockCache(std::shared_ptr& target) + : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {} + + virtual const char* Name() const override { return "MyBlockCache"; } + + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle = nullptr, + Priority priority = Priority::LOW) override { + num_inserts_++; + return target_->Insert(key, value, charge, deleter, handle, priority); + } + + virtual Handle* Lookup(const Slice& key, + Statistics* stats = nullptr) override { + num_lookups_++; + Handle* handle = target_->Lookup(key, stats); + if (handle != nullptr) { + num_found_++; + } + return handle; + } + + virtual bool Ref(Handle* handle) override { + return target_->Ref(handle); + } + + virtual bool Release(Handle* handle, bool force_erase = false) override { + return target_->Release(handle, force_erase); + } + + virtual void* Value(Handle* handle) override { + return target_->Value(handle); + } + + virtual void Erase(const Slice& key) override { + target_->Erase(key); + } + virtual uint64_t NewId() override { + return target_->NewId(); + } + + virtual void SetCapacity(size_t capacity) override { + target_->SetCapacity(capacity); + } + + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override { + target_->SetStrictCapacityLimit(strict_capacity_limit); + } + + virtual bool HasStrictCapacityLimit() const override { + return target_->HasStrictCapacityLimit(); + } + + virtual size_t GetCapacity() const override { + return 
target_->GetCapacity(); + } + + virtual size_t GetUsage() const override { + return target_->GetUsage(); + } + + virtual size_t GetUsage(Handle* handle) const override { + return target_->GetUsage(handle); + } + + virtual size_t GetPinnedUsage() const override { + return target_->GetPinnedUsage(); + } + + virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; } + + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override { + return target_->ApplyToAllCacheEntries(callback, thread_safe); + } + + virtual void EraseUnRefEntries() override { + return target_->EraseUnRefEntries(); + } + + int num_lookups() { return num_lookups_; } + + int num_found() { return num_found_; } + + int num_inserts() { return num_inserts_; } + private: + std::shared_ptr target_; + int num_lookups_; + int num_found_; + int num_inserts_; + }; + + std::shared_ptr compressed_cache_; + std::shared_ptr uncompressed_cache_; + bool compression_enabled_; + std::vector values_; + bool fill_cache_; +}; + +TEST_P(DBBasicTestWithParallelIO, MultiGet) { + std::vector key_data(10); + std::vector keys; + // We cannot resize a PinnableSlice vector, so just set initial size to + // largest we think we will need + std::vector values(10); + std::vector statuses; + ReadOptions ro; + ro.fill_cache = fill_cache(); + + // Warm up the cache first + key_data.emplace_back(Key(0)); + keys.emplace_back(Slice(key_data.back())); + key_data.emplace_back(Key(50)); + keys.emplace_back(Slice(key_data.back())); + statuses.resize(keys.size()); + + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data(), true); + ASSERT_TRUE(CheckValue(0, values[0].ToString())); + ASSERT_TRUE(CheckValue(50, values[1].ToString())); + + int random_reads = env_->random_read_counter_.Read(); + key_data[0] = Key(1); + key_data[1] = Key(51); + keys[0] = Slice(key_data[0]); + keys[1] = Slice(key_data[1]); + values[0].Reset(); + values[1].Reset(); + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data(), true); + ASSERT_TRUE(CheckValue(1, values[0].ToString())); + ASSERT_TRUE(CheckValue(51, values[1].ToString())); + + int expected_reads = random_reads + (fill_cache() ? 0 : 2); + ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); + + keys.resize(10); + statuses.resize(10); + std::vector key_ints{1,2,15,16,55,81,82,83,84,85}; + for (size_t i = 0; i < key_ints.size(); ++i) { + key_data[i] = Key(key_ints[i]); + keys[i] = Slice(key_data[i]); + statuses[i] = Status::OK(); + values[i].Reset(); + } + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data(), true); + for (size_t i = 0; i < key_ints.size(); ++i) { + ASSERT_OK(statuses[i]); + ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString())); + } + expected_reads += (fill_cache() ? 
2 : 4); + ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); +} + +INSTANTIATE_TEST_CASE_P( + ParallelIO, DBBasicTestWithParallelIO, + // Params are as follows - + // Param 0 - Compressed cache enabled + // Param 1 - Uncompressed cache enabled + // Param 2 - Data compression enabled + // Param 3 - ReadOptions::fill_cache + ::testing::Values(std::make_tuple(false, true, true, true), + std::make_tuple(true, true, true, true), + std::make_tuple(false, true, false, true), + std::make_tuple(false, true, true, false), + std::make_tuple(true, true, true, false), + std::make_tuple(false, true, false, false))); + class DBBasicTestWithTimestampWithParam : public DBTestBase, public testing::WithParamInterface { diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 017d6126c..edddecf78 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -160,6 +160,13 @@ bool PrefixExtractorChanged(const TableProperties* table_properties, } } +CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) { + CacheAllocationPtr heap_buf; + heap_buf = AllocateBlock(buf.size(), allocator); + memcpy(heap_buf.get(), buf.data(), buf.size()); + return heap_buf; +} + } // namespace // Encapsulates common functionality for the various index reader @@ -421,7 +428,8 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { // filter blocks s = table()->MaybeReadBlockAndLoadToCache( prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), - &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context); + &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context, + /*contents=*/nullptr); assert(s.ok() || block.GetValue() == nullptr); if (s.ok() && block.GetValue() != nullptr) { @@ -1745,8 +1753,6 @@ Status BlockBasedTable::PutDataBlockToCache( : Cache::Priority::LOW; assert(cached_block); assert(cached_block->IsEmpty()); - assert(raw_block_comp_type == kNoCompression || - block_cache_compressed != nullptr); Status s; Statistics* statistics = ioptions.statistics; @@ -2195,11 +2201,105 @@ IndexBlockIter* BlockBasedTable::InitBlockIterator( rep->index_value_is_full, block_contents_pinned); } +// Convert an uncompressed data block (i.e CachableEntry) +// into an iterator over the contents of the corresponding block. +// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it +template +TBlockIter* BlockBasedTable::NewDataBlockIterator( + const ReadOptions& ro, CachableEntry& block, TBlockIter* input_iter, + Status s) const { + PERF_TIMER_GUARD(new_table_block_iter_nanos); + + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. 
+ const bool block_contents_pinned = + block.IsCached() || + (!block.GetValue()->own_bytes() && rep_->immortal_table); + iter = InitBlockIterator(rep_, block.GetValue(), iter, + block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep_->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep_->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep_->cache_key_prefix_size != 0); + assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); + if (s.ok()) { + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); + } + } + } else { + iter->SetCacheHandle(block.GetCacheHandle()); + } + + block.TransferTo(iter); + return iter; +} + +// Lookup the cache for the given data block referenced by an index iterator +// value (i.e BlockHandle). If it exists in the cache, initialize block to +// the contents of the data block. +Status BlockBasedTable::GetDataBlockFromCache( + const ReadOptions& ro, const BlockHandle& handle, + const UncompressionDict& uncompression_dict, + CachableEntry* block, BlockType block_type, + GetContext* get_context) const { + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + Status s = RetrieveBlock(nullptr, ro, handle, uncompression_dict, block, + block_type, get_context, &lookup_data_block_context); + if (s.IsIncomplete()) { + s = Status::OK(); + } + + return s; +} + +// If contents is nullptr, this function looks up the block caches for the +// data block referenced by handle, and read the block from disk if necessary. +// If contents is non-null, it skips the cache lookup and disk read, since +// the caller has already read it. In both cases, if ro.fill_cache is true, +// it inserts the block into the block cache. 
Status BlockBasedTable::MaybeReadBlockAndLoadToCache( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, - GetContext* get_context, BlockCacheLookupContext* lookup_context) const { + GetContext* get_context, BlockCacheLookupContext* lookup_context, + BlockContents* contents) const { assert(block_entry != nullptr); const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep_->table_options.block_cache.get(); @@ -2231,14 +2331,17 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( compressed_cache_key); } - s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, - ro, block_entry, uncompression_dict, block_type, - get_context); - if (block_entry->GetValue()) { - // TODO(haoyu): Differentiate cache hit on uncompressed block cache and - // compressed block cache. - is_cache_hit = true; + if (!contents) { + s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, + ro, block_entry, uncompression_dict, block_type, + get_context); + if (block_entry->GetValue()) { + // TODO(haoyu): Differentiate cache hit on uncompressed block cache and + // compressed block cache. + is_cache_hit = true; + } } + // Can't find the block from the cache. If I/O is allowed, read from the // file. if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) { @@ -2248,7 +2351,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( block_cache_compressed == nullptr && rep_->blocks_maybe_compressed; CompressionType raw_block_comp_type; BlockContents raw_block_contents; - { + if (!contents) { StopWatch sw(rep_->ioptions.env, statistics, READ_BLOCK_GET_MICROS); BlockFetcher block_fetcher( rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, @@ -2259,6 +2362,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( GetMemoryAllocatorForCompressedBlock(rep_->table_options)); s = block_fetcher.ReadBlockContents(); raw_block_comp_type = block_fetcher.get_compression_type(); + contents = &raw_block_contents; + } else { + raw_block_comp_type = contents->get_compression_type(); } if (s.ok()) { @@ -2266,7 +2372,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( // If filling cache is allowed and a cache is configured, try to put the // block to the cache. s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, - block_entry, &raw_block_contents, + block_entry, contents, raw_block_comp_type, uncompression_dict, seq_no, GetMemoryAllocator(rep_->table_options), block_type, get_context); @@ -2331,6 +2437,172 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( return s; } +// This function reads multiple data blocks from disk using Env::MultiRead() +// and optionally inserts them into the block cache. It uses the scratch +// buffer provided by the caller, which is contiguous. If scratch is a nullptr +// it allocates a separate buffer for each block. Typically, if the blocks +// need to be uncompressed and there is no compressed block cache, callers +// can allocate a temporary scratch buffer in order to minimize memory +// allocations. +// If options.fill_cache is true, it inserts the blocks into cache. If its +// false and scratch is non-null and the blocks are uncompressed, it copies +// the buffers to heap. In any case, the CachableEntry returned will +// own the data bytes. 
+// batch - A MultiGetRange with only those keys with unique data blocks not +// found in cache +// handles - A vector of block handles. Some of them me be NULL handles +// scratch - An optional contiguous buffer to read compressed blocks into +void BlockBasedTable::MaybeLoadBlocksToCache( + const ReadOptions& options, + const MultiGetRange* batch, + const autovector* handles, + autovector* statuses, + autovector< + CachableEntry, MultiGetContext::MAX_BATCH_SIZE>* results, + char* scratch, + const UncompressionDict& uncompression_dict) const { + + RandomAccessFileReader* file = rep_->file.get(); + const Footer& footer = rep_->footer; + const ImmutableCFOptions& ioptions = rep_->ioptions; + SequenceNumber global_seqno = rep_->get_global_seqno(BlockType::kData); + size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit; + MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options); + + if (file->use_direct_io() || ioptions.allow_mmap_reads) { + size_t idx_in_batch = 0; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + const BlockHandle& handle = (*handles)[idx_in_batch]; + if (handle.IsNull()) { + continue; + } + + (*statuses)[idx_in_batch] = RetrieveBlock(nullptr, options, handle, + uncompression_dict, &(*results)[idx_in_batch], BlockType::kData, + mget_iter->get_context, &lookup_data_block_context); + } + return; + } + + autovector read_reqs; + size_t buf_offset = 0; + size_t idx_in_batch = 0; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + const BlockHandle& handle = (*handles)[idx_in_batch]; + if (handle.IsNull()) { + continue; + } + + ReadRequest req; + req.len = handle.size() + kBlockTrailerSize; + if (scratch == nullptr) { + req.scratch = new char[req.len]; + } else { + req.scratch = scratch + buf_offset; + buf_offset += req.len; + } + req.offset = handle.offset(); + req.status = Status::OK(); + read_reqs.emplace_back(req); + } + + file->MultiRead(&read_reqs[0], read_reqs.size()); + + size_t read_req_idx = 0; + idx_in_batch = 0; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + const BlockHandle& handle = (*handles)[idx_in_batch]; + + if (handle.IsNull()) { + continue; + } + + ReadRequest& req = read_reqs[read_req_idx++]; + Status s = req.status; + if (s.ok()) { + if (req.result.size() != handle.size() + kBlockTrailerSize) { + s = Status::Corruption("truncated block read from " + + rep_->file->file_name() + " offset " + + ToString(handle.offset()) + ", expected " + + ToString(handle.size() + kBlockTrailerSize) + + " bytes, got " + ToString(req.result.size())); + } + } + + BlockContents raw_block_contents; + if (s.ok()) { + if (scratch == nullptr) { + // We allocated a buffer for this block. 
Give ownership of it to + // BlockContents so it can free the memory + assert(req.result.data() == req.scratch); + std::unique_ptr raw_block(req.scratch); + raw_block_contents = BlockContents(std::move(raw_block), + handle.size()); + } else { + // We used the scratch buffer, so no need to free anything + raw_block_contents = BlockContents(Slice(req.scratch, + handle.size())); + } +#ifndef NDEBUG + raw_block_contents.is_raw_block = true; +#endif + if (options.verify_checksums) { + PERF_TIMER_GUARD(block_checksum_time); + const char* data = req.result.data(); + uint32_t expected = DecodeFixed32(data + handle.size() + 1); + s = rocksdb::VerifyChecksum(footer.checksum(), req.result.data(), + handle.size() + 1, expected); + } + } + if (s.ok()) { + if (options.fill_cache) { + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + CachableEntry* block_entry = &(*results)[idx_in_batch]; + // MaybeReadBlockAndLoadToCache will insert into the block caches if + // necessary. Since we're passing the raw block contents, it will + // avoid looking up the block cache + s = MaybeReadBlockAndLoadToCache(nullptr, options, handle, + uncompression_dict, block_entry, BlockType::kData, + mget_iter->get_context, &lookup_data_block_context, + &raw_block_contents); + } else { + CompressionType compression_type = + raw_block_contents.get_compression_type(); + BlockContents contents; + if (compression_type != kNoCompression) { + UncompressionContext context(compression_type); + UncompressionInfo info(context, uncompression_dict, compression_type); + s = UncompressBlockContents(info, req.result.data(), handle.size(), + &contents, footer.version(), rep_->ioptions, + memory_allocator); + } else { + if (scratch != nullptr) { + // If we used the scratch buffer, then the contents need to be + // copied to heap + Slice raw = Slice(req.result.data(), handle.size()); + contents = BlockContents(CopyBufferToHeap( + GetMemoryAllocator(rep_->table_options), raw), + handle.size()); + } else { + contents = std::move(raw_block_contents); + } + } + if (s.ok()) { + (*results)[idx_in_batch].SetOwnedValue(new Block(std::move(contents), + global_seqno, read_amp_bytes_per_bit, ioptions.statistics)); + } + } + } + (*statuses)[idx_in_batch] = s; + } +} + Status BlockBasedTable::RetrieveBlock( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, @@ -2347,7 +2619,8 @@ Status BlockBasedTable::RetrieveBlock( block_type != BlockType::kIndex)) { s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle, uncompression_dict, block_entry, - block_type, get_context, lookup_context); + block_type, get_context, lookup_context, + /*contents=*/nullptr); if (!s.ok()) { return s; @@ -3248,8 +3521,101 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, iiter_unique_ptr.reset(iiter); } - DataBlockIter biter; uint64_t offset = std::numeric_limits::max(); + autovector block_handles; + autovector, MultiGetContext::MAX_BATCH_SIZE> results; + autovector statuses; + static const size_t kMultiGetReadStackBufSize = 8192; + char stack_buf[kMultiGetReadStackBufSize]; + std::unique_ptr block_buf; + { + MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(), + sst_file_range.end()); + BlockCacheLookupContext lookup_compression_dict_context( + TableReaderCaller::kUserMultiGet); + auto uncompression_dict_storage = GetUncompressionDict(nullptr, no_io, + sst_file_range.begin()->get_context, + &lookup_compression_dict_context); + 
const UncompressionDict& uncompression_dict = + uncompression_dict_storage.GetValue() == nullptr + ? UncompressionDict::GetEmptyDict() + : *uncompression_dict_storage.GetValue(); + size_t total_len = 0; + ReadOptions ro = read_options; + ro.read_tier = kBlockCacheTier; + + for (auto miter = data_block_range.begin(); + miter != data_block_range.end(); ++miter) { + const Slice& key = miter->ikey; + iiter->Seek(miter->ikey); + + IndexValue v; + if (iiter->Valid()) { + v = iiter->value(); + } + if (!iiter->Valid() || + (!v.first_internal_key.empty() && !skip_filters && + UserComparatorWrapper(rep_->internal_comparator.user_comparator()) + .Compare(ExtractUserKey(key), + ExtractUserKey(v.first_internal_key)) < 0)) { + // The requested key falls between highest key in previous block and + // lowest key in current block. + *(miter->s) = iiter->status(); + data_block_range.SkipKey(miter); + sst_file_range.SkipKey(miter); + continue; + } + statuses.emplace_back(); + results.emplace_back(); + if (v.handle.offset() == offset) { + // We're going to reuse the block for this key later on. No need to + // look it up now. Place a null handle + block_handles.emplace_back(BlockHandle::NullBlockHandle()); + continue; + } + offset = v.handle.offset(); + BlockHandle handle = v.handle; + Status s = GetDataBlockFromCache(ro, handle, uncompression_dict, + &(results.back()), BlockType::kData, miter->get_context); + if (s.ok() && !results.back().IsEmpty()) { + // Found it in the cache. Add NULL handle to indicate there is + // nothing to read from disk + block_handles.emplace_back(BlockHandle::NullBlockHandle()); + } else { + block_handles.emplace_back(handle); + total_len += handle.size(); + } + } + + if (total_len) { + char* scratch = nullptr; + // If the blocks need to be uncompressed and we don't need the + // compressed blocks, then we can use a contiguous block of + // memory to read in all the blocks as it will be temporary + // storage + // 1. If blocks are compressed and compressed block cache is there, + // alloc heap bufs + // 2. If blocks are uncompressed, alloc heap bufs + // 3. If blocks are compressed and no compressed block cache, use + // stack buf + if (rep_->table_options.block_cache_compressed == nullptr && + rep_->blocks_maybe_compressed) { + if (total_len <= kMultiGetReadStackBufSize) { + scratch = stack_buf; + } else { + scratch = new char[total_len]; + block_buf.reset(scratch); + } + } + MaybeLoadBlocksToCache(read_options, + &data_block_range, &block_handles, &statuses, &results, + scratch, uncompression_dict); + } + } + + DataBlockIter first_biter; + DataBlockIter next_biter; + size_t idx_in_batch = 0; for (auto miter = sst_file_range.begin(); miter != sst_file_range.end(); ++miter) { Status s; @@ -3257,83 +3623,97 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, const Slice& key = miter->ikey; bool matched = false; // if such user key matched a key in SST bool done = false; - for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { - IndexValue v = iiter->value(); - if (!v.first_internal_key.empty() && !skip_filters && - UserComparatorWrapper(rep_->internal_comparator.user_comparator()) - .Compare(ExtractUserKey(key), - ExtractUserKey(v.first_internal_key)) < 0) { - // The requested key falls between highest key in previous block and - // lowest key in current block. 
- break; - } - + bool first_block = true; + do { + DataBlockIter* biter = nullptr; bool reusing_block = true; uint64_t referenced_data_size = 0; bool does_referenced_key_exist = false; BlockCacheLookupContext lookup_data_block_context( TableReaderCaller::kUserMultiGet); - if (iiter->value().handle.offset() != offset) { - offset = iiter->value().handle.offset(); - biter.Invalidate(Status::OK()); + if (first_block) { + if (!block_handles[idx_in_batch].IsNull() || + !results[idx_in_batch].IsEmpty()) { + first_biter.Invalidate(Status::OK()); + NewDataBlockIterator( + read_options, results[idx_in_batch], &first_biter, + statuses[idx_in_batch]); + reusing_block = false; + } + biter = &first_biter; + idx_in_batch++; + } else { + IndexValue v = iiter->value(); + if (!v.first_internal_key.empty() && !skip_filters && + UserComparatorWrapper(rep_->internal_comparator.user_comparator()) + .Compare(ExtractUserKey(key), + ExtractUserKey(v.first_internal_key)) < 0) { + // The requested key falls between highest key in previous block and + // lowest key in current block. + break; + } + + next_biter.Invalidate(Status::OK()); NewDataBlockIterator( - read_options, v.handle, &biter, BlockType::kData, get_context, - &lookup_data_block_context, Status(), nullptr); + read_options, iiter->value().handle, &next_biter, + BlockType::kData, get_context, &lookup_data_block_context, + Status(), nullptr); + biter = &next_biter; reusing_block = false; } if (read_options.read_tier == kBlockCacheTier && - biter.status().IsIncomplete()) { + biter->status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for // whether we can guarantee the key is not there when "no_io" is set get_context->MarkKeyMayExist(); break; } - if (!biter.status().ok()) { - s = biter.status(); + if (!biter->status().ok()) { + s = biter->status(); break; } - bool may_exist = biter.SeekForGet(key); + bool may_exist = biter->SeekForGet(key); if (!may_exist) { // HashSeek cannot find the key this block and the the iter is not // the end of the block, i.e. cannot be in the following blocks // either. In this case, the seek_key cannot be found, so we break // from the top level for-loop. 
- done = true; - } else { - // Call the *saver function on each entry/block until it returns false - for (; biter.Valid(); biter.Next()) { - ParsedInternalKey parsed_key; - Cleanable dummy; - Cleanable* value_pinner = nullptr; - - if (!ParseInternalKey(biter.key(), &parsed_key)) { - s = Status::Corruption(Slice()); - } - if (biter.IsValuePinned()) { - if (reusing_block) { - Cache* block_cache = rep_->table_options.block_cache.get(); - assert(biter.cache_handle() != nullptr); - block_cache->Ref(biter.cache_handle()); - dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache, - biter.cache_handle()); - value_pinner = &dummy; - } else { - value_pinner = &biter; - } - } + break; + } - if (!get_context->SaveValue(parsed_key, biter.value(), &matched, - value_pinner)) { - does_referenced_key_exist = true; - referenced_data_size = biter.key().size() + biter.value().size(); - done = true; - break; + // Call the *saver function on each entry/block until it returns false + for (; biter->Valid(); biter->Next()) { + ParsedInternalKey parsed_key; + Cleanable dummy; + Cleanable* value_pinner = nullptr; + + if (!ParseInternalKey(biter->key(), &parsed_key)) { + s = Status::Corruption(Slice()); + } + if (biter->IsValuePinned()) { + if (reusing_block) { + Cache* block_cache = rep_->table_options.block_cache.get(); + assert(biter->cache_handle() != nullptr); + block_cache->Ref(biter->cache_handle()); + dummy.RegisterCleanup(&ReleaseCachedEntry, block_cache, + biter->cache_handle()); + value_pinner = &dummy; + } else { + value_pinner = biter; } } - s = biter.status(); + + if (!get_context->SaveValue( + parsed_key, biter->value(), &matched, value_pinner)) { + does_referenced_key_exist = true; + referenced_data_size = biter->key().size() + biter->value().size(); + done = true; + break; + } + s = biter->status(); } // Write the block cache access. 
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled()) { @@ -3354,11 +3734,18 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, access_record, lookup_data_block_context.block_key, rep_->cf_name_for_tracing(), key); } + s = biter->status(); if (done) { // Avoid the extra Next which is expensive in two-level indexes break; } - } + if (first_block) { + iiter->Seek(key); + } + first_block = false; + iiter->Next(); + } while (iiter->Valid()); + if (matched && filter != nullptr && !filter->IsBlockBased()) { RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_FULL_TRUE_POSITIVE); PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1, diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 435671391..358bc8b8d 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -233,6 +233,12 @@ class BlockBasedTable : public TableReader { BlockCacheLookupContext* lookup_context, Status s, FilePrefetchBuffer* prefetch_buffer, bool for_compaction = false) const; + // input_iter: if it is not null, update this one and return it as Iterator + template + TBlockIter* NewDataBlockIterator(const ReadOptions& ro, + CachableEntry& block, + TBlockIter* input_iter, Status s) const; + class PartitionedIndexIteratorState; friend class PartitionIndexReader; @@ -276,7 +282,8 @@ class BlockBasedTable : public TableReader { FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, BlockType block_type, - GetContext* get_context, BlockCacheLookupContext* lookup_context) const; + GetContext* get_context, BlockCacheLookupContext* lookup_context, + BlockContents* contents) const; // Similar to the above, with one crucial difference: it will retrieve the // block from the file even if there are no caches configured (assuming the @@ -289,6 +296,20 @@ class BlockBasedTable : public TableReader { BlockCacheLookupContext* lookup_context, bool for_compaction = false) const; + Status GetDataBlockFromCache( + const ReadOptions& ro, const BlockHandle& handle, + const UncompressionDict& uncompression_dict, + CachableEntry* block_entry, BlockType block_type, + GetContext* get_context) const; + + void MaybeLoadBlocksToCache( + const ReadOptions& options, const MultiGetRange* batch, + const autovector* handles, + autovector* statuses, + autovector< + CachableEntry, MultiGetContext::MAX_BATCH_SIZE>* results, + char* scratch, const UncompressionDict& uncompression_dict) const; + // For the following two functions: // if `no_io == true`, we will not try to read filter/index from sst file // were they not present in cache yet. 
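
Note: as a condensed sketch of the new read path declared above, the loop below restates how MaybeLoadBlocksToCache() (added in block_based_table_reader.cc earlier in this patch) assembles one ReadRequest per uncached data block and hands the whole batch to a single MultiRead() call. It is a sketch only, not part of the patch: `reader`, `handles`, and `scratch` stand in for the member state and arguments the real function uses, and the underlying RandomAccessFile::MultiRead() falls back to sequential reads unless the Env provides a native implementation.

  // Sketch only: mirrors the request-assembly loop in MaybeLoadBlocksToCache();
  // `reader`, `handles`, and `scratch` are placeholders for the real state.
  autovector<ReadRequest, MultiGetContext::MAX_BATCH_SIZE> read_reqs;
  size_t buf_offset = 0;
  for (const BlockHandle& handle : handles) {
    if (handle.IsNull()) {
      continue;  // Block was already found in the block cache
    }
    ReadRequest req;
    req.offset = handle.offset();
    // Block payload plus the 5-byte trailer (compression type + checksum)
    req.len = handle.size() + kBlockTrailerSize;
    req.scratch = scratch + buf_offset;  // Carve a slice of the shared buffer
    buf_offset += req.len;
    req.status = Status::OK();
    read_reqs.emplace_back(req);
  }
  // A single call covers every uncached block in the batch
  reader->MultiRead(&read_reqs[0], read_reqs.size());
  // Each req.status / req.result now holds the per-block outcome; checksum
  // verification and cache insertion follow as in the patch above.
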
diff --git a/table/format.h b/table/format.h index 539ca8880..effc13add 100644 --- a/table/format.h +++ b/table/format.h @@ -26,7 +26,9 @@ #include "options/cf_options.h" #include "port/port.h" // noexcept #include "table/persistent_cache_options.h" +#include "util/crc32c.h" #include "util/file_reader_writer.h" +#include "util/xxhash.h" namespace rocksdb { diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index bf8850333..f49866d13 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -192,6 +192,49 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, return s; } +Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs, + size_t num_reqs) const { + Status s; + uint64_t elapsed = 0; + assert(!use_direct_io()); + assert(!for_compaction_); + { + StopWatch sw(env_, stats_, hist_type_, + (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, + true /*delay_enabled*/); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_TIMER_GUARD(read_nanos); + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif // ROCKSDB_LITE + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->MultiRead(read_reqs, num_reqs); + } + for (size_t i = 0; i < num_reqs; ++i) { +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(read_reqs[i].offset, + read_reqs[i].result.size(), start_ts, finish_ts, + read_reqs[i].status); + } +#endif // ROCKSDB_LITE + IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size()); + } + SetPerfLevel(prev_perf_level); + } + if (stats_ != nullptr && file_read_hist_ != nullptr) { + file_read_hist_->Add(elapsed); + } + + return s; +} + Status WritableFileWriter::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 01df1067e..0a7e5032d 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -161,6 +161,8 @@ class RandomAccessFileReader { Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, bool for_compaction = false) const; + Status MultiRead(ReadRequest* reqs, size_t num_reqs) const; + Status Prefetch(uint64_t offset, size_t n) const { return file_->Prefetch(offset, n); }
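
Usage note: below is a minimal sketch of the batched MultiGet() API whose latency this change improves, modeled on the call pattern exercised by db_basic_test.cc above. It assumes an already-open DB* and illustrative keys; the final argument indicates the keys are already in sorted order (the test passes true). Within each SST file, the uncached data blocks for the batch are then read in parallel via MultiRead().

#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"

// Sketch only: `db` is an already-open database and the keys are illustrative.
void BatchedMultiGetExample(rocksdb::DB* db) {
  std::vector<std::string> key_strs = {"key001", "key050", "key051"};
  std::vector<rocksdb::Slice> keys;
  for (const auto& k : key_strs) {
    keys.emplace_back(k);
  }
  // A PinnableSlice vector cannot be resized after use, so size it up front,
  // as the test above does.
  std::vector<rocksdb::PinnableSlice> values(keys.size());
  std::vector<rocksdb::Status> statuses(keys.size());

  rocksdb::ReadOptions ro;
  db->MultiGet(ro, db->DefaultColumnFamily(), keys.size(), keys.data(),
               values.data(), statuses.data(), /*sorted_input=*/true);

  for (size_t i = 0; i < keys.size(); ++i) {
    if (statuses[i].ok()) {
      // values[i] holds the value stored under key_strs[i], either pinned in
      // the block cache or copied to an owned buffer.
    }
  }
}
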