From 0a4cdde50a2280c4aecd66ed430ce112bfed6933 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Mon, 6 Feb 2017 14:43:55 -0800 Subject: [PATCH] Windows thread Summary: introduce new methods into a public threadpool interface, - allow submission of std::functions as they allow greater flexibility. - add Joining methods to the implementation to join scheduled and submitted jobs with an option to cancel jobs that did not start executing. - Remove ugly `#ifdefs` between pthread and std implementation, make it uniform. - introduce pimpl for a drop in replacement of the implementation - Introduce rocksdb::port::Thread typedef which is a replacement for std::thread. On Posix Thread defaults as before std::thread. - Implement WindowsThread that allocates memory in a more controllable manner than windows std::thread with a replaceable implementation. - should be no functionality changes. Closes https://github.com/facebook/rocksdb/pull/1823 Differential Revision: D4492902 Pulled By: siying fbshipit-source-id: c74cb11 --- CMakeLists.txt | 2 + db/auto_roll_logger_test.cc | 4 +- db/column_family_test.cc | 23 +- db/compact_files_test.cc | 3 +- db/compaction_iterator_test.cc | 5 +- db/compaction_job.cc | 2 +- db/db_compaction_test.cc | 5 +- db/db_dynamic_level_test.cc | 5 +- db/db_iterator_test.cc | 2 +- db/db_test.cc | 9 +- db/db_test2.cc | 17 +- db/db_wal_test.cc | 5 +- db/external_sst_file_test.cc | 14 +- db/forward_iterator_bench.cc | 11 +- db/memtablerep_bench.cc | 12 +- db/perf_context_test.cc | 3 +- db/version_builder.cc | 3 +- db/write_callback_test.cc | 3 +- include/rocksdb/threadpool.h | 19 + port/port_posix.h | 3 + port/win/env_win.cc | 3 +- port/win/env_win.h | 16 +- port/win/port_win.h | 5 + port/win/win_thread.cc | 166 +++++++ port/win/win_thread.h | 121 +++++ table/table_test.cc | 5 +- tools/db_bench_tool.cc | 4 +- tools/write_stress.cc | 5 +- util/delete_scheduler.cc | 2 +- util/delete_scheduler.h | 2 +- util/delete_scheduler_test.cc | 2 +- 
util/dynamic_bloom_test.cc | 2 +- util/env_posix.cc | 2 + util/thread_local_test.cc | 2 +- util/threadpool_imp.cc | 422 ++++++++++-------- util/threadpool_imp.h | 138 +++--- utilities/backupable/backupable_db.cc | 2 +- utilities/backupable/backupable_db_test.cc | 2 +- utilities/checkpoint/checkpoint_test.cc | 7 +- .../persistent_cache/block_cache_tier.cc | 3 +- utilities/persistent_cache/block_cache_tier.h | 2 +- .../persistent_cache/block_cache_tier_file.cc | 3 +- .../persistent_cache/block_cache_tier_file.h | 2 +- .../persistent_cache_bench.cc | 8 +- .../persistent_cache/persistent_cache_test.h | 9 +- utilities/spatialdb/spatial_db.cc | 3 +- .../optimistic_transaction_test.cc | 3 +- utilities/transactions/transaction_test.cc | 12 +- 48 files changed, 759 insertions(+), 344 deletions(-) create mode 100644 port/win/win_thread.cc create mode 100644 port/win/win_thread.h diff --git a/CMakeLists.txt b/CMakeLists.txt index bf9c781f2..66717d517 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -377,6 +377,7 @@ set(SOURCES util/histogram_windowing.cc util/instrumented_mutex.cc util/iostats_context.cc + util/lru_cache.cc tools/ldb_cmd.cc tools/ldb_tool.cc @@ -461,6 +462,7 @@ if(WIN32) port/win/env_default.cc port/win/port_win.cc port/win/win_logger.cc + port/win/win_thread.cc port/win/xpress_win.cc) else() list(APPEND SOURCES diff --git a/db/auto_roll_logger_test.cc b/db/auto_roll_logger_test.cc index 6a0c95461..547522ccc 100644 --- a/db/auto_roll_logger_test.cc +++ b/db/auto_roll_logger_test.cc @@ -271,7 +271,7 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) { AutoRollLogger* auto_roll_logger = dynamic_cast(logger.get()); ASSERT_TRUE(auto_roll_logger); - std::thread flush_thread; + rocksdb::port::Thread flush_thread; // Notes: // (1) Need to pin the old logger before beginning the roll, as rolling grabs @@ -293,7 +293,7 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) { {"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}}); 
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - flush_thread = std::thread([&]() { auto_roll_logger->Flush(); }); + flush_thread = port::Thread ([&]() { auto_roll_logger->Flush(); }); TEST_SYNC_POINT( "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"); RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size, diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 7e5164e96..e55e434cd 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -15,6 +15,7 @@ #include "db/db_impl.h" #include "db/db_test_util.h" +#include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -1268,7 +1269,7 @@ TEST_F(ColumnFamilyTest, MultipleManualCompactions) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::vector threads; + std::vector threads; threads.emplace_back([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; @@ -1376,7 +1377,7 @@ TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) { WaitForFlush(2); AssertFilesPerLevel(ToString(i + 1), 2); } - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -1464,7 +1465,7 @@ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -1557,7 +1558,7 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = true; ASSERT_OK( @@ -1577,7 +1578,7 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) { 1); } - std::thread 
threads1([&] { + rocksdb::port::Thread threads1([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -1655,7 +1656,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -1747,7 +1748,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -1858,7 +1859,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { CompactRangeOptions compact_options; compact_options.exclusive_manual_compaction = false; ASSERT_OK( @@ -2323,7 +2324,7 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { int kKeysNum = 10000; PutRandomData(1, kKeysNum, 100); - std::vector threads; + std::vector threads; threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); }); sleeping_task.WakeUp(); @@ -2424,7 +2425,7 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { // Start a thread that will drop the first column family // and its comparator - std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators); + rocksdb::port::Thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators); DropColumnFamilies({2}); @@ -3078,7 +3079,7 @@ TEST_F(ColumnFamilyTest, LogSyncConflictFlush) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread thread([&] { db_->SyncWAL(); }); + rocksdb::port::Thread thread([&] { db_->SyncWAL(); }); TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1"); Flush(1); 
diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 76042b6e3..b48b0e9cf 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -11,6 +11,7 @@ #include #include "db/db_impl.h" +#include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "util/string_util.h" @@ -231,7 +232,7 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // Start compacting files. - std::thread compaction_thread( + rocksdb::port::Thread compaction_thread( [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); // In the meantime flush another file. diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index fede1cbd4..d06aac49d 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -8,6 +8,7 @@ #include #include +#include "port/port.h" #include "util/testharness.h" #include "util/testutil.h" @@ -393,7 +394,7 @@ TEST_F(CompactionIteratorTest, ShuttingDownInFilter) { compaction_proxy_->key_not_exists_beyond_output_level = true; std::atomic seek_done{false}; - std::thread compaction_thread([&] { + rocksdb::port::Thread compaction_thread([&] { c_iter_->SeekToFirst(); EXPECT_FALSE(c_iter_->Valid()); EXPECT_TRUE(c_iter_->status().IsShutdownInProgress()); @@ -429,7 +430,7 @@ TEST_F(CompactionIteratorTest, ShuttingDownInMerge) { compaction_proxy_->key_not_exists_beyond_output_level = true; std::atomic seek_done{false}; - std::thread compaction_thread([&] { + rocksdb::port::Thread compaction_thread([&] { c_iter_->SeekToFirst(); ASSERT_FALSE(c_iter_->Valid()); ASSERT_TRUE(c_iter_->status().IsShutdownInProgress()); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index ff4f4cb64..0e2f68f21 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -522,7 +522,7 @@ Status CompactionJob::Run() { const uint64_t start_micros = env_->NowMicros(); // Launch a thread for each of subcompactions 1...num_threads-1 - 
std::vector thread_pool; + std::vector thread_pool; thread_pool.reserve(num_threads - 1); for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 34edc5387..6197108a8 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "port/port.h" #include "rocksdb/experimental.h" #include "rocksdb/utilities/convenience.h" #include "util/sync_point.h" @@ -1101,7 +1102,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { ASSERT_EQ(trivial_move, 6); ASSERT_EQ(non_trivial_move, 0); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { compact_options.change_level = false; compact_options.exclusive_manual_compaction = false; std::string begin_string = Key(0); @@ -1233,7 +1234,7 @@ TEST_F(DBCompactionTest, ManualPartialFill) { ASSERT_EQ(trivial_move, 2); ASSERT_EQ(non_trivial_move, 0); - std::thread threads([&] { + rocksdb::port::Thread threads([&] { compact_options.change_level = false; compact_options.exclusive_manual_compaction = false; std::string begin_string = Key(0); diff --git a/db/db_dynamic_level_test.cc b/db/db_dynamic_level_test.cc index ca04f8594..6a36810e7 100644 --- a/db/db_dynamic_level_test.cc +++ b/db/db_dynamic_level_test.cc @@ -13,6 +13,7 @@ #if !defined(ROCKSDB_LITE) #include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" namespace rocksdb { @@ -251,7 +252,7 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase2) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread thread([this] { + rocksdb::port::Thread thread([this] { TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_start"); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_finish"); @@ -462,7 +463,7 
@@ TEST_F(DBTestDynamicLevel, DISABLED_MigrateToDynamicLevelMaxBytesBase) { compaction_finished = false; // Issue manual compaction in one thread and still verify DB state // in main thread. - std::thread t([&]() { + rocksdb::port::Thread t([&]() { CompactRangeOptions compact_options; compact_options.change_level = true; compact_options.target_level = options.num_levels - 1; diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 2eee94459..72b3d58be 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -1644,7 +1644,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) { std::atomic total_prev_found(0); std::atomic total_bytes(0); - std::vector threads; + std::vector threads; std::function reader_func_next = [&]() { Iterator* iter = db_->NewIterator(ReadOptions()); diff --git a/db/db_test.cc b/db/db_test.cc index 1dd1e3ea1..f80303ed5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -31,6 +31,7 @@ #include "db/version_set.h" #include "db/write_batch_internal.h" #include "memtable/hash_linklist_rep.h" +#include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" @@ -998,7 +999,7 @@ TEST_F(DBTest, FlushSchedule) { options.max_write_buffer_number = 2; options.write_buffer_size = 120 * 1024; CreateAndReopenWithCF({"pikachu"}, options); - std::vector threads; + std::vector threads; std::atomic thread_num(0); // each column family will have 5 thread, each thread generating 2 memtables. 
@@ -3556,7 +3557,7 @@ TEST_F(DBTest, SanitizeNumThreads) { } TEST_F(DBTest, WriteSingleThreadEntry) { - std::vector threads; + std::vector threads; dbfull()->TEST_LockMutex(); auto w = dbfull()->TEST_BeginWrite(); threads.emplace_back([&] { Put("a", "b"); }); @@ -5434,7 +5435,7 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) { } rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::vector threads; + std::vector threads; threads.emplace_back([&]() { Compact("a", "z"); }); TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1"); @@ -5840,7 +5841,7 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { options.write_buffer_size = 100000; // Small write buffer Reopen(options); - std::vector threads; + std::vector threads; std::atomic done(false); db_->PauseBackgroundWork(); threads.emplace_back([&]() { diff --git a/db/db_test2.cc b/db/db_test2.cc index bf21101e0..6eb8b5951 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -11,6 +11,7 @@ #include #include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/persistent_cache.h" #include "rocksdb/wal_filter.h" @@ -1616,8 +1617,8 @@ TEST_F(DBTest2, SyncPointMarker) { CountSyncPoint(); }; - auto thread1 = std::thread(func1); - auto thread2 = std::thread(func2); + auto thread1 = port::Thread(func1); + auto thread2 = port::Thread(func2); thread1.join(); thread2.join(); @@ -1906,8 +1907,8 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) { delete iter; }; - std::thread writer_thread(writer_func); - std::thread reader_thread(reader_func); + rocksdb::port::Thread writer_thread(writer_func); + rocksdb::port::Thread reader_thread(reader_func); writer_thread.join(); reader_thread.join(); @@ -2178,7 +2179,7 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { cro.exclusive_manual_compaction = false; ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s)); }; - std::thread bg_thread; + rocksdb::port::Thread bg_thread; // While the compaction is running, we will create 2 new files 
that // can fit in L1, these 2 files will be moved to L1 and overlap with @@ -2199,7 +2200,7 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) { ASSERT_OK(Flush()); // Start a non-exclusive manual compaction in a bg thread - bg_thread = std::thread(bg_manual_compact); + bg_thread = port::Thread(bg_manual_compact); // This manual compaction conflict with the other manual compaction // so it should wait until the first compaction finish env_->SleepForMicroseconds(1000000); @@ -2240,7 +2241,7 @@ TEST_F(DBTest2, GetRaceFlush1) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread t1([&] { + rocksdb::port::Thread t1([&] { TEST_SYNC_POINT("DBTest2::GetRaceFlush:1"); ASSERT_OK(Put("foo", "v2")); Flush(); @@ -2263,7 +2264,7 @@ TEST_F(DBTest2, GetRaceFlush2) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread t1([&] { + port::Thread t1([&] { TEST_SYNC_POINT("DBTest2::GetRaceFlush:1"); ASSERT_OK(Put("foo", "v2")); Flush(); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 5c3cfc906..cbc69f394 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" #include "util/fault_injection_test_env.h" #include "util/options_helper.h" @@ -86,7 +87,7 @@ TEST_F(DBWALTest, SyncWALNotBlockWrite) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread thread([&]() { ASSERT_OK(db_->SyncWAL()); }); + rocksdb::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); }); TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1"); ASSERT_OK(Put("foo2", "bar2")); @@ -118,7 +119,7 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); + rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); ASSERT_OK(db_->SyncWAL()); TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 4bb75d075..ef354317b 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -821,7 +821,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { ASSERT_TRUE(s.ok()) << s.ToString(); }; // Write num_files files in parallel - std::vector sst_writer_threads; + std::vector sst_writer_threads; for (int i = 0; i < num_files; ++i) { sst_writer_threads.emplace_back(write_file_func); } @@ -864,7 +864,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { }; // Bulk load num_files files in parallel - std::vector add_file_threads; + std::vector add_file_threads; DestroyAndReopen(options); for (int i = 0; i < num_files; ++i) { add_file_threads.emplace_back(load_file_func); @@ -1108,13 +1108,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // While writing the MANIFEST start a thread that will ask for compaction - std::thread bg_compact([&]() { + rocksdb::port::Thread bg_compact([&]() { ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); }); 
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2"); // Start a thread that will ingest a new file - std::thread bg_addfile([&]() { + rocksdb::port::Thread bg_addfile([&]() { file_keys = {1, 2, 3}; ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1)); }); @@ -1169,7 +1169,7 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id)); }; - std::vector threads; + std::vector threads; while (range_id < 5000) { int range_start = range_id * 10; int range_end = range_start + 10; @@ -1728,7 +1728,7 @@ TEST_F(ExternalSSTFileTest, CompactionDeadlock) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // Start ingesting and extrnal file in the background - std::thread bg_ingest_file([&]() { + rocksdb::port::Thread bg_ingest_file([&]() { running_threads += 1; ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6})); running_threads -= 1; @@ -1748,7 +1748,7 @@ TEST_F(ExternalSSTFileTest, CompactionDeadlock) { // This thread will try to insert into the memtable but since we have 4 L0 // files this thread will be blocked and hold the writer thread - std::thread bg_block_put([&]() { + rocksdb::port::Thread bg_block_put([&]() { running_threads += 1; ASSERT_OK(Put(Key(10), "memtable")); running_threads -= 1; diff --git a/db/forward_iterator_bench.cc b/db/forward_iterator_bench.cc index cd37ddf3c..bdd402539 100644 --- a/db/forward_iterator_bench.cc +++ b/db/forward_iterator_bench.cc @@ -34,6 +34,7 @@ int main() { return 0; } #include "rocksdb/db.h" #include "rocksdb/status.h" #include "rocksdb/table.h" +#include "port/port.h" #include "util/testharness.h" const int MAX_SHARDS = 100000; @@ -94,7 +95,7 @@ struct Reader { explicit Reader(std::vector* shard_states, rocksdb::DB* db) : shard_states_(shard_states), db_(db) { sem_init(&sem_, 0, 0); - thread_ = std::thread(&Reader::run, this); + thread_ = port::Thread(&Reader::run, this); } void run() { @@ -193,7 +194,7 @@ struct Reader { char pad1[128] 
__attribute__((__unused__)); std::vector* shard_states_; rocksdb::DB* db_; - std::thread thread_; + rocksdb::port::Thread thread_; sem_t sem_; std::mutex queue_mutex_; std::bitset shards_pending_set_; @@ -206,7 +207,7 @@ struct Writer { explicit Writer(std::vector* shard_states, rocksdb::DB* db) : shard_states_(shard_states), db_(db) {} - void start() { thread_ = std::thread(&Writer::run, this); } + void start() { thread_ = port::Thread(&Writer::run, this); } void run() { std::queue workq; @@ -263,7 +264,7 @@ struct Writer { char pad1[128] __attribute__((__unused__)); std::vector* shard_states_; rocksdb::DB* db_; - std::thread thread_; + rocksdb::port::Thread thread_; char pad2[128] __attribute__((__unused__)); }; @@ -313,7 +314,7 @@ struct StatsThread { rocksdb::DB* db_; std::mutex cvm_; std::condition_variable cv_; - std::thread thread_; + rocksdb::port::Thread thread_; std::atomic done_{false}; }; diff --git a/db/memtablerep_bench.cc b/db/memtablerep_bench.cc index b5875618b..7eaffab91 100644 --- a/db/memtablerep_bench.cc +++ b/db/memtablerep_bench.cc @@ -426,7 +426,7 @@ class Benchmark { virtual ~Benchmark() {} virtual void Run() { std::cout << "Number of threads: " << num_threads_ << std::endl; - std::vector threads; + std::vector threads; uint64_t bytes_written = 0; uint64_t bytes_read = 0; uint64_t read_hits = 0; @@ -457,7 +457,7 @@ class Benchmark { } } - virtual void RunThreads(std::vector* threads, + virtual void RunThreads(std::vector* threads, uint64_t* bytes_written, uint64_t* bytes_read, bool write, uint64_t* read_hits) = 0; @@ -478,7 +478,7 @@ class FillBenchmark : public Benchmark { num_write_ops_per_thread_ = FLAGS_num_operations; } - void RunThreads(std::vector* threads, uint64_t* bytes_written, + void RunThreads(std::vector* threads, uint64_t* bytes_written, uint64_t* bytes_read, bool write, uint64_t* read_hits) override { FillBenchmarkThread(table_, key_gen_, bytes_written, bytes_read, sequence_, @@ -494,7 +494,7 @@ class ReadBenchmark : public 
Benchmark { num_read_ops_per_thread_ = FLAGS_num_operations / FLAGS_num_threads; } - void RunThreads(std::vector* threads, uint64_t* bytes_written, + void RunThreads(std::vector* threads, uint64_t* bytes_written, uint64_t* bytes_read, bool write, uint64_t* read_hits) override { for (int i = 0; i < FLAGS_num_threads; ++i) { @@ -518,7 +518,7 @@ class SeqReadBenchmark : public Benchmark { num_read_ops_per_thread_ = FLAGS_num_scans; } - void RunThreads(std::vector* threads, uint64_t* bytes_written, + void RunThreads(std::vector* threads, uint64_t* bytes_written, uint64_t* bytes_read, bool write, uint64_t* read_hits) override { for (int i = 0; i < FLAGS_num_threads; ++i) { @@ -545,7 +545,7 @@ class ReadWriteBenchmark : public Benchmark { num_write_ops_per_thread_ = FLAGS_num_operations; } - void RunThreads(std::vector* threads, uint64_t* bytes_written, + void RunThreads(std::vector* threads, uint64_t* bytes_written, uint64_t* bytes_read, bool write, uint64_t* read_hits) override { std::atomic_int threads_done; diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 90d724dbe..2df0b79b9 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -12,6 +12,7 @@ #include "rocksdb/memtablerep.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" +#include "port/port.h" #include "util/histogram.h" #include "util/instrumented_mutex.h" #include "util/stop_watch.h" @@ -561,7 +562,7 @@ TEST_F(PerfContextTest, DBMutexLockCounter) { for (int c = 0; c < 2; ++c) { InstrumentedMutex mutex(nullptr, Env::Default(), stats_code[c]); mutex.Lock(); - std::thread child_thread([&] { + rocksdb::port::Thread child_thread([&] { SetPerfLevel(perf_level); perf_context.Reset(); ASSERT_EQ(perf_context.db_mutex_lock_nanos, 0); diff --git a/db/version_builder.cc b/db/version_builder.cc index 09b449a58..541541398 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -28,6 +28,7 @@ #include "db/internal_stats.h" #include "db/table_cache.h" #include 
"db/version_set.h" +#include "port/port.h" #include "table/table_reader.h" namespace rocksdb { @@ -359,7 +360,7 @@ class VersionBuilder::Rep { if (max_threads <= 1) { load_handlers_func(); } else { - std::vector threads; + std::vector threads; for (int i = 0; i < max_threads; i++) { threads.emplace_back(load_handlers_func); } diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 8e167b4ba..a59281f12 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -14,6 +14,7 @@ #include "db/write_callback.h" #include "rocksdb/db.h" #include "rocksdb/write_batch.h" +#include "port/port.h" #include "util/logging.h" #include "util/sync_point.h" #include "util/testharness.h" @@ -241,7 +242,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // do all the writes - std::vector threads; + std::vector threads; for (uint32_t i = 0; i < write_group.size(); i++) { threads.emplace_back(write_with_callback_func); } diff --git a/include/rocksdb/threadpool.h b/include/rocksdb/threadpool.h index 9ef0aad14..7ccef06bb 100644 --- a/include/rocksdb/threadpool.h +++ b/include/rocksdb/threadpool.h @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include + namespace rocksdb { /* @@ -20,6 +22,8 @@ class ThreadPool { virtual ~ThreadPool() {} // Wait for all threads to finish. + // Discard those threads that did not start + // executing virtual void JoinAllThreads() = 0; // Set the number of background threads that will be executing the @@ -28,6 +32,21 @@ class ThreadPool { // Get the number of jobs scheduled in the ThreadPool queue. virtual unsigned int GetQueueLen() const = 0; + + // Waits for all jobs to complete those + // that already started running and those that did not + // start yet. 
This ensures that everything that was thrown + // on the TP runs even though + // we may not have specified enough threads for the amount + // of jobs + virtual void WaitForJobsAndJoinAllThreads() = 0; + + // Submit a fire-and-forget job + // This allows submitting the same job multiple times + virtual void SubmitJob(const std::function<void()>&) = 0; + // This moves the function in for efficiency + virtual void SubmitJob(std::function<void()>&&) = 0; + }; // NewThreadPool() is a function that could be used to create a ThreadPool diff --git a/port/port_posix.h b/port/port_posix.h index bccecfdcc..05e34aca4 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -11,6 +11,7 @@ #pragma once +#include <thread> // size_t printf formatting named in the manner of C99 standard formatting // strings such as PRIu64 // in fact, we could use that one @@ -156,6 +157,8 @@ class CondVar { Mutex* mu_; }; +using Thread = std::thread; + static inline void AsmVolatilePause() { #if defined(__i386__) || defined(__x86_64__) asm volatile("pause"); diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 62cc27c82..ced56f7fe 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "port/win/env_win.h" +#include "port/win/win_thread.h" #include #include #include @@ -837,7 +838,7 @@ void WinEnvThreads::StartThread(void(*function)(void* arg), void* arg) { state->arg = arg; try { - std::thread th(&StartThreadWrapper, state.get()); + rocksdb::port::WindowsThread th(&StartThreadWrapper, state.get()); state.release(); std::lock_guard lg(mu_); diff --git a/port/win/env_win.h b/port/win/env_win.h index 5a6224865..e0e283af4 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -16,11 +16,21 @@ #pragma once +#include "port/win/win_thread.h" #include #include "util/threadpool_imp.h" +#include +#include + #include #include +#include + + +#undef GetCurrentTime +#undef DeleteFile +#undef GetTickCount namespace rocksdb { namespace port { @@ -64,7 +74,7 @@ private: Env* hosted_env_; mutable std::mutex mu_; std::vector thread_pools_; - std::vector threads_to_join_; + std::vector threads_to_join_; }; @@ -281,5 +291,5 @@ private: WinEnvThreads winenv_threads_; }; -} -} +} // namespace port +} // namespace rocksdb diff --git a/port/win/port_win.h b/port/win/port_win.h index 54f10a24c..a3860cde7 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -30,6 +30,8 @@ #include +#include "port/win/win_thread.h" + #include "rocksdb/options.h" #undef min @@ -206,6 +208,9 @@ class CondVar { Mutex* mu_; }; +// Wrapper around the platform-efficient +// or otherwise preferable implementation +using Thread = WindowsThread; // OnceInit type helps emulate // Posix semantics with initialization diff --git a/port/win/win_thread.cc b/port/win/win_thread.cc new file mode 100644 index 000000000..987950a65 --- /dev/null +++ b/port/win/win_thread.cc @@ -0,0 +1,166 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/win_thread.h" + +#include +#include // __beginthreadex +#include + +#include +#include +#include + +namespace rocksdb { +namespace port { + +struct WindowsThread::Data { + + std::function func_; + uintptr_t handle_; + + Data(std::function&& func) : + func_(std::move(func)), + handle_(0) { + } + + Data(const Data&) = delete; + Data& operator=(const Data&) = delete; + + static unsigned int __stdcall ThreadProc(void* arg); +}; + + +void WindowsThread::Init(std::function&& func) { + + data_.reset(new Data(std::move(func))); + + data_->handle_ = _beginthreadex(NULL, + 0, // stack size + &Data::ThreadProc, + data_.get(), + 0, // init flag + &th_id_); + + if (data_->handle_ == 0) { + throw std::system_error(std::make_error_code( + std::errc::resource_unavailable_try_again), + "Unable to create a thread"); + } +} + +WindowsThread::WindowsThread() : + data_(nullptr), + th_id_(0) +{} + + +WindowsThread::~WindowsThread() { + // Must be joined or detached + // before destruction. 
+ // This is the same as std::thread + if (data_) { + if (joinable()) { + assert(false); + std::terminate(); + } + data_.reset(); + } +} + +WindowsThread::WindowsThread(WindowsThread&& o) noexcept : + WindowsThread() { + *this = std::move(o); +} + +WindowsThread& WindowsThread::operator=(WindowsThread&& o) noexcept { + + if (joinable()) { + assert(false); + std::terminate(); + } + + data_ = std::move(o.data_); + + // Per spec both instances will have the same id + th_id_ = o.th_id_; + + return *this; +} + +bool WindowsThread::joinable() const { + return (data_ && data_->handle_ != 0); +} + +WindowsThread::native_handle_type WindowsThread::native_handle() const { + return reinterpret_cast(data_->handle_); +} + +unsigned WindowsThread::hardware_concurrency() { + return std::thread::hardware_concurrency(); +} + +void WindowsThread::join() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer joinable"); + } + + if (GetThreadId(GetCurrentThread()) == th_id_) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::resource_deadlock_would_occur), + "Can not join itself"); + } + + auto ret = WaitForSingleObject(reinterpret_cast(data_->handle_), + INFINITE); + if (ret != WAIT_OBJECT_0) { + auto lastError = GetLastError(); + assert(false); + throw std::system_error(static_cast(lastError), + std::system_category(), + "WaitForSingleObjectFailed"); + } + + CloseHandle(reinterpret_cast(data_->handle_)); + data_->handle_ = 0; +} + +bool WindowsThread::detach() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer available"); + } + + BOOL ret = CloseHandle(reinterpret_cast(data_->handle_)); + data_->handle_ = 0; + + return (ret == TRUE); +} + +void WindowsThread::swap(WindowsThread& o) { + data_.swap(o.data_); + std::swap(th_id_, o.th_id_); +} + +unsigned int __stdcall 
WindowsThread::Data::ThreadProc(void* arg) { + auto data = reinterpret_cast(arg); + data->func_(); + _endthreadex(0); + return 0; +} +} // namespace port +} // namespace rocksdb diff --git a/port/win/win_thread.h b/port/win/win_thread.h new file mode 100644 index 000000000..80b019e6d --- /dev/null +++ b/port/win/win_thread.h @@ -0,0 +1,121 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include + +namespace rocksdb { +namespace port { + +// This class is a replacement for std::thread +// Two reasons we use it instead of std::thread: +// - std::thread dynamically allocates its internals, which are automatically +// freed when the thread terminates and not on the destruction of the +// object. This makes it difficult to control the source of memory +// allocation. +// - This implements Pimpl so we can easily replace the guts of the +// object in our private version if necessary. +class WindowsThread { + + struct Data; + + std::unique_ptr data_; + unsigned int th_id_; + + void Init(std::function&&); + +public: + + typedef void* native_handle_type; + + // Construct with no thread + WindowsThread(); + + // Template constructor + // + // This templated constructor accomplishes several things + // + // - Allows the class as a whole not to be a template + // + // - take "universal" references to support both _lvalues and _rvalues + // + // - because this constructor is a catchall case in many respects it + // may prevent us from using both the default __ctor, the move __ctor. 
+ // Also it may circumvent copy __ctor deletion. To work around this + // we make sure this one has at least one argument and eliminate + // it from the overload selection when WindowsThread is the first + // argument. + // + // - construct with Fx(Ax...) with a variable number of types/arguments. + // + // - Gathers together the callable object with its arguments and constructs + // a single callable entity + // + // - Makes use of std::function to convert it to a specification-template + // dependent type that both checks the signature conformance to ensure + // that all of the necessary arguments are provided and allows pimpl + // implementation. + template::type, + WindowsThread>::value>::type> + explicit WindowsThread(Fn&& fx, Args&&... ax) : + WindowsThread() { + + // Use binder to create a single callable entity + auto binder = std::bind(std::forward(fx), + std::forward(ax)...); + // Use std::function to take advantage of the type erasure + // so we can still hide implementation within pimpl + // This also makes sure that the binder signature is compliant + std::function target = binder; + + Init(std::move(target)); + } + + + ~WindowsThread(); + + WindowsThread(const WindowsThread&) = delete; + + WindowsThread& operator=(const WindowsThread&) = delete; + + WindowsThread(WindowsThread&&) noexcept; + + WindowsThread& operator=(WindowsThread&&) noexcept; + + bool joinable() const; + + unsigned int get_id() const { return th_id_; } + + native_handle_type native_handle() const; + + static unsigned hardware_concurrency(); + + void join(); + + bool detach(); + + void swap(WindowsThread&); +}; +} // namespace port +} // namespace rocksdb + +namespace std { + inline + void swap(rocksdb::port::WindowsThread& th1, + rocksdb::port::WindowsThread& th2) { + th1.swap(th2); + } +} // namespace std + diff --git a/table/table_test.cc b/table/table_test.cc index 841fd9e02..227bd9647 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -21,6 +21,7 @@ #include 
"db/memtable.h" #include "db/write_batch_internal.h" #include "memtable/stl_wrappers.h" +#include "port/port.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -2204,8 +2205,8 @@ TEST_F(BlockBasedTableTest, NewIndexIteratorLeak) { std::unique_ptr iter(reader->NewIterator(ro)); }; - auto thread1 = std::thread(func1); - auto thread2 = std::thread(func2); + auto thread1 = port::Thread(func1); + auto thread2 = port::Thread(func2); thread1.join(); thread2.join(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index b2b52a2a5..571747802 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1213,7 +1213,7 @@ class ReporterAgent { abort(); } - reporting_thread_ = std::thread([&]() { SleepAndReport(); }); + reporting_thread_ = port::Thread([&]() { SleepAndReport(); }); } ~ReporterAgent() { @@ -1273,7 +1273,7 @@ class ReporterAgent { std::atomic total_ops_done_; int64_t last_report_; const uint64_t report_interval_secs_; - std::thread reporting_thread_; + rocksdb::port::Thread reporting_thread_; std::mutex mutex_; // will notify on stop std::condition_variable stop_cv_; diff --git a/tools/write_stress.cc b/tools/write_stress.cc index c2cbec4f4..16aaf108a 100644 --- a/tools/write_stress.cc +++ b/tools/write_stress.cc @@ -66,12 +66,13 @@ int main() { #include #include +#include "db/filename.h" +#include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" -#include "db/filename.h" using GFLAGS::ParseCommandLineFlags; using GFLAGS::RegisterFlagValidator; @@ -290,7 +291,7 @@ class WriteStress { // frequently than the first one. 
std::atomic key_prefix_[kPrefixSize]; std::atomic stop_; - std::vector threads_; + std::vector threads_; std::unique_ptr db_; }; diff --git a/util/delete_scheduler.cc b/util/delete_scheduler.cc index e4779ad05..91387e345 100644 --- a/util/delete_scheduler.cc +++ b/util/delete_scheduler.cc @@ -34,7 +34,7 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir, bg_thread_.reset(); } else { bg_thread_.reset( - new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this)); + new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); } } diff --git a/util/delete_scheduler.h b/util/delete_scheduler.h index 6b3730330..878a117e5 100644 --- a/util/delete_scheduler.h +++ b/util/delete_scheduler.h @@ -81,7 +81,7 @@ class DeleteScheduler { // - closing_ value is set to true InstrumentedCondVar cv_; // Background thread running BackgroundEmptyTrash - std::unique_ptr bg_thread_; + std::unique_ptr bg_thread_; // Mutex to protect threads from file name conflicts InstrumentedMutex file_move_mu_; Logger* info_log_; diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index b1b5fae8a..087ba38af 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -183,7 +183,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { // Delete dummy files using 10 threads and measure time spent to empty trash std::atomic thread_num(0); - std::vector threads; + std::vector threads; std::function delete_thread = [&]() { int idx = thread_num.fetch_add(1); int range_start = idx * num_files; diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index fcce2a6d7..273ccdca6 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -245,7 +245,7 @@ TEST_F(DynamicBloomTest, concurrent_with_perf) { uint32_t locality_limit = FLAGS_enable_perf ? 
1 : 0; uint32_t num_threads = 4; - std::vector threads; + std::vector threads; for (uint32_t m = 1; m <= m_limit; ++m) { for (uint32_t locality = 0; locality <= locality_limit; ++locality) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 6a87a2deb..0103e9ed1 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -39,6 +39,8 @@ #include #include #include + +#include "rocksdb/options.h" #include "port/port.h" #include "rocksdb/slice.h" #include "util/coding.h" diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc index 904628b5d..6d02df3c4 100644 --- a/util/thread_local_test.cc +++ b/util/thread_local_test.cc @@ -563,7 +563,7 @@ TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) { #ifndef ROCKSDB_LITE try { #endif // ROCKSDB_LITE - std::thread th(&AccessThreadLocal, nullptr); + rocksdb::port::Thread th(&AccessThreadLocal, nullptr); th.detach(); TEST_SYNC_POINT("MainThreadDiesFirst:End"); #ifndef ROCKSDB_LITE diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index 0c4ab4fc7..c05c54101 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -8,8 +8,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#include "util/threadpool_imp.h" -#include -#include + +#include "port/port.h" +#include "util/thread_status_util.h" #ifndef OS_WIN # include @@ -19,10 +20,12 @@ # include #endif -#ifdef OS_FREEBSD -# include -#endif - +#include +#include +#include +#include +#include +#include namespace rocksdb { @@ -33,151 +36,154 @@ void ThreadPoolImpl::PthreadCall(const char* label, int result) { } } -namespace { -#ifdef ROCKSDB_STD_THREADPOOL +struct ThreadPoolImpl::Impl { -struct Lock { - std::unique_lock ul_; - explicit Lock(std::mutex& m) : ul_(m, std::defer_lock) {} -}; + Impl(); + ~Impl(); -using Condition = std::condition_variable; + void JoinThreads(bool wait_for_jobs_to_complete); -inline int ThreadPoolMutexLock(Lock& mutex) { - mutex.ul_.lock(); - return 0; -} + void SetBackgroundThreadsInternal(int num, bool allow_reduce); -inline -int ConditionWait(Condition& condition, Lock& lock) { - condition.wait(lock.ul_); - return 0; -} + unsigned int GetQueueLen() const { + return queue_len_.load(std::memory_order_relaxed); + } -inline -int ConditionSignalAll(Condition& condition) { - condition.notify_all(); - return 0; -} + void LowerIOPriority(); -inline -int ConditionSignal(Condition& condition) { - condition.notify_one(); - return 0; -} + void WakeUpAllThreads() { + bgsignal_.notify_all(); + } -inline -int MutexUnlock(Lock& mutex) { - mutex.ul_.unlock(); - return 0; -} + void BGThread(size_t thread_id); -inline -void ThreadJoin(std::thread& thread) { - thread.join(); -} + void StartBGThreads(); -inline -int ThreadDetach(std::thread& thread) { - thread.detach(); - return 0; -} + void Submit(std::function&& schedule, + std::function&& unschedule, void* tag); -#else + int UnSchedule(void* arg); -using Lock = pthread_mutex_t&; -using Condition = pthread_cond_t&; + void SetHostEnv(Env* env) { env_ = env; } -inline int ThreadPoolMutexLock(Lock mutex) { - return pthread_mutex_lock(&mutex); -} + Env* GetHostEnv() const { return env_; } -inline -int ConditionWait(Condition 
condition, Lock lock) { - return pthread_cond_wait(&condition, &lock); -} + bool HasExcessiveThread() const { + return static_cast(bgthreads_.size()) > total_threads_limit_; + } -inline -int ConditionSignalAll(Condition condition) { - return pthread_cond_broadcast(&condition); -} + // Return true iff the current thread is the excessive thread to terminate. + // Always terminate the running thread that is added last, even if there are + // more than one thread to terminate. + bool IsLastExcessiveThread(size_t thread_id) const { + return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; + } -inline -int ConditionSignal(Condition condition) { - return pthread_cond_signal(&condition); -} + bool IsExcessiveThread(size_t thread_id) const { + return static_cast(thread_id) >= total_threads_limit_; + } -inline -int MutexUnlock(Lock mutex) { - return pthread_mutex_unlock(&mutex); -} + // Return the thread priority. + // This would allow its member-thread to know its priority. + Env::Priority GetThreadPriority() const { return priority_; } -inline -void ThreadJoin(pthread_t& thread) { - pthread_join(thread, nullptr); -} + // Set the thread priority. + void SetThreadPriority(Env::Priority priority) { priority_ = priority; } -inline -int ThreadDetach(pthread_t& thread) { - return pthread_detach(thread); -} -#endif -} +private: -ThreadPoolImpl::ThreadPoolImpl() - : total_threads_limit_(1), - bgthreads_(0), - queue_(), + static void* BGThreadWrapper(void* arg); + + bool low_io_priority_; + Env::Priority priority_; + Env* env_; + + int total_threads_limit_; + std::atomic_uint queue_len_; // Queue length. 
Used for stats reporting + bool exit_all_threads_; + bool wait_for_jobs_to_complete_; + + // Entry per Schedule()/Submit() call + struct BGItem { + void* tag = nullptr; + std::function function; + std::function unschedFunction; + }; + + using BGQueue = std::deque; + BGQueue queue_; + + std::mutex mu_; + std::condition_variable bgsignal_; + std::vector bgthreads_; +}; + + +inline +ThreadPoolImpl::Impl::Impl() + : + low_io_priority_(false), + priority_(Env::LOW), + env_(nullptr), + total_threads_limit_(1), queue_len_(), exit_all_threads_(false), - low_io_priority_(false), - env_(nullptr) { -#ifndef ROCKSDB_STD_THREADPOOL - PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); - PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); -#endif + wait_for_jobs_to_complete_(false), + queue_(), + mu_(), + bgsignal_(), + bgthreads_() { } -ThreadPoolImpl::~ThreadPoolImpl() { assert(bgthreads_.size() == 0U); } +inline +ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } -void ThreadPoolImpl::JoinAllThreads() { - Lock lock(mu_); - PthreadCall("lock", ThreadPoolMutexLock(lock)); +void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { + + std::unique_lock lock(mu_); assert(!exit_all_threads_); + + wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; exit_all_threads_ = true; - PthreadCall("signalall", ConditionSignalAll(bgsignal_)); - PthreadCall("unlock", MutexUnlock(lock)); + + lock.unlock(); + + bgsignal_.notify_all(); for (auto& th : bgthreads_) { - ThreadJoin(th); + th.join(); } bgthreads_.clear(); + + exit_all_threads_ = false; + wait_for_jobs_to_complete_ = false; } -void ThreadPoolImpl::LowerIOPriority() { -#ifdef OS_LINUX - PthreadCall("lock", pthread_mutex_lock(&mu_)); +inline +void ThreadPoolImpl::Impl::LowerIOPriority() { + std::lock_guard lock(mu_); low_io_priority_ = true; - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); -#endif } -void ThreadPoolImpl::BGThread(size_t thread_id) { + +void 
ThreadPoolImpl::Impl::BGThread(size_t thread_id) { bool low_io_priority = false; while (true) { // Wait until there is an item that is ready to run - Lock uniqueLock(mu_); - PthreadCall("lock", ThreadPoolMutexLock(uniqueLock)); + std::unique_lock lock(mu_); // Stop waiting if the thread needs to do work or needs to terminate. while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && (queue_.empty() || IsExcessiveThread(thread_id))) { - PthreadCall("wait", ConditionWait(bgsignal_, uniqueLock)); + bgsignal_.wait(lock); } if (exit_all_threads_) { // mechanism to let BG threads exit safely - PthreadCall("unlock", MutexUnlock(uniqueLock)); - break; + + if(!wait_for_jobs_to_complete_ || + queue_.empty()) { + break; + } } if (IsLastExcessiveThread(thread_id)) { @@ -185,24 +191,24 @@ void ThreadPoolImpl::BGThread(size_t thread_id) { // We always terminate excessive thread in the reverse order of // generation time. auto& terminating_thread = bgthreads_.back(); - PthreadCall("detach", ThreadDetach(terminating_thread)); + terminating_thread.detach(); bgthreads_.pop_back(); + if (HasExcessiveThread()) { // There is still at least more excessive thread to terminate. WakeUpAllThreads(); } - PthreadCall("unlock", MutexUnlock(uniqueLock)); break; } - void (*function)(void*) = queue_.front().function; - void* arg = queue_.front().arg; + auto func = std::move(queue_.front().function); queue_.pop_front(); + queue_len_.store(static_cast(queue_.size()), std::memory_order_relaxed); bool decrease_io_priority = (low_io_priority != low_io_priority_); - PthreadCall("unlock", MutexUnlock(uniqueLock)); + lock.unlock(); #ifdef OS_LINUX if (decrease_io_priority) { @@ -226,22 +232,22 @@ void ThreadPoolImpl::BGThread(size_t thread_id) { #else (void)decrease_io_priority; // avoid 'unused variable' error #endif - (*function)(arg); + func(); } } // Helper struct for passing arguments when creating threads. 
struct BGThreadMetadata { - ThreadPoolImpl* thread_pool_; + ThreadPoolImpl::Impl* thread_pool_; size_t thread_id_; // Thread count in the thread. - BGThreadMetadata(ThreadPoolImpl* thread_pool, size_t thread_id) + BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id) : thread_pool_(thread_pool), thread_id_(thread_id) {} }; -static void* BGThreadWrapper(void* arg) { +void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { BGThreadMetadata* meta = reinterpret_cast(arg); size_t thread_id = meta->thread_id_; - ThreadPoolImpl* tp = meta->thread_pool_; + ThreadPoolImpl::Impl* tp = meta->thread_pool_; #ifdef ROCKSDB_USING_THREAD_STATUS // for thread-status ThreadStatusUtil::RegisterThread( @@ -257,15 +263,11 @@ static void* BGThreadWrapper(void* arg) { return nullptr; } -void ThreadPoolImpl::WakeUpAllThreads() { - PthreadCall("signalall", ConditionSignalAll(bgsignal_)); -} - -void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) { - Lock lock(mu_); - PthreadCall("lock", ThreadPoolMutexLock(lock)); +void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, + bool allow_reduce) { + std::unique_lock lock(mu_); if (exit_all_threads_) { - PthreadCall("unlock", MutexUnlock(lock)); + lock.unlock(); return; } if (num > total_threads_limit_ || @@ -274,51 +276,36 @@ void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) { WakeUpAllThreads(); StartBGThreads(); } - PthreadCall("unlock", MutexUnlock(lock)); -} - -void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { - SetBackgroundThreadsInternal(num, false); -} - -void ThreadPoolImpl::SetBackgroundThreads(int num) { - SetBackgroundThreadsInternal(num, true); } -void ThreadPoolImpl::StartBGThreads() { +void ThreadPoolImpl::Impl::StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { -#ifdef ROCKSDB_STD_THREADPOOL - std::thread p_t(&BGThreadWrapper, + + port::Thread p_t(&BGThreadWrapper, new 
BGThreadMetadata(this, bgthreads_.size())); - bgthreads_.push_back(std::move(p_t)); -#else - pthread_t t; - PthreadCall("create thread", - pthread_create(&t, nullptr, &BGThreadWrapper, - new BGThreadMetadata(this, bgthreads_.size()))); + // Set the thread name to aid debugging #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if __GLIBC_PREREQ(2, 12) + auto th_handle = p_t.native_handle(); char name_buf[16]; snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt, bgthreads_.size()); name_buf[sizeof name_buf - 1] = '\0'; - pthread_setname_np(t, name_buf); -#endif + pthread_setname_np(th_handle, name_buf); #endif - bgthreads_.push_back(t); #endif + bgthreads_.push_back(std::move(p_t)); } } -void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, - void* tag, void (*unschedFunction)(void* arg)) { - Lock lock(mu_); - PthreadCall("lock", ThreadPoolMutexLock(lock)); +void ThreadPoolImpl::Impl::Submit(std::function&& schedule, + std::function&& unschedule, void* tag) { + + std::lock_guard lock(mu_); if (exit_all_threads_) { - PthreadCall("unlock", MutexUnlock(lock)); return; } @@ -326,52 +313,133 @@ void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, // Add to priority queue queue_.push_back(BGItem()); - queue_.back().function = function; - queue_.back().arg = arg; - queue_.back().tag = tag; - queue_.back().unschedFunction = unschedFunction; + + auto& item = queue_.back(); + item.tag = tag; + item.function = std::move(schedule); + item.unschedFunction = std::move(unschedule); + queue_len_.store(static_cast(queue_.size()), - std::memory_order_relaxed); + std::memory_order_relaxed); if (!HasExcessiveThread()) { // Wake up at least one waiting thread. - PthreadCall("signal", ConditionSignal(bgsignal_)); + bgsignal_.notify_one(); } else { // Need to wake up all threads to make sure the one woken // up is not the one to terminate. 
WakeUpAllThreads(); } - - PthreadCall("unlock", MutexUnlock(lock)); } -int ThreadPoolImpl::UnSchedule(void* arg) { +int ThreadPoolImpl::Impl::UnSchedule(void* arg) { int count = 0; - Lock lock(mu_); - PthreadCall("lock", ThreadPoolMutexLock(lock)); - - // Remove from priority queue - BGQueue::iterator it = queue_.begin(); - while (it != queue_.end()) { - if (arg == (*it).tag) { - void (*unschedFunction)(void*) = (*it).unschedFunction; - void* arg1 = (*it).arg; - if (unschedFunction != nullptr) { - (*unschedFunction)(arg1); + std::vector> candidates; + { + std::lock_guard lock(mu_); + + // Remove from priority queue + BGQueue::iterator it = queue_.begin(); + while (it != queue_.end()) { + if (arg == (*it).tag) { + if (it->unschedFunction) { + candidates.push_back(std::move(it->unschedFunction)); + } + it = queue_.erase(it); + count++; + } else { + ++it; } - it = queue_.erase(it); - count++; - } else { - ++it; } + queue_len_.store(static_cast(queue_.size()), + std::memory_order_relaxed); } - queue_len_.store(static_cast(queue_.size()), - std::memory_order_relaxed); - PthreadCall("unlock", MutexUnlock(lock)); + + + // Run unschedule functions outside the mutex + for (auto& f : candidates) { + f(); + } + return count; } +ThreadPoolImpl::ThreadPoolImpl() : + impl_(new Impl()) { +} + + +ThreadPoolImpl::~ThreadPoolImpl() { +} + +void ThreadPoolImpl::JoinAllThreads() { + impl_->JoinThreads(false); +} + +void ThreadPoolImpl::SetBackgroundThreads(int num) { + impl_->SetBackgroundThreadsInternal(num, true); +} + +unsigned int ThreadPoolImpl::GetQueueLen() const { + return impl_->GetQueueLen(); +} + +void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() { + impl_->JoinThreads(true); +} + +void ThreadPoolImpl::LowerIOPriority() { + impl_->LowerIOPriority(); +} + +void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { + impl_->SetBackgroundThreadsInternal(num, false); +} + +void ThreadPoolImpl::SubmitJob(const std::function& job) { + auto copy(job); + 
impl_->Submit(std::move(copy), std::function(), nullptr); +} + + +void ThreadPoolImpl::SubmitJob(std::function&& job) { + impl_->Submit(std::move(job), std::function(), nullptr); +} + +void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg, + void* tag, void(*unschedFunction)(void* arg)) { + + std::function fn = [arg, function] { function(arg); }; + + std::function unfn; + if (unschedFunction != nullptr) { + auto uf = [arg, unschedFunction] { unschedFunction(arg); }; + unfn = std::move(uf); + } + + impl_->Submit(std::move(fn), std::move(unfn), tag); +} + +int ThreadPoolImpl::UnSchedule(void* arg) { + return impl_->UnSchedule(arg); +} + +void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); } + +Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } + +// Return the thread priority. +// This would allow its member-thread to know its priority. +Env::Priority ThreadPoolImpl::GetThreadPriority() const { + return impl_->GetThreadPriority(); +} + +// Set the thread priority. +void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { + impl_->SetThreadPriority(priority); +} + ThreadPool* NewThreadPool(int num_threads) { ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); thread_pool->SetBackgroundThreads(num_threads); diff --git a/util/threadpool_imp.h b/util/threadpool_imp.h index ce1589a51..b232386df 100644 --- a/util/threadpool_imp.h +++ b/util/threadpool_imp.h @@ -8,104 +8,100 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once -#ifdef OS_WIN -# define ROCKSDB_STD_THREADPOOL -#endif - -#include "rocksdb/env.h" #include "rocksdb/threadpool.h" -#include "util/thread_status_util.h" - -#ifdef ROCKSDB_STD_THREADPOOL -# include -# include -# include -#endif +#include "rocksdb/env.h" -#include -#include +#include +#include namespace rocksdb { + class ThreadPoolImpl : public ThreadPool { public: ThreadPoolImpl(); ~ThreadPoolImpl(); + ThreadPoolImpl(ThreadPoolImpl&&) = delete; + ThreadPoolImpl& operator=(ThreadPoolImpl&&) = delete; + + // Implement ThreadPool interfaces + + // Wait for all threads to finish. + // Discards all the the jobs that did not + // start executing and waits for those running + // to complete void JoinAllThreads() override; + + // Set the number of background threads that will be executing the + // scheduled jobs. + void SetBackgroundThreads(int num) override; + // Get the number of jobs scheduled in the ThreadPool queue. + unsigned int GetQueueLen() const override; + + // Waits for all jobs to complete those + // that already started running and those that did not + // start yet + void WaitForJobsAndJoinAllThreads() override; + + // Make threads to run at a lower kernel priority + // Currently only has effect on Linux void LowerIOPriority(); - void BGThread(size_t thread_id); - void WakeUpAllThreads(); + + // Ensure there is at aleast num threads in the pool + // but do not kill threads if there are more void IncBackgroundThreadsIfNeeded(int num); - void SetBackgroundThreads(int num) override; - void StartBGThreads(); - void Schedule(void (*function)(void* arg1), void* arg, void* tag, - void (*unschedFunction)(void* arg)); - int UnSchedule(void* arg); - unsigned int GetQueueLen() const override { - return queue_len_.load(std::memory_order_relaxed); - } + // Submit a fire and forget job + // These jobs can not be unscheduled - void SetHostEnv(Env* env) { env_ = env; } - Env* GetHostEnv() const { return env_; } + // This allows to submit the same job multiple 
times + void SubmitJob(const std::function&) override; + // This moves the function in for efficiency + void SubmitJob(std::function&&) override; - // Return true if there is at least one thread needs to terminate. - bool HasExcessiveThread() const { - return static_cast(bgthreads_.size()) > total_threads_limit_; - } + // Schedule a job with an unschedule tag and unschedule function + // Can be used to filter and unschedule jobs by a tag + // that are still in the queue and did not start running + void Schedule(void (*function)(void* arg1), void* arg, void* tag, + void (*unschedFunction)(void* arg)); + + // Filter jobs that are still in a queue and match + // the given tag. Remove them from a queue if any + // and for each such job execute an unschedule function + // if such was given at scheduling time. + int UnSchedule(void* tag); - // Return true iff the current thread is the excessive thread to terminate. - // Always terminate the running thread that is added last, even if there are - // more than one thread to terminate. - bool IsLastExcessiveThread(size_t thread_id) const { - return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; - } + void SetHostEnv(Env* env); - // Is one of the threads to terminate. - bool IsExcessiveThread(size_t thread_id) const { - return static_cast(thread_id) >= total_threads_limit_; - } + Env* GetHostEnv() const; // Return the thread priority. // This would allow its member-thread to know its priority. - Env::Priority GetThreadPriority() const { return priority_; } + Env::Priority GetThreadPriority() const; // Set the thread priority. 
- void SetThreadPriority(Env::Priority priority) { priority_ = priority; } + void SetThreadPriority(Env::Priority priority); static void PthreadCall(const char* label, int result); + struct Impl; + private: - // Entry per Schedule() call - struct BGItem { - void* arg; - void (*function)(void*); - void* tag; - void (*unschedFunction)(void*); - }; - - typedef std::deque BGQueue; - - int total_threads_limit_; - -#ifdef ROCKSDB_STD_THREADPOOL - std::mutex mu_; - std::condition_variable bgsignal_; - std::vector bgthreads_; -#else - pthread_mutex_t mu_; - pthread_cond_t bgsignal_; - std::vector bgthreads_; -#endif - BGQueue queue_; - std::atomic_uint queue_len_; // Queue length. Used for stats reporting - bool exit_all_threads_; - bool low_io_priority_; - Env::Priority priority_; - Env* env_; - - void SetBackgroundThreadsInternal(int num, bool allow_reduce); + + // Current public virtual interface does not provide usable + // functionality and thus can not be used internally to + // facade different implementations. + // + // We propose a pimpl idiom in order to easily replace the thread pool impl + // w/o touching the header file but providing a different .cc potentially + // CMake option driven. + // + // Another option is to introduce a Env::MakeThreadPool() virtual interface + // and override the environment. This would require refactoring ThreadPool usage. + // + // We can also combine these two approaches + std::unique_ptr impl_; }; } // namespace rocksdb diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index f4c0954ac..1fc98e0db 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -425,7 +425,7 @@ class BackupEngineImpl : public BackupEngine { bool initialized_; std::mutex byte_report_mutex_; channel files_to_copy_or_create_; - std::vector threads_; + std::vector threads_; // Adds a file to the backup work queue to be copied or created if it doesn't // already exist. 
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 419570dd5..a9c756c94 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1326,7 +1326,7 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread flush_thread{[this]() { ASSERT_OK(db_->Flush(FlushOptions())); }}; + rocksdb::port::Thread flush_thread{[this]() { ASSERT_OK(db_->Flush(FlushOptions())); }}; ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 09a6d89de..72e7df9be 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -18,6 +18,7 @@ #include #include "db/db_impl.h" #include "port/stack_trace.h" +#include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/utilities/checkpoint.h" @@ -298,7 +299,7 @@ TEST_F(CheckpointTest, CheckpointCF) { Status s; // Take a snapshot - std::thread t([&]() { + rocksdb::port::Thread t([&]() { Checkpoint* checkpoint; ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name)); @@ -368,7 +369,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread t([&]() { + rocksdb::port::Thread t([&]() { Checkpoint* checkpoint; ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName)); @@ -451,7 +452,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) { {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit", "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - std::thread t([&]() { + 
rocksdb::port::Thread t([&]() { Checkpoint* checkpoint; ASSERT_OK(Checkpoint::Create(txdb, &checkpoint)); ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName)); diff --git a/utilities/persistent_cache/block_cache_tier.cc b/utilities/persistent_cache/block_cache_tier.cc index 585e61051..5506514d7 100644 --- a/utilities/persistent_cache/block_cache_tier.cc +++ b/utilities/persistent_cache/block_cache_tier.cc @@ -10,6 +10,7 @@ #include #include +#include "port/port.h" #include "util/stop_watch.h" #include "util/sync_point.h" #include "utilities/persistent_cache/block_cache_tier_file.h" @@ -68,7 +69,7 @@ Status BlockCacheTier::Open() { if (opt_.pipeline_writes) { assert(!insert_th_.joinable()); - insert_th_ = std::thread(&BlockCacheTier::InsertMain, this); + insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this); } return Status::OK(); diff --git a/utilities/persistent_cache/block_cache_tier.h b/utilities/persistent_cache/block_cache_tier.h index dece806fc..900f43fdb 100644 --- a/utilities/persistent_cache/block_cache_tier.h +++ b/utilities/persistent_cache/block_cache_tier.h @@ -138,7 +138,7 @@ class BlockCacheTier : public PersistentCacheTier { port::RWMutex lock_; // Synchronization const PersistentCacheConfig opt_; // BlockCache options BoundedQueue insert_ops_; // Ops waiting for insert - std::thread insert_th_; // Insert thread + rocksdb::port::Thread insert_th_; // Insert thread uint32_t writer_cache_id_ = 0; // Current cache file identifier WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference CacheWriteBufferAllocator buffer_allocator_; // Buffer provider diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc index be847cc34..04fb38034 100644 --- a/utilities/persistent_cache/block_cache_tier_file.cc +++ b/utilities/persistent_cache/block_cache_tier_file.cc @@ -14,6 +14,7 @@ #include #include "util/crc32c.h" +#include "port/port.h" namespace rocksdb { @@ -519,7 +520,7 @@ 
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, const size_t io_size) : Writer(cache), io_size_(io_size) { for (size_t i = 0; i < qdepth; ++i) { - std::thread th(&ThreadedWriter::ThreadMain, this); + port::Thread th(&ThreadedWriter::ThreadMain, this); threads_.push_back(std::move(th)); } } diff --git a/utilities/persistent_cache/block_cache_tier_file.h b/utilities/persistent_cache/block_cache_tier_file.h index c2311ffcb..9d2d0e9ee 100644 --- a/utilities/persistent_cache/block_cache_tier_file.h +++ b/utilities/persistent_cache/block_cache_tier_file.h @@ -284,7 +284,7 @@ class ThreadedWriter : public Writer { const size_t io_size_ = 0; BoundedQueue q_; - std::vector threads_; + std::vector threads_; }; } // namespace rocksdb diff --git a/utilities/persistent_cache/persistent_cache_bench.cc b/utilities/persistent_cache/persistent_cache_bench.cc index 5b0152059..866eb18c4 100644 --- a/utilities/persistent_cache/persistent_cache_bench.cc +++ b/utilities/persistent_cache/persistent_cache_bench.cc @@ -17,7 +17,7 @@ #include "utilities/persistent_cache/persistent_cache_tier.h" #include "utilities/persistent_cache/volatile_tier_impl.h" -#include "port/port_posix.h" +#include "port/port.h" #include "table/block_builder.h" #include "util/histogram.h" #include "util/mutexlock.h" @@ -116,7 +116,7 @@ class CacheTierBenchmark { stats_.Clear(); // Start IO threads - std::list threads; + std::list threads; Spawn(FLAGS_nthread_write, &threads, std::bind(&CacheTierBenchmark::Write, this)); Spawn(FLAGS_nthread_read, &threads, @@ -252,7 +252,7 @@ class CacheTierBenchmark { } // spawn threads - void Spawn(const size_t n, std::list* threads, + void Spawn(const size_t n, std::list* threads, const std::function& fn) { for (size_t i = 0; i < n; ++i) { threads->emplace_back(fn); @@ -260,7 +260,7 @@ class CacheTierBenchmark { } // join threads - void Join(std::list* threads) { + void Join(std::list* threads) { for (auto& th : *threads) { th.join(); } diff 
--git a/utilities/persistent_cache/persistent_cache_test.h b/utilities/persistent_cache/persistent_cache_test.h index 9991c0761..340c909df 100644 --- a/utilities/persistent_cache/persistent_cache_test.h +++ b/utilities/persistent_cache/persistent_cache_test.h @@ -21,6 +21,7 @@ #include "db/db_test_util.h" #include "rocksdb/cache.h" #include "table/block_builder.h" +#include "port/port.h" #include "util/arena.h" #include "util/testharness.h" #include "utilities/persistent_cache/volatile_tier_impl.h" @@ -50,17 +51,17 @@ class PersistentCacheTierTest : public testing::Test { // create threaded workload template - std::list SpawnThreads(const size_t n, const T& fn) { - std::list threads; + std::list SpawnThreads(const size_t n, const T& fn) { + std::list threads; for (size_t i = 0; i < n; i++) { - std::thread th(fn); + port::Thread th(fn); threads.push_back(std::move(th)); } return threads; } // Wait for threads to join - void Join(std::list&& threads) { + void Join(std::list&& threads) { for (auto& th : threads) { th.join(); } diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index 496467a2b..ec10ce78f 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -31,6 +31,7 @@ #include "rocksdb/utilities/stackable_db.h" #include "util/coding.h" #include "utilities/spatialdb/utils.h" +#include "port/port.h" namespace rocksdb { namespace spatial { @@ -603,7 +604,7 @@ class SpatialDBImpl : public SpatialDB { Status s; int threads_running = 0; - std::vector threads; + std::vector threads; for (auto cfh : column_families) { threads.emplace_back([&, cfh] { diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 4842b7aac..ce01ef054 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -17,6 +17,7 @@ #include "util/random.h" #include "util/testharness.h" #include 
"util/transaction_test_util.h" +#include "port/port.h" using std::string; @@ -1326,7 +1327,7 @@ TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) { // Setting the key-space to be 100 keys should cause enough write-conflicts // to make this test interesting. - std::vector threads; + std::vector threads; std::function call_inserter = [&] { ASSERT_OK(OptimisticTransactionStressTestInserter( diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 39efa1ccc..0866f4ffd 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -27,6 +27,8 @@ #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" +#include "port/port.h" + using std::string; namespace rocksdb { @@ -386,7 +388,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // We want the leaf transactions to block and hold everyone back. - std::vector threads; + std::vector threads; for (uint32_t i = 0; i < 15; i++) { std::function blocking_thread = [&, i] { auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, @@ -455,7 +457,7 @@ TEST_P(TransactionTest, DeadlockCycle) { // We want the last transaction in the chain to block and hold everyone // back. 
- std::vector threads; + std::vector threads; for (uint32_t i = 0; i < len - 1; i++) { std::function blocking_thread = [&, i] { auto s = @@ -534,7 +536,7 @@ TEST_P(TransactionTest, DeadlockStress) { } }; - std::vector threads; + std::vector threads; for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { threads.emplace_back(stress_thread, rnd.Next()); } @@ -1034,7 +1036,7 @@ TEST_P(TransactionTest, TwoPhaseMultiThreadTest) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // do all the writes - std::vector threads; + std::vector threads; for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { threads.emplace_back(txn_write_thread); } @@ -4446,7 +4448,7 @@ TEST_P(TransactionTest, TransactionStressTest) { // Setting the key-space to be 100 keys should cause enough write-conflicts // to make this test interesting. - std::vector threads; + std::vector threads; std::function call_inserter = [&] { ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,