From 3ae00472782571770e804aa177c9a813f9f82373 Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Tue, 27 Feb 2018 17:08:34 -0800
Subject: [PATCH] skip CompactRange flush based on memtable contents

Summary:
CompactRange has a call to Flush because we guarantee that, at the time it's
called, all existing keys in the range will be pushed through the user's
compaction filter. However, previously the flush was done blindly, so it would
happen even when the memtable did not contain any keys in the range specified
by the user. This caused unnecessary L0 files to be created, leading to write
stalls in some cases. This PR checks the memtable's contents and flushes only
if they overlap with `CompactRange`'s range.

- Move the memtable overlap check logic from `ExternalSstFileIngestionJob` to
  `ColumnFamilyData::RangesOverlapWithMemtables`
- Reuse the above logic in `CompactRange` and skip flushing if there is no
  overlap

Closes https://github.com/facebook/rocksdb/pull/3520

Differential Revision: D7018897

Pulled By: ajkr

fbshipit-source-id: a3c6b1cfae56687b49dd89ccac7c948e53545934
---
 HISTORY.md                            |  3 ++
 db/column_family.cc                   | 63 ++++++++++++++++++++++++++
 db/column_family.h                    | 10 +++++
 db/db_compaction_test.cc              | 58 ++++++++++++++++++++++++
 db/db_impl.cc                         |  2 +-
 db/db_impl_compaction_flush.cc        | 12 ++++-
 db/external_sst_file_ingestion_job.cc | 64 +++++----------------------
 db/external_sst_file_ingestion_job.h  | 12 ++---
 include/rocksdb/db.h                  |  4 +-
 9 files changed, 164 insertions(+), 64 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 8f28c0cee..7c2d70d96 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -3,6 +3,9 @@
 ### Public API Change
 * RocksDBOptionsParser::Parse()'s `ignore_unknown_options` argument will only be effective if the option file shows it is generated using a higher version of RocksDB than the current version.
+### New Features
+* Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables.
+
 ## 5.12.0 (2/14/2018)
 ### Public API Change
 * Iterator::SeekForPrev is now a pure virtual method. This is to prevent users who implement the Iterator interface from failing to implement SeekForPrev by mistake.
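Conceptually, the new check is a seek into the sorted memtable followed by one
comparison. The standalone sketch below is an illustration only (not part of
this patch, and not RocksDB's API); the function name is hypothetical and a
std::map stands in for the memtable. It shows the idea that
`ColumnFamilyData::RangesOverlapWithMemtables` implements with internal
iterators:

    #include <map>
    #include <string>

    // Returns true iff the inclusive range [start, limit] contains at least
    // one memtable key. Mirrors the patch's Seek followed by the
    // "seek_result.user_key <= limit" comparison.
    bool RangeOverlapsMemtable(
        const std::map<std::string, std::string>& memtable,
        const std::string& start, const std::string& limit) {
      auto it = memtable.lower_bound(start);  // first key >= start
      return it != memtable.end() && it->first <= limit;
    }

(The real implementation additionally consults a RangeDelAggregator so that
range tombstones buffered in the memtable also count as overlap.)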
diff --git a/db/column_family.cc b/db/column_family.cc
index 5824d7b54..983be9e41 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -31,6 +31,7 @@
 #include "monitoring/thread_status_util.h"
 #include "options/options_helper.h"
 #include "table/block_based_table_factory.h"
+#include "table/merging_iterator.h"
 #include "util/autovector.h"
 #include "util/compression.h"
@@ -881,6 +882,68 @@ bool ColumnFamilyData::RangeOverlapWithCompaction(
       smallest_user_key, largest_user_key, level);
 }
 
+Status ColumnFamilyData::RangesOverlapWithMemtables(
+    const autovector<Range>& ranges, SuperVersion* super_version,
+    bool* overlap) {
+  assert(overlap != nullptr);
+  *overlap = false;
+  // Create an InternalIterator over all unflushed memtables
+  Arena arena;
+  ReadOptions read_opts;
+  read_opts.total_order_seek = true;
+  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
+  merge_iter_builder.AddIterator(
+      super_version->mem->NewIterator(read_opts, &arena));
+  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
+  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
+
+  std::vector<InternalIterator*> memtable_range_del_iters;
+  auto* active_range_del_iter =
+      super_version->mem->NewRangeTombstoneIterator(read_opts);
+  if (active_range_del_iter != nullptr) {
+    memtable_range_del_iters.push_back(active_range_del_iter);
+  }
+  super_version->imm->AddRangeTombstoneIterators(read_opts,
+                                                 &memtable_range_del_iters);
+  RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */,
+                                   false /* collapse_deletions */);
+  Status status;
+  {
+    std::unique_ptr<InternalIterator> memtable_range_del_iter(
+        NewMergingIterator(&internal_comparator_,
+                           memtable_range_del_iters.empty()
+                               ? nullptr
+                               : &memtable_range_del_iters[0],
+                           static_cast<int>(memtable_range_del_iters.size())));
+    status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
+  }
+  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
+    auto* vstorage = super_version->current->storage_info();
+    auto* ucmp = vstorage->InternalComparator()->user_comparator();
+    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
+                            kValueTypeForSeek);
+    memtable_iter->Seek(range_start.Encode());
+    status = memtable_iter->status();
+    ParsedInternalKey seek_result;
+    if (status.ok()) {
+      if (memtable_iter->Valid() &&
+          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
+        status = Status::Corruption("DB have corrupted keys");
+      }
+    }
+    if (status.ok()) {
+      if (memtable_iter->Valid() &&
+          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
+        *overlap = true;
+      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
+                                                 ranges[i].limit)) {
+        *overlap = true;
+      }
+    }
+  }
+  return status;
+}
+
 const int ColumnFamilyData::kCompactAllLevels = -1;
 const int ColumnFamilyData::kCompactToBaseLevel = -2;
diff --git a/db/column_family.h b/db/column_family.h
index e5abb485e..51a88d9d8 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -274,6 +274,16 @@ class ColumnFamilyData {
                                   const Slice& largest_user_key,
                                   int level) const;
 
+  // Check if the passed ranges overlap with any unflushed memtables
+  // (immutable or mutable).
+  //
+  // @param super_version A referenced SuperVersion that will be held for the
+  //    duration of this function.
+  //
+  // Thread-safe
+  Status RangesOverlapWithMemtables(const autovector<Range>& ranges,
+                                    SuperVersion* super_version,
+                                    bool* overlap);
+
   // A flag to tell a manual compaction is to compact all levels together
   // instead of a specific level.
   static const int kCompactAllLevels;
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 3b77228d8..654922b6e 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -3317,6 +3317,64 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
+  // Verify the memtable only gets flushed if it contains data overlapping the
+  // range provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
+  const int kNumEndpointKeys = 5;
+  std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  // One extra iteration for nullptr, which means the left side of the
+  // interval is unbounded.
+  for (int i = 0; i <= kNumEndpointKeys; ++i) {
+    Slice begin;
+    Slice* begin_ptr;
+    if (i == 0) {
+      begin_ptr = nullptr;
+    } else {
+      begin = keys[i - 1];
+      begin_ptr = &begin;
+    }
+    // Start at `i - 1` so the right endpoint comes at or after the left
+    // endpoint. One extra iteration for nullptr, which means the right side
+    // of the interval is unbounded.
+    for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
+      Slice end;
+      Slice* end_ptr;
+      if (j == kNumEndpointKeys) {
+        end_ptr = nullptr;
+      } else {
+        end = keys[j];
+        end_ptr = &end;
+      }
+      ASSERT_OK(Put("b", "val"));
+      ASSERT_OK(Put("d", "val"));
+      CompactRangeOptions compact_range_opts;
+      ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));
+
+      uint64_t get_prop_tmp, num_memtable_entries = 0;
+      ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
+                                      &get_prop_tmp));
+      num_memtable_entries += get_prop_tmp;
+      ASSERT_TRUE(db_->GetIntProperty(
+          DB::Properties::kNumEntriesActiveMemTable, &get_prop_tmp));
+      num_memtable_entries += get_prop_tmp;
+      if (begin_ptr == nullptr || end_ptr == nullptr ||
+          (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
+        // In this case `CompactRange`'s range overlapped in some way with the
+        // memtable's range, so a flush should have happened. Then "b" and "d"
+        // won't be in the memtable.
+        ASSERT_EQ(0, num_memtable_entries);
+      } else {
+        ASSERT_EQ(2, num_memtable_entries);
+        // flush anyway to prepare for the next iteration
+        db_->Flush(FlushOptions());
+      }
+    }
+  }
+}
+
 INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                         ::testing::Values(std::make_tuple(1, true),
                                           std::make_tuple(1, false),
diff --git a/db/db_impl.cc b/db/db_impl.cc
index f47f2c011..1188a0f34 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2832,7 +2832,7 @@ Status DBImpl::IngestExternalFile(
   // Figure out if we need to flush the memtable first
   if (status.ok()) {
     bool need_flush = false;
-    status = ingestion_job.NeedsFlush(&need_flush);
+    status = ingestion_job.NeedsFlush(&need_flush, cfd->GetSuperVersion());
     TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                              &need_flush);
     if (status.ok() && need_flush) {
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 6bf5beddc..178f9a217 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -291,7 +291,17 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   bool exclusive = options.exclusive_manual_compaction;
 
   bool flush_needed = true;
-  if (!options.allow_write_stall) {
+  if (begin != nullptr && end != nullptr) {
+    // TODO(ajkr): We could also optimize away the flush in certain cases where
+    // one/both sides of the interval are unbounded. But it requires more
+    // changes to RangesOverlapWithMemtables.
+    Range range(*begin, *end);
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
+    CleanupSuperVersion(super_version);
+  }
+
+  if (!options.allow_write_stall && flush_needed) {
     InstrumentedMutexLock l(&mutex_);
     uint64_t orig_active_memtable_id = cfd->mem()->GetID();
     WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc
index 8c2963508..2b91e0051 100644
--- a/db/external_sst_file_ingestion_job.cc
+++ b/db/external_sst_file_ingestion_job.cc
@@ -132,11 +132,15 @@ Status ExternalSstFileIngestionJob::Prepare(
   return status;
 }
 
-Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
-  SuperVersion* super_version = cfd_->GetSuperVersion();
+Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
+                                               SuperVersion* super_version) {
+  autovector<Range> ranges;
+  for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
+    ranges.emplace_back(file_to_ingest.smallest_user_key,
+                        file_to_ingest.largest_user_key);
+  }
   Status status =
-      IngestedFilesOverlapWithMemtables(super_version, flush_needed);
-
+      cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
   if (status.ok() && *flush_needed &&
       !ingestion_options_.allow_blocking_flush) {
     status = Status::InvalidArgument("External file requires flush");
@@ -148,11 +152,12 @@
 // nonmem_write_thread_
 Status ExternalSstFileIngestionJob::Run() {
   Status status;
+  SuperVersion* super_version = cfd_->GetSuperVersion();
 #ifndef NDEBUG
   // We should never run the job with a memtable that is overlapping
   // with the files we are ingesting
   bool need_flush = false;
-  status = NeedsFlush(&need_flush);
+  status = NeedsFlush(&need_flush, super_version);
   assert(status.ok() && need_flush == false);
 #endif
 
@@ -167,7 +172,6 @@
   // It is safe to use this instead of LastAllocatedSequence since we are
   // the only active writer, and hence they are equal
   const SequenceNumber last_seqno = versions_->LastSequence();
-  SuperVersion* super_version = cfd_->GetSuperVersion();
   edit_.SetColumnFamily(cfd_->GetID());
 
   // The levels that the files will be ingested into
@@ -375,54 +379,6 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   return status;
 }
 
-Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
-    SuperVersion* sv, bool* overlap) {
-  *overlap = false;
-  // Create an InternalIterator over all memtables
-  Arena arena;
-  ReadOptions ro;
-  ro.total_order_seek = true;
-  MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
-                                          &arena);
-  merge_iter_builder.AddIterator(sv->mem->NewIterator(ro, &arena));
-  sv->imm->AddIterators(ro, &merge_iter_builder);
-  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
-
-  std::vector<InternalIterator*> memtable_range_del_iters;
-  auto* active_range_del_iter = sv->mem->NewRangeTombstoneIterator(ro);
-  if (active_range_del_iter != nullptr) {
-    memtable_range_del_iters.push_back(active_range_del_iter);
-  }
-  sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters);
-  RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
-                                   {} /* snapshots */,
-                                   false /* collapse_deletions */);
-  Status status;
-  {
-    std::unique_ptr<InternalIterator> memtable_range_del_iter(
-        NewMergingIterator(&cfd_->internal_comparator(),
-                           memtable_range_del_iters.empty()
-                               ? nullptr
-                               : &memtable_range_del_iters[0],
-                           static_cast<int>(memtable_range_del_iters.size())));
-    status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
-  }
-  if (status.ok()) {
-    for (IngestedFileInfo& f : files_to_ingest_) {
-      status = IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(),
-                                                    overlap);
-      if (!status.ok() || *overlap == true) {
-        break;
-      }
-      if (range_del_agg.IsRangeOverlapped(f.smallest_user_key,
-                                          f.largest_user_key)) {
-        *overlap = true;
-        break;
-      }
-    }
-  }
-  return status;
-}
-
 Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
     SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
     IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h
index e42c50603..6a3641de7 100644
--- a/db/external_sst_file_ingestion_job.h
+++ b/db/external_sst_file_ingestion_job.h
@@ -84,8 +84,12 @@ class ExternalSstFileIngestionJob {
   // Check if we need to flush the memtable before running the ingestion job
   // This will be true if the files we are ingesting are overlapping with any
   // key range in the memtable.
-  // REQUIRES: Mutex held
-  Status NeedsFlush(bool* flush_needed);
+  //
+  // @param super_version A referenced SuperVersion that will be held for the
+  //    duration of this function.
+  //
+  // Thread-safe
+  Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);
 
   // Will execute the ingestion job and prepare edit() to be applied.
   // REQUIRES: Mutex held
@@ -110,10 +114,6 @@ class ExternalSstFileIngestionJob {
   Status GetIngestedFileInfo(const std::string& external_file,
                              IngestedFileInfo* file_to_ingest);
 
-  // Check if the files we are ingesting overlap with any memtable.
-  // REQUIRES: Mutex held
-  Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap);
-
   // Assign `file_to_ingest` the appropriate sequence number and the lowest
   // possible level that it can be ingested to according to compaction_style.
   // REQUIRES: Mutex held
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 8b4d7ae56..909d33890 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -93,8 +93,8 @@ static const int kMinorVersion = __ROCKSDB_MINOR__;
 
 // A range of keys
 struct Range {
-  Slice start;  // Included in the range
-  Slice limit;  // Not included in the range
+  Slice start;
+  Slice limit;
 
   Range() { }
   Range(const Slice& s, const Slice& l) : start(s), limit(l) { }
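For reference, a minimal usage sketch against RocksDB's public API showing how
the optimization surfaces to users: after this patch, a CompactRange over a
range that misses every memtable key should no longer force a flush, so the
active memtable's entry count is unchanged afterwards. This is an illustration
only, not part of the patch; the database path is hypothetical, and it assumes
a fresh DB with default options where nothing else triggers a flush:

    #include <cassert>
    #include <string>
    #include "rocksdb/db.h"

    int main() {
      rocksdb::DB* db;
      rocksdb::Options options;
      options.create_if_missing = true;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/compact_range_demo", &db);
      assert(s.ok());

      // The active memtable now holds a single key, "m".
      db->Put(rocksdb::WriteOptions(), "m", "val");

      // Compact a range that cannot overlap the memtable's only key; with
      // this patch, the pre-compaction flush is skipped.
      rocksdb::Slice begin("a"), end("c");
      db->CompactRange(rocksdb::CompactRangeOptions(), &begin, &end);

      // "m" should still be in the active memtable.
      uint64_t num_entries = 0;
      db->GetIntProperty(rocksdb::DB::Properties::kNumEntriesActiveMemTable,
                         &num_entries);
      assert(num_entries == 1);

      delete db;
      return 0;
    }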