Merge branch 'master' of github.com:facebook/rocksdb

D40233: Replace %llu with format macros in ParsedInternalKey::DebugString())
main
Poornima Chozhiyath Raman 10 years ago
commit a66b8157df
  1. 1
      HISTORY.md
  2. 0
      build_tools/dockerbuild.sh
  3. 14
      db/c.cc
  4. 12
      db/c_test.c
  5. 6
      db/column_family_test.cc
  6. 10
      db/compaction_job_stats_test.cc
  7. 17
      db/db_bench.cc
  8. 114
      db/db_impl.cc
  9. 23
      db/db_impl.h
  10. 4
      db/db_impl_experimental.cc
  11. 7
      db/db_impl_readonly.h
  12. 172
      db/db_test.cc
  13. 7
      db/deletefile_test.cc
  14. 2
      db/fault_injection_test.cc
  15. 3
      db/listener_test.cc
  16. 8
      db/merge_test.cc
  17. 3
      include/rocksdb/c.h
  18. 52
      include/rocksdb/db.h
  19. 13
      include/rocksdb/options.h
  20. 10
      include/rocksdb/utilities/stackable_db.h
  21. 20
      java/rocksjni/rocksjni.cc
  22. 2
      util/env_posix.cc
  23. 13
      util/ldb_cmd.cc
  24. 4
      util/manual_compaction_test.cc
  25. 103
      util/thread_status_updater.cc
  26. 16
      util/thread_status_updater.h
  27. 6
      util/thread_status_util.cc
  28. 4
      util/thread_status_util.h
  29. 7
      utilities/compacted_db/compacted_db_impl.h
  30. 6
      utilities/merge_operators/string_append/stringappend_test.cc
  31. 2
      utilities/spatialdb/spatial_db.cc
  32. 4
      utilities/ttl/ttl_test.cc

@ -16,6 +16,7 @@
* options.hard_rate_limit is deprecated. * options.hard_rate_limit is deprecated.
* When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way to slow down writes is changed to: write rate to DB is limited to to options.delayed_write_rate. * When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way to slow down writes is changed to: write rate to DB is limited to to options.delayed_write_rate.
* DB::GetApproximateSizes() adds a parameter to allow the estimation to include data in mem table, with default to be not to include. It is now only supported in skip list mem table. * DB::GetApproximateSizes() adds a parameter to allow the estimation to include data in mem table, with default to be not to include. It is now only supported in skip list mem table.
* DB::CompactRange() now accept CompactRangeOptions instead of multiple paramters. CompactRangeOptions is defined in include/rocksdb/options.h.
## 3.11.0 (5/19/2015) ## 3.11.0 (5/19/2015)
### New Features ### New Features

