diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index ef9fcf5d5..137817eef 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -6,7 +6,6 @@
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
-// #include
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
 #include "rocksdb/perf_context.h"
@@ -1706,6 +1705,14 @@ class DBBasicTestWithParallelIO
       assert(Put(Key(i), values_[i]) == Status::OK());
     }
     Flush();
+
+    for (int i = 0; i < 100; ++i) {
+      // The values are random, so the block cannot gain space by compression
+      uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
+      std::string tmp_key = "a" + Key(i);
+      assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
+    }
+    Flush();
   }
 
   bool CheckValue(int i, const std::string& value) {
@@ -1715,6 +1722,13 @@
     return false;
   }
 
+  bool CheckUncompressableValue(int i, const std::string& value) {
+    if (uncompressable_values_[i].compare(value) == 0) {
+      return true;
+    }
+    return false;
+  }
+
   int num_lookups() { return uncompressed_cache_->num_lookups(); }
   int num_found() { return uncompressed_cache_->num_found(); }
   int num_inserts() { return uncompressed_cache_->num_inserts(); }
@@ -1864,6 +1878,7 @@ class DBBasicTestWithParallelIO
   std::shared_ptr uncompressed_cache_;
   bool compression_enabled_;
   std::vector<std::string> values_;
+  std::vector<std::string> uncompressable_values_;
   bool fill_cache_;
 };
 
@@ -1928,8 +1943,66 @@ TEST_P(DBBasicTestWithParallelIO, MultiGet) {
     ASSERT_OK(statuses[i]);
     ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
   }
-  expected_reads += (read_from_cache ? 2 : 4);
+  if (compression_enabled() && !has_compressed_cache()) {
+    expected_reads += (read_from_cache ? 2 : 3);
+  } else {
+    expected_reads += (read_from_cache ? 2 : 4);
+  }
+  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
+
+  keys.resize(10);
+  statuses.resize(10);
+  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
+  for (size_t i = 0; i < key_uncmp.size(); ++i) {
+    key_data[i] = "a" + Key(key_uncmp[i]);
+    keys[i] = Slice(key_data[i]);
+    statuses[i] = Status::OK();
+    values[i].Reset();
+  }
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  for (size_t i = 0; i < key_uncmp.size(); ++i) {
+    ASSERT_OK(statuses[i]);
+    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
+  }
+  if (compression_enabled() && !has_compressed_cache()) {
+    expected_reads += (read_from_cache ? 3 : 3);
+  } else {
+    expected_reads += (read_from_cache ? 4 : 4);
+  }
   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
+
+  keys.resize(5);
+  statuses.resize(5);
+  std::vector<int> key_tr{1, 2, 15, 16, 55};
+  for (size_t i = 0; i < key_tr.size(); ++i) {
+    key_data[i] = "a" + Key(key_tr[i]);
+    keys[i] = Slice(key_data[i]);
+    statuses[i] = Status::OK();
+    values[i].Reset();
+  }
+  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
+                     keys.data(), values.data(), statuses.data(), true);
+  for (size_t i = 0; i < key_tr.size(); ++i) {
+    ASSERT_OK(statuses[i]);
+    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
+  }
+  if (compression_enabled() && !has_compressed_cache()) {
+    expected_reads += (read_from_cache ? 0 : 2);
+    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
+  } else {
+    if (has_uncompressed_cache()) {
+      expected_reads += (read_from_cache ? 0 : 3);
+      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
+    } else {
+      // A rare case: even though block compression is enabled, some data
+      // blocks are not compressed because of their content. If the user only
+      // enables the compressed cache, the uncompressed blocks will not be
+      // cached, and block reads will be triggered. The number of reads
+      // depends on the compression algorithm.
+      ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
+    }
+  }
 }
 
 INSTANTIATE_TEST_CASE_P(
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index 164b1500b..d4e0de5a8 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -11,7 +11,6 @@
 #include
 #include
-
 #include
 #include
 #include
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index be7c8e139..1cd0e16cb 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -7,7 +7,6 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #include "table/block_based/block_based_table_reader.h"
-
 #include
 #include
 #include
@@ -2280,6 +2279,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
 // false and scratch is non-null and the blocks are uncompressed, it copies
 // the buffers to heap. In any case, the CachableEntry returned will
 // own the data bytes.
+// If compression is enabled and there is no compressed block cache, adjacent
+// blocks are read out in one IO (combined read).
 // batch - A MultiGetRange with only those keys with unique data blocks not
 //         found in cache
 // handles - A vector of block handles. Some of them me be NULL handles
@@ -2320,6 +2321,11 @@ void BlockBasedTable::RetrieveMultipleBlocks(
   autovector read_reqs;
   size_t buf_offset = 0;
   size_t idx_in_batch = 0;
+
+  uint64_t prev_offset = 0;
+  size_t prev_len = 0;
+  autovector req_idx_for_block;
+  autovector req_offset_for_block;
   for (auto mget_iter = batch->begin(); mget_iter != batch->end();
        ++mget_iter, ++idx_in_batch) {
     const BlockHandle& handle = (*handles)[idx_in_batch];
@@ -2327,23 +2333,57 @@
       continue;
     }
 
+    size_t prev_end = static_cast<size_t>(prev_offset) + prev_len;
+
+    // If the current block is adjacent to the previous one, compression is
+    // enabled, and there is no compressed cache, we combine the two block
+    // reads into one.
+    if (scratch != nullptr && prev_end == handle.offset()) {
+      req_offset_for_block.emplace_back(prev_len);
+      prev_len += block_size(handle);
+    } else {
+      // No compression, or the current block and the previous one are not
+      // adjacent. Step 1: create a new request for the previous blocks.
+      if (prev_len != 0) {
+        FSReadRequest req;
+        req.offset = prev_offset;
+        req.len = prev_len;
+        if (scratch == nullptr) {
+          req.scratch = new char[req.len];
+        } else {
+          req.scratch = scratch + buf_offset;
+          buf_offset += req.len;
+        }
+        req.status = IOStatus::OK();
+        read_reqs.emplace_back(req);
+      }
+
+      // Step 2: remember the previous block info.
+      prev_offset = handle.offset();
+      prev_len = block_size(handle);
+      req_offset_for_block.emplace_back(0);
+    }
+    req_idx_for_block.emplace_back(read_reqs.size());
+  }
+  // Handle the last block and process the pending last request.
+  if (prev_len != 0) {
     FSReadRequest req;
-    req.len = block_size(handle);
+    req.offset = prev_offset;
+    req.len = prev_len;
     if (scratch == nullptr) {
       req.scratch = new char[req.len];
     } else {
       req.scratch = scratch + buf_offset;
       buf_offset += req.len;
     }
-    req.offset = handle.offset();
     req.status = IOStatus::OK();
     read_reqs.emplace_back(req);
   }
 
   file->MultiRead(&read_reqs[0], read_reqs.size());
 
-  size_t read_req_idx = 0;
   idx_in_batch = 0;
+  size_t valid_batch_idx = 0;
   for (auto mget_iter = batch->begin(); mget_iter != batch->end();
        ++mget_iter, ++idx_in_batch) {
     const BlockHandle& handle = (*handles)[idx_in_batch];
@@ -2352,41 +2392,86 @@
       continue;
     }
 
-    FSReadRequest& req = read_reqs[read_req_idx++];
+    assert(valid_batch_idx < req_idx_for_block.size());
+    assert(valid_batch_idx < req_offset_for_block.size());
+    assert(req_idx_for_block[valid_batch_idx] < read_reqs.size());
+    size_t& req_idx = req_idx_for_block[valid_batch_idx];
+    size_t& req_offset = req_offset_for_block[valid_batch_idx];
+    valid_batch_idx++;
+    FSReadRequest& req = read_reqs[req_idx];
     Status s = req.status;
     if (s.ok()) {
       if (req.result.size() != req.len) {
-        s = Status::Corruption("truncated block read from " +
-                               rep_->file->file_name() + " offset " +
-                               ToString(handle.offset()) + ", expected " +
-                               ToString(req.len) +
-                               " bytes, got " + ToString(req.result.size()));
+        s = Status::Corruption(
+            "truncated block read from " + rep_->file->file_name() +
+            " offset " + ToString(handle.offset()) + ", expected " +
+            ToString(req.len) + " bytes, got " + ToString(req.result.size()));
       }
     }
 
     BlockContents raw_block_contents;
+    size_t cur_read_end = req_offset + block_size(handle);
+    if (cur_read_end > req.result.size()) {
+      s = Status::Corruption(
+          "truncated block read from " + rep_->file->file_name() + " offset " +
+          ToString(handle.offset()) + ", expected " + ToString(req.len) +
+          " bytes, got " + ToString(req.result.size()));
+    }
+
+    bool blocks_share_read_buffer = (req.result.size() != block_size(handle));
     if (s.ok()) {
-      if (scratch == nullptr) {
+      if (scratch == nullptr && !blocks_share_read_buffer) {
         // We allocated a buffer for this block. Give ownership of it to
         // BlockContents so it can free the memory
         assert(req.result.data() == req.scratch);
-        std::unique_ptr<char[]> raw_block(req.scratch);
+        std::unique_ptr<char[]> raw_block(req.scratch + req_offset);
         raw_block_contents = BlockContents(std::move(raw_block), handle.size());
       } else {
-        // We used the scratch buffer, so no need to free anything
-        raw_block_contents = BlockContents(Slice(req.scratch, handle.size()));
+        // We used the scratch buffer, which is shared by the blocks;
+        // raw_block_contents does not take ownership of the data.
+        raw_block_contents =
+            BlockContents(Slice(req.scratch + req_offset, handle.size()));
       }
+
 #ifndef NDEBUG
       raw_block_contents.is_raw_block = true;
 #endif
 
       if (options.verify_checksums) {
         PERF_TIMER_GUARD(block_checksum_time);
         const char* data = req.result.data();
-        uint32_t expected = DecodeFixed32(data + handle.size() + 1);
-        s = rocksdb::VerifyChecksum(footer.checksum(), req.result.data(),
+        uint32_t expected =
+            DecodeFixed32(data + req_offset + handle.size() + 1);
+        // Since the scratch buffer might be shared, the offset of the data
+        // block in the buffer might not be 0. req.result.data() only points
+        // to the beginning of each read request, so we need to add the
+        // block's offset within that request. The checksum is stored in the
+        // block trailer, which starts at handle.size() + 1.
+        s = rocksdb::VerifyChecksum(footer.checksum(),
+                                    req.result.data() + req_offset,
                                     handle.size() + 1, expected);
       }
     }
+
+    if (s.ok()) {
+      // This handles a rare case: compression is enabled and there is no
+      // compressed block cache (so combined reads are used and scratch is
+      // not nullptr), but some blocks are actually not compressed because
+      // their compression space saving is smaller than the threshold. If
+      // such a block shares the scratch memory, we need to copy it to the
+      // heap so that it can be added to the regular block cache.
+      CompressionType compression_type =
+          raw_block_contents.get_compression_type();
+      if (scratch != nullptr && compression_type == kNoCompression) {
+        Slice raw = Slice(req.scratch + req_offset, block_size(handle));
+        raw_block_contents = BlockContents(
+            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
+            handle.size());
+#ifndef NDEBUG
+        raw_block_contents.is_raw_block = true;
+#endif
+      }
+    }
+
     if (s.ok()) {
       if (options.fill_cache) {
         BlockCacheLookupContext lookup_data_block_context(
@@ -2415,25 +2500,21 @@
         if (compression_type != kNoCompression) {
           UncompressionContext context(compression_type);
           UncompressionInfo info(context, uncompression_dict, compression_type);
-          s = UncompressBlockContents(info, req.result.data(), handle.size(),
-                                      &contents, footer.version(),
+          s = UncompressBlockContents(info, req.result.data() + req_offset,
+                                      handle.size(), &contents, footer.version(),
                                       rep_->ioptions, memory_allocator);
         } else {
-          if (scratch != nullptr) {
-            // If we used the scratch buffer, then the contents need to be
-            // copied to heap
-            Slice raw = Slice(req.result.data(), handle.size());
-            contents = BlockContents(
-                CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
-                handle.size());
-          } else {
-            contents = std::move(raw_block_contents);
-          }
+          // There are two cases here: 1) the caller uses the scratch buffer;
+          // 2) we use the request buffer. If the scratch buffer is used, all
+          // uncompressed raw blocks have already been copied to the heap as
+          // individual blocks. If the scratch buffer is not used, there is no
+          // combined read either, so the raw block can be used directly.
+          contents = std::move(raw_block_contents);
         }
         if (s.ok()) {
           (*results)[idx_in_batch].SetOwnedValue(
-              new Block(std::move(contents), global_seqno,
-                        read_amp_bytes_per_bit, ioptions.statistics));
+              new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
+                        ioptions.statistics));
         }
       }
     }
     (*statuses)[idx_in_batch] = s;
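Note for reviewers: below is a minimal, self-contained sketch (not RocksDB code) of the combined-read bookkeeping that the RetrieveMultipleBlocks change above relies on. Adjacent blocks are coalesced into one read request, and for every block we record which request serves it and where it starts inside that request's buffer, mirroring req_idx_for_block / req_offset_for_block. DemoBlockHandle, DemoReadRequest, DemoBlockSize, and the assumed 5-byte trailer are simplified stand-ins introduced only for illustration.

// combined_read_sketch.cc -- illustration only; simplified stand-ins for the
// RocksDB types. Assumes a 5-byte block trailer (1 compression-type byte +
// 4 checksum bytes), analogous to block_size() in the patch.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

struct DemoBlockHandle {
  uint64_t offset;
  uint64_t size;  // block payload size, excluding the trailer
};

struct DemoReadRequest {
  uint64_t offset;
  size_t len;
};

static size_t DemoBlockSize(const DemoBlockHandle& h) {
  return static_cast<size_t>(h.size) + 5;  // payload + trailer
}

// Coalesce adjacent blocks into combined read requests. For each block,
// record which request serves it (req_idx) and where it starts inside that
// request's buffer (req_offset).
static void CoalesceReads(const std::vector<DemoBlockHandle>& handles,
                          std::vector<DemoReadRequest>* reqs,
                          std::vector<size_t>* req_idx_for_block,
                          std::vector<size_t>* req_offset_for_block) {
  uint64_t prev_offset = 0;
  size_t prev_len = 0;
  for (const DemoBlockHandle& h : handles) {
    uint64_t prev_end = prev_offset + prev_len;
    if (prev_len != 0 && prev_end == h.offset) {
      // Adjacent to the previous block: extend the pending request.
      req_offset_for_block->push_back(prev_len);
      prev_len += DemoBlockSize(h);
    } else {
      // Not adjacent: flush the pending request and start a new one.
      if (prev_len != 0) {
        reqs->push_back({prev_offset, prev_len});
      }
      prev_offset = h.offset;
      prev_len = DemoBlockSize(h);
      req_offset_for_block->push_back(0);
    }
    req_idx_for_block->push_back(reqs->size());
  }
  if (prev_len != 0) {
    reqs->push_back({prev_offset, prev_len});  // flush the last pending request
  }
}

int main() {
  // Three blocks: the first two are adjacent on disk, the third is not.
  std::vector<DemoBlockHandle> handles = {{0, 95}, {100, 195}, {1000, 45}};
  std::vector<DemoReadRequest> reqs;
  std::vector<size_t> req_idx, req_off;
  CoalesceReads(handles, &reqs, &req_idx, &req_off);
  for (size_t i = 0; i < handles.size(); ++i) {
    std::printf("block %zu -> request %zu at buffer offset %zu\n", i,
                req_idx[i], req_off[i]);
  }
  // Expected: blocks 0 and 1 map to request 0 (offsets 0 and 100),
  // block 2 maps to request 1 (offset 0).
  return 0;
}

The reason the per-block offset has to be recorded is that several blocks may share one request buffer, so the later steps in the patch (checksum verification, building BlockContents, uncompression) must address req.scratch + req_offset rather than assuming each block starts at the beginning of its read request.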