diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index 840c20e9e..c2de8db9e 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -64,7 +64,7 @@ Status ArenaWrappedDBIter::Refresh() { arena_.~Arena(); new (&arena_) Arena(); - SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); + SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_); if (read_callback_) { read_callback_->Refresh(latest_seq); } diff --git a/db/column_family.cc b/db/column_family.cc index ec4411eb2..290283608 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1098,9 +1098,8 @@ Compaction* ColumnFamilyData::CompactRange( return result; } -SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( - InstrumentedMutex* db_mutex) { - SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex); +SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) { + SuperVersion* sv = GetThreadLocalSuperVersion(db); sv->Ref(); if (!ReturnThreadLocalSuperVersion(sv)) { // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion() @@ -1112,8 +1111,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( return sv; } -SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( - InstrumentedMutex* db_mutex) { +SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) { // The SuperVersion is cached in thread local storage to avoid acquiring // mutex when SuperVersion does not change since the last use. When a new // SuperVersion is installed, the compaction or flush thread cleans up @@ -1140,16 +1138,21 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( if (sv && sv->Unref()) { RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS); - db_mutex->Lock(); + db->mutex()->Lock(); // NOTE: underlying resources held by superversion (sst files) might // not be released until the next background job. sv->Cleanup(); - sv_to_delete = sv; + if (db->immutable_db_options().avoid_unnecessary_blocking_io) { + db->AddSuperVersionsToFreeQueue(sv); + db->SchedulePurge(); + } else { + sv_to_delete = sv; + } } else { - db_mutex->Lock(); + db->mutex()->Lock(); } sv = super_version_->Ref(); - db_mutex->Unlock(); + db->mutex()->Unlock(); delete sv_to_delete; } diff --git a/db/column_family.h b/db/column_family.h index 8529ff230..f8d0edc13 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -436,11 +436,11 @@ class ColumnFamilyData { SuperVersion* GetSuperVersion() { return super_version_; } // thread-safe // Return a already referenced SuperVersion to be used safely. - SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex); + SuperVersion* GetReferencedSuperVersion(DBImpl* db); // thread-safe // Get SuperVersion stored in thread local storage. If it does not exist, // get a reference from a current SuperVersion. - SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex); + SuperVersion* GetThreadLocalSuperVersion(DBImpl* db); // Try to return SuperVersion back to thread local storage. Retrun true on // success and false on failure. It fails when the thread local storage // contains anything other than SuperVersion::kSVInUse flag. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8bc09e8cf..5b830c363 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -871,7 +871,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family, column_family); ColumnFamilyData* cfd = cfh->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); Version* version = super_version->current; Status s = @@ -887,7 +887,6 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { AddToLogsToFreeQueue(l); } job_context->logs_to_free.clear(); - SchedulePurge(); } } @@ -1314,6 +1313,14 @@ void DBImpl::BackgroundCallPurge() { delete log_writer; mutex_.Lock(); } + while (!superversions_to_free_queue_.empty()) { + assert(!superversions_to_free_queue_.empty()); + SuperVersion* sv = superversions_to_free_queue_.front(); + superversions_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete sv; + mutex_.Lock(); + } for (const auto& file : purge_files_) { const PurgeFileInfo& purge_file = file.second; const std::string& fname = purge_file.fname; @@ -1366,10 +1373,14 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) { state->db->FindObsoleteFiles(&job_context, false, true); if (state->background_purge) { state->db->ScheduleBgLogWriterClose(&job_context); + state->db->AddSuperVersionsToFreeQueue(state->super_version); + state->db->SchedulePurge(); } state->mu->Unlock(); - delete state->super_version; + if (!state->background_purge) { + delete state->super_version; + } if (job_context.HaveSomethingToDelete()) { if (state->background_purge) { // PurgeObsoleteFiles here does not delete files. Instead, it adds the @@ -2444,7 +2455,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, result = nullptr; #else - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(this); auto iter = new ForwardIterator(this, read_options, cfd, sv); result = NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, @@ -2470,7 +2481,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, ReadCallback* read_callback, bool allow_blob, bool allow_refresh) { - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(this); // Try to generate a DB iterator tree in continuous memory area to be // cache friendly. Here is an example of result: @@ -2549,7 +2560,7 @@ Status DBImpl::NewIterators( #else for (auto cfh : column_families) { auto cfd = reinterpret_cast(cfh)->cfd(); - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(this); auto iter = new ForwardIterator(this, read_options, cfd, sv); iterators->push_back(NewDBIterator( env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, @@ -2885,7 +2896,7 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property, SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) { // TODO(ljin): consider using GetReferencedSuperVersion() directly - return cfd->GetThreadLocalSuperVersion(&mutex_); + return cfd->GetThreadLocalSuperVersion(this); } // REQUIRED: this function should only be called on the write thread or if the @@ -2903,11 +2914,19 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) { void DBImpl::CleanupSuperVersion(SuperVersion* sv) { // Release SuperVersion if (sv->Unref()) { + bool defer_purge = + immutable_db_options().avoid_unnecessary_blocking_io; { InstrumentedMutexLock l(&mutex_); sv->Cleanup(); + if (defer_purge) { + AddSuperVersionsToFreeQueue(sv); + SchedulePurge(); + } + } + if (!defer_purge) { + delete sv; } - delete sv; RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS); } RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES); @@ -3918,7 +3937,7 @@ Status DBImpl::IngestExternalFiles( start_file_number += args[i - 1].external_files.size(); auto* cfd = static_cast(args[i].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); exec_results[i].second = ingestion_jobs[i].Prepare( args[i].external_files, start_file_number, super_version); exec_results[i].first = true; @@ -3929,7 +3948,7 @@ Status DBImpl::IngestExternalFiles( { auto* cfd = static_cast(args[0].column_family)->cfd(); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); exec_results[0].second = ingestion_jobs[0].Prepare( args[0].external_files, next_file_number, super_version); exec_results[0].first = true; @@ -4205,7 +4224,7 @@ Status DBImpl::CreateColumnFamilyWithImport( dummy_sv_ctx.Clean(); if (status.ok()) { - SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(this); status = import_job.Prepare(next_file_number, sv); CleanupSuperVersion(sv); } @@ -4282,7 +4301,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) { } std::vector sv_list; for (auto cfd : cfd_list) { - sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_)); + sv_list.push_back(cfd->GetReferencedSuperVersion(this)); } for (auto& sv : sv_list) { VersionStorageInfo* vstorage = sv->current->storage_info(); @@ -4307,14 +4326,23 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) { break; } } + bool defer_purge = + immutable_db_options().avoid_unnecessary_blocking_io; { InstrumentedMutexLock l(&mutex_); for (auto sv : sv_list) { if (sv && sv->Unref()) { sv->Cleanup(); - delete sv; + if (defer_purge) { + AddSuperVersionsToFreeQueue(sv); + } else { + delete sv; + } } } + if (defer_purge) { + SchedulePurge(); + } for (auto cfd : cfd_list) { cfd->UnrefAndTryDelete(); } diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 7d38ee4dd..d8c5e1c60 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -799,6 +799,10 @@ class DBImpl : public DB { logs_to_free_queue_.push_back(log_writer); } + void AddSuperVersionsToFreeQueue(SuperVersion* sv) { + superversions_to_free_queue_.push_back(sv); + } + void SetSnapshotChecker(SnapshotChecker* snapshot_checker); // Fill JobContext with snapshot information needed by flush and compaction. @@ -1891,6 +1895,7 @@ class DBImpl : public DB { // A queue to store log writers to close std::deque logs_to_free_queue_; + std::deque superversions_to_free_queue_; int unscheduled_flushes_; int unscheduled_compactions_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 3c436eb74..cfd07c118 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -659,7 +659,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, // one/both sides of the interval are unbounded. But it requires more // changes to RangesOverlapWithMemtables. Range range(*begin, *end); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed); CleanupSuperVersion(super_version); } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index c33fdb59f..d465077b0 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -407,7 +407,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( const ReadOptions& read_options, ColumnFamilyData* cfd, SequenceNumber snapshot, ReadCallback* read_callback) { assert(nullptr != cfd); - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); auto db_iter = NewArenaWrappedDbIterator( env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, snapshot, diff --git a/db/db_test2.cc b/db/db_test2.cc index 6b0ee157e..b30dac6b5 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -4210,6 +4210,42 @@ TEST_F(DBTest2, SeekFileRangeDeleteTail) { } db_->ReleaseSnapshot(s1); } + +TEST_F(DBTest2, BackgroundPurgeTest) { + Options options = CurrentOptions(); + options.write_buffer_manager = std::make_shared(1 << 20); + options.avoid_unnecessary_blocking_io = true; + DestroyAndReopen(options); + size_t base_value = options.write_buffer_manager->memory_usage(); + + ASSERT_OK(Put("a", "a")); + Iterator* iter = db_->NewIterator(ReadOptions()); + ASSERT_OK(Flush()); + size_t value = options.write_buffer_manager->memory_usage(); + ASSERT_GT(value, base_value); + + db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_after; + db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_after, Env::Priority::HIGH); + delete iter; + + Env::Default()->SleepForMicroseconds(100000); + value = options.write_buffer_manager->memory_usage(); + ASSERT_GT(value, base_value); + + sleeping_task_after.WakeUp(); + sleeping_task_after.WaitUntilDone(); + + test::SleepingBackgroundTask sleeping_task_after2; + db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_after2, Env::Priority::HIGH); + sleeping_task_after2.WakeUp(); + sleeping_task_after2.WaitUntilDone(); + + value = options.write_buffer_manager->memory_usage(); + ASSERT_EQ(base_value, value); +} } // namespace rocksdb #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index c875008c7..ae039db03 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -239,9 +239,13 @@ void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv, db->FindObsoleteFiles(&job_context, false, true); if (background_purge_on_iterator_cleanup) { db->ScheduleBgLogWriterClose(&job_context); + db->AddSuperVersionsToFreeQueue(sv); + db->SchedulePurge(); } db->mutex_.Unlock(); - delete sv; + if (!background_purge_on_iterator_cleanup) { + delete sv; + } if (job_context.HaveSomethingToDelete()) { db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup); } @@ -614,7 +618,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { Cleanup(refresh_sv); if (refresh_sv) { // New - sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + sv_ = cfd_->GetReferencedSuperVersion(db_); } ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */); @@ -668,7 +672,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { void ForwardIterator::RenewIterators() { SuperVersion* svnew; assert(sv_); - svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + svnew = cfd_->GetReferencedSuperVersion(db_); if (mutable_iter_ != nullptr) { DeleteIterator(mutable_iter_, true /* is_arena */); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9abefbcab..86e8f8280 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1087,10 +1087,10 @@ struct DBOptions { // independently if the process crashes later and tries to recover. bool atomic_flush = false; - // If true, ColumnFamilyHandle's and Iterator's destructors won't delete - // obsolete files directly and will instead schedule a background job - // to do it. Use it if you're destroying iterators or ColumnFamilyHandle-s - // from latency-sensitive threads. + // If true, working thread may avoid doing unnecessary and long-latency + // operation (such as deleting obsolete files directly or deleting memtable) + // and will instead schedule a background job to do it. + // Use it if you're latency-sensitive. // If set to true, takes precedence over // ReadOptions::background_purge_on_iterator_cleanup. bool avoid_unnecessary_blocking_io = false;