@ -77,6 +77,7 @@ using rocksdb::BackupEngine;
using rocksdb::BackupableDBOptions; using rocksdb::BackupableDBOptions;
using rocksdb::BackupInfo; using rocksdb::BackupInfo;
using rocksdb::RestoreOptions; using rocksdb::RestoreOptions;
using rocksdb::CompactRangeOptions;
using std::shared_ptr; using std::shared_ptr;
@ -1006,6 +1007,7 @@ void rocksdb_compact_range(
const char* limit_key, size_t limit_key_len) { const char* limit_key, size_t limit_key_len) {
Slice a, b; Slice a, b;
db->rep->CompactRange( db->rep->CompactRange(
CompactRangeOptions(),
// Pass nullptr Slice if corresponding "const char*" is nullptr // Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)); (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
@ -1018,7 +1020,7 @@ void rocksdb_compact_range_cf(
const char* limit_key, size_t limit_key_len) { const char* limit_key, size_t limit_key_len) {
Slice a, b; Slice a, b;
db->rep->CompactRange( db->rep->CompactRange(
column_family->rep, CompactRangeOptions(), column_family->rep,
// Pass nullptr Slice if corresponding "const char*" is nullptr // Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)); (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
@ -1930,6 +1932,14 @@ void rocksdb_options_set_fifo_compaction_options(
opt->rep.compaction_options_fifo = fifo->rep; opt->rep.compaction_options_fifo = fifo->rep;
} }
char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt) {
rocksdb::Statistics *statistics = opt->rep.statistics.get();
if (statistics) {
return strdup(statistics->ToString().c_str());
}
return nullptr;
}
/* /*
TODO: TODO:
DB::OpenForReadOnly DB::OpenForReadOnly
@ -2435,4 +2445,4 @@ extern void rocksdb_livefiles_destroy(
} // end extern "C" } // end extern "C"
#endif // ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -2,6 +2,8 @@
Use of this source code is governed by a BSD-style license that can be Use of this source code is governed by a BSD-style license that can be
found in the LICENSE file. See the AUTHORS file for names of contributors. */ found in the LICENSE file. See the AUTHORS file for names of contributors. */
#ifndef ROCKSDB_LITE // Lite does not support C API
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include <stddef.h> #include <stddef.h>
@ -1007,3 +1009,13 @@ int main(int argc, char** argv) {
fprintf(stderr, "PASS\n"); fprintf(stderr, "PASS\n");
return 0; return 0;
} }
#else
#include <stdio.h>
int main() {
fprintf(stderr, "SKIPPED\n");
return 0;
}
#endif // !ROCKSDB_LITE

@ -215,11 +215,13 @@ class ColumnFamilyTest : public testing::Test {
} }
void CompactAll(int cf) { void CompactAll(int cf) {
ASSERT_OK(db_->CompactRange(handles_[cf], nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
nullptr));
} }
void Compact(int cf, const Slice& start, const Slice& limit) { void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
} }
int NumTableFilesAtLevel(int level, int cf) { int NumTableFilesAtLevel(int level, int cf) {

@ -309,16 +309,18 @@ class CompactionJobStatsTest : public testing::Test {
void Compact(int cf, const Slice& start, const Slice& limit, void Compact(int cf, const Slice& start, const Slice& limit,
uint32_t target_path_id) { uint32_t target_path_id) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1, CompactRangeOptions compact_options;
target_path_id)); compact_options.target_path_id = target_path_id;
ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
} }
void Compact(int cf, const Slice& start, const Slice& limit) { void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
} }
void Compact(const Slice& start, const Slice& limit) { void Compact(const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(&start, &limit)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
} }
void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) { void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {

@ -567,6 +567,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
DEFINE_uint64(
benchmark_write_rate_limit, 0,
"If non-zero, db_bench will rate-limit the writes going into RocksDB");
DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of " DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
"overlaps in grandparent (i.e., level+2) before we stop building a" "overlaps in grandparent (i.e., level+2) before we stop building a"
" single file in a level->level+1 compaction."); " single file in a level->level+1 compaction.");
@ -1288,6 +1292,7 @@ struct SharedState {
port::CondVar cv; port::CondVar cv;
int total; int total;
int perf_level; int perf_level;
std::shared_ptr<RateLimiter> write_rate_limiter;
// Each thread goes through the following states: // Each thread goes through the following states:
// (1) initializing // (1) initializing
@ -1400,7 +1405,7 @@ class Benchmark {
(((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio) (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
* num_) * num_)
/ 1048576.0)); / 1048576.0));
fprintf(stdout, "Write rate limit: %d\n", FLAGS_writes_per_second); fprintf(stdout, "Writes per second: %d\n", FLAGS_writes_per_second);
if (FLAGS_enable_numa) { if (FLAGS_enable_numa) {
fprintf(stderr, "Running in NUMA enabled mode.\n"); fprintf(stderr, "Running in NUMA enabled mode.\n");
#ifndef NUMA #ifndef NUMA
@ -1950,6 +1955,10 @@ class Benchmark {
shared.num_initialized = 0; shared.num_initialized = 0;
shared.num_done = 0; shared.num_done = 0;
shared.start = false; shared.start = false;
if (FLAGS_benchmark_write_rate_limit > 0) {
shared.write_rate_limiter.reset(
NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
}
std::unique_ptr<ReporterAgent> reporter_agent; std::unique_ptr<ReporterAgent> reporter_agent;
if (FLAGS_report_interval_seconds > 0) { if (FLAGS_report_interval_seconds > 0) {
@ -2646,6 +2655,10 @@ class Benchmark {
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id); DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
batch.Clear(); batch.Clear();
for (int64_t j = 0; j < entries_per_batch_; j++) { for (int64_t j = 0; j < entries_per_batch_; j++) {
if (thread->shared->write_rate_limiter.get() != nullptr) {
thread->shared->write_rate_limiter->Request(value_size_ + key_size_,
Env::IO_HIGH);
}
int64_t rand_num = key_gens[id]->Next(); int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key); GenerateKeyFromInt(rand_num, FLAGS_num, &key);
if (FLAGS_num_column_families <= 1) { if (FLAGS_num_column_families <= 1) {
@ -3748,7 +3761,7 @@ class Benchmark {
void Compact(ThreadState* thread) { void Compact(ThreadState* thread) {
DB* db = SelectDB(thread); DB* db = SelectDB(thread);
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} }
void PrintStats(const char* key) { void PrintStats(const char* key) {

@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log);
struct DBImpl::WriteContext { struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_; autovector<SuperVersion*> superversions_to_free_;
autovector<MemTable*> memtables_to_free_; autovector<MemTable*> memtables_to_free_;
bool schedule_bg_work_ = false;
~WriteContext() { ~WriteContext() {
for (auto& sv : superversions_to_free_) { for (auto& sv : superversions_to_free_) {
@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile(
Status s = flush_job.Run(&file_meta); Status s = flush_job.Run(&file_meta);
if (s.ok()) { if (s.ok()) {
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
mutable_cf_options);
if (madeProgress) { if (madeProgress) {
*madeProgress = 1; *madeProgress = 1;
} }
@ -1328,11 +1328,10 @@ void DBImpl::NotifyOnFlushCompleted(
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, Status DBImpl::CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end, ColumnFamilyHandle* column_family,
bool change_level, int target_level, const Slice* begin, const Slice* end) {
uint32_t target_path_id) { if (options.target_path_id >= db_options_.db_paths.size()) {
if (target_path_id >= db_options_.db_paths.size()) {
return Status::InvalidArgument("Invalid target path ID"); return Status::InvalidArgument("Invalid target path ID");
} }
@ -1362,8 +1361,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
cfd->NumberLevels() > 1) { cfd->NumberLevels() > 1) {
// Always compact all files together. // Always compact all files together.
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
cfd->NumberLevels() - 1, target_path_id, begin, cfd->NumberLevels() - 1, options.target_path_id,
end); begin, end);
final_output_level = cfd->NumberLevels() - 1; final_output_level = cfd->NumberLevels() - 1;
} else { } else {
for (int level = 0; level <= max_level_with_files; level++) { for (int level = 0; level <= max_level_with_files; level++) {
@ -1384,8 +1383,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
output_level = ColumnFamilyData::kCompactToBaseLevel; output_level = ColumnFamilyData::kCompactToBaseLevel;
} }
} }
s = RunManualCompaction(cfd, level, output_level, target_path_id, begin, s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
end); begin, end);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
@ -1403,8 +1402,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s; return s;
} }
if (change_level) { if (options.change_level) {
s = ReFitLevel(cfd, final_output_level, target_level); s = ReFitLevel(cfd, final_output_level, options.target_level);
} }
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
@ -1578,8 +1577,8 @@ Status DBImpl::CompactFilesImpl(
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
} }
c->ReleaseCompactionFiles(s); c->ReleaseCompactionFiles(s);
c.reset(); c.reset();
@ -1791,7 +1790,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir()); directories_.GetDbDir());
superversion_to_free = InstallSuperVersion( superversion_to_free = InstallSuperVersionAndScheduleWork(
cfd, new_superversion, mutable_cf_options); cfd, new_superversion, mutable_cf_options);
new_superversion = nullptr; new_superversion = nullptr;
@ -1945,9 +1944,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
s = write_thread_.EnterWriteThread(&w, 0); s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job assert(s.ok() && !w.done); // No timeout and nobody should do our job
// SetNewMemtableAndNewLogFile() will release and reacquire mutex // SwitchMemtable() will release and reacquire mutex
// during execution // during execution
s = SetNewMemtableAndNewLogFile(cfd, &context); s = SwitchMemtable(cfd, &context);
write_thread_.ExitWriteThread(&w, &w, s); write_thread_.ExitWriteThread(&w, &w, s);
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
@ -2410,10 +2409,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->inputs(0)->size()); c->inputs(0)->size());
// There are three things that can change compaction score: // There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by // 1) When flush or compaction finish. This case is covered by
// InstallSuperVersion() // InstallSuperVersionAndScheduleWork
// 2) When MutableCFOptions changes. This case is also covered by // 2) When MutableCFOptions changes. This case is also covered by
// InstallSuperVersion(), because this is when the new options take // InstallSuperVersionAndScheduleWork, because this is when the new
// effect. // options take effect.
// 3) When we Pick a new compaction, we "remove" those files being // 3) When we Pick a new compaction, we "remove" those files being
// compacted from the calculation, which then influences compaction // compacted from the calculation, which then influences compaction
// score. Here we check if we need the new compaction even without the // score. Here we check if we need the new compaction even without the
@ -2449,8 +2448,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n", LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(), c->column_family_data()->GetName().c_str(),
c->num_input_files(0)); c->num_input_files(0));
@ -2486,8 +2485,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions // Use latest MutableCFOptions
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1, c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1,
@ -2532,8 +2531,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
} }
*madeProgress = true; *madeProgress = true;
} }
@ -2695,26 +2694,25 @@ Status DBImpl::Get(const ReadOptions& read_options,
// * malloc one SuperVersion() outside of the lock -- new_superversion // * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free // * delete SuperVersion()s outside of the lock -- superversions_to_free
// //
// However, if InstallSuperVersion() gets called twice with the same // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// job_context, we can't reuse the SuperVersion() that got // same job_context, we can't reuse the SuperVersion() that got
// malloced // malloced because
// because
// first call already used it. In that rare case, we take a hit and create a // first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing // new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free // for superversion_to_free
void DBImpl::InstallSuperVersionBackground( void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context, ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld(); mutex_.AssertHeld();
SuperVersion* old_superversion = InstallSuperVersion( SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
cfd, job_context->new_superversion, mutable_cf_options); cfd, job_context->new_superversion, mutable_cf_options);
job_context->new_superversion = nullptr; job_context->new_superversion = nullptr;
job_context->superversions_to_free.push_back(old_superversion); job_context->superversions_to_free.push_back(old_superversion);
} }
SuperVersion* DBImpl::InstallSuperVersion( SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersion* new_sv, ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld(); mutex_.AssertHeld();
// Update max_total_in_memory_state_ // Update max_total_in_memory_state_
@ -2729,14 +2727,10 @@ SuperVersion* DBImpl::InstallSuperVersion(
new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
// Whenever we install new SuperVersion, we might need to issue new flushes or // Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions. dont_schedule_bg_work is true when scheduling from write // compactions.
// thread and we don't want to add additional overhead. Callers promise to SchedulePendingFlush(cfd);
// call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually SchedulePendingCompaction(cfd);
if (!dont_schedule_bg_work) { MaybeScheduleFlushOrCompaction();
SchedulePendingFlush(cfd);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
// Update max_total_in_memory_state_ // Update max_total_in_memory_state_
max_total_in_memory_state_ = max_total_in_memory_state_ =
@ -2947,7 +2941,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
auto* cfd = auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr); assert(cfd != nullptr);
delete InstallSuperVersion( delete InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions()); cfd, nullptr, *cfd->GetLatestMutableCFOptions());
if (!cfd->mem()->IsSnapshotSupported()) { if (!cfd->mem()->IsSnapshotSupported()) {
@ -3371,15 +3365,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
continue; continue;
} }
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SwitchMemtable(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd); SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
} }
} }
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) { } else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing all column families. Write buffer is using %" PRIu64 "Flushing all column families. Write buffer is using %" PRIu64
@ -3392,13 +3386,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
continue; continue;
} }
if (!cfd->mem()->IsEmpty()) { if (!cfd->mem()->IsEmpty()) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SwitchMemtable(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd); SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
} }
} }
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
@ -3414,11 +3407,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (UNLIKELY(status.ok()) && if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.NeedsDelay())) { (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
// If writer is stopped, we need to get it going,
// so schedule flushes/compactions
if (context.schedule_bg_work_) {
MaybeScheduleFlushOrCompaction();
}
PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time); PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size // We don't know size of curent batch so that we always use the size
@ -3560,9 +3548,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.AssertHeld(); mutex_.AssertHeld();
write_thread_.ExitWriteThread(&w, last_writer, status); write_thread_.ExitWriteThread(&w, last_writer, status);
if (context.schedule_bg_work_) {
MaybeScheduleFlushOrCompaction();
}
mutex_.Unlock(); mutex_.Unlock();
if (status.IsTimedOut()) { if (status.IsTimedOut()) {
@ -3633,9 +3618,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
Status DBImpl::ScheduleFlushes(WriteContext* context) { Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
auto status = SetNewMemtableAndNewLogFile(cfd, context); auto status = SwitchMemtable(cfd, context);
SchedulePendingFlush(cfd);
context->schedule_bg_work_ = true;
if (cfd->Unref()) { if (cfd->Unref()) {
delete cfd; delete cfd;
} }
@ -3648,8 +3631,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
WriteContext* context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr; log::Writer* new_log = nullptr;
@ -3719,8 +3701,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref(); new_mem->Ref();
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
context->superversions_to_free_.push_back( context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); cfd, new_superversion, mutable_cf_options));
return s; return s;
} }
@ -4010,8 +3992,8 @@ Status DBImpl::DeleteFile(std::string name) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir()); &edit, &mutex_, directories_.GetDbDir());
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(cfd, &job_context, InstallSuperVersionAndScheduleWorkWrapper(
*cfd->GetLatestMutableCFOptions()); cfd, &job_context, *cfd->GetLatestMutableCFOptions());
} }
FindObsoleteFiles(&job_context, false); FindObsoleteFiles(&job_context, false);
} // lock released here } // lock released here
@ -4253,7 +4235,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
} }
if (s.ok()) { if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete impl->InstallSuperVersion( delete impl->InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions()); cfd, nullptr, *cfd->GetLatestMutableCFOptions());
} }
impl->alive_log_files_.push_back( impl->alive_log_files_.push_back(

@ -125,10 +125,9 @@ class DBImpl : public DB {
const Range* range, int n, uint64_t* sizes, const Range* range, int n, uint64_t* sizes,
bool include_memtable = false) override; bool include_memtable = false) override;
using DB::CompactRange; using DB::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family, virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end, ColumnFamilyHandle* column_family,
bool change_level = false, int target_level = -1, const Slice* begin, const Slice* end) override;
uint32_t target_path_id = 0) override;
using DB::CompactFiles; using DB::CompactFiles;
virtual Status CompactFiles(const CompactionOptions& compact_options, virtual Status CompactFiles(const CompactionOptions& compact_options,
@ -440,8 +439,7 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context); Status ScheduleFlushes(WriteContext* context);
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
WriteContext* context);
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);
@ -719,21 +717,16 @@ class DBImpl : public DB {
// the InstallSuperVersion() function. Background threads carry // the InstallSuperVersion() function. Background threads carry
// job_context which can have new_superversion already // job_context which can have new_superversion already
// allocated. // allocated.
void InstallSuperVersionBackground( void InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context, ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options);
// All ColumnFamily state changes go through this function. Here we analyze // All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new // the new state and we schedule background work if we detect that the new
// state needs flush or compaction. // state needs flush or compaction.
// If dont_schedule_bg_work == true, then caller asks us to not schedule flush SuperVersion* InstallSuperVersionAndScheduleWork(
// or compaction here, but it also promises to schedule needed background ColumnFamilyData* cfd, SuperVersion* new_sv,
// work. We use this to scheduling background compactions when we are in the const MutableCFOptions& mutable_cf_options);
// write thread, which is very performance critical. Caller schedules
// background work as soon as it exits the write thread
SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options,
bool dont_schedule_bg_work = false);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables; using DB::GetPropertiesOfAllTables;

