Windows thread

Summary:
introduce new methods into a public threadpool interface,
- allow submission of std::functions as they allow greater flexibility.
- add Joining methods to the implementation to join scheduled and submitted jobs with
  an option to cancel jobs that did not start executing.
- Remove ugly `#ifdefs` between pthread and std implementation, make it uniform.
- introduce pimpl for a drop in replacement of the implementation
- Introduce rocksdb::port::Thread typedef which is a replacement for std::thread.  On Posix Thread defaults as before std::thread.
- Implement WindowsThread that allocates memory in a more controllable manner than windows std::thread with a replaceable implementation.
- should be no functionality changes.
Closes https://github.com/facebook/rocksdb/pull/1823

Differential Revision: D4492902

Pulled By: siying

fbshipit-source-id: c74cb11
main
Dmitri Smirnov 8 years ago committed by Facebook Github Bot
parent 1aaa898cf1
commit 0a4cdde50a
  1. 2
      CMakeLists.txt
  2. 4
      db/auto_roll_logger_test.cc
  3. 23
      db/column_family_test.cc
  4. 3
      db/compact_files_test.cc
  5. 5
      db/compaction_iterator_test.cc
  6. 2
      db/compaction_job.cc
  7. 5
      db/db_compaction_test.cc
  8. 5
      db/db_dynamic_level_test.cc
  9. 2
      db/db_iterator_test.cc
  10. 9
      db/db_test.cc
  11. 17
      db/db_test2.cc
  12. 5
      db/db_wal_test.cc
  13. 14
      db/external_sst_file_test.cc
  14. 11
      db/forward_iterator_bench.cc
  15. 12
      db/memtablerep_bench.cc
  16. 3
      db/perf_context_test.cc
  17. 3
      db/version_builder.cc
  18. 3
      db/write_callback_test.cc
  19. 19
      include/rocksdb/threadpool.h
  20. 3
      port/port_posix.h
  21. 3
      port/win/env_win.cc
  22. 16
      port/win/env_win.h
  23. 5
      port/win/port_win.h
  24. 166
      port/win/win_thread.cc
  25. 121
      port/win/win_thread.h
  26. 5
      table/table_test.cc
  27. 4
      tools/db_bench_tool.cc
  28. 5
      tools/write_stress.cc
  29. 2
      util/delete_scheduler.cc
  30. 2
      util/delete_scheduler.h
  31. 2
      util/delete_scheduler_test.cc
  32. 2
      util/dynamic_bloom_test.cc
  33. 2
      util/env_posix.cc
  34. 2
      util/thread_local_test.cc
  35. 422
      util/threadpool_imp.cc
  36. 138
      util/threadpool_imp.h
  37. 2
      utilities/backupable/backupable_db.cc
  38. 2
      utilities/backupable/backupable_db_test.cc
  39. 7
      utilities/checkpoint/checkpoint_test.cc
  40. 3
      utilities/persistent_cache/block_cache_tier.cc
  41. 2
      utilities/persistent_cache/block_cache_tier.h
  42. 3
      utilities/persistent_cache/block_cache_tier_file.cc
  43. 2
      utilities/persistent_cache/block_cache_tier_file.h
  44. 8
      utilities/persistent_cache/persistent_cache_bench.cc
  45. 9
      utilities/persistent_cache/persistent_cache_test.h
  46. 3
      utilities/spatialdb/spatial_db.cc
  47. 3
      utilities/transactions/optimistic_transaction_test.cc
  48. 12
      utilities/transactions/transaction_test.cc

