delete superversions in BackgroundCallPurge (#6146)

Summary:
I found that CleanupSuperVersion() may block Get() for 30ms+ (per MemTable is 256MB).

Then I found "delete sv" in ~SuperVersion() takes the time.

The backtrace looks like this

DBImpl::GetImpl() -> DBImpl::ReturnAndCleanupSuperVersion() ->
DBImpl::CleanupSuperVersion() : delete sv; -> ~SuperVersion()

I think it's better to delete in a background thread,  please review it。
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6146

Differential Revision: D18972066

fbshipit-source-id: 0f7b0b70b9bb1e27ad6fc1c8a408fbbf237ae08c
main
解轶伦 5 years ago committed by Facebook Github Bot
parent 02aa22957a
commit 39fcaf8246
  1. 2
      db/arena_wrapped_db_iter.cc
  2. 19
      db/column_family.cc
  3. 4
      db/column_family.h
  4. 48
      db/db_impl/db_impl.cc
  5. 5
      db/db_impl/db_impl.h
  6. 2
      db/db_impl/db_impl_compaction_flush.cc
  7. 2
      db/db_impl/db_impl_secondary.cc
  8. 36
      db/db_test2.cc
  9. 8
      db/forward_iterator.cc
  10. 8
      include/rocksdb/options.h

@ -64,7 +64,7 @@ Status ArenaWrappedDBIter::Refresh() {
arena_.~Arena(); arena_.~Arena();
new (&arena_) Arena(); new (&arena_) Arena();
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
if (read_callback_) { if (read_callback_) {
read_callback_->Refresh(latest_seq); read_callback_->Refresh(latest_seq);
} }

@ -1098,9 +1098,8 @@ Compaction* ColumnFamilyData::CompactRange(
return result; return result;
} }
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
InstrumentedMutex* db_mutex) { SuperVersion* sv = GetThreadLocalSuperVersion(db);
SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
sv->Ref(); sv->Ref();
if (!ReturnThreadLocalSuperVersion(sv)) { if (!ReturnThreadLocalSuperVersion(sv)) {
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion() // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
@ -1112,8 +1111,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
return sv; return sv;
} }
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
InstrumentedMutex* db_mutex) {
// The SuperVersion is cached in thread local storage to avoid acquiring // The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new // mutex when SuperVersion does not change since the last use. When a new
// SuperVersion is installed, the compaction or flush thread cleans up // SuperVersion is installed, the compaction or flush thread cleans up
@ -1140,16 +1138,21 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
if (sv && sv->Unref()) { if (sv && sv->Unref()) {
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS); RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock(); db->mutex()->Lock();
// NOTE: underlying resources held by superversion (sst files) might // NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job. // not be released until the next background job.
sv->Cleanup(); sv->Cleanup();
if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
db->AddSuperVersionsToFreeQueue(sv);
db->SchedulePurge();
} else {
sv_to_delete = sv; sv_to_delete = sv;
}
} else { } else {
db_mutex->Lock(); db->mutex()->Lock();
} }
sv = super_version_->Ref(); sv = super_version_->Ref();
db_mutex->Unlock(); db->mutex()->Unlock();
delete sv_to_delete; delete sv_to_delete;
} }

@ -436,11 +436,11 @@ class ColumnFamilyData {
SuperVersion* GetSuperVersion() { return super_version_; } SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe // thread-safe
// Return a already referenced SuperVersion to be used safely. // Return a already referenced SuperVersion to be used safely.
SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex); SuperVersion* GetReferencedSuperVersion(DBImpl* db);
// thread-safe // thread-safe
// Get SuperVersion stored in thread local storage. If it does not exist, // Get SuperVersion stored in thread local storage. If it does not exist,
// get a reference from a current SuperVersion. // get a reference from a current SuperVersion.
SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex); SuperVersion* GetThreadLocalSuperVersion(DBImpl* db);
// Try to return SuperVersion back to thread local storage. Retrun true on // Try to return SuperVersion back to thread local storage. Retrun true on
// success and false on failure. It fails when the thread local storage // success and false on failure. It fails when the thread local storage
// contains anything other than SuperVersion::kSVInUse flag. // contains anything other than SuperVersion::kSVInUse flag.