@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir()); &edit, &mutex_, directories_.GetDbDir());
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(cfd, &job_context, InstallSuperVersionAndScheduleWorkWrapper(
*cfd->GetLatestMutableCFOptions()); cfd, &job_context, *cfd->GetLatestMutableCFOptions());
} }
} // lock released here } // lock released here
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);

@ -58,10 +58,9 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode."); return Status::NotSupported("Not supported operation in read only mode.");
} }
using DBImpl::CompactRange; using DBImpl::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family, virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end, ColumnFamilyHandle* column_family,
bool reduce_level = false, int target_level = -1, const Slice* begin, const Slice* end) override {
uint32_t target_path_id = 0) override {
return Status::NotSupported("Not supported operation in read only mode."); return Status::NotSupported("Not supported operation in read only mode.");
} }

@ -1092,16 +1092,18 @@ class DBTest : public testing::Test {
void Compact(int cf, const Slice& start, const Slice& limit, void Compact(int cf, const Slice& start, const Slice& limit,
uint32_t target_path_id) { uint32_t target_path_id) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1, CompactRangeOptions compact_options;
target_path_id)); compact_options.target_path_id = target_path_id;
ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
} }
void Compact(int cf, const Slice& start, const Slice& limit) { void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit)); ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
} }
void Compact(const Slice& start, const Slice& limit) { void Compact(const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(&start, &limit)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
} }
// Do n memtable compactions, each of which produces an sstable // Do n memtable compactions, each of which produces an sstable
@ -1524,7 +1526,7 @@ TEST_F(DBTest, CompactedDB) {
ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h'))); ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i'))); ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j'))); ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(3, NumTableFilesAtLevel(1)); ASSERT_EQ(3, NumTableFilesAtLevel(1));
Close(); Close();
@ -2339,7 +2341,7 @@ TEST_F(DBTest, WholeKeyFilterProp) {
// ranges. // ranges.
ASSERT_OK(dbfull()->Put(wo, "aaa", "")); ASSERT_OK(dbfull()->Put(wo, "aaa", ""));
ASSERT_OK(dbfull()->Put(wo, "zzz", "")); ASSERT_OK(dbfull()->Put(wo, "zzz", ""));
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Reopen with both of whole key off and prefix extractor enabled. // Reopen with both of whole key off and prefix extractor enabled.
// Still no bloom filter should be used. // Still no bloom filter should be used.
@ -2362,7 +2364,7 @@ TEST_F(DBTest, WholeKeyFilterProp) {
// ranges. // ranges.
ASSERT_OK(dbfull()->Put(wo, "aaa", "")); ASSERT_OK(dbfull()->Put(wo, "aaa", ""));
ASSERT_OK(dbfull()->Put(wo, "zzz", "")); ASSERT_OK(dbfull()->Put(wo, "zzz", ""));
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
options.prefix_extractor.reset(); options.prefix_extractor.reset();
bbto.whole_key_filtering = true; bbto.whole_key_filtering = true;
@ -3790,7 +3792,7 @@ TEST_F(DBTest, TrivialMoveOneFile) {
LiveFileMetaData level0_file = metadata[0]; // L0 file meta LiveFileMetaData level0_file = metadata[0]; // L0 file meta
// Compaction will initiate a trivial move from L0 to L1 // Compaction will initiate a trivial move from L0 to L1
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// File moved From L0 to L1 // File moved From L0 to L1
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0
@ -3855,7 +3857,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {
// Since data is non-overlapping we expect compaction to initiate // Since data is non-overlapping we expect compaction to initiate
// a trivial move // a trivial move
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// We expect that all the files were trivially moved from L0 to L1 // We expect that all the files were trivially moved from L0 to L1
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
@ -3892,7 +3894,7 @@ TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
} }
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
for (uint32_t i = 0; i < ranges.size(); i++) { for (uint32_t i = 0; i < ranges.size(); i++) {
for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
@ -3944,7 +3946,10 @@ TEST_F(DBTest, TrivialMoveTargetLevel) {
// 2 files in L0 // 2 files in L0
ASSERT_EQ("2", FilesPerLevel(0)); ASSERT_EQ("2", FilesPerLevel(0));
ASSERT_OK(db_->CompactRange(nullptr, nullptr, true, 6)); CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 6;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// 2 files in L6 // 2 files in L6
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
@ -5423,17 +5428,13 @@ TEST_F(DBTest, ConvertCompactionStyle) {
options = CurrentOptions(options); options = CurrentOptions(options);
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */, CompactRangeOptions compact_options;
0 /* reduce to level 0 */); compact_options.change_level = true;
compact_options.target_level = 0;
dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
for (int i = 0; i < options.num_levels; i++) { // Only 1 file in L0
int num = NumTableFilesAtLevel(i, 1); ASSERT_EQ("1", FilesPerLevel(1));
if (i == 0) {
ASSERT_EQ(num, 1);
} else {
ASSERT_EQ(num, 0);
}
}
// Stage 4: re-open in universal compaction style and do some db operations // Stage 4: re-open in universal compaction style and do some db operations
options = CurrentOptions(); options = CurrentOptions();
@ -5548,8 +5549,10 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) {
options.target_file_size_base = INT_MAX; options.target_file_size_base = INT_MAX;
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Compact all to level 0 // Compact all to level 0
dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */, CompactRangeOptions compact_options;
0 /* reduce to level 0 */); compact_options.change_level = true;
compact_options.target_level = 0;
dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
// Need to restart it once to remove higher level records in manifest. // Need to restart it once to remove higher level records in manifest.
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
// Final reopen // Final reopen
@ -6021,7 +6024,7 @@ TEST_F(DBTest, CompactionFilterDeletesAll) {
} }
// this will produce empty file (delete compaction filter) // this will produce empty file (delete compaction filter)
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(0U, CountLiveFiles()); ASSERT_EQ(0U, CountLiveFiles());
Reopen(options); Reopen(options);
@ -6062,7 +6065,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
} else { } else {
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
} }
// re-write all data again // re-write all data again
@ -6079,7 +6083,8 @@ TEST_F(DBTest, CompactionFilterWithValueChange) {
dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]);
} else { } else {
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
} }
// verify that all keys now have the new value that // verify that all keys now have the new value that
@ -6120,7 +6125,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
std::string newvalue = Get("foo"); std::string newvalue = Get("foo");
ASSERT_EQ(newvalue, three); ASSERT_EQ(newvalue, three);
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
newvalue = Get("foo"); newvalue = Get("foo");
ASSERT_EQ(newvalue, three); ASSERT_EQ(newvalue, three);
@ -6128,12 +6133,12 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
// merge keys. // merge keys.
ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); ASSERT_OK(db_->Put(WriteOptions(), "bar", two));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
newvalue = Get("bar"); newvalue = Get("bar");
ASSERT_EQ("NOT_FOUND", newvalue); ASSERT_EQ("NOT_FOUND", newvalue);
ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); ASSERT_OK(db_->Merge(WriteOptions(), "bar", two));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
newvalue = Get("bar"); newvalue = Get("bar");
ASSERT_EQ(two, two); ASSERT_EQ(two, two);
@ -6144,7 +6149,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
newvalue = Get("foobar"); newvalue = Get("foobar");
ASSERT_EQ(newvalue, three); ASSERT_EQ(newvalue, three);
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
newvalue = Get("foobar"); newvalue = Get("foobar");
ASSERT_EQ(newvalue, three); ASSERT_EQ(newvalue, three);
@ -6157,7 +6162,7 @@ TEST_F(DBTest, CompactionFilterWithMergeOperator) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
newvalue = Get("barfoo"); newvalue = Get("barfoo");
ASSERT_EQ(newvalue, four); ASSERT_EQ(newvalue, four);
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
newvalue = Get("barfoo"); newvalue = Get("barfoo");
ASSERT_EQ(newvalue, four); ASSERT_EQ(newvalue, four);
} }
@ -6191,7 +6196,7 @@ TEST_F(DBTest, CompactionFilterContextManual) {
filter->expect_manual_compaction_.store(true); filter->expect_manual_compaction_.store(true);
filter->expect_full_compaction_.store(false); // Manual compaction always filter->expect_full_compaction_.store(false); // Manual compaction always
// set this flag. // set this flag.
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(cfilter_count, 700); ASSERT_EQ(cfilter_count, 700);
ASSERT_EQ(NumSortedRuns(0), 1); ASSERT_EQ(NumSortedRuns(0), 1);
@ -6939,7 +6944,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
// After a compaction, "second", "third" and "fifth" should // After a compaction, "second", "third" and "fifth" should
// be removed // be removed
FillLevels("a", "z", 1); FillLevels("a", "z", 1);
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("sixth", Get(1, "foo"));
ASSERT_EQ("fourth", Get(1, "foo", snapshot2)); ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
ASSERT_EQ("first", Get(1, "foo", snapshot1)); ASSERT_EQ("first", Get(1, "foo", snapshot1));
@ -6948,7 +6954,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
// after we release the snapshot1, only two values left // after we release the snapshot1, only two values left
db_->ReleaseSnapshot(snapshot1); db_->ReleaseSnapshot(snapshot1);
FillLevels("a", "z", 1); FillLevels("a", "z", 1);
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
// We have only one valid snapshot snapshot2. Since snapshot1 is // We have only one valid snapshot snapshot2. Since snapshot1 is
// not valid anymore, "first" should be removed by a compaction. // not valid anymore, "first" should be removed by a compaction.
@ -6959,7 +6966,8 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
// after we release the snapshot2, only one value should be left // after we release the snapshot2, only one value should be left
db_->ReleaseSnapshot(snapshot2); db_->ReleaseSnapshot(snapshot2);
FillLevels("a", "z", 1); FillLevels("a", "z", 1);
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("sixth", Get(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
// skip HashCuckooRep as it does not support snapshot // skip HashCuckooRep as it does not support snapshot
@ -7256,7 +7264,7 @@ TEST_F(DBTest, ManualCompaction) {
// Compact all // Compact all
MakeTables(1, "a", "z", 1); MakeTables(1, "a", "z", 1);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("0,1,2", FilesPerLevel(1));
db_->CompactRange(handles_[1], nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_EQ("0,0,1", FilesPerLevel(1)); ASSERT_EQ("0,0,1", FilesPerLevel(1));
if (iter == 0) { if (iter == 0) {
@ -7294,7 +7302,9 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
// Full compaction to DB path 0 // Full compaction to DB path 0
db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1); CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
@ -7315,13 +7325,15 @@ TEST_P(DBTestUniversalManualCompactionOutputPathId,
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
// Full compaction to DB path 0 // Full compaction to DB path 0
db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0); compact_options.target_path_id = 0;
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, TotalLiveFiles(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path));
// Fail when compacting to an invalid path ID // Fail when compacting to an invalid path ID
ASSERT_TRUE(db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 2) compact_options.target_path_id = 2;
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsInvalidArgument()); .IsInvalidArgument());
} }
@ -7378,7 +7390,9 @@ TEST_F(DBTest, ManualLevelCompactionOutputPathId) {
ASSERT_EQ("1,2", FilesPerLevel(1)); ASSERT_EQ("1,2", FilesPerLevel(1));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
db_->CompactRange(handles_[1], nullptr, nullptr, false, 1, 1); CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
ASSERT_EQ("0,1", FilesPerLevel(1)); ASSERT_EQ("0,1", FilesPerLevel(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
@ -7447,7 +7461,7 @@ TEST_F(DBTest, DBOpen_Change_NumLevels) {
ASSERT_OK(Put(1, "a", "123")); ASSERT_OK(Put(1, "a", "123"));
ASSERT_OK(Put(1, "b", "234")); ASSERT_OK(Put(1, "b", "234"));
db_->CompactRange(handles_[1], nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
Close(); Close();
options.create_if_missing = false; options.create_if_missing = false;
@ -7518,7 +7532,7 @@ TEST_F(DBTest, DropWrites) {
true /* disallow trivial move */); true /* disallow trivial move */);
} }
} else { } else {
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} }
} }
@ -8076,7 +8090,8 @@ TEST_F(DBTest, CompactOnFlush) {
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
// Case 2: Delete followed by another delete // Case 2: Delete followed by another delete
@ -8085,7 +8100,8 @@ TEST_F(DBTest, CompactOnFlush) {
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 3: Put followed by a delete // Case 3: Put followed by a delete
@ -8094,7 +8110,8 @@ TEST_F(DBTest, CompactOnFlush) {
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 4: Put followed by another Put // Case 4: Put followed by another Put
@ -8103,12 +8120,14 @@ TEST_F(DBTest, CompactOnFlush) {
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
// clear database // clear database
Delete(1, "foo"); Delete(1, "foo");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 5: Put followed by snapshot followed by another Put // Case 5: Put followed by snapshot followed by another Put
@ -8122,7 +8141,8 @@ TEST_F(DBTest, CompactOnFlush) {
// clear database // clear database
Delete(1, "foo"); Delete(1, "foo");
dbfull()->CompactRange(handles_[1], nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 5: snapshot followed by a put followed by another Put // Case 5: snapshot followed by a put followed by another Put
@ -9061,10 +9081,9 @@ class ModelDB: public DB {
} }
} }
using DB::CompactRange; using DB::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family, virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* start, const Slice* end, ColumnFamilyHandle* column_family,
bool reduce_level, int target_level, const Slice* start, const Slice* end) override {
uint32_t output_path_id) override {
return Status::NotSupported("Not supported operation."); return Status::NotSupported("Not supported operation.");
} }
@ -9432,7 +9451,8 @@ void PrefixScanInit(DBTest *dbtest) {
keystr = std::string(buf); keystr = std::string(buf);
ASSERT_OK(dbtest->Put(keystr, keystr)); ASSERT_OK(dbtest->Put(keystr, keystr));
dbtest->Flush(); dbtest->Flush();
dbtest->dbfull()->CompactRange(nullptr, nullptr); // move to level 1 dbtest->dbfull()->CompactRange(CompactRangeOptions(), nullptr,
nullptr); // move to level 1
// GROUP 1 // GROUP 1
for (int i = 1; i <= small_range_sstfiles; i++) { for (int i = 1; i <= small_range_sstfiles; i++) {
@ -9685,7 +9705,7 @@ TEST_F(DBTest, TailingIteratorIncomplete) {
// we either see the entry or it's not in cache // we either see the entry or it's not in cache
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
iter->SeekToFirst(); iter->SeekToFirst();
// should still be true after compaction // should still be true after compaction
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
@ -9910,7 +9930,7 @@ TEST_F(DBTest, ManagedTailingIteratorIncomplete) {
// we either see the entry or it's not in cache // we either see the entry or it's not in cache
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
iter->SeekToFirst(); iter->SeekToFirst();
// should still be true after compaction // should still be true after compaction
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
@ -10039,7 +10059,7 @@ TEST_F(DBTest, FIFOCompactionTest) {
if (iter == 0) { if (iter == 0) {
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
} else { } else {
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
} }
// only 5 files should survive // only 5 files should survive
ASSERT_EQ(NumTableFilesAtLevel(0), 5); ASSERT_EQ(NumTableFilesAtLevel(0), 5);
@ -10760,7 +10780,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
ASSERT_GT(SizeAtLevel(0), k64KB - k5KB); ASSERT_GT(SizeAtLevel(0), k64KB - k5KB);
// Clean up L0 // Clean up L0
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// Increase buffer size // Increase buffer size
@ -10818,7 +10838,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
{"max_write_buffer_number", "8"}, {"max_write_buffer_number", "8"},
})); }));
// Clean up memtable and L0 // Clean up memtable and L0
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low2; SleepingBackgroundTask sleeping_task_low2;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2,
@ -10839,7 +10859,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
{"max_write_buffer_number", "4"}, {"max_write_buffer_number", "4"},
})); }));
// Clean up memtable and L0 // Clean up memtable and L0
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
SleepingBackgroundTask sleeping_task_low3; SleepingBackgroundTask sleeping_task_low3;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3,
@ -11077,7 +11097,7 @@ TEST_F(DBTest, PreShutdownManualCompaction) {
MakeTables(1, "a", "z", 1); MakeTables(1, "a", "z", 1);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("0,1,2", FilesPerLevel(1));
CancelAllBackgroundWork(db_); CancelAllBackgroundWork(db_);
db_->CompactRange(handles_[1], nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_EQ("0,1,2", FilesPerLevel(1)); ASSERT_EQ("0,1,2", FilesPerLevel(1));
if (iter == 0) { if (iter == 0) {
@ -11349,7 +11369,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase) {
} }
// Test compact range works // Test compact range works
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// All data should be in the last level. // All data should be in the last level.
ColumnFamilyMetaData cf_meta; ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(&cf_meta); db_->GetColumnFamilyMetaData(&cf_meta);
@ -11542,7 +11562,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
DestroyAndReopen(options); DestroyAndReopen(options);
// Compact against empty DB // Compact against empty DB
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
uint64_t int_prop; uint64_t int_prop;
std::string str_prop; std::string str_prop;
@ -11583,7 +11603,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesCompactRange) {
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(output_levels.size(), 2); ASSERT_EQ(output_levels.size(), 2);
ASSERT_TRUE(output_levels.find(3) != output_levels.end()); ASSERT_TRUE(output_levels.find(3) != output_levels.end());
ASSERT_TRUE(output_levels.find(4) != output_levels.end()); ASSERT_TRUE(output_levels.find(4) != output_levels.end());
@ -11701,7 +11721,10 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) {
// Issue manual compaction in one thread and still verify DB state // Issue manual compaction in one thread and still verify DB state
// in main thread. // in main thread.
std::thread t([&]() { std::thread t([&]() {
dbfull()->CompactRange(nullptr, nullptr, true, options.num_levels - 1); CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = options.num_levels - 1;
dbfull()->CompactRange(compact_options, nullptr, nullptr);
compaction_finished.store(true); compaction_finished.store(true);
}); });
do { do {
@ -12080,7 +12103,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Clean up memtable and L0. Block compaction threads. If continue to write // Clean up memtable and L0. Block compaction threads. If continue to write
// and flush memtables. We should see put timeout after 8 memtable flushes // and flush memtables. We should see put timeout after 8 memtable flushes
// since level0_stop_writes_trigger = 8 // since level0_stop_writes_trigger = 8
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction // Block compaction
SleepingBackgroundTask sleeping_task_low1; SleepingBackgroundTask sleeping_task_low1;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1,
@ -12106,7 +12129,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
{"level0_stop_writes_trigger", "6"} {"level0_stop_writes_trigger", "6"}
})); }));
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// Block compaction // Block compaction
@ -12131,7 +12154,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"} {"disable_auto_compactions", "true"}
})); }));
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
@ -12147,7 +12170,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
ASSERT_OK(dbfull()->SetOptions({ ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "false"} {"disable_auto_compactions", "false"}
})); }));
dbfull()->CompactRange(nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
@ -12924,7 +12947,7 @@ TEST_F(DBTest, FilterCompactionTimeTest) {
Flush(); Flush();
} }
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(0U, CountLiveFiles()); ASSERT_EQ(0U, CountLiveFiles());
Reopen(options); Reopen(options);
@ -13338,7 +13361,7 @@ TEST_F(DBTest, PromoteL0Failure) {
status = experimental::PromoteL0(db_, db_->DefaultColumnFamily()); status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
ASSERT_TRUE(status.IsInvalidArgument()); ASSERT_TRUE(status.IsInvalidArgument());
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Now there is a file in L1. // Now there is a file in L1.
ASSERT_GE(NumTableFilesAtLevel(1, 0), 1); ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
@ -13365,7 +13388,7 @@ TEST_F(DBTest, HugeNumberOfLevels) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
} }
ASSERT_OK(db_->CompactRange(nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
} }
// Github issue #595 // Github issue #595
@ -13491,7 +13514,10 @@ TEST_F(DBTest, UniversalCompactionTargetLevel) {
ASSERT_EQ("3", FilesPerLevel(0)); ASSERT_EQ("3", FilesPerLevel(0));
// Compact all files into 1 file and put it in L4 // Compact all files into 1 file and put it in L4
db_->CompactRange(nullptr, nullptr, true, 4); CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 4;
db_->CompactRange(compact_options, nullptr, nullptr);
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
} }
@ -13516,7 +13542,7 @@ TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) {
for (int num = 0; num < 10; num++) { for (int num = 0; num < 10; num++) {
GenerateNewRandomFile(&rnd); GenerateNewRandomFile(&rnd);
} }
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CompactionJob::Run():Start", {{"CompactionJob::Run():Start",

@ -201,8 +201,11 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
// 2 ssts, 1 manifest // 2 ssts, 1 manifest
CheckFileTypeCounts(dbname_, 0, 2, 1); CheckFileTypeCounts(dbname_, 0, 2, 1);
std::string first("0"), last("999999"); std::string first("0"), last("999999");
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
Slice first_slice(first), last_slice(last); Slice first_slice(first), last_slice(last);
db_->CompactRange(&first_slice, &last_slice, true, 2); db_->CompactRange(compact_options, &first_slice, &last_slice);
// 1 sst after compaction // 1 sst after compaction
CheckFileTypeCounts(dbname_, 0, 1, 1); CheckFileTypeCounts(dbname_, 0, 1, 1);
@ -211,7 +214,7 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
Iterator *itr = 0; Iterator *itr = 0;
CreateTwoLevels(); CreateTwoLevels();
itr = db_->NewIterator(ReadOptions()); itr = db_->NewIterator(ReadOptions());
db_->CompactRange(&first_slice, &last_slice, true, 2); db_->CompactRange(compact_options, &first_slice, &last_slice);
// 3 sst after compaction with live iterator // 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1); CheckFileTypeCounts(dbname_, 0, 3, 1);
delete itr; delete itr;

@ -659,7 +659,7 @@ class FaultInjectionTest : public testing::Test {
Build(write_options, 0, num_pre_sync); Build(write_options, 0, num_pre_sync);
if (sync_use_compact_) { if (sync_use_compact_) {
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} }
write_options.sync = false; write_options.sync = false;
Build(write_options, num_pre_sync, num_post_sync); Build(write_options, num_pre_sync, num_post_sync);

@ -201,7 +201,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
ASSERT_OK(Flush(static_cast<int>(i))); ASSERT_OK(Flush(static_cast<int>(i)));
const Slice kStart = "a"; const Slice kStart = "a";
const Slice kEnd = "z"; const Slice kEnd = "z";
ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
&kStart, &kEnd));
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }

