Add pipelined & parallel compression optimization (#6262)

Summary:
This PR adds support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6262

Reviewed By: ajkr

Differential Revision: D20651306

fbshipit-source-id: 62125590a9c15b6d9071def9dc72589c1696a4cb
main
Ziyue Yang 5 years ago committed by Facebook GitHub Bot
parent 719c0f91bf
commit 03a781a90c
  1. 1
      CMakeLists.txt
  2. 3
      HISTORY.md
  3. 4
      Makefile
  4. 7
      TARGETS
  5. 3
      db/compaction/compaction_job.cc
  6. 12
      db/db_basic_test.cc
  7. 6
      db/db_options_test.cc
  8. 26
      db/db_test2.cc
  9. 9
      db/db_with_timestamp_basic_test.cc
  10. 21
      include/rocksdb/advanced_options.h
  11. 9
      options/options.cc
  12. 11
      options/options_helper.cc
  13. 8
      options/options_test.cc
  14. 1
      src.mk
  15. 532
      table/block_based/block_based_table_builder.cc
  16. 27
      table/block_based/block_based_table_builder.h
  17. 5
      table/block_based/block_builder.cc
  18. 3
      table/block_based/block_builder.h
  19. 5
      table/table_builder.h
  20. 8
      table/table_test.cc
  21. 4
      tools/db_bench_tool.cc
  22. 149
      util/work_queue.h
  23. 268
      util/work_queue_test.cc

@ -1072,6 +1072,7 @@ if(WITH_TESTS)
util/timer_queue_test.cc util/timer_queue_test.cc
util/thread_list_test.cc util/thread_list_test.cc
util/thread_local_test.cc util/thread_local_test.cc
util/work_queue_test.cc
utilities/backupable/backupable_db_test.cc utilities/backupable/backupable_db_test.cc
utilities/blob_db/blob_db_test.cc utilities/blob_db/blob_db_test.cc
utilities/cassandra/cassandra_functional_test.cc utilities/cassandra/cassandra_functional_test.cc

@ -3,6 +3,9 @@
### Behavior changes ### Behavior changes
* Since RocksDB 6.8, ttl-based FIFO compaction can drop a file whose oldest key becomes older than options.ttl while others have not. This fix reverts this and makes ttl-based FIFO compaction use the file's flush time as the criterion. This fix also requires that max_open_files = -1 and compaction_options_fifo.allow_compaction = false to function properly. * Since RocksDB 6.8, ttl-based FIFO compaction can drop a file whose oldest key becomes older than options.ttl while others have not. This fix reverts this and makes ttl-based FIFO compaction use the file's flush time as the criterion. This fix also requires that max_open_files = -1 and compaction_options_fifo.allow_compaction = false to function properly.
### New Features
* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.
### Bug Fixes ### Bug Fixes
* Fix a bug which might crash the service when write buffer manager fails to insert the dummy handle to the block cache. * Fix a bug which might crash the service when write buffer manager fails to insert the dummy handle to the block cache.

@ -466,6 +466,7 @@ TESTS = \
hash_test \ hash_test \
random_test \ random_test \
thread_local_test \ thread_local_test \
work_queue_test \
rate_limiter_test \ rate_limiter_test \
perf_context_test \ perf_context_test \
iostats_context_test \ iostats_context_test \
@ -1295,6 +1296,9 @@ histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS)
thread_local_test: util/thread_local_test.o $(LIBOBJECTS) $(TESTHARNESS) thread_local_test: util/thread_local_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
work_queue_test: util/work_queue_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
corruption_test: db/corruption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) corruption_test: db/corruption_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

@ -1512,6 +1512,13 @@ ROCKS_TESTS = [
[], [],
[], [],
], ],
[
"work_queue_test",
"util/work_queue_test.cc",
"serial",
[],
[],
],
[ [
"write_batch_test", "write_batch_test",
"db/write_batch_test.cc", "db/write_batch_test.cc",

@ -937,7 +937,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact->builder != nullptr); assert(sub_compact->builder != nullptr);
assert(sub_compact->current_output() != nullptr); assert(sub_compact->current_output() != nullptr);
sub_compact->builder->Add(key, value); sub_compact->builder->Add(key, value);
sub_compact->current_output_file_size = sub_compact->builder->FileSize(); sub_compact->current_output_file_size =
sub_compact->builder->EstimatedFileSize();
const ParsedInternalKey& ikey = c_iter->ikey(); const ParsedInternalKey& ikey = c_iter->ikey();
sub_compact->current_output()->meta.UpdateBoundaries( sub_compact->current_output()->meta.UpdateBoundaries(
key, value, ikey.sequence, ikey.type); key, value, ikey.sequence, ikey.type);

@ -1892,13 +1892,15 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
class DBBasicTestWithParallelIO class DBBasicTestWithParallelIO
: public DBTestBase, : public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> { public testing::WithParamInterface<
std::tuple<bool, bool, bool, bool, uint32_t>> {
public: public:
DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") { DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
bool compressed_cache = std::get<0>(GetParam()); bool compressed_cache = std::get<0>(GetParam());
bool uncompressed_cache = std::get<1>(GetParam()); bool uncompressed_cache = std::get<1>(GetParam());
compression_enabled_ = std::get<2>(GetParam()); compression_enabled_ = std::get<2>(GetParam());
fill_cache_ = std::get<3>(GetParam()); fill_cache_ = std::get<3>(GetParam());
uint32_t compression_parallel_threads = std::get<4>(GetParam());
if (compressed_cache) { if (compressed_cache) {
std::shared_ptr<Cache> cache = NewLRUCache(1048576); std::shared_ptr<Cache> cache = NewLRUCache(1048576);
@ -1953,6 +1955,8 @@ class DBBasicTestWithParallelIO
options.table_factory.reset(new BlockBasedTableFactory(table_options)); options.table_factory.reset(new BlockBasedTableFactory(table_options));
if (!compression_enabled_) { if (!compression_enabled_) {
options.compression = kNoCompression; options.compression = kNoCompression;
} else {
options.compression_opts.parallel_threads = compression_parallel_threads;
} }
Reopen(options); Reopen(options);
@ -2354,10 +2358,10 @@ INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
// Param 1 - Uncompressed cache enabled // Param 1 - Uncompressed cache enabled
// Param 2 - Data compression enabled // Param 2 - Data compression enabled
// Param 3 - ReadOptions::fill_cache // Param 3 - ReadOptions::fill_cache
// Param 4 - CompressionOptions::parallel_threads
::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Combine(::testing::Bool(), ::testing::Bool(),
::testing::Bool(), ::testing::Bool(), ::testing::Bool(),
::testing::Bool())); ::testing::Values(1, 4)));
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -872,6 +872,7 @@ TEST_F(DBOptionsTest, ChangeCompression) {
options.compression = CompressionType::kLZ4Compression; options.compression = CompressionType::kLZ4Compression;
options.bottommost_compression = CompressionType::kNoCompression; options.bottommost_compression = CompressionType::kNoCompression;
options.bottommost_compression_opts.level = 2; options.bottommost_compression_opts.level = 2;
options.bottommost_compression_opts.parallel_threads = 1;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
@ -897,12 +898,14 @@ TEST_F(DBOptionsTest, ChangeCompression) {
ASSERT_TRUE(compacted); ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kNoCompression, compression_used); ASSERT_EQ(CompressionType::kNoCompression, compression_used);
ASSERT_EQ(options.compression_opts.level, compression_opt_used.level); ASSERT_EQ(options.compression_opts.level, compression_opt_used.level);
ASSERT_EQ(options.compression_opts.parallel_threads,
compression_opt_used.parallel_threads);
compression_used = CompressionType::kLZ4Compression; compression_used = CompressionType::kLZ4Compression;
compacted = false; compacted = false;
ASSERT_OK(dbfull()->SetOptions( ASSERT_OK(dbfull()->SetOptions(
{{"bottommost_compression", "kSnappyCompression"}, {{"bottommost_compression", "kSnappyCompression"},
{"bottommost_compression_opts", "0:6:0:0:0:true"}})); {"bottommost_compression_opts", "0:6:0:0:0:4:true"}}));
ASSERT_OK(Put("foo", "foofoofoo")); ASSERT_OK(Put("foo", "foofoofoo"));
ASSERT_OK(Put("bar", "foofoofoo")); ASSERT_OK(Put("bar", "foofoofoo"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
@ -913,6 +916,7 @@ TEST_F(DBOptionsTest, ChangeCompression) {
ASSERT_TRUE(compacted); ASSERT_TRUE(compacted);
ASSERT_EQ(CompressionType::kSnappyCompression, compression_used); ASSERT_EQ(CompressionType::kSnappyCompression, compression_used);
ASSERT_EQ(6, compression_opt_used.level); ASSERT_EQ(6, compression_opt_used.level);
ASSERT_EQ(4u, compression_opt_used.parallel_threads);
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
} }

@ -1288,6 +1288,10 @@ TEST_F(DBTest2, CompressionOptions) {
const int kValSize = 20; const int kValSize = 20;
Random rnd(301); Random rnd(301);
std::vector<uint32_t> compression_parallel_threads = {1, 4};
std::map<std::string, std::string> key_value_written;
for (int iter = 0; iter <= 2; iter++) { for (int iter = 0; iter <= 2; iter++) {
listener->max_level_checked = 0; listener->max_level_checked = 0;
@ -1312,12 +1316,18 @@ TEST_F(DBTest2, CompressionOptions) {
options.bottommost_compression = kDisableCompressionOption; options.bottommost_compression = kDisableCompressionOption;
} }
for (auto num_threads : compression_parallel_threads) {
options.compression_opts.parallel_threads = num_threads;
options.bottommost_compression_opts.parallel_threads = num_threads;
DestroyAndReopen(options); DestroyAndReopen(options);
// Write 10 random files // Write 10 random files
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) { for (int j = 0; j < 5; j++) {
ASSERT_OK( std::string key = RandomString(&rnd, kKeySize);
Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize))); std::string value = RandomString(&rnd, kValSize);
key_value_written[key] = value;
ASSERT_OK(Put(key, value));
} }
ASSERT_OK(Flush()); ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -1325,6 +1335,18 @@ TEST_F(DBTest2, CompressionOptions) {
// Make sure that we wrote enough to check all 7 levels // Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(listener->max_level_checked, 6); ASSERT_EQ(listener->max_level_checked, 6);
// Make sure database content is the same as key_value_written
std::unique_ptr<Iterator> db_iter(db_->NewIterator(ReadOptions()));
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
std::string key = db_iter->key().ToString();
std::string value = db_iter->value().ToString();
ASSERT_NE(key_value_written.find(key), key_value_written.end());
ASSERT_EQ(key_value_written[key], value);
key_value_written.erase(key);
}
ASSERT_EQ(0, key_value_written.size());
}
} }
} }

