From eeb44366bae1dfc26e01db9f77c30c2179c134a8 Mon Sep 17 00:00:00 2001 From: Holodov Alexander Date: Sat, 16 May 2015 12:34:28 +0400 Subject: [PATCH 1/8] C api: human-readable statistics --- db/c.cc | 8 ++++++++ include/rocksdb/c.h | 1 + 2 files changed, 9 insertions(+) diff --git a/db/c.cc b/db/c.cc index fbf2c4ef0..0138acf6a 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1930,6 +1930,14 @@ void rocksdb_options_set_fifo_compaction_options( opt->rep.compaction_options_fifo = fifo->rep; } +char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt) { + rocksdb::Statistics *statistics = opt->rep.statistics.get(); + if (statistics) { + return strdup(statistics->ToString().c_str()); + } + return nullptr; +} + /* TODO: DB::OpenForReadOnly diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 06cf65bd1..3a426d24c 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -601,6 +601,7 @@ extern void rocksdb_options_set_max_grandparent_overlap_factor( extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional( rocksdb_options_t*, int* level_values, size_t num_levels); extern void rocksdb_options_enable_statistics(rocksdb_options_t*); +extern char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); extern void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); From 84a9c6a53a5fef0c782265201c6b394b5fbe8f40 Mon Sep 17 00:00:00 2001 From: Holodov Alexander Date: Sat, 16 May 2015 15:29:39 +0400 Subject: [PATCH 2/8] add comment --- include/rocksdb/c.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 3a426d24c..7dbaacd73 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -601,6 +601,8 @@ extern void rocksdb_options_set_max_grandparent_overlap_factor( extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional( rocksdb_options_t*, int* level_values, size_t num_levels); extern void rocksdb_options_enable_statistics(rocksdb_options_t*); + +/* returns a pointer to a malloc()-ed, null terminated string */ extern char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); From 1a08d0beb52eb06816474c1bc7467e3779125e45 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Wed, 17 Jun 2015 10:54:51 -0700 Subject: [PATCH 3/8] Block c_test in ROCKSDB_LITE Summary: Block c_test in ROCKSDB_LITE as it's not supported in ROCKSDB_LITE. Test Plan: c_test Reviewers: sdong, rven, anthony, kradhakrishnan, IslamAbdelRahman, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D40257 --- db/c.cc | 2 +- db/c_test.c | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/db/c.cc b/db/c.cc index fbf2c4ef0..71c5f542c 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2435,4 +2435,4 @@ extern void rocksdb_livefiles_destroy( } // end extern "C" -#endif // ROCKSDB_LITE +#endif // !ROCKSDB_LITE diff --git a/db/c_test.c b/db/c_test.c index 2669b03ad..978b6174c 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -2,6 +2,8 @@ Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. See the AUTHORS file for names of contributors. */ +#ifndef ROCKSDB_LITE // Lite does not support C API + #include "rocksdb/c.h" #include @@ -1007,3 +1009,13 @@ int main(int argc, char** argv) { fprintf(stderr, "PASS\n"); return 0; } + +#else +#include + +int main() { + fprintf(stderr, "SKIPPED\n"); + return 0; +} + +#endif // !ROCKSDB_LITE From 1369f015ee9777e6a5b5157c7dc57556ee6af551 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Wed, 17 Jun 2015 11:21:18 -0700 Subject: [PATCH 4/8] Only initialize the ThreadStatusData when necessary. Summary: Before this patch, any function call to ThreadStatusUtil might automatically initialize and register the thread status data. However, if it is the user-thread making this call, the allocated thread-status-data will never be released as such threads are not managed by rocksdb. In this patch, I remove the automatic-initialization part. Thread-status data is only initialized and uninitialized in Env during the thread creation and destruction. Test Plan: db_test thread_list_test listener_test Reviewers: igor, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D40017 --- util/env_posix.cc | 2 +- util/thread_status_updater.cc | 103 ++++++++++++++++------------------ util/thread_status_updater.h | 16 ++++-- util/thread_status_util.cc | 6 +- util/thread_status_util.h | 4 +- 5 files changed, 66 insertions(+), 65 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 77fb0ba44..8b1f04143 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1766,7 +1766,7 @@ class PosixEnv : public Env { ThreadPool* tp = meta->thread_pool_; #if ROCKSDB_USING_THREAD_STATUS // for thread-status - ThreadStatusUtil::SetThreadType(tp->env_, + ThreadStatusUtil::RegisterThread(tp->env_, (tp->GetThreadPriority() == Env::Priority::HIGH ? ThreadStatus::HIGH_PRIORITY : ThreadStatus::LOW_PRIORITY)); diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index 2dc15b429..2fd87cc89 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -15,6 +15,19 @@ namespace rocksdb { __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; +void ThreadStatusUpdater::RegisterThread( + ThreadStatus::ThreadType ttype, uint64_t thread_id) { + if (UNLIKELY(thread_status_data_ == nullptr)) { + thread_status_data_ = new ThreadStatusData(); + thread_status_data_->thread_type = ttype; + thread_status_data_->thread_id = thread_id; + std::lock_guard lck(thread_list_mutex_); + thread_data_set_.insert(thread_status_data_); + } + + ClearThreadOperationProperties(); +} + void ThreadStatusUpdater::UnregisterThread() { if (thread_status_data_ != nullptr) { std::lock_guard lck(thread_list_mutex_); @@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() { } } -void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { - auto* data = InitAndGet(); - data->thread_id.store(thread_id, std::memory_order_relaxed); -} - -void ThreadStatusUpdater::SetThreadType( - ThreadStatus::ThreadType ttype) { - auto* data = InitAndGet(); - data->thread_type.store(ttype, std::memory_order_relaxed); - ClearThreadOperationProperties(); -} - void ThreadStatusUpdater::ResetThreadStatus() { ClearThreadState(); ClearThreadOperation(); @@ -44,7 +45,10 @@ void ThreadStatusUpdater::ResetThreadStatus() { void ThreadStatusUpdater::SetColumnFamilyInfoKey( const void* cf_key) { - auto* data = InitAndGet(); + auto* data = Get(); + if (data == nullptr) { + return; + } // set the tracking flag based on whether cf_key is non-null or not. // If enable_thread_tracking is set to false, the input cf_key // would be nullptr. @@ -53,8 +57,8 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey( } const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { - auto* data = InitAndGet(); - if (data->enable_tracking == false) { + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return nullptr; } return data->cf_key.load(std::memory_order_relaxed); @@ -62,9 +66,8 @@ const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { void ThreadStatusUpdater::SetThreadOperation( const ThreadStatus::OperationType type) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } // NOTE: Our practice here is to set all the thread operation properties @@ -82,9 +85,8 @@ void ThreadStatusUpdater::SetThreadOperation( void ThreadStatusUpdater::SetThreadOperationProperty( int i, uint64_t value) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_properties[i].store(value, std::memory_order_relaxed); @@ -92,27 +94,24 @@ void ThreadStatusUpdater::SetThreadOperationProperty( void ThreadStatusUpdater::IncreaseThreadOperationProperty( int i, uint64_t delta) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); } void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->op_start_time.store(start_time, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadOperation() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, @@ -123,9 +122,8 @@ void ThreadStatusUpdater::ClearThreadOperation() { } void ThreadStatusUpdater::ClearThreadOperationProperties() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { @@ -135,9 +133,8 @@ void ThreadStatusUpdater::ClearThreadOperationProperties() { ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( ThreadStatus::OperationStage stage) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return ThreadStatus::STAGE_UNKNOWN; } return data->operation_stage.exchange( @@ -146,18 +143,16 @@ ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( void ThreadStatusUpdater::SetThreadState( const ThreadStatus::StateType type) { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->state_type.store(type, std::memory_order_relaxed); } void ThreadStatusUpdater::ClearThreadState() { - auto* data = InitAndGet(); - if (!data->enable_tracking) { - assert(data->cf_key.load(std::memory_order_relaxed) == nullptr); + auto* data = GetLocalThreadStatus(); + if (data == nullptr) { return; } data->state_type.store( @@ -222,11 +217,14 @@ Status ThreadStatusUpdater::GetThreadList( return Status::OK(); } -ThreadStatusData* ThreadStatusUpdater::InitAndGet() { - if (UNLIKELY(thread_status_data_ == nullptr)) { - thread_status_data_ = new ThreadStatusData(); - std::lock_guard lck(thread_list_mutex_); - thread_data_set_.insert(thread_status_data_); +ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() { + if (thread_status_data_ == nullptr) { + return nullptr; + } + if (!thread_status_data_->enable_tracking) { + assert(thread_status_data_->cf_key.load( + std::memory_order_relaxed) == nullptr); + return nullptr; } return thread_status_data_; } @@ -290,17 +288,14 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { #else -void ThreadStatusUpdater::UnregisterThread() { +void ThreadStatusUpdater::RegisterThread( + ThreadStatus::ThreadType ttype, uint64_t thread_id) { } -void ThreadStatusUpdater::ResetThreadStatus() { -} - -void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) { +void ThreadStatusUpdater::UnregisterThread() { } -void ThreadStatusUpdater::SetThreadType( - ThreadStatus::ThreadType ttype) { +void ThreadStatusUpdater::ResetThreadStatus() { } void ThreadStatusUpdater::SetColumnFamilyInfoKey( diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index 5e7c2b894..218bba042 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -118,8 +118,8 @@ class ThreadStatusUpdater { // Set the id of the current thread. void SetThreadID(uint64_t thread_id); - // Set the thread type of the current thread. - void SetThreadType(ThreadStatus::ThreadType ttype); + // Register the current thread for tracking. + void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id); // Update the column-family info of the current thread by setting // its thread-local pointer of ThreadStateInfo to the correct entry. @@ -198,9 +198,15 @@ class ThreadStatusUpdater { // The thread-local variable for storing thread status. static __thread ThreadStatusData* thread_status_data_; - // Obtain the pointer to the thread status data. It also performs - // initialization when necessary. - ThreadStatusData* InitAndGet(); + // Returns the pointer to the thread status data only when the + // thread status data is non-null and has enable_tracking == true. + ThreadStatusData* GetLocalThreadStatus(); + + // Directly returns the pointer to thread_status_data_ without + // checking whether enabling_tracking is true of not. + ThreadStatusData* Get() { + return thread_status_data_; + } // The mutex that protects cf_info_map and db_key_map. std::mutex thread_list_mutex_; diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc index 116950e13..e67a8e4ef 100644 --- a/util/thread_status_util.cc +++ b/util/thread_status_util.cc @@ -15,14 +15,14 @@ __thread ThreadStatusUpdater* ThreadStatusUtil::thread_updater_local_cache_ = nullptr; __thread bool ThreadStatusUtil::thread_updater_initialized_ = false; -void ThreadStatusUtil::SetThreadType( +void ThreadStatusUtil::RegisterThread( const Env* env, ThreadStatus::ThreadType thread_type) { if (!MaybeInitThreadLocalUpdater(env)) { return; } assert(thread_updater_local_cache_); - thread_updater_local_cache_->SetThreadID(env->GetThreadID()); - thread_updater_local_cache_->SetThreadType(thread_type); + thread_updater_local_cache_->RegisterThread( + thread_type, env->GetThreadID()); } void ThreadStatusUtil::UnregisterThread() { diff --git a/util/thread_status_util.h b/util/thread_status_util.h index ba0238d58..aa13a6c40 100644 --- a/util/thread_status_util.h +++ b/util/thread_status_util.h @@ -27,8 +27,8 @@ class ColumnFamilyData; // all function calls to ThreadStatusUtil will be no-op. class ThreadStatusUtil { public: - // Set the thread type of the current thread. - static void SetThreadType( + // Register the current thread for tracking. + static void RegisterThread( const Env* env, ThreadStatus::ThreadType thread_type); // Unregister the current thread. From 25d600569db4fa001c0ad7dfe76901311b0b0ccc Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 17 Jun 2015 12:37:59 -0700 Subject: [PATCH 5/8] Clean up InstallSuperVersion Summary: We go to great lengths to make sure MaybeScheduleFlushOrCompaction() is called outside of write thread. But anyway, it's still called in the mutex, so it's not that much cheaper. This diff removes the "optimization" and cleans up the code a bit. Test Plan: make check Reviewers: rven, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D40113 --- db/db_impl.cc | 93 ++++++++++++++++---------------------- db/db_impl.h | 16 ++----- db/db_impl_experimental.cc | 4 +- 3 files changed, 45 insertions(+), 68 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index aca964f84..b767a1ff3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log); struct DBImpl::WriteContext { autovector superversions_to_free_; autovector memtables_to_free_; - bool schedule_bg_work_ = false; ~WriteContext() { for (auto& sv : superversions_to_free_) { @@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile( Status s = flush_job.Run(&file_meta); if (s.ok()) { - InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); + InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, + mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1578,8 +1578,8 @@ Status DBImpl::CompactFilesImpl( compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } c->ReleaseCompactionFiles(s); c.reset(); @@ -1791,7 +1791,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); - superversion_to_free = InstallSuperVersion( + superversion_to_free = InstallSuperVersionAndScheduleWork( cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; @@ -1945,9 +1945,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job - // SetNewMemtableAndNewLogFile() will release and reacquire mutex + // SwitchMemtable() will release and reacquire mutex // during execution - s = SetNewMemtableAndNewLogFile(cfd, &context); + s = SwitchMemtable(cfd, &context); write_thread_.ExitWriteThread(&w, &w, s); cfd->imm()->FlushRequested(); @@ -2410,10 +2410,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->inputs(0)->size()); // There are three things that can change compaction score: // 1) When flush or compaction finish. This case is covered by - // InstallSuperVersion() + // InstallSuperVersionAndScheduleWork // 2) When MutableCFOptions changes. This case is also covered by - // InstallSuperVersion(), because this is when the new options take - // effect. + // InstallSuperVersionAndScheduleWork, because this is when the new + // options take effect. // 3) When we Pick a new compaction, we "remove" those files being // compacted from the calculation, which then influences compaction // score. Here we check if we need the new compaction even without the @@ -2449,8 +2449,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2486,8 +2486,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); VersionStorageInfo::LevelSummaryStorage tmp; c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1, @@ -2532,8 +2532,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } *madeProgress = true; } @@ -2695,26 +2695,25 @@ Status DBImpl::Get(const ReadOptions& read_options, // * malloc one SuperVersion() outside of the lock -- new_superversion // * delete SuperVersion()s outside of the lock -- superversions_to_free // -// However, if InstallSuperVersion() gets called twice with the same -// job_context, we can't reuse the SuperVersion() that got -// malloced -// because +// However, if InstallSuperVersionAndScheduleWork() gets called twice with the +// same job_context, we can't reuse the SuperVersion() that got +// malloced because // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersionBackground( +void DBImpl::InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - SuperVersion* old_superversion = InstallSuperVersion( + SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork( cfd, job_context->new_superversion, mutable_cf_options); job_context->new_superversion = nullptr; job_context->superversions_to_free.push_back(old_superversion); } -SuperVersion* DBImpl::InstallSuperVersion( +SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -2729,14 +2728,10 @@ SuperVersion* DBImpl::InstallSuperVersion( new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); // Whenever we install new SuperVersion, we might need to issue new flushes or - // compactions. dont_schedule_bg_work is true when scheduling from write - // thread and we don't want to add additional overhead. Callers promise to - // call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually - if (!dont_schedule_bg_work) { - SchedulePendingFlush(cfd); - SchedulePendingCompaction(cfd); - MaybeScheduleFlushOrCompaction(); - } + // compactions. + SchedulePendingFlush(cfd); + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); // Update max_total_in_memory_state_ max_total_in_memory_state_ = @@ -2947,7 +2942,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete InstallSuperVersion( + delete InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); if (!cfd->mem()->IsSnapshotSupported()) { @@ -3371,15 +3366,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } + MaybeScheduleFlushOrCompaction(); } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families. Write buffer is using %" PRIu64 @@ -3392,13 +3387,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (!cfd->mem()->IsEmpty()) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } MaybeScheduleFlushOrCompaction(); @@ -3414,11 +3408,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (UNLIKELY(status.ok()) && (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { - // If writer is stopped, we need to get it going, - // so schedule flushes/compactions - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size @@ -3560,9 +3549,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.AssertHeld(); write_thread_.ExitWriteThread(&w, last_writer, status); - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } mutex_.Unlock(); if (status.IsTimedOut()) { @@ -3633,9 +3619,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { - auto status = SetNewMemtableAndNewLogFile(cfd, context); - SchedulePendingFlush(cfd); - context->schedule_bg_work_ = true; + auto status = SwitchMemtable(cfd, context); if (cfd->Unref()) { delete cfd; } @@ -3648,8 +3632,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context) { +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); unique_ptr lfile; log::Writer* new_log = nullptr; @@ -3719,8 +3702,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); - context->superversions_to_free_.push_back( - InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); + context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork( + cfd, new_superversion, mutable_cf_options)); return s; } @@ -4010,8 +3993,8 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -4253,7 +4236,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete impl->InstallSuperVersion( + delete impl->InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( diff --git a/db/db_impl.h b/db/db_impl.h index 89afda987..0d7e1b472 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -440,8 +440,7 @@ class DBImpl : public DB { Status ScheduleFlushes(WriteContext* context); - Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); @@ -719,21 +718,16 @@ class DBImpl : public DB { // the InstallSuperVersion() function. Background threads carry // job_context which can have new_superversion already // allocated. - void InstallSuperVersionBackground( + void InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options); // All ColumnFamily state changes go through this function. Here we analyze // the new state and we schedule background work if we detect that the new // state needs flush or compaction. - // If dont_schedule_bg_work == true, then caller asks us to not schedule flush - // or compaction here, but it also promises to schedule needed background - // work. We use this to scheduling background compactions when we are in the - // write thread, which is very performance critical. Caller schedules - // background work as soon as it exits the write thread - SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, - bool dont_schedule_bg_work = false); + SuperVersion* InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options); #ifndef ROCKSDB_LITE using DB::GetPropertiesOfAllTables; diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index 65529307b..6bf0ba6a1 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } } // lock released here LogFlush(db_options_.info_log); From c89369f57ca591a99ad4c5f47f912b124dd26894 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 17 Jun 2015 14:09:12 -0700 Subject: [PATCH 6/8] Move dockerbuild.sh to build_tools/ Summary: That's where we keep build tools :) Test Plan: none Reviewers: sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D39741 --- dockerbuild.sh => build_tools/dockerbuild.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename dockerbuild.sh => build_tools/dockerbuild.sh (100%) diff --git a/dockerbuild.sh b/build_tools/dockerbuild.sh similarity index 100% rename from dockerbuild.sh rename to build_tools/dockerbuild.sh From 12e030a99243b50581177d891eed74d6cd5a47ee Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Wed, 17 Jun 2015 14:36:14 -0700 Subject: [PATCH 7/8] Use CompactRangeOptions for CompactRange Summary: This diff update DB::CompactRange to use RangeCompactionOptions instead of using multiple parameters Old CompactRange is still available but deprecated Test Plan: make all check make rocksdbjava USE_CLANG=1 make all OPT=-DROCKSDB_LITE make release Reviewers: sdong, yhchiang, igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D40209 --- HISTORY.md | 1 + db/c.cc | 4 +- db/column_family_test.cc | 6 +- db/compaction_job_stats_test.cc | 10 +- db/db_bench.cc | 2 +- db/db_impl.cc | 21 +-- db/db_impl.h | 7 +- db/db_impl_readonly.h | 7 +- db/db_test.cc | 172 ++++++++++-------- db/deletefile_test.cc | 7 +- db/fault_injection_test.cc | 2 +- db/listener_test.cc | 3 +- db/merge_test.cc | 8 +- include/rocksdb/db.h | 52 ++++-- include/rocksdb/options.h | 13 ++ include/rocksdb/utilities/stackable_db.h | 10 +- java/rocksjni/rocksjni.cc | 20 +- util/ldb_cmd.cc | 13 +- util/manual_compaction_test.cc | 4 +- utilities/compacted_db/compacted_db_impl.h | 7 +- .../string_append/stringappend_test.cc | 6 +- utilities/spatialdb/spatial_db.cc | 2 +- utilities/ttl/ttl_test.cc | 4 +- 23 files changed, 224 insertions(+), 157 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 033c8f1fb..1559bc312 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -16,6 +16,7 @@ * options.hard_rate_limit is deprecated. * When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way to slow down writes is changed to: write rate to DB is limited to to options.delayed_write_rate. * DB::GetApproximateSizes() adds a parameter to allow the estimation to include data in mem table, with default to be not to include. It is now only supported in skip list mem table. +* DB::CompactRange() now accept CompactRangeOptions instead of multiple paramters. CompactRangeOptions is defined in include/rocksdb/options.h. ## 3.11.0 (5/19/2015) ### New Features diff --git a/db/c.cc b/db/c.cc index b44da0bbb..48339c357 100644 --- a/db/c.cc +++ b/db/c.cc @@ -77,6 +77,7 @@ using rocksdb::BackupEngine; using rocksdb::BackupableDBOptions; using rocksdb::BackupInfo; using rocksdb::RestoreOptions; +using rocksdb::CompactRangeOptions; using std::shared_ptr; @@ -1006,6 +1007,7 @@ void rocksdb_compact_range( const char* limit_key, size_t limit_key_len) { Slice a, b; db->rep->CompactRange( + CompactRangeOptions(), // Pass nullptr Slice if corresponding "const char*" is nullptr (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)); @@ -1018,7 +1020,7 @@ void rocksdb_compact_range_cf( const char* limit_key, size_t limit_key_len) { Slice a, b; db->rep->CompactRange( - column_family->rep, + CompactRangeOptions(), column_family->rep, // Pass nullptr Slice if corresponding "const char*" is nullptr (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)); diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 484a4c66a..283b8ede1 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -215,11 +215,13 @@ class ColumnFamilyTest : public testing::Test { } void CompactAll(int cf) { - ASSERT_OK(db_->CompactRange(handles_[cf], nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr, + nullptr)); } void Compact(int cf, const Slice& start, const Slice& limit) { - ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit)); } int NumTableFilesAtLevel(int level, int cf) { diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 64894d6aa..3efbbfdeb 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -309,16 +309,18 @@ class CompactionJobStatsTest : public testing::Test { void Compact(int cf, const Slice& start, const Slice& limit, uint32_t target_path_id) { - ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1, - target_path_id)); + CompactRangeOptions compact_options; + compact_options.target_path_id = target_path_id; + ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit)); } void Compact(int cf, const Slice& start, const Slice& limit) { - ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit)); } void Compact(const Slice& start, const Slice& limit) { - ASSERT_OK(db_->CompactRange(&start, &limit)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit)); } void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) { diff --git a/db/db_bench.cc b/db/db_bench.cc index a50b66bed..f7096bad3 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -3748,7 +3748,7 @@ class Benchmark { void Compact(ThreadState* thread) { DB* db = SelectDB(thread); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); } void PrintStats(const char* key) { diff --git a/db/db_impl.cc b/db/db_impl.cc index b767a1ff3..a8a1389f9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1328,11 +1328,10 @@ void DBImpl::NotifyOnFlushCompleted( #endif // ROCKSDB_LITE } -Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool change_level, int target_level, - uint32_t target_path_id) { - if (target_path_id >= db_options_.db_paths.size()) { +Status DBImpl::CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + if (options.target_path_id >= db_options_.db_paths.size()) { return Status::InvalidArgument("Invalid target path ID"); } @@ -1362,8 +1361,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, cfd->NumberLevels() > 1) { // Always compact all files together. s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, - cfd->NumberLevels() - 1, target_path_id, begin, - end); + cfd->NumberLevels() - 1, options.target_path_id, + begin, end); final_output_level = cfd->NumberLevels() - 1; } else { for (int level = 0; level <= max_level_with_files; level++) { @@ -1384,8 +1383,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, output_level = ColumnFamilyData::kCompactToBaseLevel; } } - s = RunManualCompaction(cfd, level, output_level, target_path_id, begin, - end); + s = RunManualCompaction(cfd, level, output_level, options.target_path_id, + begin, end); if (!s.ok()) { break; } @@ -1403,8 +1402,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, return s; } - if (change_level) { - s = ReFitLevel(cfd, final_output_level, target_level); + if (options.change_level) { + s = ReFitLevel(cfd, final_output_level, options.target_level); } LogFlush(db_options_.info_log); diff --git a/db/db_impl.h b/db/db_impl.h index 0d7e1b472..a649b2baa 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -125,10 +125,9 @@ class DBImpl : public DB { const Range* range, int n, uint64_t* sizes, bool include_memtable = false) override; using DB::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) override; + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) override; using DB::CompactFiles; virtual Status CompactFiles(const CompactionOptions& compact_options, diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 25fcb4350..fc14c04bc 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -58,10 +58,9 @@ class DBImplReadOnly : public DBImpl { return Status::NotSupported("Not supported operation in read only mode."); } using DBImpl::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1, - uint32_t target_path_id = 0) override { + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) override { return Status::NotSupported("Not supported operation in read only mode."); } diff --git a/db/db_test.cc b/db/db_test.cc index a86e755b2..ac6f8ec19 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1092,16 +1092,18 @@ class DBTest : public testing::Test { void Compact(int cf, const Slice& start, const Slice& limit, uint32_t target_path_id) { - ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1, - target_path_id)); + CompactRangeOptions compact_options; + compact_options.target_path_id = target_path_id; + ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit)); } void Compact(int cf, const Slice& start, const Slice& limit) { - ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); + ASSERT_OK( + db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit)); } void Compact(const Slice& start, const Slice& limit) { - ASSERT_OK(db_->CompactRange(&start, &limit)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit)); } // Do n memtable compactions, each of which produces an sstable @@ -1524,7 +1526,7 @@ TEST_F(DBTest, CompactedDB) { ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h'))); ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i'))); ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j'))); - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(3, NumTableFilesAtLevel(1)); Close(); @@ -2339,7 +2341,7 @@ TEST_F(DBTest, WholeKeyFilterProp) { // ranges. ASSERT_OK(dbfull()->Put(wo, "aaa", "")); ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); // Reopen with both of whole key off and prefix extractor enabled. // Still no bloom filter should be used. @@ -2362,7 +2364,7 @@ TEST_F(DBTest, WholeKeyFilterProp) { // ranges. ASSERT_OK(dbfull()->Put(wo, "aaa", "")); ASSERT_OK(dbfull()->Put(wo, "zzz", "")); - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); options.prefix_extractor.reset(); bbto.whole_key_filtering = true; @@ -3790,7 +3792,7 @@ TEST_F(DBTest, TrivialMoveOneFile) { LiveFileMetaData level0_file = metadata[0]; // L0 file meta // Compaction will initiate a trivial move from L0 to L1 - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // File moved From L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 @@ -3855,7 +3857,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) { // Since data is non-overlapping we expect compaction to initiate // a trivial move - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); // We expect that all the files were trivially moved from L0 to L1 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); @@ -3892,7 +3894,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) { ASSERT_OK(Flush()); } - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); for (uint32_t i = 0; i < ranges.size(); i++) { for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { @@ -3944,7 +3946,10 @@ TEST_F(DBTest, TrivialMoveTargetLevel) { // 2 files in L0 ASSERT_EQ("2", FilesPerLevel(0)); - ASSERT_OK(db_->CompactRange(nullptr, nullptr, true, 6)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 6; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); // 2 files in L6 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); @@ -5423,17 +5428,13 @@ TEST_F(DBTest, ConvertCompactionStyle) { options = CurrentOptions(options); ReopenWithColumnFamilies({"default", "pikachu"}, options); - dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */, - 0 /* reduce to level 0 */); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 0; + dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); - for (int i = 0; i < options.num_levels; i++) { - int num = NumTableFilesAtLevel(i, 1); - if (i == 0) { - ASSERT_EQ(num, 1); - } else { - ASSERT_EQ(num, 0); - } - } + // Only 1 file in L0 + ASSERT_EQ("1", FilesPerLevel(1)); // Stage 4: re-open in universal compaction style and do some db operations options = CurrentOptions(); @@ -5548,8 +5549,10 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) { options.target_file_size_base = INT_MAX; ReopenWithColumnFamilies({"default", "pikachu"}, options); // Compact all to level 0 - dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */, - 0 /* reduce to level 0 */); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 0; + dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr); // Need to restart it once to remove higher level records in manifest. ReopenWithColumnFamilies({"default", "pikachu"}, options); // Final reopen @@ -6021,7 +6024,7 @@ TEST_F(DBTest, CompactionFilterDeletesAll) { } // this will produce empty file (delete compaction filter) - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_EQ(0U, CountLiveFiles()); Reopen(options); @@ -6062,7 +6065,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) { dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); } else { - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); } // re-write all data again @@ -6079,7 +6083,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) { dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); } else { - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); } // verify that all keys now have the new value that @@ -6120,7 +6125,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) { ASSERT_OK(Flush()); std::string newvalue = Get("foo"); ASSERT_EQ(newvalue, three); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); newvalue = Get("foo"); ASSERT_EQ(newvalue, three); @@ -6128,12 +6133,12 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) { // merge keys. ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); ASSERT_OK(Flush()); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); newvalue = Get("bar"); ASSERT_EQ("NOT_FOUND", newvalue); ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); ASSERT_OK(Flush()); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); newvalue = Get("bar"); ASSERT_EQ(two, two); @@ -6144,7 +6149,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) { ASSERT_OK(Flush()); newvalue = Get("foobar"); ASSERT_EQ(newvalue, three); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); newvalue = Get("foobar"); ASSERT_EQ(newvalue, three); @@ -6157,7 +6162,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) { ASSERT_OK(Flush()); newvalue = Get("barfoo"); ASSERT_EQ(newvalue, four); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); newvalue = Get("barfoo"); ASSERT_EQ(newvalue, four); } @@ -6191,7 +6196,7 @@ TEST_F(DBTest, CompactionFilterContextManual) { filter->expect_manual_compaction_.store(true); filter->expect_full_compaction_.store(false); // Manual compaction always // set this flag. - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(cfilter_count, 700); ASSERT_EQ(NumSortedRuns(0), 1); @@ -6939,7 +6944,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) { // After a compaction, "second", "third" and "fifth" should // be removed FillLevels("a", "z", 1); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("fourth", Get(1, "foo", snapshot2)); ASSERT_EQ("first", Get(1, "foo", snapshot1)); @@ -6948,7 +6954,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) { // after we release the snapshot1, only two values left db_->ReleaseSnapshot(snapshot1); FillLevels("a", "z", 1); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); // We have only one valid snapshot snapshot2. Since snapshot1 is // not valid anymore, "first" should be removed by a compaction. @@ -6959,7 +6966,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) { // after we release the snapshot2, only one value should be left db_->ReleaseSnapshot(snapshot2); FillLevels("a", "z", 1); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); // skip HashCuckooRep as it does not support snapshot @@ -7256,7 +7264,7 @@ TEST_F(DBTest, ManualCompaction) { // Compact all MakeTables(1, "a", "z", 1); ASSERT_EQ("0,1,2", FilesPerLevel(1)); - db_->CompactRange(handles_[1], nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ("0,0,1", FilesPerLevel(1)); if (iter == 0) { @@ -7294,7 +7302,9 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); // Full compaction to DB path 0 - db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1); + CompactRangeOptions compact_options; + compact_options.target_path_id = 1; + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); @@ -7315,13 +7325,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId, ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); // Full compaction to DB path 0 - db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0); + compact_options.target_path_id = 0; + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); // Fail when compacting to an invalid path ID - ASSERT_TRUE(db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 2) + compact_options.target_path_id = 2; + ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr) .IsInvalidArgument()); } @@ -7378,7 +7390,9 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) { ASSERT_EQ("1,2", FilesPerLevel(1)); ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); - db_->CompactRange(handles_[1], nullptr, nullptr, false, 1, 1); + CompactRangeOptions compact_options; + compact_options.target_path_id = 1; + db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); ASSERT_EQ("0,1", FilesPerLevel(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); @@ -7447,7 +7461,7 @@ TEST_F(DBTest, DBOpen_Change_NumLevels) { ASSERT_OK(Put(1, "a", "123")); ASSERT_OK(Put(1, "b", "234")); - db_->CompactRange(handles_[1], nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); Close(); options.create_if_missing = false; @@ -7518,7 +7532,7 @@ TEST_F(DBTest, DropWrites) { true /* disallow trivial move */); } } else { - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); } } @@ -8076,7 +8090,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]"); // Case 2: Delete followed by another delete @@ -8085,7 +8100,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 3: Put followed by a delete @@ -8094,7 +8110,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 4: Put followed by another Put @@ -8103,12 +8120,14 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); // clear database Delete(1, "foo"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: Put followed by snapshot followed by another Put @@ -8122,7 +8141,8 @@ TEST_F(DBTest, CompactOnFlush) { // clear database Delete(1, "foo"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: snapshot followed by a put followed by another Put @@ -9061,10 +9081,9 @@ class ModelDB: public DB { } } using DB::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* start, const Slice* end, - bool reduce_level, int target_level, - uint32_t output_path_id) override { + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* start, const Slice* end) override { return Status::NotSupported("Not supported operation."); } @@ -9432,7 +9451,8 @@ void PrefixScanInit(DBTest *dbtest) { keystr = std::string(buf); ASSERT_OK(dbtest->Put(keystr, keystr)); dbtest->Flush(); - dbtest->dbfull()->CompactRange(nullptr, nullptr); // move to level 1 + dbtest->dbfull()->CompactRange(CompactRangeOptions(), nullptr, + nullptr); // move to level 1 // GROUP 1 for (int i = 1; i <= small_range_sstfiles; i++) { @@ -9685,7 +9705,7 @@ TEST_F(DBTest, TailingIteratorIncomplete) { // we either see the entry or it's not in cache ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); iter->SeekToFirst(); // should still be true after compaction ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); @@ -9910,7 +9930,7 @@ TEST_F(DBTest, ManagedTailingIteratorIncomplete) { // we either see the entry or it's not in cache ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); iter->SeekToFirst(); // should still be true after compaction ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); @@ -10039,7 +10059,7 @@ TEST_F(DBTest, FIFOCompactionTest) { if (iter == 0) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); } else { - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); } // only 5 files should survive ASSERT_EQ(NumTableFilesAtLevel(0), 5); @@ -10760,7 +10780,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { ASSERT_GT(SizeAtLevel(0), k64KB - k5KB); // Clean up L0 - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(NumTableFilesAtLevel(0), 0); // Increase buffer size @@ -10818,7 +10838,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { {"max_write_buffer_number", "8"}, })); // Clean up memtable and L0 - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); SleepingBackgroundTask sleeping_task_low2; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, @@ -10839,7 +10859,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { {"max_write_buffer_number", "4"}, })); // Clean up memtable and L0 - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); SleepingBackgroundTask sleeping_task_low3; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, @@ -11077,7 +11097,7 @@ TEST_F(DBTest, PreShutdownManualCompaction) { MakeTables(1, "a", "z", 1); ASSERT_EQ("0,1,2", FilesPerLevel(1)); CancelAllBackgroundWork(db_); - db_->CompactRange(handles_[1], nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ("0,1,2", FilesPerLevel(1)); if (iter == 0) { @@ -11349,7 +11369,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase) { } // Test compact range works - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // All data should be in the last level. ColumnFamilyMetaData cf_meta; db_->GetColumnFamilyMetaData(&cf_meta); @@ -11542,7 +11562,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) { DestroyAndReopen(options); // Compact against empty DB - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); uint64_t int_prop; std::string str_prop; @@ -11583,7 +11603,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) { }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(output_levels.size(), 2); ASSERT_TRUE(output_levels.find(3) != output_levels.end()); ASSERT_TRUE(output_levels.find(4) != output_levels.end()); @@ -11701,7 +11721,10 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) { // Issue manual compaction in one thread and still verify DB state // in main thread. std::thread t([&]() { - dbfull()->CompactRange(nullptr, nullptr, true, options.num_levels - 1); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = options.num_levels - 1; + dbfull()->CompactRange(compact_options, nullptr, nullptr); compaction_finished.store(true); }); do { @@ -12080,7 +12103,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { // Clean up memtable and L0. Block compaction threads. If continue to write // and flush memtables. We should see put timeout after 8 memtable flushes // since level0_stop_writes_trigger = 8 - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // Block compaction SleepingBackgroundTask sleeping_task_low1; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, @@ -12106,7 +12129,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->SetOptions({ {"level0_stop_writes_trigger", "6"} })); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(NumTableFilesAtLevel(0), 0); // Block compaction @@ -12131,7 +12154,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->SetOptions({ {"disable_auto_compactions", "true"} })); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(NumTableFilesAtLevel(0), 0); for (int i = 0; i < 4; ++i) { @@ -12147,7 +12170,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(dbfull()->SetOptions({ {"disable_auto_compactions", "false"} })); - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(NumTableFilesAtLevel(0), 0); for (int i = 0; i < 4; ++i) { @@ -12924,7 +12947,7 @@ TEST_F(DBTest, FilterCompactionTimeTest) { Flush(); } - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_EQ(0U, CountLiveFiles()); Reopen(options); @@ -13338,7 +13361,7 @@ TEST_F(DBTest, PromoteL0Failure) { status = experimental::PromoteL0(db_, db_->DefaultColumnFamily()); ASSERT_TRUE(status.IsInvalidArgument()); - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // Now there is a file in L1. ASSERT_GE(NumTableFilesAtLevel(1, 0), 1); @@ -13365,7 +13388,7 @@ TEST_F(DBTest, HugeNumberOfLevels) { ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); } - ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); } // Github issue #595 @@ -13491,7 +13514,10 @@ TEST_F(DBTest, UniversalCompactionTargetLevel) { ASSERT_EQ("3", FilesPerLevel(0)); // Compact all files into 1 file and put it in L4 - db_->CompactRange(nullptr, nullptr, true, 4); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 4; + db_->CompactRange(compact_options, nullptr, nullptr); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); } @@ -13516,7 +13542,7 @@ TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) { for (int num = 0; num < 10; num++) { GenerateNewRandomFile(&rnd); } - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"CompactionJob::Run():Start", diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 83d7b0f22..a00568c32 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -201,8 +201,11 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) { // 2 ssts, 1 manifest CheckFileTypeCounts(dbname_, 0, 2, 1); std::string first("0"), last("999999"); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; Slice first_slice(first), last_slice(last); - db_->CompactRange(&first_slice, &last_slice, true, 2); + db_->CompactRange(compact_options, &first_slice, &last_slice); // 1 sst after compaction CheckFileTypeCounts(dbname_, 0, 1, 1); @@ -211,7 +214,7 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) { Iterator *itr = 0; CreateTwoLevels(); itr = db_->NewIterator(ReadOptions()); - db_->CompactRange(&first_slice, &last_slice, true, 2); + db_->CompactRange(compact_options, &first_slice, &last_slice); // 3 sst after compaction with live iterator CheckFileTypeCounts(dbname_, 0, 3, 1); delete itr; diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 6926e2448..85157c8e6 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -659,7 +659,7 @@ class FaultInjectionTest : public testing::Test { Build(write_options, 0, num_pre_sync); if (sync_use_compact_) { - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); } write_options.sync = false; Build(write_options, num_pre_sync, num_post_sync); diff --git a/db/listener_test.cc b/db/listener_test.cc index a5acabbaf..39627df1f 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -201,7 +201,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) { ASSERT_OK(Flush(static_cast(i))); const Slice kStart = "a"; const Slice kEnd = "z"; - ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd)); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i], + &kStart, &kEnd)); dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } diff --git a/db/merge_test.cc b/db/merge_test.cc index 2fa7fae16..80aed6095 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -294,7 +294,7 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) { db->Flush(o); cout << "Compaction started ...\n"; - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); cout << "Compaction ended\n"; dumpDb(db); @@ -341,7 +341,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge, tmp_sum += i; } db->Flush(o); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(tmp_sum, counters->assert_get("b")); if (count > max_merge) { // in this case, FullMerge should be called instead. @@ -360,7 +360,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge, tmp_sum += i; } db->Flush(o); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(tmp_sum, counters->assert_get("c")); ASSERT_EQ(num_partial_merge_calls, 0U); } @@ -467,7 +467,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) { counters.add("test-key", 1); counters.add("test-key", 1); counters.add("test-key", 1); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); } DB* reopen_db; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 54536e999..ed012455c 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -33,6 +33,7 @@ struct ReadOptions; struct WriteOptions; struct FlushOptions; struct CompactionOptions; +struct CompactRangeOptions; struct TableProperties; class WriteBatch; class Env; @@ -415,25 +416,42 @@ class DB { // begin==nullptr is treated as a key before all keys in the database. // end==nullptr is treated as a key after all keys in the database. // Therefore the following call will compact the entire database: - // db->CompactRange(nullptr, nullptr); + // db->CompactRange(options, nullptr, nullptr); // Note that after the entire database is compacted, all data are pushed - // down to the last level containing any data. If the total data size - // after compaction is reduced, that level might not be appropriate for - // hosting all the files. In this case, client could set change_level - // to true, to move the files back to the minimum level capable of holding - // the data set or a given level (specified by non-negative target_level). - // Compaction outputs should be placed in options.db_paths[target_path_id]. - // Behavior is undefined if target_path_id is out of range. - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) = 0; - virtual Status CompactRange(const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) { - return CompactRange(DefaultColumnFamily(), begin, end, change_level, - target_level, target_path_id); + // down to the last level containing any data. If the total data size after + // compaction is reduced, that level might not be appropriate for hosting all + // the files. In this case, client could set options.change_level to true, to + // move the files back to the minimum level capable of holding the data set + // or a given level (specified by non-negative options.target_level). + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) = 0; + virtual Status CompactRange(const CompactRangeOptions& options, + const Slice* begin, const Slice* end) { + return CompactRange(options, DefaultColumnFamily(), begin, end); } + + __attribute__((deprecated)) virtual Status + CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, + const Slice* end, bool change_level = false, + int target_level = -1, uint32_t target_path_id = 0) { + CompactRangeOptions options; + options.change_level = change_level; + options.target_level = target_level; + options.target_path_id = target_path_id; + return CompactRange(options, column_family, begin, end); + } + __attribute__((deprecated)) virtual Status + CompactRange(const Slice* begin, const Slice* end, + bool change_level = false, int target_level = -1, + uint32_t target_path_id = 0) { + CompactRangeOptions options; + options.change_level = change_level; + options.target_level = target_level; + options.target_path_id = target_path_id; + return CompactRange(options, DefaultColumnFamily(), begin, end); + } + virtual Status SetOptions(ColumnFamilyHandle* column_family, const std::unordered_map& new_options) { return Status::NotSupported("Not implemented"); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1fab82cbb..ef61246e2 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1237,6 +1237,19 @@ struct CompactionOptions { : compression(kSnappyCompression), output_file_size_limit(std::numeric_limits::max()) {} }; + +// CompactRangeOptions is used by CompactRange() call. +struct CompactRangeOptions { + // If true, compacted files will be moved to the minimum level capable + // of holding the data or given level (specified non-negative target_level). + bool change_level = false; + // If change_level is true and target_level have non-negative value, compacted + // files will be moved to target_level. + int target_level = -1; + // Compaction outputs will be placed in options.db_paths[target_path_id]. + // Behavior is undefined if target_path_id is out of range. + uint32_t target_path_id = 0; +}; } // namespace rocksdb #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 6231b339b..6d39c99c5 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -127,12 +127,10 @@ class StackableDB : public DB { } using DB::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) override { - return db_->CompactRange(column_family, begin, end, change_level, - target_level, target_path_id); + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) override { + return db_->CompactRange(options, column_family, begin, end); } using DB::CompactFiles; diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 1b94dd8a1..221e7fff2 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -1476,13 +1476,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db, jint jtarget_level, jint jtarget_path_id) { rocksdb::Status s; + rocksdb::CompactRangeOptions compact_options; + compact_options.change_level = jreduce_level; + compact_options.target_level = jtarget_level; + compact_options.target_path_id = static_cast(jtarget_path_id); if (cf_handle != nullptr) { - s = db->CompactRange(cf_handle, nullptr, nullptr, jreduce_level, - jtarget_level, static_cast(jtarget_path_id)); + s = db->CompactRange(compact_options, cf_handle, nullptr, nullptr); } else { // backwards compatibility - s = db->CompactRange(nullptr, nullptr, jreduce_level, - jtarget_level, static_cast(jtarget_path_id)); + s = db->CompactRange(compact_options, nullptr, nullptr); } if (s.ok()) { @@ -1533,13 +1535,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db, const rocksdb::Slice end_slice(reinterpret_cast(end), jend_len); rocksdb::Status s; + rocksdb::CompactRangeOptions compact_options; + compact_options.change_level = jreduce_level; + compact_options.target_level = jtarget_level; + compact_options.target_path_id = static_cast(jtarget_path_id); if (cf_handle != nullptr) { - s = db->CompactRange(cf_handle, &begin_slice, &end_slice, jreduce_level, - jtarget_level, static_cast(jtarget_path_id)); + s = db->CompactRange(compact_options, cf_handle, &begin_slice, &end_slice); } else { // backwards compatibility - s = db->CompactRange(&begin_slice, &end_slice, jreduce_level, - jtarget_level, static_cast(jtarget_path_id)); + s = db->CompactRange(compact_options, &begin_slice, &end_slice); } env->ReleaseByteArrayElements(jbegin, begin, JNI_ABORT); diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 2e19cd0e9..fc334b407 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -441,7 +441,7 @@ void CompactorCommand::DoCommand() { end = new Slice(to_); } - db_->CompactRange(begin, end); + db_->CompactRange(CompactRangeOptions(), begin, end); exec_state_ = LDBCommandExecuteResult::Succeed(""); delete begin; @@ -519,7 +519,7 @@ void DBLoaderCommand::DoCommand() { cout << "Warning: " << bad_lines << " bad lines ignored." << endl; } if (compact_) { - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); } } @@ -1204,7 +1204,7 @@ void ReduceDBLevelsCommand::DoCommand() { } // Compact the whole DB to put all files to the highest level. fprintf(stdout, "Compacting the db...\n"); - db_->CompactRange(nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); CloseDB(); EnvOptions soptions; @@ -1309,9 +1309,10 @@ void ChangeCompactionStyleCommand::DoCommand() { files_per_level.c_str()); // manual compact into a single file and move the file to level 0 - db_->CompactRange(nullptr, nullptr, - true /* reduce level */, - 0 /* reduce to level 0 */); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 0; + db_->CompactRange(compact_options, nullptr, nullptr); // verify compaction result files_per_level = ""; diff --git a/util/manual_compaction_test.cc b/util/manual_compaction_test.cc index 6eedd0366..12c7486a3 100644 --- a/util/manual_compaction_test.cc +++ b/util/manual_compaction_test.cc @@ -77,7 +77,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) { db->Put(WriteOptions(), Slice("key4"), Slice("destroy")); Slice key4("key4"); - db->CompactRange(nullptr, &key4); + db->CompactRange(CompactRangeOptions(), nullptr, &key4); Iterator* itr = db->NewIterator(ReadOptions()); itr->SeekToFirst(); ASSERT_TRUE(itr->Valid()); @@ -130,7 +130,7 @@ TEST_F(ManualCompactionTest, Test) { rocksdb::Slice greatest(end_key.data(), end_key.size()); // commenting out the line below causes the example to work correctly - db->CompactRange(&least, &greatest); + db->CompactRange(CompactRangeOptions(), &least, &greatest); // count the keys rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions()); diff --git a/utilities/compacted_db/compacted_db_impl.h b/utilities/compacted_db/compacted_db_impl.h index 2754ed3a2..ec2d53762 100644 --- a/utilities/compacted_db/compacted_db_impl.h +++ b/utilities/compacted_db/compacted_db_impl.h @@ -54,10 +54,9 @@ class CompactedDBImpl : public DBImpl { return Status::NotSupported("Not supported in compacted db mode."); } using DBImpl::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* begin, const Slice* end, - bool change_level = false, int target_level = -1, - uint32_t target_path_id = 0) override { + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) override { return Status::NotSupported("Not supported in compacted db mode."); } diff --git a/utilities/merge_operators/string_append/stringappend_test.cc b/utilities/merge_operators/string_append/stringappend_test.cc index a0d137c8e..92621de92 100644 --- a/utilities/merge_operators/string_append/stringappend_test.cc +++ b/utilities/merge_operators/string_append/stringappend_test.cc @@ -515,7 +515,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) { slists.Append("c", "bbnagnagsx"); slists.Append("a", "sa"); slists.Append("b", "df"); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); slists.Get("a", &a); slists.Get("b", &b); slists.Get("c", &c); @@ -536,7 +536,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) { ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); // Compact, Get - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); @@ -544,7 +544,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) { // Append, Flush, Compact, Get slists.Append("b", "afcg"); db->Flush(rocksdb::FlushOptions()); - db->CompactRange(nullptr, nullptr); + db->CompactRange(CompactRangeOptions(), nullptr, nullptr); slists.Get("b", &b); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg"); } diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index a90185338..33a543817 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -589,7 +589,7 @@ class SpatialDBImpl : public SpatialDB { Status t = Flush(FlushOptions(), cfh); if (t.ok()) { - t = CompactRange(cfh, nullptr, nullptr); + t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr); } { diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 35ca34a05..e31c4d327 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -168,9 +168,9 @@ class TtlTest : public testing::Test { // Runs a manual compaction void ManualCompact(ColumnFamilyHandle* cf = nullptr) { if (cf == nullptr) { - db_ttl_->CompactRange(nullptr, nullptr); + db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr); } else { - db_ttl_->CompactRange(cf, nullptr, nullptr); + db_ttl_->CompactRange(CompactRangeOptions(), cf, nullptr, nullptr); } } From 2dc3910b5e56afdaeea7133d6fe17a89e3f7e75a Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 17 Jun 2015 16:44:52 -0700 Subject: [PATCH 8/8] Add --benchmark_write_rate_limit option to db_bench Summary: So far, we benchmarked RocksDB by writing as fast as possible. With this change, we're able to limit our write throughput, which should help us better understand how RocksDB performes under varying write workloads. Specifically, I'm currently interested in the shape of the graph that has write throughput on one axis and write rate on another. This should help us with designing our stall system, as we have started to do with D36351. Test Plan: $ ./db_bench --benchmarks=fillrandom --benchmark_write_rate_limit=1000000 fillrandom : 118.523 micros/op 8437 ops/sec; 0.9 MB/s $ ./db_bench --benchmarks=fillrandom --benchmark_write_rate_limit=2000000 fillrandom : 59.136 micros/op 16910 ops/sec; 1.9 MB/s Reviewers: MarkCallaghan, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D39759 --- db/db_bench.cc | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index f7096bad3..296a90c69 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -567,6 +567,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); +DEFINE_uint64( + benchmark_write_rate_limit, 0, + "If non-zero, db_bench will rate-limit the writes going into RocksDB"); + DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of " "overlaps in grandparent (i.e., level+2) before we stop building a" " single file in a level->level+1 compaction."); @@ -1288,6 +1292,7 @@ struct SharedState { port::CondVar cv; int total; int perf_level; + std::shared_ptr write_rate_limiter; // Each thread goes through the following states: // (1) initializing @@ -1400,7 +1405,7 @@ class Benchmark { (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio) * num_) / 1048576.0)); - fprintf(stdout, "Write rate limit: %d\n", FLAGS_writes_per_second); + fprintf(stdout, "Writes per second: %d\n", FLAGS_writes_per_second); if (FLAGS_enable_numa) { fprintf(stderr, "Running in NUMA enabled mode.\n"); #ifndef NUMA @@ -1950,6 +1955,10 @@ class Benchmark { shared.num_initialized = 0; shared.num_done = 0; shared.start = false; + if (FLAGS_benchmark_write_rate_limit > 0) { + shared.write_rate_limiter.reset( + NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit)); + } std::unique_ptr reporter_agent; if (FLAGS_report_interval_seconds > 0) { @@ -2646,6 +2655,10 @@ class Benchmark { DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id); batch.Clear(); for (int64_t j = 0; j < entries_per_batch_; j++) { + if (thread->shared->write_rate_limiter.get() != nullptr) { + thread->shared->write_rate_limiter->Request(value_size_ + key_size_, + Env::IO_HIGH); + } int64_t rand_num = key_gens[id]->Next(); GenerateKeyFromInt(rand_num, FLAGS_num, &key); if (FLAGS_num_column_families <= 1) {