diff --git a/db/convenience.cc b/db/convenience.cc index 17f781252..a9d113ff1 100644 --- a/db/convenience.cc +++ b/db/convenience.cc @@ -18,6 +18,13 @@ namespace rocksdb { void CancelAllBackgroundWork(DB* db, bool wait) { (dynamic_cast(db))->CancelAllBackgroundWork(wait); } + +Status DeleteFilesInRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + return (dynamic_cast(db)) + ->DeleteFilesInRange(column_family, begin, end); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 19cc94a76..7d82191e3 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -10,6 +10,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/experimental.h" +#include "rocksdb/utilities/convenience.h" #include "util/sync_point.h" namespace rocksdb { @@ -1140,6 +1141,112 @@ TEST_F(DBCompactionTest, ManualPartialFill) { } } +TEST_F(DBCompactionTest, DeleteFileRange) { + Options options = CurrentOptions(); + options.write_buffer_size = 10 * 1024 * 1024; + options.max_bytes_for_level_multiplier = 2; + options.num_levels = 4; + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 3; + + DestroyAndReopen(options); + int32_t value_size = 10 * 1024; // 10 KB + + // Add 2 non-overlapping files + Random rnd(301); + std::map values; + + // file 1 [0 => 100] + for (int32_t i = 0; i < 100; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // file 2 [100 => 300] + for (int32_t i = 100; i < 300; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // 2 files in L0 + ASSERT_EQ("2", FilesPerLevel(0)); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + // 2 files in L2 + ASSERT_EQ("0,0,2", FilesPerLevel(0)); + + // file 3 [ 0 => 200] + for (int32_t i = 0; i < 200; i++) { + values[i] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(i), values[i])); + } + ASSERT_OK(Flush()); + + // Many files 4 [300 => 4300) + for (int32_t i = 0; i <= 5; i++) { + for (int32_t j = 300; j < 4300; j++) { + if (j == 2300) { + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + } + values[j] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + } + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + + // Verify level sizes + uint64_t target_size = 4 * options.max_bytes_for_level_base; + for (int32_t i = 1; i < options.num_levels; i++) { + ASSERT_LE(SizeAtLevel(i), target_size); + target_size *= options.max_bytes_for_level_multiplier; + } + + int32_t old_num_files = CountFiles(); + std::string begin_string = Key(1000); + std::string end_string = Key(2000); + Slice begin(begin_string); + Slice end(end_string); + ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end)); + + int32_t deleted_count = 0; + for (int32_t i = 0; i < 4300; i++) { + if (i < 1000 || i > 2000) { + ASSERT_EQ(Get(Key(i)), values[i]); + } else { + ReadOptions roptions; + std::string result; + Status s = db_->Get(roptions, Key(i), &result); + ASSERT_TRUE(s.IsNotFound() || s.ok()); + if (s.IsNotFound()) { + deleted_count++; + } + } + } + ASSERT_GT(deleted_count, 0); + + ASSERT_OK( + DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr)); + + int32_t deleted_count2 = 0; + for (int32_t i = 0; i < 4300; i++) { + ReadOptions roptions; + std::string result; + Status s = db_->Get(roptions, Key(i), &result); + ASSERT_TRUE(s.IsNotFound()); + deleted_count2++; + } + ASSERT_GT(deleted_count2, deleted_count); + int32_t new_num_files = CountFiles(); + ASSERT_GT(old_num_files, new_num_files); +} + TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { int32_t trivial_move = 0; int32_t non_trivial_move = 0; diff --git a/db/db_impl.cc b/db/db_impl.cc index add64668f..dd6519fba 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -5007,6 +5007,76 @@ Status DBImpl::DeleteFile(std::string name) { return status; } +Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + VersionEdit edit; + JobContext job_context(next_job_id_.fetch_add(1), true); + { + InstrumentedMutexLock l(&mutex_); + + auto* vstorage = cfd->current()->storage_info(); + for (int i = 0; i < cfd->NumberLevels(); i++) { + if (vstorage->LevelFiles(i).empty() || + !vstorage->OverlapInLevel(i, begin, end)) { + continue; + } + std::vector level_files; + InternalKey begin_storage, end_storage, *begin_key, *end_key; + if (begin == nullptr) { + begin_key = nullptr; + } else { + begin_storage.SetMaxPossibleForUserKey(*begin); + begin_key = &begin_storage; + } + if (end == nullptr) { + end_key = nullptr; + } else { + end_storage.SetMinPossibleForUserKey(*end); + end_key = &end_storage; + } + + vstorage->GetOverlappingInputs(i, begin_key, end_key, &level_files, -1, + nullptr, false); + for (const auto* level_file : level_files) { + if (((begin == nullptr) || + (cfd->internal_comparator().user_comparator()->Compare( + level_file->smallest.user_key(), *begin) >= 0)) && + ((end == nullptr) || + (cfd->internal_comparator().user_comparator()->Compare( + level_file->largest.user_key(), *end) <= 0))) { + if (level_file->being_compacted) { + continue; + } + edit.SetColumnFamily(cfd->GetID()); + edit.DeleteFile(i, level_file->fd.GetNumber()); + } + } + } + if (edit.GetDeletedFiles().empty()) { + return Status::OK(); + } + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + } + FindObsoleteFiles(&job_context, false); + } // lock released here + + LogFlush(db_options_.info_log); + // remove files outside the db-lock + if (job_context.HaveSomethingToDelete()) { + // Call PurgeObsoleteFiles() without holding mutex. + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + return status; +} + void DBImpl::GetLiveFilesMetaData(std::vector* metadata) { InstrumentedMutexLock l(&mutex_); versions_->GetLiveFilesMetaData(metadata); diff --git a/db/db_impl.h b/db/db_impl.h index a6653b091..683fd49dc 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -188,6 +188,8 @@ class DBImpl : public DB { const TransactionLogIterator::ReadOptions& read_options = TransactionLogIterator::ReadOptions()) override; virtual Status DeleteFile(std::string name) override; + Status DeleteFilesInRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end); virtual void GetLiveFilesMetaData( std::vector* metadata) override; diff --git a/include/rocksdb/convenience.h b/include/rocksdb/convenience.h index 7a65b1388..f9111b4e3 100644 --- a/include/rocksdb/convenience.h +++ b/include/rocksdb/convenience.h @@ -7,6 +7,7 @@ #include #include +#include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/table.h" @@ -92,8 +93,15 @@ Status GetMemTableRepFactoryFromString( Status GetOptionsFromString(const Options& base_options, const std::string& opts_str, Options* new_options); -/// Request stopping background work, if wait is true wait until it's done +// Request stopping background work, if wait is true wait until it's done void CancelAllBackgroundWork(DB* db, bool wait = false); + +// Delete files which are entirely in the given range +// Could leave some keys in the range which are in files which are not +// entirely in the range. +// Snapshots before the delete might not see the data in the given range. +Status DeleteFilesInRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end); #endif // ROCKSDB_LITE } // namespace rocksdb