@ -294,7 +294,7 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
db->Flush(o); db->Flush(o);
cout << "Compaction started ...\n"; cout << "Compaction started ...\n";
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
cout << "Compaction ended\n"; cout << "Compaction ended\n";
dumpDb(db); dumpDb(db);
@ -341,7 +341,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
tmp_sum += i; tmp_sum += i;
} }
db->Flush(o); db->Flush(o);
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("b")); ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) { if (count > max_merge) {
// in this case, FullMerge should be called instead. // in this case, FullMerge should be called instead.
@ -360,7 +360,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
tmp_sum += i; tmp_sum += i;
} }
db->Flush(o); db->Flush(o);
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("c")); ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0U); ASSERT_EQ(num_partial_merge_calls, 0U);
} }
@ -467,7 +467,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
counters.add("test-key", 1); counters.add("test-key", 1);
counters.add("test-key", 1); counters.add("test-key", 1);
counters.add("test-key", 1); counters.add("test-key", 1);
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} }
DB* reopen_db; DB* reopen_db;

@ -602,6 +602,9 @@ extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional(
rocksdb_options_t*, int* level_values, size_t num_levels); rocksdb_options_t*, int* level_values, size_t num_levels);
extern void rocksdb_options_enable_statistics(rocksdb_options_t*); extern void rocksdb_options_enable_statistics(rocksdb_options_t*);
/* returns a pointer to a malloc()-ed, null terminated string */
extern char *rocksdb_options_statistics_get_string(rocksdb_options_t *opt);
extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int);
extern void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int); extern void rocksdb_options_set_min_write_buffer_number_to_merge(rocksdb_options_t*, int);
extern void rocksdb_options_set_max_write_buffer_number_to_maintain( extern void rocksdb_options_set_max_write_buffer_number_to_maintain(

@ -33,6 +33,7 @@ struct ReadOptions;
struct WriteOptions; struct WriteOptions;
struct FlushOptions; struct FlushOptions;
struct CompactionOptions; struct CompactionOptions;
struct CompactRangeOptions;
struct TableProperties; struct TableProperties;
class WriteBatch; class WriteBatch;
class Env; class Env;
@ -415,25 +416,42 @@ class DB {
// begin==nullptr is treated as a key before all keys in the database. // begin==nullptr is treated as a key before all keys in the database.
// end==nullptr is treated as a key after all keys in the database. // end==nullptr is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database: // Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr); // db->CompactRange(options, nullptr, nullptr);
// Note that after the entire database is compacted, all data are pushed // Note that after the entire database is compacted, all data are pushed
// down to the last level containing any data. If the total data size // down to the last level containing any data. If the total data size after
// after compaction is reduced, that level might not be appropriate for // compaction is reduced, that level might not be appropriate for hosting all
// hosting all the files. In this case, client could set change_level // the files. In this case, client could set options.change_level to true, to
// to true, to move the files back to the minimum level capable of holding // move the files back to the minimum level capable of holding the data set
// the data set or a given level (specified by non-negative target_level). // or a given level (specified by non-negative options.target_level).
// Compaction outputs should be placed in options.db_paths[target_path_id]. virtual Status CompactRange(const CompactRangeOptions& options,
// Behavior is undefined if target_path_id is out of range. ColumnFamilyHandle* column_family,
virtual Status CompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) = 0;
const Slice* begin, const Slice* end, virtual Status CompactRange(const CompactRangeOptions& options,
bool change_level = false, int target_level = -1, const Slice* begin, const Slice* end) {
uint32_t target_path_id = 0) = 0; return CompactRange(options, DefaultColumnFamily(), begin, end);
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) {
return CompactRange(DefaultColumnFamily(), begin, end, change_level,
target_level, target_path_id);
} }
__attribute__((deprecated)) virtual Status
CompactRange(ColumnFamilyHandle* column_family, const Slice* begin,
const Slice* end, bool change_level = false,
int target_level = -1, uint32_t target_path_id = 0) {
CompactRangeOptions options;
options.change_level = change_level;
options.target_level = target_level;
options.target_path_id = target_path_id;
return CompactRange(options, column_family, begin, end);
}
__attribute__((deprecated)) virtual Status
CompactRange(const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) {
CompactRangeOptions options;
options.change_level = change_level;
options.target_level = target_level;
options.target_path_id = target_path_id;
return CompactRange(options, DefaultColumnFamily(), begin, end);
}
virtual Status SetOptions(ColumnFamilyHandle* column_family, virtual Status SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& new_options) { const std::unordered_map<std::string, std::string>& new_options) {
return Status::NotSupported("Not implemented"); return Status::NotSupported("Not implemented");

@ -1237,6 +1237,19 @@ struct CompactionOptions {
: compression(kSnappyCompression), : compression(kSnappyCompression),
output_file_size_limit(std::numeric_limits<uint64_t>::max()) {} output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
}; };
// CompactRangeOptions is used by CompactRange() call.
struct CompactRangeOptions {
// If true, compacted files will be moved to the minimum level capable
// of holding the data or given level (specified non-negative target_level).
bool change_level = false;
// If change_level is true and target_level have non-negative value, compacted
// files will be moved to target_level.
int target_level = -1;
// Compaction outputs will be placed in options.db_paths[target_path_id].
// Behavior is undefined if target_path_id is out of range.
uint32_t target_path_id = 0;
};
} // namespace rocksdb } // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_ #endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_

