From cc1c3ee54eace876ad18c39f931e8e5039823930 Mon Sep 17 00:00:00 2001
From: Saketh Are
Date: Fri, 23 Apr 2021 12:44:11 -0700
Subject: [PATCH] Eliminate double-buffering of keys in block_based_table_builder (#8219)

Summary:
The block_based_table_builder buffers some blocks in memory to construct a
good compression dictionary. Before this commit, the keys from each block
were buffered separately for convenience. However, the buffered block data
implicitly contains all keys (see the illustrative sketch after the patch).
This commit eliminates the redundant key buffers and reduces memory usage.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8219

Reviewed By: ajkr

Differential Revision: D27945851

Pulled By: saketh-are

fbshipit-source-id: caf3cac1217201e080a1e24b542bedf20973afee
---
 .../block_based/block_based_table_builder.cc | 98 ++++++++++++-------
 1 file changed, 60 insertions(+), 38 deletions(-)

diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index f94ae8a95..17bdfc882 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -254,13 +254,10 @@ struct BlockBasedTableBuilder::Rep {
   std::atomic<uint64_t> offset;
   size_t alignment;
   BlockBuilder data_block;
-  // Buffers uncompressed data blocks and keys to replay later. Needed when
+  // Buffers uncompressed data blocks to replay later. Needed when
   // compression dictionary is enabled so we can finalize the dictionary before
   // compressing any data blocks.
-  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
-  // blocks as it's redundant, but it's easier to implement for now.
-  std::vector<std::pair<std::string, std::vector<std::string>>>
-      data_block_and_keys_buffers;
+  std::vector<std::string> data_block_buffers;
   BlockBuilder range_del_block;
 
   InternalKeySliceTransform internal_prefix_transform;
@@ -311,8 +308,7 @@ struct BlockBasedTableBuilder::Rep {
   };
   State state;
   // `kBuffered` state is allowed only as long as the buffering of uncompressed
-  // data blocks (see `data_block_and_keys_buffers`) does not exceed
-  // `buffer_limit`.
+  // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
   uint64_t buffer_limit;
 
   const bool use_delta_encoding_for_index_values;
@@ -953,12 +949,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     r->last_key.assign(key.data(), key.size());
     r->data_block.Add(key, value);
     if (r->state == Rep::State::kBuffered) {
-      // Buffer keys to be replayed during `Finish()` once compression
-      // dictionary has been finalized.
-      if (r->data_block_and_keys_buffers.empty() || should_flush) {
-        r->data_block_and_keys_buffers.emplace_back();
-      }
-      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
+      // Buffered keys will be replayed from data_block_buffers during
+      // `Finish()` once compression dictionary has been finalized.
     } else {
       if (!r->IsParallelCompressionEnabled()) {
         r->index_builder->OnKeyAdded(key);
@@ -1019,11 +1011,8 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
   block->SwapAndReset(raw_block_contents);
   if (rep_->state == Rep::State::kBuffered) {
     assert(is_data_block);
-    assert(!rep_->data_block_and_keys_buffers.empty());
-    rep_->data_block_and_keys_buffers.back().first =
-        std::move(raw_block_contents);
-    rep_->data_begin_offset +=
-        rep_->data_block_and_keys_buffers.back().first.size();
+    rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
+    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
     return;
   }
   WriteBlock(raw_block_contents, handle, is_data_block);
@@ -1695,7 +1684,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
   const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                   ? r->compression_opts.zstd_max_train_bytes
                                   : r->compression_opts.max_dict_bytes;
-  const size_t kNumBlocksBuffered = r->data_block_and_keys_buffers.size();
+  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
   if (kNumBlocksBuffered == 0) {
     // The below code is neither safe nor necessary for handling zero data
     // blocks.
@@ -1725,11 +1714,10 @@
   for (size_t i = 0;
        i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
        ++i) {
-    size_t copy_len =
-        std::min(kSampleBytes - compression_dict_samples.size(),
-                 r->data_block_and_keys_buffers[buffer_idx].first.size());
-    compression_dict_samples.append(
-        r->data_block_and_keys_buffers[buffer_idx].first, 0, copy_len);
+    size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
+                               r->data_block_buffers[buffer_idx].size());
+    compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
+                                    copy_len);
     compression_dict_sample_lens.emplace_back(copy_len);
 
     buffer_idx += kPrimeGeneratorRemainder;
@@ -1754,30 +1742,58 @@
       dict, r->compression_type == kZSTD ||
                 r->compression_type == kZSTDNotFinalCompression));
 
-  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
-    auto& data_block = r->data_block_and_keys_buffers[i].first;
-    auto& keys = r->data_block_and_keys_buffers[i].second;
+  auto get_iterator_for_block = [&r](size_t i) {
+    auto& data_block = r->data_block_buffers[i];
     assert(!data_block.empty());
-    assert(!keys.empty());
+
+    Block reader{BlockContents{data_block}};
+    DataBlockIter* iter = reader.NewDataIterator(
+        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
+
+    iter->SeekToFirst();
+    assert(iter->Valid());
+    return std::unique_ptr<DataBlockIter>(iter);
+  };
+
+  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
+
+  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
+    if (iter == nullptr) {
+      iter = get_iterator_for_block(i);
+      assert(iter != nullptr);
+    };
+
+    if (i + 1 < r->data_block_buffers.size()) {
+      next_block_iter = get_iterator_for_block(i + 1);
+    }
+
+    auto& data_block = r->data_block_buffers[i];
 
     if (r->IsParallelCompressionEnabled()) {
       Slice first_key_in_next_block;
       const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
-      if (i + 1 < r->data_block_and_keys_buffers.size()) {
-        first_key_in_next_block =
-            r->data_block_and_keys_buffers[i + 1].second.front();
+      if (i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        first_key_in_next_block = next_block_iter->key();
       } else {
         first_key_in_next_block_ptr = r->first_key_in_next_block;
       }
 
+      std::vector<std::string> keys;
+      for (; iter->Valid(); iter->Next()) {
+        keys.emplace_back(iter->key().ToString());
+      }
+
       ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
           r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+
+      assert(block_rep != nullptr);
       r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                                r->get_offset());
       r->pc_rep->EmitBlock(block_rep);
     } else {
-      for (const auto& key : keys) {
+      for (; iter->Valid(); iter->Next()) {
+        Slice key = iter->key();
         if (r->filter_builder != nullptr) {
           size_t ts_sz =
               r->internal_comparator.user_comparator()->timestamp_size();
@@ -1787,16 +1803,22 @@
       }
       WriteBlock(Slice(data_block), &r->pending_handle,
                  true /* is_data_block */);
-      if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
-        Slice first_key_in_next_block =
-            r->data_block_and_keys_buffers[i + 1].second.front();
+      if (ok() && i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        Slice first_key_in_next_block = next_block_iter->key();
+
         Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
-        r->index_builder->AddIndexEntry(
-            &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
+
+        iter->SeekToLast();
+        std::string last_key = iter->key().ToString();
+        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
+                                        r->pending_handle);
      }
     }
+
+    std::swap(iter, next_block_iter);
   }
-  r->data_block_and_keys_buffers.clear();
+  r->data_block_buffers.clear();
 }
 
 Status BlockBasedTableBuilder::Finish() {
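
The sketch referenced in the Summary above: a minimal, hypothetical illustration of how keys can be replayed from one buffered uncompressed data block, mirroring the get_iterator_for_block lambda the patch introduces. The include paths and the ReplayKeysFromBlock wrapper are assumptions for a standalone example compiled inside the RocksDB source tree; the Block, BlockContents, and NewDataIterator calls are the same ones the patch itself uses.

// Sketch only: replay the keys of a buffered uncompressed data block instead
// of keeping a separate key buffer. ReplayKeysFromBlock and the include paths
// are assumptions; the iteration mirrors EnterUnbuffered() after this patch.
#include <memory>
#include <string>
#include <vector>

#include "rocksdb/comparator.h"       // Comparator (assumed path)
#include "table/block_based/block.h"  // Block, DataBlockIter (assumed path)
#include "table/format.h"             // BlockContents, kDisableGlobalSequenceNumber (assumed path)

namespace ROCKSDB_NAMESPACE {

// Re-reads every internal key from one serialized, uncompressed data block.
// This is why the old per-block key buffers were redundant.
std::vector<std::string> ReplayKeysFromBlock(const std::string& data_block,
                                             const Comparator* user_comparator) {
  // Wrap the raw block bytes in a Block and iterate it, as the new
  // get_iterator_for_block lambda does when replaying buffered blocks.
  Block reader{BlockContents{data_block}};
  std::unique_ptr<DataBlockIter> iter{reader.NewDataIterator(
      user_comparator, kDisableGlobalSequenceNumber)};

  std::vector<std::string> keys;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    keys.emplace_back(iter->key().ToString());
  }
  return keys;
}

}  // namespace ROCKSDB_NAMESPACE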