@ -871,7 +871,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
column_family); column_family);
ColumnFamilyData* cfd = cfh->cfd(); ColumnFamilyData* cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
Version* version = super_version->current; Version* version = super_version->current;
Status s = Status s =
@ -887,7 +887,6 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
AddToLogsToFreeQueue(l); AddToLogsToFreeQueue(l);
} }
job_context->logs_to_free.clear(); job_context->logs_to_free.clear();
SchedulePurge();
} }
} }
@ -1314,6 +1313,14 @@ void DBImpl::BackgroundCallPurge() {
delete log_writer; delete log_writer;
mutex_.Lock(); mutex_.Lock();
} }
while (!superversions_to_free_queue_.empty()) {
assert(!superversions_to_free_queue_.empty());
SuperVersion* sv = superversions_to_free_queue_.front();
superversions_to_free_queue_.pop_front();
mutex_.Unlock();
delete sv;
mutex_.Lock();
}
for (const auto& file : purge_files_) { for (const auto& file : purge_files_) {
const PurgeFileInfo& purge_file = file.second; const PurgeFileInfo& purge_file = file.second;
const std::string& fname = purge_file.fname; const std::string& fname = purge_file.fname;
@ -1366,10 +1373,14 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
state->db->FindObsoleteFiles(&job_context, false, true); state->db->FindObsoleteFiles(&job_context, false, true);
if (state->background_purge) { if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context); state->db->ScheduleBgLogWriterClose(&job_context);
state->db->AddSuperVersionsToFreeQueue(state->super_version);
state->db->SchedulePurge();
} }
state->mu->Unlock(); state->mu->Unlock();
if (!state->background_purge) {
delete state->super_version; delete state->super_version;
}
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
if (state->background_purge) { if (state->background_purge) {
// PurgeObsoleteFiles here does not delete files. Instead, it adds the // PurgeObsoleteFiles here does not delete files. Instead, it adds the
@ -2444,7 +2455,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
result = nullptr; result = nullptr;
#else #else
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv); auto iter = new ForwardIterator(this, read_options, cfd, sv);
result = NewDBIterator( result = NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
@ -2470,7 +2481,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
ReadCallback* read_callback, ReadCallback* read_callback,
bool allow_blob, bool allow_blob,
bool allow_refresh) { bool allow_refresh) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
// Try to generate a DB iterator tree in continuous memory area to be // Try to generate a DB iterator tree in continuous memory area to be
// cache friendly. Here is an example of result: // cache friendly. Here is an example of result:
@ -2549,7 +2560,7 @@ Status DBImpl::NewIterators(
#else #else
for (auto cfh : column_families) { for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
auto iter = new ForwardIterator(this, read_options, cfd, sv); auto iter = new ForwardIterator(this, read_options, cfd, sv);
iterators->push_back(NewDBIterator( iterators->push_back(NewDBIterator(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
@ -2885,7 +2896,7 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) { SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly // TODO(ljin): consider using GetReferencedSuperVersion() directly
return cfd->GetThreadLocalSuperVersion(&mutex_); return cfd->GetThreadLocalSuperVersion(this);
} }
// REQUIRED: this function should only be called on the write thread or if the // REQUIRED: this function should only be called on the write thread or if the
@ -2903,11 +2914,19 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
void DBImpl::CleanupSuperVersion(SuperVersion* sv) { void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
// Release SuperVersion // Release SuperVersion
if (sv->Unref()) { if (sv->Unref()) {
bool defer_purge =
immutable_db_options().avoid_unnecessary_blocking_io;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
sv->Cleanup(); sv->Cleanup();
if (defer_purge) {
AddSuperVersionsToFreeQueue(sv);
SchedulePurge();
}
} }
if (!defer_purge) {
delete sv; delete sv;
}
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS); RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
} }
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES); RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
@ -3918,7 +3937,7 @@ Status DBImpl::IngestExternalFiles(
start_file_number += args[i - 1].external_files.size(); start_file_number += args[i - 1].external_files.size();
auto* cfd = auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd(); static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[i].second = ingestion_jobs[i].Prepare( exec_results[i].second = ingestion_jobs[i].Prepare(
args[i].external_files, start_file_number, super_version); args[i].external_files, start_file_number, super_version);
exec_results[i].first = true; exec_results[i].first = true;
@ -3929,7 +3948,7 @@ Status DBImpl::IngestExternalFiles(
{ {
auto* cfd = auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(); static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
exec_results[0].second = ingestion_jobs[0].Prepare( exec_results[0].second = ingestion_jobs[0].Prepare(
args[0].external_files, next_file_number, super_version); args[0].external_files, next_file_number, super_version);
exec_results[0].first = true; exec_results[0].first = true;
@ -4205,7 +4224,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
dummy_sv_ctx.Clean(); dummy_sv_ctx.Clean();
if (status.ok()) { if (status.ok()) {
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
status = import_job.Prepare(next_file_number, sv); status = import_job.Prepare(next_file_number, sv);
CleanupSuperVersion(sv); CleanupSuperVersion(sv);
} }
@ -4282,7 +4301,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
} }
std::vector<SuperVersion*> sv_list; std::vector<SuperVersion*> sv_list;
for (auto cfd : cfd_list) { for (auto cfd : cfd_list) {
sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_)); sv_list.push_back(cfd->GetReferencedSuperVersion(this));
} }
for (auto& sv : sv_list) { for (auto& sv : sv_list) {
VersionStorageInfo* vstorage = sv->current->storage_info(); VersionStorageInfo* vstorage = sv->current->storage_info();
@ -4307,14 +4326,23 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
break; break;
} }
} }
bool defer_purge =
immutable_db_options().avoid_unnecessary_blocking_io;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
for (auto sv : sv_list) { for (auto sv : sv_list) {
if (sv && sv->Unref()) { if (sv && sv->Unref()) {
sv->Cleanup(); sv->Cleanup();
if (defer_purge) {
AddSuperVersionsToFreeQueue(sv);
} else {
delete sv; delete sv;
} }
} }
}
if (defer_purge) {
SchedulePurge();
}
for (auto cfd : cfd_list) { for (auto cfd : cfd_list) {
cfd->UnrefAndTryDelete(); cfd->UnrefAndTryDelete();
} }