@ -419,8 +419,9 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
class DBBasicTestWithTimestampCompressionSettings class DBBasicTestWithTimestampCompressionSettings
: public DBBasicTestWithTimestampBase, : public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<std::tuple< public testing::WithParamInterface<
std::shared_ptr<const FilterPolicy>, CompressionType, uint32_t>> { std::tuple<std::shared_ptr<const FilterPolicy>, CompressionType,
uint32_t, uint32_t>> {
public: public:
DBBasicTestWithTimestampCompressionSettings() DBBasicTestWithTimestampCompressionSettings()
: DBBasicTestWithTimestampBase( : DBBasicTestWithTimestampBase(
@ -460,6 +461,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
if (comp_type == kZSTD) { if (comp_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
} }
options.compression_opts.parallel_threads = std::get<3>(GetParam());
options.target_file_size_base = 1 << 26; // 64MB options.target_file_size_base = 1 << 26; // 64MB
DestroyAndReopen(options); DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -572,6 +574,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
if (comp_type == kZSTD) { if (comp_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
} }
options.compression_opts.parallel_threads = std::get<3>(GetParam());
DestroyAndReopen(options); DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
@ -749,7 +752,7 @@ INSTANTIATE_TEST_CASE_P(
NewBloomFilterPolicy(10, false))), NewBloomFilterPolicy(10, false))),
::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression, ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
kLZ4HCCompression, kZSTD), kLZ4HCCompression, kZSTD),
::testing::Values(0, 1 << 14))); ::testing::Values(0, 1 << 14), ::testing::Values(1, 4)));
class DBBasicTestWithTimestampPrefixSeek class DBBasicTestWithTimestampPrefixSeek
: public DBBasicTestWithTimestampBase, : public DBBasicTestWithTimestampBase,

@ -117,6 +117,22 @@ struct CompressionOptions {
// Default: 0. // Default: 0.
uint32_t zstd_max_train_bytes; uint32_t zstd_max_train_bytes;
// Number of threads for parallel compression.
// Parallel compression is enabled only if threads > 1.
//
// This option is valid only when BlockBasedTable is used.
//
// When parallel compression is enabled, SST size estimation becomes less
// accurate, because block building and compression are pipelined, and there
// might be inflight blocks being compressed and not finally written, when
// current SST size is fetched. This brings inflation of final output file
// size.
// To be more accurate, this inflation is also estimated by using historical
// compression ratio and current bytes inflight.
//
// Default: 1.
uint32_t parallel_threads;
// When the compression options are set by the user, it will be set to "true". // When the compression options are set by the user, it will be set to "true".
// For bottommost_compression_opts, to enable it, user must set enabled=true. // For bottommost_compression_opts, to enable it, user must set enabled=true.
// Otherwise, bottommost compression will use compression_opts as default // Otherwise, bottommost compression will use compression_opts as default
@ -134,14 +150,17 @@ struct CompressionOptions {
strategy(0), strategy(0),
max_dict_bytes(0), max_dict_bytes(0),
zstd_max_train_bytes(0), zstd_max_train_bytes(0),
parallel_threads(1),
enabled(false) {} enabled(false) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes, CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes, bool _enabled) int _zstd_max_train_bytes, int _parallel_threads,
bool _enabled)
: window_bits(wbits), : window_bits(wbits),
level(_lev), level(_lev),
strategy(_strategy), strategy(_strategy),
max_dict_bytes(_max_dict_bytes), max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes), zstd_max_train_bytes(_zstd_max_train_bytes),
parallel_threads(_parallel_threads),
enabled(_enabled) {} enabled(_enabled) {}
}; };

@ -182,6 +182,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
" Options.bottommost_compression_opts.zstd_max_train_bytes: " " Options.bottommost_compression_opts.zstd_max_train_bytes: "
"%" PRIu32, "%" PRIu32,
bottommost_compression_opts.zstd_max_train_bytes); bottommost_compression_opts.zstd_max_train_bytes);
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.parallel_threads: "
"%" PRIu32,
bottommost_compression_opts.parallel_threads);
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.enabled: %s", log, " Options.bottommost_compression_opts.enabled: %s",
bottommost_compression_opts.enabled ? "true" : "false"); bottommost_compression_opts.enabled ? "true" : "false");
@ -199,6 +204,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
" Options.compression_opts.zstd_max_train_bytes: " " Options.compression_opts.zstd_max_train_bytes: "
"%" PRIu32, "%" PRIu32,
compression_opts.zstd_max_train_bytes); compression_opts.zstd_max_train_bytes);
ROCKS_LOG_HEADER(log,
" Options.compression_opts.parallel_threads: "
"%" PRIu32,
compression_opts.parallel_threads);
ROCKS_LOG_HEADER(log, ROCKS_LOG_HEADER(log,
" Options.compression_opts.enabled: %s", " Options.compression_opts.enabled: %s",
compression_opts.enabled ? "true" : "false"); compression_opts.enabled ? "true" : "false");