@ -127,12 +127,10 @@ class StackableDB : public DB {
} }
using DB::CompactRange; using DB::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family, virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end, ColumnFamilyHandle* column_family,
bool change_level = false, int target_level = -1, const Slice* begin, const Slice* end) override {
uint32_t target_path_id = 0) override { return db_->CompactRange(options, column_family, begin, end);
return db_->CompactRange(column_family, begin, end, change_level,
target_level, target_path_id);
} }
using DB::CompactFiles; using DB::CompactFiles;

@ -1476,13 +1476,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
jint jtarget_level, jint jtarget_path_id) { jint jtarget_level, jint jtarget_path_id) {
rocksdb::Status s; rocksdb::Status s;
rocksdb::CompactRangeOptions compact_options;
compact_options.change_level = jreduce_level;
compact_options.target_level = jtarget_level;
compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
s = db->CompactRange(cf_handle, nullptr, nullptr, jreduce_level, s = db->CompactRange(compact_options, cf_handle, nullptr, nullptr);
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
} else { } else {
// backwards compatibility // backwards compatibility
s = db->CompactRange(nullptr, nullptr, jreduce_level, s = db->CompactRange(compact_options, nullptr, nullptr);
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
} }
if (s.ok()) { if (s.ok()) {
@ -1533,13 +1535,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::Slice end_slice(reinterpret_cast<char*>(end), jend_len); const rocksdb::Slice end_slice(reinterpret_cast<char*>(end), jend_len);
rocksdb::Status s; rocksdb::Status s;
rocksdb::CompactRangeOptions compact_options;
compact_options.change_level = jreduce_level;
compact_options.target_level = jtarget_level;
compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
if (cf_handle != nullptr) { if (cf_handle != nullptr) {
s = db->CompactRange(cf_handle, &begin_slice, &end_slice, jreduce_level, s = db->CompactRange(compact_options, cf_handle, &begin_slice, &end_slice);
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
} else { } else {
// backwards compatibility // backwards compatibility
s = db->CompactRange(&begin_slice, &end_slice, jreduce_level, s = db->CompactRange(compact_options, &begin_slice, &end_slice);
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
} }
env->ReleaseByteArrayElements(jbegin, begin, JNI_ABORT); env->ReleaseByteArrayElements(jbegin, begin, JNI_ABORT);

@ -1766,7 +1766,7 @@ class PosixEnv : public Env {
ThreadPool* tp = meta->thread_pool_; ThreadPool* tp = meta->thread_pool_;
#if ROCKSDB_USING_THREAD_STATUS #if ROCKSDB_USING_THREAD_STATUS
// for thread-status // for thread-status
ThreadStatusUtil::SetThreadType(tp->env_, ThreadStatusUtil::RegisterThread(tp->env_,
(tp->GetThreadPriority() == Env::Priority::HIGH ? (tp->GetThreadPriority() == Env::Priority::HIGH ?
ThreadStatus::HIGH_PRIORITY : ThreadStatus::HIGH_PRIORITY :
ThreadStatus::LOW_PRIORITY)); ThreadStatus::LOW_PRIORITY));

@ -441,7 +441,7 @@ void CompactorCommand::DoCommand() {
end = new Slice(to_); end = new Slice(to_);
} }
db_->CompactRange(begin, end); db_->CompactRange(CompactRangeOptions(), begin, end);
exec_state_ = LDBCommandExecuteResult::Succeed(""); exec_state_ = LDBCommandExecuteResult::Succeed("");
delete begin; delete begin;
@ -519,7 +519,7 @@ void DBLoaderCommand::DoCommand() {
cout << "Warning: " << bad_lines << " bad lines ignored." << endl; cout << "Warning: " << bad_lines << " bad lines ignored." << endl;
} }
if (compact_) { if (compact_) {
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} }
} }
@ -1204,7 +1204,7 @@ void ReduceDBLevelsCommand::DoCommand() {
} }
// Compact the whole DB to put all files to the highest level. // Compact the whole DB to put all files to the highest level.
fprintf(stdout, "Compacting the db...\n"); fprintf(stdout, "Compacting the db...\n");
db_->CompactRange(nullptr, nullptr); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
CloseDB(); CloseDB();
EnvOptions soptions; EnvOptions soptions;
@ -1309,9 +1309,10 @@ void ChangeCompactionStyleCommand::DoCommand() {
files_per_level.c_str()); files_per_level.c_str());
// manual compact into a single file and move the file to level 0 // manual compact into a single file and move the file to level 0
db_->CompactRange(nullptr, nullptr, CompactRangeOptions compact_options;
true /* reduce level */, compact_options.change_level = true;
0 /* reduce to level 0 */); compact_options.target_level = 0;
db_->CompactRange(compact_options, nullptr, nullptr);
// verify compaction result // verify compaction result
files_per_level = ""; files_per_level = "";

