diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index cbd9d7a09..b2a131ecf 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -12,6 +12,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" namespace rocksdb { @@ -53,6 +54,61 @@ class FlushedFileCollector : public EventListener { std::mutex mutex_; }; +TEST_F(CompactFilesTest, L0ConflictsFiles) { + Options options; + // to trigger compaction more easily + const int kWriteBufferSize = 10000; + const int kLevel0Trigger = 2; + options.create_if_missing = true; + options.compaction_style = kCompactionStyleLevel; + // Small slowdown and stop trigger for experimental purpose. + options.level0_slowdown_writes_trigger = 20; + options.level0_stop_writes_trigger = 20; + options.write_buffer_size = kWriteBufferSize; + options.level0_file_num_compaction_trigger = kLevel0Trigger; + options.compression = kNoCompression; + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + assert(s.ok()); + assert(db); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"CompactFilesImpl:0", "BackgroundCallCompaction:0"}, + {"BackgroundCallCompaction:1", "CompactFilesImpl:1"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // create couple files + // Background compaction starts and waits in BackgroundCallCompaction:0 + for (int i = 0; i < kLevel0Trigger * 4; ++i) { + db->Put(WriteOptions(), ToString(i), ""); + db->Put(WriteOptions(), ToString(100 - i), ""); + db->Flush(FlushOptions()); + } + + rocksdb::ColumnFamilyMetaData meta; + db->GetColumnFamilyMetaData(&meta); + std::string file1; + for (auto& file : meta.levels[0].files) { + ASSERT_EQ(0, meta.levels[0].level); + if (file1 == "") { + file1 = file.db_path + "/" + file.name; + } else { + std::string file2 = file.db_path + "/" + file.name; + // 
Another thread starts a compact files and creates an L0 compaction + // The background compaction then notices that there is an L0 compaction + // already in progress and doesn't do an L0 compaction + // Once the background compaction finishes, the compact files finishes + ASSERT_OK( + db->CompactFiles(rocksdb::CompactionOptions(), {file1, file2}, 0)); + break; + } + } + delete db; +} + TEST_F(CompactFilesTest, ObsoleteFiles) { Options options; // to trigger compaction more easily diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 27935085d..1d968fc06 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -255,16 +255,30 @@ Compaction* CompactionPicker::FormCompaction( const CompactionOptions& compact_options, const std::vector& input_files, int output_level, VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, - uint32_t output_path_id) const { + uint32_t output_path_id) { uint64_t max_grandparent_overlap_bytes = output_level + 1 < vstorage->num_levels() ? mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) : std::numeric_limits::max(); assert(input_files.size()); - return new Compaction( + + // TODO(rven ): we might be able to run concurrent level 0 compaction + // if the key ranges of the two compactions do not overlap, but for now + // we do not allow it. 
+ if ((input_files[0].level == 0) && !level0_compactions_in_progress_.empty()) { + return nullptr; + } + auto c = new Compaction( vstorage, mutable_cf_options, input_files, output_level, compact_options.output_file_size_limit, max_grandparent_overlap_bytes, output_path_id, compact_options.compression, /* grandparents */ {}, true); + + // If it's level 0 compaction, make sure we don't execute any other level 0 + // compactions in parallel + if ((c != nullptr) && (input_files[0].level == 0)) { + level0_compactions_in_progress_.insert(c); + } + return c; } Status CompactionPicker::GetCompactionInputsFromFileNumbers( diff --git a/db/compaction_picker.h b/db/compaction_picker.h index e7d8bf6db..062fa06da 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -94,7 +94,7 @@ class CompactionPicker { const CompactionOptions& compact_options, const std::vector& input_files, int output_level, VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, - uint32_t output_path_id) const; + uint32_t output_path_id); // Converts a set of compaction input file numbers into // a list of CompactionInputFiles. diff --git a/db/db_impl.cc b/db/db_impl.cc index 155f9096b..2f105e1a2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1772,7 +1772,9 @@ Status DBImpl::CompactFilesImpl( c.reset(cfd->compaction_picker()->FormCompaction( compact_options, input_files, output_level, version->storage_info(), *cfd->GetLatestMutableCFOptions(), output_path_id)); - assert(c); + if (!c) { + return Status::Aborted("Another Level 0 compaction is running"); + } c->SetInputVersion(version); // deletion compaction currently not allowed in CompactFiles. 
assert(!c->deletion_compaction()); @@ -1801,6 +1803,8 @@ Status DBImpl::CompactFilesImpl( compaction_job.Prepare(); mutex_.Unlock(); + TEST_SYNC_POINT("CompactFilesImpl:0"); + TEST_SYNC_POINT("CompactFilesImpl:1"); compaction_job.Run(); mutex_.Lock(); @@ -2559,7 +2563,7 @@ void DBImpl::BackgroundCallFlush() { void DBImpl::BackgroundCallCompaction() { bool made_progress = false; JobContext job_context(next_job_id_.fetch_add(1), true); - + TEST_SYNC_POINT("BackgroundCallCompaction:0"); MaybeDumpStats(); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); { @@ -2571,6 +2575,7 @@ void DBImpl::BackgroundCallCompaction() { assert(bg_compaction_scheduled_); Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer); + TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to