From 03a781a90c6f5b0132f4e2fe3407f4ce5140d440 Mon Sep 17 00:00:00 2001
From: Ziyue Yang
Date: Wed, 1 Apr 2020 16:37:54 -0700
Subject: [PATCH] Add pipelined & parallel compression optimization (#6262)

Summary:
This PR adds support for pipelined & parallel compression optimization for
`BlockBasedTableBuilder`. This optimization makes block building, block
compression and block appending a pipeline, and uses multiple threads to
accelerate block compression. Users can set
`CompressionOptions::parallel_threads` greater than 1 to enable compression
parallelism.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6262

Reviewed By: ajkr

Differential Revision: D20651306

fbshipit-source-id: 62125590a9c15b6d9071def9dc72589c1696a4cb
---
 CMakeLists.txt                                |   1 +
 HISTORY.md                                    |   3 +
 Makefile                                      |   4 +
 TARGETS                                       |   7 +
 db/compaction/compaction_job.cc               |   3 +-
 db/db_basic_test.cc                           |  12 +-
 db/db_options_test.cc                         |   6 +-
 db/db_test2.cc                                |  44 +-
 db/db_with_timestamp_basic_test.cc            |   9 +-
 include/rocksdb/advanced_options.h            |  21 +-
 options/options.cc                            |   9 +
 options/options_helper.cc                     |  11 +
 options/options_test.cc                       |   8 +-
 src.mk                                        |   1 +
 .../block_based/block_based_table_builder.cc  | 570 ++++++++++++++++--
 table/block_based/block_based_table_builder.h |  27 +
 table/block_based/block_builder.cc            |   5 +
 table/block_based/block_builder.h             |   3 +
 table/table_builder.h                         |   5 +
 table/table_test.cc                           |  24 +-
 tools/db_bench_tool.cc                        |   4 +
 util/work_queue.h                             | 149 +++++
 util/work_queue_test.cc                       | 268 ++++++++
 23 files changed, 1101 insertions(+), 93 deletions(-)
 create mode 100644 util/work_queue.h
 create mode 100644 util/work_queue_test.cc

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4b51d62e3..3bdd6909f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1072,6 +1072,7 @@ if(WITH_TESTS)
         util/timer_queue_test.cc
         util/thread_list_test.cc
         util/thread_local_test.cc
+        util/work_queue_test.cc
         utilities/backupable/backupable_db_test.cc
         utilities/blob_db/blob_db_test.cc
         utilities/cassandra/cassandra_functional_test.cc
diff --git a/HISTORY.md b/HISTORY.md
index f9b8fcb0e..afa8633ff 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -3,6 +3,9 @@
 ### Behavior changes
 * Since RocksDB 6.8, ttl-based FIFO compaction can drop a file whose oldest key becomes older than options.ttl while others have not. This fix reverts this and makes ttl-based FIFO compaction use the file's flush time as the criterion. This fix also requires that max_open_files = -1 and compaction_options_fifo.allow_compaction = false to function properly.
 
+### New Features
+* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.
+
 ### Bug Fixes
 * Fix a bug which might crash the service when write buffer manager fails to insert the dummy handle to the block cache.
 
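For context, a minimal sketch of how an application might opt into the option described above. The database path and the choice of kZSTD are illustrative assumptions; parallel compression works with any compression type, and all other options keep their defaults:

  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    // Any supported compression type works; ZSTD is shown here and assumes
    // the library was built with ZSTD support.
    options.compression = rocksdb::kZSTD;
    // Values greater than 1 enable the pipelined & parallel compression path
    // in BlockBasedTableBuilder introduced by this patch.
    options.compression_opts.parallel_threads = 4;

    rocksdb::DB* db = nullptr;
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/parallel_compression_demo", &db);
    if (s.ok()) {
      s = db->Put(rocksdb::WriteOptions(), "key", "value");
      delete db;
    }
    return s.ok() ? 0 : 1;
  }
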
diff --git a/Makefile b/Makefile index c10a1515e..2c8873473 100644 --- a/Makefile +++ b/Makefile @@ -466,6 +466,7 @@ TESTS = \ hash_test \ random_test \ thread_local_test \ + work_queue_test \ rate_limiter_test \ perf_context_test \ iostats_context_test \ @@ -1295,6 +1296,9 @@ histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS) thread_local_test: util/thread_local_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +work_queue_test: util/work_queue_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + corruption_test: db/corruption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 654a884b6..8a0c7a9c8 100644 --- a/TARGETS +++ b/TARGETS @@ -1512,6 +1512,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "work_queue_test", + "util/work_queue_test.cc", + "serial", + [], + [], + ], [ "write_batch_test", "db/write_batch_test.cc", diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index a417585dd..8ab390799 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -937,7 +937,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact->builder != nullptr); assert(sub_compact->current_output() != nullptr); sub_compact->builder->Add(key, value); - sub_compact->current_output_file_size = sub_compact->builder->FileSize(); + sub_compact->current_output_file_size = + sub_compact->builder->EstimatedFileSize(); const ParsedInternalKey& ikey = c_iter->ikey(); sub_compact->current_output()->meta.UpdateBoundaries( key, value, ikey.sequence, ikey.type); diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index f2cfceae8..e1bd48f0b 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -1892,13 +1892,15 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) { class DBBasicTestWithParallelIO : public DBTestBase, - public testing::WithParamInterface> { + public testing::WithParamInterface< + std::tuple> { public: DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") { bool compressed_cache = std::get<0>(GetParam()); bool uncompressed_cache = std::get<1>(GetParam()); compression_enabled_ = std::get<2>(GetParam()); fill_cache_ = std::get<3>(GetParam()); + uint32_t compression_parallel_threads = std::get<4>(GetParam()); if (compressed_cache) { std::shared_ptr cache = NewLRUCache(1048576); @@ -1953,6 +1955,8 @@ class DBBasicTestWithParallelIO options.table_factory.reset(new BlockBasedTableFactory(table_options)); if (!compression_enabled_) { options.compression = kNoCompression; + } else { + options.compression_opts.parallel_threads = compression_parallel_threads; } Reopen(options); @@ -2354,10 +2358,10 @@ INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO, // Param 1 - Uncompressed cache enabled // Param 2 - Data compression enabled // Param 3 - ReadOptions::fill_cache + // Param 4 - CompressionOptions::parallel_threads ::testing::Combine(::testing::Bool(), ::testing::Bool(), - ::testing::Bool(), - ::testing::Bool())); - + ::testing::Bool(), ::testing::Bool(), + ::testing::Values(1, 4))); } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_options_test.cc b/db/db_options_test.cc index fa3c5d529..2efef14e5 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -872,6 +872,7 @@ TEST_F(DBOptionsTest, ChangeCompression) { options.compression = CompressionType::kLZ4Compression; options.bottommost_compression = CompressionType::kNoCompression; options.bottommost_compression_opts.level = 2; + 
options.bottommost_compression_opts.parallel_threads = 1; ASSERT_OK(TryReopen(options)); @@ -897,12 +898,14 @@ TEST_F(DBOptionsTest, ChangeCompression) { ASSERT_TRUE(compacted); ASSERT_EQ(CompressionType::kNoCompression, compression_used); ASSERT_EQ(options.compression_opts.level, compression_opt_used.level); + ASSERT_EQ(options.compression_opts.parallel_threads, + compression_opt_used.parallel_threads); compression_used = CompressionType::kLZ4Compression; compacted = false; ASSERT_OK(dbfull()->SetOptions( {{"bottommost_compression", "kSnappyCompression"}, - {"bottommost_compression_opts", "0:6:0:0:0:true"}})); + {"bottommost_compression_opts", "0:6:0:0:0:4:true"}})); ASSERT_OK(Put("foo", "foofoofoo")); ASSERT_OK(Put("bar", "foofoofoo")); ASSERT_OK(Flush()); @@ -913,6 +916,7 @@ TEST_F(DBOptionsTest, ChangeCompression) { ASSERT_TRUE(compacted); ASSERT_EQ(CompressionType::kSnappyCompression, compression_used); ASSERT_EQ(6, compression_opt_used.level); + ASSERT_EQ(4u, compression_opt_used.parallel_threads); SyncPoint::GetInstance()->DisableProcessing(); } diff --git a/db/db_test2.cc b/db/db_test2.cc index cf2c19d72..3444c4772 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1288,6 +1288,10 @@ TEST_F(DBTest2, CompressionOptions) { const int kValSize = 20; Random rnd(301); + std::vector compression_parallel_threads = {1, 4}; + + std::map key_value_written; + for (int iter = 0; iter <= 2; iter++) { listener->max_level_checked = 0; @@ -1312,19 +1316,37 @@ TEST_F(DBTest2, CompressionOptions) { options.bottommost_compression = kDisableCompressionOption; } - DestroyAndReopen(options); - // Write 10 random files - for (int i = 0; i < 10; i++) { - for (int j = 0; j < 5; j++) { - ASSERT_OK( - Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize))); + for (auto num_threads : compression_parallel_threads) { + options.compression_opts.parallel_threads = num_threads; + options.bottommost_compression_opts.parallel_threads = num_threads; + + DestroyAndReopen(options); + // Write 10 random files + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 5; j++) { + std::string key = RandomString(&rnd, kKeySize); + std::string value = RandomString(&rnd, kValSize); + key_value_written[key] = value; + ASSERT_OK(Put(key, value)); + } + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForCompact(); } - ASSERT_OK(Flush()); - dbfull()->TEST_WaitForCompact(); - } - // Make sure that we wrote enough to check all 7 levels - ASSERT_EQ(listener->max_level_checked, 6); + // Make sure that we wrote enough to check all 7 levels + ASSERT_EQ(listener->max_level_checked, 6); + + // Make sure database content is the same as key_value_written + std::unique_ptr db_iter(db_->NewIterator(ReadOptions())); + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + std::string key = db_iter->key().ToString(); + std::string value = db_iter->value().ToString(); + ASSERT_NE(key_value_written.find(key), key_value_written.end()); + ASSERT_EQ(key_value_written[key], value); + key_value_written.erase(key); + } + ASSERT_EQ(0, key_value_written.size()); + } } } diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 184f08b17..900582226 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -419,8 +419,9 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, - public testing::WithParamInterface, CompressionType, uint32_t>> { + public testing::WithParamInterface< + 
std::tuple, CompressionType, + uint32_t, uint32_t>> { public: DBBasicTestWithTimestampCompressionSettings() : DBBasicTestWithTimestampBase( @@ -460,6 +461,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } + options.compression_opts.parallel_threads = std::get<3>(GetParam()); options.target_file_size_base = 1 << 26; // 64MB DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); @@ -572,6 +574,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } + options.compression_opts.parallel_threads = std::get<3>(GetParam()); DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); @@ -749,7 +752,7 @@ INSTANTIATE_TEST_CASE_P( NewBloomFilterPolicy(10, false))), ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression, kLZ4HCCompression, kZSTD), - ::testing::Values(0, 1 << 14))); + ::testing::Values(0, 1 << 14), ::testing::Values(1, 4))); class DBBasicTestWithTimestampPrefixSeek : public DBBasicTestWithTimestampBase, diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index a72edbe05..ac4d677fe 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -117,6 +117,22 @@ struct CompressionOptions { // Default: 0. uint32_t zstd_max_train_bytes; + // Number of threads for parallel compression. + // Parallel compression is enabled only if threads > 1. + // + // This option is valid only when BlockBasedTable is used. + // + // When parallel compression is enabled, SST size estimation becomes less + // accurate, because block building and compression are pipelined, and there + // might be inflight blocks being compressed and not finally written, when + // current SST size is fetched. This brings inflation of final output file + // size. + // To be more accurate, this inflation is also estimated by using historical + // compression ratio and current bytes inflight. + // + // Default: 1. + uint32_t parallel_threads; + // When the compression options are set by the user, it will be set to "true". // For bottommost_compression_opts, to enable it, user must set enabled=true. // Otherwise, bottommost compression will use compression_opts as default @@ -134,14 +150,17 @@ struct CompressionOptions { strategy(0), max_dict_bytes(0), zstd_max_train_bytes(0), + parallel_threads(1), enabled(false) {} CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes, - int _zstd_max_train_bytes, bool _enabled) + int _zstd_max_train_bytes, int _parallel_threads, + bool _enabled) : window_bits(wbits), level(_lev), strategy(_strategy), max_dict_bytes(_max_dict_bytes), zstd_max_train_bytes(_zstd_max_train_bytes), + parallel_threads(_parallel_threads), enabled(_enabled) {} }; diff --git a/options/options.cc b/options/options.cc index fe676597e..3a611af23 100644 --- a/options/options.cc +++ b/options/options.cc @@ -182,6 +182,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const { " Options.bottommost_compression_opts.zstd_max_train_bytes: " "%" PRIu32, bottommost_compression_opts.zstd_max_train_bytes); + ROCKS_LOG_HEADER( + log, + " Options.bottommost_compression_opts.parallel_threads: " + "%" PRIu32, + bottommost_compression_opts.parallel_threads); ROCKS_LOG_HEADER( log, " Options.bottommost_compression_opts.enabled: %s", bottommost_compression_opts.enabled ? 
"true" : "false"); @@ -199,6 +204,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const { " Options.compression_opts.zstd_max_train_bytes: " "%" PRIu32, compression_opts.zstd_max_train_bytes); + ROCKS_LOG_HEADER(log, + " Options.compression_opts.parallel_threads: " + "%" PRIu32, + compression_opts.parallel_threads); ROCKS_LOG_HEADER(log, " Options.compression_opts.enabled: %s", compression_opts.enabled ? "true" : "false"); diff --git a/options/options_helper.cc b/options/options_helper.cc index eeb32efc7..4a9c0fe96 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -835,6 +835,17 @@ Status ParseCompressionOptions(const std::string& value, ParseInt(value.substr(start, value.size() - start)); end = value.find(':', start); } + // parallel_threads is optional for backwards compatibility + if (end != std::string::npos) { + start = end + 1; + if (start >= value.size()) { + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); + } + compression_opts.parallel_threads = + ParseInt(value.substr(start, value.size() - start)); + end = value.find(':', start); + } // enabled is optional for backwards compatibility if (end != std::string::npos) { start = end + 1; diff --git a/options/options_test.cc b/options/options_test.cc index 9ff99532c..b795daa0e 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -63,8 +63,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { "kZSTD:" "kZSTDNotFinalCompression"}, {"bottommost_compression", "kLZ4Compression"}, - {"bottommost_compression_opts", "5:6:7:8:9:true"}, - {"compression_opts", "4:5:6:7:8:true"}, + {"bottommost_compression_opts", "5:6:7:8:9:10:true"}, + {"compression_opts", "4:5:6:7:8:9:true"}, {"num_levels", "8"}, {"level0_file_num_compaction_trigger", "8"}, {"level0_slowdown_writes_trigger", "9"}, @@ -168,6 +168,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7u); ASSERT_EQ(new_cf_opt.compression_opts.zstd_max_train_bytes, 8u); + ASSERT_EQ(new_cf_opt.compression_opts.parallel_threads, 9u); ASSERT_EQ(new_cf_opt.compression_opts.enabled, true); ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.window_bits, 5); @@ -175,6 +176,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.bottommost_compression_opts.strategy, 7); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.max_dict_bytes, 8u); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.zstd_max_train_bytes, 9u); + ASSERT_EQ(new_cf_opt.bottommost_compression_opts.parallel_threads, 10u); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.enabled, true); ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); @@ -801,6 +803,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) { ASSERT_EQ(new_options.compression_opts.strategy, 6); ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0u); ASSERT_EQ(new_options.compression_opts.zstd_max_train_bytes, 0u); + ASSERT_EQ(new_options.compression_opts.parallel_threads, 1u); ASSERT_EQ(new_options.compression_opts.enabled, false); ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption); ASSERT_EQ(new_options.bottommost_compression_opts.window_bits, 5); @@ -808,6 +811,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) { ASSERT_EQ(new_options.bottommost_compression_opts.strategy, 7); ASSERT_EQ(new_options.bottommost_compression_opts.max_dict_bytes, 0u); 
ASSERT_EQ(new_options.bottommost_compression_opts.zstd_max_train_bytes, 0u); + ASSERT_EQ(new_options.bottommost_compression_opts.parallel_threads, 1u); ASSERT_EQ(new_options.bottommost_compression_opts.enabled, false); ASSERT_EQ(new_options.write_buffer_size, 10U); ASSERT_EQ(new_options.max_write_buffer_number, 16); diff --git a/src.mk b/src.mk index aa9b439a1..cbd2d0e6d 100644 --- a/src.mk +++ b/src.mk @@ -450,6 +450,7 @@ MAIN_SOURCES = \ util/timer_queue_test.cc \ util/thread_list_test.cc \ util/thread_local_test.cc \ + util/work_queue_test.cc \ utilities/backupable/backupable_db_test.cc \ utilities/blob_db/blob_db_test.cc \ utilities/cassandra/cassandra_format_test.cc \ diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index fda8125a6..4aa586edf 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -46,6 +47,7 @@ #include "util/crc32c.h" #include "util/stop_watch.h" #include "util/string_util.h" +#include "util/work_queue.h" #include "util/xxhash.h" namespace ROCKSDB_NAMESPACE { @@ -284,6 +286,10 @@ struct BlockBasedTableBuilder::Rep { uint64_t offset = 0; Status status; IOStatus io_status; + // Synchronize status & io_status accesses across threads from main thread, + // compression thread and write thread in parallel compression. + std::mutex status_mutex; + std::mutex io_status_mutex; size_t alignment; BlockBuilder data_block; // Buffers uncompressed data blocks and keys to replay later. Needed when @@ -300,12 +306,13 @@ struct BlockBasedTableBuilder::Rep { PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; + const Slice* first_key_in_next_block = nullptr; CompressionType compression_type; uint64_t sample_for_compression; CompressionOptions compression_opts; std::unique_ptr compression_dict; - CompressionContext compression_ctx; - std::unique_ptr verify_ctx; + std::vector> compression_ctxs; + std::vector> verify_ctxs; std::unique_ptr verify_dict; size_t data_begin_offset = 0; @@ -356,6 +363,8 @@ struct BlockBasedTableBuilder::Rep { std::vector> table_properties_collectors; + std::unique_ptr pc_rep; + Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions, const BlockBasedTableOptions& table_opt, const InternalKeyComparator& icomparator, @@ -390,7 +399,8 @@ struct BlockBasedTableBuilder::Rep { sample_for_compression(_sample_for_compression), compression_opts(_compression_opts), compression_dict(), - compression_ctx(_compression_type), + compression_ctxs(_compression_opts.parallel_threads), + verify_ctxs(_compression_opts.parallel_threads), verify_dict(), state((_compression_opts.max_dict_bytes > 0) ? 
State::kBuffered : State::kUnbuffered), @@ -407,6 +417,9 @@ struct BlockBasedTableBuilder::Rep { oldest_key_time(_oldest_key_time), target_file_size(_target_file_size), file_creation_time(_file_creation_time) { + for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { + compression_ctxs[i].reset(new CompressionContext(compression_type)); + } if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -441,8 +454,10 @@ struct BlockBasedTableBuilder::Rep { table_options.index_type, table_options.whole_key_filtering, _moptions.prefix_extractor != nullptr)); if (table_options.verify_compression) { - verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), - compression_type)); + for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { + verify_ctxs[i].reset(new UncompressionContext( + UncompressionContext::NoCache(), compression_type)); + } } } @@ -452,6 +467,148 @@ struct BlockBasedTableBuilder::Rep { ~Rep() {} }; +struct BlockBasedTableBuilder::ParallelCompressionRep { + // Keys is a wrapper of vector of strings avoiding + // releasing string memories during vector clear() + // in order to save memory allocation overhead + class Keys { + public: + Keys() : keys_(kKeysInitSize), size_(0) {} + void PushBack(const Slice& key) { + if (size_ == keys_.size()) { + keys_.emplace_back(key.data(), key.size()); + } else { + keys_[size_].assign(key.data(), key.size()); + } + size_++; + } + void SwapAssign(std::vector& keys) { + size_ = keys.size(); + std::swap(keys_, keys); + } + void Clear() { size_ = 0; } + size_t Size() { return size_; } + std::string& Back() { return keys_[size_ - 1]; } + std::string& operator[](size_t idx) { + assert(idx < size_); + return keys_[idx]; + } + + private: + const size_t kKeysInitSize = 32; + std::vector keys_; + size_t size_; + }; + std::unique_ptr curr_block_keys; + + class BlockRepSlot; + + // BlockRep instances are fetched from and recycled to + // block_rep_pool during parallel compression. + struct BlockRep { + Slice contents; + std::unique_ptr data; + std::unique_ptr compressed_data; + CompressionType compression_type; + std::unique_ptr first_key_in_next_block; + std::unique_ptr keys; + std::unique_ptr slot; + Status status; + }; + // Use a vector of BlockRep as a buffer for a determined number + // of BlockRep structures. All data referenced by pointers in + // BlockRep will be freed when this vector is destructed. + typedef std::vector BlockRepBuffer; + BlockRepBuffer block_rep_buf; + // Use a thread-safe queue for concurrent access from block + // building thread and writer thread. + typedef WorkQueue BlockRepPool; + BlockRepPool block_rep_pool; + + // Use BlockRepSlot to keep block order in write thread. + // slot_ will pass references to BlockRep + class BlockRepSlot { + public: + BlockRepSlot() : slot_(1) {} + template + void Fill(T&& rep) { + slot_.push(std::forward(rep)); + }; + void Take(BlockRep*& rep) { slot_.pop(rep); } + + private: + // slot_ will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. + WorkQueue slot_; + }; + + // Compression queue will pass references to BlockRep in block_rep_buf, + // and those references are always valid before the destruction of + // block_rep_buf. 
+ typedef WorkQueue CompressQueue; + CompressQueue compress_queue; + std::vector compress_thread_pool; + + // Write queue will pass references to BlockRep::slot in block_rep_buf, + // and those references are always valid before the corresponding + // BlockRep::slot is destructed, which is before the destruction of + // block_rep_buf. + typedef WorkQueue WriteQueue; + WriteQueue write_queue; + std::unique_ptr write_thread; + + // Raw bytes compressed so far. + uint64_t raw_bytes_compressed; + // Size of current block being appended. + uint64_t raw_bytes_curr_block; + // Raw bytes under compression and not appended yet. + std::atomic raw_bytes_inflight; + // Number of blocks under compression and not appended yet. + std::atomic blocks_inflight; + // Current compression ratio, maintained by BGWorkWriteRawBlock. + double curr_compression_ratio; + // Estimated SST file size. + uint64_t estimated_file_size; + + // Wait for the completion of first block compression to get a + // non-zero compression ratio. + bool first_block; + std::condition_variable first_block_cond; + std::mutex first_block_mutex; + + bool finished; + + ParallelCompressionRep(uint32_t parallel_threads) + : curr_block_keys(new Keys()), + block_rep_buf(parallel_threads), + block_rep_pool(parallel_threads), + compress_queue(parallel_threads), + write_queue(parallel_threads), + raw_bytes_compressed(0), + raw_bytes_curr_block(0), + raw_bytes_inflight(0), + blocks_inflight(0), + curr_compression_ratio(0), + estimated_file_size(0), + first_block(true), + finished(false) { + for (uint32_t i = 0; i < parallel_threads; i++) { + block_rep_buf[i].contents = Slice(); + block_rep_buf[i].data.reset(new std::string()); + block_rep_buf[i].compressed_data.reset(new std::string()); + block_rep_buf[i].compression_type = CompressionType(); + block_rep_buf[i].first_key_in_next_block.reset(new std::string()); + block_rep_buf[i].keys.reset(new Keys()); + block_rep_buf[i].slot.reset(new BlockRepSlot()); + block_rep_buf[i].status = Status::OK(); + block_rep_pool.push(&block_rep_buf[i]); + } + } + + ~ParallelCompressionRep() { block_rep_pool.finish(); } +}; + BlockBasedTableBuilder::BlockBasedTableBuilder( const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions, const BlockBasedTableOptions& table_options, @@ -493,6 +650,21 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( &rep_->compressed_cache_key_prefix[0], &rep_->compressed_cache_key_prefix_size); } + + if (rep_->compression_opts.parallel_threads > 1) { + rep_->pc_rep.reset( + new ParallelCompressionRep(rep_->compression_opts.parallel_threads)); + rep_->pc_rep->compress_thread_pool.reserve( + rep_->compression_opts.parallel_threads); + for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) { + rep_->pc_rep->compress_thread_pool.emplace_back([=] { + BGWorkCompression(*(rep_->compression_ctxs[i]), + rep_->verify_ctxs[i].get()); + }); + } + rep_->pc_rep->write_thread.reset( + new port::Thread([=] { BGWorkWriteRawBlock(); })); + } } BlockBasedTableBuilder::~BlockBasedTableBuilder() { @@ -516,6 +688,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { auto should_flush = r->flush_block_policy->Update(key, value); if (should_flush) { assert(!r->data_block.empty()); + r->first_key_in_next_block = &key; Flush(); if (r->state == Rep::State::kBuffered && @@ -532,15 +705,27 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { // entries in the first block and < all entries in subsequent // blocks. 
if (ok() && r->state == Rep::State::kUnbuffered) { - r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle); + if (r->compression_opts.parallel_threads > 1) { + r->pc_rep->curr_block_keys->Clear(); + } else { + r->index_builder->AddIndexEntry(&r->last_key, &key, + r->pending_handle); + } } } // Note: PartitionedFilterBlockBuilder requires key being added to filter // builder after being added to index builder. - if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) { - size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size(); - r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); + if (r->state == Rep::State::kUnbuffered) { + if (r->compression_opts.parallel_threads > 1) { + r->pc_rep->curr_block_keys->PushBack(key); + } else { + if (r->filter_builder != nullptr) { + size_t ts_sz = + r->internal_comparator.user_comparator()->timestamp_size(); + r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); + } + } } r->last_key.assign(key.data(), key.size()); @@ -553,7 +738,9 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { } r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString()); } else { - r->index_builder->OnKeyAdded(key); + if (r->compression_opts.parallel_threads == 1) { + r->index_builder->OnKeyAdded(key); + } } NotifyCollectTableCollectorsOnAdd(key, value, r->offset, r->table_properties_collectors, @@ -586,7 +773,57 @@ void BlockBasedTableBuilder::Flush() { assert(rep_->state != Rep::State::kClosed); if (!ok()) return; if (r->data_block.empty()) return; - WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); + if (r->compression_opts.parallel_threads > 1 && + r->state == Rep::State::kUnbuffered) { + ParallelCompressionRep::BlockRep* block_rep; + r->pc_rep->block_rep_pool.pop(block_rep); + + r->data_block.Finish(); + r->data_block.SwapAndReset(*(block_rep->data)); + + block_rep->contents = *(block_rep->data); + + block_rep->compression_type = r->compression_type; + + std::swap(block_rep->keys, r->pc_rep->curr_block_keys); + r->pc_rep->curr_block_keys->Clear(); + + if (r->first_key_in_next_block == nullptr) { + block_rep->first_key_in_next_block.reset(nullptr); + } else { + block_rep->first_key_in_next_block->assign( + r->first_key_in_next_block->data(), + r->first_key_in_next_block->size()); + } + + uint64_t new_raw_bytes_inflight = + r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(), + std::memory_order_relaxed) + + block_rep->data->size(); + uint64_t new_blocks_inflight = + r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; + r->pc_rep->estimated_file_size = + r->offset + + static_cast(static_cast(new_raw_bytes_inflight) * + r->pc_rep->curr_compression_ratio) + + new_blocks_inflight * kBlockTrailerSize; + + assert(block_rep->status.ok()); + if (!r->pc_rep->write_queue.push(block_rep->slot.get())) { + return; + } + if (!r->pc_rep->compress_queue.push(block_rep)) { + return; + } + + if (r->pc_rep->first_block) { + std::unique_lock lock(r->pc_rep->first_block_mutex); + r->pc_rep->first_block_cond.wait(lock, + [=] { return !r->pc_rep->first_block; }); + } + } else { + WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); + } } void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, @@ -599,6 +836,43 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, BlockHandle* handle, bool is_data_block) { + Rep* r = 
rep_; + Slice block_contents; + CompressionType type; + CompressAndVerifyBlock(raw_block_contents, is_data_block, + *(r->compression_ctxs[0]), r->verify_ctxs[0].get(), + r->compressed_output, block_contents, type, r->status); + if (!ok()) { + return; + } + WriteRawBlock(block_contents, type, handle, is_data_block); + r->compressed_output.clear(); + if (is_data_block) { + if (r->filter_builder != nullptr) { + r->filter_builder->StartBlock(r->offset); + } + r->props.data_size = r->offset; + ++r->props.num_data_blocks; + } +} + +void BlockBasedTableBuilder::BGWorkCompression( + CompressionContext& compression_ctx, UncompressionContext* verify_ctx) { + ParallelCompressionRep::BlockRep* block_rep; + while (rep_->pc_rep->compress_queue.pop(block_rep)) { + CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/ + compression_ctx, verify_ctx, + *(block_rep->compressed_data), block_rep->contents, + block_rep->compression_type, block_rep->status); + block_rep->slot->Fill(block_rep); + } +} + +void BlockBasedTableBuilder::CompressAndVerifyBlock( + const Slice& raw_block_contents, bool is_data_block, + CompressionContext& compression_ctx, UncompressionContext* verify_ctx_ptr, + std::string& compressed_output, Slice& block_contents, + CompressionType& type, Status& out_status) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 @@ -606,9 +880,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, assert(ok()); Rep* r = rep_; - auto type = r->compression_type; + type = r->compression_type; uint64_t sample_for_compression = r->sample_for_compression; - Slice block_contents; bool abort_compression = false; StopWatchNano timer( @@ -631,7 +904,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, compression_dict = r->compression_dict.get(); } assert(compression_dict != nullptr); - CompressionInfo compression_info(r->compression_opts, r->compression_ctx, + CompressionInfo compression_info(r->compression_opts, compression_ctx, *compression_dict, type, sample_for_compression); @@ -640,7 +913,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, block_contents = CompressBlock( raw_block_contents, compression_info, &type, r->table_options.format_version, is_data_block /* do_sample */, - &r->compressed_output, &sampled_output_fast, &sampled_output_slow); + &compressed_output, &sampled_output_fast, &sampled_output_slow); // notify collectors on block add NotifyCollectTableCollectorsOnBlockAdd( @@ -660,7 +933,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, } assert(verify_dict != nullptr); BlockContents contents; - UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict, + UncompressionInfo uncompression_info(*verify_ctx_ptr, *verify_dict, r->compression_type); Status stat = UncompressBlockContentsForCompressionType( uncompression_info, block_contents.data(), block_contents.size(), @@ -673,12 +946,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, abort_compression = true; ROCKS_LOG_ERROR(r->ioptions.info_log, "Decompressed block did not match raw block"); - r->status = + out_status = Status::Corruption("Decompressed block did not match raw block"); } } else { // Decompression reported an error. abort. 
- r->status = Status::Corruption("Could not decompress"); + out_status = Status::Corruption("Could not decompress"); abort_compression = true; } } @@ -704,16 +977,6 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, } else if (type != r->compression_type) { RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); } - - WriteRawBlock(block_contents, type, handle, is_data_block); - r->compressed_output.clear(); - if (is_data_block) { - if (r->filter_builder != nullptr) { - r->filter_builder->StartBlock(r->offset); - } - r->props.data_size = r->offset; - ++r->props.num_data_blocks; - } } void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, @@ -721,13 +984,15 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, BlockHandle* handle, bool is_data_block) { Rep* r = rep_; + Status s = Status::OK(); + IOStatus io_s = IOStatus::OK(); StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS); handle->set_offset(r->offset); handle->set_size(block_contents.size()); - assert(r->status.ok()); - assert(r->io_status.ok()); - r->io_status = r->file->Append(block_contents); - if (r->io_status.ok()) { + assert(status().ok()); + assert(io_status().ok()); + io_s = r->file->Append(block_contents); + if (io_s.ok()) { char trailer[kBlockTrailerSize]; trailer[0] = type; char* trailer_without_type = trailer + 1; @@ -766,34 +1031,157 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, } } - assert(r->io_status.ok()); + assert(io_s.ok()); TEST_SYNC_POINT_CALLBACK( "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", static_cast(trailer)); - r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize)); - if (r->io_status.ok()) { - r->status = InsertBlockInCache(block_contents, type, handle); + io_s = r->file->Append(Slice(trailer, kBlockTrailerSize)); + if (io_s.ok()) { + s = InsertBlockInCache(block_contents, type, handle); + if (!s.ok()) { + SetStatusAtom(s); + } + } else { + SetIOStatusAtom(io_s); } - if (r->status.ok() && r->io_status.ok()) { + if (s.ok() && io_s.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; if (r->table_options.block_align && is_data_block) { size_t pad_bytes = (r->alignment - ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) & (r->alignment - 1); - r->io_status = r->file->Pad(pad_bytes); - if (r->io_status.ok()) { + io_s = r->file->Pad(pad_bytes); + if (io_s.ok()) { r->offset += pad_bytes; + } else { + SetIOStatusAtom(io_s); + } + } + if (r->compression_opts.parallel_threads > 1) { + if (!r->pc_rep->finished) { + r->pc_rep->curr_compression_ratio = + (r->pc_rep->curr_compression_ratio * + r->pc_rep->raw_bytes_compressed + + block_contents.size()) / + static_cast(r->pc_rep->raw_bytes_compressed + + r->pc_rep->raw_bytes_curr_block); + r->pc_rep->raw_bytes_compressed += r->pc_rep->raw_bytes_curr_block; + uint64_t new_raw_bytes_inflight = + r->pc_rep->raw_bytes_inflight.fetch_sub( + r->pc_rep->raw_bytes_curr_block, std::memory_order_relaxed) - + r->pc_rep->raw_bytes_curr_block; + uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub( + 1, std::memory_order_relaxed) - + 1; + r->pc_rep->estimated_file_size = + r->offset + + static_cast(static_cast(new_raw_bytes_inflight) * + r->pc_rep->curr_compression_ratio) + + new_blocks_inflight * kBlockTrailerSize; + } else { + r->pc_rep->estimated_file_size = r->offset; } } } + } else { + SetIOStatusAtom(io_s); + } + if (!io_s.ok() && s.ok()) { + SetStatusAtom(io_s); } - r->status = 
r->io_status; } -Status BlockBasedTableBuilder::status() const { return rep_->status; } +void BlockBasedTableBuilder::BGWorkWriteRawBlock() { + Rep* r = rep_; + ParallelCompressionRep::BlockRepSlot* slot; + ParallelCompressionRep::BlockRep* block_rep; + while (r->pc_rep->write_queue.pop(slot)) { + slot->Take(block_rep); + if (!block_rep->status.ok()) { + SetStatusAtom(block_rep->status); + break; + } + + for (size_t i = 0; i < block_rep->keys->Size(); i++) { + auto& key = (*block_rep->keys)[i]; + if (r->filter_builder != nullptr) { + size_t ts_sz = + r->internal_comparator.user_comparator()->timestamp_size(); + r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); + } + r->index_builder->OnKeyAdded(key); + } + + r->pc_rep->raw_bytes_curr_block = block_rep->data->size(); + WriteRawBlock(block_rep->contents, block_rep->compression_type, + &r->pending_handle, true /* is_data_block*/); + if (!r->status.ok()) { + break; + } -IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; } + if (r->pc_rep->first_block) { + std::unique_lock lock(r->pc_rep->first_block_mutex); + r->pc_rep->first_block = false; + r->pc_rep->first_block_cond.notify_one(); + } + + if (r->filter_builder != nullptr) { + r->filter_builder->StartBlock(r->offset); + } + r->props.data_size = r->offset; + ++r->props.num_data_blocks; + + if (block_rep->first_key_in_next_block == nullptr) { + r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr, + r->pending_handle); + } else { + Slice first_key_in_next_block = + Slice(*block_rep->first_key_in_next_block); + r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), + &first_key_in_next_block, + r->pending_handle); + } + block_rep->compressed_data->clear(); + r->pc_rep->block_rep_pool.push(block_rep); + } +} + +Status BlockBasedTableBuilder::status() const { + if (rep_->compression_opts.parallel_threads > 1) { + std::lock_guard lock(rep_->status_mutex); + return rep_->status; + } else { + return rep_->status; + } +} + +IOStatus BlockBasedTableBuilder::io_status() const { + if (rep_->compression_opts.parallel_threads > 1) { + std::lock_guard lock(rep_->io_status_mutex); + return rep_->io_status; + } else { + return rep_->io_status; + } +} + +void BlockBasedTableBuilder::SetStatusAtom(Status status) { + if (rep_->compression_opts.parallel_threads > 1) { + std::lock_guard lock(rep_->status_mutex); + rep_->status = status; + } else { + rep_->status = status; + } +} + +void BlockBasedTableBuilder::SetIOStatusAtom(IOStatus io_status) { + if (rep_->compression_opts.parallel_threads > 1) { + std::lock_guard lock(rep_->io_status_mutex); + rep_->io_status = io_status; + } else { + rep_->io_status = io_status; + } +} static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { BlockContents* bc = reinterpret_cast(value); @@ -1108,26 +1496,54 @@ void BlockBasedTableBuilder::EnterUnbuffered() { r->compression_type == kZSTDNotFinalCompression)); for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) { - const auto& data_block = r->data_block_and_keys_buffers[i].first; + auto& data_block = r->data_block_and_keys_buffers[i].first; auto& keys = r->data_block_and_keys_buffers[i].second; assert(!data_block.empty()); assert(!keys.empty()); - for (const auto& key : keys) { - if (r->filter_builder != nullptr) { - size_t ts_sz = - r->internal_comparator.user_comparator()->timestamp_size(); - r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); + if (r->compression_opts.parallel_threads > 1) { + 
ParallelCompressionRep::BlockRep* block_rep; + r->pc_rep->block_rep_pool.pop(block_rep); + + std::swap(*(block_rep->data), data_block); + block_rep->contents = *(block_rep->data); + + block_rep->compression_type = r->compression_type; + + block_rep->keys->SwapAssign(keys); + + if (i + 1 < r->data_block_and_keys_buffers.size()) { + block_rep->first_key_in_next_block->assign( + r->data_block_and_keys_buffers[i + 1].second.front()); + } else { + block_rep->first_key_in_next_block.reset(nullptr); + } + + assert(block_rep->status.ok()); + if (!r->pc_rep->write_queue.push(block_rep->slot.get())) { + return; + } + if (!r->pc_rep->compress_queue.push(block_rep)) { + return; + } + } else { + for (const auto& key : keys) { + if (r->filter_builder != nullptr) { + size_t ts_sz = + r->internal_comparator.user_comparator()->timestamp_size(); + r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); + } + r->index_builder->OnKeyAdded(key); + } + WriteBlock(Slice(data_block), &r->pending_handle, + true /* is_data_block */); + if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) { + Slice first_key_in_next_block = + r->data_block_and_keys_buffers[i + 1].second.front(); + Slice* first_key_in_next_block_ptr = &first_key_in_next_block; + r->index_builder->AddIndexEntry( + &keys.back(), first_key_in_next_block_ptr, r->pending_handle); } - r->index_builder->OnKeyAdded(key); - } - WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */); - if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) { - Slice first_key_in_next_block = - r->data_block_and_keys_buffers[i + 1].second.front(); - Slice* first_key_in_next_block_ptr = &first_key_in_next_block; - r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr, - r->pending_handle); } } r->data_block_and_keys_buffers.clear(); @@ -1137,15 +1553,26 @@ Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; assert(r->state != Rep::State::kClosed); bool empty_data_block = r->data_block.empty(); + r->first_key_in_next_block = nullptr; Flush(); if (r->state == Rep::State::kBuffered) { EnterUnbuffered(); } - // To make sure properties block is able to keep the accurate size of index - // block, we will finish writing all index entries first. - if (ok() && !empty_data_block) { - r->index_builder->AddIndexEntry( - &r->last_key, nullptr /* no next data block */, r->pending_handle); + if (r->compression_opts.parallel_threads > 1) { + r->pc_rep->compress_queue.finish(); + for (auto& thread : r->pc_rep->compress_thread_pool) { + thread.join(); + } + r->pc_rep->write_queue.finish(); + r->pc_rep->write_thread->join(); + r->pc_rep->finished = true; + } else { + // To make sure properties block is able to keep the accurate size of index + // block, we will finish writing all index entries first. + if (ok() && !empty_data_block) { + r->index_builder->AddIndexEntry( + &r->last_key, nullptr /* no next data block */, r->pending_handle); + } } // Write meta blocks, metaindex block and footer in the following order. 
@@ -1177,6 +1604,15 @@ Status BlockBasedTableBuilder::Finish() { void BlockBasedTableBuilder::Abandon() { assert(rep_->state != Rep::State::kClosed); + if (rep_->compression_opts.parallel_threads > 1) { + rep_->pc_rep->compress_queue.finish(); + for (auto& thread : rep_->pc_rep->compress_thread_pool) { + thread.join(); + } + rep_->pc_rep->write_queue.finish(); + rep_->pc_rep->write_thread->join(); + rep_->pc_rep->finished = true; + } rep_->state = Rep::State::kClosed; } @@ -1186,6 +1622,16 @@ uint64_t BlockBasedTableBuilder::NumEntries() const { uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } +uint64_t BlockBasedTableBuilder::EstimatedFileSize() const { + if (rep_->compression_opts.parallel_threads > 1) { + // Use compression ratio so far and inflight raw bytes to estimate + // final SST size. + return rep_->pc_rep->estimated_file_size; + } else { + return FileSize(); + } +} + bool BlockBasedTableBuilder::NeedCompact() const { for (const auto& collector : rep_->table_properties_collectors) { if (collector->NeedCompact()) { diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h index 353bc80ae..2b0f61fba 100644 --- a/table/block_based/block_based_table_builder.h +++ b/table/block_based/block_based_table_builder.h @@ -90,6 +90,11 @@ class BlockBasedTableBuilder : public TableBuilder { // Finish() call, returns the size of the final generated file. uint64_t FileSize() const override; + // Estimated size of the file generated so far. This is used when + // FileSize() cannot estimate final SST size, e.g. parallel compression + // is enabled. + uint64_t EstimatedFileSize() const override; + bool NeedCompact() const override; // Get table properties @@ -104,6 +109,10 @@ class BlockBasedTableBuilder : public TableBuilder { private: bool ok() const { return status().ok(); } + void SetStatusAtom(Status status); + + void SetIOStatusAtom(IOStatus io_status); + // Transition state from buffered to unbuffered. See `Rep::State` API comment // for details of the states. // REQUIRES: `rep_->state == kBuffered` @@ -137,6 +146,8 @@ class BlockBasedTableBuilder : public TableBuilder { class BlockBasedTablePropertiesCollector; Rep* rep_; + struct ParallelCompressionRep; + // Advanced operation: flush any buffered key/value pairs to file. // Can be used to ensure that two adjacent entries never live in // the same data block. Most clients should not need to use this method. @@ -146,6 +157,22 @@ class BlockBasedTableBuilder : public TableBuilder { // Some compression libraries fail when the raw size is bigger than int. If // uncompressed size is bigger than kCompressionSizeLimit, don't compress it const uint64_t kCompressionSizeLimit = std::numeric_limits::max(); + + // Get blocks from mem-table walking thread, compress them and + // pass them to the write thread. 
Used in parallel compression mode only + void BGWorkCompression(CompressionContext& compression_ctx, + UncompressionContext* verify_ctx); + + // Given raw block content, try to compress it and return result and + // compression type + void CompressAndVerifyBlock( + const Slice& raw_block_contents, bool is_data_block, + CompressionContext& compression_ctx, UncompressionContext* verify_ctx, + std::string& compressed_output, Slice& result_block_contents, + CompressionType& result_compression_type, Status& out_status); + + // Get compressed blocks from BGWorkCompression and write them into SST + void BGWorkWriteRawBlock(); }; Slice CompressBlock(const Slice& raw, const CompressionInfo& info, diff --git a/table/block_based/block_builder.cc b/table/block_based/block_builder.cc index 6f77ef97c..4964ba3ae 100644 --- a/table/block_based/block_builder.cc +++ b/table/block_based/block_builder.cc @@ -81,6 +81,11 @@ void BlockBuilder::Reset() { } } +void BlockBuilder::SwapAndReset(std::string& buffer) { + std::swap(buffer_, buffer); + Reset(); +} + size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, const Slice& value) const { size_t estimate = CurrentSizeEstimate(); diff --git a/table/block_based/block_builder.h b/table/block_based/block_builder.h index 42c996e5b..e3fcfc2ec 100644 --- a/table/block_based/block_builder.h +++ b/table/block_based/block_builder.h @@ -32,6 +32,9 @@ class BlockBuilder { // Reset the contents as if the BlockBuilder was just constructed. void Reset(); + // Swap the contents in BlockBuilder with buffer, then reset the BlockBuilder. + void SwapAndReset(std::string& buffer); + // REQUIRES: Finish() has not been called since the last call to Reset(). // REQUIRES: key is larger than any previously added key void Add(const Slice& key, const Slice& value, diff --git a/table/table_builder.h b/table/table_builder.h index bb8dc4df2..3254b78e7 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -156,6 +156,11 @@ class TableBuilder { // Finish() call, returns the size of the final generated file. virtual uint64_t FileSize() const = 0; + // Estimated size of the file generated so far. This is used when + // FileSize() cannot estimate final SST size, e.g. parallel compression + // is enabled. + virtual uint64_t EstimatedFileSize() const { return FileSize(); } + // If the user defined table properties collector suggest the file to // be further compacted. 
virtual bool NeedCompact() const { return false; } diff --git a/table/table_test.cc b/table/table_test.cc index 61378d771..439d4981b 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -599,6 +599,7 @@ struct TestArgs { bool reverse_compare; int restart_interval; CompressionType compression; + uint32_t compression_parallel_threads; uint32_t format_version; bool use_mmap; }; @@ -616,6 +617,7 @@ static std::vector GenerateArgList() { MEMTABLE_TEST, DB_TEST}; std::vector reverse_compare_types = {false, true}; std::vector restart_intervals = {16, 1, 1024}; + std::vector compression_parallel_threads = {1, 4}; // Only add compression if it is supported std::vector> compression_types; @@ -658,6 +660,7 @@ static std::vector GenerateArgList() { one_arg.reverse_compare = reverse_compare; one_arg.restart_interval = restart_intervals[0]; one_arg.compression = compression_types[0].first; + one_arg.compression_parallel_threads = 1; one_arg.use_mmap = true; test_args.push_back(one_arg); one_arg.use_mmap = false; @@ -668,14 +671,17 @@ static std::vector GenerateArgList() { for (auto restart_interval : restart_intervals) { for (auto compression_type : compression_types) { - TestArgs one_arg; - one_arg.type = test_type; - one_arg.reverse_compare = reverse_compare; - one_arg.restart_interval = restart_interval; - one_arg.compression = compression_type.first; - one_arg.format_version = compression_type.second ? 2 : 1; - one_arg.use_mmap = false; - test_args.push_back(one_arg); + for (auto num_threads : compression_parallel_threads) { + TestArgs one_arg; + one_arg.type = test_type; + one_arg.reverse_compare = reverse_compare; + one_arg.restart_interval = restart_interval; + one_arg.compression = compression_type.first; + one_arg.format_version = compression_type.second ? 2 : 1; + one_arg.compression_parallel_threads = num_threads; + one_arg.use_mmap = false; + test_args.push_back(one_arg); + } } } } @@ -727,6 +733,8 @@ class HarnessTest : public testing::Test { constructor_ = nullptr; options_ = Options(); options_.compression = args.compression; + options_.compression_opts.parallel_threads = + args.compression_parallel_threads; // Use shorter block size for tests to exercise block boundary // conditions more. if (args.reverse_compare) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 9e2a37967..aa5ae3998 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -919,6 +919,9 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts" " not compressed. Otherwise, apply compression_type to " "all levels."); +DEFINE_int32(compression_threads, 1, + "Number of concurrent compression threads to run."); + static bool ValidateTableCacheNumshardbits(const char* flagname, int32_t value) { if (0 >= value || value > 20) { @@ -4008,6 +4011,7 @@ class Benchmark { options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes; options.compression_opts.zstd_max_train_bytes = FLAGS_compression_zstd_max_train_bytes; + options.compression_opts.parallel_threads = FLAGS_compression_threads; // If this is a block based table, set some related options if (options.table_factory->Name() == BlockBasedTableFactory::kName && options.table_factory->GetOptions() != nullptr) { diff --git a/util/work_queue.h b/util/work_queue.h new file mode 100644 index 000000000..3d9126364 --- /dev/null +++ b/util/work_queue.h @@ -0,0 +1,149 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { + +/// Unbounded thread-safe work queue. +// +// This file is an excerpt from Facebook's zstd repo at +// https://github.com/facebook/zstd/. The relevant file is +// contrib/pzstd/utils/WorkQueue.h. + +template +class WorkQueue { + // Protects all member variable access + std::mutex mutex_; + std::condition_variable readerCv_; + std::condition_variable writerCv_; + std::condition_variable finishCv_; + + std::queue queue_; + bool done_; + std::size_t maxSize_; + + // Must have lock to call this function + bool full() const { + if (maxSize_ == 0) { + return false; + } + return queue_.size() >= maxSize_; + } + + public: + /** + * Constructs an empty work queue with an optional max size. + * If `maxSize == 0` the queue size is unbounded. + * + * @param maxSize The maximum allowed size of the work queue. + */ + WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} + + /** + * Push an item onto the work queue. Notify a single thread that work is + * available. If `finish()` has been called, do nothing and return false. + * If `push()` returns false, then `item` has not been copied from. + * + * @param item Item to push onto the queue. + * @returns True upon success, false if `finish()` has been called. An + * item was pushed iff `push()` returns true. + */ + template + bool push(U&& item) { + { + std::unique_lock lock(mutex_); + while (full() && !done_) { + writerCv_.wait(lock); + } + if (done_) { + return false; + } + queue_.push(std::forward(item)); + } + readerCv_.notify_one(); + return true; + } + + /** + * Attempts to pop an item off the work queue. It will block until data is + * available or `finish()` has been called. + * + * @param[out] item If `pop` returns `true`, it contains the popped item. + * If `pop` returns `false`, it is unmodified. + * @returns True upon success. False if the queue is empty and + * `finish()` has been called. + */ + bool pop(T& item) { + { + std::unique_lock lock(mutex_); + while (queue_.empty() && !done_) { + readerCv_.wait(lock); + } + if (queue_.empty()) { + assert(done_); + return false; + } + item = queue_.front(); + queue_.pop(); + } + writerCv_.notify_one(); + return true; + } + + /** + * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. + * + * @param maxSize The new maximum queue size. + */ + void setMaxSize(std::size_t maxSize) { + { + std::lock_guard lock(mutex_); + maxSize_ = maxSize; + } + writerCv_.notify_all(); + } + + /** + * Promise that `push()` won't be called again, so once the queue is empty + * there will never any more work. + */ + void finish() { + { + std::lock_guard lock(mutex_); + assert(!done_); + done_ = true; + } + readerCv_.notify_all(); + writerCv_.notify_all(); + finishCv_.notify_all(); + } + + /// Blocks until `finish()` has been called (but the queue may not be empty). 
+ void waitUntilFinished() { + std::unique_lock lock(mutex_); + while (!done_) { + finishCv_.wait(lock); + } + } +}; +} diff --git a/util/work_queue_test.cc b/util/work_queue_test.cc new file mode 100644 index 000000000..967101568 --- /dev/null +++ b/util/work_queue_test.cc @@ -0,0 +1,268 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +/* + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under both the BSD-style license (found in the + * LICENSE file in the root directory of this source tree) and the GPLv2 (found + * in the COPYING file in the root directory of this source tree). + */ +#include "util/work_queue.h" + +#include +#include +#include +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { + +// Unit test for work_queue.h. +// +// This file is an excerpt from Facebook's zstd repo at +// https://github.com/facebook/zstd/. The relevant file is +// contrib/pzstd/utils/test/WorkQueueTest.cpp. + +struct Popper { + WorkQueue* queue; + int* results; + std::mutex* mutex; + + void operator()() { + int result; + while (queue->pop(result)) { + std::lock_guard lock(*mutex); + results[result] = result; + } + } +}; + +TEST(WorkQueue, SingleThreaded) { + WorkQueue queue; + int result; + + queue.push(5); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + + queue.push(1); + queue.push(2); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + + queue.push(1); + queue.push(2); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(1, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(2, result); + EXPECT_FALSE(queue.pop(result)); + + queue.waitUntilFinished(); +} + +TEST(WorkQueue, SPSC) { + WorkQueue queue; + const int max = 100; + + for (int i = 0; i < 10; ++i) { + queue.push(i); + } + + std::thread thread([&queue, max] { + int result; + for (int i = 0;; ++i) { + if (!queue.pop(result)) { + EXPECT_EQ(i, max); + break; + } + EXPECT_EQ(i, result); + } + }); + + std::this_thread::yield(); + for (int i = 10; i < max; ++i) { + queue.push(i); + } + queue.finish(); + + thread.join(); +} + +TEST(WorkQueue, SPMC) { + WorkQueue queue; + std::vector results(50, -1); + std::mutex mutex; + std::vector threads; + for (int i = 0; i < 5; ++i) { + threads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + for (int i = 0; i < 50; ++i) { + queue.push(i); + } + queue.finish(); + + for (auto& thread : threads) { + thread.join(); + } + + for (int i = 0; i < 50; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, MPMC) { + WorkQueue queue; + std::vector results(100, -1); + std::mutex mutex; + std::vector popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::vector pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 50; + auto max = (i + 1) * 50; + pusherThreads.emplace_back([&queue, min, max] { + for (int j = min; j < max; ++j) { + queue.push(j); + } + }); + } + + for (auto& thread : pusherThreads) { + thread.join(); + } + queue.finish(); + + for (auto& thread : popperThreads) { + thread.join(); + } + + for (int i = 0; i < 100; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, BoundedSizeWorks) { + WorkQueue queue(1); + int result; + 
queue.push(5); + queue.pop(result); + queue.push(5); + queue.pop(result); + queue.push(5); + queue.finish(); + queue.pop(result); + EXPECT_EQ(5, result); +} + +TEST(WorkQueue, BoundedSizePushAfterFinish) { + WorkQueue queue(1); + int result; + queue.push(5); + std::thread pusher([&queue] { queue.push(6); }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, SetMaxSize) { + WorkQueue queue(2); + int result; + queue.push(5); + queue.push(6); + queue.setMaxSize(1); + std::thread pusher([&queue] { queue.push(7); }); + // Dirtily try and make sure that pusher has run. + std::this_thread::sleep_for(std::chrono::seconds(1)); + queue.finish(); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(5, result); + EXPECT_TRUE(queue.pop(result)); + EXPECT_EQ(6, result); + EXPECT_FALSE(queue.pop(result)); + + pusher.join(); +} + +TEST(WorkQueue, BoundedSizeMPMC) { + WorkQueue queue(10); + std::vector results(200, -1); + std::mutex mutex; + std::cerr << "Creating popperThreads" << std::endl; + std::vector popperThreads; + for (int i = 0; i < 4; ++i) { + popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); + } + + std::cerr << "Creating pusherThreads" << std::endl; + std::vector pusherThreads; + for (int i = 0; i < 2; ++i) { + auto min = i * 100; + auto max = (i + 1) * 100; + pusherThreads.emplace_back([&queue, min, max] { + for (int j = min; j < max; ++j) { + queue.push(j); + } + }); + } + + std::cerr << "Joining pusherThreads" << std::endl; + for (auto& thread : pusherThreads) { + thread.join(); + } + std::cerr << "Finishing queue" << std::endl; + queue.finish(); + + std::cerr << "Joining popperThreads" << std::endl; + for (auto& thread : popperThreads) { + thread.join(); + } + + std::cerr << "Inspecting results" << std::endl; + for (int i = 0; i < 200; ++i) { + EXPECT_EQ(i, results[i]); + } +} + +TEST(WorkQueue, FailedPush) { + WorkQueue queue; + EXPECT_TRUE(queue.push(1)); + queue.finish(); + EXPECT_FALSE(queue.push(1)); +} + +TEST(WorkQueue, FailedPop) { + WorkQueue queue; + int x = 5; + EXPECT_TRUE(queue.push(x)); + queue.finish(); + x = 0; + EXPECT_TRUE(queue.pop(x)); + EXPECT_EQ(5, x); + EXPECT_FALSE(queue.pop(x)); + EXPECT_EQ(5, x); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
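As an aside on the option-string changes exercised in db_options_test.cc and options_test.cc above: the colon-separated compression_opts / bottommost_compression_opts value gains parallel_threads as an optional sixth field between zstd_max_train_bytes and enabled, and shorter strings from older releases still parse (the new field then keeps its default of 1). A minimal sketch of driving this through the string-based options API, with illustrative field values (requires a non-LITE build):

  #include <cassert>

  #include "rocksdb/convenience.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::ColumnFamilyOptions base, cf;
    // Field order: window_bits:level:strategy:max_dict_bytes:
    //              zstd_max_train_bytes:parallel_threads:enabled
    rocksdb::Status s = rocksdb::GetColumnFamilyOptionsFromString(
        base, "compression_opts=4:5:6:7:8:2:true", &cf);
    assert(s.ok());
    assert(cf.compression_opts.parallel_threads == 2);
    assert(cf.compression_opts.zstd_max_train_bytes == 8u);
    return 0;
  }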