diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 2b91e0051..89b977ce0 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -407,8 +407,9 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( if (vstorage->NumLevelFiles(lvl) > 0) { bool overlap_with_level = false; - status = IngestedFileOverlapWithLevel(sv, file_to_ingest, lvl, - &overlap_with_level); + status = sv->current->OverlapWithLevelIterator(ro, env_options_, + file_to_ingest->smallest_user_key, file_to_ingest->largest_user_key, + lvl, &overlap_with_level); if (!status.ok()) { return status; } @@ -515,34 +516,6 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( return status; } -Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange( - const IngestedFileInfo* file_to_ingest, InternalIterator* iter, - bool* overlap) { - auto* vstorage = cfd_->current()->storage_info(); - auto* ucmp = vstorage->InternalComparator()->user_comparator(); - InternalKey range_start(file_to_ingest->smallest_user_key, kMaxSequenceNumber, - kValueTypeForSeek); - iter->Seek(range_start.Encode()); - if (!iter->status().ok()) { - return iter->status(); - } - - *overlap = false; - if (iter->Valid()) { - ParsedInternalKey seek_result; - if (!ParseInternalKey(iter->key(), &seek_result)) { - return Status::Corruption("DB have corrupted keys"); - } - - if (ucmp->Compare(seek_result.user_key, file_to_ingest->largest_user_key) <= - 0) { - *overlap = true; - } - } - - return iter->status(); -} - bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( const IngestedFileInfo* file_to_ingest, int level) { if (level == 0) { @@ -571,38 +544,6 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( return true; } -Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel( - SuperVersion* sv, IngestedFileInfo* file_to_ingest, int lvl, - bool* overlap_with_level) { - Arena arena; - 
ReadOptions ro; - ro.total_order_seek = true; - MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), - &arena); - // Files are opened lazily when the iterator needs them, thus range deletions - // are also added lazily to the aggregator. We need to check for range - // deletion overlap only in the case where there's no point-key overlap. Then, - // we've already opened the file with range containing the ingested file's - // begin key, and iterated through all files until the one containing the - // ingested file's end key. So any files maybe containing range deletions - // overlapping the ingested file must have been opened and had their range - // deletions added to the aggregator. - RangeDelAggregator range_del_agg(cfd_->internal_comparator(), - {} /* snapshots */, - false /* collapse_deletions */); - sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl, - &range_del_agg); - ScopedArenaIterator level_iter(merge_iter_builder.Finish()); - Status status = IngestedFileOverlapWithIteratorRange( - file_to_ingest, level_iter.get(), overlap_with_level); - if (status.ok() && *overlap_with_level == false && - range_del_agg.IsRangeOverlapped(file_to_ingest->smallest_user_key, - file_to_ingest->largest_user_key)) { - *overlap_with_level = true; - } - return status; -} - } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 6a3641de7..ea0a7c46f 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -133,17 +133,6 @@ class ExternalSstFileIngestionJob { Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, SequenceNumber seqno); - // Check if `file_to_ingest` key range overlap with the range `iter` represent - // REQUIRES: Mutex held - Status IngestedFileOverlapWithIteratorRange( - const IngestedFileInfo* file_to_ingest, InternalIterator* iter, - bool* overlap); - - // Check if 
`file_to_ingest` key range overlap with level
-  // REQUIRES: Mutex held
-  Status IngestedFileOverlapWithLevel(SuperVersion* sv,
-      IngestedFileInfo* file_to_ingest, int lvl, bool* overlap_with_level);
-
   // Check if `file_to_ingest` can fit in level `level`
   // REQUIRES: Mutex held
   bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
diff --git a/db/version_set.cc b/db/version_set.cc
index 43fe0f53f..b7a62d5e7 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -78,6 +78,33 @@ int FindFileInRange(const InternalKeyComparator& icmp,
   return right;
 }
 
+// Seeks `iter` to an internal key built from `smallest_user_key` and reports
+// through `*overlap` whether the entry it lands on (if any) has a user key
+// comparing <= `largest_user_key` under `ucmp`. `*overlap` is written on every
+// path. Returns the iterator's status, or Corruption for an unparsable key.
+Status OverlapWithIterator(const Comparator* ucmp,
+                           const Slice& smallest_user_key,
+                           const Slice& largest_user_key,
+                           InternalIterator* iter, bool* overlap) {
+  *overlap = false;
+
+  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
+                          kValueTypeForSeek);
+  iter->Seek(range_start.Encode());
+  if (!iter->status().ok()) {
+    return iter->status();
+  }
+
+  if (iter->Valid()) {
+    ParsedInternalKey seek_result;
+    if (!ParseInternalKey(iter->key(), &seek_result)) {
+      return Status::Corruption("DB have corrupted keys");
+    }
+    *overlap = ucmp->Compare(seek_result.user_key, largest_user_key) <= 0;
+  }
+  return iter->status();
+}
+
 // Class to help choose the next file to search for the particular key.
 // Searches and returns files level by level.
// We can search level-by-level since entries never hop across
@@ -1010,6 +1037,73 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
   }
 }
 
