Remove snap_refresh_nanos option (#5826)

Summary:
The snap_refresh_nanos option didn't bring much benefit. Remove the feature to simplify the code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5826

Differential Revision: D17467147

Pulled By: maysamyabandeh

fbshipit-source-id: 4f950b046990d0d1292d7fc04c2ccafaf751c7f0
Branch: main
Author: Maysam Yabandeh (committed by Facebook Github Bot)
Parent: a9c5e8e944
Commit: 6ec6a4a9a4
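For reference, a minimal sketch (illustrative, not part of this diff) of the compatibility story: the `snap_refresh_nanos` field itself is kept on ColumnFamilyOptions (marked "Deprecated." below), so existing code that sets it still compiles, but after this change compactions no longer read it.

    #include "rocksdb/options.h"

    int main() {
      rocksdb::ColumnFamilyOptions cf_opts;
      // Still compiles: the field remains declared for source compatibility,
      // but nothing consumes it anymore, so this assignment is a no-op.
      cf_opts.snap_refresh_nanos = 100 * 1000 * 1000;
      return 0;
    }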
21 changed files:

  HISTORY.md (1 change)
  db/c.cc (5 changes)
  db/compaction/compaction_iterator.cc (54 changes)
  db/compaction/compaction_iterator.h (57 changes)
  db/compaction/compaction_job.cc (9 changes)
  db/compaction/compaction_job.h (3 changes)
  db/compaction/compaction_job_test.cc (105 changes)
  db/db_impl/db_impl_compaction_flush.cc (40 changes)
  include/rocksdb/c.h (2 changes)
  include/rocksdb/options.h (12 changes)
  monitoring/persistent_stats_history.cc (1 change)
  options/cf_options.cc (2 changes)
  options/cf_options.h (3 changes)
  options/options.cc (4 changes)
  options/options_helper.cc (6 changes)
  options/options_settable_test.cc (2 changes)
  options/options_test.cc (2 changes)
  table/mock_table.cc (14 changes)
  table/mock_table.h (8 changes)
  tools/db_crashtest.py (1 change)
  tools/db_stress.cc (7 changes)

@@ -16,6 +16,7 @@
 * Added max_write_buffer_size_to_maintain option to better control memory usage of immutable memtables.
 * Added a lightweight API GetCurrentWalFile() to get last live WAL filename and size. Meant to be used as a helper for backup/restore tooling in a larger ecosystem such as MySQL with a MyRocks storage engine.
 * The MemTable Bloom filter, when enabled, now always uses cache locality. Options::bloom_locality now only affects the PlainTable SST format.
+* Deprecate `snap_refresh_nanos` option.
 ### Performance Improvements
 * Improve the speed of the MemTable Bloom filter, reducing the write overhead of enabling it by 1/3 to 1/2, with similar benefit to read performance.

@@ -2272,11 +2272,6 @@ void rocksdb_options_set_max_bytes_for_level_base(
   opt->rep.max_bytes_for_level_base = n;
 }
-void rocksdb_options_set_snap_refresh_nanos(rocksdb_options_t* opt,
-                                            uint64_t n) {
-  opt->rep.snap_refresh_nanos = n;
-}
 void rocksdb_options_set_level_compaction_dynamic_level_bytes(
     rocksdb_options_t* opt, unsigned char v) {
   opt->rep.level_compaction_dynamic_level_bytes = v;

@@ -38,7 +38,6 @@ CompactionIterator::CompactionIterator(
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    SnapshotListFetchCallback* snap_list_callback,
     const std::atomic<bool>* manual_compaction_paused)
     : CompactionIterator(
           input, cmp, merge_helper, last_sequence, snapshots,
@@ -47,7 +46,6 @@ CompactionIterator::CompactionIterator(
           std::unique_ptr<CompactionProxy>(
               compaction ? new CompactionProxy(compaction) : nullptr),
           compaction_filter, shutting_down, preserve_deletes_seqnum,
-          snap_list_callback,
           manual_compaction_paused) {}
 CompactionIterator::CompactionIterator(
@@ -61,7 +59,6 @@ CompactionIterator::CompactionIterator(
     const CompactionFilter* compaction_filter,
     const std::atomic<bool>* shutting_down,
     const SequenceNumber preserve_deletes_seqnum,
-    SnapshotListFetchCallback* snap_list_callback,
     const std::atomic<bool>* manual_compaction_paused)
     : input_(input),
       cmp_(cmp),
@@ -81,8 +78,7 @@ CompactionIterator::CompactionIterator(
       current_user_key_sequence_(0),
       current_user_key_snapshot_(0),
       merge_out_iter_(merge_helper_),
-      current_key_committed_(false),
-      snap_list_callback_(snap_list_callback) {
+      current_key_committed_(false) {
   assert(compaction_filter_ == nullptr || compaction_ != nullptr);
   assert(snapshots_ != nullptr);
   bottommost_level_ =
@@ -90,7 +86,24 @@ CompactionIterator::CompactionIterator(
   if (compaction_ != nullptr) {
     level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
   }
-  ProcessSnapshotList();
+  if (snapshots_->size() == 0) {
+    // optimize for fast path if there are no snapshots
+    visible_at_tip_ = true;
+    earliest_snapshot_iter_ = snapshots_->end();
+    earliest_snapshot_ = kMaxSequenceNumber;
+    latest_snapshot_ = 0;
+  } else {
+    visible_at_tip_ = false;
+    earliest_snapshot_iter_ = snapshots_->begin();
+    earliest_snapshot_ = snapshots_->at(0);
+    latest_snapshot_ = snapshots_->back();
+  }
+#ifndef NDEBUG
+  // findEarliestVisibleSnapshot assumes this ordering.
+  for (size_t i = 1; i < snapshots_->size(); ++i) {
+    assert(snapshots_->at(i - 1) < snapshots_->at(i));
+  }
+#endif
   input_->SetPinnedItersMgr(&pinned_iters_mgr_);
   TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
 }
@@ -212,28 +225,6 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
   }
 }
-void CompactionIterator::ProcessSnapshotList() {
-#ifndef NDEBUG
-  // findEarliestVisibleSnapshot assumes this ordering.
-  for (size_t i = 1; i < snapshots_->size(); ++i) {
-    assert(snapshots_->at(i - 1) < snapshots_->at(i));
-  }
-#endif
-  if (snapshots_->size() == 0) {
-    // optimize for fast path if there are no snapshots
-    visible_at_tip_ = true;
-    earliest_snapshot_iter_ = snapshots_->end();
-    earliest_snapshot_ = kMaxSequenceNumber;
-    latest_snapshot_ = 0;
-  } else {
-    visible_at_tip_ = false;
-    earliest_snapshot_iter_ = snapshots_->begin();
-    earliest_snapshot_ = snapshots_->at(0);
-    latest_snapshot_ = snapshots_->back();
-  }
-  released_snapshots_.clear();
-}
 void CompactionIterator::NextFromInput() {
   at_next_ = false;
   valid_ = false;
@@ -282,13 +273,6 @@ void CompactionIterator::NextFromInput() {
     // compaction filter). ikey_.user_key is pointing to the copy.
     if (!has_current_user_key_ ||
         !cmp_->Equal(ikey_.user_key, current_user_key_)) {
-      num_keys_++;
-      // Use num_keys_ to reduce the overhead of reading current time
-      if (snap_list_callback_ && snapshots_->size() &&
-          snap_list_callback_->TimeToRefresh(num_keys_)) {
-        snap_list_callback_->Refresh(snapshots_, latest_snapshot_);
-        ProcessSnapshotList();
-      }
       // First occurrence of this user key
       // Copy key for output
       key_ = current_key_.SetInternalKey(key_, &ikey_);

@@ -21,54 +21,6 @@
 namespace rocksdb {
-// This callback can be used to refresh the snapshot list from the db. It
-// includes logic to exponentially decrease the refresh rate to limit the
-// overhead of refresh.
-class SnapshotListFetchCallback {
- public:
-  SnapshotListFetchCallback(Env* env, uint64_t snap_refresh_nanos,
-                            size_t every_nth_key = 1024)
-      : timer_(env, /*auto restart*/ true),
-        snap_refresh_nanos_(snap_refresh_nanos),
-        every_nth_key_minus_one_(every_nth_key - 1) {
-    assert(every_nth_key > 0);
-    assert((ceil(log2(every_nth_key)) == floor(log2(every_nth_key))));
-  }
-  // Refresh the snapshot list. snapshots will be replaced with the new list.
-  // max is the upper bound. Note: this function will acquire the db_mutex_.
-  virtual void Refresh(std::vector<SequenceNumber>* snapshots,
-                       SequenceNumber max) = 0;
-  inline bool TimeToRefresh(const size_t key_index) {
-    assert(snap_refresh_nanos_ != 0);
-    // skip the key if key_index % every_nth_key (which is a power of 2) is
-    // not 0.
-    if ((key_index & every_nth_key_minus_one_) != 0) {
-      return false;
-    }
-    const uint64_t elapsed = timer_.ElapsedNanos();
-    auto ret = elapsed > snap_refresh_nanos_;
-    // pre-compute the next time threshold
-    if (ret) {
-      // increase the next refresh period exponentially (by x4)
-      auto next_refresh_threshold = snap_refresh_nanos_ << 2;
-      // make sure the shift has not overflown the highest 1 bit
-      snap_refresh_nanos_ =
-          std::max(snap_refresh_nanos_, next_refresh_threshold);
-    }
-    return ret;
-  }
-  static constexpr SnapshotListFetchCallback* kDisabled = nullptr;
-  virtual ~SnapshotListFetchCallback() {}
- private:
-  // Time since the callback was created
-  StopWatchNano timer_;
-  // The delay before calling ::Refresh. To be increased exponentially.
-  uint64_t snap_refresh_nanos_;
-  // Skip every nth key. n is a power of 2; the math requires n - 1.
-  const uint64_t every_nth_key_minus_one_;
-};
 class CompactionIterator {
  public:
   // A wrapper around Compaction. Has a much smaller interface, only what
@@ -118,7 +70,6 @@ class CompactionIterator {
       const CompactionFilter* compaction_filter = nullptr,
       const std::atomic<bool>* shutting_down = nullptr,
       const SequenceNumber preserve_deletes_seqnum = 0,
-      SnapshotListFetchCallback* snap_list_callback = nullptr,
       const std::atomic<bool>* manual_compaction_paused = nullptr);
   // Constructor with custom CompactionProxy, used for tests.
@@ -133,7 +84,6 @@ class CompactionIterator {
       const CompactionFilter* compaction_filter = nullptr,
       const std::atomic<bool>* shutting_down = nullptr,
       const SequenceNumber preserve_deletes_seqnum = 0,
-      SnapshotListFetchCallback* snap_list_callback = nullptr,
       const std::atomic<bool>* manual_compaction_paused = nullptr);
   ~CompactionIterator();
@@ -162,8 +112,6 @@ class CompactionIterator {
  private:
   // Processes the input stream to find the next output
   void NextFromInput();
-  // Process snapshots_ and assign related variables
-  void ProcessSnapshotList();
   // Do last preparations before presenting the output to the callee. At this
   // point this only zeroes out the sequence number if possible for better
@@ -198,7 +146,7 @@ class CompactionIterator {
   InternalIterator* input_;
   const Comparator* cmp_;
   MergeHelper* merge_helper_;
-  std::vector<SequenceNumber>* snapshots_;
+  const std::vector<SequenceNumber>* snapshots_;
   // List of snapshots released during compaction.
   // findEarliestVisibleSnapshot() finds them out from the return of
   // snapshot_checker, and makes sure they will not be returned as
@@ -274,9 +222,6 @@ class CompactionIterator {
   // Used to avoid purging uncommitted values. The application can specify
   // uncommitted values by providing a SnapshotChecker object.
   bool current_key_committed_;
-  SnapshotListFetchCallback* snap_list_callback_;
-  // number of distinct keys processed
-  size_t num_keys_ = 0;
   bool IsShuttingDown() {
     // This is a best-effort facility, so memory_order_relaxed is sufficient.

@@ -311,8 +311,7 @@ CompactionJob::CompactionJob(
     const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
     EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
     const std::string& dbname, CompactionJobStats* compaction_job_stats,
-    Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback,
-    const std::atomic<bool>* manual_compaction_paused)
+    Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
     : job_id_(job_id),
       compact_(new CompactionState(compaction)),
       compaction_job_stats_(compaction_job_stats),
@@ -334,7 +333,6 @@ CompactionJob::CompactionJob(
       db_mutex_(db_mutex),
       db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
-      snap_list_callback_(snap_list_callback),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
@@ -892,10 +890,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
       &existing_snapshots_, earliest_write_conflict_snapshot_,
       snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
       &range_del_agg, sub_compact->compaction, compaction_filter,
-      shutting_down_, preserve_deletes_seqnum_,
-      // Currently range_del_agg is incompatible with snapshot refresh feature.
-      range_del_agg.IsEmpty() ? snap_list_callback_ : nullptr,
-      manual_compaction_paused_));
+      shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_));
   auto c_iter = sub_compact->c_iter.get();
   c_iter->SeekToFirst();
   if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {

@@ -75,7 +75,7 @@ class CompactionJob {
                 std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
                 bool paranoid_file_checks, bool measure_io_stats,
                 const std::string& dbname, CompactionJobStats* compaction_job_stats,
-                Env::Priority thread_pri, SnapshotListFetchCallback* snap_list_callback,
+                Env::Priority thread_pri,
                 const std::atomic<bool>* manual_compaction_paused = nullptr);
   ~CompactionJob();
@@ -168,7 +168,6 @@ class CompactionJob {
   // entirely within s1 and s2, then the earlier version of k1 can be safely
   // deleted because that version is not visible in any snapshot.
   std::vector<SequenceNumber> existing_snapshots_;
-  SnapshotListFetchCallback* snap_list_callback_;
   // This is the earliest snapshot that could be used for write-conflict
   // checking by a transaction. For any user-key newer than this snapshot, we

@@ -250,9 +250,7 @@ class CompactionJobTest : public testing::Test {
       const stl_wrappers::KVMap& expected_results,
       const std::vector<SequenceNumber>& snapshots = {},
       SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
-      int output_level = 1, bool verify = true,
-      SnapshotListFetchCallback* snapshot_fetcher =
-          SnapshotListFetchCallback::kDisabled) {
+      int output_level = 1, bool verify = true) {
     auto cfd = versions_->GetColumnFamilySet()->GetDefault();
     size_t num_input_files = 0;
@@ -285,7 +283,7 @@ class CompactionJobTest : public testing::Test {
         nullptr, nullptr, &mutex_, &error_handler_, snapshots,
         earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
         &event_logger, false, false, dbname_, &compaction_job_stats_,
-        Env::Priority::USER, snapshot_fetcher);
+        Env::Priority::USER);
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
     compaction_job.Prepare();
@@ -962,105 +960,6 @@ TEST_F(CompactionJobTest, CorruptionAfterDeletion) {
   RunCompaction({files}, expected_results);
 }
-// Test the snapshot fetcher in compaction
-TEST_F(CompactionJobTest, SnapshotRefresh) {
-  uint64_t time_seed = env_->NowMicros();
-  printf("time_seed is %" PRIu64 "\n", time_seed);  // would help to reproduce
-  Random64 rand(time_seed);
-  std::vector<SequenceNumber> db_snapshots;
-  class SnapshotListFetchCallbackTest : public SnapshotListFetchCallback {
-   public:
-    SnapshotListFetchCallbackTest(Env* env, Random64& rand,
-                                  std::vector<SequenceNumber>* snapshots)
-        : SnapshotListFetchCallback(env, 1 /*short time delay*/,
-                                    1 /*fetch after each key*/),
-          rand_(rand),
-          snapshots_(snapshots) {}
-    virtual void Refresh(std::vector<SequenceNumber>* snapshots,
-                         SequenceNumber) override {
-      assert(snapshots->size());
-      assert(snapshots_->size());
-      assert(snapshots_->size() == snapshots->size());
-      if (rand_.OneIn(2)) {
-        uint64_t release_index = rand_.Uniform(snapshots_->size());
-        snapshots_->erase(snapshots_->begin() + release_index);
-        *snapshots = *snapshots_;
-      }
-    }
-   private:
-    Random64 rand_;
-    std::vector<SequenceNumber>* snapshots_;
-  } snapshot_fetcher(env_, rand, &db_snapshots);
-  std::vector<std::pair<const std::string, std::string>> file1_kvs, file2_kvs;
-  std::array<ValueType, 4> types = {kTypeValue, kTypeDeletion,
-                                    kTypeSingleDeletion};
-  SequenceNumber last_seq = 0;
-  for (int i = 1; i < 100; i++) {
-    SequenceNumber seq = last_seq + 1;
-    last_seq = seq;
-    if (rand.OneIn(2)) {
-      auto type = types[rand.Uniform(types.size())];
-      file1_kvs.push_back(
-          {test::KeyStr("k" + ToString(i), seq, type), "v" + ToString(i)});
-    }
-  }
-  auto file1 = mock::MakeMockFile(file1_kvs);
-  for (int i = 1; i < 100; i++) {
-    SequenceNumber seq = last_seq + 1;
-    last_seq++;
-    if (rand.OneIn(2)) {
-      auto type = types[rand.Uniform(types.size())];
-      file2_kvs.push_back(
-          {test::KeyStr("k" + ToString(i), seq, type), "v" + ToString(i)});
-    }
-  }
-  auto file2 = mock::MakeMockFile(file2_kvs);
-  for (SequenceNumber i = 1; i < last_seq + 1; i++) {
-    if (rand.OneIn(5)) {
-      db_snapshots.push_back(i);
-    }
-  }
-  const bool kVerify = true;
-  const int output_level_0 = 0;
-  NewDB();
-  AddMockFile(file1);
-  AddMockFile(file2);
-  SetLastSequence(last_seq);
-  auto files = cfd_->current()->storage_info()->LevelFiles(0);
-  // put the output on L0 since it is easier to feed it again to the 2nd
-  // compaction
-  RunCompaction({files}, file1, db_snapshots, kMaxSequenceNumber,
-                output_level_0, !kVerify, &snapshot_fetcher);
-  // Now db_snapshots have changed. Run the compaction again without the
-  // snapshot fetcher but with the updated snapshot list.
-  compaction_job_stats_.Reset();
-  files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, file1, db_snapshots, kMaxSequenceNumber,
-                output_level_0 + 1, !kVerify);
-  // The result should be what we get if we run compaction without the
-  // snapshot fetcher on the updated list of snapshots.
-  auto expected = mock_table_factory_->output();
-  NewDB();
-  AddMockFile(file1);
-  AddMockFile(file2);
-  SetLastSequence(last_seq);
-  files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected, db_snapshots, kMaxSequenceNumber,
-                output_level_0, !kVerify);
-  // The 2nd compaction above would get rid of useless delete markers. To get
-  // the output here exactly as what we got above after two compactions, we
-  // also run the compaction a 2nd time.
-  compaction_job_stats_.Reset();
-  files = cfd_->current()->storage_info()->LevelFiles(0);
-  RunCompaction({files}, expected, db_snapshots, kMaxSequenceNumber,
-                output_level_0 + 1, !kVerify);
-}
 }  // namespace rocksdb
 int main(int argc, char** argv) {

@@ -800,31 +800,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   return s;
 }
-namespace {
-class SnapshotListFetchCallbackImpl : public SnapshotListFetchCallback {
- public:
-  SnapshotListFetchCallbackImpl(DBImpl* db_impl, Env* env,
-                                uint64_t snap_refresh_nanos, Logger* info_log)
-      : SnapshotListFetchCallback(env, snap_refresh_nanos),
-        db_impl_(db_impl),
-        info_log_(info_log) {}
-  virtual void Refresh(std::vector<SequenceNumber>* snapshots,
-                       SequenceNumber max) override {
-    size_t prev = snapshots->size();
-    snapshots->clear();
-    db_impl_->LoadSnapshots(snapshots, nullptr, max);
-    size_t now = snapshots->size();
-    ROCKS_LOG_DEBUG(info_log_,
-                    "Compaction snapshot count refreshed from %zu to %zu", prev,
-                    now);
-  }
- private:
-  DBImpl* db_impl_;
-  Logger* info_log_;
-};
-}  // namespace
 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                             ColumnFamilyHandle* column_family,
                             const std::vector<std::string>& input_file_names,
@@ -999,9 +974,6 @@ Status DBImpl::CompactFilesImpl(
   assert(is_snapshot_supported_ || snapshots_.empty());
   CompactionJobStats compaction_job_stats;
-  SnapshotListFetchCallbackImpl fetch_callback(
-      this, env_, c->mutable_cf_options()->snap_refresh_nanos,
-      immutable_db_options_.info_log.get());
   CompactionJob compaction_job(
       job_context->job_id, c.get(), immutable_db_options_,
       env_options_for_compaction_, versions_.get(), &shutting_down_,
@@ -1012,10 +984,6 @@ Status DBImpl::CompactFilesImpl(
       c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, Env::Priority::USER,
-      immutable_db_options_.max_subcompactions <= 1 &&
-              c->mutable_cf_options()->snap_refresh_nanos > 0
-          ? &fetch_callback
-          : nullptr,
       &manual_compaction_paused_);
   // Creating a compaction influences the compaction score because the score
@@ -2765,9 +2733,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
   GetSnapshotContext(job_context, &snapshot_seqs,
                      &earliest_write_conflict_snapshot, &snapshot_checker);
   assert(is_snapshot_supported_ || snapshots_.empty());
-  SnapshotListFetchCallbackImpl fetch_callback(
-      this, env_, c->mutable_cf_options()->snap_refresh_nanos,
-      immutable_db_options_.info_log.get());
   CompactionJob compaction_job(
       job_context->job_id, c.get(), immutable_db_options_,
       env_options_for_compaction_, versions_.get(), &shutting_down_,
@@ -2778,10 +2743,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
       &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
       c->mutable_cf_options()->report_bg_io_stats, dbname_,
       &compaction_job_stats, thread_pri,
-      immutable_db_options_.max_subcompactions <= 1 &&
-              c->mutable_cf_options()->snap_refresh_nanos > 0
-          ? &fetch_callback
-          : nullptr,
       is_manual ? &manual_compaction_paused_ : nullptr);
   compaction_job.Prepare();
   NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

@@ -827,8 +827,6 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_target_file_size_multiplier(
     rocksdb_options_t*, int);
 extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_bytes_for_level_base(
     rocksdb_options_t*, uint64_t);
-extern ROCKSDB_LIBRARY_API void rocksdb_options_set_snap_refresh_nanos(
-    rocksdb_options_t*, uint64_t);
 extern ROCKSDB_LIBRARY_API void
 rocksdb_options_set_level_compaction_dynamic_level_bytes(rocksdb_options_t*,
                                                          unsigned char);

@@ -269,17 +269,7 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   uint64_t max_bytes_for_level_base = 256 * 1048576;
-  // If non-zero, compactions will periodically refresh the snapshot list. The
-  // delay for the first refresh is snap_refresh_nanos nanoseconds and
-  // exponentially increases afterwards. When having many short-lived snapshots,
-  // this option helps reduce the CPU usage of long-running compactions. The
-  // feature is disabled when max_subcompactions is greater than one.
-  //
-  // NOTE: This feature is currently incompatible with RangeDeletes.
-  //
-  // Default: 0
-  //
-  // Dynamically changeable through SetOptions() API
+  // Deprecated.
   uint64_t snap_refresh_nanos = 0;
   // Disable automatic compactions. Manual compactions can still
// Disable automatic compactions. Manual compactions can still // Disable automatic compactions. Manual compactions can still

@@ -69,7 +69,6 @@ void OptimizeForPersistentStats(ColumnFamilyOptions* cfo) {
   cfo->write_buffer_size = 2 << 20;
   cfo->target_file_size_base = 2 * 1048576;
   cfo->max_bytes_for_level_base = 10 * 1048576;
-  cfo->snap_refresh_nanos = 0;
   cfo->soft_pending_compaction_bytes_limit = 256 * 1048576;
   cfo->hard_pending_compaction_bytes_limit = 1073741824ul;
   cfo->compression = kNoCompression;

@@ -167,8 +167,6 @@ void MutableCFOptions::Dump(Logger* log) const {
                  target_file_size_multiplier);
   ROCKS_LOG_INFO(log, "          max_bytes_for_level_base: %" PRIu64,
                  max_bytes_for_level_base);
-  ROCKS_LOG_INFO(log, "                snap_refresh_nanos: %" PRIu64,
-                 snap_refresh_nanos);
   ROCKS_LOG_INFO(log, "    max_bytes_for_level_multiplier: %f",
                  max_bytes_for_level_multiplier);
   ROCKS_LOG_INFO(log, "                               ttl: %" PRIu64,

@@ -151,7 +151,6 @@ struct MutableCFOptions {
         target_file_size_base(options.target_file_size_base),
         target_file_size_multiplier(options.target_file_size_multiplier),
         max_bytes_for_level_base(options.max_bytes_for_level_base),
-        snap_refresh_nanos(options.snap_refresh_nanos),
         max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
         ttl(options.ttl),
         periodic_compaction_seconds(options.periodic_compaction_seconds),
@@ -188,7 +187,6 @@ struct MutableCFOptions {
         target_file_size_base(0),
         target_file_size_multiplier(0),
         max_bytes_for_level_base(0),
-        snap_refresh_nanos(0),
         max_bytes_for_level_multiplier(0),
         ttl(0),
         periodic_compaction_seconds(0),
@@ -240,7 +238,6 @@ struct MutableCFOptions {
   uint64_t target_file_size_base;
   int target_file_size_multiplier;
   uint64_t max_bytes_for_level_base;
-  uint64_t snap_refresh_nanos;
   double max_bytes_for_level_multiplier;
   uint64_t ttl;
   uint64_t periodic_compaction_seconds;

@@ -216,9 +216,6 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
   ROCKS_LOG_HEADER(
       log, "         Options.max_bytes_for_level_base: %" PRIu64,
       max_bytes_for_level_base);
-  ROCKS_LOG_HEADER(
-      log, "               Options.snap_refresh_nanos: %" PRIu64,
-      snap_refresh_nanos);
   ROCKS_LOG_HEADER(log, "Options.level_compaction_dynamic_level_bytes: %d",
                    level_compaction_dynamic_level_bytes);
   ROCKS_LOG_HEADER(log, "  Options.max_bytes_for_level_multiplier: %f",
@@ -494,7 +491,6 @@ ColumnFamilyOptions* ColumnFamilyOptions::OptimizeForSmallDb(
   write_buffer_size = 2 << 20;
   target_file_size_base = 2 * 1048576;
   max_bytes_for_level_base = 10 * 1048576;
-  snap_refresh_nanos = 0;
   soft_pending_compaction_bytes_limit = 256 * 1048576;
   hard_pending_compaction_bytes_limit = 1073741824ul;

@@ -182,7 +182,6 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
       mutable_cf_options.target_file_size_multiplier;
   cf_opts.max_bytes_for_level_base =
       mutable_cf_options.max_bytes_for_level_base;
-  cf_opts.snap_refresh_nanos = mutable_cf_options.snap_refresh_nanos;
   cf_opts.max_bytes_for_level_multiplier =
       mutable_cf_options.max_bytes_for_level_multiplier;
   cf_opts.ttl = mutable_cf_options.ttl;
@@ -1943,9 +1942,8 @@ std::unordered_map<std::string, OptionTypeInfo>
       OptionType::kUInt64T, OptionVerificationType::kNormal, true,
       offsetof(struct MutableCFOptions, max_bytes_for_level_base)}},
     {"snap_refresh_nanos",
-     {offset_of(&ColumnFamilyOptions::snap_refresh_nanos),
-      OptionType::kUInt64T, OptionVerificationType::kNormal, true,
-      offsetof(struct MutableCFOptions, snap_refresh_nanos)}},
+     {0, OptionType::kUInt64T, OptionVerificationType::kDeprecated, true,
+      0}},
    {"max_bytes_for_level_multiplier",
     {offset_of(&ColumnFamilyOptions::max_bytes_for_level_multiplier),
      OptionType::kDouble, OptionVerificationType::kNormal, true,
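The kDeprecated entry above keeps old OPTIONS files and options strings parseable: the parser still recognizes the name but discards the value. A minimal sketch of the expected behavior, assuming the usual handling of OptionVerificationType::kDeprecated entries (illustrative, not part of this change):

    #include <cassert>
    #include "rocksdb/convenience.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::ColumnFamilyOptions base, out;
      // The deprecated name still parses without error; its value is ignored,
      // while the neighboring real option is applied as usual.
      rocksdb::Status s = rocksdb::GetColumnFamilyOptionsFromString(
          base, "snap_refresh_nanos=1000000000;max_bytes_for_level_base=986",
          &out);
      assert(s.ok());
      assert(out.max_bytes_for_level_base == 986);
      return 0;
    }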

@@ -353,6 +353,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       sizeof(std::shared_ptr<CompactionFilterFactory>)},
      {offset_of(&ColumnFamilyOptions::prefix_extractor),
       sizeof(std::shared_ptr<const SliceTransform>)},
+     {offset_of(&ColumnFamilyOptions::snap_refresh_nanos), sizeof(uint64_t)},
      {offset_of(&ColumnFamilyOptions::table_factory),
       sizeof(std::shared_ptr<TableFactory>)},
      {offset_of(&ColumnFamilyOptions::cf_paths),
@@ -416,7 +417,6 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
       "kBZip2Compression:kNoCompression:kZlibCompression:kBZip2Compression:"
       "kSnappyCompression;"
       "max_bytes_for_level_base=986;"
-      "snap_refresh_nanos=1000000000;"
       "bloom_locality=8016;"
       "target_file_size_base=4294976376;"
       "memtable_huge_page_size=2557;"

@@ -71,7 +71,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
      {"target_file_size_base", "12"},
      {"target_file_size_multiplier", "13"},
      {"max_bytes_for_level_base", "14"},
-     {"snap_refresh_nanos", "1000000000"},
      {"level_compaction_dynamic_level_bytes", "true"},
      {"max_bytes_for_level_multiplier", "15.0"},
      {"max_bytes_for_level_multiplier_additional", "16:17:18"},
@@ -183,7 +182,6 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
   ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast<uint64_t>(12));
   ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13);
   ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U);
-  ASSERT_EQ(new_cf_opt.snap_refresh_nanos, 1000000000U);
   ASSERT_EQ(new_cf_opt.level_compaction_dynamic_level_bytes, true);
   ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier, 15.0);
   ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional.size(), 3U);

@@ -21,12 +21,6 @@ const InternalKeyComparator icmp_(BytewiseComparator());
 }  // namespace
-stl_wrappers::KVMap MakeMockFile(
-    std::vector<std::pair<const std::string, std::string>> l) {
-  return stl_wrappers::KVMap(l.begin(), l.end(),
-                             stl_wrappers::LessOfComparator(&icmp_));
-}
 stl_wrappers::KVMap MakeMockFile(
     std::initializer_list<std::pair<const std::string, std::string>> l) {
   return stl_wrappers::KVMap(l, stl_wrappers::LessOfComparator(&icmp_));
@@ -143,14 +137,6 @@ void MockTableFactory::AssertLatestFile(
       ParseInternalKey(Slice(key), &ikey);
       std::cout << ikey.DebugString(false) << " -> " << value << std::endl;
     }
-    std::cout << "Expected:" << std::endl;
-    for (const auto& kv : file_contents) {
-      ParsedInternalKey ikey;
-      std::string key, value;
-      std::tie(key, value) = kv;
-      ParseInternalKey(Slice(key), &ikey);
-      std::cout << ikey.DebugString(false) << " -> " << value << std::endl;
-    }
     FAIL();
   }
 }

@@ -28,8 +28,6 @@ namespace mock {
 stl_wrappers::KVMap MakeMockFile(
     std::initializer_list<std::pair<const std::string, std::string>> l = {});
-stl_wrappers::KVMap MakeMockFile(
-    std::vector<std::pair<const std::string, std::string>> l);
 struct MockTableFileSystem {
   port::Mutex mutex;
@@ -194,12 +192,6 @@ class MockTableFactory : public TableFactory {
   // contents are equal to file_contents
   void AssertSingleFile(const stl_wrappers::KVMap& file_contents);
   void AssertLatestFile(const stl_wrappers::KVMap& file_contents);
-  stl_wrappers::KVMap output() {
-    assert(!file_system_.files.empty());
-    auto latest = file_system_.files.end();
-    --latest;
-    return latest->second;
-  }
  private:
   uint32_t GetAndWriteNextID(WritableFileWriter* file) const;

@@ -148,7 +148,6 @@ cf_consistency_params = {
     "write_buffer_size": 1024 * 1024,
     # disable pipelined write when test_atomic_flush is true
     "enable_pipelined_write": 0,
-    "snap_refresh_nanos": 0,
 }

@@ -541,10 +541,6 @@ DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
 static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) =
     RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
-DEFINE_uint64(
-    snap_refresh_nanos, 100 * 1000 * 1000,
-    "If non-zero, compactions will periodically refresh snapshot list.");
 namespace {
 enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
   assert(ctype);
@@ -2757,8 +2753,6 @@ class StressTest {
         fprintf(stdout, "  %s\n", p.c_str());
       }
     }
-    fprintf(stdout, "Snapshot refresh nanos   : %" PRIu64 "\n",
-            FLAGS_snap_refresh_nanos);
     fprintf(stdout, "Periodic Compaction Secs : %" PRIu64 "\n",
             FLAGS_periodic_compaction_seconds);
     fprintf(stdout, "Compaction TTL           : %" PRIu64 "\n",
@@ -2919,7 +2913,6 @@ class StressTest {
     } else {
       options_.merge_operator = MergeOperators::CreatePutOperator();
     }
-    options_.snap_refresh_nanos = FLAGS_snap_refresh_nanos;
     fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