@ -835,6 +835,17 @@ Status ParseCompressionOptions(const std::string& value,
ParseInt(value.substr(start, value.size() - start)); ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start); end = value.find(':', start);
} }
// parallel_threads is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.parallel_threads =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility // enabled is optional for backwards compatibility
if (end != std::string::npos) { if (end != std::string::npos) {
start = end + 1; start = end + 1;

@ -63,8 +63,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
"kZSTD:" "kZSTD:"
"kZSTDNotFinalCompression"}, "kZSTDNotFinalCompression"},
{"bottommost_compression", "kLZ4Compression"}, {"bottommost_compression", "kLZ4Compression"},
{"bottommost_compression_opts", "5:6:7:8:9:true"}, {"bottommost_compression_opts", "5:6:7:8:9:10:true"},
{"compression_opts", "4:5:6:7:8:true"}, {"compression_opts", "4:5:6:7:8:9:true"},
{"num_levels", "8"}, {"num_levels", "8"},
{"level0_file_num_compaction_trigger", "8"}, {"level0_file_num_compaction_trigger", "8"},
{"level0_slowdown_writes_trigger", "9"}, {"level0_slowdown_writes_trigger", "9"},
@ -168,6 +168,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6);
ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7u); ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7u);
ASSERT_EQ(new_cf_opt.compression_opts.zstd_max_train_bytes, 8u); ASSERT_EQ(new_cf_opt.compression_opts.zstd_max_train_bytes, 8u);
ASSERT_EQ(new_cf_opt.compression_opts.parallel_threads, 9u);
ASSERT_EQ(new_cf_opt.compression_opts.enabled, true); ASSERT_EQ(new_cf_opt.compression_opts.enabled, true);
ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression); ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.window_bits, 5); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.window_bits, 5);
@ -175,6 +176,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.strategy, 7); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.strategy, 7);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.max_dict_bytes, 8u); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.max_dict_bytes, 8u);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.zstd_max_train_bytes, 9u); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.zstd_max_train_bytes, 9u);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.parallel_threads, 10u);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.enabled, true); ASSERT_EQ(new_cf_opt.bottommost_compression_opts.enabled, true);
ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.num_levels, 8);
ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8);
@ -801,6 +803,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.compression_opts.strategy, 6); ASSERT_EQ(new_options.compression_opts.strategy, 6);
ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0u); ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0u);
ASSERT_EQ(new_options.compression_opts.zstd_max_train_bytes, 0u); ASSERT_EQ(new_options.compression_opts.zstd_max_train_bytes, 0u);
ASSERT_EQ(new_options.compression_opts.parallel_threads, 1u);
ASSERT_EQ(new_options.compression_opts.enabled, false); ASSERT_EQ(new_options.compression_opts.enabled, false);
ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption); ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption);
ASSERT_EQ(new_options.bottommost_compression_opts.window_bits, 5); ASSERT_EQ(new_options.bottommost_compression_opts.window_bits, 5);
@ -808,6 +811,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.bottommost_compression_opts.strategy, 7); ASSERT_EQ(new_options.bottommost_compression_opts.strategy, 7);
ASSERT_EQ(new_options.bottommost_compression_opts.max_dict_bytes, 0u); ASSERT_EQ(new_options.bottommost_compression_opts.max_dict_bytes, 0u);
ASSERT_EQ(new_options.bottommost_compression_opts.zstd_max_train_bytes, 0u); ASSERT_EQ(new_options.bottommost_compression_opts.zstd_max_train_bytes, 0u);
ASSERT_EQ(new_options.bottommost_compression_opts.parallel_threads, 1u);
ASSERT_EQ(new_options.bottommost_compression_opts.enabled, false); ASSERT_EQ(new_options.bottommost_compression_opts.enabled, false);
ASSERT_EQ(new_options.write_buffer_size, 10U); ASSERT_EQ(new_options.write_buffer_size, 10U);
ASSERT_EQ(new_options.max_write_buffer_number, 16); ASSERT_EQ(new_options.max_write_buffer_number, 16);

@ -450,6 +450,7 @@ MAIN_SOURCES = \
util/timer_queue_test.cc \ util/timer_queue_test.cc \
util/thread_list_test.cc \ util/thread_list_test.cc \
util/thread_local_test.cc \ util/thread_local_test.cc \
util/work_queue_test.cc \
utilities/backupable/backupable_db_test.cc \ utilities/backupable/backupable_db_test.cc \
utilities/blob_db/blob_db_test.cc \ utilities/blob_db/blob_db_test.cc \
utilities/cassandra/cassandra_format_test.cc \ utilities/cassandra/cassandra_format_test.cc \