+// Reports through `*overlap` whether the user-key range
+// [`smallest_user_key`, `largest_user_key`] overlaps `level` of this version.
+//
+// Level 0 is probed one file at a time, skipping files rejected by the
+// AfterFile()/BeforeFile() pre-checks; any other non-empty level is probed
+// through a single LevelIterator. Every probe goes through
+// OverlapWithIterator(). When no probe reports an overlap, the range
+// deletions collected in `range_del_agg` while the tables were read are
+// checked as well.
+//
+// Returns the first non-OK status produced by a probe; otherwise the status
+// of the last probe (a default-constructed Status if nothing was probed).
+Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
+                                         const EnvOptions& env_options,
+                                         const Slice& smallest_user_key,
+                                         const Slice& largest_user_key,
+                                         int level, bool* overlap) {
+  assert(storage_info_.finalized_);
+
+  // Bind by reference; a plain `auto` would copy the comparator object.
+  const auto& icmp = cfd_->internal_comparator();
+  const auto* ucmp = icmp.user_comparator();
+  const auto& level_files = storage_info_.LevelFilesBrief(level);
+
+  Arena arena;
+  Status status;
+  RangeDelAggregator range_del_agg(icmp, {}, false);
+
+  *overlap = false;
+
+  if (level == 0) {
+    for (size_t i = 0; i < level_files.num_files; i++) {
+      const auto* file = &level_files.files[i];
+      if (AfterFile(ucmp, &smallest_user_key, file) ||
+          BeforeFile(ucmp, &largest_user_key, file)) {
+        continue;
+      }
+      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
+          read_options, env_options, icmp, file->fd, &range_del_agg, nullptr,
+          cfd_->internal_stats()->GetFileReadHist(0), false, &arena,
+          false /* skip_filters */, 0 /* level */));
+      status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
+                                   iter.get(), overlap);
+      if (!status.ok() || *overlap) {
+        break;
+      }
+    }
+  } else if (level_files.num_files > 0) {
+    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
+    ScopedArenaIterator iter(new (mem) LevelIterator(
+        cfd_->table_cache(), read_options, env_options, icmp, &level_files,
+        should_sample_file_read(),
+        cfd_->internal_stats()->GetFileReadHist(level),
+        false /* for_compaction */, IsFilterSkipped(level), level,
+        &range_del_agg));
+    status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
+                                 iter.get(), overlap);
+  }
+
+  // No point-key overlap was found; fall back to the range deletions.
+  if (status.ok() && !*overlap &&
+      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
+    *overlap = true;
+  }
+  return status;
+}
+
 VersionStorageInfo::VersionStorageInfo(
     const InternalKeyComparator* internal_comparator,
     const Comparator* user_comparator, int levels,
diff --git a/db/version_set.h b/db/version_set.h
index 4f9832ac9..832857f63 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -525,6 +525,16 @@ class Version {
                             MergeIteratorBuilder* merger_iter_builder,
                             int level, RangeDelAggregator* range_del_agg);
 
+  // Sets `*overlap` to whether the user-key range
+  // [`smallest_user_key`, `largest_user_key`] overlaps `level`, taking into
+  // account both the keys and the range deletions read from that level's
+  // files. Returns a non-OK status if probing those files reports an error.
+  Status OverlapWithLevelIterator(const ReadOptions& read_options,
+                                  const EnvOptions& env_options,
+                                  const Slice& smallest_user_key,
+                                  const Slice& largest_user_key, int level,
+                                  bool* overlap);
+
   // Lookup the value for key. If found, store it in *val and
   // return OK. Else return a non-OK status.
   // Uses *operands to store merge_operator operations to apply later.