diff --git a/CMakeLists.txt b/CMakeLists.txt index a8eb39783..49683925a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -597,6 +597,7 @@ set(SOURCES util/comparator.cc util/compression_context_cache.cc util/concurrent_arena.cc + util/concurrent_task_limiter_impl.cc util/crc32c.cc util/delete_scheduler.cc util/dynamic_bloom.cc diff --git a/TARGETS b/TARGETS index 246c2efee..cc84ef1bb 100644 --- a/TARGETS +++ b/TARGETS @@ -220,6 +220,7 @@ cpp_library( "util/comparator.cc", "util/compression_context_cache.cc", "util/concurrent_arena.cc", + "util/concurrent_task_limiter_impl.cc", "util/crc32c.cc", "util/delete_scheduler.cc", "util/dynamic_bloom.cc", diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 743ec33ba..b27e46cf5 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -10,8 +10,10 @@ #include "db/db_test_util.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/concurrent_task_limiter.h" #include "rocksdb/experimental.h" #include "rocksdb/utilities/convenience.h" +#include "util/concurrent_task_limiter_impl.h" #include "util/fault_injection_test_env.h" #include "util/sync_point.h" @@ -3890,6 +3892,193 @@ TEST_F(DBCompactionTest, CompactionHasEmptyOutput) { ASSERT_EQ(2, collector->num_ssts_creation_started()); } +TEST_F(DBCompactionTest, CompactionLimiter) { + const int kNumKeysPerFile = 10; + const int kMaxBackgroundThreads = 64; + + struct CompactionLimiter { + std::string name; + int limit_tasks; + int max_tasks; + int tasks; + std::shared_ptr limiter; + }; + + std::vector limiter_settings; + limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr}); + limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr}); + limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr}); + + for (auto& ls : limiter_settings) { + ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks)); + } + + std::shared_ptr unique_limiter( + NewConcurrentTaskLimiter("unique_limiter", -1)); + + const char* 
cf_names[] = {"default", "0", "1", "2", "3", "4", "5", + "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" }; + const int cf_count = sizeof cf_names / sizeof cf_names[0]; + + std::unordered_map cf_to_limiter; + + Options options = CurrentOptions(); + options.write_buffer_size = 110 * 1024; // 110KB + options.arena_block_size = 4096; + options.num_levels = 3; + options.level0_file_num_compaction_trigger = 4; + options.level0_slowdown_writes_trigger = 64; + options.level0_stop_writes_trigger = 64; + options.max_background_jobs = kMaxBackgroundThreads; // Enough threads + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options.max_write_buffer_number = 10; // Enough memtables + DestroyAndReopen(options); + + std::vector option_vector; + option_vector.reserve(cf_count); + + for (int cf = 0; cf < cf_count; cf++) { + ColumnFamilyOptions cf_opt(options); + if (cf == 0) { + // "default" CF doesn't use compaction limiter + cf_opt.compaction_thread_limiter = nullptr; + } else if (cf == 1) { + // CF at index 1 (named "0") uses bypass compaction limiter + unique_limiter->SetMaxOutstandingTask(-1); + cf_opt.compaction_thread_limiter = unique_limiter; + } else { + // Assign limiter by mod + auto& ls = limiter_settings[cf % 3]; + cf_opt.compaction_thread_limiter = ls.limiter; + cf_to_limiter[cf_names[cf]] = &ls; + } + option_vector.emplace_back(DBOptions(options), cf_opt); + } + + for (int cf = 1; cf < cf_count; cf++) { + CreateColumnFamilies({cf_names[cf]}, option_vector[cf]); + } + + ReopenWithColumnFamilies(std::vector(cf_names, + cf_names + cf_count), + option_vector); + + port::Mutex mutex; + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) { + const auto& cf_name = static_cast(arg)->GetName(); + auto iter = cf_to_limiter.find(cf_name); + if (iter != cf_to_limiter.end()) { + MutexLock l(&mutex); + ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks); + iter->second->max_tasks =
std::max(iter->second->max_tasks, + iter->second->tasks); // record peak concurrency + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) { + const auto& cf_name = static_cast(arg)->GetName(); + auto iter = cf_to_limiter.find(cf_name); + if (iter != cf_to_limiter.end()) { + MutexLock l(&mutex); + ASSERT_GE(--iter->second->tasks, 0); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Size the flush (HIGH) and compaction (LOW) thread pools. + const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4; + const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks; + env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH); + env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW); + + test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks]; + + // Block all compaction threads in thread pool. + for (size_t i = 0; i < kTotalCompactTasks; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_compact_tasks[i], Env::LOW); + sleeping_compact_tasks[i].WaitUntilSleeping(); + } + + int keyIndex = 0; + + for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) { + for (int cf = 0; cf < cf_count; cf++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(keyIndex++), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + } + + for (int cf = 0; cf < cf_count; cf++) { + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + } + } + + // Enough L0 files to trigger compaction + for (int cf = 0; cf < cf_count; cf++) { + ASSERT_EQ(NumTableFilesAtLevel(0, cf), + options.level0_file_num_compaction_trigger); + } + + // Create more files for one column family, which triggers speed up + // condition, all compactions will be scheduled.
+ for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(0, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(0, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, + NumTableFilesAtLevel(0, 0)); + } + + // All CFs are pending compaction + ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW)); + + // Unblock all compaction threads + for (size_t i = 0; i < kTotalCompactTasks; i++) { + sleeping_compact_tasks[i].WakeUp(); + sleeping_compact_tasks[i].WaitUntilDone(); + } + + for (int cf = 0; cf < cf_count; cf++) { + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + } + + dbfull()->TEST_WaitForCompact(); + + // Max outstanding compact tasks reached limit + for (auto& ls : limiter_settings) { + ASSERT_EQ(ls.limit_tasks, ls.max_tasks); + ASSERT_EQ(0, ls.limiter->GetOutstandingTask()); + } + + // test manual compaction under a fully throttled limiter + int cf_test = 1; + unique_limiter->SetMaxOutstandingTask(0); + + // flush one more file to cf 1 + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf_test, Key(keyIndex++), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf_test, "", "")); + + dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]); + ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test)); + + Compact(cf_test, Key(0), Key(keyIndex)); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(0, unique_limiter->GetOutstandingTask()); +} + INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, ::testing::Values(std::make_tuple(1, true), std::make_tuple(1, false), diff --git a/db/db_impl.h b/db/db_impl.h index 2cabe756a..1147b2929 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -65,6 +65,7 @@ class Arena; class ArenaWrappedDBIter; class MemTable; class TableCache; +class TaskLimiterToken; class Version; class VersionEdit; class VersionSet; @@ -1106,9 
+1107,18 @@ class DBImpl : public DB { const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); + // Request compaction tasks token from compaction thread limiter. + // It always succeeds if force = true or limiter is disabled. + bool RequestCompactionToken(ColumnFamilyData* cfd, bool force, + std::unique_ptr* token, + LogBuffer* log_buffer); + // Schedule background tasks void StartTimedTasks(); + void SubtractCompactionTask(const std::string& device_name, + LogBuffer* log_buffer); + void PrintStatistics(); // dump rocksdb.stats to LOG @@ -1129,6 +1139,10 @@ class DBImpl : public DB { ColumnFamilyData* PopFirstFromCompactionQueue(); FlushRequest PopFirstFromFlushQueue(); + // Pick the first unthrottled compaction with task token from queue. + ColumnFamilyData* PickCompactionFromQueue( + std::unique_ptr* token, LogBuffer* log_buffer); + // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); @@ -1422,6 +1436,8 @@ class DBImpl : public DB { // caller retains ownership of `manual_compaction_state` as it is reused // across background compactions. ManualCompactionState* manual_compaction_state; // nullptr if non-manual + // task limiter token is requested during compaction picking.
+ std::unique_ptr task_token; }; std::deque manual_compaction_dequeue_; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 1c32ec675..c2623161f 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -20,6 +20,7 @@ #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_util.h" +#include "util/concurrent_task_limiter_impl.h" #include "util/sst_file_manager_impl.h" #include "util/sync_point.h" @@ -59,6 +60,27 @@ bool DBImpl::EnoughRoomForCompaction( return enough_room; } +bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, + std::unique_ptr* token, + LogBuffer* log_buffer) { + assert(*token == nullptr); + auto limiter = static_cast( + cfd->ioptions()->compaction_thread_limiter.get()); + if (limiter == nullptr) { + return true; + } + *token = limiter->GetToken(force); + if (*token != nullptr) { + ROCKS_LOG_BUFFER(log_buffer, + "Thread limiter [%s] increase [%s] compaction task, " + "force: %s, tasks after: %d", + limiter->GetName().c_str(), cfd->GetName().c_str(), + force ? "true" : "false", limiter->GetOutstandingTask()); + return true; + } + return false; +} + Status DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); @@ -1354,6 +1376,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual compaction starting", cfd->GetName().c_str()); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + immutable_db_options_.info_log.get()); // We don't check bg_error_ here, because if we get the error in compaction, // the compaction will set manual.status to bg_error_ and set manual.done to // true. 
@@ -1391,6 +1415,12 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; + if (!RequestCompactionToken(cfd, true, + &ca->prepicked_compaction->task_token, + &log_buffer)) { + // Don't throttle manual compaction, only count outstanding tasks. + assert(false); + } manual.incomplete = false; bg_compaction_scheduled_++; env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, @@ -1399,6 +1429,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, } } + log_buffer.FlushBufferToLog(); assert(!manual.in_progress); assert(HasPendingManualCompaction()); RemoveManualCompaction(&manual); @@ -1824,6 +1855,32 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { return flush_req; } +ColumnFamilyData* DBImpl::PickCompactionFromQueue( + std::unique_ptr* token, LogBuffer* log_buffer) { + assert(!compaction_queue_.empty()); + assert(*token == nullptr); + autovector throttled_candidates; + ColumnFamilyData* cfd = nullptr; + while (!compaction_queue_.empty()) { + auto first_cfd = *compaction_queue_.begin(); + compaction_queue_.pop_front(); + assert(first_cfd->queued_for_compaction()); + if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) { + throttled_candidates.push_back(first_cfd); + continue; + } + cfd = first_cfd; + cfd->set_queued_for_compaction(false); + break; + } + // Add throttled compaction candidates back to queue in the original order. 
+ for (auto iter = throttled_candidates.rbegin(); + iter != throttled_candidates.rend(); ++iter) { + compaction_queue_.push_front(*iter); + } + return cfd; +} + void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, FlushReason flush_reason) { if (flush_req.empty()) { @@ -2081,7 +2138,12 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, prepicked_compaction); TEST_SYNC_POINT("BackgroundCallCompaction:1"); - if (!s.ok() && !s.IsShutdownInProgress()) { + if (s.IsBusy()) { + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + env_->SleepForMicroseconds(10000); // prevent hot loop + mutex_.Lock(); + } else if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of @@ -2216,6 +2278,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->in_progress = true; } + std::unique_ptr task_token; + // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; bool sfm_reserved_compact_space = false; @@ -2267,17 +2331,23 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, return Status::OK(); } - // cfd is referenced here - auto cfd = PopFirstFromCompactionQueue(); + auto cfd = PickCompactionFromQueue(&task_token, log_buffer); + if (cfd == nullptr) { + // Can't find any executable task from the compaction queue. + // All tasks have been throttled by compaction thread limiter. + ++unscheduled_compactions_; + return Status::Busy(); + } + // We unreference here because the following code will take a Ref() on // this cfd if it is going to use it (Compaction class holds a // reference). // This will all happen under a mutex so we don't have to be afraid of // somebody else deleting it. 
if (cfd->Unref()) { - delete cfd; // This was the last reference of the column family, so no need to // compact. + delete cfd; return Status::OK(); } @@ -2347,6 +2417,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else if (c->deletion_compaction()) { // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old // file if there is alive snapshot pointing to it + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); assert(c->num_input_files(1) == 0); assert(c->level() == 0); assert(c->column_family_data()->ioptions()->compaction_style == @@ -2370,8 +2442,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->column_family_data()->GetName().c_str(), c->num_input_files(0)); *made_progress = true; + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } else if (!trivial_move_disallowed && c->IsTrivialMove()) { TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove"); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); // Instrument for event update // TODO(yhchiang): add op details for showing trivial-move. ThreadStatusUtil::SetColumnFamily( @@ -2437,6 +2513,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } else if (!is_prepicked && c->output_level() > 0 && c->output_level() == c->column_family_data() @@ -2454,10 +2532,14 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->compaction = c.release(); ca->prepicked_compaction->manual_compaction_state = nullptr; + // Transfer requested token, so it doesn't need to do it again. 
+ ca->prepicked_compaction->task_token = std::move(task_token); ++bg_bottom_compaction_scheduled_; env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, this, &DBImpl::UnscheduleCallback); } else { + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); int output_level __attribute__((__unused__)); output_level = c->output_level(); TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", @@ -2498,6 +2580,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *c->mutable_cf_options(), FlushReason::kAutoCompaction); } *made_progress = true; + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } if (c != nullptr) { c->ReleaseCompactionFiles(status); diff --git a/include/rocksdb/concurrent_task_limiter.h b/include/rocksdb/concurrent_task_limiter.h new file mode 100644 index 000000000..35ca1149c --- /dev/null +++ b/include/rocksdb/concurrent_task_limiter.h @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" + +namespace rocksdb { + +class ConcurrentTaskLimiter { + public: + + virtual ~ConcurrentTaskLimiter() {} + + // Returns a name that identifies this concurrent task limiter. + virtual const std::string& GetName() const = 0; + + // Set max concurrent tasks. + // limit = 0 means no new task allowed. + // limit < 0 means no limitation. 
+ virtual void SetMaxOutstandingTask(int32_t limit) = 0; + + // Reset to unlimited max concurrent task. + virtual void ResetMaxOutstandingTask() = 0; + + // Returns current outstanding task count. + virtual int32_t GetOutstandingTask() const = 0; +}; + +// Create a ConcurrentTaskLimiter that can be shared with multiple CFs +// across RocksDB instances to control concurrent tasks. +// +// @param name: Name of the limiter. +// @param limit: max concurrent tasks. +// limit = 0 means no new task allowed. +// limit < 0 means no limitation. +extern ConcurrentTaskLimiter* NewConcurrentTaskLimiter( + const std::string& name, int32_t limit); + +} // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c3ba44839..dd55e7036 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -34,6 +34,7 @@ class Cache; class CompactionFilter; class CompactionFilterFactory; class Comparator; +class ConcurrentTaskLimiter; class Env; enum InfoLogLevel : unsigned char; class SstFileManager; @@ -293,6 +294,14 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Default: empty std::vector cf_paths; + // Compaction concurrent thread limiter for the column family. + // If non-nullptr, use given concurrent thread limiter to control + // the max outstanding compaction tasks. Limiter can be shared with + // multiple column families across db instances.
+ // + // Default: nullptr + std::shared_ptr compaction_thread_limiter = nullptr; + // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/options/cf_options.cc b/options/cf_options.cc index 37ef71065..74a6c47e6 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -17,6 +17,7 @@ #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "rocksdb/concurrent_task_limiter.h" namespace rocksdb { @@ -75,7 +76,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, max_subcompactions(db_options.max_subcompactions), memtable_insert_with_hint_prefix_extractor( cf_options.memtable_insert_with_hint_prefix_extractor.get()), - cf_paths(cf_options.cf_paths) {} + cf_paths(cf_options.cf_paths), + compaction_thread_limiter(cf_options.compaction_thread_limiter) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index 69b0b0105..0cc5ef5a5 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -120,6 +120,8 @@ struct ImmutableCFOptions { const SliceTransform* memtable_insert_with_hint_prefix_extractor; std::vector cf_paths; + + std::shared_ptr compaction_thread_limiter; }; struct MutableCFOptions { diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index cad1af3d7..9ff7c7814 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -351,6 +351,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offset_of(&ColumnFamilyOptions::cf_paths), sizeof(std::vector)}, + {offset_of(&ColumnFamilyOptions::compaction_thread_limiter), + sizeof(std::shared_ptr)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; @@ -389,6 +391,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { 
options->soft_rate_limit = 0; options->purge_redundant_kvs_while_flush = false; options->max_mem_compaction_level = 0; + options->compaction_filter = nullptr; char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)]; ColumnFamilyOptions* new_options = diff --git a/src.mk b/src.mk index 5ba7f4b7c..0926fc12c 100644 --- a/src.mk +++ b/src.mk @@ -137,6 +137,7 @@ LIB_SOURCES = \ util/comparator.cc \ util/compression_context_cache.cc \ util/concurrent_arena.cc \ + util/concurrent_task_limiter_impl.cc \ util/crc32c.cc \ util/delete_scheduler.cc \ util/dynamic_bloom.cc \ diff --git a/util/concurrent_task_limiter_impl.cc b/util/concurrent_task_limiter_impl.cc new file mode 100644 index 000000000..098028b19 --- /dev/null +++ b/util/concurrent_task_limiter_impl.cc @@ -0,0 +1,66 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "util/concurrent_task_limiter_impl.h" +#include "rocksdb/concurrent_task_limiter.h" + +namespace rocksdb { + +ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl( + const std::string& name, int32_t max_outstanding_task) + : name_(name), + max_outstanding_tasks_{max_outstanding_task}, + outstanding_tasks_{0} { + +} + +ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() { +} + +const std::string& ConcurrentTaskLimiterImpl::GetName() const { + return name_; +} + +void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) { + max_outstanding_tasks_.store(limit, std::memory_order_relaxed); +} + +void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() { + max_outstanding_tasks_.store(-1, std::memory_order_relaxed); +} + +int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const { + return outstanding_tasks_.load(std::memory_order_relaxed); +} + +std::unique_ptr<TaskLimiterToken> ConcurrentTaskLimiterImpl::GetToken( + bool force) { + int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed); + int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed); + // force = true, bypass the throttle. + // limit < 0 means unlimited tasks. A failed CAS refreshes 'tasks'. + while (force || limit < 0 || tasks < limit) { + if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) { + return std::unique_ptr<TaskLimiterToken>(new TaskLimiterToken(this)); + } + } + return nullptr; +} + +ConcurrentTaskLimiter* NewConcurrentTaskLimiter( + const std::string& name, int32_t limit) { + return new ConcurrentTaskLimiterImpl(name, limit); +} + +TaskLimiterToken::~TaskLimiterToken() { + --limiter_->outstanding_tasks_; + assert(limiter_->outstanding_tasks_ >= 0); +} + +} // namespace rocksdb diff --git a/util/concurrent_task_limiter_impl.h b/util/concurrent_task_limiter_impl.h new file mode 100644 index 000000000..515f1481e --- /dev/null +++ b/util/concurrent_task_limiter_impl.h @@ -0,0 +1,68 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <atomic> +#include <memory> + +#include "rocksdb/env.h" +#include "rocksdb/concurrent_task_limiter.h" + +namespace rocksdb { + +class TaskLimiterToken; + +class ConcurrentTaskLimiterImpl : public ConcurrentTaskLimiter { + public: + explicit ConcurrentTaskLimiterImpl(const std::string& name, + int32_t max_outstanding_task); + + virtual ~ConcurrentTaskLimiterImpl(); + + virtual const std::string& GetName() const override; + + virtual void SetMaxOutstandingTask(int32_t limit) override; + + virtual void ResetMaxOutstandingTask() override; + + virtual int32_t GetOutstandingTask() const override; + + // Request token for adding a new task. + // If force == true, it requests a token bypassing throttle. + // Returns nullptr if it got throttled. + virtual std::unique_ptr<TaskLimiterToken> GetToken(bool force); + + private: + friend class TaskLimiterToken; + + std::string name_; + std::atomic<int32_t> max_outstanding_tasks_; + std::atomic<int32_t> outstanding_tasks_; + + // No copying allowed + ConcurrentTaskLimiterImpl(const ConcurrentTaskLimiterImpl&) = delete; + ConcurrentTaskLimiterImpl& operator=( + const ConcurrentTaskLimiterImpl&) = delete; +}; + +// Token for one outstanding task; destroying it decrements the limiter's count. +class TaskLimiterToken { + public: + explicit TaskLimiterToken(ConcurrentTaskLimiterImpl* limiter) + : limiter_(limiter) {} + ~TaskLimiterToken(); + + private: + ConcurrentTaskLimiterImpl* limiter_; + + // no copying allowed + TaskLimiterToken(const TaskLimiterToken&) = delete; + void operator=(const TaskLimiterToken&) = delete; +}; + +} // namespace rocksdb