@ -11,6 +11,7 @@
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <atomic>
#include <list> #include <list>
#include <map> #include <map>
#include <memory> #include <memory>
@ -46,6 +47,7 @@
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/work_queue.h"
#include "util/xxhash.h" #include "util/xxhash.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -284,6 +286,10 @@ struct BlockBasedTableBuilder::Rep {
uint64_t offset = 0; uint64_t offset = 0;
Status status; Status status;
IOStatus io_status; IOStatus io_status;
// Synchronize status & io_status accesses across threads from main thread,
// compression thread and write thread in parallel compression.
std::mutex status_mutex;
std::mutex io_status_mutex;
size_t alignment; size_t alignment;
BlockBuilder data_block; BlockBuilder data_block;
// Buffers uncompressed data blocks and keys to replay later. Needed when // Buffers uncompressed data blocks and keys to replay later. Needed when
@ -300,12 +306,13 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr; PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_key; std::string last_key;
const Slice* first_key_in_next_block = nullptr;
CompressionType compression_type; CompressionType compression_type;
uint64_t sample_for_compression; uint64_t sample_for_compression;
CompressionOptions compression_opts; CompressionOptions compression_opts;
std::unique_ptr<CompressionDict> compression_dict; std::unique_ptr<CompressionDict> compression_dict;
CompressionContext compression_ctx; std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
std::unique_ptr<UncompressionContext> verify_ctx; std::vector<std::unique_ptr<UncompressionContext>> verify_ctxs;
std::unique_ptr<UncompressionDict> verify_dict; std::unique_ptr<UncompressionDict> verify_dict;
size_t data_begin_offset = 0; size_t data_begin_offset = 0;
@ -356,6 +363,8 @@ struct BlockBasedTableBuilder::Rep {
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors; std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
std::unique_ptr<ParallelCompressionRep> pc_rep;
Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions, Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
const BlockBasedTableOptions& table_opt, const BlockBasedTableOptions& table_opt,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
@ -390,7 +399,8 @@ struct BlockBasedTableBuilder::Rep {
sample_for_compression(_sample_for_compression), sample_for_compression(_sample_for_compression),
compression_opts(_compression_opts), compression_opts(_compression_opts),
compression_dict(), compression_dict(),
compression_ctx(_compression_type), compression_ctxs(_compression_opts.parallel_threads),
verify_ctxs(_compression_opts.parallel_threads),
verify_dict(), verify_dict(),
state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
: State::kUnbuffered), : State::kUnbuffered),
@ -407,6 +417,9 @@ struct BlockBasedTableBuilder::Rep {
oldest_key_time(_oldest_key_time), oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size), target_file_size(_target_file_size),
file_creation_time(_file_creation_time) { file_creation_time(_file_creation_time) {
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
if (table_options.index_type == if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) { BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@ -441,8 +454,10 @@ struct BlockBasedTableBuilder::Rep {
table_options.index_type, table_options.whole_key_filtering, table_options.index_type, table_options.whole_key_filtering,
_moptions.prefix_extractor != nullptr)); _moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) { if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_type)); verify_ctxs[i].reset(new UncompressionContext(
UncompressionContext::NoCache(), compression_type));
}
} }
} }
@ -452,6 +467,148 @@ struct BlockBasedTableBuilder::Rep {
~Rep() {} ~Rep() {}
}; };
struct BlockBasedTableBuilder::ParallelCompressionRep {
// Keys is a wrapper of vector of strings avoiding
// releasing string memories during vector clear()
// in order to save memory allocation overhead
class Keys {
public:
Keys() : keys_(kKeysInitSize), size_(0) {}
void PushBack(const Slice& key) {
if (size_ == keys_.size()) {
keys_.emplace_back(key.data(), key.size());
} else {
keys_[size_].assign(key.data(), key.size());
}
size_++;
}
void SwapAssign(std::vector<std::string>& keys) {
size_ = keys.size();
std::swap(keys_, keys);
}
void Clear() { size_ = 0; }
size_t Size() { return size_; }
std::string& Back() { return keys_[size_ - 1]; }
std::string& operator[](size_t idx) {
assert(idx < size_);
return keys_[idx];
}
private:
const size_t kKeysInitSize = 32;
std::vector<std::string> keys_;
size_t size_;
};
std::unique_ptr<Keys> curr_block_keys;
class BlockRepSlot;
// BlockRep instances are fetched from and recycled to
// block_rep_pool during parallel compression.
struct BlockRep {
Slice contents;
std::unique_ptr<std::string> data;
std::unique_ptr<std::string> compressed_data;
CompressionType compression_type;
std::unique_ptr<std::string> first_key_in_next_block;
std::unique_ptr<Keys> keys;
std::unique_ptr<BlockRepSlot> slot;
Status status;
};
// Use a vector of BlockRep as a buffer for a determined number
// of BlockRep structures. All data referenced by pointers in
// BlockRep will be freed when this vector is destructed.
typedef std::vector<BlockRep> BlockRepBuffer;
BlockRepBuffer block_rep_buf;
// Use a thread-safe queue for concurrent access from block
// building thread and writer thread.
typedef WorkQueue<BlockRep*> BlockRepPool;
BlockRepPool block_rep_pool;
// Use BlockRepSlot to keep block order in write thread.
// slot_ will pass references to BlockRep
class BlockRepSlot {
public:
BlockRepSlot() : slot_(1) {}
template <typename T>
void Fill(T&& rep) {
slot_.push(std::forward<T>(rep));
};
void Take(BlockRep*& rep) { slot_.pop(rep); }
private:
// slot_ will pass references to BlockRep in block_rep_buf,
// and those references are always valid before the destruction of
// block_rep_buf.
WorkQueue<BlockRep*> slot_;
};
// Compression queue will pass references to BlockRep in block_rep_buf,
// and those references are always valid before the destruction of
// block_rep_buf.
typedef WorkQueue<BlockRep*> CompressQueue;
CompressQueue compress_queue;
std::vector<port::Thread> compress_thread_pool;
// Write queue will pass references to BlockRep::slot in block_rep_buf,
// and those references are always valid before the corresponding
// BlockRep::slot is destructed, which is before the destruction of
// block_rep_buf.
typedef WorkQueue<BlockRepSlot*> WriteQueue;
WriteQueue write_queue;
std::unique_ptr<port::Thread> write_thread;
// Raw bytes compressed so far.
uint64_t raw_bytes_compressed;
// Size of current block being appended.
uint64_t raw_bytes_curr_block;
// Raw bytes under compression and not appended yet.
std::atomic<uint64_t> raw_bytes_inflight;
// Number of blocks under compression and not appended yet.
std::atomic<uint64_t> blocks_inflight;
// Current compression ratio, maintained by BGWorkWriteRawBlock.
double curr_compression_ratio;
// Estimated SST file size.
uint64_t estimated_file_size;
// Wait for the completion of first block compression to get a
// non-zero compression ratio.
bool first_block;
std::condition_variable first_block_cond;
std::mutex first_block_mutex;
bool finished;
ParallelCompressionRep(uint32_t parallel_threads)
: curr_block_keys(new Keys()),
block_rep_buf(parallel_threads),
block_rep_pool(parallel_threads),
compress_queue(parallel_threads),
write_queue(parallel_threads),
raw_bytes_compressed(0),
raw_bytes_curr_block(0),
raw_bytes_inflight(0),
blocks_inflight(0),
curr_compression_ratio(0),
estimated_file_size(0),
first_block(true),
finished(false) {
for (uint32_t i = 0; i < parallel_threads; i++) {
block_rep_buf[i].contents = Slice();
block_rep_buf[i].data.reset(new std::string());
block_rep_buf[i].compressed_data.reset(new std::string());
block_rep_buf[i].compression_type = CompressionType();
block_rep_buf[i].first_key_in_next_block.reset(new std::string());
block_rep_buf[i].keys.reset(new Keys());
block_rep_buf[i].slot.reset(new BlockRepSlot());
block_rep_buf[i].status = Status::OK();
block_rep_pool.push(&block_rep_buf[i]);
}
}
~ParallelCompressionRep() { block_rep_pool.finish(); }
};
BlockBasedTableBuilder::BlockBasedTableBuilder( BlockBasedTableBuilder::BlockBasedTableBuilder(
const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions, const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
const BlockBasedTableOptions& table_options, const BlockBasedTableOptions& table_options,
@ -493,6 +650,21 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
&rep_->compressed_cache_key_prefix[0], &rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size); &rep_->compressed_cache_key_prefix_size);
} }
if (rep_->compression_opts.parallel_threads > 1) {
rep_->pc_rep.reset(
new ParallelCompressionRep(rep_->compression_opts.parallel_threads));
rep_->pc_rep->compress_thread_pool.reserve(
rep_->compression_opts.parallel_threads);
for (uint32_t i = 0; i < rep_->compression_opts.parallel_threads; i++) {
rep_->pc_rep->compress_thread_pool.emplace_back([=] {
BGWorkCompression(*(rep_->compression_ctxs[i]),
rep_->verify_ctxs[i].get());
});
}
rep_->pc_rep->write_thread.reset(
new port::Thread([=] { BGWorkWriteRawBlock(); }));
}
} }
BlockBasedTableBuilder::~BlockBasedTableBuilder() { BlockBasedTableBuilder::~BlockBasedTableBuilder() {
@ -516,6 +688,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
auto should_flush = r->flush_block_policy->Update(key, value); auto should_flush = r->flush_block_policy->Update(key, value);
if (should_flush) { if (should_flush) {
assert(!r->data_block.empty()); assert(!r->data_block.empty());
r->first_key_in_next_block = &key;
Flush(); Flush();
if (r->state == Rep::State::kBuffered && if (r->state == Rep::State::kBuffered &&
@ -532,16 +705,28 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
// entries in the first block and < all entries in subsequent // entries in the first block and < all entries in subsequent
// blocks. // blocks.
if (ok() && r->state == Rep::State::kUnbuffered) { if (ok() && r->state == Rep::State::kUnbuffered) {
r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle); if (r->compression_opts.parallel_threads > 1) {
r->pc_rep->curr_block_keys->Clear();
} else {
r->index_builder->AddIndexEntry(&r->last_key, &key,
r->pending_handle);
}
} }
} }
// Note: PartitionedFilterBlockBuilder requires key being added to filter // Note: PartitionedFilterBlockBuilder requires key being added to filter
// builder after being added to index builder. // builder after being added to index builder.
if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) { if (r->state == Rep::State::kUnbuffered) {
size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size(); if (r->compression_opts.parallel_threads > 1) {
r->pc_rep->curr_block_keys->PushBack(key);
} else {
if (r->filter_builder != nullptr) {
size_t ts_sz =
r->internal_comparator.user_comparator()->timestamp_size();
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz)); r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
} }
}
}
r->last_key.assign(key.data(), key.size()); r->last_key.assign(key.data(), key.size());
r->data_block.Add(key, value); r->data_block.Add(key, value);
@ -553,8 +738,10 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
} }
r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString()); r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
} else { } else {
if (r->compression_opts.parallel_threads == 1) {
r->index_builder->OnKeyAdded(key); r->index_builder->OnKeyAdded(key);
} }
}
NotifyCollectTableCollectorsOnAdd(key, value, r->offset, NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
r->table_properties_collectors, r->table_properties_collectors,
r->ioptions.info_log); r->ioptions.info_log);
@ -586,8 +773,58 @@ void BlockBasedTableBuilder::Flush() {
assert(rep_->state != Rep::State::kClosed); assert(rep_->state != Rep::State::kClosed);
if (!ok()) return; if (!ok()) return;
if (r->data_block.empty()) return; if (r->data_block.empty()) return;
if (r->compression_opts.parallel_threads > 1 &&
r->state == Rep::State::kUnbuffered) {
ParallelCompressionRep::BlockRep* block_rep;
r->pc_rep->block_rep_pool.pop(block_rep);
r->data_block.Finish();
r->data_block.SwapAndReset(*(block_rep->data));
block_rep->contents = *(block_rep->data);
block_rep->compression_type = r->compression_type;
std::swap(block_rep->keys, r->pc_rep->curr_block_keys);
r->pc_rep->curr_block_keys->Clear();
if (r->first_key_in_next_block == nullptr) {
block_rep->first_key_in_next_block.reset(nullptr);
} else {
block_rep->first_key_in_next_block->assign(
r->first_key_in_next_block->data(),
r->first_key_in_next_block->size());
}
uint64_t new_raw_bytes_inflight =
r->pc_rep->raw_bytes_inflight.fetch_add(block_rep->data->size(),
std::memory_order_relaxed) +
block_rep->data->size();
uint64_t new_blocks_inflight =
r->pc_rep->blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1;
r->pc_rep->estimated_file_size =
r->offset +
static_cast<uint64_t>(static_cast<double>(new_raw_bytes_inflight) *
r->pc_rep->curr_compression_ratio) +
new_blocks_inflight * kBlockTrailerSize;
assert(block_rep->status.ok());
if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
return;
}
if (!r->pc_rep->compress_queue.push(block_rep)) {
return;
}
if (r->pc_rep->first_block) {
std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
r->pc_rep->first_block_cond.wait(lock,
[=] { return !r->pc_rep->first_block; });
}
} else {
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
} }
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle, BlockHandle* handle,
@ -599,6 +836,43 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle, BlockHandle* handle,
bool is_data_block) { bool is_data_block) {
Rep* r = rep_;
Slice block_contents;
CompressionType type;
CompressAndVerifyBlock(raw_block_contents, is_data_block,
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
r->compressed_output, block_contents, type, r->status);
if (!ok()) {
return;
}
WriteRawBlock(block_contents, type, handle, is_data_block);
r->compressed_output.clear();
if (is_data_block) {
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}
}
void BlockBasedTableBuilder::BGWorkCompression(
CompressionContext& compression_ctx, UncompressionContext* verify_ctx) {
ParallelCompressionRep::BlockRep* block_rep;
while (rep_->pc_rep->compress_queue.pop(block_rep)) {
CompressAndVerifyBlock(block_rep->contents, true, /* is_data_block*/
compression_ctx, verify_ctx,
*(block_rep->compressed_data), block_rep->contents,
block_rep->compression_type, block_rep->status);
block_rep->slot->Fill(block_rep);
}
}
void BlockBasedTableBuilder::CompressAndVerifyBlock(
const Slice& raw_block_contents, bool is_data_block,
CompressionContext& compression_ctx, UncompressionContext* verify_ctx_ptr,
std::string& compressed_output, Slice& block_contents,
CompressionType& type, Status& out_status) {
// File format contains a sequence of blocks where each block has: // File format contains a sequence of blocks where each block has:
// block_data: uint8[n] // block_data: uint8[n]
// type: uint8 // type: uint8
@ -606,9 +880,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
assert(ok()); assert(ok());
Rep* r = rep_; Rep* r = rep_;
auto type = r->compression_type; type = r->compression_type;
uint64_t sample_for_compression = r->sample_for_compression; uint64_t sample_for_compression = r->sample_for_compression;
Slice block_contents;
bool abort_compression = false; bool abort_compression = false;
StopWatchNano timer( StopWatchNano timer(
@ -631,7 +904,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
compression_dict = r->compression_dict.get(); compression_dict = r->compression_dict.get();
} }
assert(compression_dict != nullptr); assert(compression_dict != nullptr);
CompressionInfo compression_info(r->compression_opts, r->compression_ctx, CompressionInfo compression_info(r->compression_opts, compression_ctx,
*compression_dict, type, *compression_dict, type,
sample_for_compression); sample_for_compression);
@ -640,7 +913,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
block_contents = CompressBlock( block_contents = CompressBlock(
raw_block_contents, compression_info, &type, raw_block_contents, compression_info, &type,
r->table_options.format_version, is_data_block /* do_sample */, r->table_options.format_version, is_data_block /* do_sample */,
&r->compressed_output, &sampled_output_fast, &sampled_output_slow); &compressed_output, &sampled_output_fast, &sampled_output_slow);
// notify collectors on block add // notify collectors on block add
NotifyCollectTableCollectorsOnBlockAdd( NotifyCollectTableCollectorsOnBlockAdd(
@ -660,7 +933,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
} }
assert(verify_dict != nullptr); assert(verify_dict != nullptr);
BlockContents contents; BlockContents contents;
UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict, UncompressionInfo uncompression_info(*verify_ctx_ptr, *verify_dict,
r->compression_type); r->compression_type);
Status stat = UncompressBlockContentsForCompressionType( Status stat = UncompressBlockContentsForCompressionType(
uncompression_info, block_contents.data(), block_contents.size(), uncompression_info, block_contents.data(), block_contents.size(),
@ -673,12 +946,12 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
abort_compression = true; abort_compression = true;
ROCKS_LOG_ERROR(r->ioptions.info_log, ROCKS_LOG_ERROR(r->ioptions.info_log,
"Decompressed block did not match raw block"); "Decompressed block did not match raw block");
r->status = out_status =
Status::Corruption("Decompressed block did not match raw block"); Status::Corruption("Decompressed block did not match raw block");
} }
} else { } else {
// Decompression reported an error. abort. // Decompression reported an error. abort.
r->status = Status::Corruption("Could not decompress"); out_status = Status::Corruption("Could not decompress");
abort_compression = true; abort_compression = true;
} }
} }
@ -704,16 +977,6 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
} else if (type != r->compression_type) { } else if (type != r->compression_type) {
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
} }
WriteRawBlock(block_contents, type, handle, is_data_block);
r->compressed_output.clear();
if (is_data_block) {
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}
} }
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
@ -721,13 +984,15 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
BlockHandle* handle, BlockHandle* handle,
bool is_data_block) { bool is_data_block) {
Rep* r = rep_; Rep* r = rep_;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS); StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
handle->set_offset(r->offset); handle->set_offset(r->offset);
handle->set_size(block_contents.size()); handle->set_size(block_contents.size());
assert(r->status.ok()); assert(status().ok());
assert(r->io_status.ok()); assert(io_status().ok());
r->io_status = r->file->Append(block_contents); io_s = r->file->Append(block_contents);
if (r->io_status.ok()) { if (io_s.ok()) {
char trailer[kBlockTrailerSize]; char trailer[kBlockTrailerSize];
trailer[0] = type; trailer[0] = type;
char* trailer_without_type = trailer + 1; char* trailer_without_type = trailer + 1;
@ -766,34 +1031,157 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
} }
} }
assert(r->io_status.ok()); assert(io_s.ok());
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
static_cast<char*>(trailer)); static_cast<char*>(trailer));
r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize)); io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->io_status.ok()) { if (io_s.ok()) {
r->status = InsertBlockInCache(block_contents, type, handle); s = InsertBlockInCache(block_contents, type, handle);
if (!s.ok()) {
SetStatusAtom(s);
} }
if (r->status.ok() && r->io_status.ok()) { } else {
SetIOStatusAtom(io_s);
}
if (s.ok() && io_s.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize; r->offset += block_contents.size() + kBlockTrailerSize;
if (r->table_options.block_align && is_data_block) { if (r->table_options.block_align && is_data_block) {
size_t pad_bytes = size_t pad_bytes =
(r->alignment - ((block_contents.size() + kBlockTrailerSize) & (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
(r->alignment - 1))) & (r->alignment - 1))) &
(r->alignment - 1); (r->alignment - 1);
r->io_status = r->file->Pad(pad_bytes); io_s = r->file->Pad(pad_bytes);
if (r->io_status.ok()) { if (io_s.ok()) {
r->offset += pad_bytes; r->offset += pad_bytes;
} else {
SetIOStatusAtom(io_s);
}
}
if (r->compression_opts.parallel_threads > 1) {
if (!r->pc_rep->finished) {
r->pc_rep->curr_compression_ratio =
(r->pc_rep->curr_compression_ratio *
r->pc_rep->raw_bytes_compressed +
block_contents.size()) /
static_cast<double>(r->pc_rep->raw_bytes_compressed +
r->pc_rep->raw_bytes_curr_block);
r->pc_rep->raw_bytes_compressed += r->pc_rep->raw_bytes_curr_block;
uint64_t new_raw_bytes_inflight =
r->pc_rep->raw_bytes_inflight.fetch_sub(
r->pc_rep->raw_bytes_curr_block, std::memory_order_relaxed) -
r->pc_rep->raw_bytes_curr_block;
uint64_t new_blocks_inflight = r->pc_rep->blocks_inflight.fetch_sub(
1, std::memory_order_relaxed) -
1;
r->pc_rep->estimated_file_size =
r->offset +
static_cast<uint64_t>(static_cast<double>(new_raw_bytes_inflight) *
r->pc_rep->curr_compression_ratio) +
new_blocks_inflight * kBlockTrailerSize;
} else {
r->pc_rep->estimated_file_size = r->offset;
} }
} }
} }
} else {
SetIOStatusAtom(io_s);
}
if (!io_s.ok() && s.ok()) {
SetStatusAtom(io_s);
}
}
void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
Rep* r = rep_;
ParallelCompressionRep::BlockRepSlot* slot;
ParallelCompressionRep::BlockRep* block_rep;
while (r->pc_rep->write_queue.pop(slot)) {
slot->Take(block_rep);
if (!block_rep->status.ok()) {
SetStatusAtom(block_rep->status);
break;
}
for (size_t i = 0; i < block_rep->keys->Size(); i++) {
auto& key = (*block_rep->keys)[i];
if (r->filter_builder != nullptr) {
size_t ts_sz =
r->internal_comparator.user_comparator()->timestamp_size();
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
}
r->index_builder->OnKeyAdded(key);
}
r->pc_rep->raw_bytes_curr_block = block_rep->data->size();
WriteRawBlock(block_rep->contents, block_rep->compression_type,
&r->pending_handle, true /* is_data_block*/);
if (!r->status.ok()) {
break;
}
if (r->pc_rep->first_block) {
std::unique_lock<std::mutex> lock(r->pc_rep->first_block_mutex);
r->pc_rep->first_block = false;
r->pc_rep->first_block_cond.notify_one();
}
if (r->filter_builder != nullptr) {
r->filter_builder->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
if (block_rep->first_key_in_next_block == nullptr) {
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()), nullptr,
r->pending_handle);
} else {
Slice first_key_in_next_block =
Slice(*block_rep->first_key_in_next_block);
r->index_builder->AddIndexEntry(&(block_rep->keys->Back()),
&first_key_in_next_block,
r->pending_handle);
}
block_rep->compressed_data->clear();
r->pc_rep->block_rep_pool.push(block_rep);
} }
r->status = r->io_status;
} }
Status BlockBasedTableBuilder::status() const { return rep_->status; } Status BlockBasedTableBuilder::status() const {
if (rep_->compression_opts.parallel_threads > 1) {
std::lock_guard<std::mutex> lock(rep_->status_mutex);
return rep_->status;
} else {
return rep_->status;
}
}
IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; } IOStatus BlockBasedTableBuilder::io_status() const {
if (rep_->compression_opts.parallel_threads > 1) {
std::lock_guard<std::mutex> lock(rep_->io_status_mutex);
return rep_->io_status;
} else {
return rep_->io_status;
}
}
void BlockBasedTableBuilder::SetStatusAtom(Status status) {
if (rep_->compression_opts.parallel_threads > 1) {
std::lock_guard<std::mutex> lock(rep_->status_mutex);
rep_->status = status;
} else {
rep_->status = status;
}
}
void BlockBasedTableBuilder::SetIOStatusAtom(IOStatus io_status) {
if (rep_->compression_opts.parallel_threads > 1) {
std::lock_guard<std::mutex> lock(rep_->io_status_mutex);
rep_->io_status = io_status;
} else {
rep_->io_status = io_status;
}
}
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value); BlockContents* bc = reinterpret_cast<BlockContents*>(value);
@ -1108,11 +1496,37 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
r->compression_type == kZSTDNotFinalCompression)); r->compression_type == kZSTDNotFinalCompression));
for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) { for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
const auto& data_block = r->data_block_and_keys_buffers[i].first; auto& data_block = r->data_block_and_keys_buffers[i].first;
auto& keys = r->data_block_and_keys_buffers[i].second; auto& keys = r->data_block_and_keys_buffers[i].second;
assert(!data_block.empty()); assert(!data_block.empty());
assert(!keys.empty()); assert(!keys.empty());
if (r->compression_opts.parallel_threads > 1) {
ParallelCompressionRep::BlockRep* block_rep;
r->pc_rep->block_rep_pool.pop(block_rep);
std::swap(*(block_rep->data), data_block);
block_rep->contents = *(block_rep->data);
block_rep->compression_type = r->compression_type;
block_rep->keys->SwapAssign(keys);
if (i + 1 < r->data_block_and_keys_buffers.size()) {
block_rep->first_key_in_next_block->assign(
r->data_block_and_keys_buffers[i + 1].second.front());
} else {
block_rep->first_key_in_next_block.reset(nullptr);
}
assert(block_rep->status.ok());
if (!r->pc_rep->write_queue.push(block_rep->slot.get())) {
return;
}
if (!r->pc_rep->compress_queue.push(block_rep)) {
return;
}
} else {
for (const auto& key : keys) { for (const auto& key : keys) {
if (r->filter_builder != nullptr) { if (r->filter_builder != nullptr) {
size_t ts_sz = size_t ts_sz =
@ -1121,13 +1535,15 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
} }
r->index_builder->OnKeyAdded(key); r->index_builder->OnKeyAdded(key);
} }
WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */); WriteBlock(Slice(data_block), &r->pending_handle,
true /* is_data_block */);
if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) { if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
Slice first_key_in_next_block = Slice first_key_in_next_block =
r->data_block_and_keys_buffers[i + 1].second.front(); r->data_block_and_keys_buffers[i + 1].second.front();
Slice* first_key_in_next_block_ptr = &first_key_in_next_block; Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr, r->index_builder->AddIndexEntry(
r->pending_handle); &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
}
} }
} }
r->data_block_and_keys_buffers.clear(); r->data_block_and_keys_buffers.clear();
@ -1137,16 +1553,27 @@ Status BlockBasedTableBuilder::Finish() {
Rep* r = rep_; Rep* r = rep_;
assert(r->state != Rep::State::kClosed); assert(r->state != Rep::State::kClosed);
bool empty_data_block = r->data_block.empty(); bool empty_data_block = r->data_block.empty();
r->first_key_in_next_block = nullptr;
Flush(); Flush();
if (r->state == Rep::State::kBuffered) { if (r->state == Rep::State::kBuffered) {
EnterUnbuffered(); EnterUnbuffered();
} }
if (r->compression_opts.parallel_threads > 1) {
r->pc_rep->compress_queue.finish();
for (auto& thread : r->pc_rep->compress_thread_pool) {
thread.join();
}
r->pc_rep->write_queue.finish();
r->pc_rep->write_thread->join();
r->pc_rep->finished = true;
} else {
// To make sure properties block is able to keep the accurate size of index // To make sure properties block is able to keep the accurate size of index
// block, we will finish writing all index entries first. // block, we will finish writing all index entries first.
if (ok() && !empty_data_block) { if (ok() && !empty_data_block) {
r->index_builder->AddIndexEntry( r->index_builder->AddIndexEntry(
&r->last_key, nullptr /* no next data block */, r->pending_handle); &r->last_key, nullptr /* no next data block */, r->pending_handle);
} }
}
// Write meta blocks, metaindex block and footer in the following order. // Write meta blocks, metaindex block and footer in the following order.
// 1. [meta block: filter] // 1. [meta block: filter]
@ -1177,6 +1604,15 @@ Status BlockBasedTableBuilder::Finish() {
void BlockBasedTableBuilder::Abandon() { void BlockBasedTableBuilder::Abandon() {
assert(rep_->state != Rep::State::kClosed); assert(rep_->state != Rep::State::kClosed);
if (rep_->compression_opts.parallel_threads > 1) {
rep_->pc_rep->compress_queue.finish();
for (auto& thread : rep_->pc_rep->compress_thread_pool) {
thread.join();
}
rep_->pc_rep->write_queue.finish();
rep_->pc_rep->write_thread->join();
rep_->pc_rep->finished = true;
}
rep_->state = Rep::State::kClosed; rep_->state = Rep::State::kClosed;
} }
@ -1186,6 +1622,16 @@ uint64_t BlockBasedTableBuilder::NumEntries() const {
uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
uint64_t BlockBasedTableBuilder::EstimatedFileSize() const {
if (rep_->compression_opts.parallel_threads > 1) {
// Use compression ratio so far and inflight raw bytes to estimate
// final SST size.
return rep_->pc_rep->estimated_file_size;
} else {
return FileSize();
}
}
bool BlockBasedTableBuilder::NeedCompact() const { bool BlockBasedTableBuilder::NeedCompact() const {
for (const auto& collector : rep_->table_properties_collectors) { for (const auto& collector : rep_->table_properties_collectors) {
if (collector->NeedCompact()) { if (collector->NeedCompact()) {

@ -90,6 +90,11 @@ class BlockBasedTableBuilder : public TableBuilder {
// Finish() call, returns the size of the final generated file. // Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override; uint64_t FileSize() const override;
// Estimated size of the file generated so far. This is used when
// FileSize() cannot estimate final SST size, e.g. parallel compression
// is enabled.
uint64_t EstimatedFileSize() const override;
bool NeedCompact() const override; bool NeedCompact() const override;
// Get table properties // Get table properties
@ -104,6 +109,10 @@ class BlockBasedTableBuilder : public TableBuilder {
private: private:
bool ok() const { return status().ok(); } bool ok() const { return status().ok(); }
void SetStatusAtom(Status status);
void SetIOStatusAtom(IOStatus io_status);
// Transition state from buffered to unbuffered. See `Rep::State` API comment // Transition state from buffered to unbuffered. See `Rep::State` API comment
// for details of the states. // for details of the states.
// REQUIRES: `rep_->state == kBuffered` // REQUIRES: `rep_->state == kBuffered`
@ -137,6 +146,8 @@ class BlockBasedTableBuilder : public TableBuilder {
class BlockBasedTablePropertiesCollector; class BlockBasedTablePropertiesCollector;
Rep* rep_; Rep* rep_;
struct ParallelCompressionRep;
// Advanced operation: flush any buffered key/value pairs to file. // Advanced operation: flush any buffered key/value pairs to file.
// Can be used to ensure that two adjacent entries never live in // Can be used to ensure that two adjacent entries never live in
// the same data block. Most clients should not need to use this method. // the same data block. Most clients should not need to use this method.
@ -146,6 +157,22 @@ class BlockBasedTableBuilder : public TableBuilder {
// Some compression libraries fail when the raw size is bigger than int. If // Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it // uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
// Get blocks from mem-table walking thread, compress them and
// pass them to the write thread. Used in parallel compression mode only
void BGWorkCompression(CompressionContext& compression_ctx,
UncompressionContext* verify_ctx);
// Given raw block content, try to compress it and return result and
// compression type
void CompressAndVerifyBlock(
const Slice& raw_block_contents, bool is_data_block,
CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
std::string& compressed_output, Slice& result_block_contents,
CompressionType& result_compression_type, Status& out_status);
// Get compressed blocks from BGWorkCompression and write them into SST
void BGWorkWriteRawBlock();
}; };
Slice CompressBlock(const Slice& raw, const CompressionInfo& info, Slice CompressBlock(const Slice& raw, const CompressionInfo& info,

@ -81,6 +81,11 @@ void BlockBuilder::Reset() {
} }
} }
void BlockBuilder::SwapAndReset(std::string& buffer) {
std::swap(buffer_, buffer);
Reset();
}
size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key,
const Slice& value) const { const Slice& value) const {
size_t estimate = CurrentSizeEstimate(); size_t estimate = CurrentSizeEstimate();

@ -32,6 +32,9 @@ class BlockBuilder {
// Reset the contents as if the BlockBuilder was just constructed. // Reset the contents as if the BlockBuilder was just constructed.
void Reset(); void Reset();
// Swap the contents in BlockBuilder with buffer, then reset the BlockBuilder.
void SwapAndReset(std::string& buffer);
// REQUIRES: Finish() has not been called since the last call to Reset(). // REQUIRES: Finish() has not been called since the last call to Reset().
// REQUIRES: key is larger than any previously added key // REQUIRES: key is larger than any previously added key
void Add(const Slice& key, const Slice& value, void Add(const Slice& key, const Slice& value,

@ -156,6 +156,11 @@ class TableBuilder {
// Finish() call, returns the size of the final generated file. // Finish() call, returns the size of the final generated file.
virtual uint64_t FileSize() const = 0; virtual uint64_t FileSize() const = 0;
// Estimated size of the file generated so far. This is used when
// FileSize() cannot estimate final SST size, e.g. parallel compression
// is enabled.
virtual uint64_t EstimatedFileSize() const { return FileSize(); }
// If the user defined table properties collector suggest the file to // If the user defined table properties collector suggest the file to
// be further compacted. // be further compacted.
virtual bool NeedCompact() const { return false; } virtual bool NeedCompact() const { return false; }

@ -599,6 +599,7 @@ struct TestArgs {
bool reverse_compare; bool reverse_compare;
int restart_interval; int restart_interval;
CompressionType compression; CompressionType compression;
uint32_t compression_parallel_threads;
uint32_t format_version; uint32_t format_version;
bool use_mmap; bool use_mmap;
}; };
@ -616,6 +617,7 @@ static std::vector<TestArgs> GenerateArgList() {
MEMTABLE_TEST, DB_TEST}; MEMTABLE_TEST, DB_TEST};
std::vector<bool> reverse_compare_types = {false, true}; std::vector<bool> reverse_compare_types = {false, true};
std::vector<int> restart_intervals = {16, 1, 1024}; std::vector<int> restart_intervals = {16, 1, 1024};
std::vector<uint32_t> compression_parallel_threads = {1, 4};
// Only add compression if it is supported // Only add compression if it is supported
std::vector<std::pair<CompressionType, bool>> compression_types; std::vector<std::pair<CompressionType, bool>> compression_types;
@ -658,6 +660,7 @@ static std::vector<TestArgs> GenerateArgList() {
one_arg.reverse_compare = reverse_compare; one_arg.reverse_compare = reverse_compare;
one_arg.restart_interval = restart_intervals[0]; one_arg.restart_interval = restart_intervals[0];
one_arg.compression = compression_types[0].first; one_arg.compression = compression_types[0].first;
one_arg.compression_parallel_threads = 1;
one_arg.use_mmap = true; one_arg.use_mmap = true;
test_args.push_back(one_arg); test_args.push_back(one_arg);
one_arg.use_mmap = false; one_arg.use_mmap = false;
@ -668,18 +671,21 @@ static std::vector<TestArgs> GenerateArgList() {
for (auto restart_interval : restart_intervals) { for (auto restart_interval : restart_intervals) {
for (auto compression_type : compression_types) { for (auto compression_type : compression_types) {
for (auto num_threads : compression_parallel_threads) {
TestArgs one_arg; TestArgs one_arg;
one_arg.type = test_type; one_arg.type = test_type;
one_arg.reverse_compare = reverse_compare; one_arg.reverse_compare = reverse_compare;
one_arg.restart_interval = restart_interval; one_arg.restart_interval = restart_interval;
one_arg.compression = compression_type.first; one_arg.compression = compression_type.first;
one_arg.format_version = compression_type.second ? 2 : 1; one_arg.format_version = compression_type.second ? 2 : 1;
one_arg.compression_parallel_threads = num_threads;
one_arg.use_mmap = false; one_arg.use_mmap = false;
test_args.push_back(one_arg); test_args.push_back(one_arg);
} }
} }
} }
} }
}
return test_args; return test_args;
} }
@ -727,6 +733,8 @@ class HarnessTest : public testing::Test {
constructor_ = nullptr; constructor_ = nullptr;
options_ = Options(); options_ = Options();
options_.compression = args.compression; options_.compression = args.compression;
options_.compression_opts.parallel_threads =
args.compression_parallel_threads;
// Use shorter block size for tests to exercise block boundary // Use shorter block size for tests to exercise block boundary
// conditions more. // conditions more.
if (args.reverse_compare) { if (args.reverse_compare) {

@ -919,6 +919,9 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
" not compressed. Otherwise, apply compression_type to " " not compressed. Otherwise, apply compression_type to "
"all levels."); "all levels.");
DEFINE_int32(compression_threads, 1,
"Number of concurrent compression threads to run.");
static bool ValidateTableCacheNumshardbits(const char* flagname, static bool ValidateTableCacheNumshardbits(const char* flagname,
int32_t value) { int32_t value) {
if (0 >= value || value > 20) { if (0 >= value || value > 20) {
@ -4008,6 +4011,7 @@ class Benchmark {
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes; options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
options.compression_opts.zstd_max_train_bytes = options.compression_opts.zstd_max_train_bytes =
FLAGS_compression_zstd_max_train_bytes; FLAGS_compression_zstd_max_train_bytes;
options.compression_opts.parallel_threads = FLAGS_compression_threads;
// If this is a block based table, set some related options // If this is a block based table, set some related options
if (options.table_factory->Name() == BlockBasedTableFactory::kName && if (options.table_factory->Name() == BlockBasedTableFactory::kName &&
options.table_factory->GetOptions() != nullptr) { options.table_factory->GetOptions() != nullptr) {

@ -0,0 +1,149 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
/*
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
namespace ROCKSDB_NAMESPACE {
/// Unbounded thread-safe work queue.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/WorkQueue.h.
template <typename T>
class WorkQueue {
// Protects all member variable access
std::mutex mutex_;
std::condition_variable readerCv_;
std::condition_variable writerCv_;
std::condition_variable finishCv_;
std::queue<T> queue_;
bool done_;
std::size_t maxSize_;
// Must have lock to call this function
bool full() const {
if (maxSize_ == 0) {
return false;
}
return queue_.size() >= maxSize_;
}
public:
/**
* Constructs an empty work queue with an optional max size.
* If `maxSize == 0` the queue size is unbounded.
*
* @param maxSize The maximum allowed size of the work queue.
*/
WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
/**
* Push an item onto the work queue. Notify a single thread that work is
* available. If `finish()` has been called, do nothing and return false.
* If `push()` returns false, then `item` has not been copied from.
*
* @param item Item to push onto the queue.
* @returns True upon success, false if `finish()` has been called. An
* item was pushed iff `push()` returns true.
*/
template <typename U>
bool push(U&& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (full() && !done_) {
writerCv_.wait(lock);
}
if (done_) {
return false;
}
queue_.push(std::forward<U>(item));
}
readerCv_.notify_one();
return true;
}
/**
* Attempts to pop an item off the work queue. It will block until data is
* available or `finish()` has been called.
*
* @param[out] item If `pop` returns `true`, it contains the popped item.
* If `pop` returns `false`, it is unmodified.
* @returns True upon success. False if the queue is empty and
* `finish()` has been called.
*/
bool pop(T& item) {
{
std::unique_lock<std::mutex> lock(mutex_);
while (queue_.empty() && !done_) {
readerCv_.wait(lock);
}
if (queue_.empty()) {
assert(done_);
return false;
}
item = queue_.front();
queue_.pop();
}
writerCv_.notify_one();
return true;
}
/**
* Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
*
* @param maxSize The new maximum queue size.
*/
void setMaxSize(std::size_t maxSize) {
{
std::lock_guard<std::mutex> lock(mutex_);
maxSize_ = maxSize;
}
writerCv_.notify_all();
}
/**
* Promise that `push()` won't be called again, so once the queue is empty
* there will never any more work.
*/
void finish() {
{
std::lock_guard<std::mutex> lock(mutex_);
assert(!done_);
done_ = true;
}
readerCv_.notify_all();
writerCv_.notify_all();
finishCv_.notify_all();
}
/// Blocks until `finish()` has been called (but the queue may not be empty).
void waitUntilFinished() {
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
finishCv_.wait(lock);
}
}
};
}

@ -0,0 +1,268 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
/*
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
* LICENSE file in the root directory of this source tree) and the GPLv2 (found
* in the COPYING file in the root directory of this source tree).
*/
#include "util/work_queue.h"
#include <gtest/gtest.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
namespace ROCKSDB_NAMESPACE {
// Unit test for work_queue.h.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/test/WorkQueueTest.cpp.
struct Popper {
WorkQueue<int>* queue;
int* results;
std::mutex* mutex;
void operator()() {
int result;
while (queue->pop(result)) {
std::lock_guard<std::mutex> lock(*mutex);
results[result] = result;
}
}
};
TEST(WorkQueue, SingleThreaded) {
WorkQueue<int> queue;
int result;
queue.push(5);
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(5, result);
queue.push(1);
queue.push(2);
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(1, result);
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(2, result);
queue.push(1);
queue.push(2);
queue.finish();
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(1, result);
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(2, result);
EXPECT_FALSE(queue.pop(result));
queue.waitUntilFinished();
}
TEST(WorkQueue, SPSC) {
WorkQueue<int> queue;
const int max = 100;
for (int i = 0; i < 10; ++i) {
queue.push(i);
}
std::thread thread([&queue, max] {
int result;
for (int i = 0;; ++i) {
if (!queue.pop(result)) {
EXPECT_EQ(i, max);
break;
}
EXPECT_EQ(i, result);
}
});
std::this_thread::yield();
for (int i = 10; i < max; ++i) {
queue.push(i);
}
queue.finish();
thread.join();
}
TEST(WorkQueue, SPMC) {
WorkQueue<int> queue;
std::vector<int> results(50, -1);
std::mutex mutex;
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.emplace_back(Popper{&queue, results.data(), &mutex});
}
for (int i = 0; i < 50; ++i) {
queue.push(i);
}
queue.finish();
for (auto& thread : threads) {
thread.join();
}
for (int i = 0; i < 50; ++i) {
EXPECT_EQ(i, results[i]);
}
}
TEST(WorkQueue, MPMC) {
WorkQueue<int> queue;
std::vector<int> results(100, -1);
std::mutex mutex;
std::vector<std::thread> popperThreads;
for (int i = 0; i < 4; ++i) {
popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
}
std::vector<std::thread> pusherThreads;
for (int i = 0; i < 2; ++i) {
auto min = i * 50;
auto max = (i + 1) * 50;
pusherThreads.emplace_back([&queue, min, max] {
for (int j = min; j < max; ++j) {
queue.push(j);
}
});
}
for (auto& thread : pusherThreads) {
thread.join();
}
queue.finish();
for (auto& thread : popperThreads) {
thread.join();
}
for (int i = 0; i < 100; ++i) {
EXPECT_EQ(i, results[i]);
}
}
TEST(WorkQueue, BoundedSizeWorks) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
queue.pop(result);
queue.push(5);
queue.pop(result);
queue.push(5);
queue.finish();
queue.pop(result);
EXPECT_EQ(5, result);
}
TEST(WorkQueue, BoundedSizePushAfterFinish) {
WorkQueue<int> queue(1);
int result;
queue.push(5);
std::thread pusher([&queue] { queue.push(6); });
// Dirtily try and make sure that pusher has run.
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.finish();
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(5, result);
EXPECT_FALSE(queue.pop(result));
pusher.join();
}
TEST(WorkQueue, SetMaxSize) {
WorkQueue<int> queue(2);
int result;
queue.push(5);
queue.push(6);
queue.setMaxSize(1);
std::thread pusher([&queue] { queue.push(7); });
// Dirtily try and make sure that pusher has run.
std::this_thread::sleep_for(std::chrono::seconds(1));
queue.finish();
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(5, result);
EXPECT_TRUE(queue.pop(result));
EXPECT_EQ(6, result);
EXPECT_FALSE(queue.pop(result));
pusher.join();
}
TEST(WorkQueue, BoundedSizeMPMC) {
WorkQueue<int> queue(10);
std::vector<int> results(200, -1);
std::mutex mutex;
std::cerr << "Creating popperThreads" << std::endl;
std::vector<std::thread> popperThreads;
for (int i = 0; i < 4; ++i) {
popperThreads.emplace_back(Popper{&queue, results.data(), &mutex});
}
std::cerr << "Creating pusherThreads" << std::endl;
std::vector<std::thread> pusherThreads;
for (int i = 0; i < 2; ++i) {
auto min = i * 100;
auto max = (i + 1) * 100;
pusherThreads.emplace_back([&queue, min, max] {
for (int j = min; j < max; ++j) {
queue.push(j);
}
});
}
std::cerr << "Joining pusherThreads" << std::endl;
for (auto& thread : pusherThreads) {
thread.join();
}
std::cerr << "Finishing queue" << std::endl;
queue.finish();
std::cerr << "Joining popperThreads" << std::endl;
for (auto& thread : popperThreads) {
thread.join();
}
std::cerr << "Inspecting results" << std::endl;
for (int i = 0; i < 200; ++i) {
EXPECT_EQ(i, results[i]);
}
}
TEST(WorkQueue, FailedPush) {
WorkQueue<int> queue;
EXPECT_TRUE(queue.push(1));
queue.finish();
EXPECT_FALSE(queue.push(1));
}
TEST(WorkQueue, FailedPop) {
WorkQueue<int> queue;
int x = 5;
EXPECT_TRUE(queue.push(x));
queue.finish();
x = 0;
EXPECT_TRUE(queue.pop(x));
EXPECT_EQ(5, x);
EXPECT_FALSE(queue.pop(x));
EXPECT_EQ(5, x);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading…
Cancel
Save