Support ingest file when range deletions exist

Summary:
Previously we returned NotSupported when ingesting files into a database containing any range deletions. This diff adds the support.

- Flush if any memtable contains range deletions overlapping the to-be-ingested file
- Place to-be-ingested file before any level that contains range deletions overlapping it.
- Added support for `Version` to return iterators over range deletions in a given level. Previously, we piggybacked getting range deletions onto `Version`'s `Get()` / `AddIterator()` functions by passing them a `RangeDelAggregator*`. But file ingestion needs to get iterators over range deletions, not populate an aggregator (since the aggregator does collapsing and doesn't expose the actual ranges).
Closes https://github.com/facebook/rocksdb/pull/2370

Differential Revision: D5127648

Pulled By: ajkr

fbshipit-source-id: 816faeb9708adfa5287962bafdde717db56e3f1a
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent ad19eb8686
commit 9c9909bf7d
  1. 49
      db/external_sst_file_basic_test.cc
  2. 64
      db/external_sst_file_ingestion_job.cc
  3. 7
      db/external_sst_file_ingestion_job.h
  4. 8
      db/memtable_list.cc
  5. 2
      db/memtable_list.h
  6. 36
      db/table_cache.cc
  7. 6
      db/table_cache.h
  8. 13
      db/version_set.cc
  9. 5
      db/version_set.h

@ -559,6 +559,55 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
std::map<std::string, std::string> true_data;
int file_id = 1;
// prevent range deletions from being dropped due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
// range del [0, 50) in L0 file, [50, 100) in memtable
for (int i = 0; i < 2; i++) {
if (i == 1) {
db_->Flush(FlushOptions());
}
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(50 * i), Key(50 * (i + 1))));
}
ASSERT_EQ(1, NumTableFilesAtLevel(0));
// overlaps with L0 file but not memtable, so flush is skipped
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
ASSERT_OK(GenerateAndAddExternalFile(
options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(2, NumTableFilesAtLevel(0));
// overlaps with memtable, so flush is triggered (thus file count increases by
// two at this step).
ASSERT_OK(GenerateAndAddExternalFile(
options, {50, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));
// snapshot unneeded now that both range deletions are persisted
db_->ReleaseSnapshot(snapshot);
// overlaps with nothing, so places at bottom level and skips incrementing
// seqnum.
ASSERT_OK(GenerateAndAddExternalFile(
options, {101, 125}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -381,6 +381,14 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
sv->imm->AddIterators(ro, &merge_iter_builder); sv->imm->AddIterators(ro, &merge_iter_builder);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
MergeIteratorBuilder merge_range_del_iter_builder(
&cfd_->internal_comparator(), &arena);
merge_range_del_iter_builder.AddIterator(
sv->mem->NewRangeTombstoneIterator(ro));
sv->imm->AddRangeTombstoneIterators(ro, &merge_range_del_iter_builder);
ScopedArenaIterator memtable_range_del_iter(
merge_range_del_iter_builder.Finish());
Status status; Status status;
*overlap = false; *overlap = false;
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
@ -389,6 +397,11 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
if (!status.ok() || *overlap == true) { if (!status.ok() || *overlap == true) {
break; break;
} }
status = IngestedFileOverlapWithRangeDeletions(
&f, memtable_range_del_iter.get(), overlap);
if (!status.ok() || *overlap == true) {
break;
}
} }
return status; return status;
@ -555,6 +568,34 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange(
return iter->status(); return iter->status();
} }
Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap) {
auto* vstorage = cfd_->current()->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator();
*overlap = false;
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
return Status::Corruption("corrupted range deletion key: " +
range_del_iter->key().ToString());
}
RangeTombstone range_del(parsed_key, range_del_iter->value());
if (ucmp->Compare(range_del.start_key_,
file_to_ingest->largest_user_key) <= 0 &&
ucmp->Compare(file_to_ingest->smallest_user_key,
range_del.end_key_) <= 0) {
*overlap = true;
break;
}
}
}
return Status::OK();
}
bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
const IngestedFileInfo* file_to_ingest, int level) { const IngestedFileInfo* file_to_ingest, int level) {
if (level == 0) { if (level == 0) {
@ -591,17 +632,22 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
ro.total_order_seek = true; ro.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
&arena); &arena);
RangeDelAggregator range_del_agg(cfd_->internal_comparator(), MergeIteratorBuilder merge_range_del_iter_builder(
{} /* snapshots */); &cfd_->internal_comparator(), &arena);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
lvl, &range_del_agg); nullptr /* range_del_agg */);
if (!range_del_agg.IsEmpty()) { sv->current->AddRangeDelIteratorsForLevel(ro, env_options_,
return Status::NotSupported( &merge_range_del_iter_builder, lvl);
"file ingestion with range tombstones is currently unsupported");
}
ScopedArenaIterator level_iter(merge_iter_builder.Finish()); ScopedArenaIterator level_iter(merge_iter_builder.Finish());
return IngestedFileOverlapWithIteratorRange( ScopedArenaIterator level_range_del_iter(
merge_range_del_iter_builder.Finish());
Status status = IngestedFileOverlapWithIteratorRange(
file_to_ingest, level_iter.get(), overlap_with_level); file_to_ingest, level_iter.get(), overlap_with_level);
if (status.ok() && *overlap_with_level == false) {
status = IngestedFileOverlapWithRangeDeletions(
file_to_ingest, level_range_del_iter.get(), overlap_with_level);
}
return status;
} }
} // namespace rocksdb } // namespace rocksdb

