Concurrent task limiter for compaction thread control (#4332)
Summary: The PR is targeting to resolve the issue of: https://github.com/facebook/rocksdb/issues/3972#issue-330771918 We have a rocksdb created with leveled-compaction with multiple column families (CFs), some of CFs are using HDD to store big and less frequently accessed data and others are using SSD. When there are continuously write traffics going on to all CFs, the compaction thread pool is mostly occupied by those slow HDD compactions, which blocks fully utilize SSD bandwidth. Since atomic write and transaction is needed across CFs, so splitting it to multiple rocksdb instance is not an option for us. With the compaction thread control, we got 30%+ HDD write throughput gain, and also a lot smooth SSD write since less write stall happening. ConcurrentTaskLimiter can be shared with multi-CFs across rocksdb instances, so the feature does not only work for multi-CFs scenarios, but also for multi-rocksdbs scenarios, who need disk IO resource control per tenant. The usage is straight forward: e.g.: // // Enable compaction thread limiter thru ColumnFamilyOptions // std::shared_ptr<ConcurrentTaskLimiter> ctl(NewConcurrentTaskLimiter("foo_limiter", 4)); Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = ctl; ... // // Compaction thread limiter can be tuned or disabled on-the-fly // ctl->SetMaxOutstandingTask(12); // enlarge to 12 tasks ... ctl->ResetMaxOutstandingTask(); // disable (bypass) thread limiter ctl->SetMaxOutstandingTask(-1); // Same as above ... ctl->SetMaxOutstandingTask(0); // full throttle (0 task) // // Sharing compaction thread limiter among CFs (to resolve multiple storage perf issue) // std::shared_ptr<ConcurrentTaskLimiter> ctl_ssd(NewConcurrentTaskLimiter("ssd_limiter", 8)); std::shared_ptr<ConcurrentTaskLimiter> ctl_hdd(NewConcurrentTaskLimiter("hdd_limiter", 4)); Options options; ColumnFamilyOptions cf_opt_ssd1(options); ColumnFamilyOptions cf_opt_ssd2(options); ColumnFamilyOptions cf_opt_hdd1(options); ColumnFamilyOptions cf_opt_hdd2(options); ColumnFamilyOptions cf_opt_hdd3(options); // SSD CFs cf_opt_ssd1.compaction_thread_limiter = ctl_ssd; cf_opt_ssd2.compaction_thread_limiter = ctl_ssd; // HDD CFs cf_opt_hdd1.compaction_thread_limiter = ctl_hdd; cf_opt_hdd2.compaction_thread_limiter = ctl_hdd; cf_opt_hdd3.compaction_thread_limiter = ctl_hdd; ... // // The limiter is disabled by default (or set to nullptr explicitly) // Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = nullptr; Pull Request resolved: https://github.com/facebook/rocksdb/pull/4332 Differential Revision: D13226590 Pulled By: siying fbshipit-source-id: 14307aec55b8bd59c8223d04aa6db3c03d1b0c1dmain
parent
0aa17c1002
commit
a8b9891f95
@ -0,0 +1,47 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#pragma once |
||||
|
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/statistics.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
class ConcurrentTaskLimiter { |
||||
public: |
||||
|
||||
virtual ~ConcurrentTaskLimiter() {} |
||||
|
||||
// Returns a name that identifies this concurrent task limiter.
|
||||
virtual const std::string& GetName() const = 0; |
||||
|
||||
// Set max concurrent tasks.
|
||||
// limit = 0 means no new task allowed.
|
||||
// limit < 0 means no limitation.
|
||||
virtual void SetMaxOutstandingTask(int32_t limit) = 0; |
||||
|
||||
// Reset to unlimited max concurrent task.
|
||||
virtual void ResetMaxOutstandingTask() = 0; |
||||
|
||||
// Returns current outstanding task count.
|
||||
virtual int32_t GetOutstandingTask() const = 0; |
||||
}; |
||||
|
||||
// Create a ConcurrentTaskLimiter that can be shared with mulitple CFs
|
||||
// across RocksDB instances to control concurrent tasks.
|
||||
//
|
||||
// @param name: Name of the limiter.
|
||||
// @param limit: max concurrent tasks.
|
||||
// limit = 0 means no new task allowed.
|
||||
// limit < 0 means no limitation.
|
||||
extern ConcurrentTaskLimiter* NewConcurrentTaskLimiter( |
||||
const std::string& name, int32_t limit); |
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,66 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "util/concurrent_task_limiter_impl.h" |
||||
#include "rocksdb/concurrent_task_limiter.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl( |
||||
const std::string& name, int32_t max_outstanding_task) |
||||
: name_(name), |
||||
max_outstanding_tasks_{max_outstanding_task}, |
||||
outstanding_tasks_{0} { |
||||
|
||||
} |
||||
|
||||
ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() { |
||||
} |
||||
|
||||
const std::string& ConcurrentTaskLimiterImpl::GetName() const { |
||||
return name_; |
||||
} |
||||
|
||||
void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) { |
||||
max_outstanding_tasks_.store(limit, std::memory_order_relaxed); |
||||
} |
||||
|
||||
void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() { |
||||
max_outstanding_tasks_.store(-1, std::memory_order_relaxed); |
||||
} |
||||
|
||||
int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const { |
||||
return outstanding_tasks_.load(std::memory_order_relaxed); |
||||
} |
||||
|
||||
std::unique_ptr<TaskLimiterToken> ConcurrentTaskLimiterImpl::GetToken( |
||||
bool force) { |
||||
int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed); |
||||
int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed); |
||||
// force = true, bypass the throttle.
|
||||
// limit < 0 means unlimited tasks.
|
||||
while (force || limit < 0 || tasks < limit) { |
||||
if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) { |
||||
return std::unique_ptr<TaskLimiterToken>(new TaskLimiterToken(this)); |
||||
} |
||||
} |
||||
return nullptr; |
||||
} |
||||
|
||||
ConcurrentTaskLimiter* NewConcurrentTaskLimiter( |
||||
const std::string& name, int32_t limit) { |
||||
return new ConcurrentTaskLimiterImpl(name, limit); |
||||
} |
||||
|
||||
TaskLimiterToken::~TaskLimiterToken() { |
||||
--limiter_->outstanding_tasks_; |
||||
assert(limiter_->outstanding_tasks_ >= 0); |
||||
} |
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,68 @@ |
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#pragma once |
||||
#include <atomic> |
||||
#include <memory> |
||||
|
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/concurrent_task_limiter.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
class TaskLimiterToken; |
||||
|
||||
class ConcurrentTaskLimiterImpl : public ConcurrentTaskLimiter { |
||||
public: |
||||
explicit ConcurrentTaskLimiterImpl(const std::string& name, |
||||
int32_t max_outstanding_task); |
||||
|
||||
virtual ~ConcurrentTaskLimiterImpl(); |
||||
|
||||
virtual const std::string& GetName() const override; |
||||
|
||||
virtual void SetMaxOutstandingTask(int32_t limit) override; |
||||
|
||||
virtual void ResetMaxOutstandingTask() override; |
||||
|
||||
virtual int32_t GetOutstandingTask() const override; |
||||
|
||||
// Request token for adding a new task.
|
||||
// If force == true, it requests a token bypassing throttle.
|
||||
// Returns nullptr if it got throttled.
|
||||
virtual std::unique_ptr<TaskLimiterToken> GetToken(bool force); |
||||
|
||||
private: |
||||
friend class TaskLimiterToken; |
||||
|
||||
std::string name_; |
||||
std::atomic<int32_t> max_outstanding_tasks_; |
||||
std::atomic<int32_t> outstanding_tasks_; |
||||
|
||||
// No copying allowed
|
||||
ConcurrentTaskLimiterImpl(const ConcurrentTaskLimiterImpl&) = delete; |
||||
ConcurrentTaskLimiterImpl& operator=( |
||||
const ConcurrentTaskLimiterImpl&) = delete; |
||||
}; |
||||
|
||||
class TaskLimiterToken { |
||||
public: |
||||
explicit TaskLimiterToken(ConcurrentTaskLimiterImpl* limiter) |
||||
: limiter_(limiter) {} |
||||
~TaskLimiterToken(); |
||||
|
||||
private: |
||||
ConcurrentTaskLimiterImpl* limiter_; |
||||
|
||||
// no copying allowed
|
||||
TaskLimiterToken(const TaskLimiterToken&) = delete; |
||||
void operator=(const TaskLimiterToken&) = delete; |
||||
}; |
||||
|
||||
} // namespace rocksdb
|
Loading…
Reference in new issue