@ -377,6 +377,7 @@ set(SOURCES
util/histogram_windowing.cc util/histogram_windowing.cc
util/instrumented_mutex.cc util/instrumented_mutex.cc
util/iostats_context.cc util/iostats_context.cc
util/lru_cache.cc util/lru_cache.cc
tools/ldb_cmd.cc tools/ldb_cmd.cc
tools/ldb_tool.cc tools/ldb_tool.cc
@ -461,6 +462,7 @@ if(WIN32)
port/win/env_default.cc port/win/env_default.cc
port/win/port_win.cc port/win/port_win.cc
port/win/win_logger.cc port/win/win_logger.cc
port/win/win_thread.cc
port/win/xpress_win.cc) port/win/xpress_win.cc)
else() else()
list(APPEND SOURCES list(APPEND SOURCES

@ -271,7 +271,7 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) {
AutoRollLogger* auto_roll_logger = AutoRollLogger* auto_roll_logger =
dynamic_cast<AutoRollLogger*>(logger.get()); dynamic_cast<AutoRollLogger*>(logger.get());
ASSERT_TRUE(auto_roll_logger); ASSERT_TRUE(auto_roll_logger);
std::thread flush_thread; rocksdb::port::Thread flush_thread;
// Notes: // Notes:
// (1) Need to pin the old logger before beginning the roll, as rolling grabs // (1) Need to pin the old logger before beginning the roll, as rolling grabs
@ -293,7 +293,7 @@ TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) {
{"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}}); {"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
flush_thread = std::thread([&]() { auto_roll_logger->Flush(); }); flush_thread = port::Thread ([&]() { auto_roll_logger->Flush(); });
TEST_SYNC_POINT( TEST_SYNC_POINT(
"AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"); "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit");
RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size, RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size,

@ -15,6 +15,7 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -1268,7 +1269,7 @@ TEST_F(ColumnFamilyTest, MultipleManualCompactions) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::thread> threads; std::vector<port::Thread> threads;
threads.emplace_back([&] { threads.emplace_back([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
@ -1376,7 +1377,7 @@ TEST_F(ColumnFamilyTest, AutomaticAndManualCompactions) {
WaitForFlush(2); WaitForFlush(2);
AssertFilesPerLevel(ToString(i + 1), 2); AssertFilesPerLevel(ToString(i + 1), 2);
} }
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -1464,7 +1465,7 @@ TEST_F(ColumnFamilyTest, ManualAndAutomaticCompactions) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -1557,7 +1558,7 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = true; compact_options.exclusive_manual_compaction = true;
ASSERT_OK( ASSERT_OK(
@ -1577,7 +1578,7 @@ TEST_F(ColumnFamilyTest, SameCFManualManualCompactions) {
1); 1);
} }
std::thread threads1([&] { rocksdb::port::Thread threads1([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -1655,7 +1656,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactions) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -1747,7 +1748,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticCompactionsLevel) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -1858,7 +1859,7 @@ TEST_F(ColumnFamilyTest, SameCFManualAutomaticConflict) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { rocksdb::port::Thread threads([&] {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
ASSERT_OK( ASSERT_OK(
@ -2323,7 +2324,7 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
int kKeysNum = 10000; int kKeysNum = 10000;
PutRandomData(1, kKeysNum, 100); PutRandomData(1, kKeysNum, 100);
std::vector<std::thread> threads; std::vector<port::Thread> threads;
threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); }); threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
sleeping_task.WakeUp(); sleeping_task.WakeUp();
@ -2424,7 +2425,7 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
// Start a thread that will drop the first column family // Start a thread that will drop the first column family
// and its comparator // and its comparator
std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators); rocksdb::port::Thread drop_cf_thread(DropSingleColumnFamily, this, 1, &comparators);
DropColumnFamilies({2}); DropColumnFamilies({2});
@ -3078,7 +3079,7 @@ TEST_F(ColumnFamilyTest, LogSyncConflictFlush) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread thread([&] { db_->SyncWAL(); }); rocksdb::port::Thread thread([&] { db_->SyncWAL(); });
TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1"); TEST_SYNC_POINT("ColumnFamilyTest::LogSyncConflictFlush:1");
Flush(1); Flush(1);

@ -11,6 +11,7 @@
#include <vector> #include <vector>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -231,7 +232,7 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Start compacting files. // Start compacting files.
std::thread compaction_thread( rocksdb::port::Thread compaction_thread(
[&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); }); [&] { EXPECT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1)); });
// In the meantime flush another file. // In the meantime flush another file.

@ -8,6 +8,7 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "port/port.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
@ -393,7 +394,7 @@ TEST_F(CompactionIteratorTest, ShuttingDownInFilter) {
compaction_proxy_->key_not_exists_beyond_output_level = true; compaction_proxy_->key_not_exists_beyond_output_level = true;
std::atomic<bool> seek_done{false}; std::atomic<bool> seek_done{false};
std::thread compaction_thread([&] { rocksdb::port::Thread compaction_thread([&] {
c_iter_->SeekToFirst(); c_iter_->SeekToFirst();
EXPECT_FALSE(c_iter_->Valid()); EXPECT_FALSE(c_iter_->Valid());
EXPECT_TRUE(c_iter_->status().IsShutdownInProgress()); EXPECT_TRUE(c_iter_->status().IsShutdownInProgress());
@ -429,7 +430,7 @@ TEST_F(CompactionIteratorTest, ShuttingDownInMerge) {
compaction_proxy_->key_not_exists_beyond_output_level = true; compaction_proxy_->key_not_exists_beyond_output_level = true;
std::atomic<bool> seek_done{false}; std::atomic<bool> seek_done{false};
std::thread compaction_thread([&] { rocksdb::port::Thread compaction_thread([&] {
c_iter_->SeekToFirst(); c_iter_->SeekToFirst();
ASSERT_FALSE(c_iter_->Valid()); ASSERT_FALSE(c_iter_->Valid());
ASSERT_TRUE(c_iter_->status().IsShutdownInProgress()); ASSERT_TRUE(c_iter_->status().IsShutdownInProgress());

@ -522,7 +522,7 @@ Status CompactionJob::Run() {
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
// Launch a thread for each of subcompactions 1...num_threads-1 // Launch a thread for each of subcompactions 1...num_threads-1
std::vector<std::thread> thread_pool; std::vector<port::Thread> thread_pool;
thread_pool.reserve(num_threads - 1); thread_pool.reserve(num_threads - 1);
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,

@ -9,6 +9,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "port/port.h"
#include "rocksdb/experimental.h" #include "rocksdb/experimental.h"
#include "rocksdb/utilities/convenience.h" #include "rocksdb/utilities/convenience.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -1101,7 +1102,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
ASSERT_EQ(trivial_move, 6); ASSERT_EQ(trivial_move, 6);
ASSERT_EQ(non_trivial_move, 0); ASSERT_EQ(non_trivial_move, 0);
std::thread threads([&] { rocksdb::port::Thread threads([&] {
compact_options.change_level = false; compact_options.change_level = false;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
std::string begin_string = Key(0); std::string begin_string = Key(0);
@ -1233,7 +1234,7 @@ TEST_F(DBCompactionTest, ManualPartialFill) {
ASSERT_EQ(trivial_move, 2); ASSERT_EQ(trivial_move, 2);
ASSERT_EQ(non_trivial_move, 0); ASSERT_EQ(non_trivial_move, 0);
std::thread threads([&] { rocksdb::port::Thread threads([&] {
compact_options.change_level = false; compact_options.change_level = false;
compact_options.exclusive_manual_compaction = false; compact_options.exclusive_manual_compaction = false;
std::string begin_string = Key(0); std::string begin_string = Key(0);

@ -13,6 +13,7 @@
#if !defined(ROCKSDB_LITE) #if !defined(ROCKSDB_LITE)
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
namespace rocksdb { namespace rocksdb {
@ -251,7 +252,7 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase2) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread thread([this] { rocksdb::port::Thread thread([this] {
TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_start"); TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_start");
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_finish"); TEST_SYNC_POINT("DynamicLevelMaxBytesBase2:compact_range_finish");
@ -462,7 +463,7 @@ TEST_F(DBTestDynamicLevel, DISABLED_MigrateToDynamicLevelMaxBytesBase) {
compaction_finished = false; compaction_finished = false;
// Issue manual compaction in one thread and still verify DB state // Issue manual compaction in one thread and still verify DB state
// in main thread. // in main thread.
std::thread t([&]() { rocksdb::port::Thread t([&]() {
CompactRangeOptions compact_options; CompactRangeOptions compact_options;
compact_options.change_level = true; compact_options.change_level = true;
compact_options.target_level = options.num_levels - 1; compact_options.target_level = options.num_levels - 1;

@ -1644,7 +1644,7 @@ TEST_F(DBIteratorTest, IteratorWithLocalStatistics) {
std::atomic<uint64_t> total_prev_found(0); std::atomic<uint64_t> total_prev_found(0);
std::atomic<uint64_t> total_bytes(0); std::atomic<uint64_t> total_bytes(0);
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::function<void()> reader_func_next = [&]() { std::function<void()> reader_func_next = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions()); Iterator* iter = db_->NewIterator(ReadOptions());

@ -31,6 +31,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "memtable/hash_linklist_rep.h" #include "memtable/hash_linklist_rep.h"
#include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
@ -998,7 +999,7 @@ TEST_F(DBTest, FlushSchedule) {
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.write_buffer_size = 120 * 1024; options.write_buffer_size = 120 * 1024;
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::atomic<int> thread_num(0); std::atomic<int> thread_num(0);
// each column family will have 5 thread, each thread generating 2 memtables. // each column family will have 5 thread, each thread generating 2 memtables.
@ -3556,7 +3557,7 @@ TEST_F(DBTest, SanitizeNumThreads) {
} }
TEST_F(DBTest, WriteSingleThreadEntry) { TEST_F(DBTest, WriteSingleThreadEntry) {
std::vector<std::thread> threads; std::vector<port::Thread> threads;
dbfull()->TEST_LockMutex(); dbfull()->TEST_LockMutex();
auto w = dbfull()->TEST_BeginWrite(); auto w = dbfull()->TEST_BeginWrite();
threads.emplace_back([&] { Put("a", "b"); }); threads.emplace_back([&] { Put("a", "b"); });
@ -5434,7 +5435,7 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) {
} }
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::vector<std::thread> threads; std::vector<port::Thread> threads;
threads.emplace_back([&]() { Compact("a", "z"); }); threads.emplace_back([&]() { Compact("a", "z"); });
TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1"); TEST_SYNC_POINT("DBTest::FlushesInParallelWithCompactRange:1");
@ -5840,7 +5841,7 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
options.write_buffer_size = 100000; // Small write buffer options.write_buffer_size = 100000; // Small write buffer
Reopen(options); Reopen(options);
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::atomic<bool> done(false); std::atomic<bool> done(false);
db_->PauseBackgroundWork(); db_->PauseBackgroundWork();
threads.emplace_back([&]() { threads.emplace_back([&]() {

@ -11,6 +11,7 @@
#include <functional> #include <functional>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h" #include "rocksdb/wal_filter.h"
@ -1616,8 +1617,8 @@ TEST_F(DBTest2, SyncPointMarker) {
CountSyncPoint(); CountSyncPoint();
}; };
auto thread1 = std::thread(func1); auto thread1 = port::Thread(func1);
auto thread2 = std::thread(func2); auto thread2 = port::Thread(func2);
thread1.join(); thread1.join();
thread2.join(); thread2.join();
@ -1906,8 +1907,8 @@ TEST_P(MergeOperatorPinningTest, TailingIterator) {
delete iter; delete iter;
}; };
std::thread writer_thread(writer_func); rocksdb::port::Thread writer_thread(writer_func);
std::thread reader_thread(reader_func); rocksdb::port::Thread reader_thread(reader_func);
writer_thread.join(); writer_thread.join();
reader_thread.join(); reader_thread.join();
@ -2178,7 +2179,7 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
cro.exclusive_manual_compaction = false; cro.exclusive_manual_compaction = false;
ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s)); ASSERT_OK(db_->CompactRange(cro, &k1s, &k2s));
}; };
std::thread bg_thread; rocksdb::port::Thread bg_thread;
// While the compaction is running, we will create 2 new files that // While the compaction is running, we will create 2 new files that
// can fit in L1, these 2 files will be moved to L1 and overlap with // can fit in L1, these 2 files will be moved to L1 and overlap with
@ -2199,7 +2200,7 @@ TEST_F(DBTest2, ManualCompactionOverlapManualCompaction) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
// Start a non-exclusive manual compaction in a bg thread // Start a non-exclusive manual compaction in a bg thread
bg_thread = std::thread(bg_manual_compact); bg_thread = port::Thread(bg_manual_compact);
// This manual compaction conflict with the other manual compaction // This manual compaction conflict with the other manual compaction
// so it should wait until the first compaction finish // so it should wait until the first compaction finish
env_->SleepForMicroseconds(1000000); env_->SleepForMicroseconds(1000000);
@ -2240,7 +2241,7 @@ TEST_F(DBTest2, GetRaceFlush1) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t1([&] { rocksdb::port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1"); TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2")); ASSERT_OK(Put("foo", "v2"));
Flush(); Flush();
@ -2263,7 +2264,7 @@ TEST_F(DBTest2, GetRaceFlush2) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t1([&] { port::Thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1"); TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2")); ASSERT_OK(Put("foo", "v2"));
Flush(); Flush();

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "util/fault_injection_test_env.h" #include "util/fault_injection_test_env.h"
#include "util/options_helper.h" #include "util/options_helper.h"
@ -86,7 +87,7 @@ TEST_F(DBWALTest, SyncWALNotBlockWrite) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread thread([&]() { ASSERT_OK(db_->SyncWAL()); }); rocksdb::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1"); TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
ASSERT_OK(Put("foo2", "bar2")); ASSERT_OK(Put("foo2", "bar2"));
@ -118,7 +119,7 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); });
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
ASSERT_OK(db_->SyncWAL()); ASSERT_OK(db_->SyncWAL());
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

@ -821,7 +821,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
}; };
// Write num_files files in parallel // Write num_files files in parallel
std::vector<std::thread> sst_writer_threads; std::vector<port::Thread> sst_writer_threads;
for (int i = 0; i < num_files; ++i) { for (int i = 0; i < num_files; ++i) {
sst_writer_threads.emplace_back(write_file_func); sst_writer_threads.emplace_back(write_file_func);
} }
@ -864,7 +864,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
}; };
// Bulk load num_files files in parallel // Bulk load num_files files in parallel
std::vector<std::thread> add_file_threads; std::vector<port::Thread> add_file_threads;
DestroyAndReopen(options); DestroyAndReopen(options);
for (int i = 0; i < num_files; ++i) { for (int i = 0; i < num_files; ++i) {
add_file_threads.emplace_back(load_file_func); add_file_threads.emplace_back(load_file_func);
@ -1108,13 +1108,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// While writing the MANIFEST start a thread that will ask for compaction // While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() { rocksdb::port::Thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}); });
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2"); TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
// Start a thread that will ingest a new file // Start a thread that will ingest a new file
std::thread bg_addfile([&]() { rocksdb::port::Thread bg_addfile([&]() {
file_keys = {1, 2, 3}; file_keys = {1, 2, 3};
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1)); ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, 1));
}); });
@ -1169,7 +1169,7 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id)); ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
}; };
std::vector<std::thread> threads; std::vector<port::Thread> threads;
while (range_id < 5000) { while (range_id < 5000) {
int range_start = range_id * 10; int range_start = range_id * 10;
int range_end = range_start + 10; int range_end = range_start + 10;
@ -1728,7 +1728,7 @@ TEST_F(ExternalSSTFileTest, CompactionDeadlock) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Start ingesting and extrnal file in the background // Start ingesting and extrnal file in the background
std::thread bg_ingest_file([&]() { rocksdb::port::Thread bg_ingest_file([&]() {
running_threads += 1; running_threads += 1;
ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6})); ASSERT_OK(GenerateAndAddExternalFile(options, {5, 6}));
running_threads -= 1; running_threads -= 1;
@ -1748,7 +1748,7 @@ TEST_F(ExternalSSTFileTest, CompactionDeadlock) {
// This thread will try to insert into the memtable but since we have 4 L0 // This thread will try to insert into the memtable but since we have 4 L0
// files this thread will be blocked and hold the writer thread // files this thread will be blocked and hold the writer thread
std::thread bg_block_put([&]() { rocksdb::port::Thread bg_block_put([&]() {
running_threads += 1; running_threads += 1;
ASSERT_OK(Put(Key(10), "memtable")); ASSERT_OK(Put(Key(10), "memtable"));
running_threads -= 1; running_threads -= 1;

@ -34,6 +34,7 @@ int main() { return 0; }
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "port/port.h"
#include "util/testharness.h" #include "util/testharness.h"
const int MAX_SHARDS = 100000; const int MAX_SHARDS = 100000;
@ -94,7 +95,7 @@ struct Reader {
explicit Reader(std::vector<ShardState>* shard_states, rocksdb::DB* db) explicit Reader(std::vector<ShardState>* shard_states, rocksdb::DB* db)
: shard_states_(shard_states), db_(db) { : shard_states_(shard_states), db_(db) {
sem_init(&sem_, 0, 0); sem_init(&sem_, 0, 0);
thread_ = std::thread(&Reader::run, this); thread_ = port::Thread(&Reader::run, this);
} }
void run() { void run() {
@ -193,7 +194,7 @@ struct Reader {
char pad1[128] __attribute__((__unused__)); char pad1[128] __attribute__((__unused__));
std::vector<ShardState>* shard_states_; std::vector<ShardState>* shard_states_;
rocksdb::DB* db_; rocksdb::DB* db_;
std::thread thread_; rocksdb::port::Thread thread_;
sem_t sem_; sem_t sem_;
std::mutex queue_mutex_; std::mutex queue_mutex_;
std::bitset<MAX_SHARDS + 1> shards_pending_set_; std::bitset<MAX_SHARDS + 1> shards_pending_set_;
@ -206,7 +207,7 @@ struct Writer {
explicit Writer(std::vector<ShardState>* shard_states, rocksdb::DB* db) explicit Writer(std::vector<ShardState>* shard_states, rocksdb::DB* db)
: shard_states_(shard_states), db_(db) {} : shard_states_(shard_states), db_(db) {}
void start() { thread_ = std::thread(&Writer::run, this); } void start() { thread_ = port::Thread(&Writer::run, this); }
void run() { void run() {
std::queue<std::chrono::steady_clock::time_point> workq; std::queue<std::chrono::steady_clock::time_point> workq;
@ -263,7 +264,7 @@ struct Writer {
char pad1[128] __attribute__((__unused__)); char pad1[128] __attribute__((__unused__));
std::vector<ShardState>* shard_states_; std::vector<ShardState>* shard_states_;
rocksdb::DB* db_; rocksdb::DB* db_;
std::thread thread_; rocksdb::port::Thread thread_;
char pad2[128] __attribute__((__unused__)); char pad2[128] __attribute__((__unused__));
}; };
@ -313,7 +314,7 @@ struct StatsThread {
rocksdb::DB* db_; rocksdb::DB* db_;
std::mutex cvm_; std::mutex cvm_;
std::condition_variable cv_; std::condition_variable cv_;
std::thread thread_; rocksdb::port::Thread thread_;
std::atomic<bool> done_{false}; std::atomic<bool> done_{false};
}; };

@ -426,7 +426,7 @@ class Benchmark {
virtual ~Benchmark() {} virtual ~Benchmark() {}
virtual void Run() { virtual void Run() {
std::cout << "Number of threads: " << num_threads_ << std::endl; std::cout << "Number of threads: " << num_threads_ << std::endl;
std::vector<std::thread> threads; std::vector<port::Thread> threads;
uint64_t bytes_written = 0; uint64_t bytes_written = 0;
uint64_t bytes_read = 0; uint64_t bytes_read = 0;
uint64_t read_hits = 0; uint64_t read_hits = 0;
@ -457,7 +457,7 @@ class Benchmark {
} }
} }
virtual void RunThreads(std::vector<std::thread>* threads, virtual void RunThreads(std::vector<port::Thread>* threads,
uint64_t* bytes_written, uint64_t* bytes_read, uint64_t* bytes_written, uint64_t* bytes_read,
bool write, uint64_t* read_hits) = 0; bool write, uint64_t* read_hits) = 0;
@ -478,7 +478,7 @@ class FillBenchmark : public Benchmark {
num_write_ops_per_thread_ = FLAGS_num_operations; num_write_ops_per_thread_ = FLAGS_num_operations;
} }
void RunThreads(std::vector<std::thread>* threads, uint64_t* bytes_written, void RunThreads(std::vector<port::Thread>* threads, uint64_t* bytes_written,
uint64_t* bytes_read, bool write, uint64_t* bytes_read, bool write,
uint64_t* read_hits) override { uint64_t* read_hits) override {
FillBenchmarkThread(table_, key_gen_, bytes_written, bytes_read, sequence_, FillBenchmarkThread(table_, key_gen_, bytes_written, bytes_read, sequence_,
@ -494,7 +494,7 @@ class ReadBenchmark : public Benchmark {
num_read_ops_per_thread_ = FLAGS_num_operations / FLAGS_num_threads; num_read_ops_per_thread_ = FLAGS_num_operations / FLAGS_num_threads;
} }
void RunThreads(std::vector<std::thread>* threads, uint64_t* bytes_written, void RunThreads(std::vector<port::Thread>* threads, uint64_t* bytes_written,
uint64_t* bytes_read, bool write, uint64_t* bytes_read, bool write,
uint64_t* read_hits) override { uint64_t* read_hits) override {
for (int i = 0; i < FLAGS_num_threads; ++i) { for (int i = 0; i < FLAGS_num_threads; ++i) {
@ -518,7 +518,7 @@ class SeqReadBenchmark : public Benchmark {
num_read_ops_per_thread_ = FLAGS_num_scans; num_read_ops_per_thread_ = FLAGS_num_scans;
} }
void RunThreads(std::vector<std::thread>* threads, uint64_t* bytes_written, void RunThreads(std::vector<port::Thread>* threads, uint64_t* bytes_written,
uint64_t* bytes_read, bool write, uint64_t* bytes_read, bool write,
uint64_t* read_hits) override { uint64_t* read_hits) override {
for (int i = 0; i < FLAGS_num_threads; ++i) { for (int i = 0; i < FLAGS_num_threads; ++i) {
@ -545,7 +545,7 @@ class ReadWriteBenchmark : public Benchmark {
num_write_ops_per_thread_ = FLAGS_num_operations; num_write_ops_per_thread_ = FLAGS_num_operations;
} }
void RunThreads(std::vector<std::thread>* threads, uint64_t* bytes_written, void RunThreads(std::vector<port::Thread>* threads, uint64_t* bytes_written,
uint64_t* bytes_read, bool write, uint64_t* bytes_read, bool write,
uint64_t* read_hits) override { uint64_t* read_hits) override {
std::atomic_int threads_done; std::atomic_int threads_done;

@ -12,6 +12,7 @@
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "util/histogram.h" #include "util/histogram.h"
#include "util/instrumented_mutex.h" #include "util/instrumented_mutex.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
@ -561,7 +562,7 @@ TEST_F(PerfContextTest, DBMutexLockCounter) {
for (int c = 0; c < 2; ++c) { for (int c = 0; c < 2; ++c) {
InstrumentedMutex mutex(nullptr, Env::Default(), stats_code[c]); InstrumentedMutex mutex(nullptr, Env::Default(), stats_code[c]);
mutex.Lock(); mutex.Lock();
std::thread child_thread([&] { rocksdb::port::Thread child_thread([&] {
SetPerfLevel(perf_level); SetPerfLevel(perf_level);
perf_context.Reset(); perf_context.Reset();
ASSERT_EQ(perf_context.db_mutex_lock_nanos, 0); ASSERT_EQ(perf_context.db_mutex_lock_nanos, 0);

@ -28,6 +28,7 @@
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "port/port.h"
#include "table/table_reader.h" #include "table/table_reader.h"
namespace rocksdb { namespace rocksdb {
@ -359,7 +360,7 @@ class VersionBuilder::Rep {
if (max_threads <= 1) { if (max_threads <= 1) {
load_handlers_func(); load_handlers_func();
} else { } else {
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (int i = 0; i < max_threads; i++) { for (int i = 0; i < max_threads; i++) {
threads.emplace_back(load_handlers_func); threads.emplace_back(load_handlers_func);
} }

@ -14,6 +14,7 @@
#include "db/write_callback.h" #include "db/write_callback.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "port/port.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/testharness.h" #include "util/testharness.h"
@ -241,7 +242,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes // do all the writes
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t i = 0; i < write_group.size(); i++) { for (uint32_t i = 0; i < write_group.size(); i++) {
threads.emplace_back(write_with_callback_func); threads.emplace_back(write_with_callback_func);
} }

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #pragma once
#include <functional>
namespace rocksdb { namespace rocksdb {
/* /*
@ -20,6 +22,8 @@ class ThreadPool {
virtual ~ThreadPool() {} virtual ~ThreadPool() {}
// Wait for all threads to finish. // Wait for all threads to finish.
// Discard those threads that did not start
// executing
virtual void JoinAllThreads() = 0; virtual void JoinAllThreads() = 0;
// Set the number of background threads that will be executing the // Set the number of background threads that will be executing the
@ -28,6 +32,21 @@ class ThreadPool {
// Get the number of jobs scheduled in the ThreadPool queue. // Get the number of jobs scheduled in the ThreadPool queue.
virtual unsigned int GetQueueLen() const = 0; virtual unsigned int GetQueueLen() const = 0;
// Waits for all jobs to complete those
// that already started running and those that did not
// start yet. This ensures that everything that was thrown
// on the TP runs even though
// we may not have specified enough threads for the amount
// of jobs
virtual void WaitForJobsAndJoinAllThreads() = 0;
// Submit a fire and forget jobs
// This allows to submit the same job multiple times
virtual void SubmitJob(const std::function<void()>&) = 0;
// This moves the function in for efficiency
virtual void SubmitJob(std::function<void()>&&) = 0;
}; };
// NewThreadPool() is a function that could be used to create a ThreadPool // NewThreadPool() is a function that could be used to create a ThreadPool

@ -11,6 +11,7 @@
#pragma once #pragma once
#include <thread>
// size_t printf formatting named in the manner of C99 standard formatting // size_t printf formatting named in the manner of C99 standard formatting
// strings such as PRIu64 // strings such as PRIu64
// in fact, we could use that one // in fact, we could use that one
@ -156,6 +157,8 @@ class CondVar {
Mutex* mu_; Mutex* mu_;
}; };
using Thread = std::thread;
static inline void AsmVolatilePause() { static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__) #if defined(__i386__) || defined(__x86_64__)
asm volatile("pause"); asm volatile("pause");

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port/win/env_win.h" #include "port/win/env_win.h"
#include "port/win/win_thread.h"
#include <algorithm> #include <algorithm>
#include <ctime> #include <ctime>
#include <thread> #include <thread>
@ -837,7 +838,7 @@ void WinEnvThreads::StartThread(void(*function)(void* arg), void* arg) {
state->arg = arg; state->arg = arg;
try { try {
std::thread th(&StartThreadWrapper, state.get()); rocksdb::port::WindowsThread th(&StartThreadWrapper, state.get());
state.release(); state.release();
std::lock_guard<std::mutex> lg(mu_); std::lock_guard<std::mutex> lg(mu_);

@ -16,11 +16,21 @@
#pragma once #pragma once
#include "port/win/win_thread.h"
#include <rocksdb/env.h> #include <rocksdb/env.h>
#include "util/threadpool_imp.h" #include "util/threadpool_imp.h"
#include <stdint.h>
#include <Windows.h>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include <string>
#undef GetCurrentTime
#undef DeleteFile
#undef GetTickCount
namespace rocksdb { namespace rocksdb {
namespace port { namespace port {
@ -64,7 +74,7 @@ private:
Env* hosted_env_; Env* hosted_env_;
mutable std::mutex mu_; mutable std::mutex mu_;
std::vector<ThreadPoolImpl> thread_pools_; std::vector<ThreadPoolImpl> thread_pools_;
std::vector<std::thread> threads_to_join_; std::vector<WindowsThread> threads_to_join_;
}; };
@ -281,5 +291,5 @@ private:
WinEnvThreads winenv_threads_; WinEnvThreads winenv_threads_;
}; };
} } // namespace port
} } // namespace rocksdb

@ -30,6 +30,8 @@
#include <stdint.h> #include <stdint.h>
#include "port/win/win_thread.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#undef min #undef min
@ -206,6 +208,9 @@ class CondVar {
Mutex* mu_; Mutex* mu_;
}; };
// Wrapper around the platform efficient
// or otherwise preferrable implementation
using Thread = WindowsThread;
// OnceInit type helps emulate // OnceInit type helps emulate
// Posix semantics with initialization // Posix semantics with initialization

@ -0,0 +1,166 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port/win/win_thread.h"
#include <assert.h>
#include <process.h> // __beginthreadex
#include <Windows.h>
#include <stdexcept>
#include <system_error>
#include <thread>
namespace rocksdb {
namespace port {
struct WindowsThread::Data {
std::function<void()> func_;
uintptr_t handle_;
Data(std::function<void()>&& func) :
func_(std::move(func)),
handle_(0) {
}
Data(const Data&) = delete;
Data& operator=(const Data&) = delete;
static unsigned int __stdcall ThreadProc(void* arg);
};
void WindowsThread::Init(std::function<void()>&& func) {
data_.reset(new Data(std::move(func)));
data_->handle_ = _beginthreadex(NULL,
0, // stack size
&Data::ThreadProc,
data_.get(),
0, // init flag
&th_id_);
if (data_->handle_ == 0) {
throw std::system_error(std::make_error_code(
std::errc::resource_unavailable_try_again),
"Unable to create a thread");
}
}
WindowsThread::WindowsThread() :
data_(nullptr),
th_id_(0)
{}
WindowsThread::~WindowsThread() {
// Must be joined or detached
// before destruction.
// This is the same as std::thread
if (data_) {
if (joinable()) {
assert(false);
std::terminate();
}
data_.reset();
}
}
WindowsThread::WindowsThread(WindowsThread&& o) noexcept :
WindowsThread() {
*this = std::move(o);
}
WindowsThread& WindowsThread::operator=(WindowsThread&& o) noexcept {
if (joinable()) {
assert(false);
std::terminate();
}
data_ = std::move(o.data_);
// Per spec both instances will have the same id
th_id_ = o.th_id_;
return *this;
}
bool WindowsThread::joinable() const {
return (data_ && data_->handle_ != 0);
}
WindowsThread::native_handle_type WindowsThread::native_handle() const {
return reinterpret_cast<native_handle_type>(data_->handle_);
}
unsigned WindowsThread::hardware_concurrency() {
return std::thread::hardware_concurrency();
}
void WindowsThread::join() {
if (!joinable()) {
assert(false);
throw std::system_error(
std::make_error_code(std::errc::invalid_argument),
"Thread is no longer joinable");
}
if (GetThreadId(GetCurrentThread()) == th_id_) {
assert(false);
throw std::system_error(
std::make_error_code(std::errc::resource_deadlock_would_occur),
"Can not join itself");
}
auto ret = WaitForSingleObject(reinterpret_cast<HANDLE>(data_->handle_),
INFINITE);
if (ret != WAIT_OBJECT_0) {
auto lastError = GetLastError();
assert(false);
throw std::system_error(static_cast<int>(lastError),
std::system_category(),
"WaitForSingleObjectFailed");
}
CloseHandle(reinterpret_cast<HANDLE>(data_->handle_));
data_->handle_ = 0;
}
bool WindowsThread::detach() {
if (!joinable()) {
assert(false);
throw std::system_error(
std::make_error_code(std::errc::invalid_argument),
"Thread is no longer available");
}
BOOL ret = CloseHandle(reinterpret_cast<HANDLE>(data_->handle_));
data_->handle_ = 0;
return (ret == TRUE);
}
void WindowsThread::swap(WindowsThread& o) {
data_.swap(o.data_);
std::swap(th_id_, o.th_id_);
}
unsigned int __stdcall WindowsThread::Data::ThreadProc(void* arg) {
auto data = reinterpret_cast<WindowsThread::Data*>(arg);
data->func_();
_endthreadex(0);
return 0;
}
} // namespace port
} // namespace rocksdb

@ -0,0 +1,121 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
#include <functional>
#include <type_traits>
namespace rocksdb {
namespace port {
// This class is a replacement for std::thread
// 2 reasons we do not like std::thread:
// -- is that it dynamically allocates its internals that are automatically
// freed when the thread terminates and not on the destruction of the
// object. This makes it difficult to control the source of memory
// allocation
// - This implements Pimpl so we can easily replace the guts of the
// object in our private version if necessary.
class WindowsThread {
struct Data;
std::unique_ptr<Data> data_;
unsigned int th_id_;
void Init(std::function<void()>&&);
public:
typedef void* native_handle_type;
// Construct with no thread
WindowsThread();
// Template constructor
//
// This templated constructor accomplishes several things
//
// - Allows the class as whole to be not a template
//
// - take "universal" references to support both _lvalues and _rvalues
//
// - because this constructor is a catchall case in many respects it
// may prevent us from using both the default __ctor, the move __ctor.
// Also it may circumvent copy __ctor deletion. To work around this
// we make sure this one has at least one argument and eliminate
// it from the overload selection when WindowsThread is the first
// argument.
//
// - construct with Fx(Ax...) with a variable number of types/arguments.
//
// - Gathers together the callable object with its arguments and constructs
// a single callable entity
//
// - Makes use of std::function to convert it to a specification-template
// dependent type that both checks the signature conformance to ensure
// that all of the necessary arguments are provided and allows pimpl
// implementation.
template<class Fn,
class... Args,
class = typename std::enable_if<
!std::is_same<typename std::decay<Fn>::type,
WindowsThread>::value>::type>
explicit WindowsThread(Fn&& fx, Args&&... ax) :
WindowsThread() {
// Use binder to create a single callable entity
auto binder = std::bind(std::forward<Fn>(fx),
std::forward<Args>(ax)...);
// Use std::function to take advantage of the type erasure
// so we can still hide implementation within pimpl
// This also makes sure that the binder signature is compliant
std::function<void()> target = binder;
Init(std::move(target));
}
~WindowsThread();
WindowsThread(const WindowsThread&) = delete;
WindowsThread& operator=(const WindowsThread&) = delete;
WindowsThread(WindowsThread&&) noexcept;
WindowsThread& operator=(WindowsThread&&) noexcept;
bool joinable() const;
unsigned int get_id() const { return th_id_; }
native_handle_type native_handle() const;
static unsigned hardware_concurrency();
void join();
bool detach();
void swap(WindowsThread&);
};
} // namespace port
} // namespace rocksdb
namespace std {
inline
void swap(rocksdb::port::WindowsThread& th1,
rocksdb::port::WindowsThread& th2) {
th1.swap(th2);
}
} // namespace std

@ -21,6 +21,7 @@
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "memtable/stl_wrappers.h" #include "memtable/stl_wrappers.h"
#include "port/port.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
@ -2204,8 +2205,8 @@ TEST_F(BlockBasedTableTest, NewIndexIteratorLeak) {
std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro)); std::unique_ptr<InternalIterator> iter(reader->NewIterator(ro));
}; };
auto thread1 = std::thread(func1); auto thread1 = port::Thread(func1);
auto thread2 = std::thread(func2); auto thread2 = port::Thread(func2);
thread1.join(); thread1.join();
thread2.join(); thread2.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();

@ -1213,7 +1213,7 @@ class ReporterAgent {
abort(); abort();
} }
reporting_thread_ = std::thread([&]() { SleepAndReport(); }); reporting_thread_ = port::Thread([&]() { SleepAndReport(); });
} }
~ReporterAgent() { ~ReporterAgent() {
@ -1273,7 +1273,7 @@ class ReporterAgent {
std::atomic<int64_t> total_ops_done_; std::atomic<int64_t> total_ops_done_;
int64_t last_report_; int64_t last_report_;
const uint64_t report_interval_secs_; const uint64_t report_interval_secs_;
std::thread reporting_thread_; rocksdb::port::Thread reporting_thread_;
std::mutex mutex_; std::mutex mutex_;
// will notify on stop // will notify on stop
std::condition_variable stop_cv_; std::condition_variable stop_cv_;

@ -66,12 +66,13 @@ int main() {
#include <string> #include <string>
#include <thread> #include <thread>
#include "db/filename.h"
#include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "db/filename.h"
using GFLAGS::ParseCommandLineFlags; using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator; using GFLAGS::RegisterFlagValidator;
@ -290,7 +291,7 @@ class WriteStress {
// frequently than the first one. // frequently than the first one.
std::atomic<char> key_prefix_[kPrefixSize]; std::atomic<char> key_prefix_[kPrefixSize];
std::atomic<bool> stop_; std::atomic<bool> stop_;
std::vector<std::thread> threads_; std::vector<port::Thread> threads_;
std::unique_ptr<DB> db_; std::unique_ptr<DB> db_;
}; };

@ -34,7 +34,7 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir,
bg_thread_.reset(); bg_thread_.reset();
} else { } else {
bg_thread_.reset( bg_thread_.reset(
new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this)); new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
} }
} }

@ -81,7 +81,7 @@ class DeleteScheduler {
// - closing_ value is set to true // - closing_ value is set to true
InstrumentedCondVar cv_; InstrumentedCondVar cv_;
// Background thread running BackgroundEmptyTrash // Background thread running BackgroundEmptyTrash
std::unique_ptr<std::thread> bg_thread_; std::unique_ptr<port::Thread> bg_thread_;
// Mutex to protect threads from file name conflicts // Mutex to protect threads from file name conflicts
InstrumentedMutex file_move_mu_; InstrumentedMutex file_move_mu_;
Logger* info_log_; Logger* info_log_;

@ -183,7 +183,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
// Delete dummy files using 10 threads and measure time spent to empty trash // Delete dummy files using 10 threads and measure time spent to empty trash
std::atomic<int> thread_num(0); std::atomic<int> thread_num(0);
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::function<void()> delete_thread = [&]() { std::function<void()> delete_thread = [&]() {
int idx = thread_num.fetch_add(1); int idx = thread_num.fetch_add(1);
int range_start = idx * num_files; int range_start = idx * num_files;

@ -245,7 +245,7 @@ TEST_F(DynamicBloomTest, concurrent_with_perf) {
uint32_t locality_limit = FLAGS_enable_perf ? 1 : 0; uint32_t locality_limit = FLAGS_enable_perf ? 1 : 0;
uint32_t num_threads = 4; uint32_t num_threads = 4;
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t m = 1; m <= m_limit; ++m) { for (uint32_t m = 1; m <= m_limit; ++m) {
for (uint32_t locality = 0; locality <= locality_limit; ++locality) { for (uint32_t locality = 0; locality <= locality_limit; ++locality) {

@ -39,6 +39,8 @@
#include <deque> #include <deque>
#include <set> #include <set>
#include <vector> #include <vector>
#include "rocksdb/options.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "util/coding.h" #include "util/coding.h"

@ -563,7 +563,7 @@ TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
try { try {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
std::thread th(&AccessThreadLocal, nullptr); rocksdb::port::Thread th(&AccessThreadLocal, nullptr);
th.detach(); th.detach();
TEST_SYNC_POINT("MainThreadDiesFirst:End"); TEST_SYNC_POINT("MainThreadDiesFirst:End");
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

@ -8,8 +8,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/threadpool_imp.h" #include "util/threadpool_imp.h"
#include <algorithm>
#include <atomic> #include "port/port.h"
#include "util/thread_status_util.h"
#ifndef OS_WIN #ifndef OS_WIN
# include <unistd.h> # include <unistd.h>
@ -19,10 +20,12 @@
# include <sys/syscall.h> # include <sys/syscall.h>
#endif #endif
#ifdef OS_FREEBSD #include <algorithm>
# include <stdlib.h> #include <atomic>
#endif #include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
namespace rocksdb { namespace rocksdb {
@ -33,151 +36,154 @@ void ThreadPoolImpl::PthreadCall(const char* label, int result) {
} }
} }
namespace { struct ThreadPoolImpl::Impl {
#ifdef ROCKSDB_STD_THREADPOOL
struct Lock { Impl();
std::unique_lock<std::mutex> ul_; ~Impl();
explicit Lock(std::mutex& m) : ul_(m, std::defer_lock) {}
};
using Condition = std::condition_variable; void JoinThreads(bool wait_for_jobs_to_complete);
inline int ThreadPoolMutexLock(Lock& mutex) { void SetBackgroundThreadsInternal(int num, bool allow_reduce);
mutex.ul_.lock();
return 0;
}
inline unsigned int GetQueueLen() const {
int ConditionWait(Condition& condition, Lock& lock) { return queue_len_.load(std::memory_order_relaxed);
condition.wait(lock.ul_); }
return 0;
}
inline void LowerIOPriority();
int ConditionSignalAll(Condition& condition) {
condition.notify_all();
return 0;
}
inline void WakeUpAllThreads() {
int ConditionSignal(Condition& condition) { bgsignal_.notify_all();
condition.notify_one(); }
return 0;
}
inline void BGThread(size_t thread_id);
int MutexUnlock(Lock& mutex) {
mutex.ul_.unlock();
return 0;
}
inline void StartBGThreads();
void ThreadJoin(std::thread& thread) {
thread.join();
}
inline void Submit(std::function<void()>&& schedule,
int ThreadDetach(std::thread& thread) { std::function<void()>&& unschedule, void* tag);
thread.detach();
return 0;
}
#else int UnSchedule(void* arg);
using Lock = pthread_mutex_t&; void SetHostEnv(Env* env) { env_ = env; }
using Condition = pthread_cond_t&;
inline int ThreadPoolMutexLock(Lock mutex) { Env* GetHostEnv() const { return env_; }
return pthread_mutex_lock(&mutex);
}
inline bool HasExcessiveThread() const {
int ConditionWait(Condition condition, Lock lock) { return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
return pthread_cond_wait(&condition, &lock); }
}
inline // Return true iff the current thread is the excessive thread to terminate.
int ConditionSignalAll(Condition condition) { // Always terminate the running thread that is added last, even if there are
return pthread_cond_broadcast(&condition); // more than one thread to terminate.
} bool IsLastExcessiveThread(size_t thread_id) const {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}
inline bool IsExcessiveThread(size_t thread_id) const {
int ConditionSignal(Condition condition) { return static_cast<int>(thread_id) >= total_threads_limit_;
return pthread_cond_signal(&condition); }
}
inline // Return the thread priority.
int MutexUnlock(Lock mutex) { // This would allow its member-thread to know its priority.
return pthread_mutex_unlock(&mutex); Env::Priority GetThreadPriority() const { return priority_; }
}
inline // Set the thread priority.
void ThreadJoin(pthread_t& thread) { void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
pthread_join(thread, nullptr);
}
inline private:
int ThreadDetach(pthread_t& thread) {
return pthread_detach(thread);
}
#endif
}
ThreadPoolImpl::ThreadPoolImpl() static void* BGThreadWrapper(void* arg);
: total_threads_limit_(1),
bgthreads_(0), bool low_io_priority_;
queue_(), Env::Priority priority_;
Env* env_;
int total_threads_limit_;
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
bool exit_all_threads_;
bool wait_for_jobs_to_complete_;
// Entry per Schedule()/Submit() call
struct BGItem {
void* tag = nullptr;
std::function<void()> function;
std::function<void()> unschedFunction;
};
using BGQueue = std::deque<BGItem>;
BGQueue queue_;
std::mutex mu_;
std::condition_variable bgsignal_;
std::vector<port::Thread> bgthreads_;
};
inline
ThreadPoolImpl::Impl::Impl()
:
low_io_priority_(false),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(1),
queue_len_(), queue_len_(),
exit_all_threads_(false), exit_all_threads_(false),
low_io_priority_(false), wait_for_jobs_to_complete_(false),
env_(nullptr) { queue_(),
#ifndef ROCKSDB_STD_THREADPOOL mu_(),
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); bgsignal_(),
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); bgthreads_() {
#endif
} }
ThreadPoolImpl::~ThreadPoolImpl() { assert(bgthreads_.size() == 0U); } inline
ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
void ThreadPoolImpl::JoinAllThreads() { void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock)); std::unique_lock<std::mutex> lock(mu_);
assert(!exit_all_threads_); assert(!exit_all_threads_);
wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
exit_all_threads_ = true; exit_all_threads_ = true;
PthreadCall("signalall", ConditionSignalAll(bgsignal_));
PthreadCall("unlock", MutexUnlock(lock)); lock.unlock();
bgsignal_.notify_all();
for (auto& th : bgthreads_) { for (auto& th : bgthreads_) {
ThreadJoin(th); th.join();
} }
bgthreads_.clear(); bgthreads_.clear();
exit_all_threads_ = false;
wait_for_jobs_to_complete_ = false;
} }
void ThreadPoolImpl::LowerIOPriority() { inline
#ifdef OS_LINUX void ThreadPoolImpl::Impl::LowerIOPriority() {
PthreadCall("lock", pthread_mutex_lock(&mu_)); std::lock_guard<std::mutex> lock(mu_);
low_io_priority_ = true; low_io_priority_ = true;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
#endif
} }
void ThreadPoolImpl::BGThread(size_t thread_id) {
void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false; bool low_io_priority = false;
while (true) { while (true) {
// Wait until there is an item that is ready to run // Wait until there is an item that is ready to run
Lock uniqueLock(mu_); std::unique_lock<std::mutex> lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(uniqueLock));
// Stop waiting if the thread needs to do work or needs to terminate. // Stop waiting if the thread needs to do work or needs to terminate.
while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
(queue_.empty() || IsExcessiveThread(thread_id))) { (queue_.empty() || IsExcessiveThread(thread_id))) {
PthreadCall("wait", ConditionWait(bgsignal_, uniqueLock)); bgsignal_.wait(lock);
} }
if (exit_all_threads_) { // mechanism to let BG threads exit safely if (exit_all_threads_) { // mechanism to let BG threads exit safely
PthreadCall("unlock", MutexUnlock(uniqueLock));
break; if(!wait_for_jobs_to_complete_ ||
queue_.empty()) {
break;
}
} }
if (IsLastExcessiveThread(thread_id)) { if (IsLastExcessiveThread(thread_id)) {
@ -185,24 +191,24 @@ void ThreadPoolImpl::BGThread(size_t thread_id) {
// We always terminate excessive thread in the reverse order of // We always terminate excessive thread in the reverse order of
// generation time. // generation time.
auto& terminating_thread = bgthreads_.back(); auto& terminating_thread = bgthreads_.back();
PthreadCall("detach", ThreadDetach(terminating_thread)); terminating_thread.detach();
bgthreads_.pop_back(); bgthreads_.pop_back();
if (HasExcessiveThread()) { if (HasExcessiveThread()) {
// There is still at least more excessive thread to terminate. // There is still at least more excessive thread to terminate.
WakeUpAllThreads(); WakeUpAllThreads();
} }
PthreadCall("unlock", MutexUnlock(uniqueLock));
break; break;
} }
void (*function)(void*) = queue_.front().function; auto func = std::move(queue_.front().function);
void* arg = queue_.front().arg;
queue_.pop_front(); queue_.pop_front();
queue_len_.store(static_cast<unsigned int>(queue_.size()), queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed); std::memory_order_relaxed);
bool decrease_io_priority = (low_io_priority != low_io_priority_); bool decrease_io_priority = (low_io_priority != low_io_priority_);
PthreadCall("unlock", MutexUnlock(uniqueLock)); lock.unlock();
#ifdef OS_LINUX #ifdef OS_LINUX
if (decrease_io_priority) { if (decrease_io_priority) {
@ -226,22 +232,22 @@ void ThreadPoolImpl::BGThread(size_t thread_id) {
#else #else
(void)decrease_io_priority; // avoid 'unused variable' error (void)decrease_io_priority; // avoid 'unused variable' error
#endif #endif
(*function)(arg); func();
} }
} }
// Helper struct for passing arguments when creating threads. // Helper struct for passing arguments when creating threads.
struct BGThreadMetadata { struct BGThreadMetadata {
ThreadPoolImpl* thread_pool_; ThreadPoolImpl::Impl* thread_pool_;
size_t thread_id_; // Thread count in the thread. size_t thread_id_; // Thread count in the thread.
BGThreadMetadata(ThreadPoolImpl* thread_pool, size_t thread_id) BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
: thread_pool_(thread_pool), thread_id_(thread_id) {} : thread_pool_(thread_pool), thread_id_(thread_id) {}
}; };
static void* BGThreadWrapper(void* arg) { void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg); BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
size_t thread_id = meta->thread_id_; size_t thread_id = meta->thread_id_;
ThreadPoolImpl* tp = meta->thread_pool_; ThreadPoolImpl::Impl* tp = meta->thread_pool_;
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
// for thread-status // for thread-status
ThreadStatusUtil::RegisterThread( ThreadStatusUtil::RegisterThread(
@ -257,15 +263,11 @@ static void* BGThreadWrapper(void* arg) {
return nullptr; return nullptr;
} }
void ThreadPoolImpl::WakeUpAllThreads() { void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
PthreadCall("signalall", ConditionSignalAll(bgsignal_)); bool allow_reduce) {
} std::unique_lock<std::mutex> lock(mu_);
void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock));
if (exit_all_threads_) { if (exit_all_threads_) {
PthreadCall("unlock", MutexUnlock(lock)); lock.unlock();
return; return;
} }
if (num > total_threads_limit_ || if (num > total_threads_limit_ ||
@ -274,51 +276,36 @@ void ThreadPoolImpl::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
WakeUpAllThreads(); WakeUpAllThreads();
StartBGThreads(); StartBGThreads();
} }
PthreadCall("unlock", MutexUnlock(lock));
}
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
SetBackgroundThreadsInternal(num, false);
}
void ThreadPoolImpl::SetBackgroundThreads(int num) {
SetBackgroundThreadsInternal(num, true);
} }
void ThreadPoolImpl::StartBGThreads() { void ThreadPoolImpl::Impl::StartBGThreads() {
// Start background thread if necessary // Start background thread if necessary
while ((int)bgthreads_.size() < total_threads_limit_) { while ((int)bgthreads_.size() < total_threads_limit_) {
#ifdef ROCKSDB_STD_THREADPOOL
std::thread p_t(&BGThreadWrapper, port::Thread p_t(&BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size())); new BGThreadMetadata(this, bgthreads_.size()));
bgthreads_.push_back(std::move(p_t));
#else
pthread_t t;
PthreadCall("create thread",
pthread_create(&t, nullptr, &BGThreadWrapper,
new BGThreadMetadata(this, bgthreads_.size())));
// Set the thread name to aid debugging // Set the thread name to aid debugging
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
#if __GLIBC_PREREQ(2, 12) #if __GLIBC_PREREQ(2, 12)
auto th_handle = p_t.native_handle();
char name_buf[16]; char name_buf[16];
snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt, snprintf(name_buf, sizeof name_buf, "rocksdb:bg%" ROCKSDB_PRIszt,
bgthreads_.size()); bgthreads_.size());
name_buf[sizeof name_buf - 1] = '\0'; name_buf[sizeof name_buf - 1] = '\0';
pthread_setname_np(t, name_buf); pthread_setname_np(th_handle, name_buf);
#endif
#endif #endif
bgthreads_.push_back(t);
#endif #endif
bgthreads_.push_back(std::move(p_t));
} }
} }
void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
void* tag, void (*unschedFunction)(void* arg)) { std::function<void()>&& unschedule, void* tag) {
Lock lock(mu_);
PthreadCall("lock", ThreadPoolMutexLock(lock)); std::lock_guard<std::mutex> lock(mu_);
if (exit_all_threads_) { if (exit_all_threads_) {
PthreadCall("unlock", MutexUnlock(lock));
return; return;
} }
@ -326,52 +313,133 @@ void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
// Add to priority queue // Add to priority queue
queue_.push_back(BGItem()); queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg; auto& item = queue_.back();
queue_.back().tag = tag; item.tag = tag;
queue_.back().unschedFunction = unschedFunction; item.function = std::move(schedule);
item.unschedFunction = std::move(unschedule);
queue_len_.store(static_cast<unsigned int>(queue_.size()), queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed); std::memory_order_relaxed);
if (!HasExcessiveThread()) { if (!HasExcessiveThread()) {
// Wake up at least one waiting thread. // Wake up at least one waiting thread.
PthreadCall("signal", ConditionSignal(bgsignal_)); bgsignal_.notify_one();
} else { } else {
// Need to wake up all threads to make sure the one woken // Need to wake up all threads to make sure the one woken
// up is not the one to terminate. // up is not the one to terminate.
WakeUpAllThreads(); WakeUpAllThreads();
} }
PthreadCall("unlock", MutexUnlock(lock));
} }
int ThreadPoolImpl::UnSchedule(void* arg) { int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
int count = 0; int count = 0;
Lock lock(mu_); std::vector<std::function<void()>> candidates;
PthreadCall("lock", ThreadPoolMutexLock(lock)); {
std::lock_guard<std::mutex> lock(mu_);
// Remove from priority queue
BGQueue::iterator it = queue_.begin(); // Remove from priority queue
while (it != queue_.end()) { BGQueue::iterator it = queue_.begin();
if (arg == (*it).tag) { while (it != queue_.end()) {
void (*unschedFunction)(void*) = (*it).unschedFunction; if (arg == (*it).tag) {
void* arg1 = (*it).arg; if (it->unschedFunction) {
if (unschedFunction != nullptr) { candidates.push_back(std::move(it->unschedFunction));
(*unschedFunction)(arg1); }
it = queue_.erase(it);
count++;
} else {
++it;
} }
it = queue_.erase(it);
count++;
} else {
++it;
} }
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
} }
queue_len_.store(static_cast<unsigned int>(queue_.size()),
std::memory_order_relaxed);
PthreadCall("unlock", MutexUnlock(lock)); // Run unschedule functions outside the mutex
for (auto& f : candidates) {
f();
}
return count; return count;
} }
ThreadPoolImpl::ThreadPoolImpl() :
impl_(new Impl()) {
}
ThreadPoolImpl::~ThreadPoolImpl() {
}
void ThreadPoolImpl::JoinAllThreads() {
impl_->JoinThreads(false);
}
void ThreadPoolImpl::SetBackgroundThreads(int num) {
impl_->SetBackgroundThreadsInternal(num, true);
}
unsigned int ThreadPoolImpl::GetQueueLen() const {
return impl_->GetQueueLen();
}
void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
impl_->JoinThreads(true);
}
void ThreadPoolImpl::LowerIOPriority() {
impl_->LowerIOPriority();
}
void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
impl_->SetBackgroundThreadsInternal(num, false);
}
void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
auto copy(job);
impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
}
void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
impl_->Submit(std::move(job), std::function<void()>(), nullptr);
}
void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
void* tag, void(*unschedFunction)(void* arg)) {
std::function<void()> fn = [arg, function] { function(arg); };
std::function<void()> unfn;
if (unschedFunction != nullptr) {
auto uf = [arg, unschedFunction] { unschedFunction(arg); };
unfn = std::move(uf);
}
impl_->Submit(std::move(fn), std::move(unfn), tag);
}
int ThreadPoolImpl::UnSchedule(void* arg) {
return impl_->UnSchedule(arg);
}
void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
// Return the thread priority.
// This would allow its member-thread to know its priority.
Env::Priority ThreadPoolImpl::GetThreadPriority() const {
return impl_->GetThreadPriority();
}
// Set the thread priority.
void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
impl_->SetThreadPriority(priority);
}
ThreadPool* NewThreadPool(int num_threads) { ThreadPool* NewThreadPool(int num_threads) {
ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
thread_pool->SetBackgroundThreads(num_threads); thread_pool->SetBackgroundThreads(num_threads);

@ -8,104 +8,100 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #pragma once
#ifdef OS_WIN
# define ROCKSDB_STD_THREADPOOL
#endif
#include "rocksdb/env.h"
#include "rocksdb/threadpool.h" #include "rocksdb/threadpool.h"
#include "util/thread_status_util.h" #include "rocksdb/env.h"
#ifdef ROCKSDB_STD_THREADPOOL
# include <thread>
# include <mutex>
# include <condition_variable>
#endif
#include <atomic> #include <memory>
#include <vector> #include <functional>
namespace rocksdb { namespace rocksdb {
class ThreadPoolImpl : public ThreadPool { class ThreadPoolImpl : public ThreadPool {
public: public:
ThreadPoolImpl(); ThreadPoolImpl();
~ThreadPoolImpl(); ~ThreadPoolImpl();
ThreadPoolImpl(ThreadPoolImpl&&) = delete;
ThreadPoolImpl& operator=(ThreadPoolImpl&&) = delete;
// Implement ThreadPool interfaces
// Wait for all threads to finish.
// Discards all the the jobs that did not
// start executing and waits for those running
// to complete
void JoinAllThreads() override; void JoinAllThreads() override;
// Set the number of background threads that will be executing the
// scheduled jobs.
void SetBackgroundThreads(int num) override;
// Get the number of jobs scheduled in the ThreadPool queue.
unsigned int GetQueueLen() const override;
// Waits for all jobs to complete those
// that already started running and those that did not
// start yet
void WaitForJobsAndJoinAllThreads() override;
// Make threads to run at a lower kernel priority
// Currently only has effect on Linux
void LowerIOPriority(); void LowerIOPriority();
void BGThread(size_t thread_id);
void WakeUpAllThreads(); // Ensure there is at aleast num threads in the pool
// but do not kill threads if there are more
void IncBackgroundThreadsIfNeeded(int num); void IncBackgroundThreadsIfNeeded(int num);
void SetBackgroundThreads(int num) override;
void StartBGThreads();
void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg));
int UnSchedule(void* arg);
unsigned int GetQueueLen() const override { // Submit a fire and forget job
return queue_len_.load(std::memory_order_relaxed); // These jobs can not be unscheduled
}
void SetHostEnv(Env* env) { env_ = env; } // This allows to submit the same job multiple times
Env* GetHostEnv() const { return env_; } void SubmitJob(const std::function<void()>&) override;
// This moves the function in for efficiency
void SubmitJob(std::function<void()>&&) override;
// Return true if there is at least one thread needs to terminate. // Schedule a job with an unschedule tag and unschedule function
bool HasExcessiveThread() const { // Can be used to filter and unschedule jobs by a tag
return static_cast<int>(bgthreads_.size()) > total_threads_limit_; // that are still in the queue and did not start running
} void Schedule(void (*function)(void* arg1), void* arg, void* tag,
void (*unschedFunction)(void* arg));
// Filter jobs that are still in a queue and match
// the given tag. Remove them from a queue if any
// and for each such job execute an unschedule function
// if such was given at scheduling time.
int UnSchedule(void* tag);
// Return true iff the current thread is the excessive thread to terminate. void SetHostEnv(Env* env);
// Always terminate the running thread that is added last, even if there are
// more than one thread to terminate.
bool IsLastExcessiveThread(size_t thread_id) const {
return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
}
// Is one of the threads to terminate. Env* GetHostEnv() const;
bool IsExcessiveThread(size_t thread_id) const {
return static_cast<int>(thread_id) >= total_threads_limit_;
}
// Return the thread priority. // Return the thread priority.
// This would allow its member-thread to know its priority. // This would allow its member-thread to know its priority.
Env::Priority GetThreadPriority() const { return priority_; } Env::Priority GetThreadPriority() const;
// Set the thread priority. // Set the thread priority.
void SetThreadPriority(Env::Priority priority) { priority_ = priority; } void SetThreadPriority(Env::Priority priority);
static void PthreadCall(const char* label, int result); static void PthreadCall(const char* label, int result);
struct Impl;
private: private:
// Entry per Schedule() call
struct BGItem { // Current public virtual interface does not provide usable
void* arg; // functionality and thus can not be used internally to
void (*function)(void*); // facade different implementations.
void* tag; //
void (*unschedFunction)(void*); // We propose a pimpl idiom in order to easily replace the thread pool impl
}; // w/o touching the header file but providing a different .cc potentially
// CMake option driven.
typedef std::deque<BGItem> BGQueue; //
// Another option is to introduce a Env::MakeThreadPool() virtual interface
int total_threads_limit_; // and override the environment. This would require refactoring ThreadPool usage.
//
#ifdef ROCKSDB_STD_THREADPOOL // We can also combine these two approaches
std::mutex mu_; std::unique_ptr<Impl> impl_;
std::condition_variable bgsignal_;
std::vector<std::thread> bgthreads_;
#else
pthread_mutex_t mu_;
pthread_cond_t bgsignal_;
std::vector<pthread_t> bgthreads_;
#endif
BGQueue queue_;
std::atomic_uint queue_len_; // Queue length. Used for stats reporting
bool exit_all_threads_;
bool low_io_priority_;
Env::Priority priority_;
Env* env_;
void SetBackgroundThreadsInternal(int num, bool allow_reduce);
}; };
} // namespace rocksdb } // namespace rocksdb

@ -425,7 +425,7 @@ class BackupEngineImpl : public BackupEngine {
bool initialized_; bool initialized_;
std::mutex byte_report_mutex_; std::mutex byte_report_mutex_;
channel<CopyOrCreateWorkItem> files_to_copy_or_create_; channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
std::vector<std::thread> threads_; std::vector<port::Thread> threads_;
// Adds a file to the backup work queue to be copied or created if it doesn't // Adds a file to the backup work queue to be copied or created if it doesn't
// already exist. // already exist.

@ -1326,7 +1326,7 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread flush_thread{[this]() { ASSERT_OK(db_->Flush(FlushOptions())); }}; rocksdb::port::Thread flush_thread{[this]() { ASSERT_OK(db_->Flush(FlushOptions())); }};
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));

@ -18,6 +18,7 @@
#include <utility> #include <utility>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/checkpoint.h"
@ -298,7 +299,7 @@ TEST_F(CheckpointTest, CheckpointCF) {
Status s; Status s;
// Take a snapshot // Take a snapshot
std::thread t([&]() { rocksdb::port::Thread t([&]() {
Checkpoint* checkpoint; Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name)); ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
@ -368,7 +369,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) {
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() { rocksdb::port::Thread t([&]() {
Checkpoint* checkpoint; Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName)); ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
@ -451,7 +452,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
{"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit", {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() { rocksdb::port::Thread t([&]() {
Checkpoint* checkpoint; Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(txdb, &checkpoint)); ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName)); ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));

@ -10,6 +10,7 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "port/port.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "utilities/persistent_cache/block_cache_tier_file.h" #include "utilities/persistent_cache/block_cache_tier_file.h"
@ -68,7 +69,7 @@ Status BlockCacheTier::Open() {
if (opt_.pipeline_writes) { if (opt_.pipeline_writes) {
assert(!insert_th_.joinable()); assert(!insert_th_.joinable());
insert_th_ = std::thread(&BlockCacheTier::InsertMain, this); insert_th_ = port::Thread(&BlockCacheTier::InsertMain, this);
} }
return Status::OK(); return Status::OK();

@ -138,7 +138,7 @@ class BlockCacheTier : public PersistentCacheTier {
port::RWMutex lock_; // Synchronization port::RWMutex lock_; // Synchronization
const PersistentCacheConfig opt_; // BlockCache options const PersistentCacheConfig opt_; // BlockCache options
BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert BoundedQueue<InsertOp> insert_ops_; // Ops waiting for insert
std::thread insert_th_; // Insert thread rocksdb::port::Thread insert_th_; // Insert thread
uint32_t writer_cache_id_ = 0; // Current cache file identifier uint32_t writer_cache_id_ = 0; // Current cache file identifier
WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference WriteableCacheFile* cache_file_ = nullptr; // Current cache file reference
CacheWriteBufferAllocator buffer_allocator_; // Buffer provider CacheWriteBufferAllocator buffer_allocator_; // Buffer provider

@ -14,6 +14,7 @@
#include <vector> #include <vector>
#include "util/crc32c.h" #include "util/crc32c.h"
#include "port/port.h"
namespace rocksdb { namespace rocksdb {
@ -519,7 +520,7 @@ ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
const size_t qdepth, const size_t io_size) const size_t qdepth, const size_t io_size)
: Writer(cache), io_size_(io_size) { : Writer(cache), io_size_(io_size) {
for (size_t i = 0; i < qdepth; ++i) { for (size_t i = 0; i < qdepth; ++i) {
std::thread th(&ThreadedWriter::ThreadMain, this); port::Thread th(&ThreadedWriter::ThreadMain, this);
threads_.push_back(std::move(th)); threads_.push_back(std::move(th));
} }
} }

@ -284,7 +284,7 @@ class ThreadedWriter : public Writer {
const size_t io_size_ = 0; const size_t io_size_ = 0;
BoundedQueue<IO> q_; BoundedQueue<IO> q_;
std::vector<std::thread> threads_; std::vector<port::Thread> threads_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -17,7 +17,7 @@
#include "utilities/persistent_cache/persistent_cache_tier.h" #include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h" #include "utilities/persistent_cache/volatile_tier_impl.h"
#include "port/port_posix.h" #include "port/port.h"
#include "table/block_builder.h" #include "table/block_builder.h"
#include "util/histogram.h" #include "util/histogram.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
@ -116,7 +116,7 @@ class CacheTierBenchmark {
stats_.Clear(); stats_.Clear();
// Start IO threads // Start IO threads
std::list<std::thread> threads; std::list<port::Thread> threads;
Spawn(FLAGS_nthread_write, &threads, Spawn(FLAGS_nthread_write, &threads,
std::bind(&CacheTierBenchmark::Write, this)); std::bind(&CacheTierBenchmark::Write, this));
Spawn(FLAGS_nthread_read, &threads, Spawn(FLAGS_nthread_read, &threads,
@ -252,7 +252,7 @@ class CacheTierBenchmark {
} }
// spawn threads // spawn threads
void Spawn(const size_t n, std::list<std::thread>* threads, void Spawn(const size_t n, std::list<port::Thread>* threads,
const std::function<void()>& fn) { const std::function<void()>& fn) {
for (size_t i = 0; i < n; ++i) { for (size_t i = 0; i < n; ++i) {
threads->emplace_back(fn); threads->emplace_back(fn);
@ -260,7 +260,7 @@ class CacheTierBenchmark {
} }
// join threads // join threads
void Join(std::list<std::thread>* threads) { void Join(std::list<port::Thread>* threads) {
for (auto& th : *threads) { for (auto& th : *threads) {
th.join(); th.join();
} }

@ -21,6 +21,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "table/block_builder.h" #include "table/block_builder.h"
#include "port/port.h"
#include "util/arena.h" #include "util/arena.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "utilities/persistent_cache/volatile_tier_impl.h" #include "utilities/persistent_cache/volatile_tier_impl.h"
@ -50,17 +51,17 @@ class PersistentCacheTierTest : public testing::Test {
// create threaded workload // create threaded workload
template <class T> template <class T>
std::list<std::thread> SpawnThreads(const size_t n, const T& fn) { std::list<port::Thread> SpawnThreads(const size_t n, const T& fn) {
std::list<std::thread> threads; std::list<port::Thread> threads;
for (size_t i = 0; i < n; i++) { for (size_t i = 0; i < n; i++) {
std::thread th(fn); port::Thread th(fn);
threads.push_back(std::move(th)); threads.push_back(std::move(th));
} }
return threads; return threads;
} }
// Wait for threads to join // Wait for threads to join
void Join(std::list<std::thread>&& threads) { void Join(std::list<port::Thread>&& threads) {
for (auto& th : threads) { for (auto& th : threads) {
th.join(); th.join();
} }

@ -31,6 +31,7 @@
#include "rocksdb/utilities/stackable_db.h" #include "rocksdb/utilities/stackable_db.h"
#include "util/coding.h" #include "util/coding.h"
#include "utilities/spatialdb/utils.h" #include "utilities/spatialdb/utils.h"
#include "port/port.h"
namespace rocksdb { namespace rocksdb {
namespace spatial { namespace spatial {
@ -603,7 +604,7 @@ class SpatialDBImpl : public SpatialDB {
Status s; Status s;
int threads_running = 0; int threads_running = 0;
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (auto cfh : column_families) { for (auto cfh : column_families) {
threads.emplace_back([&, cfh] { threads.emplace_back([&, cfh] {

@ -17,6 +17,7 @@
#include "util/random.h" #include "util/random.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/transaction_test_util.h" #include "util/transaction_test_util.h"
#include "port/port.h"
using std::string; using std::string;
@ -1326,7 +1327,7 @@ TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) {
// Setting the key-space to be 100 keys should cause enough write-conflicts // Setting the key-space to be 100 keys should cause enough write-conflicts
// to make this test interesting. // to make this test interesting.
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::function<void()> call_inserter = [&] { std::function<void()> call_inserter = [&] {
ASSERT_OK(OptimisticTransactionStressTestInserter( ASSERT_OK(OptimisticTransactionStressTestInserter(

@ -27,6 +27,8 @@
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
#include "port/port.h"
using std::string; using std::string;
namespace rocksdb { namespace rocksdb {
@ -386,7 +388,7 @@ TEST_P(TransactionTest, DeadlockCycleShared) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// We want the leaf transactions to block and hold everyone back. // We want the leaf transactions to block and hold everyone back.
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t i = 0; i < 15; i++) { for (uint32_t i = 0; i < 15; i++) {
std::function<void()> blocking_thread = [&, i] { std::function<void()> blocking_thread = [&, i] {
auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr, auto s = txns[i]->GetForUpdate(read_options, ToString(i + 1), nullptr,
@ -455,7 +457,7 @@ TEST_P(TransactionTest, DeadlockCycle) {
// We want the last transaction in the chain to block and hold everyone // We want the last transaction in the chain to block and hold everyone
// back. // back.
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t i = 0; i < len - 1; i++) { for (uint32_t i = 0; i < len - 1; i++) {
std::function<void()> blocking_thread = [&, i] { std::function<void()> blocking_thread = [&, i] {
auto s = auto s =
@ -534,7 +536,7 @@ TEST_P(TransactionTest, DeadlockStress) {
} }
}; };
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
threads.emplace_back(stress_thread, rnd.Next()); threads.emplace_back(stress_thread, rnd.Next());
} }
@ -1034,7 +1036,7 @@ TEST_P(TransactionTest, TwoPhaseMultiThreadTest) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes // do all the writes
std::vector<std::thread> threads; std::vector<port::Thread> threads;
for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) { for (uint32_t i = 0; i < NUM_TXN_THREADS; i++) {
threads.emplace_back(txn_write_thread); threads.emplace_back(txn_write_thread);
} }
@ -4446,7 +4448,7 @@ TEST_P(TransactionTest, TransactionStressTest) {
// Setting the key-space to be 100 keys should cause enough write-conflicts // Setting the key-space to be 100 keys should cause enough write-conflicts
// to make this test interesting. // to make this test interesting.
std::vector<std::thread> threads; std::vector<port::Thread> threads;
std::function<void()> call_inserter = [&] { std::function<void()> call_inserter = [&] {
ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread, ASSERT_OK(TransactionStressTestInserter(db, num_transactions_per_thread,

Loading…
Cancel
Save