@ -141,6 +141,13 @@ class ExternalSstFileIngestionJob {
const IngestedFileInfo* file_to_ingest, InternalIterator* iter, const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
bool* overlap); bool* overlap);
// Check if `file_to_ingest` key range overlaps with any range deletions
// specified by `iter`.
// REQUIRES: Mutex held
Status IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap);
// Check if `file_to_ingest` key range overlap with level // Check if `file_to_ingest` key range overlap with level
// REQUIRES: Mutex held // REQUIRES: Mutex held
Status IngestedFileOverlapWithLevel(SuperVersion* sv, Status IngestedFileOverlapWithLevel(SuperVersion* sv,

@ -168,6 +168,14 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
return Status::OK(); return Status::OK();
} }
Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(m->NewRangeTombstoneIterator(read_opts));
}
return Status::OK();
}
void MemTableListVersion::AddIterators( void MemTableListVersion::AddIterators(
const ReadOptions& options, std::vector<InternalIterator*>* iterator_list, const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
Arena* arena) { Arena* arena) {

@ -84,6 +84,8 @@ class MemTableListVersion {
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
RangeDelAggregator* range_del_agg); RangeDelAggregator* range_del_agg);
Status AddRangeTombstoneIterators(const ReadOptions& read_opts,
MergeIteratorBuilder* merge_iter_builder);
void AddIterators(const ReadOptions& options, void AddIterators(const ReadOptions& options,
std::vector<InternalIterator*>* iterator_list, std::vector<InternalIterator*>* iterator_list,

@ -262,6 +262,42 @@ InternalIterator* TableCache::NewIterator(
return result; return result;
} }
InternalIterator* TableCache::NewRangeTombstoneIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
HistogramImpl* file_read_hist, bool skip_filters, int level) {
Status s;
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(handle);
}
}
InternalIterator* result = nullptr;
if (s.ok()) {
result = table_reader->NewRangeTombstoneIterator(options);
if (result != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}
}
if (result == nullptr && handle != nullptr) {
// the range deletion block didn't exist, or there was a failure between
// getting handle and getting iterator.
ReleaseHandle(handle);
}
if (!s.ok()) {
assert(result == nullptr);
result = NewErrorInternalIterator(s);
}
return result;
}
Status TableCache::Get(const ReadOptions& options, Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator, const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k, const FileDescriptor& fd, const Slice& k,

@ -60,6 +60,12 @@ class TableCache {
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false, int level = -1); Arena* arena = nullptr, bool skip_filters = false, int level = -1);
InternalIterator* NewRangeTombstoneIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, HistogramImpl* file_read_hist,
bool skip_filters, int level);
// If a seek to internal key "k" in specified file finds an entry, // If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until // call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false. // it returns false.

@ -865,6 +865,19 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
} }
} }
void Version::AddRangeDelIteratorsForLevel(
const ReadOptions& read_options, const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder, int level) {
for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(level).files[i];
merge_iter_builder->AddIterator(
cfd_->table_cache()->NewRangeTombstoneIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd,
cfd_->internal_stats()->GetFileReadHist(level),
false /* skip_filters */, level));
}
}
VersionStorageInfo::VersionStorageInfo( VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator, const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels, const Comparator* user_comparator, int levels,

@ -463,6 +463,11 @@ class Version {
MergeIteratorBuilder* merger_iter_builder, MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg); int level, RangeDelAggregator* range_del_agg);
void AddRangeDelIteratorsForLevel(const ReadOptions& read_options,
const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level);
// Lookup the value for key. If found, store it in *val and // Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status. // return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later. // Uses *operands to store merge_operator operations to apply later.

Loading…
Cancel
Save