From 97631357aa274d06a7ab09b3cde7b909262cc4dd Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 13 Sep 2019 14:48:18 -0700 Subject: [PATCH] Allow ingesting overlapping files (#5539) Summary: Currently IngestExternalFile() fails when its input files' ranges overlap. This condition doesn't need to hold for files that are to be ingested in L0, though. This commit allows overlapping files and forces their target level to L0. Additionally, ingest job's completion is logged to EventLogger, analogous to flush and compaction jobs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5539 Differential Revision: D17370660 Pulled By: riversand963 fbshipit-source-id: 749a3899b17d1be267a5afd5b0a99d96b38ab2f3 --- db/db_impl/db_impl.cc | 24 +++++++------ db/external_sst_file_basic_test.cc | 41 +++++++++++++++++++++ db/external_sst_file_ingestion_job.cc | 52 +++++++++++++++++++++------ db/external_sst_file_ingestion_job.h | 17 ++++++--- 4 files changed, 108 insertions(+), 26 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8f104f707..4bcab93a8 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -3763,9 +3763,9 @@ Status DBImpl::IngestExternalFiles( std::vector ingestion_jobs; for (const auto& arg : args) { auto* cfd = static_cast(arg.column_family)->cfd(); - ingestion_jobs.emplace_back(env_, versions_.get(), cfd, - immutable_db_options_, env_options_, - &snapshots_, arg.options, &directories_); + ingestion_jobs.emplace_back( + env_, versions_.get(), cfd, immutable_db_options_, env_options_, + &snapshots_, arg.options, &directories_, &event_logger_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { @@ -3895,19 +3895,21 @@ Status DBImpl::IngestExternalFiles( } } if (status.ok()) { - bool should_increment_last_seqno = - ingestion_jobs[0].ShouldIncrementLastSequence(); + int consumed_seqno_count = + ingestion_jobs[0].ConsumedSequenceNumbersCount(); #ifndef NDEBUG for (size_t i = 1; i != num_cfs; ++i) { - assert(should_increment_last_seqno == - ingestion_jobs[i].ShouldIncrementLastSequence()); + assert(!!consumed_seqno_count == + !!ingestion_jobs[i].ConsumedSequenceNumbersCount()); + consumed_seqno_count += + ingestion_jobs[i].ConsumedSequenceNumbersCount(); } #endif - if (should_increment_last_seqno) { + if (consumed_seqno_count > 0) { const SequenceNumber last_seqno = versions_->LastSequence(); - versions_->SetLastAllocatedSequence(last_seqno + 1); - versions_->SetLastPublishedSequence(last_seqno + 1); - versions_->SetLastSequence(last_seqno + 1); + versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count); + versions_->SetLastSequence(last_seqno + consumed_seqno_count); } autovector cfds_to_commit; autovector mutable_cf_options_list; diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index c85d7394e..43a003a85 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -1070,6 +1070,47 @@ TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) { } while (ChangeOptionsForFileIngestionTest()); } +TEST_F(ExternalSSTFileBasicTest, OverlappingFiles) { + Options options = CurrentOptions(); + + std::vector files; + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file1 = sst_files_dir_ + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + ASSERT_OK(sst_file_writer.Put("a", "z")); + ASSERT_OK(sst_file_writer.Put("i", "m")); + ExternalSstFileInfo file1_info; + ASSERT_OK(sst_file_writer.Finish(&file1_info)); + files.push_back(std::move(file1)); + } + { + SstFileWriter sst_file_writer(EnvOptions(), options); + std::string file2 = sst_files_dir_ + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + ASSERT_OK(sst_file_writer.Put("i", "k")); + ExternalSstFileInfo file2_info; + ASSERT_OK(sst_file_writer.Finish(&file2_info)); + files.push_back(std::move(file2)); + } + + IngestExternalFileOptions ifo; + ASSERT_OK(db_->IngestExternalFile(files, ifo)); + ASSERT_EQ(Get("a"), "z"); + ASSERT_EQ(Get("i"), "k"); + + int total_keys = 0; + Iterator* iter = db_->NewIterator(ReadOptions()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + total_keys++; + } + delete iter; + ASSERT_EQ(total_keys, 2); + + ASSERT_EQ(2, NumTableFilesAtLevel(0)); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 03bcd4240..3926d7fa9 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -71,11 +71,16 @@ Status ExternalSstFileIngestionJob::Prepare( for (size_t i = 0; i < num_files - 1; i++) { if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key, sorted_files[i + 1]->smallest_internal_key) >= 0) { - return Status::NotSupported("Files have overlapping ranges"); + files_overlap_ = true; + break; } } } + if (ingestion_options_.ingest_behind && files_overlap_) { + return Status::NotSupported("Files have overlapping ranges"); + } + for (IngestedFileInfo& f : files_to_ingest_) { if (f.num_entries == 0 && f.num_range_deletions == 0) { return Status::InvalidArgument("File contain no entries"); @@ -212,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() { } // It is safe to use this instead of LastAllocatedSequence since we are // the only active writer, and hence they are equal - const SequenceNumber last_seqno = versions_->LastSequence(); + SequenceNumber last_seqno = versions_->LastSequence(); edit_.SetColumnFamily(cfd_->GetID()); // The levels that the files will be ingested into @@ -222,8 +227,8 @@ Status ExternalSstFileIngestionJob::Run() { status = CheckLevelForIngestedBehindFile(&f); } else { status = AssignLevelAndSeqnoForIngestedFile( - super_version, force_global_seqno, cfd_->ioptions()->compaction_style, - &f, &assigned_seqno); + super_version, force_global_seqno, cfd_->ioptions()->compaction_style, + last_seqno, &f, &assigned_seqno); } if (!status.ok()) { return status; @@ -231,8 +236,10 @@ Status ExternalSstFileIngestionJob::Run() { status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); - if (assigned_seqno == last_seqno + 1) { - consumed_seqno_ = true; + if (assigned_seqno > last_seqno) { + assert(assigned_seqno == last_seqno + 1); + last_seqno = assigned_seqno; + ++consumed_seqno_count_; } if (!status.ok()) { return status; @@ -250,6 +257,13 @@ void ExternalSstFileIngestionJob::UpdateStats() { uint64_t total_keys = 0; uint64_t total_l0_files = 0; uint64_t total_time = env_->NowMicros() - job_start_time_; + + EventLoggerStream stream = event_logger_->Log(); + stream << "event" + << "ingest_finished"; + stream << "files_ingested"; + stream.StartArray(); + for (IngestedFileInfo& f : files_to_ingest_) { InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); stats.micros = total_time; @@ -277,7 +291,18 @@ void ExternalSstFileIngestionJob::UpdateStats() { "(global_seqno=%" PRIu64 ")\n", f.external_file_path.c_str(), f.picked_level, f.internal_file_path.c_str(), f.assigned_seqno); + stream << "file" << f.internal_file_path << "level" << f.picked_level; } + stream.EndArray(); + + stream << "lsm_state"; + stream.StartArray(); + auto vstorage = cfd_->current()->storage_info(); + for (int level = 0; level < vstorage->num_levels(); ++level) { + stream << vstorage->NumLevelFiles(level); + } + stream.EndArray(); + cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, total_keys); cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, @@ -301,7 +326,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { f.internal_file_path.c_str(), s.ToString().c_str()); } } - consumed_seqno_ = false; + consumed_seqno_count_ = 0; + files_overlap_ = false; } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { @@ -479,13 +505,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, - IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { + SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest, + SequenceNumber* assigned_seqno) { Status status; *assigned_seqno = 0; - const SequenceNumber last_seqno = versions_->LastSequence(); if (force_global_seqno) { *assigned_seqno = last_seqno + 1; - if (compaction_style == kCompactionStyleUniversal) { + if (compaction_style == kCompactionStyleUniversal || files_overlap_) { file_to_ingest->picked_level = 0; return status; } @@ -547,6 +573,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( target_level = lvl; } } + // If files overlap, we have to ingest them at level 0 and assign the newest + // sequence number + if (files_overlap_) { + target_level = 0; + *assigned_seqno = last_seqno + 1; + } TEST_SYNC_POINT_CALLBACK( "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", &overlap_with_db); diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 4f9fac241..90b8326bb 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/internal_stats.h" #include "db/snapshot_impl.h" +#include "logging/event_logger.h" #include "options/db_options.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -71,7 +72,7 @@ class ExternalSstFileIngestionJob { const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, const IngestExternalFileOptions& ingestion_options, - Directories* directories) + Directories* directories, EventLogger* event_logger) : env_(env), versions_(versions), cfd_(cfd), @@ -80,8 +81,9 @@ class ExternalSstFileIngestionJob { db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), directories_(directories), + event_logger_(event_logger), job_start_time_(env_->NowMicros()), - consumed_seqno_(false) { + consumed_seqno_count_(0) { assert(directories != nullptr); } @@ -116,8 +118,8 @@ class ExternalSstFileIngestionJob { return files_to_ingest_; } - // Whether to increment VersionSet's seqno after this job runs - bool ShouldIncrementLastSequence() const { return consumed_seqno_; } + // How many sequence numbers did we consume as part of the ingest job? + int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; } private: // Open the external file and populate `file_to_ingest` with all the @@ -132,6 +134,7 @@ class ExternalSstFileIngestionJob { Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, + SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno); @@ -163,9 +166,13 @@ class ExternalSstFileIngestionJob { autovector files_to_ingest_; const IngestExternalFileOptions& ingestion_options_; Directories* directories_; + EventLogger* event_logger_; VersionEdit edit_; uint64_t job_start_time_; - bool consumed_seqno_; + int consumed_seqno_count_; + // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are + // ingested in L0 + bool files_overlap_{false}; }; } // namespace rocksdb