Make parallel compression optimization code tidier (#6888)

Summary:
This commit makes the code change from https://github.com/facebook/rocksdb/issues/6262 tidier and easier to understand by:

1. Wrapping parallel compression initialization and termination into
   common methods;
2. Wrapping BlockRep initialization, push/pop into common methods;
3. Wrapping file size estimation into common methods;
4. Fixing function declarations that passed arguments by non-const reference;
5. Fixing some uninitialized variables;
6. Fixing a data race on first_block;
7. Checking BlockRep::status in BlockBasedTableBuilder::Finish only when
   ok();
8. Keeping assert(ok()) in BlockBasedTableBuilder::CompressAndVerifyBlock only
   in non-parallel compression mode; in parallel compression mode, compression
   aborts if the status is not OK;
9. Eliminating a potential data race in BlockBasedTableBuilder::GetStatus()
   and BlockBasedTableBuilder::GetIOStatus() by returning a copy of the status
   instead of an unprotected reference (see the sketch below).
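
For readers skimming the diff, item 9 boils down to the pattern below. This is a minimal standalone sketch, not the RocksDB code: `Status` is reduced to a plain error string, and `StatusHolder` is an illustrative name.

```cpp
#include <atomic>
#include <iostream>
#include <mutex>
#include <string>

// Errors are recorded under a mutex and published through an atomic flag;
// readers take a lock-free fast path while everything is OK, and otherwise
// return a copy made under the mutex rather than a reference into shared
// state that another thread could still be mutating.
class StatusHolder {
 public:
  // Never erase an existing error; if two errors race, one of them wins.
  void SetError(const std::string& msg) {
    if (ok_.load(std::memory_order_relaxed)) {
      std::lock_guard<std::mutex> lock(mutex_);
      message_ = msg;
      ok_.store(false, std::memory_order_relaxed);
    }
  }

  // Returns a value, never an unprotected reference.
  std::string GetStatus() {
    if (ok_.load(std::memory_order_relaxed)) {
      return "OK";
    }
    std::lock_guard<std::mutex> lock(mutex_);
    return message_;
  }

 private:
  std::mutex mutex_;
  std::atomic<bool> ok_{true};
  std::string message_;
};

int main() {
  StatusHolder s;
  s.SetError("Corruption: bad block");
  std::cout << s.GetStatus() << "\n";  // prints the stored error
  return 0;
}
```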

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6888

Reviewed By: ajkr

Differential Revision: D21957110

Pulled By: jay-zhuang

fbshipit-source-id: 3a29892f249209513f030349756cecd7736eae80
Branch: main
Authored by Ziyue Yang 4 years ago; committed by Facebook GitHub Bot
parent eef27d0048
commit 1c78e4b235

Changed files:
  table/block_based/block_based_table_builder.cc  (548 changed lines)
  table/block_based/block_based_table_builder.h   (22 changed lines)

table/block_based/block_based_table_builder.cc

@@ -254,9 +254,6 @@ struct BlockBasedTableBuilder::Rep {
   const InternalKeyComparator& internal_comparator;
   WritableFileWriter* file;
   std::atomic<uint64_t> offset;
-  // Synchronize status & io_status accesses across threads from main thread,
-  // compression thread and write thread in parallel compression.
-  std::mutex status_mutex;
   size_t alignment;
   BlockBuilder data_block;
   // Buffers uncompressed data blocks and keys to replay later. Needed when
@@ -340,58 +337,63 @@ struct BlockBasedTableBuilder::Rep {
   uint64_t get_offset() { return offset.load(std::memory_order_relaxed); }
   void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); }

-  const IOStatus& GetIOStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      return io_status;
-    } else {
-      return io_status;
-    }
-  }
-
-  const Status& GetStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      return status;
-    } else {
-      return status;
-    }
-  }
-
-  void SyncStatusFromIOStatus() {
-    if (compression_opts.parallel_threads > 1) {
-      std::lock_guard<std::mutex> lock(status_mutex);
-      if (status.ok()) {
-        status = io_status;
-      }
-    } else if (status.ok()) {
-      status = io_status;
-    }
-  }
+  bool IsParallelCompressionEnabled() const {
+    return compression_opts.parallel_threads > 1;
+  }
+
+  Status GetStatus() {
+    // We need to make modifications of status visible when status_ok is set
+    // to false, and this is ensured by status_mutex, so no special memory
+    // order for status_ok is required.
+    if (status_ok.load(std::memory_order_relaxed)) {
+      return Status::OK();
+    } else {
+      return CopyStatus();
+    }
+  }
+
+  Status CopyStatus() {
+    std::lock_guard<std::mutex> lock(status_mutex);
+    return status;
+  }
+
+  IOStatus GetIOStatus() {
+    // We need to make modifications of io_status visible when status_ok is set
+    // to false, and this is ensured by io_status_mutex, so no special memory
+    // order for io_status_ok is required.
+    if (io_status_ok.load(std::memory_order_relaxed)) {
+      return IOStatus::OK();
+    } else {
+      return CopyIOStatus();
+    }
+  }
+
+  IOStatus CopyIOStatus() {
+    std::lock_guard<std::mutex> lock(io_status_mutex);
+    return io_status;
+  }

   // Never erase an existing status that is not OK.
   void SetStatus(Status s) {
-    if (!s.ok()) {
+    if (!s.ok() && status_ok.load(std::memory_order_relaxed)) {
       // Locking is an overkill for non compression_opts.parallel_threads
       // case but since it's unlikely that s is not OK, we take this cost
       // to be simplicity.
       std::lock_guard<std::mutex> lock(status_mutex);
-      if (status.ok()) {
-        status = s;
-      }
+      status = s;
+      status_ok.store(false, std::memory_order_relaxed);
     }
   }

   // Never erase an existing I/O status that is not OK.
   void SetIOStatus(IOStatus ios) {
-    if (!ios.ok()) {
+    if (!ios.ok() && io_status_ok.load(std::memory_order_relaxed)) {
       // Locking is an overkill for non compression_opts.parallel_threads
       // case but since it's unlikely that s is not OK, we take this cost
       // to be simplicity.
-      std::lock_guard<std::mutex> lock(status_mutex);
-      if (io_status.ok()) {
-        io_status = ios;
-      }
+      std::lock_guard<std::mutex> lock(io_status_mutex);
+      io_status = ios;
+      io_status_ok.store(false, std::memory_order_relaxed);
     }
   }
@@ -451,7 +453,9 @@ struct BlockBasedTableBuilder::Rep {
         file_creation_time(_file_creation_time),
         db_id(_db_id),
         db_session_id(_db_session_id),
-        db_host_id(ioptions.db_host_id) {
+        db_host_id(ioptions.db_host_id),
+        status_ok(true),
+        io_status_ok(true) {
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
       compression_ctxs[i].reset(new CompressionContext(compression_type));
     }
@@ -503,7 +507,13 @@ struct BlockBasedTableBuilder::Rep {
   Rep& operator=(const Rep&) = delete;

  private:
+  // Synchronize status & io_status accesses across threads from main thread,
+  // compression thread and write thread in parallel compression.
+  std::mutex status_mutex;
+  std::atomic<bool> status_ok;
   Status status;
+  std::mutex io_status_mutex;
+  std::atomic<bool> io_status_ok;
   IOStatus io_status;
 };
@@ -599,10 +609,98 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
   WriteQueue write_queue;
   std::unique_ptr<port::Thread> write_thread;

+  // Estimate output file size when parallel compression is enabled. This is
+  // necessary because compression & flush are no longer synchronized,
+  // and BlockBasedTableBuilder::FileSize() is no longer accurate.
+  // memory_order_relaxed suffices because accurate statistics is not required.
+  class FileSizeEstimator {
+   public:
+    explicit FileSizeEstimator()
+        : raw_bytes_compressed(0),
+          raw_bytes_curr_block(0),
+          raw_bytes_curr_block_set(false),
+          raw_bytes_inflight(0),
+          blocks_inflight(0),
+          curr_compression_ratio(0),
+          estimated_file_size(0) {}
+
+    // Estimate file size when a block is about to be emitted to
+    // compression thread
+    void EmitBlock(uint64_t raw_block_size, uint64_t curr_file_size) {
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_add(raw_block_size,
+                                       std::memory_order_relaxed) +
+          raw_block_size;
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+    }
+
+    // Estimate file size when a block is already reaped from
+    // compression thread
+    void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) {
+      assert(raw_bytes_curr_block_set);
+
+      uint64_t new_raw_bytes_compressed =
+          raw_bytes_compressed + raw_bytes_curr_block;
+      assert(new_raw_bytes_compressed > 0);
+      curr_compression_ratio.store(
+          (curr_compression_ratio.load(std::memory_order_relaxed) *
+               raw_bytes_compressed +
+           compressed_block_size) /
+              static_cast<double>(new_raw_bytes_compressed),
+          std::memory_order_relaxed);
+      raw_bytes_compressed = new_raw_bytes_compressed;
+
+      uint64_t new_raw_bytes_inflight =
+          raw_bytes_inflight.fetch_sub(raw_bytes_curr_block,
+                                       std::memory_order_relaxed) -
+          raw_bytes_curr_block;
+      uint64_t new_blocks_inflight =
+          blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1;
+
+      estimated_file_size.store(
+          curr_file_size +
+              static_cast<uint64_t>(
+                  static_cast<double>(new_raw_bytes_inflight) *
+                  curr_compression_ratio.load(std::memory_order_relaxed)) +
+              new_blocks_inflight * kBlockTrailerSize,
+          std::memory_order_relaxed);
+
+      raw_bytes_curr_block_set = false;
+    }
+
+    void SetEstimatedFileSize(uint64_t size) {
+      estimated_file_size.store(size, std::memory_order_relaxed);
+    }
+
+    uint64_t GetEstimatedFileSize() {
+      return estimated_file_size.load(std::memory_order_relaxed);
+    }
+
+    void SetCurrBlockRawSize(uint64_t size) {
+      raw_bytes_curr_block = size;
+      raw_bytes_curr_block_set = true;
+    }
+
+   private:
     // Raw bytes compressed so far.
     uint64_t raw_bytes_compressed;
     // Size of current block being appended.
     uint64_t raw_bytes_curr_block;
+    // Whether raw_bytes_curr_block has been set for next
+    // ReapBlock call.
+    bool raw_bytes_curr_block_set;
     // Raw bytes under compression and not appended yet.
     std::atomic<uint64_t> raw_bytes_inflight;
     // Number of blocks under compression and not appended yet.
@@ -611,29 +709,23 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
     std::atomic<double> curr_compression_ratio;
     // Estimated SST file size.
     std::atomic<uint64_t> estimated_file_size;
+  };
+  FileSizeEstimator file_size_estimator;

-  // Wait for the completion of first block compression to get a
-  // non-zero compression ratio.
-  bool first_block;
+  // Facilities used for waiting first block completion. Need to Wait for
+  // the completion of first block compression and flush to get a non-zero
+  // compression ratio.
+  std::atomic<bool> first_block_processed;
   std::condition_variable first_block_cond;
   std::mutex first_block_mutex;

-  bool finished;
-
-  ParallelCompressionRep(uint32_t parallel_threads)
+  explicit ParallelCompressionRep(uint32_t parallel_threads)
       : curr_block_keys(new Keys()),
         block_rep_buf(parallel_threads),
         block_rep_pool(parallel_threads),
         compress_queue(parallel_threads),
         write_queue(parallel_threads),
-        raw_bytes_compressed(0),
-        raw_bytes_curr_block(0),
-        raw_bytes_inflight(0),
-        blocks_inflight(0),
-        curr_compression_ratio(0),
-        estimated_file_size(0),
-        first_block(true),
-        finished(false) {
+        first_block_processed(false) {
     for (uint32_t i = 0; i < parallel_threads; i++) {
       block_rep_buf[i].contents = Slice();
       block_rep_buf[i].compressed_contents = Slice();
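
For intuition, the estimate the new FileSizeEstimator maintains is: bytes already written to the file, plus in-flight raw bytes scaled by the compression ratio observed so far, plus one block trailer per in-flight block. A worked example with made-up numbers (kBlockTrailerSize is the 5-byte type + checksum trailer of the block-based table format):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // All numbers below are hypothetical, for illustration only.
  const uint64_t kBlockTrailerSize = 5;         // 1-byte type + 4-byte checksum
  uint64_t curr_file_size = 1000000;            // bytes already written
  uint64_t raw_bytes_inflight = 3 * 64 * 1024;  // three 64 KiB blocks queued
  uint64_t blocks_inflight = 3;
  double curr_compression_ratio = 0.4;          // compressed / raw so far

  // Same expression EmitBlock()/ReapBlock() store into estimated_file_size.
  uint64_t estimated =
      curr_file_size +
      static_cast<uint64_t>(static_cast<double>(raw_bytes_inflight) *
                            curr_compression_ratio) +
      blocks_inflight * kBlockTrailerSize;

  std::cout << "estimated SST size: " << estimated << " bytes\n";  // 1078658
  return 0;
}
```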
@@ -649,6 +741,88 @@ struct BlockBasedTableBuilder::ParallelCompressionRep {
     }
   }

   ~ParallelCompressionRep() { block_rep_pool.finish(); }

+  // Make a block prepared to be emitted to compression thread
+  // Used in non-buffered mode
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         BlockBuilder* data_block) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    data_block->SwapAndReset(*(block_rep->data));
+    block_rep->contents = *(block_rep->data);
+    std::swap(block_rep->keys, curr_block_keys);
+    curr_block_keys->Clear();
+    return block_rep;
+  }
+
+  // Used in EnterUnbuffered
+  BlockRep* PrepareBlock(CompressionType compression_type,
+                         const Slice* first_key_in_next_block,
+                         std::string* data_block,
+                         std::vector<std::string>* keys) {
+    BlockRep* block_rep =
+        PrepareBlockInternal(compression_type, first_key_in_next_block);
+    assert(block_rep != nullptr);
+    std::swap(*(block_rep->data), *data_block);
+    block_rep->contents = *(block_rep->data);
+    block_rep->keys->SwapAssign(*keys);
+    return block_rep;
+  }
+
+  // Emit a block to compression thread
+  void EmitBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    assert(block_rep->status.ok());
+    if (!write_queue.push(block_rep->slot.get())) {
+      return;
+    }
+    if (!compress_queue.push(block_rep)) {
+      return;
+    }
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::unique_lock<std::mutex> lock(first_block_mutex);
+      first_block_cond.wait(lock, [this] {
+        return first_block_processed.load(std::memory_order_relaxed);
+      });
+    }
+  }
+
+  // Reap a block from compression thread
+  void ReapBlock(BlockRep* block_rep) {
+    assert(block_rep != nullptr);
+    block_rep->compressed_data->clear();
+    block_rep_pool.push(block_rep);
+
+    if (!first_block_processed.load(std::memory_order_relaxed)) {
+      std::lock_guard<std::mutex> lock(first_block_mutex);
+      first_block_processed.store(true, std::memory_order_relaxed);
+      first_block_cond.notify_one();
+    }
+  }
+
+ private:
+  BlockRep* PrepareBlockInternal(CompressionType compression_type,
+                                 const Slice* first_key_in_next_block) {
+    BlockRep* block_rep = nullptr;
+    block_rep_pool.pop(block_rep);
+    assert(block_rep != nullptr);
+    assert(block_rep->data);
+
+    block_rep->compression_type = compression_type;
+
+    if (first_key_in_next_block == nullptr) {
+      block_rep->first_key_in_next_block.reset(nullptr);
+    } else {
+      block_rep->first_key_in_next_block->assign(
+          first_key_in_next_block->data(), first_key_in_next_block->size());
+    }
+
+    return block_rep;
+  }
 };
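
The EmitBlock()/ReapBlock() pair above also implements the first-block handshake: the emitting thread blocks until the write thread has finished the first block, so a non-zero compression ratio exists before more blocks are queued. The standalone sketch below shows just that handshake; the thread roles and the 10 ms sleep are illustrative, not taken from the RocksDB code.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// The flag is atomic, but it is set and waited on together with the mutex,
// which is what makes the condition_variable wakeup reliable (no lost notify).
std::atomic<bool> first_block_processed{false};
std::condition_variable first_block_cond;
std::mutex first_block_mutex;

void WriterThread() {
  // ... pretend to write the first block ...
  std::this_thread::sleep_for(std::chrono::milliseconds(10));
  {
    std::lock_guard<std::mutex> lock(first_block_mutex);
    first_block_processed.store(true, std::memory_order_relaxed);
  }
  first_block_cond.notify_one();
}

int main() {
  std::thread writer(WriterThread);

  // Emitting side: wait until the writer has processed the first block.
  if (!first_block_processed.load(std::memory_order_relaxed)) {
    std::unique_lock<std::mutex> lock(first_block_mutex);
    first_block_cond.wait(lock, [] {
      return first_block_processed.load(std::memory_order_relaxed);
    });
  }
  std::cout << "first block done; keep emitting blocks\n";

  writer.join();
  return 0;
}
```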
@@ -694,19 +868,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
         &rep_->compressed_cache_key_prefix_size);
   }

-  if (rep_->compression_opts.parallel_threads > 1) {
-    rep_->pc_rep.reset(
-        new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
-    rep_->pc_rep->compress_thread_pool.reserve(
-        rep_->compression_opts.parallel_threads);
-    for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
-      rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
-        BGWorkCompression(*(rep_->compression_ctxs[i]),
-                          rep_->verify_ctxs[i].get());
-      });
-    }
-    rep_->pc_rep->write_thread.reset(
-        new port::Thread([this] { BGWorkWriteRawBlock(); }));
+  if (rep_->IsParallelCompressionEnabled()) {
+    StartParallelCompression();
   }
 }
@@ -748,7 +911,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       // entries in the first block and < all entries in subsequent
       // blocks.
       if (ok() && r->state == Rep::State::kUnbuffered) {
-        if (r->compression_opts.parallel_threads > 1) {
+        if (r->IsParallelCompressionEnabled()) {
           r->pc_rep->curr_block_keys->Clear();
         } else {
           r->index_builder->AddIndexEntry(&r->last_key, &key,
@@ -760,7 +923,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     // Note: PartitionedFilterBlockBuilder requires key being added to filter
     // builder after being added to index builder.
     if (r->state == Rep::State::kUnbuffered) {
-      if (r->compression_opts.parallel_threads > 1) {
+      if (r->IsParallelCompressionEnabled()) {
         r->pc_rep->curr_block_keys->PushBack(key);
       } else {
         if (r->filter_builder != nullptr) {
@@ -781,7 +944,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       }
       r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
     } else {
-      if (r->compression_opts.parallel_threads == 1) {
+      if (!r->IsParallelCompressionEnabled()) {
         r->index_builder->OnKeyAdded(key);
       }
     }
@@ -818,61 +981,15 @@ void BlockBasedTableBuilder::Flush() {
   assert(rep_->state != Rep::State::kClosed);
   if (!ok()) return;
   if (r->data_block.empty()) return;
-  if (r->compression_opts.parallel_threads > 1 &&
+  if (r->IsParallelCompressionEnabled() &&
       r->state == Rep::State::kUnbuffered) {
-    ParallelCompressionRep::BlockRep* block_rep = nullptr;
-    r->pc_rep->block_rep_pool.pop(block_rep);
-    assert(block_rep != nullptr);
-
     r->data_block.Finish();
-    assert(block_rep->data);
-    r->data_block.SwapAndReset(*(block_rep->data));
-
-    block_rep->contents = *(block_rep->data);
-
-    block_rep->compression_type = r->compression_type;
-
-    std::swap(block_rep->keys, r->pc_rep->curr_block_keys);
-    r->pc_rep->curr_block_keys->Clear();
-
-    if (r->first_key_in_next_block == nullptr) {
-      block_rep->first_key_in_next_block.reset(nullptr);
-    } else {
-      block_rep->first_key_in_next_block->assign(
-          r->first_key_in_next_block->data(),
-          r->first_key_in_next_block->size());
-    }
-
-    uint64_t new_raw_bytes_inflight =
-        r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
-                                                std::memory_order_relaxed) +
-        block_rep->data->size();
-    uint64_t new_blocks_inflight =
-        r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
-    r->pc_rep->estimated_file_size.store(
-        r->get_offset() +
-            static_cast<uint64_t>(static_cast<double>(new_raw_bytes_inflight) *
-                                  r->pc_rep->curr_compression_ratio.load(
-                                      std::memory_order_relaxed)) +
-            new_blocks_inflight * kBlockTrailerSize,
-        std::memory_order_relaxed);
-
-    // Read out first_block here to avoid data race with BGWorkWriteRawBlock
-    bool first_block = r->pc_rep->first_block;
-
-    assert(block_rep->status.ok());
-    if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
-      return;
-    }
-    if (!r->pc_rep->compress_queue.push(block_rep)) {
-      return;
-    }
-
-    if (first_block) {
-      std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
-      r->pc_rep->first_block_cond.wait(lock,
-                                       [r] { return !r->pc_rep->first_block; });
-    }
+    ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+        r->compression_type, r->first_key_in_next_block, &(r->data_block));
+    assert(block_rep != nullptr);
+    r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                             r->get_offset());
+    r->pc_rep->EmitBlock(block_rep);
   } else {
     WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
   }
@@ -919,9 +1036,11 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
 }

 void BlockBasedTableBuilder::BGWorkCompression(
-    CompressionContext& compression_ctx, UncompressionContext* verify_ctx) {
-  ParallelCompressionRep::BlockRep* block_rep;
+    const CompressionContext& compression_ctx,
+    UncompressionContext* verify_ctx) {
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
   while (rep_->pc_rep->compress_queue.pop(block_rep)) {
+    assert(block_rep != nullptr);
     CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
                            compression_ctx, verify_ctx,
                            block_rep->compressed_data.get(),
@@ -933,15 +1052,18 @@ void BlockBasedTableBuilder::BGWorkCompression(

 void BlockBasedTableBuilder::CompressAndVerifyBlock(
     const Slice& raw_block_contents, bool is_data_block,
-    CompressionContext& compression_ctx, UncompressionContext* verify_ctx_ptr,
+    const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
     std::string* compressed_output, Slice* block_contents,
     CompressionType* type, Status* out_status) {
   // File format contains a sequence of blocks where each block has:
   //    block_data: uint8[n]
   //    type: uint8
   //    crc: uint32
-  assert(ok());
-
   Rep* r = rep_;
+  bool is_status_ok = ok();
+  if (!r->IsParallelCompressionEnabled()) {
+    assert(is_status_ok);
+  }

   *type = r->compression_type;
   uint64_t sample_for_compression = r->sample_for_compression;
@@ -951,7 +1073,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
       r->ioptions.env,
       ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

-  if (raw_block_contents.size() < kCompressionSizeLimit) {
+  if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
     const CompressionDict* compression_dict;
     if (!is_data_block || r->compression_dict == nullptr) {
       compression_dict = &CompressionDict::GetEmptyDict();
@@ -988,7 +1110,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
         }
         assert(verify_dict != nullptr);
         BlockContents contents;
-        UncompressionInfo uncompression_info(*verify_ctx_ptr, *verify_dict,
+        UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
                                              r->compression_type);
         Status stat = UncompressBlockContentsForCompressionType(
             uncompression_info, block_contents->data(), block_contents->size(),
@@ -1117,39 +1239,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
       r->SetIOStatus(io_s);
     }
   }
-  if (r->compression_opts.parallel_threads > 1) {
-    if (!r->pc_rep->finished) {
-      assert(r->pc_rep->raw_bytes_compressed +
-                 r->pc_rep->raw_bytes_curr_block >
-             0);
-      r->pc_rep->curr_compression_ratio.store(
-          (r->pc_rep->curr_compression_ratio.load(
-               std::memory_order_relaxed) *
-               r->pc_rep->raw_bytes_compressed +
-           block_contents.size()) /
-              static_cast<double>(r->pc_rep->raw_bytes_compressed +
-                                  r->pc_rep->raw_bytes_curr_block),
-          std::memory_order_relaxed);
-      r->pc_rep->raw_bytes_compressed += r->pc_rep->raw_bytes_curr_block;
-      uint64_t new_raw_bytes_inflight =
-          r->pc_rep->raw_bytes_inflight.fetch_sub(
-              r->pc_rep->raw_bytes_curr_block, std::memory_order_relaxed) -
-          r->pc_rep->raw_bytes_curr_block;
-      uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub(
-                                         1, std::memory_order_relaxed) -
-                                     1;
-      assert(new_blocks_inflight < r->compression_opts.parallel_threads);
-      r->pc_rep->estimated_file_size.store(
-          r->get_offset() +
-              static_cast<uint64_t>(
-                  static_cast<double>(new_raw_bytes_inflight) *
-                  r->pc_rep->curr_compression_ratio.load(
-                      std::memory_order_relaxed)) +
-              new_blocks_inflight * kBlockTrailerSize,
-          std::memory_order_relaxed);
+  if (r->IsParallelCompressionEnabled()) {
+    if (is_data_block) {
+      r->pc_rep->file_size_estimator.ReapBlock(block_contents.size(),
+                                               r->get_offset());
     } else {
-      r->pc_rep->estimated_file_size.store(r->get_offset(),
-                                           std::memory_order_relaxed);
+      r->pc_rep->file_size_estimator.SetEstimatedFileSize(r->get_offset());
     }
   }
 }
@@ -1163,24 +1258,19 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,

 void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
   Rep* r = rep_;
-  ParallelCompressionRep::BlockRepSlot* slot;
-  ParallelCompressionRep::BlockRep* block_rep;
+  ParallelCompressionRep::BlockRepSlot* slot = nullptr;
+  ParallelCompressionRep::BlockRep* block_rep = nullptr;
   while (r->pc_rep->write_queue.pop(slot)) {
+    assert(slot != nullptr);
     slot->Take(block_rep);
+    assert(block_rep != nullptr);
     if (!block_rep->status.ok()) {
       r->SetStatus(block_rep->status);
-      // Return block_rep to the pool so that blocked Flush() can finish
+      // Reap block so that blocked Flush() can finish
       // if there is one, and Flush() will notice !ok() next time.
       block_rep->status = Status::OK();
-      block_rep->compressed_data->clear();
-      r->pc_rep->block_rep_pool.push(block_rep);
-      // Unlock first block if necessary.
-      if (r->pc_rep->first_block) {
-        std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
-        r->pc_rep->first_block = false;
-        r->pc_rep->first_block_cond.notify_one();
-      }
-      break;
+      r->pc_rep->ReapBlock(block_rep);
+      continue;
     }

     for (size_t i = 0; i < block_rep->keys->Size(); i++) {
@@ -1193,19 +1283,13 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
       r->index_builder->OnKeyAdded(key);
     }

-    r->pc_rep->raw_bytes_curr_block = block_rep->data->size();
+    r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
     WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
                   &r->pending_handle, true /* is_data_block*/);
     if (!ok()) {
       break;
     }

-    if (r->pc_rep->first_block) {
-      std::lock_guard<std::mutex> lock(r->pc_rep->first_block_mutex);
-      r->pc_rep->first_block = false;
-      r->pc_rep->first_block_cond.notify_one();
-    }
-
     if (r->filter_builder != nullptr) {
       r->filter_builder->StartBlock(r->get_offset());
     }
@@ -1222,9 +1306,33 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
                                      &first_key_in_next_block,
                                      r->pending_handle);
     }

-    block_rep->compressed_data->clear();
-    r->pc_rep->block_rep_pool.push(block_rep);
+    r->pc_rep->ReapBlock(block_rep);
   }
 }

+void BlockBasedTableBuilder::StartParallelCompression() {
+  rep_->pc_rep.reset(
+      new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
+  rep_->pc_rep->compress_thread_pool.reserve(
+      rep_->compression_opts.parallel_threads);
+  for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
+    rep_->pc_rep->compress_thread_pool.emplace_back([this, i] {
+      BGWorkCompression(*(rep_->compression_ctxs[i]),
+                        rep_->verify_ctxs[i].get());
+    });
+  }
+  rep_->pc_rep->write_thread.reset(
+      new port::Thread([this] { BGWorkWriteRawBlock(); }));
+}
+
+void BlockBasedTableBuilder::StopParallelCompression() {
+  rep_->pc_rep->compress_queue.finish();
+  for (auto& thread : rep_->pc_rep->compress_thread_pool) {
+    thread.join();
+  }
+  rep_->pc_rep->write_queue.finish();
+  rep_->pc_rep->write_thread->join();
+}
+
 Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); }
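
StartParallelCompression()/StopParallelCompression() depend on the work queues' finish() semantics: after finish(), pop() returns false once the queue drains, each worker loop exits, and join() returns. Below is a simplified, self-contained sketch of that lifecycle; FinishableQueue is a stand-in for RocksDB's WorkQueue, whose exact interface is assumed here rather than quoted.

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// pop() blocks until an item arrives or finish() is called; once finished and
// drained it returns false, so workers exit their loops and join() cannot hang.
template <typename T>
class FinishableQueue {
 public:
  bool push(T v) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (finished_) return false;
      q_.push(std::move(v));
    }
    cv_.notify_one();
    return true;
  }
  bool pop(T& out) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return finished_ || !q_.empty(); });
    if (q_.empty()) return false;  // finished and drained
    out = std::move(q_.front());
    q_.pop();
    return true;
  }
  void finish() {
    { std::lock_guard<std::mutex> lock(mu_); finished_ = true; }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<T> q_;
  bool finished_ = false;
};

int main() {
  FinishableQueue<int> work;
  std::vector<std::thread> pool;
  for (int i = 0; i < 4; i++) {   // analogue of StartParallelCompression()
    pool.emplace_back([&work] {
      int item;
      while (work.pop(item)) { /* compress item */ }
    });
  }
  for (int i = 0; i < 100; i++) work.push(i);
  work.finish();                  // analogue of StopParallelCompression()
  for (auto& t : pool) t.join();
  std::cout << "all workers joined\n";
  return 0;
}
```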
@@ -1504,11 +1612,12 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
   footer.EncodeTo(&footer_encoding);
   assert(ok());
   IOStatus ios = r->file->Append(footer_encoding);
-  r->SetIOStatus(ios);
   if (ios.ok()) {
     r->set_offset(r->get_offset() + footer_encoding.size());
+  } else {
+    r->SetIOStatus(ios);
+    r->SetStatus(ios);
   }
-  r->SyncStatusFromIOStatus();
 }

 void BlockBasedTableBuilder::EnterUnbuffered() {
@@ -1557,62 +1666,22 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
     assert(!data_block.empty());
     assert(!keys.empty());

-    if (r->compression_opts.parallel_threads > 1) {
-      ParallelCompressionRep::BlockRep* block_rep;
-      r->pc_rep->block_rep_pool.pop(block_rep);
-
-      std::swap(*(block_rep->data), data_block);
-      block_rep->contents = *(block_rep->data);
-
-      block_rep->compression_type = r->compression_type;
-
-      block_rep->keys->SwapAssign(keys);
-
+    if (r->IsParallelCompressionEnabled()) {
+      Slice first_key_in_next_block;
+      const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
       if (i + 1 < r->data_block_and_keys_buffers.size()) {
-        block_rep->first_key_in_next_block->assign(
-            r->data_block_and_keys_buffers[i + 1].second.front());
+        first_key_in_next_block =
+            r->data_block_and_keys_buffers[i + 1].second.front();
       } else {
-        if (r->first_key_in_next_block == nullptr) {
-          block_rep->first_key_in_next_block.reset(nullptr);
-        } else {
-          block_rep->first_key_in_next_block->assign(
-              r->first_key_in_next_block->data(),
-              r->first_key_in_next_block->size());
-        }
-      }
-
-      uint64_t new_raw_bytes_inflight =
-          r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
-                                                  std::memory_order_relaxed) +
-          block_rep->data->size();
-      uint64_t new_blocks_inflight =
-          r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) +
-          1;
-      r->pc_rep->estimated_file_size.store(
-          r->get_offset() +
-              static_cast<uint64_t>(
-                  static_cast<double>(new_raw_bytes_inflight) *
-                  r->pc_rep->curr_compression_ratio.load(
-                      std::memory_order_relaxed)) +
-              new_blocks_inflight * kBlockTrailerSize,
-          std::memory_order_relaxed);
-
-      // Read out first_block here to avoid data race with BGWorkWriteRawBlock
-      bool first_block = r->pc_rep->first_block;
-
-      assert(block_rep->status.ok());
-      if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
-        return;
-      }
-      if (!r->pc_rep->compress_queue.push(block_rep)) {
-        return;
-      }
-
-      if (first_block) {
-        std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
-        r->pc_rep->first_block_cond.wait(
-            lock, [r] { return !r->pc_rep->first_block; });
+        first_key_in_next_block_ptr = r->first_key_in_next_block;
       }
+
+      ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
+          r->compression_type, first_key_in_next_block_ptr, &data_block, &keys);
+      assert(block_rep != nullptr);
+      r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
+                                               r->get_offset());
+      r->pc_rep->EmitBlock(block_rep);
     } else {
       for (const auto& key : keys) {
         if (r->filter_builder != nullptr) {
@@ -1645,14 +1714,8 @@ Status BlockBasedTableBuilder::Finish() {
   if (r->state == Rep::State::kBuffered) {
     EnterUnbuffered();
   }
-  if (r->compression_opts.parallel_threads > 1) {
-    r->pc_rep->compress_queue.finish();
-    for (auto& thread : r->pc_rep->compress_thread_pool) {
-      thread.join();
-    }
-    r->pc_rep->write_queue.finish();
-    r->pc_rep->write_thread->join();
-    r->pc_rep->finished = true;
+  if (r->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
 #ifndef NDEBUG
     for (const auto& br : r->pc_rep->block_rep_buf) {
       assert(br.status.ok());
@@ -1691,25 +1754,20 @@ Status BlockBasedTableBuilder::Finish() {
     WriteFooter(metaindex_block_handle, index_block_handle);
   }
   r->state = Rep::State::kClosed;
-  Status ret_status = r->GetStatus();
+  r->SetStatus(r->CopyIOStatus());
+  Status ret_status = r->CopyStatus();
   assert(!ret_status.ok() || io_status().ok());
   return ret_status;
 }

 void BlockBasedTableBuilder::Abandon() {
   assert(rep_->state != Rep::State::kClosed);
-  if (rep_->compression_opts.parallel_threads > 1) {
-    rep_->pc_rep->compress_queue.finish();
-    for (auto& thread : rep_->pc_rep->compress_thread_pool) {
-      thread.join();
-    }
-    rep_->pc_rep->write_queue.finish();
-    rep_->pc_rep->write_thread->join();
-    rep_->pc_rep->finished = true;
+  if (rep_->IsParallelCompressionEnabled()) {
+    StopParallelCompression();
   }
   rep_->state = Rep::State::kClosed;
-  rep_->GetStatus().PermitUncheckedError();
-  rep_->GetIOStatus().PermitUncheckedError();
+  rep_->CopyStatus().PermitUncheckedError();
+  rep_->CopyIOStatus().PermitUncheckedError();
 }

 uint64_t BlockBasedTableBuilder::NumEntries() const {
@@ -1723,10 +1781,10 @@ bool BlockBasedTableBuilder::IsEmpty() const {
 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

 uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
-  if (rep_->compression_opts.parallel_threads > 1) {
+  if (rep_->IsParallelCompressionEnabled()) {
     // Use compression ratio so far and inflight raw bytes to estimate
     // final SST size.
-    return rep_->pc_rep->estimated_file_size.load(std::memory_order_relaxed);
+    return rep_->pc_rep->file_size_estimator.GetEstimatedFileSize();
   } else {
     return FileSize();
   }

table/block_based/block_based_table_builder.h

@@ -159,19 +159,29 @@ class BlockBasedTableBuilder : public TableBuilder {
   // Get blocks from mem-table walking thread, compress them and
   // pass them to the write thread. Used in parallel compression mode only
-  void BGWorkCompression(CompressionContext& compression_ctx,
+  void BGWorkCompression(const CompressionContext& compression_ctx,
                          UncompressionContext* verify_ctx);

   // Given raw block content, try to compress it and return result and
   // compression type
-  void CompressAndVerifyBlock(
-      const Slice& raw_block_contents, bool is_data_block,
-      CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
-      std::string* compressed_output, Slice* result_block_contents,
-      CompressionType* result_compression_type, Status* out_status);
+  void CompressAndVerifyBlock(const Slice& raw_block_contents,
+                              bool is_data_block,
+                              const CompressionContext& compression_ctx,
+                              UncompressionContext* verify_ctx,
+                              std::string* compressed_output,
+                              Slice* result_block_contents,
+                              CompressionType* result_compression_type,
+                              Status* out_status);

   // Get compressed blocks from BGWorkCompression and write them into SST
   void BGWorkWriteRawBlock();
+
+  // Initialize parallel compression context and
+  // start BGWorkCompression and BGWorkWriteRawBlock threads
+  void StartParallelCompression();
+
+  // Stop BGWorkCompression and BGWorkWriteRawBlock threads
+  void StopParallelCompression();
 };

 Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
