diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc
index a5ed0aa39..5d209593c 100644
--- a/db/db_properties_test.cc
+++ b/db/db_properties_test.cc
@@ -1225,7 +1225,7 @@ class BlockCountingTablePropertiesCollector : public TablePropertiesCollector {
     return Status::OK();
   }

-  void BlockAdd(uint64_t /* block_raw_bytes */,
+  void BlockAdd(uint64_t /* block_uncomp_bytes */,
                 uint64_t block_compressed_bytes_fast,
                 uint64_t block_compressed_bytes_slow) override {
     if (block_compressed_bytes_fast > 0 || block_compressed_bytes_slow > 0) {
diff --git a/db/db_test2.cc b/db/db_test2.cc
index 7583b37fd..a4aa41a2e 100644
--- a/db/db_test2.cc
+++ b/db/db_test2.cc
@@ -1796,15 +1796,14 @@ TEST_P(CompressionFailuresTest, CompressionFailures) {
         });
   } else if (compression_failure_type_ == kTestDecompressionFail) {
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "UncompressBlockContentsForCompressionType:TamperWithReturnValue",
-        [](void* arg) {
+        "UncompressBlockData:TamperWithReturnValue", [](void* arg) {
          Status* ret = static_cast<Status*>(arg);
          ASSERT_OK(*ret);
          *ret = Status::Corruption("kTestDecompressionFail");
        });
  } else if (compression_failure_type_ == kTestDecompressionCorruption) {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
-        "UncompressBlockContentsForCompressionType:"
+        "UncompressBlockData:"
        "TamperWithDecompressionOutput",
        [](void* arg) {
          BlockContents* contents = static_cast<BlockContents*>(arg);
@@ -1872,7 +1871,7 @@ TEST_P(CompressionFailuresTest, CompressionFailures) {
                "Could not decompress: kTestDecompressionFail");
    } else if (compression_failure_type_ == kTestDecompressionCorruption) {
      ASSERT_EQ(std::string(s.getState()),
-                "Decompressed block did not match raw block");
+                "Decompressed block did not match pre-compression block");
    }
  }
diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc
index 5ab0aa7d8..665c89869 100644
--- a/db/external_sst_file_basic_test.cc
+++ b/db/external_sst_file_basic_test.cc
@@ -1472,7 +1472,7 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithBadBlockChecksum) {
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
-      "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
+      "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
      change_checksum);
  SyncPoint::GetInstance()->EnableProcessing();
  int file_id = 0;
diff --git a/db/table_properties_collector.cc b/db/table_properties_collector.cc
index fdf48c927..591c1d04a 100644
--- a/db/table_properties_collector.cc
+++ b/db/table_properties_collector.cc
@@ -43,9 +43,9 @@ Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
 }

 void UserKeyTablePropertiesCollector::BlockAdd(
-    uint64_t block_raw_bytes, uint64_t block_compressed_bytes_fast,
+    uint64_t block_uncomp_bytes, uint64_t block_compressed_bytes_fast,
     uint64_t block_compressed_bytes_slow) {
-  return collector_->BlockAdd(block_raw_bytes, block_compressed_bytes_fast,
+  return collector_->BlockAdd(block_uncomp_bytes, block_compressed_bytes_fast,
                               block_compressed_bytes_slow);
 }
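
For readers following the rename: `BlockAdd` is public API on `TablePropertiesCollector`, so the parameter rename above is what user collectors see. A minimal sketch of a collector consuming the renamed parameters — the class name and property key are invented for illustration and are not part of this PR:

```cpp
#include <cstdio>

#include "rocksdb/table_properties.h"

using ROCKSDB_NAMESPACE::EntryType;
using ROCKSDB_NAMESPACE::SequenceNumber;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Status;
using ROCKSDB_NAMESPACE::UserCollectedProperties;

class CompressionRatioCollector
    : public ROCKSDB_NAMESPACE::TablePropertiesCollector {
 public:
  Status AddUserKey(const Slice& /*key*/, const Slice& /*value*/,
                    EntryType /*type*/, SequenceNumber /*seq*/,
                    uint64_t /*file_size*/) override {
    return Status::OK();
  }

  // Called after each block is cut; with this PR the first argument is
  // named block_uncomp_bytes rather than block_raw_bytes.
  void BlockAdd(uint64_t block_uncomp_bytes,
                uint64_t block_compressed_bytes_fast,
                uint64_t block_compressed_bytes_slow) override {
    uncomp_bytes_ += block_uncomp_bytes;
    // The sampled sizes are non-zero only when compression sampling
    // (sample_for_compression) is enabled.
    fast_bytes_ += block_compressed_bytes_fast;
    slow_bytes_ += block_compressed_bytes_slow;
  }

  Status Finish(UserCollectedProperties* properties) override {
    char buf[64];
    std::snprintf(buf, sizeof(buf), "%.3f",
                  uncomp_bytes_ > 0
                      ? static_cast<double>(fast_bytes_) / uncomp_bytes_
                      : 0.0);
    properties->emplace("my.sampled-fast-compression-ratio", buf);
    return Status::OK();
  }

  UserCollectedProperties GetReadableProperties() const override {
    return UserCollectedProperties{};
  }

  const char* Name() const override { return "CompressionRatioCollector"; }

 private:
  uint64_t uncomp_bytes_ = 0;
  uint64_t fast_bytes_ = 0;
  uint64_t slow_bytes_ = 0;
};
```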
diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h
index 2a73f907d..9035ba793 100644
--- a/db/table_properties_collector.h
+++ b/db/table_properties_collector.h
@@ -29,7 +29,7 @@ class IntTblPropCollector {
  virtual Status InternalAdd(const Slice& key, const Slice& value,
                             uint64_t file_size) = 0;

-  virtual void BlockAdd(uint64_t block_raw_bytes,
+  virtual void BlockAdd(uint64_t block_uncomp_bytes,
                        uint64_t block_compressed_bytes_fast,
                        uint64_t block_compressed_bytes_slow) = 0;

@@ -69,7 +69,7 @@ class UserKeyTablePropertiesCollector : public IntTblPropCollector {
  virtual Status InternalAdd(const Slice& key, const Slice& value,
                             uint64_t file_size) override;

-  virtual void BlockAdd(uint64_t block_raw_bytes,
+  virtual void BlockAdd(uint64_t block_uncomp_bytes,
                        uint64_t block_compressed_bytes_fast,
                        uint64_t block_compressed_bytes_slow) override;

@@ -143,7 +143,7 @@ class TimestampTablePropertiesCollector : public IntTblPropCollector {
    return Status::OK();
  }

-  void BlockAdd(uint64_t /* block_raw_bytes */,
+  void BlockAdd(uint64_t /* block_uncomp_bytes */,
                uint64_t /* block_compressed_bytes_fast */,
                uint64_t /* block_compressed_bytes_slow */) override {
    return;
diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc
index 49b095937..b329fdef4 100644
--- a/db/table_properties_collector_test.cc
+++ b/db/table_properties_collector_test.cc
@@ -176,7 +176,7 @@ class RegularKeysStartWithAInternal : public IntTblPropCollector {
    return Status::OK();
  }

-  void BlockAdd(uint64_t /* block_raw_bytes */,
+  void BlockAdd(uint64_t /* block_uncomp_bytes */,
                uint64_t /* block_compressed_bytes_fast */,
                uint64_t /* block_compressed_bytes_slow */) override {
    // Nothing to do.
diff --git a/include/rocksdb/persistent_cache.h b/include/rocksdb/persistent_cache.h
index 3bb4477f0..d60bf4233 100644
--- a/include/rocksdb/persistent_cache.h
+++ b/include/rocksdb/persistent_cache.h
@@ -44,9 +44,9 @@ class PersistentCache {
  virtual Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
                        size_t* size) = 0;

-  // Is cache storing uncompressed data ?
-  //
-  // True if the cache is configured to store uncompressed data else false
+  // True if the cache is configured to store serialized blocks, which are
+  // potentially compressed and include a trailer (when SST format calls for
+  // one). False if the cache stores uncompressed blocks (no trailer).
  virtual bool IsCompressed() = 0;

  // Return stats as map of {string, double} per-tier
diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h
index 9ed17cbb8..cbe87fa3a 100644
--- a/include/rocksdb/table_properties.h
+++ b/include/rocksdb/table_properties.h
@@ -111,7 +111,7 @@ class TablePropertiesCollector {
  }

  // Called after each new block is cut
-  virtual void BlockAdd(uint64_t /* block_raw_bytes */,
+  virtual void BlockAdd(uint64_t /* block_uncomp_bytes */,
                        uint64_t /* block_compressed_bytes_fast */,
                        uint64_t /* block_compressed_bytes_slow */) {
    // Nothing to do here. Callback registers can override.
@@ -193,9 +193,9 @@ struct TableProperties {
  uint64_t index_value_is_delta_encoded = 0;
  // the size of filter block.
  uint64_t filter_size = 0;
-  // total raw key size
+  // total raw (uncompressed, undelineated) key size
  uint64_t raw_key_size = 0;
-  // total raw value size
+  // total raw (uncompressed, undelineated) value size
  uint64_t raw_value_size = 0;
  // the number of blocks in this table
  uint64_t num_data_blocks = 0;
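
The reworded `IsCompressed()` contract is the crux of the serialized-vs-uncompressed terminology. A small illustration of what a caller may now assume from the flag — the helper below is hypothetical, loosely modeled on how `BlockFetcher` branches between its two persistent-cache lookup paths (see `table/block_fetcher.cc` later in this patch):

```cpp
#include <cstddef>
#include <memory>

#include "rocksdb/persistent_cache.h"

using ROCKSDB_NAMESPACE::PersistentCache;
using ROCKSDB_NAMESPACE::Slice;
using ROCKSDB_NAMESPACE::Status;

// Hypothetical helper showing what IsCompressed() now promises: true means
// Lookup() returns a serialized block (possibly compressed, trailer included
// when the SST format has one), so the caller still has to verify the
// checksum and uncompress; false means a ready-to-use uncompressed block.
Status LookupBlock(PersistentCache* cache, const Slice& key,
                   std::unique_ptr<char[]>* data, size_t* size,
                   bool* needs_trailer_processing) {
  Status s = cache->Lookup(key, data, size);
  if (s.ok()) {
    *needs_trailer_processing = cache->IsCompressed();
  }
  return s;
}
```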
diff --git a/table/block_based/block.h b/table/block_based/block.h
index 32eb12a05..5d73f72f6 100644
--- a/table/block_based/block.h
+++ b/table/block_based/block.h
@@ -137,18 +137,19 @@ class BlockReadAmpBitmap {
  uint32_t rnd_;
 };

-// This Block class is not for any old block: it is designed to hold only
-// uncompressed blocks containing sorted key-value pairs. It is thus
-// suitable for storing uncompressed data blocks, index blocks (including
-// partitions), range deletion blocks, properties blocks, metaindex blocks,
-// as well as the top level of the partitioned filter structure (which is
-// actually an index of the filter partitions). It is NOT suitable for
+// class Block is the uncompressed and "parsed" form for blocks containing
+// key-value pairs. (See BlockContents comments for more on terminology.)
+// This includes the in-memory representation of data blocks, index blocks
+// (including partitions), range deletion blocks, properties blocks, metaindex
+// blocks, as well as the top level of the partitioned filter structure (which
+// is actually an index of the filter partitions). It is NOT suitable for
 // compressed blocks in general, filter blocks/partitions, or compression
-// dictionaries (since the latter do not contain sorted key-value pairs).
-// Use BlockContents directly for those.
+// dictionaries.
 //
 // See https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
 // for details of the format and the various block types.
+//
+// TODO: Rename to ParsedKvBlock?
 class Block {
  public:
  // Initialize the block with the specified contents.
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index da81cb254..011a71ccc 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -104,15 +104,15 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
  }
 }

-bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
+bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
  // Check to see if compressed less than 12.5%
-  return compressed_size < raw_size - (raw_size / 8u);
+  return compressed_size < uncomp_size - (uncomp_size / 8u);
 }

 }  // namespace

 // format_version is the block format as defined in include/rocksdb/table.h
-Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
+Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
@@ -139,7 +139,8 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                           CompressionDict::GetEmptyDict(), c,
                           info.SampleForCompression());

-    CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+    CompressData(uncompressed_data, info_tmp,
+                 GetCompressFormatForVersion(format_version),
                 sampled_output_fast);
  }

@@ -152,29 +153,32 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                           CompressionDict::GetEmptyDict(), c,
                           info.SampleForCompression());

-    CompressData(raw, info_tmp, GetCompressFormatForVersion(format_version),
+    CompressData(uncompressed_data, info_tmp,
+                 GetCompressFormatForVersion(format_version),
                 sampled_output_slow);
  }

  if (info.type() == kNoCompression) {
    *type = kNoCompression;
-    return raw;
+    return uncompressed_data;
  }

  // Actually compress the data; if the compression method is not supported,
  // or the compression fails etc., just fall back to uncompressed
-  if (!CompressData(raw, info, GetCompressFormatForVersion(format_version),
+  if (!CompressData(uncompressed_data, info,
+                    GetCompressFormatForVersion(format_version),
                    compressed_output)) {
    *type = kNoCompression;
-    return raw;
+    return uncompressed_data;
  }

  // Check the compression ratio; if it's not good enough, just fall back to
  // uncompressed
-  if (!GoodCompressionRatio(compressed_output->size(), raw.size())) {
+  if (!GoodCompressionRatio(compressed_output->size(),
+                            uncompressed_data.size())) {
    *type = kNoCompression;
-    return raw;
+    return uncompressed_data;
  }

  *type = info.type();
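
`GoodCompressionRatio` is unchanged in behavior: despite the terse comment, the predicate accepts compression only when the output is more than 12.5% (one eighth) smaller than the input. A quick worked check:

```cpp
#include <cassert>
#include <cstddef>

// The renamed predicate, reproduced for a worked example: the compressed
// form must be at least 1/8 smaller than the uncompressed input.
bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
  return compressed_size < uncomp_size - (uncomp_size / 8u);
}

int main() {
  // For a 4096-byte uncompressed block the threshold is
  // 4096 - 4096/8 = 3584 bytes.
  assert(GoodCompressionRatio(3583, 4096));   // saves > 12.5% -> keep
  assert(!GoodCompressionRatio(3584, 4096));  // saves exactly 12.5% -> reject
  assert(!GoodCompressionRatio(4000, 4096));  // too little savings -> reject
  return 0;
}
```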
@@ -216,7 +220,7 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    return Status::OK();
  }

-  virtual void BlockAdd(uint64_t /* block_raw_bytes */,
+  virtual void BlockAdd(uint64_t /* block_uncomp_bytes */,
                        uint64_t /* block_compressed_bytes_fast */,
                        uint64_t /* block_compressed_bytes_slow */) override {
    // Intentionally left blank. No interest in collecting stats for
@@ -665,21 +669,21 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
  class FileSizeEstimator {
   public:
    explicit FileSizeEstimator()
-        : raw_bytes_compressed(0),
-          raw_bytes_curr_block(0),
-          raw_bytes_curr_block_set(false),
-          raw_bytes_inflight(0),
+        : uncomp_bytes_compressed(0),
+          uncomp_bytes_curr_block(0),
+          uncomp_bytes_curr_block_set(false),
+          uncomp_bytes_inflight(0),
          blocks_inflight(0),
          curr_compression_ratio(0),
          estimated_file_size(0) {}

    // Estimate file size when a block is about to be emitted to
    // compression thread
-    void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
-      uint64_t new_raw_bytes_inflight =
-          raw_bytes_inflight.fetch_add(raw_block_size,
-                                       std::memory_order_relaxed) +
-          raw_block_size;
+    void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) {
+      uint64_t new_uncomp_bytes_inflight =
+          uncomp_bytes_inflight.fetch_add(uncomp_block_size,
+                                          std::memory_order_relaxed) +
+          uncomp_block_size;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;

@@ -687,7 +691,7 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
-                  static_cast<double>(new_raw_bytes_inflight) *
+                  static_cast<double>(new_uncomp_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);
@@ -696,24 +700,24 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
    // Estimate file size when a block is already reaped from
    // compression thread
    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
-      assert(raw_bytes_curr_block_set);
+      assert(uncomp_bytes_curr_block_set);

-      uint64_t new_raw_bytes_compressed =
-          raw_bytes_compressed + raw_bytes_curr_block;
-      assert(new_raw_bytes_compressed > 0);
+      uint64_t new_uncomp_bytes_compressed =
+          uncomp_bytes_compressed + uncomp_bytes_curr_block;
+      assert(new_uncomp_bytes_compressed > 0);

      curr_compression_ratio.store(
          (curr_compression_ratio.load(std::memory_order_relaxed) *
-               raw_bytes_compressed +
+               uncomp_bytes_compressed +
           compressed_block_size) /
-              static_cast<double>(new_raw_bytes_compressed),
+              static_cast<double>(new_uncomp_bytes_compressed),
          std::memory_order_relaxed);

-      raw_bytes_compressed = new_raw_bytes_compressed;
+      uncomp_bytes_compressed = new_uncomp_bytes_compressed;

-      uint64_t new_raw_bytes_inflight =
-          raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
-                                       std::memory_order_relaxed) -
-          raw_bytes_curr_block;
+      uint64_t new_uncomp_bytes_inflight =
+          uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block,
+                                          std::memory_order_relaxed) -
+          uncomp_bytes_curr_block;

      uint64_t new_blocks_inflight =
          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;

@@ -721,12 +725,12 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
      estimated_file_size.store(
          curr_file_size +
              static_cast<uint64_t>(
-                  static_cast<double>(new_raw_bytes_inflight) *
+                  static_cast<double>(new_uncomp_bytes_inflight) *
                  curr_compression_ratio.load(std::memory_order_relaxed)) +
              new_blocks_inflight * kBlockTrailerSize,
          std::memory_order_relaxed);

-      raw_bytes_curr_block_set = false;
+      uncomp_bytes_curr_block_set = false;
    }

    void SetEstimatedFileSize(uint64_t size) {
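
The estimate these counters feed is: bytes already written, plus inflight uncompressed bytes scaled by the running compression ratio, plus one trailer per inflight block. A single-threaded worked example (`kBlockTrailerSize` is 5 in the block-based table format):

```cpp
#include <cstdint>
#include <iostream>

// estimated = current_file_size
//           + inflight_uncompressed_bytes * running_compression_ratio
//           + blocks_inflight * kBlockTrailerSize
int main() {
  const uint64_t kBlockTrailerSize = 5;       // 1 type byte + 4 checksum bytes
  uint64_t curr_file_size = 1'000'000;        // bytes already written
  uint64_t uncomp_bytes_inflight = 8 * 4096;  // 8 blocks being compressed
  uint64_t blocks_inflight = 8;
  double curr_compression_ratio = 0.6;        // compressed/uncompressed so far

  uint64_t estimated =
      curr_file_size +
      static_cast<uint64_t>(uncomp_bytes_inflight * curr_compression_ratio) +
      blocks_inflight * kBlockTrailerSize;
  std::cout << estimated << "\n";  // 1000000 + 19660 + 40 = 1019700
  return 0;
}
```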
@@ -737,24 +741,24 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
      return estimated_file_size.load(std::memory_order_relaxed);
    }

-    void SetCurrBlockRawSize(uint64_t size) {
-      raw_bytes_curr_block = size;
-      raw_bytes_curr_block_set = true;
+    void SetCurrBlockUncompSize(uint64_t size) {
+      uncomp_bytes_curr_block = size;
+      uncomp_bytes_curr_block_set = true;
    }

   private:
-    // Raw bytes compressed so far.
-    uint64_t raw_bytes_compressed;
+    // Input bytes compressed so far.
+    uint64_t uncomp_bytes_compressed;
    // Size of current block being appended.
-    uint64_t raw_bytes_curr_block;
-    // Whether raw_bytes_curr_block has been set for next
+    uint64_t uncomp_bytes_curr_block;
+    // Whether uncomp_bytes_curr_block has been set for next
    // ReapBlock call.
-    bool raw_bytes_curr_block_set;
-    // Raw bytes under compression and not appended yet.
-    std::atomic<uint64_t> raw_bytes_inflight;
+    bool uncomp_bytes_curr_block_set;
+    // Input bytes under compression and not appended yet.
+    std::atomic<uint64_t> uncomp_bytes_inflight;
    // Number of blocks under compression and not appended yet.
    std::atomic<uint64_t> blocks_inflight;
-    // Current compression ratio, maintained by BGWorkWriteRawBlock.
+    // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock.
    std::atomic<double> curr_compression_ratio;
    // Estimated SST file size.
    std::atomic<uint64_t> estimated_file_size;
@@ -1040,19 +1044,19 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle,
                                        BlockType block_type) {
  block->Finish();
-  std::string raw_block_contents;
-  raw_block_contents.reserve(rep_->table_options.block_size);
-  block->SwapAndReset(raw_block_contents);
+  std::string uncompressed_block_data;
+  uncompressed_block_data.reserve(rep_->table_options.block_size);
+  block->SwapAndReset(uncompressed_block_data);
  if (rep_->state == Rep::State::kBuffered) {
    assert(block_type == BlockType::kData);
-    rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
+    rep_->data_block_buffers.emplace_back(std::move(uncompressed_block_data));
    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
    return;
  }
-  WriteBlock(raw_block_contents, handle, block_type);
+  WriteBlock(uncompressed_block_data, handle, block_type);
 }

-void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
+void BlockBasedTableBuilder::WriteBlock(const Slice& uncompressed_block_data,
                                        BlockHandle* handle,
                                        BlockType block_type) {
  Rep* r = rep_;
@@ -1061,7 +1065,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
  CompressionType type;
  Status compress_status;
  bool is_data_block = block_type == BlockType::kData;
-  CompressAndVerifyBlock(raw_block_contents, is_data_block,
+  CompressAndVerifyBlock(uncompressed_block_data, is_data_block,
                         *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                         &(r->compressed_output), &(block_contents), &type,
                         &compress_status);
@@ -1070,7 +1074,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
    return;
  }

-  WriteRawBlock(block_contents, type, handle, block_type, &raw_block_contents);
+  WriteMaybeCompressedBlock(block_contents, type, handle, block_type,
+                            &uncompressed_block_data);
  r->compressed_output.clear();
  if (is_data_block) {
    r->props.data_size = r->get_offset();
@@ -1094,7 +1099,7 @@ void BlockBasedTableBuilder::BGWorkCompression(
 }

 void BlockBasedTableBuilder::CompressAndVerifyBlock(
-    const Slice& raw_block_contents, bool is_data_block,
+    const Slice& uncompressed_block_data, bool is_data_block,
    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
    std::string* compressed_output, Slice* block_contents,
    CompressionType* type, Status* out_status) {
@@ -1116,9 +1121,9 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
      r->ioptions.clock,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));

-  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
+  if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) {
    if (is_data_block) {
-      r->compressible_input_data_bytes.fetch_add(raw_block_contents.size(),
+      r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
                                                 std::memory_order_relaxed);
    }
    const CompressionDict* compression_dict;
@@ -1135,14 +1140,14 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
    std::string sampled_output_fast;
    std::string sampled_output_slow;
    *block_contents = CompressBlock(
-        raw_block_contents, compression_info, type,
+        uncompressed_block_data, compression_info, type,
        r->table_options.format_version, is_data_block /* do_sample */,
        compressed_output, &sampled_output_fast, &sampled_output_slow);

    if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
      // Currently compression sampling is only enabled for data block.
      assert(is_data_block);
-      r->sampled_input_data_bytes.fetch_add(raw_block_contents.size(),
+      r->sampled_input_data_bytes.fetch_add(uncompressed_block_data.size(),
                                            std::memory_order_relaxed);
      r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
                                                  std::memory_order_relaxed);
@@ -1151,7 +1156,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
    }
    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
-        r->table_properties_collectors, raw_block_contents.size(),
+        r->table_properties_collectors, uncompressed_block_data.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
@@ -1169,19 +1174,20 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
      BlockContents contents;
      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                           r->compression_type);
-      Status stat = UncompressBlockContentsForCompressionType(
+      Status stat = UncompressBlockData(
          uncompression_info, block_contents->data(), block_contents->size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
-        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
+        bool compressed_ok =
+            contents.data.compare(uncompressed_block_data) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
-          ROCKS_LOG_ERROR(r->ioptions.logger,
-                          "Decompressed block did not match raw block");
-          *out_status =
-              Status::Corruption("Decompressed block did not match raw block");
+          const char* const msg =
+              "Decompressed block did not match pre-compression block";
+          ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
+          *out_status = Status::Corruption(msg);
        }
      } else {
        // Decompression reported an error. abort.
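
The verify path above is unchanged in substance: compress, decompress with the renamed `UncompressBlockData`, and byte-compare against the input, aborting compression on any mismatch. A stand-alone sketch of that round trip with stub codec hooks — the names and the identity "codec" are illustrative only:

```cpp
#include <string>

// Stub "codec" hooks standing in for CompressData / UncompressBlockData.
// Identity transforms keep the sketch self-contained; real code calls the
// configured compression library.
static bool MyCompress(const std::string& in, std::string* out) {
  *out = in;
  return true;
}
static bool MyUncompress(const std::string& in, std::string* out) {
  *out = in;
  return true;
}

// Mirrors CompressAndVerifyBlock's verify_compression logic: compression is
// kept only if the block survives a decompress-and-compare round trip.
static bool CompressVerified(const std::string& uncompressed_block_data,
                             std::string* compressed_output) {
  if (!MyCompress(uncompressed_block_data, compressed_output)) {
    return false;  // compression failed: store the block uncompressed
  }
  std::string round_trip;
  if (!MyUncompress(*compressed_output, &round_trip)) {
    return false;  // decompression error: abort compression
  }
  // On mismatch the builder logs/returns
  // "Decompressed block did not match pre-compression block".
  return round_trip == uncompressed_block_data;
}
```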
@@ -1193,8 +1199,8 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
  } else {
    // Block is too big to be compressed.
    if (is_data_block) {
-      r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
-                                                   std::memory_order_relaxed);
+      r->uncompressible_input_data_bytes.fetch_add(
+          uncompressed_block_data.size(), std::memory_order_relaxed);
    }
    abort_compression = true;
  }
@@ -1208,27 +1214,26 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
  if (abort_compression) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
    *type = kNoCompression;
-    *block_contents = raw_block_contents;
+    *block_contents = uncompressed_block_data;
  } else if (*type != kNoCompression) {
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
      RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
-                      raw_block_contents.size());
+                      uncompressed_block_data.size());
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
  } else if (*type != r->compression_type) {
    RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
  }
 }

-void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
-                                           CompressionType type,
-                                           BlockHandle* handle,
-                                           BlockType block_type,
-                                           const Slice* raw_block_contents) {
+void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
+    const Slice& block_contents, CompressionType type, BlockHandle* handle,
+    BlockType block_type, const Slice* uncompressed_block_data) {
  Rep* r = rep_;
  bool is_data_block = block_type == BlockType::kData;
+  // Old, misleading name of this function: WriteRawBlock
  StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->get_offset());
  handle->set_size(block_contents.size());
@@ -1259,7 +1264,7 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
    EncodeFixed32(trailer.data() + 1, checksum);

    TEST_SYNC_POINT_CALLBACK(
-        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
+        "BlockBasedTableBuilder::WriteMaybeCompressedBlock:TamperWithChecksum",
        trailer.data());
    {
      IOStatus io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
@@ -1287,8 +1292,9 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
    if (warm_cache) {
      if (type == kNoCompression) {
        s = InsertBlockInCacheHelper(block_contents, handle, block_type);
-      } else if (raw_block_contents != nullptr) {
-        s = InsertBlockInCacheHelper(*raw_block_contents, handle, block_type);
+      } else if (uncompressed_block_data != nullptr) {
+        s = InsertBlockInCacheHelper(*uncompressed_block_data, handle,
+                                     block_type);
      }
      if (!s.ok()) {
        r->SetStatus(s);
@@ -1327,7 +1333,7 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
  }
 }

-void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
+void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
  Rep* r = rep_;
  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
  ParallelCompressionRep::BlockRep* block_rep = nullptr;
@@ -1354,9 +1360,11 @@ void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
      r->index_builder->OnKeyAdded(key);
    }

-    r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
-    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
-                  &r->pending_handle, BlockType::kData, &block_rep->contents);
+    r->pc_rep->file_size_estimator.SetCurrBlockUncompSize(
+        block_rep->data->size());
+    WriteMaybeCompressedBlock(block_rep->compressed_contents,
+                              block_rep->compression_type, &r->pending_handle,
+                              BlockType::kData, &block_rep->contents);
    if (!ok()) {
      break;
    }
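
For orientation, the trailer that `WriteMaybeCompressedBlock` appends — and that distinguishes a serialized block from its bare payload — is five bytes: the compression type actually used for the block, then a 32-bit checksum, exactly as the `EncodeFixed32(trailer.data() + 1, checksum)` line above implies. A self-contained sketch of assembling it (the `EncodeFixed32` stand-in below assumes a little-endian host; RocksDB's always produces little-endian):

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

constexpr std::size_t kBlockTrailerSize = 5;  // 1 type byte + 4 checksum bytes

// Minimal stand-in for EncodeFixed32 (correct on little-endian hosts).
void EncodeFixed32(char* dst, uint32_t v) { std::memcpy(dst, &v, sizeof(v)); }

// Builds a block trailer: byte 0 is the CompressionType actually used for
// this block; bytes 1..4 are the checksum, which in the real writer covers
// the block payload plus the type byte. The checksum here is a placeholder.
std::array<char, kBlockTrailerSize> MakeTrailer(char compression_type,
                                                uint32_t checksum) {
  std::array<char, kBlockTrailerSize> trailer;
  trailer[0] = compression_type;
  EncodeFixed32(trailer.data() + 1, checksum);
  return trailer;
}
```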
@@ -1391,7 +1399,7 @@ void BlockBasedTableBuilder::StartParallelCompression() {
    });
  }
  rep_->pc_rep->write_thread.reset(
-      new port::Thread([this] { BGWorkWriteRawBlock(); }));
+      new port::Thread([this] { BGWorkWriteMaybeCompressedBlock(); }));
 }

 void BlockBasedTableBuilder::StopParallelCompression() {
@@ -1438,7 +1446,7 @@ Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
  BlockContents* block_contents_to_cache =
      new BlockContents(std::move(ubuf), size);
 #ifndef NDEBUG
-  block_contents_to_cache->is_raw_block = true;
+  block_contents_to_cache->has_trailer = true;
 #endif  // NDEBUG

  CacheKey key = BlockBasedTable::GetCacheKey(rep_->base_cache_key, *handle);
@@ -1567,8 +1575,8 @@ void BlockBasedTableBuilder::WriteFilterBlock(
      BlockType btype = is_partitioned_filter && /* last */ s.ok()
                            ? BlockType::kFilterPartitionIndex
                            : BlockType::kFilter;
-      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle, btype,
-                    nullptr /*raw_contents*/);
+      WriteMaybeCompressedBlock(filter_content, kNoCompression,
+                                &filter_block_handle, btype);
    }
    rep_->filter_builder->ResetFilterBitsBuilder();
  }
@@ -1613,8 +1621,9 @@ void BlockBasedTableBuilder::WriteIndexBlock(
      WriteBlock(index_blocks.index_block_contents, index_block_handle,
                 BlockType::kIndex);
    } else {
-      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                    index_block_handle, BlockType::kIndex);
+      WriteMaybeCompressedBlock(index_blocks.index_block_contents,
+                                kNoCompression, index_block_handle,
+                                BlockType::kIndex);
    }
  }
  // If there are more index partitions, finish them and write them out
@@ -1638,8 +1647,9 @@ void BlockBasedTableBuilder::WriteIndexBlock(
        WriteBlock(index_blocks.index_block_contents, index_block_handle,
                   BlockType::kIndex);
      } else {
-        WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                      index_block_handle, BlockType::kIndex);
+        WriteMaybeCompressedBlock(index_blocks.index_block_contents,
+                                  kNoCompression, index_block_handle,
+                                  BlockType::kIndex);
      }
      // The last index_block_handle will be for the partition index block
    }
@@ -1727,8 +1737,8 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
    Slice block_data = property_block_builder.Finish();
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WritePropertiesBlock:BlockData", &block_data);
-    WriteRawBlock(block_data, kNoCompression, &properties_block_handle,
-                  BlockType::kProperties);
+    WriteMaybeCompressedBlock(block_data, kNoCompression,
+                              &properties_block_handle, BlockType::kProperties);
  }
  if (ok()) {
 #ifndef NDEBUG
@@ -1758,9 +1768,9 @@ void BlockBasedTableBuilder::WriteCompressionDictBlock(
      rep_->compression_dict->GetRawDict().size()) {
    BlockHandle compression_dict_block_handle;
    if (ok()) {
-      WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
-                    &compression_dict_block_handle,
-                    BlockType::kCompressionDictionary);
+      WriteMaybeCompressedBlock(rep_->compression_dict->GetRawDict(),
+                                kNoCompression, &compression_dict_block_handle,
+                                BlockType::kCompressionDictionary);
 #ifndef NDEBUG
      Slice compression_dict = rep_->compression_dict->GetRawDict();
      TEST_SYNC_POINT_CALLBACK(
@@ -1779,8 +1789,9 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (ok() && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
-    WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
-                  &range_del_block_handle, BlockType::kRangeDeletion);
+    WriteMaybeCompressedBlock(rep_->range_del_block.Finish(), kNoCompression,
+                              &range_del_block_handle,
+                              BlockType::kRangeDeletion);
    meta_index_builder->Add(kRangeDelBlockName, range_del_block_handle);
  }
 }
@@ -2001,8 +2012,8 @@ Status BlockBasedTableBuilder::Finish() {
    WritePropertiesBlock(&meta_index_builder);
    if (ok()) {
      // flush the meta index block
-      WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
-                    &metaindex_block_handle, BlockType::kMetaIndex);
+      WriteMaybeCompressedBlock(meta_index_builder.Finish(), kNoCompression,
+                                &metaindex_block_handle, BlockType::kMetaIndex);
    }
    if (ok()) {
      WriteFooter(metaindex_block_handle, index_block_handle);
@@ -2036,7 +2047,7 @@ uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

 uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
  if (rep_->IsParallelCompressionEnabled()) {
-    // Use compression ratio so far and inflight raw bytes to estimate
+    // Use compression ratio so far and inflight uncompressed bytes to estimate
    // final SST size.
    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
  } else {
diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h
index 83d47af17..ecc13d0f7 100644
--- a/table/block_based/block_based_table_builder.h
+++ b/table/block_based/block_based_table_builder.h
@@ -122,8 +122,9 @@ class BlockBasedTableBuilder : public TableBuilder {
  void WriteBlock(const Slice& block_contents, BlockHandle* handle,
                  BlockType block_type);
  // Directly write data to the file.
-  void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
-                     BlockType block_type, const Slice* raw_data = nullptr);
+  void WriteMaybeCompressedBlock(const Slice& data, CompressionType,
+                                 BlockHandle* handle, BlockType block_type,
+                                 const Slice* raw_data = nullptr);

  void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);

@@ -161,8 +162,9 @@ class BlockBasedTableBuilder : public TableBuilder {
  // REQUIRES: Finish(), Abandon() have not been called
  void Flush();

-  // Some compression libraries fail when the raw size is bigger than int. If
-  // uncompressed size is bigger than kCompressionSizeLimit, don't compress it
+  // Some compression libraries fail when the uncompressed size is bigger than
+  // int. If uncompressed size is bigger than kCompressionSizeLimit, don't
+  // compress it
  const uint64_t kCompressionSizeLimit = std::numeric_limits<int32_t>::max();

  // Get blocks from mem-table walking thread, compress them and
@@ -170,9 +172,9 @@ class BlockBasedTableBuilder : public TableBuilder {
  void BGWorkCompression(const CompressionContext& compression_ctx,
                         UncompressionContext* verify_ctx);

-  // Given raw block content, try to compress it and return result and
+  // Given uncompressed block content, try to compress it and return result and
  // compression type
-  void CompressAndVerifyBlock(const Slice& raw_block_contents,
+  void CompressAndVerifyBlock(const Slice& uncompressed_block_data,
                              bool is_data_block,
                              const CompressionContext& compression_ctx,
                              UncompressionContext* verify_ctx,
@@ -182,17 +184,17 @@ class BlockBasedTableBuilder : public TableBuilder {
                              Status* out_status);

  // Get compressed blocks from BGWorkCompression and write them into SST
-  void BGWorkWriteRawBlock();
+  void BGWorkWriteMaybeCompressedBlock();

  // Initialize parallel compression context and
-  // start BGWorkCompression and BGWorkWriteRawBlock threads
+  // start BGWorkCompression and BGWorkWriteMaybeCompressedBlock threads
  void StartParallelCompression();

-  // Stop BGWorkCompression and BGWorkWriteRawBlock threads
+  // Stop BGWorkCompression and BGWorkWriteMaybeCompressedBlock threads
  void StopParallelCompression();
 };

-Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
+Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index 8e3910a98..e2fc18150 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -412,7 +412,7 @@ template <typename TBlocklike>
 Status BlockBasedTable::InsertEntryToCache(
    const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
    const Cache::CacheItemHelper* cache_helper,
-    std::unique_ptr<TBlocklike>& block_holder, size_t charge,
+    std::unique_ptr<TBlocklike>&& block_holder, size_t charge,
    Cache::Handle** cache_handle, Cache::Priority priority) const {
  Status s = Status::OK();
  if (cache_tier == CacheTier::kNonVolatileBlockTier) {
@@ -422,6 +422,11 @@ Status BlockBasedTable::InsertEntryToCache(
    s = block_cache->Insert(key, block_holder.get(), charge,
                            cache_helper->del_cb, cache_handle, priority);
  }
+  if (s.ok()) {
+    // Cache took ownership
+    block_holder.release();
+  }
+  s.MustCheck();
  return s;
 }
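
Binding the holder as `unique_ptr&&` documents the conditional ownership transfer in the type: callers must write `std::move(...)`, and the pointer is released only after a successful insert. The same pattern in miniature, against a toy cache (nothing here is RocksDB API):

```cpp
#include <memory>
#include <string>
#include <unordered_map>

struct Block {
  std::string data;
};

// Toy stand-in for a cache whose Insert() takes a raw pointer and reports
// success; like rocksdb::Cache, it does not delete the value on failure.
class ToyCache {
 public:
  bool Insert(const std::string& key, Block* value) {
    if (map_.count(key) != 0) {
      return false;  // caller still owns `value`
    }
    map_[key] = value;
    return true;
  }
  ~ToyCache() {
    for (auto& kv : map_) delete kv.second;
  }

 private:
  std::unordered_map<std::string, Block*> map_;
};

// The PR's pattern: an rvalue-reference parameter forces callers to write
// std::move(holder), and ownership is released to the cache only on success.
// On failure the unique_ptr still owns the block and frees it as before.
bool InsertEntryToCache(ToyCache* cache, const std::string& key,
                        std::unique_ptr<Block>&& block_holder) {
  const bool ok = cache->Insert(key, block_holder.get());
  if (ok) {
    block_holder.release();  // cache took ownership
  }
  return ok;
}
```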
@@ -1271,15 +1276,16 @@ Status BlockBasedTable::ReadMetaIndexBlock(
 template <typename TBlocklike>
 Status BlockBasedTable::GetDataBlockFromCache(
    const Slice& cache_key, Cache* block_cache, Cache* block_cache_compressed,
-    const ReadOptions& read_options, CachableEntry<TBlocklike>* block,
+    const ReadOptions& read_options,
+    CachableEntry<TBlocklike>* out_parsed_block,
    const UncompressionDict& uncompression_dict, BlockType block_type,
    const bool wait, GetContext* get_context) const {
  const size_t read_amp_bytes_per_bit =
      block_type == BlockType::kData
          ? rep_->table_options.read_amp_bytes_per_bit
          : 0;
-  assert(block);
-  assert(block->IsEmpty());
+  assert(out_parsed_block);
+  assert(out_parsed_block->IsEmpty());
  // Here we treat the legacy name "...index_and_filter_blocks..." to mean all
  // metadata blocks that might go into block cache, EXCEPT only those needed
  // for the read path (Get, etc.). TableProperties should not be needed on the
@@ -1312,7 +1318,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
        priority);
    if (cache_handle != nullptr) {
-      block->SetCachedValue(
+      out_parsed_block->SetCachedValue(
          reinterpret_cast<TBlocklike*>(block_cache->Value(cache_handle)),
          block_cache, cache_handle);
      return s;
@@ -1320,7 +1326,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
  }

  // If not found, search from the compressed block cache.
-  assert(block->IsEmpty());
+  assert(out_parsed_block->IsEmpty());

  if (block_cache_compressed == nullptr) {
    return s;
@@ -1358,32 +1364,33 @@ Status BlockBasedTable::GetDataBlockFromCache(
  // Retrieve the uncompressed contents into a new buffer
  UncompressionContext context(compression_type);
  UncompressionInfo info(context, uncompression_dict, compression_type);
-  s = UncompressBlockContents(
+  s = UncompressSerializedBlock(
      info, compressed_block->data.data(), compressed_block->data.size(),
      &contents, rep_->table_options.format_version, rep_->ioptions,
      GetMemoryAllocator(rep_->table_options));

-  // Insert uncompressed block into block cache, the priority is based on the
+  // Insert parsed block into block cache, the priority is based on the
  // data block type.
  if (s.ok()) {
    std::unique_ptr<TBlocklike> block_holder(
        BlocklikeTraits<TBlocklike>::Create(
            std::move(contents), read_amp_bytes_per_bit, statistics,
            rep_->blocks_definitely_zstd_compressed,
-            rep_->table_options.filter_policy.get()));  // uncompressed block
+            rep_->table_options.filter_policy.get()));

    if (block_cache != nullptr && block_holder->own_bytes() &&
        read_options.fill_cache) {
      size_t charge = block_holder->ApproximateMemoryUsage();
      Cache::Handle* cache_handle = nullptr;
+      auto block_holder_raw_ptr = block_holder.get();
      s = InsertEntryToCache(
          rep_->ioptions.lowest_used_cache_tier, block_cache, cache_key,
          BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
-          block_holder, charge, &cache_handle, priority);
+          std::move(block_holder), charge, &cache_handle, priority);
      if (s.ok()) {
        assert(cache_handle != nullptr);
-        block->SetCachedValue(block_holder.release(), block_cache,
-                              cache_handle);
+        out_parsed_block->SetCachedValue(block_holder_raw_ptr, block_cache,
+                                         cache_handle);

        UpdateCacheInsertionMetrics(block_type, get_context, charge,
                                    s.IsOkOverwritten(), rep_->ioptions.stats);
@@ -1391,7 +1398,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
        RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
      }
    } else {
-      block->SetOwnedValue(block_holder.release());
+      out_parsed_block->SetOwnedValue(std::move(block_holder));
    }
  }

@@ -1403,8 +1410,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
 template <typename TBlocklike>
 Status BlockBasedTable::PutDataBlockToCache(
    const Slice& cache_key, Cache* block_cache, Cache* block_cache_compressed,
-    CachableEntry<TBlocklike>* cached_block, BlockContents* raw_block_contents,
-    CompressionType raw_block_comp_type,
+    CachableEntry<TBlocklike>* out_parsed_block, BlockContents&& block_contents,
+    CompressionType block_comp_type,
    const UncompressionDict& uncompression_dict,
    MemoryAllocator* memory_allocator, BlockType block_type,
    GetContext* get_context) const {
@@ -1419,22 +1426,22 @@ Status BlockBasedTable::PutDataBlockToCache(
      block_type != BlockType::kData ? Cache::Priority::HIGH
                                     : Cache::Priority::LOW;
-  assert(cached_block);
-  assert(cached_block->IsEmpty());
+  assert(out_parsed_block);
+  assert(out_parsed_block->IsEmpty());

  Status s;
  Statistics* statistics = ioptions.stats;

  std::unique_ptr<TBlocklike> block_holder;
-  if (raw_block_comp_type != kNoCompression) {
+  if (block_comp_type != kNoCompression) {
    // Retrieve the uncompressed contents into a new buffer
    BlockContents uncompressed_block_contents;
-    UncompressionContext context(raw_block_comp_type);
-    UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
-    s = UncompressBlockContents(info, raw_block_contents->data.data(),
-                                raw_block_contents->data.size(),
-                                &uncompressed_block_contents, format_version,
-                                ioptions, memory_allocator);
+    UncompressionContext context(block_comp_type);
+    UncompressionInfo info(context, uncompression_dict, block_comp_type);
+    s = UncompressBlockData(info, block_contents.data.data(),
+                            block_contents.data.size(),
+                            &uncompressed_block_contents, format_version,
+                            ioptions, memory_allocator);
    if (!s.ok()) {
      return s;
    }
@@ -1445,53 +1452,51 @@ Status BlockBasedTable::PutDataBlockToCache(
        rep_->table_options.filter_policy.get()));
  } else {
    block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
-        std::move(*raw_block_contents), read_amp_bytes_per_bit, statistics,
+        std::move(block_contents), read_amp_bytes_per_bit, statistics,
        rep_->blocks_definitely_zstd_compressed,
        rep_->table_options.filter_policy.get()));
  }

  // Insert compressed block into compressed block cache.
  // Release the hold on the compressed cache entry immediately.
-  if (block_cache_compressed != nullptr &&
-      raw_block_comp_type != kNoCompression && raw_block_contents != nullptr &&
-      raw_block_contents->own_bytes()) {
-    assert(raw_block_contents->is_raw_block);
+  if (block_cache_compressed != nullptr && block_comp_type != kNoCompression &&
+      block_contents.own_bytes()) {
+    assert(block_contents.has_trailer);
    assert(!cache_key.empty());

-    // We cannot directly put raw_block_contents because this could point to
+    // We cannot directly put block_contents because this could point to
    // an object in the stack.
-    std::unique_ptr<BlockContents> block_cont_for_comp_cache(
-        new BlockContents(std::move(*raw_block_contents)));
+    auto block_cont_for_comp_cache =
+        std::make_unique<BlockContents>(std::move(block_contents));
+    size_t charge = block_cont_for_comp_cache->ApproximateMemoryUsage();

    s = InsertEntryToCache(
        rep_->ioptions.lowest_used_cache_tier, block_cache_compressed,
        cache_key, BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
-        block_cont_for_comp_cache,
-        block_cont_for_comp_cache->ApproximateMemoryUsage(), nullptr,
+        std::move(block_cont_for_comp_cache), charge, nullptr,
        Cache::Priority::LOW);

-    BlockContents* block_cont_raw_ptr = block_cont_for_comp_cache.release();
    if (s.ok()) {
      // Avoid the following code to delete this cached block.
      RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
    } else {
      RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
-      delete block_cont_raw_ptr;
    }
  }

  // insert into uncompressed block cache
  if (block_cache != nullptr && block_holder->own_bytes()) {
    size_t charge = block_holder->ApproximateMemoryUsage();
+    auto block_holder_raw_ptr = block_holder.get();
    Cache::Handle* cache_handle = nullptr;
    s = InsertEntryToCache(
        rep_->ioptions.lowest_used_cache_tier, block_cache, cache_key,
        BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
-        block_holder, charge, &cache_handle, priority);
+        std::move(block_holder), charge, &cache_handle, priority);
    if (s.ok()) {
      assert(cache_handle != nullptr);
-      cached_block->SetCachedValue(block_holder.release(), block_cache,
-                                   cache_handle);
+      out_parsed_block->SetCachedValue(block_holder_raw_ptr, block_cache,
+                                       cache_handle);

      UpdateCacheInsertionMetrics(block_type, get_context, charge,
                                  s.IsOkOverwritten(), rep_->ioptions.stats);
@@ -1499,7 +1504,7 @@ Status BlockBasedTable::PutDataBlockToCache(
      RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
    }
  } else {
-    cached_block->SetOwnedValue(block_holder.release());
+    out_parsed_block->SetOwnedValue(std::move(block_holder));
  }

  return s;
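
Taking `BlockContents&&` makes `PutDataBlockToCache` the owner of the serialized bytes, which it may hand to the compressed cache while the parsed form goes to the regular block cache. A deliberately tiny model of that fan-out (toy types only; the real code also handles charges, priorities, and failure tickers):

```cpp
#include <optional>
#include <string>
#include <utility>

// The "caches" are single slots; ToyUncompress stands in for
// UncompressBlockData (identity transform to stay self-contained).
struct ToyCaches {
  std::optional<std::string> compressed_tier;    // serialized bytes
  std::optional<std::string> uncompressed_tier;  // parsed payload
};

static std::string ToyUncompress(const std::string& serialized) {
  return serialized;  // identity stand-in for a real codec
}

// Takes the serialized block by rvalue, mirroring BlockContents&&: after the
// call the caller's object is spent; one tier owns the serialized bytes and
// the other owns the parsed form.
static void PutDataBlockToCache(ToyCaches* caches, std::string&& serialized,
                                bool is_compressed) {
  std::string parsed = is_compressed ? ToyUncompress(serialized) : serialized;
  if (is_compressed) {
    caches->compressed_tier = std::move(serialized);
  }
  caches->uncompressed_tier = std::move(parsed);
}
```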
@@ -1580,10 +1585,10 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
    const bool wait, const bool for_compaction,
-    CachableEntry<TBlocklike>* block_entry, BlockType block_type,
+    CachableEntry<TBlocklike>* out_parsed_block, BlockType block_type,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    BlockContents* contents, bool async_read) const {
-  assert(block_entry != nullptr);
+  assert(out_parsed_block != nullptr);
  const bool no_io = (ro.read_tier == kBlockCacheTier);
  Cache* block_cache = rep_->table_options.block_cache.get();
  Cache* block_cache_compressed =
@@ -1603,11 +1608,11 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(

    if (!contents) {
      s = GetDataBlockFromCache(key, block_cache, block_cache_compressed, ro,
-                                block_entry, uncompression_dict, block_type,
-                                wait, get_context);
+                                out_parsed_block, uncompression_dict,
+                                block_type, wait, get_context);
      // Value could still be null at this point, so check the cache handle
      // and update the read pattern for prefetching
-      if (block_entry->GetValue() || block_entry->GetCacheHandle()) {
+      if (out_parsed_block->GetValue() || out_parsed_block->GetCacheHandle()) {
        // TODO(haoyu): Differentiate cache hit on uncompressed block cache and
        // compressed block cache.
        is_cache_hit = true;
@@ -1624,25 +1629,26 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(

    // Can't find the block from the cache. If I/O is allowed, read from the
    // file.
-    if (block_entry->GetValue() == nullptr &&
-        block_entry->GetCacheHandle() == nullptr && !no_io && ro.fill_cache) {
+    if (out_parsed_block->GetValue() == nullptr &&
+        out_parsed_block->GetCacheHandle() == nullptr && !no_io &&
+        ro.fill_cache) {
      Statistics* statistics = rep_->ioptions.stats;
      const bool maybe_compressed =
          block_type != BlockType::kFilter &&
          block_type != BlockType::kCompressionDictionary &&
          rep_->blocks_maybe_compressed;
      const bool do_uncompress = maybe_compressed && !block_cache_compressed;
-      CompressionType raw_block_comp_type;
-      BlockContents raw_block_contents;
+      CompressionType contents_comp_type;
+      // Maybe serialized or uncompressed
+      BlockContents tmp_contents;
      if (!contents) {
        Histograms histogram =
            for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
        StopWatch sw(rep_->ioptions.clock, statistics, histogram);
        BlockFetcher block_fetcher(
            rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
-            &raw_block_contents, rep_->ioptions, do_uncompress,
-            maybe_compressed, block_type, uncompression_dict,
-            rep_->persistent_cache_options,
+            &tmp_contents, rep_->ioptions, do_uncompress, maybe_compressed,
+            block_type, uncompression_dict, rep_->persistent_cache_options,
            GetMemoryAllocator(rep_->table_options),
            GetMemoryAllocatorForCompressedBlock(rep_->table_options));
@@ -1657,8 +1663,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
          s = block_fetcher.ReadBlockContents();
        }

-        raw_block_comp_type = block_fetcher.get_compression_type();
-        contents = &raw_block_contents;
+        contents_comp_type = block_fetcher.get_compression_type();
+        contents = &tmp_contents;
        if (get_context) {
          switch (block_type) {
            case BlockType::kIndex:
@@ -1673,15 +1679,15 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
          }
        }
      } else {
-        raw_block_comp_type = GetBlockCompressionType(*contents);
+        contents_comp_type = GetBlockCompressionType(*contents);
      }

      if (s.ok()) {
        // If filling cache is allowed and a cache is configured, try to put the
        // block to the cache.
        s = PutDataBlockToCache(
-            key, block_cache, block_cache_compressed, block_entry, contents,
-            raw_block_comp_type, uncompression_dict,
+            key, block_cache, block_cache_compressed, out_parsed_block,
+            std::move(*contents), contents_comp_type, uncompression_dict,
            GetMemoryAllocator(rep_->table_options), block_type, get_context);
      }
    }
@@ -1692,12 +1698,12 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
      lookup_context) {
    size_t usage = 0;
    uint64_t nkeys = 0;
-    if (block_entry->GetValue()) {
+    if (out_parsed_block->GetValue()) {
      // Approximate the number of keys in the block using restarts.
-      nkeys =
-          rep_->table_options.block_restart_interval *
-          BlocklikeTraits<TBlocklike>::GetNumRestarts(*block_entry->GetValue());
-      usage = block_entry->GetValue()->ApproximateMemoryUsage();
+      nkeys = rep_->table_options.block_restart_interval *
+              BlocklikeTraits<TBlocklike>::GetNumRestarts(
+                  *out_parsed_block->GetValue());
+      usage = out_parsed_block->GetValue()->ApproximateMemoryUsage();
    }
    TraceType trace_block_type = TraceType::kTraceMax;
    switch (block_type) {
@@ -1752,7 +1758,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
    }
  }

-  assert(s.ok() || block_entry->GetValue() == nullptr);
+  assert(s.ok() || out_parsed_block->GetValue() == nullptr);
  return s;
 }

@@ -1760,32 +1766,33 @@ template <typename TBlocklike>
 Status BlockBasedTable::RetrieveBlock(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<TBlocklike>* block_entry, BlockType block_type,
+    CachableEntry<TBlocklike>* out_parsed_block, BlockType block_type,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    bool for_compaction, bool use_cache, bool wait_for_cache,
    bool async_read) const {
-  assert(block_entry);
-  assert(block_entry->IsEmpty());
+  assert(out_parsed_block);
+  assert(out_parsed_block->IsEmpty());

  Status s;
  if (use_cache) {
-    s = MaybeReadBlockAndLoadToCache(
-        prefetch_buffer, ro, handle, uncompression_dict, wait_for_cache,
-        for_compaction, block_entry, block_type, get_context, lookup_context,
-        /*contents=*/nullptr, async_read);
+    s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
+                                     uncompression_dict, wait_for_cache,
+                                     for_compaction, out_parsed_block,
+                                     block_type, get_context, lookup_context,
+                                     /*contents=*/nullptr, async_read);

    if (!s.ok()) {
      return s;
    }

-    if (block_entry->GetValue() != nullptr ||
-        block_entry->GetCacheHandle() != nullptr) {
+    if (out_parsed_block->GetValue() != nullptr ||
+        out_parsed_block->GetCacheHandle() != nullptr) {
      assert(s.ok());
      return s;
    }
  }

-  assert(block_entry->IsEmpty());
+  assert(out_parsed_block->IsEmpty());

  const bool no_io = ro.read_tier == kBlockCacheTier;
  if (no_io) {
@@ -1833,7 +1840,7 @@ Status BlockBasedTable::RetrieveBlock(
    return s;
  }

-  block_entry->SetOwnedValue(block.release());
+  out_parsed_block->SetOwnedValue(std::move(block));

  assert(s.ok());
  return s;
@@ -1844,7 +1851,7 @@ Status BlockBasedTable::RetrieveBlock(
 template Status BlockBasedTable::RetrieveBlock<BlockContents>(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<BlockContents>* block_entry, BlockType block_type,
+    CachableEntry<BlockContents>* out_parsed_block, BlockType block_type,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    bool for_compaction, bool use_cache, bool wait_for_cache,
    bool async_read) const;

@@ -1852,15 +1859,15 @@
 template Status BlockBasedTable::RetrieveBlock<ParsedFullFilterBlock>(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<ParsedFullFilterBlock>* block_entry, BlockType block_type,
-    GetContext* get_context, BlockCacheLookupContext* lookup_context,
-    bool for_compaction, bool use_cache, bool wait_for_cache,
-    bool async_read) const;
+    CachableEntry<ParsedFullFilterBlock>* out_parsed_block,
+    BlockType block_type, GetContext* get_context,
+    BlockCacheLookupContext* lookup_context, bool for_compaction,
+    bool use_cache, bool wait_for_cache, bool async_read) const;

 template Status BlockBasedTable::RetrieveBlock<Block>(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<Block>* block_entry, BlockType block_type,
+    CachableEntry<Block>* out_parsed_block, BlockType block_type,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    bool for_compaction, bool use_cache, bool wait_for_cache,
    bool async_read) const;

@@ -1868,7 +1875,7 @@
 template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
    FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
-    CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
+    CachableEntry<UncompressionDict>* out_parsed_block, BlockType block_type,
    GetContext* get_context, BlockCacheLookupContext* lookup_context,
    bool for_compaction, bool use_cache, bool wait_for_cache,
    bool async_read) const;
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 8667ab43d..7054a2dd4 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -249,16 +249,16 @@ class BlockBasedTable : public TableReader {
    return static_cast<size_t>(handle.size() + kBlockTrailerSize);
  }

-  // It's the caller's responsibility to make sure that this is
-  // for raw block contents, which contains the compression
-  // byte in the end.
+  // It is the caller's responsibility to make sure that this is called with
+  // block-based table serialized block contents, which contain the compression
+  // byte in the trailer after `block_size`.
  static inline CompressionType GetBlockCompressionType(const char* block_data,
                                                        size_t block_size) {
    return static_cast<CompressionType>(block_data[block_size]);
  }
  static inline CompressionType GetBlockCompressionType(
      const BlockContents& contents) {
-    assert(contents.is_raw_block);
+    assert(contents.has_trailer);
    return GetBlockCompressionType(contents.data.data(), contents.data.size());
  }

@@ -327,7 +327,7 @@ class BlockBasedTable : public TableReader {
  Status InsertEntryToCache(const CacheTier& cache_tier, Cache* block_cache,
                            const Slice& key,
                            const Cache::CacheItemHelper* cache_helper,
-                            std::unique_ptr<TBlocklike>& block_holder,
+                            std::unique_ptr<TBlocklike>&& block_holder,
                            size_t charge, Cache::Handle** cache_handle,
                            Cache::Priority priority) const;

@@ -411,13 +411,13 @@ class BlockBasedTable : public TableReader {
                                BlockType block_type, const bool wait,
                                GetContext* get_context) const;

-  // Put a raw block (maybe compressed) to the corresponding block caches.
-  // This method will perform decompression against raw_block if needed and then
-  // populate the block caches.
+  // Put a maybe compressed block to the corresponding block caches.
+  // This method will perform decompression against block_contents if needed
+  // and then populate the block caches.
  // On success, Status::OK will be returned; also @block will be populated with
  // uncompressed block and its cache handle.
  //
-  // Allocated memory managed by raw_block_contents will be transferred to
+  // Allocated memory managed by block_contents will be transferred to
  // PutDataBlockToCache(). After the call, the object will be invalid.
  // @param uncompression_dict Data for presetting the compression library's
  // dictionary.
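
The renamed `has_trailer` assertion guards exactly this convention: `GetBlockCompressionType(block_data, block_size)` reads the byte at index `block_size`, i.e. the first trailer byte just past the payload. Demonstrated on a toy buffer (the enum values 0x0/0x1 do match RocksDB's `kNoCompression`/`kSnappyCompression`; everything else here is illustrative):

```cpp
#include <cassert>
#include <string>

enum ToyCompressionType : char {
  kNoCompression = 0x0,
  kSnappyCompression = 0x1,
};

int main() {
  // A "serialized block": payload followed by a 5-byte trailer
  // (type byte + 4-byte checksum; checksum zeroed here for brevity).
  std::string payload = "key-value bytes";
  std::string serialized = payload;
  serialized.push_back(kSnappyCompression);  // trailer byte 0
  serialized.append(4, '\0');                // trailer bytes 1..4

  // GetBlockCompressionType(block_data, block_size) reads
  // block_data[block_size], the first byte past the payload:
  assert(serialized[payload.size()] == kSnappyCompression);
  return 0;
}
```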
@@ -425,8 +425,8 @@ class BlockBasedTable : public TableReader {
  Status PutDataBlockToCache(const Slice& cache_key, Cache* block_cache,
                             Cache* block_cache_compressed,
                             CachableEntry<TBlocklike>* cached_block,
-                             BlockContents* raw_block_contents,
-                             CompressionType raw_block_comp_type,
+                             BlockContents&& block_contents,
+                             CompressionType block_comp_type,
                             const UncompressionDict& uncompression_dict,
                             MemoryAllocator* memory_allocator,
                             BlockType block_type,
diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h
index 4a035aeae..8c7547a2a 100644
--- a/table/block_based/block_based_table_reader_sync_and_async.h
+++ b/table/block_based/block_based_table_reader_sync_and_async.h
@@ -191,7 +191,7 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
      }
    }

-    BlockContents raw_block_contents;
+    BlockContents serialized_block;
    if (s.ok()) {
      if (!use_shared_buffer) {
        // We allocated a buffer for this block. Give ownership of it to
@@ -199,17 +199,17 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
        assert(req.result.data() == req.scratch);
        assert(req.result.size() == BlockSizeWithTrailer(handle));
        assert(req_offset == 0);
-        std::unique_ptr<char[]> raw_block(req.scratch);
-        raw_block_contents = BlockContents(std::move(raw_block), handle.size());
+        serialized_block =
+            BlockContents(std::unique_ptr<char[]>(req.scratch), handle.size());
      } else {
        // We used the scratch buffer or direct io buffer
        // which are shared by the blocks.
-        // raw_block_contents does not have the ownership.
-        raw_block_contents =
+        // serialized_block does not have the ownership.
+        serialized_block =
            BlockContents(Slice(req.result.data() + req_offset, handle.size()));
      }
 #ifndef NDEBUG
-      raw_block_contents.is_raw_block = true;
+      serialized_block.has_trailer = true;
 #endif

      if (options.verify_checksums) {
@@ -232,28 +232,29 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
    if (s.ok()) {
      // When the blocks share the same underlying buffer (scratch or direct io
-      // buffer), we may need to manually copy the block into heap if the raw
-      // block has to be inserted into a cache. That falls into th following
-      // cases -
-      // 1. Raw block is not compressed, it needs to be inserted into the
-      // uncompressed block cache if there is one
-      // 2. If the raw block is compressed, it needs to be inserted into the
-      // compressed block cache if there is one
+      // buffer), we may need to manually copy the block into heap if the
+      // serialized block has to be inserted into a cache. That falls into the
+      // following cases -
+      // 1. serialized block is not compressed, it needs to be inserted into
+      // the uncompressed block cache if there is one
+      // 2. If the serialized block is compressed, it needs to be inserted
+      // into the compressed block cache if there is one
      //
-      // In all other cases, the raw block is either uncompressed into a heap
-      // buffer or there is no cache at all.
+      // In all other cases, the serialized block is either uncompressed into a
+      // heap buffer or there is no cache at all.
      CompressionType compression_type =
-          GetBlockCompressionType(raw_block_contents);
+          GetBlockCompressionType(serialized_block);
      if (use_shared_buffer && (compression_type == kNoCompression ||
                                (compression_type != kNoCompression &&
                                 rep_->table_options.block_cache_compressed))) {
-        Slice raw =
+        Slice serialized =
            Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle));
-        raw_block_contents = BlockContents(
-            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw),
+        serialized_block = BlockContents(
+            CopyBufferToHeap(GetMemoryAllocator(rep_->table_options),
+                             serialized),
            handle.size());
 #ifndef NDEBUG
-        raw_block_contents.is_raw_block = true;
+        serialized_block.has_trailer = true;
 #endif
      }
    }
@@ -264,13 +265,13 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
          TableReaderCaller::kUserMultiGet);
      CachableEntry<Block>* block_entry = &(*results)[idx_in_batch];
      // MaybeReadBlockAndLoadToCache will insert into the block caches if
-      // necessary. Since we're passing the raw block contents, it will
-      // avoid looking up the block cache
+      // necessary. Since we're passing the serialized block contents, it
+      // will avoid looking up the block cache
      s = MaybeReadBlockAndLoadToCache(
          nullptr, options, handle, uncompression_dict, /*wait=*/true,
          /*for_compaction=*/false, block_entry, BlockType::kData,
          mget_iter->get_context, &lookup_data_block_context,
-          &raw_block_contents, /*async_read=*/false);
+          &serialized_block, /*async_read=*/false);

      // block_entry value could be null if no block cache is present, i.e
      // BlockBasedTableOptions::no_block_cache is true and no compressed
@@ -283,12 +284,12 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
      }

      CompressionType compression_type =
-          GetBlockCompressionType(raw_block_contents);
+          GetBlockCompressionType(serialized_block);
      BlockContents contents;
      if (compression_type != kNoCompression) {
        UncompressionContext context(compression_type);
        UncompressionInfo info(context, uncompression_dict, compression_type);
-        s = UncompressBlockContents(
+        s = UncompressSerializedBlock(
            info, req.result.data() + req_offset, handle.size(), &contents,
            footer.format_version(), rep_->ioptions, memory_allocator);
      } else {
@@ -296,13 +297,13 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
        // There are two cases here:
        // 1) caller uses the shared buffer (scratch or direct io buffer);
        // 2) we use the requst buffer.
        // If scratch buffer or direct io buffer is used, we ensure that
-        // all raw blocks are copyed to the heap as single blocks. If scratch
-        // buffer is not used, we also have no combined read, so the raw
-        // block can be used directly.
-        contents = std::move(raw_block_contents);
+        // all serialized blocks are copied to the heap as single blocks. If
+        // scratch buffer is not used, we also have no combined read, so the
+        // serialized block can be used directly.
+ contents = std::move(serialized_block); } if (s.ok()) { - (*results)[idx_in_batch].SetOwnedValue(new Block( + (*results)[idx_in_batch].SetOwnedValue(std::make_unique<Block>( std::move(contents), read_amp_bytes_per_bit, ioptions.stats)); } } diff --git a/table/block_based/cachable_entry.h b/table/block_based/cachable_entry.h index eb51e98da..d21385f03 100644 --- a/table/block_based/cachable_entry.h +++ b/table/block_based/cachable_entry.h @@ -132,17 +132,17 @@ public: ResetFields(); } - void SetOwnedValue(T* value) { - assert(value != nullptr); + void SetOwnedValue(std::unique_ptr<T>&& value) { + assert(value.get() != nullptr); - if (UNLIKELY(value_ == value && own_value_)) { + if (UNLIKELY(value_ == value.get() && own_value_)) { assert(cache_ == nullptr && cache_handle_ == nullptr); return; } Reset(); - value_ = value; + value_ = value.release(); own_value_ = true; } diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 9f12a4c45..ac0f00570 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -49,7 +49,7 @@ inline void BlockFetcher::ProcessTrailerIfPresent() { inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() { if (cache_options_.persistent_cache && !cache_options_.persistent_cache->IsCompressed()) { - Status status = PersistentCacheHelper::LookupUncompressedPage( + Status status = PersistentCacheHelper::LookupUncompressed( cache_options_, handle_, contents_); if (status.ok()) { // uncompressed page is found for the block handle @@ -99,15 +99,14 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { return got_from_prefetch_buffer_; } -inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() { +inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() { if (cache_options_.persistent_cache && cache_options_.persistent_cache->IsCompressed()) { - // lookup uncompressed cache mode p-cache - std::unique_ptr<char[]> raw_data; - io_status_ = status_to_io_status(PersistentCacheHelper::LookupRawPage( - cache_options_, handle_, &raw_data, block_size_with_trailer_)); + std::unique_ptr<char[]> buf; + io_status_ = status_to_io_status(PersistentCacheHelper::LookupSerialized( - cache_options_, handle_, &raw_data, block_size_with_trailer_)); + cache_options_, handle_, &buf, block_size_with_trailer_)); if (io_status_.ok()) { - heap_buf_ = CacheAllocationPtr(raw_data.release()); + heap_buf_ = CacheAllocationPtr(buf.release()); used_buf_ = heap_buf_.get(); slice_ = Slice(heap_buf_.get(), block_size_); ProcessTrailerIfPresent(); @@ -162,9 +161,8 @@ inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() { if (io_status_.ok() && read_options_.fill_cache && cache_options_.persistent_cache && cache_options_.persistent_cache->IsCompressed()) { - // insert to raw cache - PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_, - block_size_with_trailer_); + PersistentCacheHelper::InsertSerialized(cache_options_, handle_, used_buf_, + block_size_with_trailer_); } } @@ -173,8 +171,8 @@ inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() { read_options_.fill_cache && cache_options_.persistent_cache && !cache_options_.persistent_cache->IsCompressed()) { // insert to uncompressed cache - PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_, - *contents_); + PersistentCacheHelper::InsertUncompressed(cache_options_, handle_, + *contents_); } } @@ -234,7 +232,7 @@ inline void BlockFetcher::GetBlockContents() { *contents_ = BlockContents(std::move(heap_buf_), block_size_); } #ifndef NDEBUG - contents_->is_raw_block = true; + contents_->has_trailer =
footer_.GetBlockTrailerSize() > 0; #endif } @@ -242,7 +240,7 @@ IOStatus BlockFetcher::ReadBlockContents() { if (TryGetUncompressBlockFromPersistentCache()) { compression_type_ = kNoCompression; #ifndef NDEBUG - contents_->is_raw_block = true; + contents_->has_trailer = footer_.GetBlockTrailerSize() > 0; #endif // NDEBUG return IOStatus::OK(); } @@ -250,7 +248,7 @@ IOStatus BlockFetcher::ReadBlockContents() { if (!io_status_.ok()) { return io_status_; } - } else if (!TryGetCompressedBlockFromPersistentCache()) { + } else if (!TryGetSerializedBlockFromPersistentCache()) { IOOptions opts; io_status_ = file_->PrepareIOOptions(read_options_, opts); // Actual file read @@ -327,7 +325,7 @@ IOStatus BlockFetcher::ReadBlockContents() { // compressed page, uncompress, update cache UncompressionContext context(compression_type_); UncompressionInfo info(context, uncompression_dict_, compression_type_); - io_status_ = status_to_io_status(UncompressBlockContents( + io_status_ = status_to_io_status(UncompressSerializedBlock( info, slice_.data(), block_size_, contents_, footer_.format_version(), ioptions_, memory_allocator_)); #ifndef NDEBUG @@ -347,10 +345,10 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() { if (TryGetUncompressBlockFromPersistentCache()) { compression_type_ = kNoCompression; #ifndef NDEBUG - contents_->is_raw_block = true; + contents_->has_trailer = footer_.GetBlockTrailerSize() > 0; #endif // NDEBUG return IOStatus::OK(); - } else if (!TryGetCompressedBlockFromPersistentCache()) { + } else if (!TryGetSerializedBlockFromPersistentCache()) { assert(prefetch_buffer_ != nullptr); if (!for_compaction_) { IOOptions opts; @@ -378,7 +376,7 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() { UncompressionContext context(compression_type_); UncompressionInfo info(context, uncompression_dict_, compression_type_); - io_status_ = status_to_io_status(UncompressBlockContents( + io_status_ = status_to_io_status(UncompressSerializedBlock( info, slice_.data(), block_size_, contents_, footer_.format_version(), ioptions_, memory_allocator_)); #ifndef NDEBUG diff --git a/table/block_fetcher.h b/table/block_fetcher.h index 2111186c4..4871e81e8 100644 --- a/table/block_fetcher.h +++ b/table/block_fetcher.h @@ -128,7 +128,7 @@ class BlockFetcher { bool TryGetUncompressBlockFromPersistentCache(); // return true if found bool TryGetFromPrefetchBuffer(); - bool TryGetCompressedBlockFromPersistentCache(); + bool TryGetSerializedBlockFromPersistentCache(); void PrepareBufferForBlockFromFile(); // Copy content from used_buf_ to new heap_buf_. 
void CopyBufferToHeapBuf(); diff --git a/table/format.cc b/table/format.cc index 23f5b9b5e..efde5e169 100644 --- a/table/format.cc +++ b/table/format.cc @@ -498,10 +498,11 @@ uint32_t ComputeBuiltinChecksumWithLastByte(ChecksumType type, const char* data, } } -Status UncompressBlockContentsForCompressionType( - const UncompressionInfo& uncompression_info, const char* data, size_t n, - BlockContents* contents, uint32_t format_version, - const ImmutableOptions& ioptions, MemoryAllocator* allocator) { +Status UncompressBlockData(const UncompressionInfo& uncompression_info, + const char* data, size_t size, + BlockContents* out_contents, uint32_t format_version, + const ImmutableOptions& ioptions, + MemoryAllocator* allocator) { Status ret = Status::OK(); assert(uncompression_info.type() != kNoCompression && @@ -511,7 +512,7 @@ Status UncompressBlockContentsForCompressionType( ShouldReportDetailedTime(ioptions.env, ioptions.stats)); size_t uncompressed_size = 0; CacheAllocationPtr ubuf = - UncompressData(uncompression_info, data, n, &uncompressed_size, + UncompressData(uncompression_info, data, size, &uncompressed_size, GetCompressFormatForVersion(format_version), allocator); if (!ubuf) { if (!CompressionTypeSupported(uncompression_info.type())) { @@ -525,44 +526,36 @@ Status UncompressBlockContentsForCompressionType( } } - *contents = BlockContents(std::move(ubuf), uncompressed_size); + *out_contents = BlockContents(std::move(ubuf), uncompressed_size); if (ShouldReportDetailedTime(ioptions.env, ioptions.stats)) { RecordTimeToHistogram(ioptions.stats, DECOMPRESSION_TIMES_NANOS, timer.ElapsedNanos()); } RecordTimeToHistogram(ioptions.stats, BYTES_DECOMPRESSED, - contents->data.size()); + out_contents->data.size()); RecordTick(ioptions.stats, NUMBER_BLOCK_DECOMPRESSED); + TEST_SYNC_POINT_CALLBACK("UncompressBlockData:TamperWithReturnValue", + static_cast<void*>(&ret)); TEST_SYNC_POINT_CALLBACK( - "UncompressBlockContentsForCompressionType:TamperWithReturnValue", - static_cast<void*>(&ret)); - TEST_SYNC_POINT_CALLBACK( - "UncompressBlockContentsForCompressionType:" + "UncompressBlockData:" "TamperWithDecompressionOutput", - static_cast<void*>(contents)); + static_cast<void*>(out_contents)); return ret; } -// -// The 'data' points to the raw block contents that was read in from file. -// This method allocates a new heap buffer and the raw block -// contents are uncompresed into this buffer. This -// buffer is returned via 'result' and it is upto the caller to -// free this buffer.
-// format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const UncompressionInfo& uncompression_info, - const char* data, size_t n, - BlockContents* contents, uint32_t format_version, - const ImmutableOptions& ioptions, - MemoryAllocator* allocator) { - assert(data[n] != kNoCompression); - assert(data[n] == static_cast<char>(uncompression_info.type())); - return UncompressBlockContentsForCompressionType(uncompression_info, data, n, - contents, format_version, - ioptions, allocator); +Status UncompressSerializedBlock(const UncompressionInfo& uncompression_info, + const char* data, size_t size, + BlockContents* out_contents, + uint32_t format_version, + const ImmutableOptions& ioptions, + MemoryAllocator* allocator) { + assert(data[size] != kNoCompression); + assert(data[size] == static_cast<char>(uncompression_info.type())); + return UncompressBlockData(uncompression_info, data, size, out_contents, + format_version, ioptions, allocator); } // Replace the contents of db_host_id with the actual hostname, if db_host_id diff --git a/table/format.h b/table/format.h index de11648e4..ffb9fb0ca 100644 --- a/table/format.h +++ b/table/format.h @@ -29,7 +29,7 @@ namespace ROCKSDB_NAMESPACE { class RandomAccessFile; struct ReadOptions; -extern bool ShouldReportDetailedTime(Env* env, Statistics* stats); +bool ShouldReportDetailedTime(Env* env, Statistics* stats); // the length of the magic number in bytes. constexpr uint32_t kMagicNumberLengthByte = 8; @@ -256,7 +256,26 @@ uint32_t ComputeBuiltinChecksumWithLastByte(ChecksumType type, const char* data, // Represents the contents of a block read from an SST file. Depending on how // it's created, it may or may not own the actual block bytes. As an example, // BlockContents objects representing data read from mmapped files only point -// into the mmapped region. +// into the mmapped region. Depending on context, it might be a serialized +// (potentially compressed) block, including a trailer beyond `size`, or an +// uncompressed block. +// +// Please try to use this terminology when dealing with blocks: +// * "Serialized block" - bytes that go into storage. For block-based table +// (usually the case) this includes the block trailer. Here the `size` does +// not include the trailer, but other places in code might include the trailer +// in the size. +// * "Maybe compressed block" - like a serialized block, but without the +// trailer (or no promise of including a trailer). Must be accompanied by a +// CompressionType in some other variable or field. +// * "Uncompressed block" - "payload" bytes that are either stored with no +// compression, used as input to compression function, or result of +// decompression function. +// * "Parsed block" - an in-memory form of a block in block cache, as it is +// used by the table reader. Different C++ types are used depending on the +// block type (see block_like_traits.h). Only trivially parsable block types +// use BlockContents as the parsed form. +// struct BlockContents { // Points to block payload (without trailer) Slice data; @@ -265,7 +284,7 @@ struct BlockContents { #ifndef NDEBUG // Whether there is a known trailer after what is pointed to by `data`. // See BlockBasedTable::GetCompressionType.
- bool is_raw_block = false; + bool has_trailer = false; #endif // NDEBUG BlockContents() {} @@ -313,36 +332,35 @@ struct BlockContents { data = std::move(other.data); allocation = std::move(other.allocation); #ifndef NDEBUG - is_raw_block = other.is_raw_block; + has_trailer = other.has_trailer; #endif // NDEBUG return *this; } }; -// The 'data' points to the raw block contents read in from file. -// This method allocates a new heap buffer and the raw block -// contents are uncompresed into this buffer. This buffer is -// returned via 'result' and it is upto the caller to -// free this buffer. -// For description of compress_format_version and possible values, see -// util/compression.h -extern Status UncompressBlockContents(const UncompressionInfo& info, - const char* data, size_t n, - BlockContents* contents, - uint32_t compress_format_version, - const ImmutableOptions& ioptions, - MemoryAllocator* allocator = nullptr); - -// This is an extension to UncompressBlockContents that accepts -// a specific compression type. This is used by un-wrapped blocks -// with no compression header. -extern Status UncompressBlockContentsForCompressionType( - const UncompressionInfo& info, const char* data, size_t n, - BlockContents* contents, uint32_t compress_format_version, - const ImmutableOptions& ioptions, MemoryAllocator* allocator = nullptr); +// The `data` points to serialized block contents read in from file, which +// must be compressed and include a trailer beyond `size`. A new buffer is +// allocated with the given allocator (or default) and the uncompressed +// contents are returned in `out_contents`. +// format_version is as defined in include/rocksdb/table.h, which is +// used to determine compression format version. +Status UncompressSerializedBlock(const UncompressionInfo& info, + const char* data, size_t size, + BlockContents* out_contents, + uint32_t format_version, + const ImmutableOptions& ioptions, + MemoryAllocator* allocator = nullptr); + +// This is a variant of UncompressSerializedBlock that does not expect a +// block trailer beyond `size`. (CompressionType is taken from `info`.) +Status UncompressBlockData(const UncompressionInfo& info, const char* data, + size_t size, BlockContents* out_contents, + uint32_t format_version, + const ImmutableOptions& ioptions, + MemoryAllocator* allocator = nullptr); // Replace db_host_id contents with the real hostname if necessary -extern Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id); +Status ReifyDbHostIdProperty(Env* env, std::string* db_host_id); // Implementation details follow. 
Clients should ignore, diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 49ccf1bf2..78402482b 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -196,10 +196,11 @@ bool NotifyCollectTableCollectorsOnAdd( void NotifyCollectTableCollectorsOnBlockAdd( const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors, - const uint64_t block_raw_bytes, const uint64_t block_compressed_bytes_fast, + const uint64_t block_uncomp_bytes, + const uint64_t block_compressed_bytes_fast, const uint64_t block_compressed_bytes_slow) { for (auto& collector : collectors) { - collector->BlockAdd(block_raw_bytes, block_compressed_bytes_fast, + collector->BlockAdd(block_uncomp_bytes, block_compressed_bytes_fast, block_compressed_bytes_slow); } } diff --git a/table/meta_blocks.h b/table/meta_blocks.h index fb720f184..b867dd01d 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -92,7 +92,7 @@ bool NotifyCollectTableCollectorsOnAdd( void NotifyCollectTableCollectorsOnBlockAdd( const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors, - uint64_t block_raw_bytes, uint64_t block_compressed_bytes_fast, + uint64_t block_uncomp_bytes, uint64_t block_compressed_bytes_fast, uint64_t block_compressed_bytes_slow); // NotifyCollectTableCollectorsOnFinish() triggers the `Finish` event for all diff --git a/table/persistent_cache_helper.cc b/table/persistent_cache_helper.cc index c10b193ba..8435b294e 100644 --- a/table/persistent_cache_helper.cc +++ b/table/persistent_cache_helper.cc @@ -11,7 +11,7 @@ namespace ROCKSDB_NAMESPACE { const PersistentCacheOptions PersistentCacheOptions::kEmpty; -void PersistentCacheHelper::InsertRawPage( +void PersistentCacheHelper::InsertSerialized( const PersistentCacheOptions& cache_options, const BlockHandle& handle, const char* data, const size_t size) { assert(cache_options.persistent_cache); @@ -24,7 +24,7 @@ void PersistentCacheHelper::InsertRawPage( .PermitUncheckedError(); } -void PersistentCacheHelper::InsertUncompressedPage( +void PersistentCacheHelper::InsertUncompressed( const PersistentCacheOptions& cache_options, const BlockHandle& handle, const BlockContents& contents) { assert(cache_options.persistent_cache); @@ -42,11 +42,11 @@ ; } -Status PersistentCacheHelper::LookupRawPage( +Status PersistentCacheHelper::LookupSerialized( const PersistentCacheOptions& cache_options, const BlockHandle& handle, - std::unique_ptr<char[]>* raw_data, const size_t raw_data_size) { + std::unique_ptr<char[]>* out_data, const size_t expected_data_size) { #ifdef NDEBUG - (void)raw_data_size; + (void)expected_data_size; #endif assert(cache_options.persistent_cache); assert(cache_options.persistent_cache->IsCompressed()); @@ -56,7 +56,7 @@ Status PersistentCacheHelper::LookupRawPage( size_t size; Status s = - cache_options.persistent_cache->Lookup(key.AsSlice(), raw_data, &size); + cache_options.persistent_cache->Lookup(key.AsSlice(), out_data, &size); if (!s.ok()) { // cache miss RecordTick(cache_options.statistics, PERSISTENT_CACHE_MISS); @@ -65,13 +65,14 @@ // cache hit // Block-based table is assumed - assert(raw_data_size == handle.size() + BlockBasedTable::kBlockTrailerSize); - assert(size == raw_data_size); + assert(expected_data_size == + handle.size() + BlockBasedTable::kBlockTrailerSize); + assert(size == expected_data_size); RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT); return Status::OK(); } -Status PersistentCacheHelper::LookupUncompressedPage( +Status PersistentCacheHelper::LookupUncompressed( const PersistentCacheOptions&
cache_options, const BlockHandle& handle, BlockContents* contents) { assert(cache_options.persistent_cache); diff --git a/table/persistent_cache_helper.h b/table/persistent_cache_helper.h index 1db855729..ece339aee 100644 --- a/table/persistent_cache_helper.h +++ b/table/persistent_cache_helper.h @@ -19,26 +19,28 @@ struct BlockContents; // Encapsulates some of the helper logic for read and writing from the cache class PersistentCacheHelper { public: - // insert block into raw page cache - static void InsertRawPage(const PersistentCacheOptions& cache_options, - const BlockHandle& handle, const char* data, - const size_t size); - - // insert block into uncompressed cache - static void InsertUncompressedPage( - const PersistentCacheOptions& cache_options, const BlockHandle& handle, - const BlockContents& contents); - - // lookup block from raw page cacge - static Status LookupRawPage(const PersistentCacheOptions& cache_options, - const BlockHandle& handle, - std::unique_ptr<char[]>* raw_data, - const size_t raw_data_size); - - // lookup block from uncompressed cache - static Status LookupUncompressedPage( - const PersistentCacheOptions& cache_options, const BlockHandle& handle, - BlockContents* contents); + // Insert block into cache of serialized blocks. Size includes block trailer + // (if applicable). + static void InsertSerialized(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, const char* data, + const size_t size); + + // Insert block into cache of uncompressed blocks. No block trailer. + static void InsertUncompressed(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, + const BlockContents& contents); + + // Lookup block from cache of serialized blocks. Size includes block trailer + // (if applicable). + static Status LookupSerialized(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, + std::unique_ptr<char[]>* out_data, + const size_t expected_data_size); + + // Lookup block from uncompressed cache. No block trailer. + static Status LookupUncompressed(const PersistentCacheOptions& cache_options, + const BlockHandle& handle, + BlockContents* contents); }; } // namespace ROCKSDB_NAMESPACE diff --git a/table/sst_file_writer_collectors.h b/table/sst_file_writer_collectors.h index 7610af573..486315fb5 100644 --- a/table/sst_file_writer_collectors.h +++ b/table/sst_file_writer_collectors.h @@ -36,7 +36,7 @@ class SstFileWriterPropertiesCollector : public IntTblPropCollector { return Status::OK(); } - virtual void BlockAdd(uint64_t /* block_raw_bytes */, + virtual void BlockAdd(uint64_t /* block_uncomp_bytes */, uint64_t /* block_compressed_bytes_fast */, uint64_t /* block_compressed_bytes_slow */) override { // Intentionally left blank.
No interest in collecting stats for diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 6834d0b80..4522c193e 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1165,7 +1165,7 @@ Status BlobDBImpl::DecompressSlice(const Slice& compressed_value, UncompressionContext context(compression_type); UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), compression_type); - Status s = UncompressBlockContentsForCompressionType( + Status s = UncompressBlockData( info, compressed_value.data(), compressed_value.size(), &contents, kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); if (!s.ok()) { diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index 47ab16218..5a628fd21 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -213,7 +213,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, UncompressionContext context(compression); UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), compression); - s = UncompressBlockContentsForCompressionType( + s = UncompressBlockData( info, slice.data() + key_size, static_cast<size_t>(value_size), &contents, 2 /*compress_format_version*/, ImmutableOptions(Options())); if (!s.ok()) { diff --git a/utilities/cache_dump_load_impl.cc b/utilities/cache_dump_load_impl.cc index 33b0d37ec..221634ea0 100644 --- a/utilities/cache_dump_load_impl.cc +++ b/utilities/cache_dump_load_impl.cc @@ -175,33 +175,33 @@ CacheDumperImpl::DumpOneBlockCallBack() { // Step 4: if the block should not be filtered out, write the block to the // CacheDumpWriter if (!filter_out && block_start != nullptr) { - char* buffer = new char[block_len]; - memcpy(buffer, block_start, block_len); - WriteCacheBlock(type, key, (void*)buffer, block_len) + WriteBlock(type, key, Slice(block_start, block_len)) .PermitUncheckedError(); - delete[] buffer; } }; } -// Write the raw block to the writer. It takes the timestamp of the block being -// copied from block cache, block type, key, block pointer, raw block size and -// the block checksum as the input. When writing the raw block, we first create -// the dump unit and encoude it to a string. Then, we calculate the checksum of -// the how dump unit string and store it in the dump unit metadata. + +// Write the block to the writer. It takes block type, key, and block +// contents (value) as the input; the timestamp and the block checksum are +// computed here. When writing the block, we first create the dump unit and +// encode it to a string. Then, we calculate the checksum of the whole dump +// unit string and store it in the dump unit metadata. // We write the metadata first, which is a fixed-size string. Then, we // append the dump unit string to the writer.
-IOStatus CacheDumperImpl::WriteRawBlock(uint64_t timestamp, - CacheDumpUnitType type, - const Slice& key, void* value, - size_t len, uint32_t checksum) { - // First, serilize the block information in a string +IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key, + const Slice& value) { + uint64_t timestamp = clock_->NowMicros(); + uint32_t value_checksum = crc32c::Value(value.data(), value.size()); + + // First, serialize the block information in a string DumpUnit dump_unit; dump_unit.timestamp = timestamp; dump_unit.key = key; dump_unit.type = type; - dump_unit.value_len = len; - dump_unit.value = value; - dump_unit.value_checksum = checksum; + dump_unit.value_len = value.size(); + dump_unit.value = const_cast<char*>(value.data()); + dump_unit.value_checksum = value_checksum; std::string encoded_data; CacheDumperHelper::EncodeDumpUnit(dump_unit, &encoded_data); @@ -212,19 +212,19 @@ IOStatus CacheDumperImpl::WriteRawBlock(uint64_t timestamp, unit_meta.sequence_num = sequence_num_; sequence_num_++; unit_meta.dump_unit_checksum = - crc32c::Value(encoded_data.c_str(), encoded_data.size()); - unit_meta.dump_unit_size = static_cast<uint64_t>(encoded_data.size()); + crc32c::Value(encoded_data.data(), encoded_data.size()); + unit_meta.dump_unit_size = encoded_data.size(); std::string encoded_meta; CacheDumperHelper::EncodeDumpUnitMeta(unit_meta, &encoded_meta); // We write the metadata first. assert(writer_ != nullptr); - IOStatus io_s = writer_->WriteMetadata(Slice(encoded_meta)); + IOStatus io_s = writer_->WriteMetadata(encoded_meta); if (!io_s.ok()) { return io_s; } // followed by the dump unit. - return writer_->WritePacket(Slice(encoded_data)); + return writer_->WritePacket(encoded_data); } // Before we write any block, we write the header first to store the cache dump @@ -238,38 +238,18 @@ IOStatus CacheDumperImpl::WriteHeader() { << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" << "Format: dump_unit_metadata , dump_unit cache_value\n"; + "block_size, block_data, block_checksum> cache_value\n"; std::string header_value(s.str()); CacheDumpUnitType type = CacheDumpUnitType::kHeader; - uint64_t timestamp = clock_->NowMicros(); - uint32_t header_checksum = - crc32c::Value(header_value.c_str(), header_value.size()); - return WriteRawBlock(timestamp, type, Slice(header_key), - (void*)header_value.c_str(), header_value.size(), - header_checksum); -} - -// Write the block dumped from cache -IOStatus CacheDumperImpl::WriteCacheBlock(const CacheDumpUnitType type, - const Slice& key, void* value, - size_t len) { - uint64_t timestamp = clock_->NowMicros(); - uint32_t value_checksum = crc32c::Value((char*)value, len); - return WriteRawBlock(timestamp, type, key, value, len, value_checksum); + return WriteBlock(type, header_key, header_value); } // Write the footer after all the blocks are stored to indicate the ending. IOStatus CacheDumperImpl::WriteFooter() { std::string footer_key = "footer"; - std::ostringstream s; std::string footer_value("cache dump completed"); CacheDumpUnitType type = CacheDumpUnitType::kFooter; - uint64_t timestamp = clock_->NowMicros(); - uint32_t footer_checksum = - crc32c::Value(footer_value.c_str(), footer_value.size()); - return WriteRawBlock(timestamp, type, Slice(footer_key), - (void*)footer_value.c_str(), footer_value.size(), - footer_checksum); + return WriteBlock(type, footer_key, footer_value); } // This is the main function to restore the cache entries to secondary cache.
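To make the consolidated WriteBlock flow above concrete: each dump unit is framed as a fixed 16-byte metadata record (sequence number, checksum of the encoded unit, encoded-unit size) followed by the encoded unit itself. Below is a minimal self-contained sketch of that framing; all names are stand-ins rather than RocksDB APIs, the checksum is a placeholder where RocksDB uses crc32c::Value, and the encoding is host-endian for brevity where RocksDB's PutFixed32/PutFixed64 are little-endian.

// Sketch of the write-side framing; the read side (ReadHeader/ReadCacheBlock)
// re-computes the checksum over the unit bytes and compares to the metadata.
#include <cstdint>
#include <cstring>
#include <string>

namespace sketch {

// Placeholder 32-bit checksum (FNV-1a), standing in for crc32c::Value.
inline uint32_t Checksum(const char* data, size_t n) {
  uint32_t h = 2166136261u;
  for (size_t i = 0; i < n; ++i) {
    h = (h ^ static_cast<unsigned char>(data[i])) * 16777619u;
  }
  return h;
}

struct UnitMeta {
  uint32_t sequence_num;
  uint32_t unit_checksum;  // checksum of the *encoded* unit, not the payload
  uint64_t unit_size;      // size of the encoded unit in bytes
};

// Fixed 16-byte metadata record (4 + 4 + 8), mirroring EncodeDumpUnitMeta.
inline std::string EncodeMeta(const UnitMeta& m) {
  std::string out(16, '\0');
  std::memcpy(&out[0], &m.sequence_num, sizeof(m.sequence_num));
  std::memcpy(&out[4], &m.unit_checksum, sizeof(m.unit_checksum));
  std::memcpy(&out[8], &m.unit_size, sizeof(m.unit_size));
  return out;
}

// Frame one encoded unit: fixed-size metadata first, then the unit itself.
inline void AppendFramedUnit(std::string* stream, uint32_t seq,
                             const std::string& encoded_unit) {
  UnitMeta meta;
  meta.sequence_num = seq;
  meta.unit_checksum = Checksum(encoded_unit.data(), encoded_unit.size());
  meta.unit_size = encoded_unit.size();
  stream->append(EncodeMeta(meta));
  stream->append(encoded_unit);
}

}  // namespace sketch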
@@ -309,9 +289,10 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { if (!io_s.ok()) { break; } - // create the raw_block_content based on the information in the dump_unit - BlockContents raw_block_contents( - Slice((char*)dump_unit.value, dump_unit.value_len)); + // Create the uncompressed_block based on the information in the dump_unit. + // (There is no block trailer here, as there would be in a block-based SST file.) + BlockContents uncompressed_block( + Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len)); Cache::CacheItemHelper* helper = nullptr; Statistics* statistics = nullptr; Status s = Status::OK(); @@ -323,7 +304,7 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { BlockType::kFilter); std::unique_ptr<ParsedFullFilterBlock> block_holder; block_holder.reset(BlocklikeTraits<ParsedFullFilterBlock>::Create( - std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, statistics, false, toptions_.filter_policy.get())); if (helper != nullptr) { s = secondary_cache_->Insert(dump_unit.key, @@ -335,7 +316,7 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kData); std::unique_ptr<Block> block_holder; block_holder.reset(BlocklikeTraits<Block>::Create( - std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, statistics, false, toptions_.filter_policy.get())); if (helper != nullptr) { s = secondary_cache_->Insert(dump_unit.key, @@ -347,7 +328,7 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kIndex); std::unique_ptr<Block> block_holder; block_holder.reset(BlocklikeTraits<Block>::Create( - std::move(raw_block_contents), 0, statistics, false, + std::move(uncompressed_block), 0, statistics, false, toptions_.filter_policy.get())); if (helper != nullptr) { s = secondary_cache_->Insert(dump_unit.key, @@ -360,7 +341,7 @@ IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { BlockType::kFilterPartitionIndex); std::unique_ptr<Block> block_holder; block_holder.reset(BlocklikeTraits<Block>::Create( - std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + std::move(uncompressed_block), toptions_.read_amp_bytes_per_bit, statistics, false, toptions_.filter_policy.get())); if (helper != nullptr) { s = secondary_cache_->Insert(dump_unit.key, @@ -436,7 +417,7 @@ IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data, if (!io_s.ok()) { return io_s; } - uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size()); + uint32_t unit_checksum = crc32c::Value(data->data(), data->size()); if (unit_checksum != header_meta.dump_unit_checksum) { return IOStatus::Corruption("Read header unit corrupted!"); } @@ -461,7 +442,7 @@ IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data, if (!io_s.ok()) { return io_s; } - uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size()); + uint32_t unit_checksum = crc32c::Value(data->data(), data->size()); if (unit_checksum != unit_meta.dump_unit_checksum) { return IOStatus::Corruption( "Checksum does not match! Read dumped unit corrupted!"); diff --git a/utilities/cache_dump_load_impl.h b/utilities/cache_dump_load_impl.h index ad8cd045a..f45b3360b 100644 --- a/utilities/cache_dump_load_impl.h +++ b/utilities/cache_dump_load_impl.h @@ -75,7 +75,7 @@ struct DumpUnit { // Pointer to the block.
Note that, in the dump process, it points to a memory // buffer copied from cache block. The buffer is freed when we process the // next block. In the load process, we use an std::string to store the - // serilized dump_unit read from the reader. So it points to the memory + // serialized dump_unit read from the reader. So it points to the memory // address of the begin of the block in this string. void* value; @@ -103,14 +103,9 @@ class CacheDumperImpl : public CacheDumper { IOStatus DumpCacheEntriesToWriter() override; private: - IOStatus WriteRawBlock(uint64_t timestamp, CacheDumpUnitType type, - const Slice& key, void* value, size_t len, - uint32_t checksum); - + IOStatus WriteBlock(CacheDumpUnitType type, const Slice& key, + const Slice& value); IOStatus WriteHeader(); - - IOStatus WriteCacheBlock(const CacheDumpUnitType type, const Slice& key, - void* value, size_t len); IOStatus WriteFooter(); bool ShouldFilterOut(const Slice& key); std::function @@ -166,7 +161,7 @@ class ToFileCacheDumpWriter : public CacheDumpWriter { ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); } - // Write the serilized metadata to the file + // Write the serialized metadata to the file virtual IOStatus WriteMetadata(const Slice& metadata) override { assert(file_writer_ != nullptr); std::string prefix; @@ -179,7 +174,7 @@ class ToFileCacheDumpWriter : public CacheDumpWriter { return io_s; } - // Write the serilized data to the file + // Write the serialized data to the file virtual IOStatus WritePacket(const Slice& data) override { assert(file_writer_ != nullptr); std::string prefix; @@ -284,7 +279,7 @@ class FromFileCacheDumpReader : public CacheDumpReader { // The cache dump and load helper class class CacheDumperHelper { public: - // serilize the dump_unit_meta to a string, it is fixed 16 bytes size. + // Serialize the dump_unit_meta to a string; it has a fixed size of 16 bytes. static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) { assert(data); PutFixed32(data, static_cast<uint32_t>(meta.sequence_num)); @@ -292,7 +287,7 @@ class CacheDumperHelper { PutFixed64(data, meta.dump_unit_size); } - // Serilize the dump_unit to a string. + // Serialize the dump_unit to a string. static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) { assert(data); PutFixed64(data, dump_unit.timestamp); @@ -304,7 +299,7 @@ class CacheDumperHelper { Slice((char*)dump_unit.value, dump_unit.value_len)); } - // Deserilize the dump_unit_meta from a string + // Deserialize the dump_unit_meta from a string static Status DecodeDumpUnitMeta(const std::string& encoded_data, DumpUnitMeta* unit_meta) { assert(unit_meta != nullptr); @@ -323,7 +318,7 @@ return Status::OK(); } - // Deserilize the dump_unit from a string. + // Deserialize the dump_unit from a string. static Status DecodeDumpUnit(const std::string& encoded_data, DumpUnit* dump_unit) { assert(dump_unit != nullptr);
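Taken together, the format.h changes in this diff leave two decompression entry points with a clear division of labor: UncompressSerializedBlock for buffers that still carry the block trailer (whose first byte is the CompressionType), and UncompressBlockData for trailer-less buffers whose compression type travels in the UncompressionInfo. A hedged usage sketch against the declarations above; the surrounding setup (info, ioptions, format_version, and the input buffers) is assumed to come from a caller such as BlockFetcher, so this is not a standalone program:

// Usage sketch for the two entry points declared in table/format.h above.
#include "table/format.h"

namespace ROCKSDB_NAMESPACE {

Status DecompressBoth(const UncompressionInfo& info, const char* serialized,
                      size_t size, const char* trailerless, size_t data_size,
                      uint32_t format_version,
                      const ImmutableOptions& ioptions) {
  BlockContents out;
  // Serialized block: `size` excludes the trailer, but the trailer must be
  // present; its first byte (serialized[size]) is the CompressionType, which
  // the asserts in UncompressSerializedBlock cross-check against `info`.
  Status s = UncompressSerializedBlock(info, serialized, size, &out,
                                       format_version, ioptions);
  if (!s.ok()) {
    return s;
  }
  // Maybe-compressed block with no trailer: the CompressionType must come
  // from `info` itself, since there is no trailer byte to read it from.
  return UncompressBlockData(info, trailerless, data_size, &out,
                             format_version, ioptions);
}

}  // namespace ROCKSDB_NAMESPACE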