@ -77,7 +77,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
db->Put(WriteOptions(), Slice("key4"), Slice("destroy")); db->Put(WriteOptions(), Slice("key4"), Slice("destroy"));
Slice key4("key4"); Slice key4("key4");
db->CompactRange(nullptr, &key4); db->CompactRange(CompactRangeOptions(), nullptr, &key4);
Iterator* itr = db->NewIterator(ReadOptions()); Iterator* itr = db->NewIterator(ReadOptions());
itr->SeekToFirst(); itr->SeekToFirst();
ASSERT_TRUE(itr->Valid()); ASSERT_TRUE(itr->Valid());
@ -130,7 +130,7 @@ TEST_F(ManualCompactionTest, Test) {
rocksdb::Slice greatest(end_key.data(), end_key.size()); rocksdb::Slice greatest(end_key.data(), end_key.size());
// commenting out the line below causes the example to work correctly // commenting out the line below causes the example to work correctly
db->CompactRange(&least, &greatest); db->CompactRange(CompactRangeOptions(), &least, &greatest);
// count the keys // count the keys
rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions()); rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions());

@ -15,6 +15,19 @@ namespace rocksdb {
__thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr; __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
void ThreadStatusUpdater::RegisterThread(
ThreadStatus::ThreadType ttype, uint64_t thread_id) {
if (UNLIKELY(thread_status_data_ == nullptr)) {
thread_status_data_ = new ThreadStatusData();
thread_status_data_->thread_type = ttype;
thread_status_data_->thread_id = thread_id;
std::lock_guard<std::mutex> lck(thread_list_mutex_);
thread_data_set_.insert(thread_status_data_);
}
ClearThreadOperationProperties();
}
void ThreadStatusUpdater::UnregisterThread() { void ThreadStatusUpdater::UnregisterThread() {
if (thread_status_data_ != nullptr) { if (thread_status_data_ != nullptr) {
std::lock_guard<std::mutex> lck(thread_list_mutex_); std::lock_guard<std::mutex> lck(thread_list_mutex_);
@ -24,18 +37,6 @@ void ThreadStatusUpdater::UnregisterThread() {
} }
} }
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
auto* data = InitAndGet();
data->thread_id.store(thread_id, std::memory_order_relaxed);
}
void ThreadStatusUpdater::SetThreadType(
ThreadStatus::ThreadType ttype) {
auto* data = InitAndGet();
data->thread_type.store(ttype, std::memory_order_relaxed);
ClearThreadOperationProperties();
}
void ThreadStatusUpdater::ResetThreadStatus() { void ThreadStatusUpdater::ResetThreadStatus() {
ClearThreadState(); ClearThreadState();
ClearThreadOperation(); ClearThreadOperation();
@ -44,7 +45,10 @@ void ThreadStatusUpdater::ResetThreadStatus() {
void ThreadStatusUpdater::SetColumnFamilyInfoKey( void ThreadStatusUpdater::SetColumnFamilyInfoKey(
const void* cf_key) { const void* cf_key) {
auto* data = InitAndGet(); auto* data = Get();
if (data == nullptr) {
return;
}
// set the tracking flag based on whether cf_key is non-null or not. // set the tracking flag based on whether cf_key is non-null or not.
// If enable_thread_tracking is set to false, the input cf_key // If enable_thread_tracking is set to false, the input cf_key
// would be nullptr. // would be nullptr.
@ -53,8 +57,8 @@ void ThreadStatusUpdater::SetColumnFamilyInfoKey(
} }
const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (data->enable_tracking == false) { if (data == nullptr) {
return nullptr; return nullptr;
} }
return data->cf_key.load(std::memory_order_relaxed); return data->cf_key.load(std::memory_order_relaxed);
@ -62,9 +66,8 @@ const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
void ThreadStatusUpdater::SetThreadOperation( void ThreadStatusUpdater::SetThreadOperation(
const ThreadStatus::OperationType type) { const ThreadStatus::OperationType type) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
// NOTE: Our practice here is to set all the thread operation properties // NOTE: Our practice here is to set all the thread operation properties
@ -82,9 +85,8 @@ void ThreadStatusUpdater::SetThreadOperation(
void ThreadStatusUpdater::SetThreadOperationProperty( void ThreadStatusUpdater::SetThreadOperationProperty(
int i, uint64_t value) { int i, uint64_t value) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->op_properties[i].store(value, std::memory_order_relaxed); data->op_properties[i].store(value, std::memory_order_relaxed);
@ -92,27 +94,24 @@ void ThreadStatusUpdater::SetThreadOperationProperty(
void ThreadStatusUpdater::IncreaseThreadOperationProperty( void ThreadStatusUpdater::IncreaseThreadOperationProperty(
int i, uint64_t delta) { int i, uint64_t delta) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
} }
void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->op_start_time.store(start_time, std::memory_order_relaxed); data->op_start_time.store(start_time, std::memory_order_relaxed);
} }
void ThreadStatusUpdater::ClearThreadOperation() { void ThreadStatusUpdater::ClearThreadOperation() {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN, data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
@ -123,9 +122,8 @@ void ThreadStatusUpdater::ClearThreadOperation() {
} }
void ThreadStatusUpdater::ClearThreadOperationProperties() { void ThreadStatusUpdater::ClearThreadOperationProperties() {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
@ -135,9 +133,8 @@ void ThreadStatusUpdater::ClearThreadOperationProperties() {
ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
ThreadStatus::OperationStage stage) { ThreadStatus::OperationStage stage) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return ThreadStatus::STAGE_UNKNOWN; return ThreadStatus::STAGE_UNKNOWN;
} }
return data->operation_stage.exchange( return data->operation_stage.exchange(
@ -146,18 +143,16 @@ ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
void ThreadStatusUpdater::SetThreadState( void ThreadStatusUpdater::SetThreadState(
const ThreadStatus::StateType type) { const ThreadStatus::StateType type) {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->state_type.store(type, std::memory_order_relaxed); data->state_type.store(type, std::memory_order_relaxed);
} }
void ThreadStatusUpdater::ClearThreadState() { void ThreadStatusUpdater::ClearThreadState() {
auto* data = InitAndGet(); auto* data = GetLocalThreadStatus();
if (!data->enable_tracking) { if (data == nullptr) {
assert(data->cf_key.load(std::memory_order_relaxed) == nullptr);
return; return;
} }
data->state_type.store( data->state_type.store(
@ -222,11 +217,14 @@ Status ThreadStatusUpdater::GetThreadList(
return Status::OK(); return Status::OK();
} }
ThreadStatusData* ThreadStatusUpdater::InitAndGet() { ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
if (UNLIKELY(thread_status_data_ == nullptr)) { if (thread_status_data_ == nullptr) {
thread_status_data_ = new ThreadStatusData(); return nullptr;
std::lock_guard<std::mutex> lck(thread_list_mutex_); }
thread_data_set_.insert(thread_status_data_); if (!thread_status_data_->enable_tracking) {
assert(thread_status_data_->cf_key.load(
std::memory_order_relaxed) == nullptr);
return nullptr;
} }
return thread_status_data_; return thread_status_data_;
} }
@ -290,17 +288,14 @@ void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
#else #else
void ThreadStatusUpdater::UnregisterThread() { void ThreadStatusUpdater::RegisterThread(
ThreadStatus::ThreadType ttype, uint64_t thread_id) {
} }
void ThreadStatusUpdater::ResetThreadStatus() { void ThreadStatusUpdater::UnregisterThread() {
}
void ThreadStatusUpdater::SetThreadID(uint64_t thread_id) {
} }
void ThreadStatusUpdater::SetThreadType( void ThreadStatusUpdater::ResetThreadStatus() {
ThreadStatus::ThreadType ttype) {
} }
void ThreadStatusUpdater::SetColumnFamilyInfoKey( void ThreadStatusUpdater::SetColumnFamilyInfoKey(

@ -118,8 +118,8 @@ class ThreadStatusUpdater {
// Set the id of the current thread. // Set the id of the current thread.
void SetThreadID(uint64_t thread_id); void SetThreadID(uint64_t thread_id);
// Set the thread type of the current thread. // Register the current thread for tracking.
void SetThreadType(ThreadStatus::ThreadType ttype); void RegisterThread(ThreadStatus::ThreadType ttype, uint64_t thread_id);
// Update the column-family info of the current thread by setting // Update the column-family info of the current thread by setting
// its thread-local pointer of ThreadStateInfo to the correct entry. // its thread-local pointer of ThreadStateInfo to the correct entry.
@ -198,9 +198,15 @@ class ThreadStatusUpdater {
// The thread-local variable for storing thread status. // The thread-local variable for storing thread status.
static __thread ThreadStatusData* thread_status_data_; static __thread ThreadStatusData* thread_status_data_;
// Obtain the pointer to the thread status data. It also performs // Returns the pointer to the thread status data only when the
// initialization when necessary. // thread status data is non-null and has enable_tracking == true.
ThreadStatusData* InitAndGet(); ThreadStatusData* GetLocalThreadStatus();
// Directly returns the pointer to thread_status_data_ without
// checking whether enabling_tracking is true of not.
ThreadStatusData* Get() {
return thread_status_data_;
}
// The mutex that protects cf_info_map and db_key_map. // The mutex that protects cf_info_map and db_key_map.
std::mutex thread_list_mutex_; std::mutex thread_list_mutex_;

@ -15,14 +15,14 @@ __thread ThreadStatusUpdater*
ThreadStatusUtil::thread_updater_local_cache_ = nullptr; ThreadStatusUtil::thread_updater_local_cache_ = nullptr;
__thread bool ThreadStatusUtil::thread_updater_initialized_ = false; __thread bool ThreadStatusUtil::thread_updater_initialized_ = false;
void ThreadStatusUtil::SetThreadType( void ThreadStatusUtil::RegisterThread(
const Env* env, ThreadStatus::ThreadType thread_type) { const Env* env, ThreadStatus::ThreadType thread_type) {
if (!MaybeInitThreadLocalUpdater(env)) { if (!MaybeInitThreadLocalUpdater(env)) {
return; return;
} }
assert(thread_updater_local_cache_); assert(thread_updater_local_cache_);
thread_updater_local_cache_->SetThreadID(env->GetThreadID()); thread_updater_local_cache_->RegisterThread(
thread_updater_local_cache_->SetThreadType(thread_type); thread_type, env->GetThreadID());
} }
void ThreadStatusUtil::UnregisterThread() { void ThreadStatusUtil::UnregisterThread() {

@ -27,8 +27,8 @@ class ColumnFamilyData;
// all function calls to ThreadStatusUtil will be no-op. // all function calls to ThreadStatusUtil will be no-op.
class ThreadStatusUtil { class ThreadStatusUtil {
public: public:
// Set the thread type of the current thread. // Register the current thread for tracking.
static void SetThreadType( static void RegisterThread(
const Env* env, ThreadStatus::ThreadType thread_type); const Env* env, ThreadStatus::ThreadType thread_type);
// Unregister the current thread. // Unregister the current thread.

@ -54,10 +54,9 @@ class CompactedDBImpl : public DBImpl {
return Status::NotSupported("Not supported in compacted db mode."); return Status::NotSupported("Not supported in compacted db mode.");
} }
using DBImpl::CompactRange; using DBImpl::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family, virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end, ColumnFamilyHandle* column_family,
bool change_level = false, int target_level = -1, const Slice* begin, const Slice* end) override {
uint32_t target_path_id = 0) override {
return Status::NotSupported("Not supported in compacted db mode."); return Status::NotSupported("Not supported in compacted db mode.");
} }

@ -515,7 +515,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
slists.Append("c", "bbnagnagsx"); slists.Append("c", "bbnagnagsx");
slists.Append("a", "sa"); slists.Append("a", "sa");
slists.Append("b", "df"); slists.Append("b", "df");
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("a", &a); slists.Get("a", &a);
slists.Get("b", &b); slists.Get("b", &b);
slists.Get("c", &c); slists.Get("c", &c);
@ -536,7 +536,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Compact, Get // Compact, Get
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
@ -544,7 +544,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
// Append, Flush, Compact, Get // Append, Flush, Compact, Get
slists.Append("b", "afcg"); slists.Append("b", "afcg");
db->Flush(rocksdb::FlushOptions()); db->Flush(rocksdb::FlushOptions());
db->CompactRange(nullptr, nullptr); db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("b", &b); slists.Get("b", &b);
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg");
} }

@ -589,7 +589,7 @@ class SpatialDBImpl : public SpatialDB {
Status t = Flush(FlushOptions(), cfh); Status t = Flush(FlushOptions(), cfh);
if (t.ok()) { if (t.ok()) {
t = CompactRange(cfh, nullptr, nullptr); t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr);
} }
{ {

@ -168,9 +168,9 @@ class TtlTest : public testing::Test {
// Runs a manual compaction // Runs a manual compaction
void ManualCompact(ColumnFamilyHandle* cf = nullptr) { void ManualCompact(ColumnFamilyHandle* cf = nullptr) {
if (cf == nullptr) { if (cf == nullptr) {
db_ttl_->CompactRange(nullptr, nullptr); db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} else { } else {
db_ttl_->CompactRange(cf, nullptr, nullptr); db_ttl_->CompactRange(CompactRangeOptions(), cf, nullptr, nullptr);
} }
} }

Loading…
Cancel
Save