From 079e50d2ba439cdff7754d835848054ab2c170b9 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 30 Apr 2020 15:34:43 -0700 Subject: [PATCH] Disallow BlockBasedTableBuilder to set status from non-OK (#6776) Summary: There is no systematic mechanism to prevent BlockBasedTableBuilder's status to be set from non-OK to OK. Adding a mechanism to force this will help us prevent failures in the future. The solution is to only make it possible to set the status code if the status code to set is not OK. Since the status code passed to CompressAndVerifyBlock() is changed, a mini refactoring is done too so that the output arguments are changed from reference to pointers, based on Google C++ Style. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6776 Test Plan: Run all existing tests. Reviewed By: pdillinger Differential Revision: D21314382 fbshipit-source-id: 27000c10f1e4c121661e026548d6882066409375 --- .../block_based/block_based_table_builder.cc | 166 +++++++++++------- table/block_based/block_based_table_builder.h | 8 +- 2 files changed, 100 insertions(+), 74 deletions(-) diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index 76e087bb5..9a7e379ad 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -285,12 +285,9 @@ struct BlockBasedTableBuilder::Rep { const InternalKeyComparator& internal_comparator; WritableFileWriter* file; std::atomic offset; - Status status; - IOStatus io_status; // Synchronize status & io_status accesses across threads from main thread, // compression thread and write thread in parallel compression. std::mutex status_mutex; - std::mutex io_status_mutex; size_t alignment; BlockBuilder data_block; // Buffers uncompressed data blocks and keys to replay later. 
Needed when @@ -369,6 +366,61 @@ struct BlockBasedTableBuilder::Rep { uint64_t get_offset() { return offset.load(std::memory_order_relaxed); } void set_offset(uint64_t o) { offset.store(o, std::memory_order_relaxed); } + const IOStatus& GetIOStatus() { + if (compression_opts.parallel_threads > 1) { + std::lock_guard lock(status_mutex); + return io_status; + } else { + return io_status; + } + } + + const Status& GetStatus() { + if (compression_opts.parallel_threads > 1) { + std::lock_guard lock(status_mutex); + return status; + } else { + return status; + } + } + + void SyncStatusFromIOStatus() { + if (compression_opts.parallel_threads > 1) { + std::lock_guard lock(status_mutex); + if (status.ok()) { + status = io_status; + } + } else if (status.ok()) { + status = io_status; + } + } + + // Never erase an existing status that is not OK. + void SetStatus(Status s) { + if (!s.ok()) { + // Locking is an overkill for non compression_opts.parallel_threads + // case but since it's unlikely that s is not OK, we take this cost + // to be simplicity. + std::lock_guard lock(status_mutex); + if (status.ok()) { + status = s; + } + } + } + + // Never erase an existing I/O status that is not OK. + void SetIOStatus(IOStatus ios) { + if (!ios.ok()) { + // Locking is an overkill for non compression_opts.parallel_threads + // case but since it's unlikely that s is not OK, we take this cost + // to be simplicity. 
+ std::lock_guard lock(status_mutex); + if (io_status.ok()) { + io_status = ios; + } + } + } + Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions, const BlockBasedTableOptions& table_opt, const InternalKeyComparator& icomparator, @@ -470,6 +522,10 @@ struct BlockBasedTableBuilder::Rep { Rep& operator=(const Rep&) = delete; ~Rep() {} + + private: + Status status; + IOStatus io_status; }; struct BlockBasedTableBuilder::ParallelCompressionRep { @@ -860,9 +916,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size(); return; } + Status compress_status; CompressAndVerifyBlock(raw_block_contents, is_data_block, *(r->compression_ctxs[0]), r->verify_ctxs[0].get(), - r->compressed_output, block_contents, type, r->status); + &(r->compressed_output), &(block_contents), &type, + &compress_status); + r->SetStatus(compress_status); if (!ok()) { return; } @@ -883,8 +942,9 @@ void BlockBasedTableBuilder::BGWorkCompression( while (rep_->pc_rep->compress_queue.pop(block_rep)) { CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/ compression_ctx, verify_ctx, - *(block_rep->compressed_data), block_rep->contents, - block_rep->compression_type, block_rep->status); + block_rep->compressed_data.get(), + &block_rep->contents, &(block_rep->compression_type), + &block_rep->status); block_rep->slot->Fill(block_rep); } } @@ -892,8 +952,8 @@ void BlockBasedTableBuilder::BGWorkCompression( void BlockBasedTableBuilder::CompressAndVerifyBlock( const Slice& raw_block_contents, bool is_data_block, CompressionContext& compression_ctx, UncompressionContext* verify_ctx_ptr, - std::string& compressed_output, Slice& block_contents, - CompressionType& type, Status& out_status) { + std::string* compressed_output, Slice* block_contents, + CompressionType* type, Status* out_status) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] 
// type: uint8 @@ -901,7 +961,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( assert(ok()); Rep* r = rep_; - type = r->compression_type; + *type = r->compression_type; uint64_t sample_for_compression = r->sample_for_compression; bool abort_compression = false; @@ -918,15 +978,15 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( } assert(compression_dict != nullptr); CompressionInfo compression_info(r->compression_opts, compression_ctx, - *compression_dict, type, + *compression_dict, *type, sample_for_compression); std::string sampled_output_fast; std::string sampled_output_slow; - block_contents = CompressBlock( - raw_block_contents, compression_info, &type, + *block_contents = CompressBlock( + raw_block_contents, compression_info, type, r->table_options.format_version, is_data_block /* do_sample */, - &compressed_output, &sampled_output_fast, &sampled_output_slow); + compressed_output, &sampled_output_fast, &sampled_output_slow); // notify collectors on block add NotifyCollectTableCollectorsOnBlockAdd( @@ -936,7 +996,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( // Some of the compression algorithms are known to be unreliable. If // the verify_compression flag is set then try to de-compress the // compressed data and compare to the input. 
- if (type != kNoCompression && r->table_options.verify_compression) { + if (*type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer const UncompressionDict* verify_dict; if (!is_data_block || r->verify_dict == nullptr) { @@ -949,7 +1009,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( UncompressionInfo uncompression_info(*verify_ctx_ptr, *verify_dict, r->compression_type); Status stat = UncompressBlockContentsForCompressionType( - uncompression_info, block_contents.data(), block_contents.size(), + uncompression_info, block_contents->data(), block_contents->size(), &contents, r->table_options.format_version, r->ioptions); if (stat.ok()) { @@ -959,12 +1019,12 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( abort_compression = true; ROCKS_LOG_ERROR(r->ioptions.info_log, "Decompressed block did not match raw block"); - out_status = + *out_status = Status::Corruption("Decompressed block did not match raw block"); } } else { // Decompression reported an error. abort. - out_status = Status::Corruption("Could not decompress"); + *out_status = Status::Corruption("Could not decompress"); abort_compression = true; } } @@ -977,9 +1037,9 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( // verification. 
if (abort_compression) { RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); - type = kNoCompression; - block_contents = raw_block_contents; - } else if (type != kNoCompression) { + *type = kNoCompression; + *block_contents = raw_block_contents; + } else if (*type != kNoCompression) { if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) { RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS, timer.ElapsedNanos()); @@ -987,7 +1047,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock( RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED, raw_block_contents.size()); RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED); - } else if (type != r->compression_type) { + } else if (*type != r->compression_type) { RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); } } @@ -1052,10 +1112,10 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, if (io_s.ok()) { s = InsertBlockInCache(block_contents, type, handle); if (!s.ok()) { - SetStatusAtom(s); + r->SetStatus(s); } } else { - SetIOStatusAtom(io_s); + r->SetIOStatus(io_s); } if (s.ok() && io_s.ok()) { r->set_offset(r->get_offset() + block_contents.size() + @@ -1069,7 +1129,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, if (io_s.ok()) { r->set_offset(r->get_offset() + pad_bytes); } else { - SetIOStatusAtom(io_s); + r->SetIOStatus(io_s); } } if (r->compression_opts.parallel_threads > 1) { @@ -1108,10 +1168,10 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, } } } else { - SetIOStatusAtom(io_s); + r->SetIOStatus(io_s); } if (!io_s.ok() && s.ok()) { - SetStatusAtom(io_s); + r->SetStatus(io_s); } } @@ -1122,7 +1182,7 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() { while (r->pc_rep->write_queue.pop(slot)) { slot->Take(block_rep); if (!block_rep->status.ok()) { - SetStatusAtom(block_rep->status); + r->SetStatus(block_rep->status); break; } @@ -1139,7 +1199,7 @@ void 
BlockBasedTableBuilder::BGWorkWriteRawBlock() { r->pc_rep->raw_bytes_curr_block = block_rep->data->size(); WriteRawBlock(block_rep->contents, block_rep->compression_type, &r->pending_handle, true /* is_data_block*/); - if (!r->status.ok()) { + if (!ok()) { break; } @@ -1170,40 +1230,10 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() { } } -Status BlockBasedTableBuilder::status() const { - if (rep_->compression_opts.parallel_threads > 1) { - std::lock_guard lock(rep_->status_mutex); - return rep_->status; - } else { - return rep_->status; - } -} +Status BlockBasedTableBuilder::status() const { return rep_->GetStatus(); } IOStatus BlockBasedTableBuilder::io_status() const { - if (rep_->compression_opts.parallel_threads > 1) { - std::lock_guard lock(rep_->io_status_mutex); - return rep_->io_status; - } else { - return rep_->io_status; - } -} - -void BlockBasedTableBuilder::SetStatusAtom(Status s) { - if (rep_->compression_opts.parallel_threads > 1) { - std::lock_guard lock(rep_->status_mutex); - rep_->status = s; - } else { - rep_->status = s; - } -} - -void BlockBasedTableBuilder::SetIOStatusAtom(IOStatus io_s) { - if (rep_->compression_opts.parallel_threads > 1) { - std::lock_guard lock(rep_->io_status_mutex); - rep_->io_status = io_s; - } else { - rep_->io_status = io_s; - } + return rep_->GetIOStatus(); } static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { @@ -1294,7 +1324,7 @@ void BlockBasedTableBuilder::WriteIndexBlock( // HashIndexBuilder which is not multi-partition. 
assert(index_blocks.meta_blocks.empty()); } else if (ok() && !index_builder_status.ok()) { - rep_->status = index_builder_status; + rep_->SetStatus(index_builder_status); } if (ok()) { for (const auto& item : index_blocks.meta_blocks) { @@ -1319,7 +1349,7 @@ void BlockBasedTableBuilder::WriteIndexBlock( while (ok() && s.IsIncomplete()) { s = rep_->index_builder->Finish(&index_blocks, *index_block_handle); if (!s.ok() && !s.IsIncomplete()) { - rep_->status = s; + rep_->SetStatus(s); return; } if (rep_->table_options.enable_index_compression) { @@ -1469,13 +1499,13 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle, footer.set_checksum(r->table_options.checksum); std::string footer_encoding; footer.EncodeTo(&footer_encoding); - assert(r->status.ok()); - assert(r->io_status.ok()); - r->io_status = r->file->Append(footer_encoding); - if (r->io_status.ok()) { + assert(ok()); + IOStatus ios = r->file->Append(footer_encoding); + r->SetIOStatus(ios); + if (ios.ok()) { r->set_offset(r->get_offset() + footer_encoding.size()); } - r->status = r->io_status; + r->SyncStatusFromIOStatus(); } void BlockBasedTableBuilder::EnterUnbuffered() { @@ -1622,7 +1652,7 @@ Status BlockBasedTableBuilder::Finish() { WriteFooter(metaindex_block_handle, index_block_handle); } r->state = Rep::State::kClosed; - return r->status; + return r->GetStatus(); } void BlockBasedTableBuilder::Abandon() { diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h index 0621c0691..ae1ddcb2a 100644 --- a/table/block_based/block_based_table_builder.h +++ b/table/block_based/block_based_table_builder.h @@ -111,10 +111,6 @@ class BlockBasedTableBuilder : public TableBuilder { private: bool ok() const { return status().ok(); } - void SetStatusAtom(Status status); - - void SetIOStatusAtom(IOStatus io_status); - // Transition state from buffered to unbuffered. See `Rep::State` API comment // for details of the states. 
// REQUIRES: `rep_->state == kBuffered` @@ -170,8 +166,8 @@ class BlockBasedTableBuilder : public TableBuilder { void CompressAndVerifyBlock( const Slice& raw_block_contents, bool is_data_block, CompressionContext& compression_ctx, UncompressionContext* verify_ctx, - std::string& compressed_output, Slice& result_block_contents, - CompressionType& result_compression_type, Status& out_status); + std::string* compressed_output, Slice* result_block_contents, + CompressionType* result_compression_type, Status* out_status); // Get compressed blocks from BGWorkCompression and write them into SST void BGWorkWriteRawBlock();