@ -799,6 +799,10 @@ class DBImpl : public DB {
logs_to_free_queue_.push_back(log_writer); logs_to_free_queue_.push_back(log_writer);
} }
void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
superversions_to_free_queue_.push_back(sv);
}
void SetSnapshotChecker(SnapshotChecker* snapshot_checker); void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
// Fill JobContext with snapshot information needed by flush and compaction. // Fill JobContext with snapshot information needed by flush and compaction.
@ -1891,6 +1895,7 @@ class DBImpl : public DB {
// A queue to store log writers to close // A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_; std::deque<log::Writer*> logs_to_free_queue_;
std::deque<SuperVersion*> superversions_to_free_queue_;
int unscheduled_flushes_; int unscheduled_flushes_;
int unscheduled_compactions_; int unscheduled_compactions_;

@ -659,7 +659,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
// one/both sides of the interval are unbounded. But it requires more // one/both sides of the interval are unbounded. But it requires more
// changes to RangesOverlapWithMemtables. // changes to RangesOverlapWithMemtables.
Range range(*begin, *end); Range range(*begin, *end);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed); cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
CleanupSuperVersion(super_version); CleanupSuperVersion(super_version);
} }

@ -407,7 +407,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd, const ReadOptions& read_options, ColumnFamilyData* cfd,
SequenceNumber snapshot, ReadCallback* read_callback) { SequenceNumber snapshot, ReadCallback* read_callback) {
assert(nullptr != cfd); assert(nullptr != cfd);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
auto db_iter = NewArenaWrappedDbIterator( auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot, snapshot,

@ -4210,6 +4210,42 @@ TEST_F(DBTest2, SeekFileRangeDeleteTail) {
} }
db_->ReleaseSnapshot(s1); db_->ReleaseSnapshot(s1);
} }
TEST_F(DBTest2, BackgroundPurgeTest) {
Options options = CurrentOptions();
options.write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(1 << 20);
options.avoid_unnecessary_blocking_io = true;
DestroyAndReopen(options);
size_t base_value = options.write_buffer_manager->memory_usage();
ASSERT_OK(Put("a", "a"));
Iterator* iter = db_->NewIterator(ReadOptions());
ASSERT_OK(Flush());
size_t value = options.write_buffer_manager->memory_usage();
ASSERT_GT(value, base_value);
db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
test::SleepingBackgroundTask sleeping_task_after;
db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_after, Env::Priority::HIGH);
delete iter;
Env::Default()->SleepForMicroseconds(100000);
value = options.write_buffer_manager->memory_usage();
ASSERT_GT(value, base_value);
sleeping_task_after.WakeUp();
sleeping_task_after.WaitUntilDone();
test::SleepingBackgroundTask sleeping_task_after2;
db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_after2, Env::Priority::HIGH);
sleeping_task_after2.WakeUp();
sleeping_task_after2.WaitUntilDone();
value = options.write_buffer_manager->memory_usage();
ASSERT_EQ(base_value, value);
}
} // namespace rocksdb } // namespace rocksdb
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -239,9 +239,13 @@ void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
db->FindObsoleteFiles(&job_context, false, true); db->FindObsoleteFiles(&job_context, false, true);
if (background_purge_on_iterator_cleanup) { if (background_purge_on_iterator_cleanup) {
db->ScheduleBgLogWriterClose(&job_context); db->ScheduleBgLogWriterClose(&job_context);
db->AddSuperVersionsToFreeQueue(sv);
db->SchedulePurge();
} }
db->mutex_.Unlock(); db->mutex_.Unlock();
if (!background_purge_on_iterator_cleanup) {
delete sv; delete sv;
}
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup); db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
} }
@ -614,7 +618,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
Cleanup(refresh_sv); Cleanup(refresh_sv);
if (refresh_sv) { if (refresh_sv) {
// New // New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); sv_ = cfd_->GetReferencedSuperVersion(db_);
} }
ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(), ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
kMaxSequenceNumber /* upper_bound */); kMaxSequenceNumber /* upper_bound */);
@ -668,7 +672,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
void ForwardIterator::RenewIterators() { void ForwardIterator::RenewIterators() {
SuperVersion* svnew; SuperVersion* svnew;
assert(sv_); assert(sv_);
svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); svnew = cfd_->GetReferencedSuperVersion(db_);
if (mutable_iter_ != nullptr) { if (mutable_iter_ != nullptr) {
DeleteIterator(mutable_iter_, true /* is_arena */); DeleteIterator(mutable_iter_, true /* is_arena */);

@ -1087,10 +1087,10 @@ struct DBOptions {
// independently if the process crashes later and tries to recover. // independently if the process crashes later and tries to recover.
bool atomic_flush = false; bool atomic_flush = false;
// If true, ColumnFamilyHandle's and Iterator's destructors won't delete // If true, working thread may avoid doing unnecessary and long-latency
// obsolete files directly and will instead schedule a background job // operation (such as deleting obsolete files directly or deleting memtable)
// to do it. Use it if you're destroying iterators or ColumnFamilyHandle-s // and will instead schedule a background job to do it.
// from latency-sensitive threads. // Use it if you're latency-sensitive.
// If set to true, takes precedence over // If set to true, takes precedence over
// ReadOptions::background_purge_on_iterator_cleanup. // ReadOptions::background_purge_on_iterator_cleanup.
bool avoid_unnecessary_blocking_io = false; bool avoid_unnecessary_blocking_io = false;

Loading…
Cancel
Save