Add missing range conflict check between file ingestion and RefitLevel() (#10988)

Summary:
**Context:**
File ingestion never checks whether the key range it acts on overlaps with an ongoing RefitLevel() (used in `CompactRange()` with `change_level=true`). That's because RefitLevel() does not register its key range and make it known to file ingestion. File ingestion does, however, check for overlap with other compactions, via https://github.com/facebook/rocksdb/blob/7.8.fb/db/external_sst_file_ingestion_job.cc#L998.

RefitLevel() (used in `CompactRange()` with `change_level=true`) doesn't check whether the key range it acts on overlaps with an ongoing file ingestion. That's because file ingestion does not register and make its key range known to other compactions.
- Note that non-RefitLevel() compactions (e.g., manual compaction without RefitLevel(), or general compaction) also do not check key range overlap with an ongoing file ingestion, for the same reason.
- That turns out to be fine: as cbi42 discovered, both background and foreground compactions call `WaitForIngestFile`, introduced in 0f88160f67, 5c64fb67d2 and 87dfc1d23e.
- Regardless, registering file ingestion like a compaction, as this PR does, is a more general approach that also adds a range conflict check between file ingestion and non-RefitLevel() compactions, even though that was not the issue motivating this PR.

The bugs above result in two bad consequences:
- If file ingestion and RefitLevel() create files in the same level, then range-overlapping files will be created at that level and caught as corruption by `force_consistency_checks=true`.
- If file ingestion and RefitLevel() create files in different levels, then one further compaction on the ingested file can leave the same user key with seqno 0 in two different levels. The iterator's [optimization](c62f322169/db/db_iter.cc (L342-L343)) assumes no user key appears twice with seqno 0, so this will either break that assertion in a debug build or, even worse, return this key's value for the key after it (i.e., the wrong value) in a release build.
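The second consequence can be illustrated with a toy scan. This is a hedged sketch of the idea behind the db_iter.cc optimization, not its actual code; `Entry` and `ScanWithSeqnoZeroOptimization` are hypothetical names. When the iterator trusts seqno 0 to mean "only entry for this user key" and skips the key comparison, a duplicate seqno-0 entry is surfaced as if it belonged to the next key.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// One internal entry as seen by a merged scan across levels.
struct Entry {
  std::string user_key;
  std::uint64_t seqno;
  std::string value;
};

// Entries are sorted by (user_key asc, seqno desc), possibly merged from
// several levels. Returns the visible value for each distinct key, using
// the seqno-0 shortcut: seqno 0 is assumed to imply that no other entry
// for this key exists, so the iterator advances without a key comparison.
std::vector<std::string> ScanWithSeqnoZeroOptimization(
    const std::vector<Entry>& sorted) {
  std::vector<std::string> out;
  std::size_t i = 0;
  while (i < sorted.size()) {
    out.push_back(sorted[i].value);
    if (sorted[i].seqno == 0) {
      // Shortcut: no key comparison; assumes this key cannot repeat.
      ++i;
    } else {
      const std::string k = sorted[i].user_key;
      while (i < sorted.size() && sorted[i].user_key == k) {
        ++i;
      }
    }
  }
  return out;
}
```

With a duplicate seqno-0 entry for "k2" (the bug's state), the scan emits four values for three distinct keys, so a caller pairing values with keys attributes the stale "k2" value to "k3".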

Therefore we decided to introduce a range conflict check between file ingestion and RefitLevel(), inspired by the existing range conflict check among compactions.
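At its core, that conflict check is a user-key interval overlap test scoped to an output level, as in the sketch below. The names (`RegisteredRange`, `RangesOverlap`, `RangeOverlapWithCompaction`) are illustrative, not RocksDB's actual signatures:

```cpp
#include <string>
#include <vector>

// Illustrative model: each registered "compaction" exposes the inclusive
// user-key range it will write and its output level.
struct RegisteredRange {
  std::string smallest;  // inclusive user-key lower bound
  std::string largest;   // inclusive user-key upper bound
  int output_level;
};

// Two inclusive key ranges overlap unless one ends before the other begins.
bool RangesOverlap(const std::string& a_lo, const std::string& a_hi,
                   const std::string& b_lo, const std::string& b_hi) {
  return !(a_hi < b_lo || b_hi < a_lo);
}

// A new operation conflicts if its range overlaps any ongoing operation
// writing to the same level.
bool RangeOverlapWithCompaction(const std::vector<RegisteredRange>& ongoing,
                                const std::string& smallest,
                                const std::string& largest, int level) {
  for (const auto& r : ongoing) {
    if (r.output_level == level &&
        RangesOverlap(r.smallest, r.largest, smallest, largest)) {
      return true;
    }
  }
  return false;
}
```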

**Summary:**
- Treat the file ingestion job and RefitLevel() as `Compaction`s with new compaction reasons `CompactionReason::kExternalSstIngestion` and `CompactionReason::kRefitLevel`, and register/unregister them. File ingestion is treated as a compaction from L0 to its various output levels, and RefitLevel() as a compaction from the source level to the target level.
- Check for `RangeOverlapWithCompaction` with other ongoing compactions, `RegisterCompaction()` on this "compaction" before changing the LSM state in `VersionStorageInfo`, and `UnregisterCompaction()` after changing.
- Replace the scattered fixes (0f88160f67, 5c64fb67d2 and 87dfc1d23e) that prevent overlap between file ingestion and non-refit-level compactions with this fix, because those ad-hoc practices are easy to overlook.
- Misc: logic cleanup, see PR comments
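The check/register/apply/unregister ordering summarized above can be sketched as follows. `CompactionRegistry`, `TryRegister`, and `Unregister` are hypothetical names for illustration, not RocksDB's actual `CompactionPicker` interface:

```cpp
#include <string>
#include <vector>

// Hypothetical registry illustrating the ordering the fix enforces:
// check for a range conflict, register the operation's range, mutate the
// LSM state, then unregister. Not RocksDB's actual interface.
class CompactionRegistry {
 public:
  // Returns false (conflict) if [lo, hi] overlaps a registered range
  // writing to the same output level; otherwise records the range.
  bool TryRegister(const std::string& lo, const std::string& hi, int level) {
    for (const auto& r : ranges_) {
      if (r.level == level && !(hi < r.lo || r.hi < lo)) {
        return false;
      }
    }
    ranges_.push_back({lo, hi, level});
    return true;
  }

  // Called after the LSM state change (e.g. the LogAndApply step) is done.
  void Unregister(const std::string& lo, const std::string& hi, int level) {
    for (auto it = ranges_.begin(); it != ranges_.end(); ++it) {
      if (it->lo == lo && it->hi == hi && it->level == level) {
        ranges_.erase(it);
        return;
      }
    }
  }

 private:
  struct Range {
    std::string lo;
    std::string hi;
    int level;
  };
  std::vector<Range> ranges_;
};
```

Under this sketch, a RefitLevel() targeting a level where a registered ingestion will land fails the `TryRegister` check instead of racing with it.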

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10988

Test Plan:
- New unit test `DBCompactionTestWithOngoingFileIngestionParam*` that fails before the fix and passes with it.
- Made compatible with existing tests, see PR comments
- make check
- [Ongoing] Stress test rehearsal with normal value and aggressive CI value https://github.com/facebook/rocksdb/pull/10761

Reviewed By: cbi42

Differential Revision: D41535685

Pulled By: hx235

fbshipit-source-id: 549833a577ba1496d20a870583d4caa737da1258
Branch: main
Author: Hui Xiao (1 year ago), committed by Facebook GitHub Bot
Parent: cc6f323705
Commit: 9502856edd
Changed files (lines changed):
- HISTORY.md (1)
- db/column_family.cc (1)
- db/compaction/compaction.cc (22)
- db/compaction/compaction_job.cc (2)
- db/compaction/compaction_picker.cc (6)
- db/compaction/compaction_picker_level.cc (28)
- db/db_bloom_filter_test.cc (5)
- db/db_compaction_test.cc (225)
- db/db_impl/db_impl.cc (17)
- db/db_impl/db_impl.h (8)
- db/db_impl/db_impl_compaction_flush.cc (78)
- db/external_sst_file_basic_test.cc (1)
- db/external_sst_file_ingestion_job.cc (98)
- db/external_sst_file_ingestion_job.h (39)
- db/external_sst_file_test.cc (124)
- include/rocksdb/listener.h (8)
- include/rocksdb/options.h (3)
- java/rocksjni/portal.h (16)
- java/src/main/java/org/rocksdb/CompactionReason.java (18)

@ -15,6 +15,7 @@
* Fixed a bug in LockWAL() leading to re-locking mutex (#11020).
* Fixed a heap use after free bug in async scan prefetching when the scan thread and another thread try to read and load the same seek block into cache.
* Fixed a heap use after free in async scan prefetching if dictionary compression is enabled, in which case sync read of the compression dictionary gets mixed with async prefetching
* Fixed a data race bug where `CompactRange()` under `change_level=true` acts on a range overlapping with an ongoing file ingestion for level compaction. This will either result in overlapping file ranges corruption at a certain level caught by `force_consistency_checks=true` or potentially two same keys both with seqno 0 in two different levels (i.e, new data ends up in lower/older level). The latter will be caught by assertion in debug build but go silently and result in read returning wrong result in release build. This fix is general so it also replaced previous fixes to a similar problem for `CompactFiles()` (#4665), general `CompactRange()` and auto compaction (commit 5c64fb6 and 87dfc1d).
### New Features
* When an SstPartitionerFactory is configured, CompactRange() now automatically selects for compaction any files overlapping a partition boundary that is in the compaction range, even if no actual entries are in the requested compaction range. With this feature, manual compaction can be used to (re-)establish SST partition points when SstPartitioner changes, without a full compaction.

@ -1218,6 +1218,7 @@ Compaction* ColumnFamilyData::CompactRange(
if (result != nullptr) {
result->SetInputVersion(current_);
}
TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
return result;
}

@ -235,12 +235,19 @@ Compaction::Compaction(
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
grandparents_(std::move(_grandparents)),
score_(_score),
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
bottommost_level_(
// For simplicity, we don't support the concept of "bottommost level"
// with
// `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
(_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel)
? false
: IsBottommostLevel(output_level_, vstorage, inputs_)),
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
is_manual_compaction_(_manual_compaction),
trim_ts_(_trim_ts),
is_trivial_move_(false),
compaction_reason_(_compaction_reason),
notify_on_compaction_completion_(false),
enable_blob_garbage_collection_(
@ -255,8 +262,15 @@ Compaction::Compaction(
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff),
penultimate_level_(EvaluatePenultimateLevel(
vstorage, immutable_options_, start_level_, output_level_)) {
penultimate_level_(
// For simplicity, we don't support the concept of "penultimate level"
// with `CompactionReason::kExternalSstIngestion` and
// `CompactionReason::kRefitLevel`
_compaction_reason == CompactionReason::kExternalSstIngestion ||
_compaction_reason == CompactionReason::kRefitLevel
? Compaction::kInvalidLevel
: EvaluatePenultimateLevel(vstorage, immutable_options_,
start_level_, output_level_)) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;

@ -99,6 +99,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
return "ForcedBlobGC";
case CompactionReason::kRoundRobinTtl:
return "RoundRobinTtl";
case CompactionReason::kRefitLevel:
return "RefitLevel";
case CompactionReason::kNumOfReasons:
// fall through
default:

@ -1126,7 +1126,11 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
c->output_level() == 0 ||
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
c->GetPenultimateLevel()));
if (c->start_level() == 0 ||
// CompactionReason::kExternalSstIngestion's start level is just a placeholder
// number without actual meaning as file ingestion technically does not have
// an input level like other compactions
if ((c->start_level() == 0 &&
c->compaction_reason() != CompactionReason::kExternalSstIngestion) ||
ioptions_.compaction_style == kCompactionStyleUniversal) {
level0_compactions_in_progress_.insert(c);
}

@ -447,21 +447,21 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
compaction_inputs_.push_back(output_level_inputs_);
}
// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(
compaction_inputs_, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
}
if (!is_l0_trivial_move_) {
// In some edge cases we could pick a compaction that will be compacting
// a key range that overlap with another running compaction, and both
// of them have the same output level. This could happen if
// (1) we are running a non-exclusive manual compaction
// (2) AddFile ingest a new file into the LSM tree
// We need to disallow this from happening.
if (compaction_picker_->FilesRangeOverlapWithCompaction(
compaction_inputs_, output_level_,
Compaction::EvaluatePenultimateLevel(
vstorage_, ioptions_, start_level_, output_level_))) {
// This compaction output could potentially conflict with the output
// of a currently running compaction, we cannot run it.
return false;
}
compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
output_level_inputs_, &grandparents_);
}

@ -1229,7 +1229,7 @@ TEST_P(ChargeFilterConstructionTestWithParam, Basic) {
*
* The test is designed in a way such that the reservation for (p1 - b')
* will trigger at least another dummy entry insertion
* (or equivelantly to saying, creating another peak).
* (or equivalently to saying, creating another peak).
*
* kStandard128Ribbon + FullFilter +
* detect_filter_construct_corruption
@ -2618,8 +2618,7 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) {
BottommostLevelCompaction::kSkip;
compact_options.change_level = true;
compact_options.target_level = 7;
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
.IsNotSupported());
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ASSERT_EQ(trivial_move, 1);
ASSERT_EQ(non_trivial_move, 0);

@ -6245,6 +6245,231 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
Close();
}
class DBCompactionTestWithOngoingFileIngestionParam
: public DBCompactionTest,
public testing::WithParamInterface<std::string> {
public:
DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() {
compaction_path_to_test_ = GetParam();
}
void SetupOptions() {
options_ = CurrentOptions();
options_.create_if_missing = true;
if (compaction_path_to_test_ == "RefitLevelCompactRange") {
options_.num_levels = 7;
} else {
options_.num_levels = 3;
}
options_.compaction_style = CompactionStyle::kCompactionStyleLevel;
if (compaction_path_to_test_ == "AutoCompaction") {
options_.disable_auto_compactions = false;
options_.level0_file_num_compaction_trigger = 1;
} else {
options_.disable_auto_compactions = true;
}
}
void PauseCompactionThread() {
sleeping_task_.reset(new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::LOW);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task_.get(), Env::Priority::LOW);
sleeping_task_->WaitUntilSleeping();
}
void ResumeCompactionThread() {
if (sleeping_task_) {
sleeping_task_->WakeUp();
sleeping_task_->WaitUntilDone();
}
}
void SetupFilesToForceFutureFilesIngestedToCertainLevel() {
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string dummy = dbname_ + "/dummy.sst";
ASSERT_OK(sst_file_writer.Open(dummy));
ASSERT_OK(sst_file_writer.Put("k2", "dummy"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({dummy}, IngestExternalFileOptions()));
// L2 is made to contain a file overlapped with files to be ingested in
// later steps on key "k2". This will force future files ingested to L1 or
// above.
ASSERT_EQ("0,0,1", FilesPerLevel(0));
}
void SetupSyncPoints() {
if (compaction_path_to_test_ == "AutoCompaction") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCompaction():AfterPickCompaction",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"ColumnFamilyData::CompactRange:Return",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactRange:PostRefitLevel",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else if (compaction_path_to_test_ == "CompactFiles") {
SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&](void*) {
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles",
"VersionSet::LogAndApply:WriteManifest"}});
});
} else {
assert(false);
}
SyncPoint::GetInstance()->LoadDependency(
{{"ExternalSstFileIngestionJob::Run", "PreCompaction"}});
SyncPoint::GetInstance()->EnableProcessing();
}
void RunCompactionOverlappedWithFileIngestion() {
if (compaction_path_to_test_ == "AutoCompaction") {
TEST_SYNC_POINT("PreCompaction");
ResumeCompactionThread();
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
Status s = dbfull()->TEST_WaitForCompact();
EXPECT_OK(s);
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
CompactRangeOptions cro;
cro.change_level = false;
std::string start_key = "k1";
Slice start(start_key);
std::string end_key = "k4";
Slice end(end_key);
TEST_SYNC_POINT("PreCompaction");
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
Status s = dbfull()->CompactRange(cro, &start, &end);
EXPECT_OK(s);
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 5;
std::string start_key = "k1";
Slice start(start_key);
std::string end_key = "k4";
Slice end(end_key);
TEST_SYNC_POINT("PreCompaction");
Status s = dbfull()->CompactRange(cro, &start, &end);
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
// To see this, remove the fix AND replace
// `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with
// `DBImpl::ReFitLevel:PostRegisterCompaction`
EXPECT_TRUE(s.IsNotSupported());
EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") !=
std::string::npos);
} else if (compaction_path_to_test_ == "CompactFiles") {
ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(&cf_meta_data);
ASSERT_EQ(cf_meta_data.levels[0].files.size(), 1);
std::vector<std::string> input_files;
for (const auto& file : cf_meta_data.levels[0].files) {
input_files.push_back(file.name);
}
TEST_SYNC_POINT("PreCompaction");
Status s = db_->CompactFiles(CompactionOptions(), input_files, 1);
// Without proper range conflict check,
// this would have been `Status::Corruption` about overlapping ranges
EXPECT_TRUE(s.IsAborted());
EXPECT_TRUE(
s.ToString().find(
"A running compaction is writing to the same output level") !=
std::string::npos);
} else {
assert(false);
}
}
void DisableSyncPoints() {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
protected:
std::string compaction_path_to_test_;
Options options_;
std::shared_ptr<test::SleepingBackgroundTask> sleeping_task_;
};
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam,
DBCompactionTestWithOngoingFileIngestionParam,
::testing::Values("AutoCompaction",
"NonRefitLevelCompactRange",
"RefitLevelCompactRange",
"CompactFiles"));
TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) {
SetupOptions();
DestroyAndReopen(options_);
if (compaction_path_to_test_ == "AutoCompaction") {
PauseCompactionThread();
}
if (compaction_path_to_test_ != "RefitLevelCompactRange") {
SetupFilesToForceFutureFilesIngestedToCertainLevel();
}
// Create s1
ASSERT_OK(Put("k1", "v"));
ASSERT_OK(Put("k4", "v"));
ASSERT_OK(Flush());
if (compaction_path_to_test_ == "RefitLevelCompactRange") {
MoveFilesToLevel(6 /* level */);
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0));
} else {
ASSERT_EQ("1,0,1", FilesPerLevel(0));
}
// To coerce following sequence of events
// Timeline Thread 1 (Ingest s2) Thread 2 (Compact s1)
// t0 | Decide to output to Lk
// t1 | Release lock in LogAndApply()
// t2 | Acquire lock
// t3 | Decides to compact to Lk
// | Expected to fail due to range
// | conflict check with file
// | ingestion
// t4 | Release lock in LogAndApply()
// t5 | Acquire lock again and finish
// t6 | Acquire lock again and finish
SetupSyncPoints();
// Ingest s2
port::Thread thread1([&] {
SstFileWriter sst_file_writer(EnvOptions(), options_);
std::string s2 = dbname_ + "/ingested_s2.sst";
ASSERT_OK(sst_file_writer.Open(s2));
ASSERT_OK(sst_file_writer.Put("k2", "v2"));
ASSERT_OK(sst_file_writer.Put("k3", "v2"));
ASSERT_OK(sst_file_writer.Finish());
ASSERT_OK(db_->IngestExternalFile({s2}, IngestExternalFileOptions()));
});
// Compact s1. Without proper range conflict check,
// this will encounter overlapping file corruption.
port::Thread thread2([&] { RunCompactionOverlappedWithFileIngestion(); });
thread1.join();
thread2.join();
DisableSyncPoints();
}
TEST_F(DBCompactionTest, ConsistencyFailTest) {
Options options = CurrentOptions();
options.force_consistency_checks = true;

@ -5199,8 +5199,9 @@ Status DBImpl::IngestExternalFiles(
for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
file_options_, &snapshots_, arg.options,
&directories_, &event_logger_, io_tracer_);
mutable_db_options_, file_options_, &snapshots_,
arg.options, &directories_, &event_logger_,
io_tracer_);
}
// TODO(yanqin) maybe make jobs run in parallel
@ -5333,6 +5334,7 @@ Status DBImpl::IngestExternalFiles(
if (!status.ok()) {
break;
}
ingestion_jobs[i].RegisterRange();
}
}
if (status.ok()) {
@ -5388,6 +5390,10 @@ Status DBImpl::IngestExternalFiles(
}
}
for (auto& job : ingestion_jobs) {
job.UnregisterRange();
}
if (status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
@ -5759,13 +5765,6 @@ void DBImpl::NotifyOnExternalFileIngested(
}
}
void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {
bg_cv_.Wait();
}
}
Status DBImpl::StartTrace(const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock(&trace_mutex_);

@ -2023,14 +2023,6 @@ class DBImpl : public DB {
const int output_level, int output_path_id,
JobContext* job_context, LogBuffer* log_buffer,
CompactionJobInfo* compaction_job_info);
// Wait for current IngestExternalFile() calls to finish.
// REQUIRES: mutex_ held
void WaitForIngestFile();
#else
// IngestExternalFile is not supported in ROCKSDB_LITE so this function
// will be no-op
void WaitForIngestFile() {}
#endif // ROCKSDB_LITE
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);

@ -1249,6 +1249,12 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[RefitLevel] waiting for background threads to stop");
// TODO(hx235): remove `Enable/DisableManualCompaction` and
// `Continue/PauseBackgroundWork` once we ensure registering RefitLevel()'s
// range is sufficient (if not, what else is needed) for avoiding range
// conflicts with other activities (e.g, compaction, flush) that are
// currently avoided by `Enable/DisableManualCompaction` and
// `Continue/PauseBackgroundWork`.
DisableManualCompaction();
s = PauseBackgroundWork();
if (s.ok()) {
@ -1313,13 +1319,6 @@ Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
const_cast<std::atomic<int>*>(&manual_compaction_paused_)));
{
InstrumentedMutexLock l(&mutex_);
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();
// We need to get current after `WaitForIngestFile`, because
// `IngestExternalFile` may add files that overlap with `input_file_names`
auto* current = cfd->current();
current->Ref();
@ -1398,6 +1397,7 @@ Status DBImpl::CompactFilesImpl(
Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
&input_set, cf_meta, output_level);
TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles");
if (!s.ok()) {
return s;
}
@ -1691,6 +1691,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
InstrumentedMutexLock guard_lock(&mutex_);
auto* vstorage = cfd->current()->storage_info();
if (vstorage->LevelFiles(level).empty()) {
return Status::OK();
}
// only allow one thread refitting
if (refitting_level_) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
@ -1706,8 +1710,16 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
}
auto* vstorage = cfd->current()->storage_info();
if (to_level != level) {
std::vector<CompactionInputFiles> input(1);
input[0].level = level;
for (auto& f : vstorage->LevelFiles(level)) {
input[0].files.push_back(f);
}
InternalKey refit_level_smallest;
InternalKey refit_level_largest;
cfd->compaction_picker()->GetRange(input[0], &refit_level_smallest,
&refit_level_largest);
if (to_level > level) {
if (level == 0) {
refitting_level_ = false;
@ -1721,6 +1733,14 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
return Status::NotSupported(
"Levels between source and target are not empty for a move.");
}
if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
refit_level_largest.user_key(),
l)) {
refitting_level_ = false;
return Status::NotSupported(
"Levels between source and target "
"will have some ongoing compaction's output.");
}
}
} else {
// to_level < level
@ -1731,12 +1751,39 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
return Status::NotSupported(
"Levels between source and target are not empty for a move.");
}
if (cfd->RangeOverlapWithCompaction(refit_level_smallest.user_key(),
refit_level_largest.user_key(),
l)) {
refitting_level_ = false;
return Status::NotSupported(
"Levels between source and target "
"will have some ongoing compaction's output.");
}
}
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Before refitting:\n%s", cfd->GetName().c_str(),
cfd->current()->DebugString().data());
std::unique_ptr<Compaction> c(new Compaction(
vstorage, *cfd->ioptions(), mutable_cf_options, mutable_db_options_,
{input}, to_level,
MaxFileSizeForLevel(
mutable_cf_options, to_level,
cfd->ioptions()
->compaction_style) /* output file size limit, not applicable */
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
0 /* max_subcompactions, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
false /* l0_files_might_overlap, not applicable */,
CompactionReason::kRefitLevel));
cfd->compaction_picker()->RegisterCompaction(c.get());
TEST_SYNC_POINT("DBImpl::ReFitLevel:PostRegisterCompaction");
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
@ -1757,6 +1804,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
&mutex_, directories_.GetDbDir());
cfd->compaction_picker()->UnregisterCompaction(c.get());
c.reset();
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
@ -1973,9 +2023,6 @@ Status DBImpl::RunManualCompaction(
manual.begin, manual.end, &manual.manual_end, &manual_conflict,
max_file_num_to_ignore, trim_ts)) == nullptr &&
manual_conflict))) {
// exclusive manual compactions should not see a conflict during
// CompactRange
assert(!exclusive || !manual_conflict);
// Running either this or some other manual compaction
bg_cv_.Wait();
if (manual_compaction_paused_ > 0 && scheduled && !unscheduled) {
@ -3004,10 +3051,6 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
{
InstrumentedMutexLock l(&mutex_);
// This call will unlock/lock the mutex to wait for current running
// IngestExternalFile() calls to finish.
WaitForIngestFile();
num_running_compactions_++;
std::unique_ptr<std::list<uint64_t>::iterator>
@ -3649,11 +3692,6 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
}
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
if (num_running_ingest_file_ > 0) {
// We need to wait for other IngestExternalFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) {
return (bg_bottom_compaction_scheduled_ > 0 ||
bg_compaction_scheduled_ > 0);

@ -800,6 +800,7 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) {
bool verify_checksums_before_ingest = std::get<1>(GetParam());
do {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.merge_operator.reset(new TestPutOperator());
DestroyAndReopen(options);
std::map<std::string, std::string> true_data;

@ -477,9 +477,82 @@ Status ExternalSstFileIngestionJob::Run() {
f_metadata.temperature = f.file_temperature;
edit_.AddFile(f.picked_level, f_metadata);
}
CreateEquivalentFileIngestingCompactions();
return status;
}
void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
// A map from output level to input of compactions equivalent to this
// ingestion job.
// TODO: simplify below logic to creating compaction per ingested file
// instead of per output level, once we figure out how to treat ingested files
// with adjacent range deletion tombstones to same output level in the same
// job as non-overlapping compactions.
std::map<int, CompactionInputFiles>
output_level_to_file_ingesting_compaction_input;
for (const auto& pair : edit_.GetNewFiles()) {
int output_level = pair.first;
const FileMetaData& f_metadata = pair.second;
CompactionInputFiles& input =
output_level_to_file_ingesting_compaction_input[output_level];
if (input.files.empty()) {
// Treat the source level of ingested files to be level 0
input.level = 0;
}
compaction_input_metdatas_.push_back(new FileMetaData(f_metadata));
input.files.push_back(compaction_input_metdatas_.back());
}
for (const auto& pair : output_level_to_file_ingesting_compaction_input) {
int output_level = pair.first;
const CompactionInputFiles& input = pair.second;
const auto& mutable_cf_options = *(cfd_->GetLatestMutableCFOptions());
file_ingesting_compactions_.push_back(new Compaction(
cfd_->current()->storage_info(), *cfd_->ioptions(), mutable_cf_options,
mutable_db_options_, {input}, output_level,
MaxFileSizeForLevel(
mutable_cf_options, output_level,
cfd_->ioptions()->compaction_style) /* output file size
limit,
* not applicable
*/
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, Temperature::kUnknown,
0 /* max_subcompaction, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
files_overlap_ /* l0_files_might_overlap, not applicable */,
CompactionReason::kExternalSstIngestion));
}
}
void ExternalSstFileIngestionJob::RegisterRange() {
for (const auto& c : file_ingesting_compactions_) {
cfd_->compaction_picker()->RegisterCompaction(c);
}
}
void ExternalSstFileIngestionJob::UnregisterRange() {
for (const auto& c : file_ingesting_compactions_) {
cfd_->compaction_picker()->UnregisterCompaction(c);
delete c;
}
file_ingesting_compactions_.clear();
for (const auto& f : compaction_input_metdatas_) {
delete f;
}
compaction_input_metdatas_.clear();
}
void ExternalSstFileIngestionJob::UpdateStats() {
// Update internal stats for new ingested files
uint64_t total_keys = 0;
@ -798,8 +871,16 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
if (lvl > 0 && lvl < vstorage->base_level()) {
continue;
}
if (vstorage->NumLevelFiles(lvl) > 0) {
if (cfd_->RangeOverlapWithCompaction(
file_to_ingest->smallest_internal_key.user_key(),
file_to_ingest->largest_internal_key.user_key(), lvl)) {
// We must use L0 or any level higher than `lvl` to be able to overwrite
// the compaction output keys that we overlap with in this level, We also
// need to assign this file a seqno to overwrite the compaction output
// keys in level `lvl`
overlap_with_db = true;
break;
} else if (vstorage->NumLevelFiles(lvl) > 0) {
bool overlap_with_level = false;
status = sv->current->OverlapWithLevelIterator(
ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
@ -856,6 +937,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
target_level < cfd_->NumberLevels() - 1) {
status = Status::TryAgain(
"Files cannot be ingested to Lmax. Please make sure key range of Lmax "
"and ongoing compaction's output to Lmax"
"does not overlap with files to ingest.");
return status;
}
@ -873,7 +955,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
IngestedFileInfo* file_to_ingest) {
auto* vstorage = cfd_->current()->storage_info();
// first check if new files fit in the bottommost level
// First, check if new files fit in the bottommost level
int bottom_lvl = cfd_->NumberLevels() - 1;
if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
return Status::InvalidArgument(
@ -881,7 +963,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
"at the bottommost level!");
}
// second check if despite allow_ingest_behind=true we still have 0 seqnums
// Second, check if despite allow_ingest_behind=true we still have 0 seqnums
// at some upper level
for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
for (auto file : vstorage->LevelFiles(lvl)) {
@ -997,14 +1079,8 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
// add it to this level
return false;
}
if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
file_largest_user_key, level)) {
// File overlap with a running compaction output that will be stored
// in this level, we cannot add this file to this level
return false;
}
// File did not overlap with level files, our compaction output
// File did not overlap with level files, nor with compaction output
return true;
}

@ -11,6 +11,7 @@
#include "db/column_family.h"
#include "db/internal_stats.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "env/file_system_tracer.h"
#include "logging/event_logger.h"
#include "options/db_options.h"
@ -78,7 +79,8 @@ class ExternalSstFileIngestionJob {
public:
ExternalSstFileIngestionJob(
VersionSet* versions, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories, EventLogger* event_logger,
@ -88,6 +90,7 @@ class ExternalSstFileIngestionJob {
versions_(versions),
cfd_(cfd),
db_options_(db_options),
mutable_db_options_(mutable_db_options),
env_options_(env_options),
db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options),
@ -99,6 +102,17 @@ class ExternalSstFileIngestionJob {
assert(directories != nullptr);
}
~ExternalSstFileIngestionJob() {
for (const auto& c : file_ingesting_compactions_) {
cfd_->compaction_picker()->UnregisterCompaction(c);
delete c;
}
for (const auto& f : compaction_input_metdatas_) {
delete f;
}
}
// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
const std::vector<std::string>& files_checksums,
@ -120,6 +134,15 @@ class ExternalSstFileIngestionJob {
// REQUIRES: Mutex held
Status Run();
// Register the key range involved in this ingestion job
// to prevent key range conflicts with other ongoing compactions/file ingestions
// REQUIRES: Mutex held
void RegisterRange();
// Unregister key range registered for this ingestion job
// REQUIRES: Mutex held
void UnregisterRange();
// Update column family stats.
// REQUIRES: Mutex held
void UpdateStats();
@ -175,11 +198,17 @@ class ExternalSstFileIngestionJob {
template <typename TWritableFile>
Status SyncIngestedFile(TWritableFile* file);
// Create `Compaction` objects equivalent to this file ingestion job,
// which will be used to check range conflicts with other ongoing
// compactions.
void CreateEquivalentFileIngestingCompactions();
SystemClock* clock_;
FileSystemPtr fs_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
const MutableDBOptions& mutable_db_options_;
const EnvOptions& env_options_;
SnapshotList* db_snapshots_;
autovector<IngestedFileInfo> files_to_ingest_;
@ -196,6 +225,14 @@ class ExternalSstFileIngestionJob {
// file_checksum_gen_factory is set, DB will generate checksum each file.
bool need_generate_file_checksum_{true};
std::shared_ptr<IOTracer> io_tracer_;
// Below are variables used in (un)registering range for this ingestion job
//
// FileMetaData used in inputs of compactions equivalent to this ingestion
// job
std::vector<FileMetaData*> compaction_input_metdatas_;
// Compactions equivalent to this ingestion job
std::vector<Compaction*> file_ingesting_compactions_;
};
} // namespace ROCKSDB_NAMESPACE

@ -973,7 +973,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
do {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
std::atomic<int> thread_num(0);
std::function<void()> write_file_func = [&]() {
int file_idx = thread_num.fetch_add(1);
@ -1249,8 +1249,9 @@ TEST_P(ExternalSSTFileTest, PickedLevel) {
// This file overlaps with file 0 (L3), file 1 (L2) and the
// output of compaction going to L1
ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1, false, false, true,
false, false, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {4, 7}, -1,
true /* allow_global_seqno */, false,
true, false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5,0,1,1");
// This file does not overlap with any file or with the running compaction
@ -1270,106 +1271,6 @@ TEST_P(ExternalSSTFileTest, PickedLevel) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, PickedLevelBug) {
env_->skip_fsync_ = true;
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 3;
options.num_levels = 2;
DestroyAndReopen(options);
std::vector<int> file_keys;
// file #1 in L0
file_keys = {0, 5, 7};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// file #2 in L0
file_keys = {4, 6, 8, 9};
for (int k : file_keys) {
ASSERT_OK(Put(Key(k), Key(k)));
}
ASSERT_OK(Flush());
// We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2");
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
"ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
{"ExternalSSTFileTest::PickedLevelBug:2",
"DBImpl::RunManualCompaction:0"},
{"ExternalSSTFileTest::PickedLevelBug:3",
"DBImpl::RunManualCompaction:1"}});
std::atomic<bool> bg_compact_started(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:Start",
[&](void* /*arg*/) { bg_compact_started.store(true); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Status bg_compact_status;
Status bg_addfile_status;
{
// While writing the MANIFEST start a thread that will ask for compaction
ThreadGuard bg_compact(port::Thread([&]() {
bg_compact_status =
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}));
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");
// Start a thread that will ingest a new file
ThreadGuard bg_addfile(port::Thread([&]() {
file_keys = {1, 2, 3};
bg_addfile_status = GenerateAndAddExternalFile(options, file_keys, 1);
}));
// Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");
// We need to verify that no compactions can run while AddFile is
// ingesting the files into the levels it finds suitable. So we will
// wait for 2 seconds to give a chance for compactions to run during
// this period, and then make sure that no compactions were able to run
env_->SleepForMicroseconds(1000000 * 2);
bool bg_compact_started_tmp = bg_compact_started.load();
// Hold AddFile from finishing writing the MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:1");
// check the status at the end, so even if the ASSERT fails the threads
// could be joined and return.
ASSERT_FALSE(bg_compact_started_tmp);
}
ASSERT_OK(bg_addfile_status);
ASSERT_OK(bg_compact_status);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
int total_keys = 0;
Iterator* iter = db_->NewIterator(ReadOptions());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
total_keys++;
}
ASSERT_EQ(total_keys, 10);
delete iter;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(ExternalSSTFileTest, IngestNonExistingFile) {
Options options = CurrentOptions();
DestroyAndReopen(options);
@ -1420,7 +1321,8 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
int range_id = 0;
std::vector<int> file_keys;
std::function<void()> bg_addfile = [&]() {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id,
true /* allow_global_seqno */));
};
const int num_of_ranges = 1000;
@ -1503,8 +1405,9 @@ TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
// This file overlaps with the output of the compaction (going to L3)
// so the file will be added to L0 since L3 is the base level
ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1, false,
false, true, false, false, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {31, 32, 33, 34}, -1,
true /* allow_global_seqno */, false,
true, false, false, &true_data));
EXPECT_EQ(FilesPerLevel(), "5");
// This file does not overlap with the currently running compaction
@ -1642,14 +1545,15 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():Start", [&](void* /*arg*/) {
// fit in L3 but will overlap with compaction so will be added
// to L2 but a compaction will trivially move it to L3
// and break LSM consistency
// Fit in L3 but will overlap with the compaction output so will be
// added to L2. Prior to the fix, a compaction will then trivially move
// this file to L3 and break LSM consistency
static std::atomic<bool> called = {false};
if (!called) {
called = true;
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7,
true /* allow_global_seqno */));
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

@ -140,7 +140,10 @@ enum class CompactionReason : int {
// According to the comments in flush_job.cc, RocksDB treats flush as
// a level 0 compaction in internal stats.
kFlush,
// Compaction caused by external sst file ingestion
// [InternalOnly] External sst file ingestion treated as a compaction
// with placeholder input level L0, since file ingestion
// technically does not have an input level like other compactions do.
// Used only for internal stats and conflict checking with other compactions
kExternalSstIngestion,
// Compaction due to SST file being too old
kPeriodicCompaction,
@ -151,6 +154,9 @@ enum class CompactionReason : int {
// A special TTL compaction for RoundRobin policy, which is basically the same
// as
// kLevelMaxLevelSize, but the goal is to compact TTLed files.
kRoundRobinTtl,
// [InternalOnly] DBImpl::ReFitLevel treated as a compaction.
// Used only for internal conflict checking with other compactions
kRefitLevel,
// total number of compaction reasons, new reasons must be added above this.
kNumOfReasons,
};

@ -1933,7 +1933,8 @@ struct IngestExternalFileOptions {
// that were created before the file was ingested.
bool snapshot_consistency = true;
// If set to false, IngestExternalFile() will fail if the file key range
// overlaps with existing keys or tombstones in the DB.
// overlaps with existing keys or tombstones in the DB, or with the output
// of an ongoing compaction during file ingestion.
bool allow_global_seqno = true;
// If set to false and the file key range overlaps with the memtable key range
// (memtable flush required), IngestExternalFile will fail.

@ -7181,6 +7181,16 @@ class CompactionReasonJni {
return 0x0C;
case ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion:
return 0x0D;
case ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction:
return 0x0E;
case ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature:
return 0x0F;
case ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC:
return 0x11;
case ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl:
return 0x12;
case ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel:
return 0x13;
default:
return 0x7F; // undefined
}
@ -7225,6 +7235,12 @@ class CompactionReasonJni {
return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction;
case 0x0F:
return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature;
case 0x11:
return ROCKSDB_NAMESPACE::CompactionReason::kForcedBlobGC;
case 0x12:
return ROCKSDB_NAMESPACE::CompactionReason::kRoundRobinTtl;
case 0x13:
return ROCKSDB_NAMESPACE::CompactionReason::kRefitLevel;
default:
// undefined/default
return ROCKSDB_NAMESPACE::CompactionReason::kUnknown;

@ -88,7 +88,23 @@ public enum CompactionReason {
/**
* Compaction in order to move files to temperature
*/
kChangeTemperature((byte) 0x0F);
kChangeTemperature((byte) 0x0F),
/**
* Compaction scheduled to force garbage collection of blob files
*/
kForcedBlobGC((byte) 0x11),
/**
* A special TTL compaction for RoundRobin policy, which is basically the same as
* kLevelMaxLevelSize, but the goal is to compact TTLed files.
*/
kRoundRobinTtl((byte) 0x12),
/**
* Compaction by calling DBImpl::ReFitLevel
*/
kRefitLevel((byte) 0x13);
private final byte value;
