diff --git a/HISTORY.md b/HISTORY.md
index 033c8f1fb..1559bc312 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -16,6 +16,7 @@
 * options.hard_rate_limit is deprecated.
 * When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way to slow down writes is changed to: write rate to DB is limited to options.delayed_write_rate.
 * DB::GetApproximateSizes() adds a parameter to allow the estimation to include data in mem table, with default to be not to include. It is now only supported in skip list mem table.
+* DB::CompactRange() now accepts CompactRangeOptions instead of multiple parameters. CompactRangeOptions is defined in include/rocksdb/options.h.

 ## 3.11.0 (5/19/2015)
 ### New Features
diff --git a/dockerbuild.sh b/build_tools/dockerbuild.sh
similarity index 100%
rename from dockerbuild.sh
rename to build_tools/dockerbuild.sh
diff --git a/db/c.cc b/db/c.cc
index fbf2c4ef0..48339c357 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -77,6 +77,7 @@ using rocksdb::BackupEngine;
 using rocksdb::BackupableDBOptions;
 using rocksdb::BackupInfo;
 using rocksdb::RestoreOptions;
+using rocksdb::CompactRangeOptions;

 using std::shared_ptr;

@@ -1006,6 +1007,7 @@ void rocksdb_compact_range(
     const char* limit_key, size_t limit_key_len) {
   Slice a, b;
   db->rep->CompactRange(
+      CompactRangeOptions(),
       // Pass nullptr Slice if corresponding "const char*" is nullptr
       (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
       (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
@@ -1018,7 +1020,7 @@ void rocksdb_compact_range_cf(
     const char* limit_key, size_t limit_key_len) {
   Slice a, b;
   db->rep->CompactRange(
-      column_family->rep,
+      CompactRangeOptions(), column_family->rep,
       // Pass nullptr Slice if corresponding "const char*" is nullptr
       (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
       (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
@@ -1930,6 +1932,14 @@ void rocksdb_options_set_fifo_compaction_options(
   opt->rep.compaction_options_fifo = fifo->rep;
 }

+char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt) {
+  rocksdb::Statistics *statistics = opt->rep.statistics.get();
+  if (statistics) {
+    return strdup(statistics->ToString().c_str());
+  }
+  return nullptr;
+}
+
 /* TODO:
 DB::OpenForReadOnly
@@ -2435,4 +2445,4 @@ extern void rocksdb_livefiles_destroy(

 }  // end extern "C"

-#endif  // ROCKSDB_LITE
+#endif  // !ROCKSDB_LITE
diff --git a/db/c_test.c b/db/c_test.c
index 2669b03ad..978b6174c 100644
--- a/db/c_test.c
+++ b/db/c_test.c
@@ -2,6 +2,8 @@
    Use of this source code is governed by a BSD-style license that can be
    found in the LICENSE file. See the AUTHORS file for names of contributors.
 */
+#ifndef ROCKSDB_LITE  // Lite does not support C API
+
 #include "rocksdb/c.h"

 #include <stddef.h>
@@ -1007,3 +1009,13 @@ int main(int argc, char** argv) {
   fprintf(stderr, "PASS\n");
   return 0;
 }
+
+#else
+#include <stdio.h>
+
+int main() {
+  fprintf(stderr, "SKIPPED\n");
+  return 0;
+}
+
+#endif  // !ROCKSDB_LITE
diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index 484a4c66a..283b8ede1 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -215,11 +215,13 @@ class ColumnFamilyTest : public testing::Test {
   }

   void CompactAll(int cf) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], nullptr, nullptr));
+    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
+                                nullptr));
   }

   void Compact(int cf, const Slice& start, const Slice& limit) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
+    ASSERT_OK(
+        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
   }

   int NumTableFilesAtLevel(int level, int cf) {
diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc
index 64894d6aa..3efbbfdeb 100644
--- a/db/compaction_job_stats_test.cc
+++ b/db/compaction_job_stats_test.cc
@@ -309,16 +309,18 @@ class CompactionJobStatsTest : public testing::Test {

   void Compact(int cf, const Slice& start, const Slice& limit,
                uint32_t target_path_id) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
-                                target_path_id));
+    CompactRangeOptions compact_options;
+    compact_options.target_path_id = target_path_id;
+    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
   }

   void Compact(int cf, const Slice& start, const Slice& limit) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
+    ASSERT_OK(
+        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
   }

   void Compact(const Slice& start, const Slice& limit) {
-    ASSERT_OK(db_->CompactRange(&start, &limit));
+    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
   }

   void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
diff --git a/db/db_bench.cc b/db/db_bench.cc
index a50b66bed..296a90c69 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -567,6 +567,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,

 DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

+DEFINE_uint64(
+    benchmark_write_rate_limit, 0,
+    "If non-zero, db_bench will rate-limit the writes going into RocksDB");
+
 DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
              "overlaps in grandparent (i.e., level+2) before we stop building a"
              " single file in a level->level+1 compaction.");
@@ -1288,6 +1292,7 @@ struct SharedState {
   port::CondVar cv;
   int total;
   int perf_level;
+  std::shared_ptr<RateLimiter> write_rate_limiter;

   // Each thread goes through the following states:
   //    (1) initializing
@@ -1400,7 +1405,7 @@ class Benchmark {
             (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
               * num_) / 1048576.0));
-    fprintf(stdout, "Write rate limit: %d\n", FLAGS_writes_per_second);
+    fprintf(stdout, "Writes per second: %d\n", FLAGS_writes_per_second);
     if (FLAGS_enable_numa) {
       fprintf(stderr, "Running in NUMA enabled mode.\n");
 #ifndef NUMA
@@ -1950,6 +1955,10 @@ class Benchmark {
     shared.num_initialized = 0;
     shared.num_done = 0;
     shared.start = false;
+    if (FLAGS_benchmark_write_rate_limit > 0) {
+      shared.write_rate_limiter.reset(
+          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
+    }

     std::unique_ptr<ReporterAgent> reporter_agent;
     if (FLAGS_report_interval_seconds > 0) {
@@ -2646,6 +2655,10 @@ class Benchmark {
       DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
       batch.Clear();
       for (int64_t j = 0; j < entries_per_batch_; j++) {
+        if (thread->shared->write_rate_limiter.get() != nullptr) {
+          thread->shared->write_rate_limiter->Request(value_size_ + key_size_,
+                                                      Env::IO_HIGH);
+        }
         int64_t rand_num = key_gens[id]->Next();
         GenerateKeyFromInt(rand_num, FLAGS_num, &key);
         if (FLAGS_num_column_families <= 1) {
@@ -3748,7 +3761,7 @@ class Benchmark {

   void Compact(ThreadState* thread) {
     DB* db = SelectDB(thread);
-    db->CompactRange(nullptr, nullptr);
+    db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   }

   void PrintStats(const char* key) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index aca964f84..a8a1389f9 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log);
 struct DBImpl::WriteContext {
   autovector<SuperVersion*> superversions_to_free_;
   autovector<MemTable*> memtables_to_free_;
-  bool schedule_bg_work_ = false;

   ~WriteContext() {
     for (auto& sv : superversions_to_free_) {
@@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile(

   Status s = flush_job.Run(&file_meta);

   if (s.ok()) {
-    InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
+    InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
+                                              mutable_cf_options);
     if (madeProgress) {
       *madeProgress = 1;
     }
@@ -1328,11 +1328,10 @@ void DBImpl::NotifyOnFlushCompleted(
 #endif  // ROCKSDB_LITE
 }

-Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
-                            const Slice* begin, const Slice* end,
-                            bool change_level, int target_level,
-                            uint32_t target_path_id) {
-  if (target_path_id >= db_options_.db_paths.size()) {
+Status DBImpl::CompactRange(const CompactRangeOptions& options,
+                            ColumnFamilyHandle* column_family,
+                            const Slice* begin, const Slice* end) {
+  if (options.target_path_id >= db_options_.db_paths.size()) {
     return Status::InvalidArgument("Invalid target path ID");
   }

@@ -1362,8 +1361,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
       cfd->NumberLevels() > 1) {
     // Always compact all files together.
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
-                            cfd->NumberLevels() - 1, target_path_id, begin,
-                            end);
+                            cfd->NumberLevels() - 1, options.target_path_id,
+                            begin, end);
     final_output_level = cfd->NumberLevels() - 1;
   } else {
     for (int level = 0; level <= max_level_with_files; level++) {
@@ -1384,8 +1383,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
           output_level = ColumnFamilyData::kCompactToBaseLevel;
         }
       }
-      s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
-                              end);
+      s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
+                              begin, end);
       if (!s.ok()) {
         break;
       }
@@ -1403,8 +1402,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
     return s;
   }

-  if (change_level) {
-    s = ReFitLevel(cfd, final_output_level, target_level);
+  if (options.change_level) {
+    s = ReFitLevel(cfd, final_output_level, options.target_level);
   }
   LogFlush(db_options_.info_log);

@@ -1578,8 +1577,8 @@ Status DBImpl::CompactFilesImpl(
   compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
   if (status.ok()) {
-    InstallSuperVersionBackground(c->column_family_data(), job_context,
-                                  *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());
   }
   c->ReleaseCompactionFiles(s);
   c.reset();
@@ -1791,7 +1790,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {

     status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                     directories_.GetDbDir());
-    superversion_to_free = InstallSuperVersion(
+    superversion_to_free = InstallSuperVersionAndScheduleWork(
         cfd, new_superversion, mutable_cf_options);
     new_superversion = nullptr;

@@ -1945,9 +1944,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
     s = write_thread_.EnterWriteThread(&w, 0);
     assert(s.ok() && !w.done);  // No timeout and nobody should do our job

-    // SetNewMemtableAndNewLogFile() will release and reacquire mutex
+    // SwitchMemtable() will release and reacquire mutex
     // during execution
-    s = SetNewMemtableAndNewLogFile(cfd, &context);
+    s = SwitchMemtable(cfd, &context);
     write_thread_.ExitWriteThread(&w, &w, s);

     cfd->imm()->FlushRequested();
@@ -2410,10 +2409,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                 c->inputs(0)->size());
     // There are three things that can change compaction score:
     // 1) When flush or compaction finish. This case is covered by
-    //    InstallSuperVersion()
+    //    InstallSuperVersionAndScheduleWork
     // 2) When MutableCFOptions changes. This case is also covered by
-    //    InstallSuperVersion(), because this is when the new options take
-    //    effect.
+    //    InstallSuperVersionAndScheduleWork, because this is when the new
+    //    options take effect.
     // 3) When we Pick a new compaction, we "remove" those files being
     //    compacted from the calculation, which then influences compaction
     //    score. Here we check if we need the new compaction even without the
@@ -2449,8 +2448,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
-    InstallSuperVersionBackground(c->column_family_data(), job_context,
-                                  *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());
     LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
                 c->column_family_data()->GetName().c_str(),
                 c->num_input_files(0));
@@ -2486,8 +2485,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
     // Use latest MutableCFOptions
-    InstallSuperVersionBackground(c->column_family_data(), job_context,
-                                  *c->mutable_cf_options());
+    InstallSuperVersionAndScheduleWorkWrapper(
+        c->column_family_data(), job_context, *c->mutable_cf_options());

     VersionStorageInfo::LevelSummaryStorage tmp;
     c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1,
@@ -2532,8 +2531,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
     if (status.ok()) {
-      InstallSuperVersionBackground(c->column_family_data(), job_context,
-                                    *c->mutable_cf_options());
+      InstallSuperVersionAndScheduleWorkWrapper(
+          c->column_family_data(), job_context, *c->mutable_cf_options());
     }
     *madeProgress = true;
   }
@@ -2695,26 +2694,25 @@ Status DBImpl::Get(const ReadOptions& read_options,
 // * malloc one SuperVersion() outside of the lock -- new_superversion
 // * delete SuperVersion()s outside of the lock -- superversions_to_free
 //
-// However, if InstallSuperVersion() gets called twice with the same
-// job_context, we can't reuse the SuperVersion() that got
-// malloced
-// because
+// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
+// same job_context, we can't reuse the SuperVersion() that got
+// malloced because
 // first call already used it. In that rare case, we take a hit and create a
 // new SuperVersion() inside of the mutex. We do similar thing
 // for superversion_to_free
-void DBImpl::InstallSuperVersionBackground(
+void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
     ColumnFamilyData* cfd, JobContext* job_context,
     const MutableCFOptions& mutable_cf_options) {
   mutex_.AssertHeld();
-  SuperVersion* old_superversion = InstallSuperVersion(
+  SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
       cfd, job_context->new_superversion, mutable_cf_options);
   job_context->new_superversion = nullptr;
   job_context->superversions_to_free.push_back(old_superversion);
 }

-SuperVersion* DBImpl::InstallSuperVersion(
+SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
     ColumnFamilyData* cfd, SuperVersion* new_sv,
-    const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) {
+    const MutableCFOptions& mutable_cf_options) {
   mutex_.AssertHeld();

   // Update max_total_in_memory_state_
@@ -2729,14 +2727,10 @@ SuperVersion* DBImpl::InstallSuperVersion(
       new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);

   // Whenever we install new SuperVersion, we might need to issue new flushes or
-  // compactions. dont_schedule_bg_work is true when scheduling from write
-  // thread and we don't want to add additional overhead. Callers promise to
-  // call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually
-  if (!dont_schedule_bg_work) {
-    SchedulePendingFlush(cfd);
-    SchedulePendingCompaction(cfd);
-    MaybeScheduleFlushOrCompaction();
-  }
+  // compactions.
+  SchedulePendingFlush(cfd);
+  SchedulePendingCompaction(cfd);
+  MaybeScheduleFlushOrCompaction();

   // Update max_total_in_memory_state_
   max_total_in_memory_state_ =
@@ -2947,7 +2941,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
     auto* cfd =
         versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
     assert(cfd != nullptr);
-    delete InstallSuperVersion(
+    delete InstallSuperVersionAndScheduleWork(
         cfd, nullptr, *cfd->GetLatestMutableCFOptions());

     if (!cfd->mem()->IsSnapshotSupported()) {
@@ -3371,15 +3365,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         continue;
       }
       if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
-        status = SetNewMemtableAndNewLogFile(cfd, &context);
+        status = SwitchMemtable(cfd, &context);
         if (!status.ok()) {
           break;
         }
         cfd->imm()->FlushRequested();
         SchedulePendingFlush(cfd);
-        context.schedule_bg_work_ = true;
       }
     }
+    MaybeScheduleFlushOrCompaction();
   } else if (UNLIKELY(write_buffer_.ShouldFlush())) {
     Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
         "Flushing all column families. Write buffer is using %" PRIu64
@@ -3392,13 +3386,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         continue;
       }
       if (!cfd->mem()->IsEmpty()) {
-        status = SetNewMemtableAndNewLogFile(cfd, &context);
+        status = SwitchMemtable(cfd, &context);
         if (!status.ok()) {
           break;
         }
         cfd->imm()->FlushRequested();
         SchedulePendingFlush(cfd);
-        context.schedule_bg_work_ = true;
       }
     }
     MaybeScheduleFlushOrCompaction();
@@ -3414,11 +3407,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

   if (UNLIKELY(status.ok()) &&
       (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
-    // If writer is stopped, we need to get it going,
-    // so schedule flushes/compactions
-    if (context.schedule_bg_work_) {
-      MaybeScheduleFlushOrCompaction();
-    }
     PERF_TIMER_STOP(write_pre_and_post_process_time);
     PERF_TIMER_GUARD(write_delay_time);
     // We don't know size of curent batch so that we always use the size
@@ -3560,9 +3548,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     mutex_.AssertHeld();
     write_thread_.ExitWriteThread(&w, last_writer, status);

-    if (context.schedule_bg_work_) {
-      MaybeScheduleFlushOrCompaction();
-    }
     mutex_.Unlock();

   if (status.IsTimedOut()) {
@@ -3633,9 +3618,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
 Status DBImpl::ScheduleFlushes(WriteContext* context) {
   ColumnFamilyData* cfd;
   while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
-    auto status = SetNewMemtableAndNewLogFile(cfd, context);
-    SchedulePendingFlush(cfd);
-    context->schedule_bg_work_ = true;
+    auto status = SwitchMemtable(cfd, context);
     if (cfd->Unref()) {
       delete cfd;
     }
@@ -3648,8 +3631,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {

 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
-                                           WriteContext* context) {
+Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   mutex_.AssertHeld();
   unique_ptr<WritableFile> lfile;
   log::Writer* new_log = nullptr;
@@ -3719,8 +3701,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
   cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
   new_mem->Ref();
   cfd->SetMemtable(new_mem);
-  context->superversions_to_free_.push_back(
-      InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true));
+  context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
+      cfd, new_superversion, mutable_cf_options));

   return s;
 }

@@ -4010,8 +3992,8 @@ Status DBImpl::DeleteFile(std::string name) {
     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                     &edit, &mutex_, directories_.GetDbDir());
     if (status.ok()) {
-      InstallSuperVersionBackground(cfd, &job_context,
-                                    *cfd->GetLatestMutableCFOptions());
+      InstallSuperVersionAndScheduleWorkWrapper(
+          cfd, &job_context, *cfd->GetLatestMutableCFOptions());
     }
     FindObsoleteFiles(&job_context, false);
   }  // lock released here
@@ -4253,7 +4235,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
   }
   if (s.ok()) {
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      delete impl->InstallSuperVersion(
+      delete impl->InstallSuperVersionAndScheduleWork(
           cfd, nullptr, *cfd->GetLatestMutableCFOptions());
     }
     impl->alive_log_files_.push_back(
diff --git a/db/db_impl.h b/db/db_impl.h
index 89afda987..a649b2baa 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -125,10 +125,9 @@ class DBImpl : public DB {
                                    const Range* range, int n, uint64_t* sizes,
                                    bool include_memtable = false) override;
   using DB::CompactRange;
-  virtual Status CompactRange(ColumnFamilyHandle* column_family,
-                              const Slice* begin, const Slice* end,
-                              bool change_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) override;
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override;

   using DB::CompactFiles;
   virtual Status CompactFiles(const CompactionOptions& compact_options,
@@ -440,8 +439,7 @@ class DBImpl : public DB {

   Status ScheduleFlushes(WriteContext* context);

-  Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
-                                     WriteContext* context);
+  Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);

   // Force current memtable contents to be flushed.
   Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);
@@ -719,21 +717,16 @@ class DBImpl : public DB {
   // the InstallSuperVersion() function. Background threads carry
   // job_context which can have new_superversion already
   // allocated.
-  void InstallSuperVersionBackground(
+  void InstallSuperVersionAndScheduleWorkWrapper(
       ColumnFamilyData* cfd, JobContext* job_context,
       const MutableCFOptions& mutable_cf_options);

   // All ColumnFamily state changes go through this function. Here we analyze
   // the new state and we schedule background work if we detect that the new
   // state needs flush or compaction.
-  // If dont_schedule_bg_work == true, then caller asks us to not schedule flush
-  // or compaction here, but it also promises to schedule needed background
-  // work. We use this to scheduling background compactions when we are in the
-  // write thread, which is very performance critical. Caller schedules
-  // background work as soon as it exits the write thread
-  SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv,
-                                    const MutableCFOptions& mutable_cf_options,
-                                    bool dont_schedule_bg_work = false);
+  SuperVersion* InstallSuperVersionAndScheduleWork(
+      ColumnFamilyData* cfd, SuperVersion* new_sv,
+      const MutableCFOptions& mutable_cf_options);

 #ifndef ROCKSDB_LITE
   using DB::GetPropertiesOfAllTables;
diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc
index 65529307b..6bf0ba6a1 100644
--- a/db/db_impl_experimental.cc
+++ b/db/db_impl_experimental.cc
@@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                     &edit, &mutex_, directories_.GetDbDir());
     if (status.ok()) {
-      InstallSuperVersionBackground(cfd, &job_context,
-                                    *cfd->GetLatestMutableCFOptions());
+      InstallSuperVersionAndScheduleWorkWrapper(
+          cfd, &job_context, *cfd->GetLatestMutableCFOptions());
     }
   }  // lock released here
   LogFlush(db_options_.info_log);
diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h
index 25fcb4350..fc14c04bc 100644
--- a/db/db_impl_readonly.h
+++ b/db/db_impl_readonly.h
@@ -58,10 +58,9 @@ class DBImplReadOnly : public DBImpl {
     return Status::NotSupported("Not supported operation in read only mode.");
   }
   using DBImpl::CompactRange;
-  virtual Status CompactRange(ColumnFamilyHandle* column_family,
-                              const Slice* begin, const Slice* end,
-                              bool reduce_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) override {
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
     return Status::NotSupported("Not supported operation in read only mode.");
   }

diff --git a/db/db_test.cc b/db/db_test.cc
index a86e755b2..ac6f8ec19 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -1092,16 +1092,18 @@ class DBTest : public testing::Test {

   void Compact(int cf, const Slice& start, const Slice& limit,
                uint32_t target_path_id) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
-                                target_path_id));
+    CompactRangeOptions compact_options;
+    compact_options.target_path_id = target_path_id;
+    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
   }

   void Compact(int cf, const Slice& start, const Slice& limit) {
-    ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
+    ASSERT_OK(
+        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
   }

   void Compact(const Slice& start, const Slice& limit) {
-    ASSERT_OK(db_->CompactRange(&start, &limit));
+    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
   }

   // Do n memtable compactions, each of which produces an sstable
@@ -1524,7 +1526,7 @@ TEST_F(DBTest, CompactedDB) {
   ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
   ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
   ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(3, NumTableFilesAtLevel(1));
   Close();

@@ -2339,7 +2341,7 @@ TEST_F(DBTest, WholeKeyFilterProp) {
   // ranges.
   ASSERT_OK(dbfull()->Put(wo, "aaa", ""));
   ASSERT_OK(dbfull()->Put(wo, "zzz", ""));
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   // Reopen with both of whole key off and prefix extractor enabled.
   // Still no bloom filter should be used.
@@ -2362,7 +2364,7 @@ TEST_F(DBTest, WholeKeyFilterProp) {
   // ranges.
   ASSERT_OK(dbfull()->Put(wo, "aaa", ""));
   ASSERT_OK(dbfull()->Put(wo, "zzz", ""));
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   options.prefix_extractor.reset();
   bbto.whole_key_filtering = true;
@@ -3790,7 +3792,7 @@ TEST_F(DBTest, TrivialMoveOneFile) {
   LiveFileMetaData level0_file = metadata[0];  // L0 file meta

   // Compaction will initiate a trivial move from L0 to L1
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   // File moved From L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
@@ -3855,7 +3857,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {

   // Since data is non-overlapping we expect compaction to initiate
   // a trivial move
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   // We expect that all the files were trivially moved from L0 to L1
   ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
   ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
@@ -3892,7 +3894,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {
     ASSERT_OK(Flush());
   }

-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   for (uint32_t i = 0; i < ranges.size(); i++) {
     for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
@@ -3944,7 +3946,10 @@ TEST_F(DBTest, TrivialMoveTargetLevel) {
   // 2 files in L0
   ASSERT_EQ("2", FilesPerLevel(0));
-  ASSERT_OK(db_->CompactRange(nullptr, nullptr, true, 6));
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 6;
+  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));

   // 2 files in L6
   ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
@@ -5423,17 +5428,13 @@ TEST_F(DBTest, ConvertCompactionStyle) {
   options = CurrentOptions(options);
   ReopenWithColumnFamilies({"default", "pikachu"}, options);

-  dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */,
-                         0 /* reduce to level 0 */);
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 0;
+  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);

-  for (int i = 0; i < options.num_levels; i++) {
-    int num = NumTableFilesAtLevel(i, 1);
-    if (i == 0) {
-      ASSERT_EQ(num, 1);
-    } else {
-      ASSERT_EQ(num, 0);
-    }
-  }
+  // Only 1 file in L0
+  ASSERT_EQ("1", FilesPerLevel(1));

   // Stage 4: re-open in universal compaction style and do some db operations
   options = CurrentOptions();
@@ -5548,8 +5549,10 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
   options.target_file_size_base = INT_MAX;
   ReopenWithColumnFamilies({"default", "pikachu"}, options);
   // Compact all to level 0
-  dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */,
-                         0 /* reduce to level 0 */);
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 0;
+  dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
   // Need to restart it once to remove higher level records in manifest.
   ReopenWithColumnFamilies({"default", "pikachu"}, options);
   // Final reopen
@@ -6021,7 +6024,7 @@ TEST_F(DBTest, CompactionFilterDeletesAll) {
   }

   // this will produce empty file (delete compaction filter)
-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ(0U, CountLiveFiles());

   Reopen(options);
@@ -6062,7 +6065,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
       dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
       dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
     } else {
-      dbfull()->CompactRange(handles_[1], nullptr, nullptr);
+      dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                             nullptr);
     }

     // re-write all data again
@@ -6079,7 +6083,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
       dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
       dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
     } else {
-      dbfull()->CompactRange(handles_[1], nullptr, nullptr);
+      dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                             nullptr);
     }

     // verify that all keys now have the new value that
@@ -6120,7 +6125,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
   ASSERT_OK(Flush());
   std::string newvalue = Get("foo");
   ASSERT_EQ(newvalue, three);
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   newvalue = Get("foo");
   ASSERT_EQ(newvalue, three);

@@ -6128,12 +6133,12 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
   // merge keys.
   ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
   ASSERT_OK(Flush());
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   newvalue = Get("bar");
   ASSERT_EQ("NOT_FOUND", newvalue);
   ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
   ASSERT_OK(Flush());
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   newvalue = Get("bar");
   ASSERT_EQ(two, two);

@@ -6144,7 +6149,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
   ASSERT_OK(Flush());
   newvalue = Get("foobar");
   ASSERT_EQ(newvalue, three);
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   newvalue = Get("foobar");
   ASSERT_EQ(newvalue, three);

@@ -6157,7 +6162,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
   ASSERT_OK(Flush());
   newvalue = Get("barfoo");
   ASSERT_EQ(newvalue, four);
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   newvalue = Get("barfoo");
   ASSERT_EQ(newvalue, four);
 }
@@ -6191,7 +6196,7 @@ TEST_F(DBTest, CompactionFilterContextManual) {
   filter->expect_manual_compaction_.store(true);
   filter->expect_full_compaction_.store(false);  // Manual compaction always
                                                  // set this flag.
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(cfilter_count, 700);
   ASSERT_EQ(NumSortedRuns(0), 1);

@@ -6939,7 +6944,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
   // After a compaction, "second", "third" and "fifth" should
   // be removed
   FillLevels("a", "z", 1);
-  dbfull()->CompactRange(handles_[1], nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                         nullptr);
   ASSERT_EQ("sixth", Get(1, "foo"));
   ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
   ASSERT_EQ("first", Get(1, "foo", snapshot1));
@@ -6948,7 +6954,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
   // after we release the snapshot1, only two values left
   db_->ReleaseSnapshot(snapshot1);
   FillLevels("a", "z", 1);
-  dbfull()->CompactRange(handles_[1], nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                         nullptr);

   // We have only one valid snapshot snapshot2. Since snapshot1 is
   // not valid anymore, "first" should be removed by a compaction.
@@ -6959,7 +6966,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
   // after we release the snapshot2, only one value should be left
   db_->ReleaseSnapshot(snapshot2);
   FillLevels("a", "z", 1);
-  dbfull()->CompactRange(handles_[1], nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
+                         nullptr);
   ASSERT_EQ("sixth", Get(1, "foo"));
   ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
   // skip HashCuckooRep as it does not support snapshot
@@ -7256,7 +7264,7 @@ TEST_F(DBTest, ManualCompaction) {
     // Compact all
     MakeTables(1, "a", "z", 1);
     ASSERT_EQ("0,1,2", FilesPerLevel(1));
-    db_->CompactRange(handles_[1], nullptr, nullptr);
+    db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
     ASSERT_EQ("0,0,1", FilesPerLevel(1));

     if (iter == 0) {
@@ -7294,7 +7302,9 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
   ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

   // Full compaction to DB path 0
-  db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1);
+  CompactRangeOptions compact_options;
+  compact_options.target_path_id = 1;
+  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
   ASSERT_EQ(1, TotalLiveFiles(1));
   ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
@@ -7315,13 +7325,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));

   // Full compaction to DB path 0
-  db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0);
+  compact_options.target_path_id = 0;
+  db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
   ASSERT_EQ(1, TotalLiveFiles(1));
   ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
   ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));

   // Fail when compacting to an invalid path ID
-  ASSERT_TRUE(db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 2)
+  compact_options.target_path_id = 2;
+  ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
                   .IsInvalidArgument());
 }

@@ -7378,7 +7390,9 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
     ASSERT_EQ("1,2", FilesPerLevel(1));
     ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
     ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
-    db_->CompactRange(handles_[1], nullptr, nullptr, false, 1, 1);
+    CompactRangeOptions compact_options;
+    compact_options.target_path_id = 1;
+    db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ("0,1", FilesPerLevel(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); @@ -7447,7 +7461,7 @@ TEST_F(DBTest, DBOpen_Change_NumLevels) { ASSERT_OK(Put(1, "a", "123")); ASSERT_OK(Put(1, "b", "234")); - db_->CompactRange(handles_[1], nullptr, nullptr); + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); Close(); options.create_if_missing = false; @@ -7518,7 +7532,7 @@ TEST_F(DBTest, DropWrites) { true /* disallow trivial move */); } } else { - dbfull()->CompactRange(nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); } } @@ -8076,7 +8090,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]"); // Case 2: Delete followed by another delete @@ -8085,7 +8100,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 3: Put followed by a delete @@ -8094,7 +8110,8 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 4: Put followed by another Put @@ -8103,12 +8120,14 @@ TEST_F(DBTest, CompactOnFlush) { ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); // clear database Delete(1, "foo"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: Put followed by snapshot followed by another Put @@ -8122,7 +8141,8 @@ TEST_F(DBTest, CompactOnFlush) { // clear database Delete(1, "foo"); - dbfull()->CompactRange(handles_[1], nullptr, nullptr); + dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, + nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: snapshot followed by a put followed by another Put @@ -9061,10 +9081,9 @@ class ModelDB: public DB { } } using DB::CompactRange; - virtual Status CompactRange(ColumnFamilyHandle* column_family, - const Slice* start, const Slice* end, - bool reduce_level, int target_level, - uint32_t output_path_id) override { + virtual Status CompactRange(const CompactRangeOptions& options, + ColumnFamilyHandle* column_family, + const Slice* start, const Slice* end) override { return Status::NotSupported("Not supported operation."); } @@ -9432,7 +9451,8 @@ void PrefixScanInit(DBTest *dbtest) { keystr = std::string(buf); ASSERT_OK(dbtest->Put(keystr, keystr)); dbtest->Flush(); - dbtest->dbfull()->CompactRange(nullptr, nullptr); // move to level 1 + dbtest->dbfull()->CompactRange(CompactRangeOptions(), nullptr, + nullptr); // 

   // GROUP 1
   for (int i = 1; i <= small_range_sstfiles; i++) {
@@ -9685,7 +9705,7 @@ TEST_F(DBTest, TailingIteratorIncomplete) {

   // we either see the entry or it's not in cache
   ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());

-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   iter->SeekToFirst();
   // should still be true after compaction
   ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
@@ -9910,7 +9930,7 @@ TEST_F(DBTest, ManagedTailingIteratorIncomplete) {

   // we either see the entry or it's not in cache
   ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());

-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   iter->SeekToFirst();
   // should still be true after compaction
   ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
@@ -10039,7 +10059,7 @@ TEST_F(DBTest, FIFOCompactionTest) {
     if (iter == 0) {
       ASSERT_OK(dbfull()->TEST_WaitForCompact());
     } else {
-      ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
     }
     // only 5 files should survive
     ASSERT_EQ(NumTableFilesAtLevel(0), 5);
@@ -10760,7 +10780,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
   ASSERT_GT(SizeAtLevel(0), k64KB - k5KB);

   // Clean up L0
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(NumTableFilesAtLevel(0), 0);

   // Increase buffer size
@@ -10818,7 +10838,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
       {"max_write_buffer_number", "8"},
   }));
   // Clean up memtable and L0
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   SleepingBackgroundTask sleeping_task_low2;
   env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
@@ -10839,7 +10859,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
       {"max_write_buffer_number", "4"},
   }));
   // Clean up memtable and L0
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   SleepingBackgroundTask sleeping_task_low3;
   env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3,
@@ -11077,7 +11097,7 @@ TEST_F(DBTest, PreShutdownManualCompaction) {
     // Compact all
     MakeTables(1, "a", "z", 1);
     ASSERT_EQ("0,1,2", FilesPerLevel(1));
     CancelAllBackgroundWork(db_);
-    db_->CompactRange(handles_[1], nullptr, nullptr);
+    db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
     ASSERT_EQ("0,1,2", FilesPerLevel(1));

     if (iter == 0) {
@@ -11349,7 +11369,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase) {
   }

   // Test compact range works
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   // All data should be in the last level.
   ColumnFamilyMetaData cf_meta;
   db_->GetColumnFamilyMetaData(&cf_meta);
@@ -11542,7 +11562,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
   DestroyAndReopen(options);

   // Compact against empty DB
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   uint64_t int_prop;
   std::string str_prop;
@@ -11583,7 +11603,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
       });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();

-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(output_levels.size(), 2);
   ASSERT_TRUE(output_levels.find(3) != output_levels.end());
   ASSERT_TRUE(output_levels.find(4) != output_levels.end());
@@ -11701,7 +11721,10 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) {
   // Issue manual compaction in one thread and still verify DB state
   // in main thread.
   std::thread t([&]() {
-    dbfull()->CompactRange(nullptr, nullptr, true, options.num_levels - 1);
+    CompactRangeOptions compact_options;
+    compact_options.change_level = true;
+    compact_options.target_level = options.num_levels - 1;
+    dbfull()->CompactRange(compact_options, nullptr, nullptr);
     compaction_finished.store(true);
   });
   do {
@@ -12080,7 +12103,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   // Clean up memtable and L0. Block compaction threads. If continue to write
   // and flush memtables. We should see put timeout after 8 memtable flushes
   // since level0_stop_writes_trigger = 8
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   // Block compaction
   SleepingBackgroundTask sleeping_task_low1;
   env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1,
@@ -12106,7 +12129,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   ASSERT_OK(dbfull()->SetOptions({
     {"level0_stop_writes_trigger", "6"}
   }));
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(NumTableFilesAtLevel(0), 0);

   // Block compaction
@@ -12131,7 +12154,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   ASSERT_OK(dbfull()->SetOptions({
     {"disable_auto_compactions", "true"}
   }));
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(NumTableFilesAtLevel(0), 0);

   for (int i = 0; i < 4; ++i) {
@@ -12147,7 +12170,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   ASSERT_OK(dbfull()->SetOptions({
     {"disable_auto_compactions", "false"}
   }));
-  dbfull()->CompactRange(nullptr, nullptr);
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(NumTableFilesAtLevel(0), 0);

   for (int i = 0; i < 4; ++i) {
@@ -12924,7 +12947,7 @@ TEST_F(DBTest, FilterCompactionTimeTest) {
     Flush();
   }

-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ(0U, CountLiveFiles());

   Reopen(options);
@@ -13338,7 +13361,7 @@ TEST_F(DBTest, PromoteL0Failure) {
   status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
   ASSERT_TRUE(status.IsInvalidArgument());

-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   // Now there is a file in L1.
   ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
@@ -13365,7 +13388,7 @@ TEST_F(DBTest, HugeNumberOfLevels) {
     ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
   }

-  ASSERT_OK(db_->CompactRange(nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 }

 // Github issue #595
@@ -13491,7 +13514,10 @@ TEST_F(DBTest, UniversalCompactionTargetLevel) {
   ASSERT_EQ("3", FilesPerLevel(0));

   // Compact all files into 1 file and put it in L4
-  db_->CompactRange(nullptr, nullptr, true, 4);
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 4;
+  db_->CompactRange(compact_options, nullptr, nullptr);
   ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
 }

@@ -13516,7 +13542,7 @@ TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) {
   for (int num = 0; num < 10; num++) {
     GenerateNewRandomFile(&rnd);
   }
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

   rocksdb::SyncPoint::GetInstance()->LoadDependency(
       {{"CompactionJob::Run():Start",
diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc
index 83d7b0f22..a00568c32 100644
--- a/db/deletefile_test.cc
+++ b/db/deletefile_test.cc
@@ -201,8 +201,11 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
   // 2 ssts, 1 manifest
   CheckFileTypeCounts(dbname_, 0, 2, 1);
   std::string first("0"), last("999999");
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 2;
   Slice first_slice(first), last_slice(last);
-  db_->CompactRange(&first_slice, &last_slice, true, 2);
+  db_->CompactRange(compact_options, &first_slice, &last_slice);
   // 1 sst after compaction
   CheckFileTypeCounts(dbname_, 0, 1, 1);

@@ -211,7 +214,7 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
   Iterator *itr = 0;
   CreateTwoLevels();
   itr = db_->NewIterator(ReadOptions());
-  db_->CompactRange(&first_slice, &last_slice, true, 2);
+  db_->CompactRange(compact_options, &first_slice, &last_slice);
   // 3 sst after compaction with live iterator
   CheckFileTypeCounts(dbname_, 0, 3, 1);
   delete itr;
diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc
index 6926e2448..85157c8e6 100644
--- a/db/fault_injection_test.cc
+++ b/db/fault_injection_test.cc
@@ -659,7 +659,7 @@ class FaultInjectionTest : public testing::Test {
     Build(write_options, 0, num_pre_sync);
     if (sync_use_compact_) {
-      db_->CompactRange(nullptr, nullptr);
+      db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
     }
     write_options.sync = false;
     Build(write_options, num_pre_sync, num_post_sync);
diff --git a/db/listener_test.cc b/db/listener_test.cc
index a5acabbaf..39627df1f 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -201,7 +201,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
     ASSERT_OK(Flush(static_cast<int>(i)));
     const Slice kStart = "a";
     const Slice kEnd = "z";
-    ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd));
+    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
+                                     &kStart, &kEnd));
     dbfull()->TEST_WaitForFlushMemTable();
     dbfull()->TEST_WaitForCompact();
   }
diff --git a/db/merge_test.cc b/db/merge_test.cc
index 2fa7fae16..80aed6095 100644
--- a/db/merge_test.cc
+++ b/db/merge_test.cc
@@ -294,7 +294,7 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
     db->Flush(o);

     cout << "Compaction started ...\n";
-    db->CompactRange(nullptr, nullptr);
+    db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
     cout << "Compaction ended\n";

     dumpDb(db);
@@ -341,7 +341,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
     tmp_sum += i;
   }
   db->Flush(o);
-  db->CompactRange(nullptr, nullptr);
+  db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(tmp_sum, counters->assert_get("b"));
   if (count > max_merge) {
     // in this case, FullMerge should be called instead.
@@ -360,7 +360,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
     tmp_sum += i;
   }
   db->Flush(o);
-  db->CompactRange(nullptr, nullptr);
+  db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   ASSERT_EQ(tmp_sum, counters->assert_get("c"));
   ASSERT_EQ(num_partial_merge_calls, 0U);
 }
@@ -467,7 +467,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
       counters.add("test-key", 1);
       counters.add("test-key", 1);
       counters.add("test-key", 1);
-      db->CompactRange(nullptr, nullptr);
+      db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
     }

     DB* reopen_db;
diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h
index 06cf65bd1..7dbaacd73 100644
--- a/include/rocksdb/c.h
+++ b/include/rocksdb/c.h
@@ -602,6 +602,9 @@ extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional(
     rocksdb_options_t*, int* level_values, size_t num_levels);
 extern void rocksdb_options_enable_statistics(rocksdb_options_t*);

+/* returns a pointer to a malloc()-ed, null terminated string */
+extern char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt);
+
 extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int);
 extern void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int);
 extern void rocksdb_options_set_max_write_buffer_number_to_maintain(
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 54536e999..ed012455c 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -33,6 +33,7 @@ struct ReadOptions;
 struct WriteOptions;
 struct FlushOptions;
 struct CompactionOptions;
+struct CompactRangeOptions;
 struct TableProperties;
 class WriteBatch;
 class Env;
@@ -415,25 +416,42 @@ class DB {
   // begin==nullptr is treated as a key before all keys in the database.
   // end==nullptr is treated as a key after all keys in the database.
   // Therefore the following call will compact the entire database:
-  //    db->CompactRange(nullptr, nullptr);
+  //    db->CompactRange(options, nullptr, nullptr);
   // Note that after the entire database is compacted, all data are pushed
-  // down to the last level containing any data. If the total data size
-  // after compaction is reduced, that level might not be appropriate for
-  // hosting all the files. In this case, client could set change_level
-  // to true, to move the files back to the minimum level capable of holding
-  // the data set or a given level (specified by non-negative target_level).
-  // Compaction outputs should be placed in options.db_paths[target_path_id].
-  // Behavior is undefined if target_path_id is out of range.
-  virtual Status CompactRange(ColumnFamilyHandle* column_family,
-                              const Slice* begin, const Slice* end,
-                              bool change_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) = 0;
-  virtual Status CompactRange(const Slice* begin, const Slice* end,
-                              bool change_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) {
-    return CompactRange(DefaultColumnFamily(), begin, end, change_level,
-                        target_level, target_path_id);
+  // down to the last level containing any data. If the total data size after
+  // compaction is reduced, that level might not be appropriate for hosting all
+  // the files. In this case, the client can set options.change_level to true,
+  // to move the files back to the minimum level capable of holding the data
+  // set, or to a given level (specified by a non-negative
+  // options.target_level).
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) = 0;
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              const Slice* begin, const Slice* end) {
+    return CompactRange(options, DefaultColumnFamily(), begin, end);
   }
+
+  __attribute__((deprecated)) virtual Status
+      CompactRange(ColumnFamilyHandle* column_family, const Slice* begin,
+                   const Slice* end, bool change_level = false,
+                   int target_level = -1, uint32_t target_path_id = 0) {
+    CompactRangeOptions options;
+    options.change_level = change_level;
+    options.target_level = target_level;
+    options.target_path_id = target_path_id;
+    return CompactRange(options, column_family, begin, end);
+  }
+  __attribute__((deprecated)) virtual Status
+      CompactRange(const Slice* begin, const Slice* end,
+                   bool change_level = false, int target_level = -1,
+                   uint32_t target_path_id = 0) {
+    CompactRangeOptions options;
+    options.change_level = change_level;
+    options.target_level = target_level;
+    options.target_path_id = target_path_id;
+    return CompactRange(options, DefaultColumnFamily(), begin, end);
+  }
+
   virtual Status SetOptions(ColumnFamilyHandle* column_family,
       const std::unordered_map<std::string, std::string>& new_options) {
     return Status::NotSupported("Not implemented");
   }
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 1fab82cbb..ef61246e2 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1237,6 +1237,19 @@ struct CompactionOptions {
       : compression(kSnappyCompression),
        output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
 };
+
+// CompactRangeOptions is used by CompactRange() call.
+struct CompactRangeOptions {
+  // If true, compacted files will be moved to the minimum level capable
+  // of holding the data, or to a given level (specified by a non-negative
+  // target_level).
+  bool change_level = false;
+  // If change_level is true and target_level has a non-negative value,
+  // compacted files will be moved to target_level.
+  int target_level = -1;
+  // Compaction outputs will be placed in options.db_paths[target_path_id].
+  // Behavior is undefined if target_path_id is out of range.
+  uint32_t target_path_id = 0;
+};
 }  // namespace rocksdb

 #endif  // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 6231b339b..6d39c99c5 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -127,12 +127,10 @@ class StackableDB : public DB {
   }

   using DB::CompactRange;
-  virtual Status CompactRange(ColumnFamilyHandle* column_family,
-                              const Slice* begin, const Slice* end,
-                              bool change_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) override {
-    return db_->CompactRange(column_family, begin, end, change_level,
-                             target_level, target_path_id);
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
+    return db_->CompactRange(options, column_family, begin, end);
   }

   using DB::CompactFiles;
diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc
index 1b94dd8a1..221e7fff2 100644
--- a/java/rocksjni/rocksjni.cc
+++ b/java/rocksjni/rocksjni.cc
@@ -1476,13 +1476,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
     jint jtarget_level, jint jtarget_path_id) {
   rocksdb::Status s;
+  rocksdb::CompactRangeOptions compact_options;
+  compact_options.change_level = jreduce_level;
+  compact_options.target_level = jtarget_level;
+  compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
   if (cf_handle != nullptr) {
-    s = db->CompactRange(cf_handle, nullptr, nullptr, jreduce_level,
-        jtarget_level, static_cast<uint32_t>(jtarget_path_id));
+    s = db->CompactRange(compact_options, cf_handle, nullptr, nullptr);
   } else {
     // backwards compatibility
-    s = db->CompactRange(nullptr, nullptr, jreduce_level,
-        jtarget_level, static_cast<uint32_t>(jtarget_path_id));
+    s = db->CompactRange(compact_options, nullptr, nullptr);
   }

   if (s.ok()) {
@@ -1533,13 +1535,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
   const rocksdb::Slice end_slice(reinterpret_cast<char*>(end), jend_len);

   rocksdb::Status s;
+  rocksdb::CompactRangeOptions compact_options;
+  compact_options.change_level = jreduce_level;
+  compact_options.target_level = jtarget_level;
+  compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
   if (cf_handle != nullptr) {
-    s = db->CompactRange(cf_handle, &begin_slice, &end_slice, jreduce_level,
-        jtarget_level, static_cast<uint32_t>(jtarget_path_id));
+    s = db->CompactRange(compact_options, cf_handle, &begin_slice, &end_slice);
   } else {
     // backwards compatibility
-    s = db->CompactRange(&begin_slice, &end_slice, jreduce_level,
-        jtarget_level, static_cast<uint32_t>(jtarget_path_id));
+    s = db->CompactRange(compact_options, &begin_slice, &end_slice);
   }

   env->ReleaseByteArrayElements(jbegin, begin, JNI_ABORT);
diff --git a/util/env_posix.cc b/util/env_posix.cc
index 77fb0ba44..8b1f04143 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -1766,7 +1766,7 @@ class PosixEnv : public Env {
     ThreadPool* tp = meta->thread_pool_;
 #if ROCKSDB_USING_THREAD_STATUS
     // for thread-status
-    ThreadStatusUtil::SetThreadType(tp->env_,
+    ThreadStatusUtil::RegisterThread(tp->env_,
         (tp->GetThreadPriority() == Env::Priority::HIGH ?
            ThreadStatus::HIGH_PRIORITY : ThreadStatus::LOW_PRIORITY));
diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index 2e19cd0e9..fc334b407 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -441,7 +441,7 @@ void CompactorCommand::DoCommand() {
     end = new Slice(to_);
   }

-  db_->CompactRange(begin, end);
+  db_->CompactRange(CompactRangeOptions(), begin, end);

   exec_state_ = LDBCommandExecuteResult::Succeed("");

   delete begin;
@@ -519,7 +519,7 @@ void DBLoaderCommand::DoCommand() {
     cout << "Warning: " << bad_lines << " bad lines ignored." << endl;
   }
   if (compact_) {
-    db_->CompactRange(nullptr, nullptr);
+    db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   }
 }

@@ -1204,7 +1204,7 @@ void ReduceDBLevelsCommand::DoCommand() {
   }
   // Compact the whole DB to put all files to the highest level.
   fprintf(stdout, "Compacting the db...\n");
-  db_->CompactRange(nullptr, nullptr);
+  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   CloseDB();

   EnvOptions soptions;
@@ -1309,9 +1309,10 @@ void ChangeCompactionStyleCommand::DoCommand() {
           files_per_level.c_str());

   // manual compact into a single file and move the file to level 0
-  db_->CompactRange(nullptr, nullptr,
-                    true /* reduce level */,
-                    0 /* reduce to level 0 */);
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 0;
+  db_->CompactRange(compact_options, nullptr, nullptr);

   // verify compaction result
   files_per_level = "";
diff --git a/util/manual_compaction_test.cc b/util/manual_compaction_test.cc
index 6eedd0366..12c7486a3 100644
--- a/util/manual_compaction_test.cc
+++ b/util/manual_compaction_test.cc
@@ -77,7 +77,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
   db->Put(WriteOptions(), Slice("key4"), Slice("destroy"));

   Slice key4("key4");
-  db->CompactRange(nullptr, &key4);
+  db->CompactRange(CompactRangeOptions(), nullptr, &key4);
   Iterator* itr = db->NewIterator(ReadOptions());
   itr->SeekToFirst();
   ASSERT_TRUE(itr->Valid());
@@ -130,7 +130,7 @@ TEST_F(ManualCompactionTest, Test) {
   rocksdb::Slice greatest(end_key.data(), end_key.size());

   // commenting out the line below causes the example to work correctly
-  db->CompactRange(&least, &greatest);
+  db->CompactRange(CompactRangeOptions(), &least, &greatest);

   // count the keys
   rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions());
diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc
index 2dc15b429..2fd87cc89 100644
--- a/util/thread_status_updater.cc
+++ b/util/thread_status_updater.cc
@@ -15,6 +15,19 @@ namespace rocksdb {

 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;

+void ThreadStatusUpdater::RegisterThread(
+    ThreadStatus::ThreadType ttype, uint64_t thread_id) {
+  if (UNLIKELY(thread_status_data_ == nullptr)) {
+    thread_status_data_ = new ThreadStatusData();
+    thread_status_data_->thread_type = ttype;
+    thread_status_data_->thread_id = thread_id;
+    std::lock_guard<std::mutex> lck(thread_list_mutex_);
+    thread_data_set_.insert(thread_status_data_);
+  }
+
+  ClearThreadOperationProperties();
+}
+
 void ThreadStatusUpdater::UnregisterThread() {
   if (thread_status_data_ != nullptr) {
     std::lock_guard<std::mutex> lck(thread_list_mutex_);
@@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() {
   }
 }

-void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
-  auto* data = InitAndGet();
-  data->thread_id.store(thread_id, std::memory_order_relaxed);
-}
-
-void ThreadStatusUpdater::SetThreadType(
-    ThreadStatus::ThreadType ttype) {
-  auto* data = InitAndGet();
diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc
index 2dc15b429..2fd87cc89 100644
--- a/util/thread_status_updater.cc
+++ b/util/thread_status_updater.cc
@@ -15,6 +15,19 @@ namespace rocksdb {

 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;

+void ThreadStatusUpdater::RegisterThread(
+    ThreadStatus::ThreadType ttype, uint64_t thread_id) {
+  if (UNLIKELY(thread_status_data_ == nullptr)) {
+    thread_status_data_ = new ThreadStatusData();
+    thread_status_data_->thread_type = ttype;
+    thread_status_data_->thread_id = thread_id;
+    std::lock_guard<std::mutex> lck(thread_list_mutex_);
+    thread_data_set_.insert(thread_status_data_);
+  }
+
+  ClearThreadOperationProperties();
+}
+
 void ThreadStatusUpdater::UnregisterThread() {
   if (thread_status_data_ != nullptr) {
     std::lock_guard<std::mutex> lck(thread_list_mutex_);
@@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() {
   }
 }

-void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
-  auto* data = InitAndGet();
-  data->thread_id.store(thread_id, std::memory_order_relaxed);
-}
-
-void ThreadStatusUpdater::SetThreadType(
-    ThreadStatus::ThreadType ttype) {
-  auto* data = InitAndGet();
-  data->thread_type.store(ttype, std::memory_order_relaxed);
-  ClearThreadOperationProperties();
-}
-
 void ThreadStatusUpdater::ResetThreadStatus() {
   ClearThreadState();
   ClearThreadOperation();
@@ -44,7 +45,10 @@ void ThreadStatusUpdater::ResetThreadStatus() {

 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
     const void* cf_key) {
-  auto* data = InitAndGet();
+  auto* data = Get();
+  if (data == nullptr) {
+    return;
+  }
   // set the tracking flag based on whether cf_key is non-null or not.
   // If enable_thread_tracking is set to false, the input cf_key
   // would be nullptr.
@@ -53,8 +57,8 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey(
 }

 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
-  auto* data = InitAndGet();
-  if (data->enable_tracking == false) {
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return nullptr;
   }
   return data->cf_key.load(std::memory_order_relaxed);
@@ -62,9 +66,8 @@ const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {

 void ThreadStatusUpdater::SetThreadOperation(
     const ThreadStatus::OperationType type) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   // NOTE: Our practice here is to set all the thread operation properties
@@ -82,9 +85,8 @@ void ThreadStatusUpdater::SetThreadOperation(

 void ThreadStatusUpdater::SetThreadOperationProperty(
     int i, uint64_t value) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->op_properties[i].store(value, std::memory_order_relaxed);
@@ -92,27 +94,24 @@ void ThreadStatusUpdater::SetThreadOperationProperty(

 void ThreadStatusUpdater::IncreaseThreadOperationProperty(
     int i, uint64_t delta) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
 }

 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->op_start_time.store(start_time, std::memory_order_relaxed);
 }

 void ThreadStatusUpdater::ClearThreadOperation() {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
@@ -123,9 +122,8 @@ void ThreadStatusUpdater::ClearThreadOperation() {
 }

 void ThreadStatusUpdater::ClearThreadOperationProperties() {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
@@ -135,9 +133,8 @@

 ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
     ThreadStatus::OperationStage stage) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return ThreadStatus::STAGE_UNKNOWN;
   }
   return data->operation_stage.exchange(
@@ -146,18 +143,16 @@ ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(

 void ThreadStatusUpdater::SetThreadState(
     const ThreadStatus::StateType type) {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->state_type.store(type, std::memory_order_relaxed);
 }

 void ThreadStatusUpdater::ClearThreadState() {
-  auto* data = InitAndGet();
-  if (!data->enable_tracking) {
-    assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
+  auto* data = GetLocalThreadStatus();
+  if (data == nullptr) {
     return;
   }
   data->state_type.store(
@@ -222,11 +217,14 @@ Status ThreadStatusUpdater::GetThreadList(
   return Status::OK();
 }

-ThreadStatusData* ThreadStatusUpdater::InitAndGet() {
-  if (UNLIKELY(thread_status_data_ == nullptr)) {
-    thread_status_data_ = new ThreadStatusData();
-    std::lock_guard<std::mutex> lck(thread_list_mutex_);
-    thread_data_set_.insert(thread_status_data_);
+ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
+  if (thread_status_data_ == nullptr) {
+    return nullptr;
+  }
+  if (!thread_status_data_->enable_tracking) {
+    assert(thread_status_data_->cf_key.load(
+        std::memory_order_relaxed) == nullptr);
+    return nullptr;
   }
   return thread_status_data_;
 }
@@ -290,17 +288,14 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {

 #else

-void ThreadStatusUpdater::UnregisterThread() {
+void ThreadStatusUpdater::RegisterThread(
+    ThreadStatus::ThreadType ttype, uint64_t thread_id) {
 }

-void ThreadStatusUpdater::ResetThreadStatus() {
-}
-
-void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
+void ThreadStatusUpdater::UnregisterThread() {
 }

-void ThreadStatusUpdater::SetThreadType(
-    ThreadStatus::ThreadType ttype) {
+void ThreadStatusUpdater::ResetThreadStatus() {
 }

 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
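The refactoring above replaces the lazy initialization in InitAndGet() with explicit registration, so a tracked thread now follows a register/use/unregister lifecycle. A rough sketch of the expected calling pattern (everything except the updater API itself is illustrative, not code from the patch):

```cpp
#include "rocksdb/thread_status.h"
#include "util/thread_status_updater.h"

// Illustrative only: how a pool thread is expected to use the updater after
// this change. "updater" stands for the process-wide ThreadStatusUpdater.
void TrackedThreadBody(rocksdb::ThreadStatusUpdater* updater,
                       uint64_t thread_id) {
  // Registration allocates the thread-local ThreadStatusData exactly once,
  // setting its type and id together (previously two separate setters).
  updater->RegisterThread(rocksdb::ThreadStatus::LOW_PRIORITY, thread_id);

  // Status setters now silently no-op for unregistered or non-tracking
  // threads, because GetLocalThreadStatus() returns nullptr instead of
  // lazily allocating.
  updater->SetThreadOperation(rocksdb::ThreadStatus::OP_COMPACTION);
  // ... do the work ...
  updater->ClearThreadOperation();

  // Must be paired with RegisterThread so the entry is removed from
  // thread_data_set_.
  updater->UnregisterThread();
}
```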
diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h
index 5e7c2b894..218bba042 100644
--- a/util/thread_status_updater.h
+++ b/util/thread_status_updater.h
@@ -118,8 +118,8 @@ class ThreadStatusUpdater {
   // Set the id of the current thread.
   void SetThreadID(uint64_t thread_id);

-  // Set the thread type of the current thread.
-  void SetThreadType(ThreadStatus::ThreadType ttype);
+  // Register the current thread for tracking.
+  void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id);

   // Update the column-family info of the current thread by setting
   // its thread-local pointer of ThreadStateInfo to the correct entry.
@@ -198,9 +198,15 @@ class ThreadStatusUpdater {
   // The thread-local variable for storing thread status.
   static __thread ThreadStatusData* thread_status_data_;

-  // Obtain the pointer to the thread status data. It also performs
-  // initialization when necessary.
-  ThreadStatusData* InitAndGet();
+  // Returns the pointer to the thread status data only when the
+  // thread status data is non-null and has enable_tracking == true.
+  ThreadStatusData* GetLocalThreadStatus();
+
+  // Directly returns the pointer to thread_status_data_ without
+  // checking whether enable_tracking is true or not.
+  ThreadStatusData* Get() {
+    return thread_status_data_;
+  }

   // The mutex that protects cf_info_map and db_key_map.
   std::mutex thread_list_mutex_;
diff --git a/util/thread_status_util.cc b/util/thread_status_util.cc
index 116950e13..e67a8e4ef 100644
--- a/util/thread_status_util.cc
+++ b/util/thread_status_util.cc
@@ -15,14 +15,14 @@ __thread ThreadStatusUpdater*
     ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
 __thread bool ThreadStatusUtil::thread_updater_initialized_ = false;

-void ThreadStatusUtil::SetThreadType(
+void ThreadStatusUtil::RegisterThread(
     const Env* env, ThreadStatus::ThreadType thread_type) {
   if (!MaybeInitThreadLocalUpdater(env)) {
     return;
   }
   assert(thread_updater_local_cache_);
-  thread_updater_local_cache_->SetThreadID(env->GetThreadID());
-  thread_updater_local_cache_->SetThreadType(thread_type);
+  thread_updater_local_cache_->RegisterThread(
+      thread_type, env->GetThreadID());
 }

 void ThreadStatusUtil::UnregisterThread() {
diff --git a/util/thread_status_util.h b/util/thread_status_util.h
index ba0238d58..aa13a6c40 100644
--- a/util/thread_status_util.h
+++ b/util/thread_status_util.h
@@ -27,8 +27,8 @@ class ColumnFamilyData;
 // all function calls to ThreadStatusUtil will be no-op.
 class ThreadStatusUtil {
  public:
-  // Set the thread type of the current thread.
-  static void SetThreadType(
+  // Register the current thread for tracking.
+  static void RegisterThread(
       const Env* env, ThreadStatus::ThreadType thread_type);

   // Unregister the current thread.
diff --git a/utilities/compacted_db/compacted_db_impl.h b/utilities/compacted_db/compacted_db_impl.h
index 2754ed3a2..ec2d53762 100644
--- a/utilities/compacted_db/compacted_db_impl.h
+++ b/utilities/compacted_db/compacted_db_impl.h
@@ -54,10 +54,9 @@ class CompactedDBImpl : public DBImpl {
     return Status::NotSupported("Not supported in compacted db mode.");
   }
   using DBImpl::CompactRange;
-  virtual Status CompactRange(ColumnFamilyHandle* column_family,
-                              const Slice* begin, const Slice* end,
-                              bool change_level = false, int target_level = -1,
-                              uint32_t target_path_id = 0) override {
+  virtual Status CompactRange(const CompactRangeOptions& options,
+                              ColumnFamilyHandle* column_family,
+                              const Slice* begin, const Slice* end) override {
     return Status::NotSupported("Not supported in compacted db mode.");
   }
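Since CompactedDBImpl above rejects the call outright, callers that might hold such a read-only instance should check the returned Status rather than assume the compaction ran. A hedged sketch:

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Sketch only: some DB variants (e.g. CompactedDBImpl above) refuse manual
// compaction with Status::NotSupported.
bool TryManualCompaction(rocksdb::DB* db) {
  rocksdb::Status s =
      db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  if (s.IsNotSupported()) {
    return false;  // e.g. a compacted (read-only) DB
  }
  return s.ok();
}
```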
"y\n2\nmonkey\ndf\nl;\nafcg"); } diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index a90185338..33a543817 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -589,7 +589,7 @@ class SpatialDBImpl : public SpatialDB { Status t = Flush(FlushOptions(), cfh); if (t.ok()) { - t = CompactRange(cfh, nullptr, nullptr); + t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr); } { diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 35ca34a05..e31c4d327 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -168,9 +168,9 @@ class TtlTest : public testing::Test { // Runs a manual compaction void ManualCompact(ColumnFamilyHandle* cf = nullptr) { if (cf == nullptr) { - db_ttl_->CompactRange(nullptr, nullptr); + db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr); } else { - db_ttl_->CompactRange(cf, nullptr, nullptr); + db_ttl_->CompactRange(CompactRangeOptions(), cf, nullptr, nullptr); } }