diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index f793447ec..8ee047347 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -254,9 +254,6 @@ struct BlockBasedTableBuilder::Rep {
   const InternalKeyComparator& internal_comparator;
   WritableFileWriter* file;
   std::atomic<uint64_t> offset;
-  // Synchronize status & io_status accesses across threads from main thread,
-  // compression thread and write thread in parallel compression.
-  std::mutex status_mutex;
   size_t alignment;
   BlockBuilder data_block;
   // Buffers uncompressed data blocks and keys to replay later. Needed when
@@ -340,58 +337,63 @@ struct BlockBasedTableBuilder::Rep {
   uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
   void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }

-  const IOStatus& GetIOStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      return io_status;
+  bool IsParallelCompressionEnabled() const {
+    return compression_opts.parallel_threads > 1;
+  }
+
+  Status GetStatus() {
+    // We need to make modifications of status visible when status_ok is set
+    // to false, and this is ensured by status_mutex, so no special memory
+    // order for status_ok is required.
+    if (status_ok.load(std::memory_order_relaxed)) {
+      return Status::OK();
     } else {
-      return io_status;
+      return CopyStatus();
     }
   }

-  const Status& GetStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      return status;
+  Status CopyStatus() {
+    std::lock_guard<std::mutex> lock(status_mutex);
+    return status;
+  }
+
+  IOStatus GetIOStatus() {
+    // We need to make modifications of io_status visible when io_status_ok is
+    // set to false, and this is ensured by io_status_mutex, so no special
+    // memory order for io_status_ok is required.
+    if (io_status_ok.load(std::memory_order_relaxed)) {
+      return IOStatus::OK();
     } else {
-      return status;
+      return CopyIOStatus();
     }
   }

-  void SyncStatusFromIOStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      if (status.ok()) {
-        status = io_status;
-      }
-    } else if (status.ok()) {
-      status = io_status;
-    }
+  IOStatus CopyIOStatus() {
+    std::lock_guard<std::mutex> lock(io_status_mutex);
+    return io_status;
   }

   // Never erase an existing status that is not OK.
   void SetStatus(Status s) {
-    if (!s.ok()) {
+    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
       // Locking is an overkill for non compression_opts.parallel_threads
       // case but since it's unlikely that s is not OK, we take this cost
       // to be simplicity.
       std::lock_guard<std::mutex> lock(status_mutex);
-      if (status.ok()) {
-        status = s;
-      }
+      status = s;
+      status_ok.store(false, std::memory_order_relaxed);
     }
   }

   // Never erase an existing I/O status that is not OK.
   void SetIOStatus(IOStatus ios) {
-    if (!ios.ok()) {
+    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
       // Locking is an overkill for non compression_opts.parallel_threads
       // case but since it's unlikely that s is not OK, we take this cost
       // to be simplicity.
-      std::lock_guard<std::mutex> lock(status_mutex);
-      if (io_status.ok()) {
-        io_status = ios;
-      }
+      std::lock_guard<std::mutex> lock(io_status_mutex);
+      io_status = ios;
+      io_status_ok.store(false, std::memory_order_relaxed);
     }
   }

@@ -451,7 +453,9 @@ struct BlockBasedTableBuilder::Rep {
         file_creation_time(_file_creation_time),
         db_id(_db_id),
         db_session_id(_db_session_id),
-        db_host_id(ioptions.db_host_id) {
+        db_host_id(ioptions.db_host_id),
+        status_ok(true),
+        io_status_ok(true) {
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
       compression_ctxs[i].reset(new CompressionContext(compression_type));
     }
@@ -503,7 +507,13 @@ struct BlockBasedTableBuilder::Rep {
   Rep& operator=(const Rep&) = delete;

  private:
+  // Synchronize status & io_status accesses across threads from main thread,
+  // compression thread and write thread in parallel compression.
+  std::mutex status_mutex;
+  std::atomic<bool> status_ok;
   Status status;
+  std::mutex io_status_mutex;
+  std::atomic<bool> io_status_ok;
   IOStatus io_status;
 };
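Side note (illustrative sketch, not part of the patch): the reworked Rep status plumbing is a lock-avoidance pattern — callers poll an atomic "no error yet" flag with a relaxed load and only fall back to the mutex-guarded copy once an error has been recorded; an existing error is never overwritten. A minimal stand-alone version of the same idea, with a hypothetical ErrorLatch class and std::string standing in for rocksdb::Status:

    #include <atomic>
    #include <mutex>
    #include <string>

    // Stand-alone sketch of Rep's status handling: the common "no error yet"
    // case is a single relaxed atomic load; the mutex is only taken once an
    // error exists.
    class ErrorLatch {
     public:
      // Fast path, like Rep::GetStatus(): no lock while everything is OK.
      std::string Get() {
        if (ok_.load(std::memory_order_relaxed)) {
          return "OK";
        }
        return Copy();
      }

      // Like Rep::SetStatus(): never overwrite an existing error, and flip
      // the flag only after the error has been stored under the mutex.
      void Set(const std::string& err) {
        if (!err.empty() && ok_.load(std::memory_order_relaxed)) {
          std::lock_guard<std::mutex> lock(mu_);
          error_ = err;
          ok_.store(false, std::memory_order_relaxed);
        }
      }

     private:
      // Like Rep::CopyStatus(): the mutex publishes the stored error to any
      // reader that saw ok_ == false.
      std::string Copy() {
        std::lock_guard<std::mutex> lock(mu_);
        return error_;
      }

      std::mutex mu_;
      std::atomic<bool> ok_{true};
      std::string error_;
    };

The relaxed ordering mirrors the comments above: a reader that observes the flag as false takes the mutex before touching the stored value, and that mutex is what makes the error visible.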
@@ -599,41 +609,123 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
   WriteQueue write_queue;
   std::unique_ptr<port::Thread> write_thread;

-  // Raw bytes compressed so far.
-  uint64_t raw_bytes_compressed;
-  // Size of current block being appended.
-  uint64_t raw_bytes_curr_block;
-  // Raw bytes under compression and not appended yet.
-  std::atomic<uint64_t> raw_bytes_inflight;
-  // Number of blocks under compression and not appended yet.
-  std::atomic<uint64_t> blocks_inflight;
-  // Current compression ratio, maintained by BGWorkWriteRawBlock.
-  std::atomic<double> curr_compression_ratio;
-  // Estimated SST file size.
-  std::atomic<uint64_t> estimated_file_size;
-
-  // Wait for the completion of first block compression to get a
-  // non-zero compression ratio.
-  bool first_block;
+  // Estimate output file size when parallel compression is enabled. This is
+  // necessary because compression & flush are no longer synchronized,
+  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
+  // memory_order_relaxed suffices because accurate statistics is not required.
+  class FileSizeEstimator {
+   public:
+    explicit FileSizeEstimator()
+        : raw_bytes_compressed(0),
+          raw_bytes_curr_block(0),
+          raw_bytes_curr_block_set(false),
+          raw_bytes_inflight(0),
+          blocks_inflight(0),
+          curr_compression_ratio(0),
+          estimated_file_size(0) {}
+
+    // Estimate file size when a block is about to be emitted to
+    // compression thread
+    void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_add(raw_block_size,
+                                       std::memory_order_relaxed) +
+          raw_block_size;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+    }
+
+    // Estimate file size when a block is already reaped from
+    // compression thread
+    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
+      assert(raw_bytes_curr_block_set);
+
+      uint64_t new_raw_bytes_compressed =
+          raw_bytes_compressed + raw_bytes_curr_block;
+      assert(new_raw_bytes_compressed > 0);
+
+      curr_compression_ratio.store(
+          (curr_compression_ratio.load(std::memory_order_relaxed) *
+               raw_bytes_compressed +
+           compressed_block_size) /
+              static_cast<double>(new_raw_bytes_compressed),
+          std::memory_order_relaxed);
+      raw_bytes_compressed = new_raw_bytes_compressed;
+
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
+                                       std::memory_order_relaxed) -
+          raw_bytes_curr_block;
+
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+
+      raw_bytes_curr_block_set = false;
+    }
+
+    void SetEstimatedFileSize(uint64_t size) {
+      estimated_file_size.store(size, std::memory_order_relaxed);
+    }
+
+    uint64_t GetEstimatedFileSize() {
+      return estimated_file_size.load(std::memory_order_relaxed);
+    }
+
+    void SetCurrBlockRawSize(uint64_t size) {
+      raw_bytes_curr_block = size;
+      raw_bytes_curr_block_set = true;
+    }
+
+   private:
+    // Raw bytes compressed so far.
+    uint64_t raw_bytes_compressed;
+    // Size of current block being appended.
+    uint64_t raw_bytes_curr_block;
+    // Whether raw_bytes_curr_block has been set for next
+    // ReapBlock call.
+    bool raw_bytes_curr_block_set;
+    // Raw bytes under compression and not appended yet.
+    std::atomic<uint64_t> raw_bytes_inflight;
+    // Number of blocks under compression and not appended yet.
+    std::atomic<uint64_t> blocks_inflight;
+    // Current compression ratio, maintained by BGWorkWriteRawBlock.
+    std::atomic<double> curr_compression_ratio;
+    // Estimated SST file size.
+    std::atomic<uint64_t> estimated_file_size;
+  };
+  FileSizeEstimator file_size_estimator;
+
+  // Facilities used for waiting for first block completion. Need to wait for
+  // the completion of first block compression and flush to get a non-zero
+  // compression ratio.
+  std::atomic<bool> first_block_processed;
   std::condition_variable first_block_cond;
   std::mutex first_block_mutex;

-  bool finished;
-
-  ParallelCompressionRep(uint32_t parallel_threads)
+  explicit ParallelCompressionRep(uint32_t parallel_threads)
       : curr_block_keys(new Keys()),
         block_rep_buf(parallel_threads),
         block_rep_pool(parallel_threads),
         compress_queue(parallel_threads),
         write_queue(parallel_threads),
-        raw_bytes_compressed(0),
-        raw_bytes_curr_block(0),
-        raw_bytes_inflight(0),
-        blocks_inflight(0),
-        curr_compression_ratio(0),
-        estimated_file_size(0),
-        first_block(true),
-        finished(false) {
+        first_block_processed(false) {
     for (uint32_t i = 0; i < parallel_threads; i++) {
       block_rep_buf[i].contents = Slice();
       block_rep_buf[i].compressed_contents = Slice();
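Side note (illustrative sketch, not part of the patch): FileSizeEstimator projects the bytes still being compressed through the compression ratio observed so far, i.e. estimate = current_file_size + inflight_raw_bytes * ratio + inflight_blocks * kBlockTrailerSize (the 5-byte per-block trailer is the 1-byte type plus 4-byte checksum), and ReapBlock() refreshes the ratio as a raw-byte-weighted average. A toy walk-through of that arithmetic with made-up numbers:

    #include <cstdint>
    #include <iostream>

    // Made-up numbers, only to walk through EmitBlock()/ReapBlock() math.
    int main() {
      const uint64_t kBlockTrailerSize = 5;
      double ratio = 0.5;                       // compression ratio so far
      uint64_t raw_bytes_compressed = 1 << 20;  // raw bytes already written

      // EmitBlock: a 64 KiB raw block enters the pipeline while the file is
      // 512 KiB long and two blocks (128 KiB raw) are now in flight.
      uint64_t curr_file_size = 512 * 1024;
      uint64_t raw_bytes_inflight = 128 * 1024;
      uint64_t blocks_inflight = 2;
      uint64_t estimate =
          curr_file_size +
          static_cast<uint64_t>(static_cast<double>(raw_bytes_inflight) *
                                ratio) +
          blocks_inflight * kBlockTrailerSize;
      std::cout << "estimated file size: " << estimate << "\n";  // 589834

      // ReapBlock: the 64 KiB raw block came back as 30 KiB of compressed
      // output, so the ratio is refreshed as a weighted average over raw bytes.
      uint64_t raw_bytes_curr_block = 64 * 1024;
      uint64_t compressed_block_size = 30 * 1024;
      ratio = (ratio * raw_bytes_compressed + compressed_block_size) /
              static_cast<double>(raw_bytes_compressed + raw_bytes_curr_block);
      std::cout << "updated ratio: " << ratio << "\n";  // ~0.498
      return 0;
    }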
@@ -649,6 +741,88 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
   }

   ~ParallelCompressionRep() { block_rep_pool.finish(); }
+
+  // Make a block prepared to be emitted to compression thread
+  // Used in non-buffered mode
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         BlockBuilder* data_block) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    data_block->SwapAndReset(*(block_rep->data));
+    block_rep->contents = *(block_rep->data);
+    std::swap(block_rep->keys, curr_block_keys);
+    curr_block_keys->Clear();
+    return block_rep;
+  }
+
+  // Used in EnterUnbuffered
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         std::string* data_block,
+                         std::vector<std::string>* keys) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    std::swap(*(block_rep->data), *data_block);
+    block_rep->contents = *(block_rep->data);
+    block_rep->keys->SwapAssign(*keys);
+    return block_rep;
+  }
+
+  // Emit a block to compression thread
+  void EmitBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    assert(block_rep->status.ok());
+    if (!write_queue.push(block_rep->slot.get())) {
+      return;
+    }
+    if (!compress_queue.push(block_rep)) {
+      return;
+    }
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::unique_lock<std::mutex> lock(first_block_mutex);
+      first_block_cond.wait(lock, [this] {
+        return first_block_processed.load(std::memory_order_relaxed);
+      });
+    }
+  }
+
+  // Reap a block from compression thread
+  void ReapBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    block_rep->compressed_data->clear();
+    block_rep_pool.push(block_rep);
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::lock_guard<std::mutex> lock(first_block_mutex);
+      first_block_processed.store(true, std::memory_order_relaxed);
+      first_block_cond.notify_one();
+    }
+  }
+
+ private:
+  BlockRep* PrepareBlockInternal(CompressionType compression_type,
+                                 const Slice* first_key_in_next_block) {
+    BlockRep* block_rep = nullptr;
+    block_rep_pool.pop(block_rep);
+    assert(block_rep != nullptr);
+
+    assert(block_rep->data);
+
+    block_rep->compression_type = compression_type;
+
+    if (first_key_in_next_block == nullptr) {
+      block_rep->first_key_in_next_block.reset(nullptr);
+    } else {
+      block_rep->first_key_in_next_block->assign(
+          first_key_in_next_block->data(), first_key_in_next_block->size());
+    }
+
+    return block_rep;
+  }
 };

 BlockBasedTableBuilder::BlockBasedTableBuilder(
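Side note (illustrative sketch, not part of the patch): EmitBlock() pushes the block's slot into write_queue before the block itself enters compress_queue, so the write thread drains results strictly in emit order even though the compression workers may finish out of order. The slot internals are not shown in this diff; the sketch below imitates the effect with std::promise/std::future (all names hypothetical):

    #include <deque>
    #include <future>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <vector>

    // Toy version of the write_queue/compress_queue split: results are
    // consumed in submission order (the "slot" is a future enqueued up
    // front), while compression may finish in any order on worker threads.
    int main() {
      std::deque<std::future<std::string>> write_queue;  // ordered "slots"
      std::vector<std::thread> workers;

      for (int i = 0; i < 4; i++) {
        std::promise<std::string> slot;
        write_queue.push_back(slot.get_future());  // reserve output position
        workers.emplace_back(
            [i](std::promise<std::string> p) {
              // "compress" block i; completion order is arbitrary
              p.set_value("compressed block " + std::to_string(i));
            },
            std::move(slot));
      }

      // Writer: blocks on each slot in turn, so output order == emit order.
      while (!write_queue.empty()) {
        std::cout << write_queue.front().get() << "\n";
        write_queue.pop_front();
      }
      for (auto& w : workers) {
        w.join();
      }
      return 0;
    }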
@@ -694,19 +868,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
                                 &rep_->compressed_cache_key_prefix_size);
   }

-  if (rep_->compression_opts.parallel_threads > 1) {
-    rep_->pc_rep.reset(
-        new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
-    rep_->pc_rep->compress_thread_pool.reserve(
-        rep_->compression_opts.parallel_threads);
-    for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
-      rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
-        BGWorkCompression(*(rep_->compression_ctxs[i]),
-                          rep_->verify_ctxs[i].get());
-      });
-    }
-    rep_->pc_rep->write_thread.reset(
-        new port::Thread([this] { BGWorkWriteRawBlock(); }));
+  if (rep_->IsParallelCompressionEnabled()) {
+    StartParallelCompression();
   }
 }

@@ -748,7 +911,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       // entries in the first block and < all entries in subsequent
       // blocks.
       if (ok() && r->state == Rep::State::kUnbuffered) {
-        if (r->compression_opts.parallel_threads > 1) {
+        if (r->IsParallelCompressionEnabled()) {
           r->pc_rep->curr_block_keys->Clear();
         } else {
           r->index_builder->AddIndexEntry(&r->last_key, &key,
@@ -760,7 +923,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     // Note: PartitionedFilterBlockBuilder requires key being added to filter
     // builder after being added to index builder.
     if (r->state == Rep::State::kUnbuffered) {
-      if (r->compression_opts.parallel_threads > 1) {
+      if (r->IsParallelCompressionEnabled()) {
         r->pc_rep->curr_block_keys->PushBack(key);
       } else {
         if (r->filter_builder != nullptr) {
@@ -781,7 +944,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       }
       r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
     } else {
-      if (r->compression_opts.parallel_threads == 1) {
+      if (!r->IsParallelCompressionEnabled()) {
         r->index_builder->OnKeyAdded(key);
       }
     }
@@ -818,61 +981,15 @@ void BlockBasedTableBuilder::Flush() {
   assert(rep_->state != Rep::State::kClosed);
   if (!ok()) return;
   if (r->data_block.empty()) return;
-  if (r->compression_opts.parallel_threads > 1 &&
+  if (r->IsParallelCompressionEnabled() &&
       r->state == Rep::State::kUnbuffered) {
-    ParallelCompressionRep::BlockRep* block_rep = nullptr;
-    r->pc_rep->block_rep_pool.pop(block_rep);
-    assert(block_rep != nullptr);
-
     r->data_block.Finish();
-    assert(block_rep->data);
-    r->data_block.SwapAndReset(*(block_rep->data));
-
-    block_rep->contents = *(block_rep->data);
-
-    block_rep->compression_type = r->compression_type;
-
-    std::swap(block_rep->keys, r->pc_rep->curr_block_keys);
-    r->pc_rep->curr_block_keys->Clear();
-
-    if (r->first_key_in_next_block == nullptr) {
-      block_rep->first_key_in_next_block.reset(nullptr);
-    } else {
-      block_rep->first_key_in_next_block->assign(
-          r->first_key_in_next_block->data(),
-          r->first_key_in_next_block->size());
-    }
-
-    uint64_t new_raw_bytes_inflight =
-        r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
-                                                std::memory_order_relaxed) +
-        block_rep->data->size();
-    uint64_t new_blocks_inflight =
-        r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
-    r->pc_rep->estimated_file_size.store(
-        r->get_offset() +
-            static_cast<uint64_t>(static_cast<double>(new_raw_bytes_inflight) *
-                                  r->pc_rep->curr_compression_ratio.load(
-                                      std::memory_order_relaxed)) +
-            new_blocks_inflight * kBlockTrailerSize,
-        std::memory_order_relaxed);
-
-    // Read out first_block here to avoid data race with BGWorkWriteRawBlock
-    bool first_block = r->pc_rep->first_block;
-
-    assert(block_rep->status.ok());
-    if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
-      return;
-    }
-    if (!r->pc_rep->compress_queue.push(block_rep)) {
-      return;
-    }
-
-    if (first_block) {
-      std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
-      r->pc_rep->first_block_cond.wait(lock,
-                                       [r] { return !r->pc_rep->first_block; });
-    }
+    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+        r->compression_type, r->first_key_in_next_block, &(r->data_block));
+    assert(block_rep != nullptr);
+    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                             r->get_offset());
+    r->pc_rep->EmitBlock(block_rep);
   } else {
     WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
   }
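Side note (illustrative sketch, not part of the patch): Flush() now only prepares and emits the block; the wait that used to be inlined here lives in ParallelCompressionRep::EmitBlock()/ReapBlock() above. That handshake is a one-shot latch — the emitting thread blocks until the write thread has flushed block #1, so a real compression ratio exists before further size estimates. A reduced stand-alone version, with a hypothetical FirstBlockLatch class:

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    // One-shot "first block is done" handshake, mirroring
    // first_block_processed / first_block_mutex / first_block_cond.
    class FirstBlockLatch {
     public:
      // Producer side (EmitBlock): block until the writer handled block #1.
      void WaitIfFirst() {
        if (!processed_.load(std::memory_order_relaxed)) {
          std::unique_lock<std::mutex> lock(mu_);
          cv_.wait(lock, [this] {
            return processed_.load(std::memory_order_relaxed);
          });
        }
      }

      // Writer side (ReapBlock): mark block #1 done and wake the producer.
      void MarkProcessed() {
        if (!processed_.load(std::memory_order_relaxed)) {
          std::lock_guard<std::mutex> lock(mu_);
          processed_.store(true, std::memory_order_relaxed);
          cv_.notify_one();
        }
      }

     private:
      std::atomic<bool> processed_{false};
      std::mutex mu_;
      std::condition_variable cv_;
    };

    int main() {
      FirstBlockLatch latch;
      std::thread writer([&latch] { latch.MarkProcessed(); });
      latch.WaitIfFirst();  // returns once the writer thread has run
      writer.join();
      return 0;
    }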
@@ -919,9 +1036,11 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
 }

 void BlockBasedTableBuilder::BGWorkCompression(
-    CompressionContext& compression_ctx, UncompressionContext* verify_ctx) {
-  ParallelCompressionRep::BlockRep* block_rep;
+    const CompressionContext& compression_ctx,
+    UncompressionContext* verify_ctx) {
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
   while (rep_->pc_rep->compress_queue.pop(block_rep)) {
+    assert(block_rep != nullptr);
     CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
                            compression_ctx, verify_ctx,
                            block_rep->compressed_data.get(),
@@ -933,15 +1052,18 @@ void BlockBasedTableBuilder::BGWorkCompression(

 void BlockBasedTableBuilder::CompressAndVerifyBlock(
     const Slice& raw_block_contents, bool is_data_block,
-    CompressionContext& compression_ctx, UncompressionContext* verify_ctx_ptr,
+    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
     std::string* compressed_output, Slice* block_contents,
     CompressionType* type, Status* out_status) {
   // File format contains a sequence of blocks where each block has:
   //    block_data: uint8[n]
   //    type: uint8
   //    crc: uint32
-  assert(ok());
   Rep* r = rep_;
+  bool is_status_ok = ok();
+  if (!r->IsParallelCompressionEnabled()) {
+    assert(is_status_ok);
+  }

   *type = r->compression_type;
   uint64_t sample_for_compression = r->sample_for_compression;
@@ -951,7 +1073,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
       r->ioptions.env,
       ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

-  if (raw_block_contents.size() < kCompressionSizeLimit) {
+  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
     const CompressionDict* compression_dict;
     if (!is_data_block || r->compression_dict == nullptr) {
       compression_dict = &CompressionDict::GetEmptyDict();
@@ -988,7 +1110,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
       }
       assert(verify_dict != nullptr);
       BlockContents contents;
-      UncompressionInfo uncompression_info(*verify_ctx_ptr, *verify_dict,
+      UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                            r->compression_type);
       Status stat = UncompressBlockContentsForCompressionType(
           uncompression_info, block_contents->data(), block_contents->size(),
@@ -1117,39 +1239,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
        r->SetIOStatus(io_s);
      }
    }
-    if (r->compression_opts.parallel_threads > 1) {
-      if (!r->pc_rep->finished) {
-        assert(r->pc_rep->raw_bytes_compressed +
-                   r->pc_rep->raw_bytes_curr_block >
-               0);
-        r->pc_rep->curr_compression_ratio.store(
-            (r->pc_rep->curr_compression_ratio.load(
-                 std::memory_order_relaxed) *
-                 r->pc_rep->raw_bytes_compressed +
-             block_contents.size()) /
-                static_cast<double>(r->pc_rep->raw_bytes_compressed +
-                                    r->pc_rep->raw_bytes_curr_block),
-            std::memory_order_relaxed);
-        r->pc_rep->raw_bytes_compressed += r->pc_rep->raw_bytes_curr_block;
-        uint64_t new_raw_bytes_inflight =
-            r->pc_rep->raw_bytes_inflight.fetch_sub(
-                r->pc_rep->raw_bytes_curr_block, std::memory_order_relaxed) -
-            r->pc_rep->raw_bytes_curr_block;
-        uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub(
-                                           1, std::memory_order_relaxed) -
-                                       1;
-        assert(new_blocks_inflight < r->compression_opts.parallel_threads);
-        r->pc_rep->estimated_file_size.store(
-            r->get_offset() +
-                static_cast<uint64_t>(
-                    static_cast<double>(new_raw_bytes_inflight) *
-                    r->pc_rep->curr_compression_ratio.load(
-                        std::memory_order_relaxed)) +
-                new_blocks_inflight * kBlockTrailerSize,
-            std::memory_order_relaxed);
+    if (r->IsParallelCompressionEnabled()) {
+      if (is_data_block) {
+        r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
+                                                 r->get_offset());
       } else {
-        r->pc_rep->estimated_file_size.store(r->get_offset(),
-                                             std::memory_order_relaxed);
+        r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
       }
     }
   }
@@ -1163,24 +1258,19 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,

 void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
   Rep* r = rep_;
-  ParallelCompressionRep::BlockRepSlot* slot;
-  ParallelCompressionRep::BlockRep* block_rep;
+  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
   while (r->pc_rep->write_queue.pop(slot)) {
+    assert(slot != nullptr);
     slot->Take(block_rep);
+    assert(block_rep != nullptr);
     if (!block_rep->status.ok()) {
       r->SetStatus(block_rep->status);
-      // Return block_rep to the pool so that blocked Flush() can finish
+      // Reap block so that blocked Flush() can finish
       // if there is one, and Flush() will notice !ok() next time.
       block_rep->status = Status::OK();
-      block_rep->compressed_data->clear();
-      r->pc_rep->block_rep_pool.push(block_rep);
-      // Unlock first block if necessary.
-      if (r->pc_rep->first_block) {
-        std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
-        r->pc_rep->first_block = false;
-        r->pc_rep->first_block_cond.notify_one();
-      }
-      break;
+      r->pc_rep->ReapBlock(block_rep);
+      continue;
     }

     for (size_t i = 0; i < block_rep->keys->Size(); i++) {
@@ -1193,19 +1283,13 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
       r->index_builder->OnKeyAdded(key);
     }

-    r->pc_rep->raw_bytes_curr_block = block_rep->data->size();
+    r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
     WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
                   &r->pending_handle, true /* is_data_block*/);
     if (!ok()) {
       break;
     }

-    if (r->pc_rep->first_block) {
-      std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
-      r->pc_rep->first_block = false;
-      r->pc_rep->first_block_cond.notify_one();
-    }
-
     if (r->filter_builder != nullptr) {
       r->filter_builder->StartBlock(r->get_offset());
     }
@@ -1222,11 +1306,35 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
                                       &first_key_in_next_block,
                                       r->pending_handle);
     }
-    block_rep->compressed_data->clear();
-    r->pc_rep->block_rep_pool.push(block_rep);
+
+    r->pc_rep->ReapBlock(block_rep);
   }
 }

+void BlockBasedTableBuilder::StartParallelCompression() {
+  rep_->pc_rep.reset(
+      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
+  rep_->pc_rep->compress_thread_pool.reserve(
+      rep_->compression_opts.parallel_threads);
+  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
+    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
+      BGWorkCompression(*(rep_->compression_ctxs[i]),
+                        rep_->verify_ctxs[i].get());
+    });
+  }
+  rep_->pc_rep->write_thread.reset(
+      new port::Thread([this] { BGWorkWriteRawBlock(); }));
+}
+
+void BlockBasedTableBuilder::StopParallelCompression() {
+  rep_->pc_rep->compress_queue.finish();
+  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
+    thread.join();
+  }
+  rep_->pc_rep->write_queue.finish();
+  rep_->pc_rep->write_thread->join();
+}
+
 Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }

 IOStatus BlockBasedTableBuilder::io_status() const {
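Side note (illustrative sketch, not part of the patch): StartParallelCompression()/StopParallelCompression() pull the thread-pool lifecycle out of the constructor, Finish() and Abandon(). The shutdown order matters: "finishing" a queue is what makes the blocked pop() calls return false, so the worker loops exit and join() can complete. A reduced sketch with a hypothetical ClosableQueue standing in for the real work/write queues:

    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <vector>

    // Minimal closable queue: pop() returns false once the queue has been
    // finished and drained, which is what lets the worker loops exit.
    template <typename T>
    class ClosableQueue {
     public:
      void push(T v) {
        {
          std::lock_guard<std::mutex> lock(mu_);
          q_.push(std::move(v));
        }
        cv_.notify_one();
      }
      bool pop(T& out) {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return finished_ || !q_.empty(); });
        if (q_.empty()) return false;
        out = std::move(q_.front());
        q_.pop();
        return true;
      }
      void finish() {
        {
          std::lock_guard<std::mutex> lock(mu_);
          finished_ = true;
        }
        cv_.notify_all();
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<T> q_;
      bool finished_ = false;
    };

    // Start/stop pattern mirroring StartParallelCompression()/Stop...():
    // spawn N workers draining a queue, then close the queue and join.
    int main() {
      ClosableQueue<int> work;
      std::vector<std::thread> pool;
      for (int i = 0; i < 3; i++) {
        pool.emplace_back([&work] {
          int item;
          while (work.pop(item)) { /* "compress" item */ }
        });
      }
      for (int i = 0; i < 10; i++) work.push(i);
      work.finish();  // analogous to compress_queue.finish()
      for (auto& t : pool) t.join();
      return 0;
    }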
@@ -1504,11 +1612,12 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
   footer.EncodeTo(&footer_encoding);
   assert(ok());
   IOStatus ios = r->file->Append(footer_encoding);
-  r->SetIOStatus(ios);
   if (ios.ok()) {
     r->set_offset(r->get_offset() + footer_encoding.size());
+  } else {
+    r->SetIOStatus(ios);
+    r->SetStatus(ios);
   }
-  r->SyncStatusFromIOStatus();
 }

 void BlockBasedTableBuilder::EnterUnbuffered() {
@@ -1557,62 +1666,22 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
     assert(!data_block.empty());
     assert(!keys.empty());

-    if (r->compression_opts.parallel_threads > 1) {
-      ParallelCompressionRep::BlockRep* block_rep;
-      r->pc_rep->block_rep_pool.pop(block_rep);
-
-      std::swap(*(block_rep->data), data_block);
-      block_rep->contents = *(block_rep->data);
-
-      block_rep->compression_type = r->compression_type;
-
-      block_rep->keys->SwapAssign(keys);
-
+    if (r->IsParallelCompressionEnabled()) {
+      Slice first_key_in_next_block;
+      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
       if (i + 1 < r->data_block_and_keys_buffers.size()) {
-        block_rep->first_key_in_next_block->assign(
-            r->data_block_and_keys_buffers[i + 1].second.front());
+        first_key_in_next_block =
+            r->data_block_and_keys_buffers[i + 1].second.front();
       } else {
-        if (r->first_key_in_next_block == nullptr) {
-          block_rep->first_key_in_next_block.reset(nullptr);
-        } else {
-          block_rep->first_key_in_next_block->assign(
-              r->first_key_in_next_block->data(),
-              r->first_key_in_next_block->size());
-        }
+        first_key_in_next_block_ptr = r->first_key_in_next_block;
       }

-      uint64_t new_raw_bytes_inflight =
-          r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
-                                                  std::memory_order_relaxed) +
-          block_rep->data->size();
-      uint64_t new_blocks_inflight =
-          r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) +
-          1;
-      r->pc_rep->estimated_file_size.store(
-          r->get_offset() +
-              static_cast<uint64_t>(
-                  static_cast<double>(new_raw_bytes_inflight) *
-                  r->pc_rep->curr_compression_ratio.load(
-                      std::memory_order_relaxed)) +
-              new_blocks_inflight * kBlockTrailerSize,
-          std::memory_order_relaxed);
-
-      // Read out first_block here to avoid data race with BGWorkWriteRawBlock
-      bool first_block = r->pc_rep->first_block;
-
-      assert(block_rep->status.ok());
-      if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
-        return;
-      }
-      if (!r->pc_rep->compress_queue.push(block_rep)) {
-        return;
-      }
-
-      if (first_block) {
-        std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
-        r->pc_rep->first_block_cond.wait(
-            lock, [r] { return !r->pc_rep->first_block; });
-      }
+      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+      assert(block_rep != nullptr);
+      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                               r->get_offset());
+      r->pc_rep->EmitBlock(block_rep);
     } else {
       for (const auto& key : keys) {
         if (r->filter_builder != nullptr) {
@@ -1645,14 +1714,8 @@ Status BlockBasedTableBuilder::Finish() {
   if (r->state == Rep::State::kBuffered) {
     EnterUnbuffered();
   }
-  if (r->compression_opts.parallel_threads > 1) {
-    r->pc_rep->compress_queue.finish();
-    for (auto& thread : r->pc_rep->compress_thread_pool) {
-      thread.join();
-    }
-    r->pc_rep->write_queue.finish();
-    r->pc_rep->write_thread->join();
-    r->pc_rep->finished = true;
+  if (r->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
 #ifndef NDEBUG
     for (const auto& br : r->pc_rep->block_rep_buf) {
       assert(br.status.ok());
     }
@@ -1691,25 +1754,20 @@ Status BlockBasedTableBuilder::Finish() {
     WriteFooter(metaindex_block_handle, index_block_handle);
   }
   r->state = Rep::State::kClosed;
-  Status ret_status = r->GetStatus();
+  r->SetStatus(r->CopyIOStatus());
+  Status ret_status = r->CopyStatus();
   assert(!ret_status.ok() || io_status().ok());
   return ret_status;
 }

 void BlockBasedTableBuilder::Abandon() {
   assert(rep_->state != Rep::State::kClosed);
-  if (rep_->compression_opts.parallel_threads > 1) {
-    rep_->pc_rep->compress_queue.finish();
-    for (auto& thread : rep_->pc_rep->compress_thread_pool) {
-      thread.join();
-    }
-    rep_->pc_rep->write_queue.finish();
-    rep_->pc_rep->write_thread->join();
-    rep_->pc_rep->finished = true;
+  if (rep_->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
   }
   rep_->state = Rep::State::kClosed;
-  rep_->GetStatus().PermitUncheckedError();
-  rep_->GetIOStatus().PermitUncheckedError();
+  rep_->CopyStatus().PermitUncheckedError();
+  rep_->CopyIOStatus().PermitUncheckedError();
 }

 uint64_t BlockBasedTableBuilder::NumEntries() const {
@@ -1723,10 +1781,10 @@ bool BlockBasedTableBuilder::IsEmpty() const {
 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

 uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
-  if (rep_->compression_opts.parallel_threads > 1) {
+  if (rep_->IsParallelCompressionEnabled()) {
     // Use compression ratio so far and inflight raw bytes to estimate
     // final SST size.
-    return rep_->pc_rep->estimated_file_size.load(std::memory_order_relaxed);
+    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
   } else {
     return FileSize();
   }
diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h
index 2e3081d26..147fb8e7a 100644
--- a/table/block_based/block_based_table_builder.h
+++ b/table/block_based/block_based_table_builder.h
@@ -159,19 +159,29 @@ class BlockBasedTableBuilder : public TableBuilder {
   // Get blocks from mem-table walking thread, compress them and
   // pass them to the write thread. Used in parallel compression mode only
-  void BGWorkCompression(CompressionContext& compression_ctx,
+  void BGWorkCompression(const CompressionContext& compression_ctx,
                          UncompressionContext* verify_ctx);

   // Given raw block content, try to compress it and return result and
   // compression type
-  void CompressAndVerifyBlock(
-      const Slice& raw_block_contents, bool is_data_block,
-      CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
-      std::string* compressed_output, Slice* result_block_contents,
-      CompressionType* result_compression_type, Status* out_status);
+  void CompressAndVerifyBlock(const Slice& raw_block_contents,
+                              bool is_data_block,
+                              const CompressionContext& compression_ctx,
+                              UncompressionContext* verify_ctx,
+                              std::string* compressed_output,
+                              Slice* result_block_contents,
+                              CompressionType* result_compression_type,
+                              Status* out_status);

   // Get compressed blocks from BGWorkCompression and write them into SST
   void BGWorkWriteRawBlock();
+
+  // Initialize parallel compression context and
+  // start BGWorkCompression and BGWorkWriteRawBlock threads
+  void StartParallelCompression();
+
+  // Stop BGWorkCompression and BGWorkWriteRawBlock threads
+  void StopParallelCompression();
 };

 Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
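Side note (illustrative usage sketch, not part of the patch): the parallel path in this builder is driven entirely by CompressionOptions::parallel_threads; any value greater than 1 makes IsParallelCompressionEnabled() true. Assuming a RocksDB release that already ships this option, exercising the code path from user code looks roughly like this:

    #include <rocksdb/db.h>
    #include <rocksdb/options.h>

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.compression = rocksdb::kZSTD;           // any real compressor
      options.compression_opts.parallel_threads = 4;  // >1 enables parallel path
      rocksdb::DB* db = nullptr;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/parallel_compression_demo", &db);
      if (s.ok()) {
        db->Put(rocksdb::WriteOptions(), "key", "value");
        db->Flush(rocksdb::FlushOptions());  // builds an SST via this builder
        delete db;
      }
      return s.ok() ? 0 : 1;
    }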