From a69d4deefb63c57198eadf8786d7efd073d4bb48 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 12 Feb 2019 19:07:25 -0800 Subject: [PATCH] Atomic ingest (#4895) Summary: Make file ingestion atomic. Ingesting external SST files into multiple column families should be atomic: if a crash occurs and the DB reopens, either all column families have successfully ingested the files before the crash, or none of the ingestions has any effect on the state of the DB. Also add unit tests for atomic ingestion. Note that the unit tests here do not cover the case of an incomplete atomic group in the MANIFEST, which is already covered in VersionSetTest. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4895 Differential Revision: D13718245 Pulled By: riversand963 fbshipit-source-id: 7df97cc483af73ad44dd6993008f99b083852198 --- HISTORY.md | 1 + db/db_impl.cc | 350 +++++++++++++----- db/db_impl.h | 13 +- db/db_test.cc | 6 + db/external_sst_file_ingestion_job.cc | 11 +- db/external_sst_file_ingestion_job.h | 7 +- db/external_sst_file_test.cc | 450 +++++++++++++++++++++++ db/version_set.cc | 12 + include/rocksdb/db.h | 22 ++ include/rocksdb/utilities/stackable_db.h | 6 + util/fault_injection_test_env.cc | 24 ++ util/fault_injection_test_env.h | 4 + 12 files changed, 800 insertions(+), 106 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 2e5abd492..a1e690b96 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -24,6 +24,7 @@ * Remove PlainTable's store_index_in_file feature. When opening an existing DB with index in SST files, the index and bloom filter will still be rebuild while SST files are opened, in the same way as there is no index in the file. * Remove CuckooHash memtable. * The counter stat `number.block.not_compressed` now also counts blocks not compressed due to poor compression ratio. +* Support SST file ingestion across multiple column families via DB::IngestExternalFiles. See the function's comment about atomicity. ### Bug Fixes * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls. diff --git a/db/db_impl.cc b/db/db_impl.cc index d854878fe..dae6fef55 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3112,74 +3112,124 @@ Status DBImpl::IngestExternalFile( ColumnFamilyHandle* column_family, const std::vector<std::string>& external_files, const IngestExternalFileOptions& ingestion_options) { - if (external_files.empty()) { - return Status::InvalidArgument("external_files is empty"); - } - - Status status; - auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); - auto cfd = cfh->cfd(); + IngestExternalFileArg arg; + arg.column_family = column_family; + arg.external_files = external_files; + arg.options = ingestion_options; + return IngestExternalFiles({arg}); +} - // Ingest should immediately fail if ingest_behind is requested, - // but the DB doesn't support it. - if (ingestion_options.ingest_behind) { - if (!immutable_db_options_.allow_ingest_behind) { +Status DBImpl::IngestExternalFiles( + const std::vector<IngestExternalFileArg>& args) { + if (args.empty()) { + return Status::InvalidArgument("ingestion arg list is empty"); + } + { + std::unordered_set<ColumnFamilyHandle*> unique_cfhs; + for (const auto& arg : args) { + if (arg.column_family == nullptr) { + return Status::InvalidArgument("column family handle is null"); + } else if (unique_cfhs.count(arg.column_family) > 0) { + return Status::InvalidArgument( + "ingestion args have duplicate column families"); + } + unique_cfhs.insert(arg.column_family); + } + } + // Ingest multiple external SST files atomically.
+ size_t num_cfs = args.size(); + for (size_t i = 0; i != num_cfs; ++i) { + if (args[i].external_files.empty()) { + char err_msg[128] = {0}; + snprintf(err_msg, 128, "external_files[%zu] is empty", i); + return Status::InvalidArgument(err_msg); + } + } + for (const auto& arg : args) { + const IngestExternalFileOptions& ingest_opts = arg.options; + if (ingest_opts.ingest_behind && + !immutable_db_options_.allow_ingest_behind) { return Status::InvalidArgument( - "Can't ingest_behind file in DB with allow_ingest_behind=false"); + "can't ingest_behind file in DB with allow_ingest_behind=false"); } } - ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd, - immutable_db_options_, env_options_, - &snapshots_, ingestion_options); - - SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); - VersionEdit dummy_edit; - uint64_t next_file_number = 0; + // TODO (yanqin) maybe handle the case in which column_families have + // duplicates std::list::iterator pending_output_elem; - { - InstrumentedMutexLock l(&mutex_); - if (error_handler_.IsDBStopped()) { - // Don't ingest files when there is a bg_error - return error_handler_.GetBGError(); - } - - // Make sure that bg cleanup wont delete the files that we are ingesting - pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); - - // If crash happen after a hard link established, Recover function may - // reuse the file number that has already assigned to the internal file, - // and this will overwrite the external file. To protect the external - // file, we have to make sure the file number will never being reused. - next_file_number = versions_->FetchAddFileNumber(external_files.size()); - auto cf_options = cfd->GetLatestMutableCFOptions(); - status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, - directories_.GetDbDir()); - if (status.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); - } + size_t total = 0; + for (const auto& arg : args) { + total += arg.external_files.size(); } - dummy_sv_ctx.Clean(); + uint64_t next_file_number = 0; + Status status = ReserveFileNumbersBeforeIngestion( + static_cast(args[0].column_family)->cfd(), total, + &pending_output_elem, &next_file_number); if (!status.ok()) { InstrumentedMutexLock l(&mutex_); ReleaseFileNumberFromPendingOutputs(pending_output_elem); return status; } - SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); - status = - ingestion_job.Prepare(external_files, next_file_number, super_version); - CleanupSuperVersion(super_version); + std::vector ingestion_jobs; + for (const auto& arg : args) { + auto* cfd = static_cast(arg.column_family)->cfd(); + ingestion_jobs.emplace_back(env_, versions_.get(), cfd, + immutable_db_options_, env_options_, + &snapshots_, arg.options); + } + std::vector> exec_results; + for (size_t i = 0; i != num_cfs; ++i) { + exec_results.emplace_back(false, Status::OK()); + } + // TODO(yanqin) maybe make jobs run in parallel + for (size_t i = 1; i != num_cfs; ++i) { + uint64_t start_file_number = + next_file_number + args[i - 1].external_files.size(); + auto* cfd = + static_cast(args[i].column_family)->cfd(); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + exec_results[i].second = ingestion_jobs[i].Prepare( + args[i].external_files, start_file_number, super_version); + exec_results[i].first = true; + CleanupSuperVersion(super_version); + } + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0"); + 
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"); + { + auto* cfd = + static_cast(args[0].column_family)->cfd(); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + exec_results[0].second = ingestion_jobs[0].Prepare( + args[0].external_files, next_file_number, super_version); + exec_results[0].first = true; + CleanupSuperVersion(super_version); + } + for (const auto& exec_result : exec_results) { + if (!exec_result.second.ok()) { + status = exec_result.second; + break; + } + } if (!status.ok()) { + for (size_t i = 0; i != num_cfs; ++i) { + if (exec_results[i].first) { + ingestion_jobs[i].Cleanup(status); + } + } InstrumentedMutexLock l(&mutex_); ReleaseFileNumberFromPendingOutputs(pending_output_elem); return status; } - SuperVersionContext sv_context(/* create_superversion */ true); + std::vector sv_ctxs; + for (size_t i = 0; i != num_cfs; ++i) { + sv_ctxs.emplace_back(true /* create_superversion */); + } + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0"); + TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1"); TEST_SYNC_POINT("DBImpl::AddFile:Start"); { - // Lock db mutex InstrumentedMutexLock l(&mutex_); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); @@ -3191,55 +3241,130 @@ Status DBImpl::IngestExternalFile( nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); } - num_running_ingest_file_++; + num_running_ingest_file_ += static_cast(num_cfs); TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); - // We cannot ingest a file into a dropped CF - if (cfd->IsDropped()) { - status = Status::InvalidArgument( - "Cannot ingest an external file into a dropped CF"); + bool at_least_one_cf_need_flush = false; + std::vector need_flush(num_cfs, false); + for (size_t i = 0; i != num_cfs; ++i) { + auto* cfd = + static_cast(args[i].column_family)->cfd(); + if (cfd->IsDropped()) { + // TODO (yanqin) investigate whether we should abort ingestion or + // proceed with other non-dropped column families. 
+ status = Status::InvalidArgument( + "cannot ingest an external file into a dropped CF"); + break; + } + bool tmp = false; + status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion()); + need_flush[i] = tmp; + at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp); + if (!status.ok()) { + break; + } } + TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", + &at_least_one_cf_need_flush); - // Figure out if we need to flush the memtable first - if (status.ok()) { - bool need_flush = false; - status = ingestion_job.NeedsFlush(&need_flush, cfd->GetSuperVersion()); - TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", - &need_flush); - if (status.ok() && need_flush) { - FlushOptions flush_opts; - flush_opts.allow_write_stall = true; - if (immutable_db_options_.atomic_flush) { - autovector cfds; - SelectColumnFamiliesForAtomicFlush(&cfds); - mutex_.Unlock(); - status = AtomicFlushMemTables(cfds, flush_opts, - FlushReason::kExternalFileIngestion, - true /* writes_stopped */); - } else { - mutex_.Unlock(); - status = FlushMemTable(cfd, flush_opts, - FlushReason::kExternalFileIngestion, - true /* writes_stopped */); - } + if (status.ok() && at_least_one_cf_need_flush) { + FlushOptions flush_opts; + flush_opts.allow_write_stall = true; + if (immutable_db_options_.atomic_flush) { + autovector cfds_to_flush; + SelectColumnFamiliesForAtomicFlush(&cfds_to_flush); + mutex_.Unlock(); + status = AtomicFlushMemTables(cfds_to_flush, flush_opts, + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); mutex_.Lock(); + } else { + for (size_t i = 0; i != num_cfs; ++i) { + if (need_flush[i]) { + mutex_.Unlock(); + auto* cfd = + static_cast(args[i].column_family) + ->cfd(); + status = FlushMemTable(cfd, flush_opts, + FlushReason::kExternalFileIngestion, + true /* writes_stopped */); + mutex_.Lock(); + if (!status.ok()) { + break; + } + } + } } } - - // Run the ingestion job + // Run ingestion jobs. 
if (status.ok()) { - status = ingestion_job.Run(); + for (size_t i = 0; i != num_cfs; ++i) { + status = ingestion_jobs[i].Run(); + if (!status.ok()) { + break; + } + } } - - // Install job edit [Mutex will be unlocked here] - auto mutable_cf_options = cfd->GetLatestMutableCFOptions(); if (status.ok()) { + bool should_increment_last_seqno = + ingestion_jobs[0].ShouldIncrementLastSequence(); +#ifndef NDEBUG + for (size_t i = 1; i != num_cfs; ++i) { + assert(should_increment_last_seqno == + ingestion_jobs[i].ShouldIncrementLastSequence()); + } +#endif + if (should_increment_last_seqno) { + const SequenceNumber last_seqno = versions_->LastSequence(); + versions_->SetLastAllocatedSequence(last_seqno + 1); + versions_->SetLastPublishedSequence(last_seqno + 1); + versions_->SetLastSequence(last_seqno + 1); + } + autovector cfds_to_commit; + autovector mutable_cf_options_list; + autovector> edit_lists; + uint32_t num_entries = 0; + for (size_t i = 0; i != num_cfs; ++i) { + auto* cfd = + static_cast(args[i].column_family)->cfd(); + if (cfd->IsDropped()) { + continue; + } + cfds_to_commit.push_back(cfd); + mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); + autovector edit_list; + edit_list.push_back(ingestion_jobs[i].edit()); + edit_lists.push_back(edit_list); + ++num_entries; + } + // Mark the version edits as an atomic group + for (auto& edits : edit_lists) { + assert(edits.size() == 1); + edits[0]->MarkAtomicGroup(--num_entries); + } + assert(0 == num_entries); status = - versions_->LogAndApply(cfd, *mutable_cf_options, ingestion_job.edit(), - &mutex_, directories_.GetDbDir()); + versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list, + edit_lists, &mutex_, directories_.GetDbDir()); } + if (status.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options); + for (size_t i = 0; i != num_cfs; ++i) { + auto* cfd = + static_cast(args[i].column_family)->cfd(); + if (!cfd->IsDropped()) { + InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i], + *cfd->GetLatestMutableCFOptions()); +#ifndef NDEBUG + if (0 == i && num_cfs > 1) { + TEST_SYNC_POINT( + "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0"); + TEST_SYNC_POINT( + "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"); + } +#endif // !NDEBUG + } + } } // Resume writes to the DB @@ -3248,30 +3373,36 @@ Status DBImpl::IngestExternalFile( } write_thread_.ExitUnbatched(&w); - // Update stats if (status.ok()) { - ingestion_job.UpdateStats(); + for (auto& job : ingestion_jobs) { + job.UpdateStats(); + } } - ReleaseFileNumberFromPendingOutputs(pending_output_elem); - - num_running_ingest_file_--; - if (num_running_ingest_file_ == 0) { + num_running_ingest_file_ -= static_cast(num_cfs); + if (0 == num_running_ingest_file_) { bg_cv_.SignalAll(); } - TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); } // mutex_ is unlocked here // Cleanup - sv_context.Clean(); - ingestion_job.Cleanup(status); - + for (size_t i = 0; i != num_cfs; ++i) { + sv_ctxs[i].Clean(); + // This may rollback jobs that have completed successfully. This is + // intended for atomicity. 
+ ingestion_jobs[i].Cleanup(status); + } if (status.ok()) { - NotifyOnExternalFileIngested(cfd, ingestion_job); + for (size_t i = 0; i != num_cfs; ++i) { + auto* cfd = + static_cast(args[i].column_family)->cfd(); + if (!cfd->IsDropped()) { + NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]); + } + } } - return status; } @@ -3391,6 +3522,35 @@ Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, return s; } +Status DBImpl::ReserveFileNumbersBeforeIngestion( + ColumnFamilyData* cfd, uint64_t num, + std::list::iterator* pending_output_elem, + uint64_t* next_file_number) { + Status s; + SuperVersionContext dummy_sv_ctx(true /* create_superversion */); + assert(nullptr != pending_output_elem); + assert(nullptr != next_file_number); + InstrumentedMutexLock l(&mutex_); + if (error_handler_.IsDBStopped()) { + // Do not ingest files when there is a bg_error + return error_handler_.GetBGError(); + } + *pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + *next_file_number = versions_->FetchAddFileNumber(static_cast(num)); + auto cf_options = cfd->GetLatestMutableCFOptions(); + VersionEdit dummy_edit; + // If crash happen after a hard link established, Recover function may + // reuse the file number that has already assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never being reused. + s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (s.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + dummy_sv_ctx.Clean(); + return s; +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_impl.h b/db/db_impl.h index db6e2ed60..86acaa24d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -346,6 +346,10 @@ class DBImpl : public DB { const std::vector& external_files, const IngestExternalFileOptions& ingestion_options) override; + using DB::IngestExternalFiles; + virtual Status IngestExternalFiles( + const std::vector& args) override; + virtual Status VerifyChecksum() override; using DB::StartTrace; @@ -1588,7 +1592,14 @@ class DBImpl : public DB { const CompactionJobStats& compaction_job_stats, const int job_id, const Version* current, CompactionJobInfo* compaction_job_info) const; -#endif + // Reserve the next 'num' file numbers for to-be-ingested external SST files, + // and return the current file_number in 'next_file_number'. + // Write a version edit to the MANIFEST. + Status ReserveFileNumbersBeforeIngestion( + ColumnFamilyData* cfd, uint64_t num, + std::list::iterator* pending_output_elem, + uint64_t* next_file_number); +#endif //! 
ROCKSDB_LITE bool ShouldPurge(uint64_t file_number) const; void MarkAsGrabbedForPurge(uint64_t file_number); diff --git a/db/db_test.cc b/db/db_test.cc index e00127f6d..81bc6508b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2462,6 +2462,12 @@ class ModelDB : public DB { return Status::NotSupported("Not implemented."); } + using DB::IngestExternalFiles; + virtual Status IngestExternalFiles( + const std::vector& /*args*/) override { + return Status::NotSupported("Not implemented"); + } + virtual Status VerifyChecksum() override { return Status::NotSupported("Not implemented."); } diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 617dfa57a..7c927c079 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -167,7 +167,6 @@ Status ExternalSstFileIngestionJob::Run() { assert(status.ok() && need_flush == false); #endif - bool consumed_seqno = false; bool force_global_seqno = false; if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) { @@ -197,7 +196,7 @@ Status ExternalSstFileIngestionJob::Run() { TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); if (assigned_seqno == last_seqno + 1) { - consumed_seqno = true; + consumed_seqno_ = true; } if (!status.ok()) { return status; @@ -207,13 +206,6 @@ Status ExternalSstFileIngestionJob::Run() { f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno, false); } - - if (consumed_seqno) { - versions_->SetLastAllocatedSequence(last_seqno + 1); - versions_->SetLastPublishedSequence(last_seqno + 1); - versions_->SetLastSequence(last_seqno + 1); - } - return status; } @@ -269,6 +261,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { f.internal_file_path.c_str(), s.ToString().c_str()); } } + consumed_seqno_ = false; } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 0f14e3606..baa8e9f0f 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -85,7 +85,8 @@ class ExternalSstFileIngestionJob { env_options_(env_options), db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), - job_start_time_(env_->NowMicros()) {} + job_start_time_(env_->NowMicros()), + consumed_seqno_(false) {} // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, @@ -118,6 +119,9 @@ class ExternalSstFileIngestionJob { return files_to_ingest_; } + // Whether to increment VersionSet's seqno after this job runs + bool ShouldIncrementLastSequence() const { return consumed_seqno_; } + private: // Open the external file and populate `file_to_ingest` with all the // external information we need to ingest this file. 
@@ -159,6 +163,7 @@ class ExternalSstFileIngestionJob { const IngestExternalFileOptions& ingestion_options_; VersionEdit edit_; uint64_t job_start_time_; + bool consumed_seqno_; }; } // namespace rocksdb diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 9fddab786..a4e4dd326 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -10,6 +10,7 @@ #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/sst_file_writer.h" +#include "util/fault_injection_test_env.h" #include "util/filename.h" #include "util/testutil.h" @@ -29,6 +30,55 @@ class ExternalSSTFileTest env_->CreateDir(sst_files_dir_); } + Status GenerateOneExternalFile( + const Options& options, ColumnFamilyHandle* cfh, + std::vector>& data, int file_id, + bool sort_data, std::string* external_file_path, + std::map* true_data) { + // Generate a file id if not provided + if (-1 == file_id) { + file_id = (++last_file_id_); + } + // Sort data if asked to do so + if (sort_data) { + std::sort(data.begin(), data.end(), + [&](const std::pair& e1, + const std::pair& e2) { + return options.comparator->Compare(e1.first, e2.first) < 0; + }); + auto uniq_iter = std::unique( + data.begin(), data.end(), + [&](const std::pair& e1, + const std::pair& e2) { + return options.comparator->Compare(e1.first, e2.first) == 0; + }); + data.resize(uniq_iter - data.begin()); + } + std::string file_path = sst_files_dir_ + ToString(file_id); + SstFileWriter sst_file_writer(EnvOptions(), options, cfh); + Status s = sst_file_writer.Open(file_path); + if (!s.ok()) { + return s; + } + for (const auto& entry : data) { + s = sst_file_writer.Put(entry.first, entry.second); + if (!s.ok()) { + sst_file_writer.Finish(); + return s; + } + } + s = sst_file_writer.Finish(); + if (s.ok() && external_file_path != nullptr) { + *external_file_path = file_path; + } + if (s.ok() && nullptr != true_data) { + for (const auto& entry : data) { + true_data->insert({entry.first, entry.second}); + } + } + return s; + } + Status GenerateAndAddExternalFile( const Options options, std::vector> data, int file_id = -1, @@ -154,6 +204,41 @@ class ExternalSSTFileTest return s; } + Status GenerateAndAddExternalFiles( + const Options& options, + const std::vector& column_families, + const std::vector& ifos, + std::vector>>& data, + int file_id, bool sort_data, + std::vector>& true_data) { + if (-1 == file_id) { + file_id = (++last_file_id_); + } + // Generate external SST files, one for each column family + size_t num_cfs = column_families.size(); + assert(ifos.size() == num_cfs); + assert(data.size() == num_cfs); + Status s; + std::vector args(num_cfs); + for (size_t i = 0; i != num_cfs; ++i) { + std::string external_file_path; + s = GenerateOneExternalFile( + options, column_families[i], data[i], file_id, sort_data, + &external_file_path, + true_data.size() == num_cfs ? 
&true_data[i] : nullptr); + if (!s.ok()) { + return s; + } + ++file_id; + + args[i].column_family = column_families[i]; + args[i].external_files.push_back(external_file_path); + args[i].options = ifos[i]; + } + s = db_->IngestExternalFiles(args); + return s; + } + Status GenerateAndAddExternalFile( const Options options, std::vector> data, int file_id = -1, bool allow_global_seqno = false, @@ -2233,6 +2318,371 @@ TEST_F(ExternalSSTFileTest, SkipBloomFilter) { } } +TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + CreateAndReopenWithCF({"pikachu"}, options); + std::vector column_families; + column_families.push_back(handles_[0]); + column_families.push_back(handles_[1]); + std::vector ifos(column_families.size()); + for (auto& ifo : ifos) { + ifo.allow_global_seqno = true; // Always allow global_seqno + // May or may not write global_seqno + ifo.write_global_seqno = std::get<0>(GetParam()); + // Whether to verify checksums before ingestion + ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); + } + std::vector>> data; + data.push_back( + {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); + data.push_back( + {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); + // Resize the true_data vector upon construction to avoid re-alloc + std::vector> true_data( + column_families.size()); + Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, + -1, true, true_data); + ASSERT_OK(s); + Close(); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + int cf = 0; + for (const auto& verify_map : true_data) { + for (const auto& elem : verify_map) { + const std::string& key = elem.first; + const std::string& value = elem.second; + ASSERT_EQ(value, Get(cf, key)); + } + ++cf; + } + Close(); + Destroy(options, true /* delete_cf_paths */); +} + +TEST_P(ExternalSSTFileTest, + IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0", + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" + "BeforeRead"}, + {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" + "AfterRead", + "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + CreateAndReopenWithCF({"pikachu"}, options); + const std::vector> data_before_ingestion = + {{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}}, + {{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}}; + for (size_t i = 0; i != handles_.size(); ++i) { + int cf = static_cast(i); + const auto& orig_data = data_before_ingestion[i]; + for (const auto& kv : orig_data) { + ASSERT_OK(Put(cf, kv.first, kv.second)); + } + ASSERT_OK(Flush(cf)); + } + + std::vector column_families; + column_families.push_back(handles_[0]); + column_families.push_back(handles_[1]); + std::vector ifos(column_families.size()); + for (auto& ifo : ifos) { + ifo.allow_global_seqno = true; // Always allow global_seqno + // May or may not write global_seqno + 
ifo.write_global_seqno = std::get<0>(GetParam()); + // Whether to verify checksums before ingestion + ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); + } + std::vector>> data; + data.push_back( + {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); + data.push_back( + {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); + // Resize the true_data vector upon construction to avoid re-alloc + std::vector> true_data( + column_families.size()); + // Take snapshot before ingestion starts + ReadOptions read_opts; + read_opts.total_order_seek = true; + read_opts.snapshot = dbfull()->GetSnapshot(); + std::vector iters(handles_.size()); + + // Range scan checks first kv of each CF before ingestion starts. + for (size_t i = 0; i != handles_.size(); ++i) { + iters[i] = dbfull()->NewIterator(read_opts, handles_[i]); + iters[i]->SeekToFirst(); + ASSERT_TRUE(iters[i]->Valid()); + const std::string& key = iters[i]->key().ToString(); + const std::string& value = iters[i]->value().ToString(); + const std::map& orig_data = + data_before_ingestion[i]; + std::map::const_iterator it = orig_data.find(key); + ASSERT_NE(orig_data.end(), it); + ASSERT_EQ(it->second, value); + iters[i]->Next(); + } + port::Thread ingest_thread([&]() { + ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data, + -1, true, true_data)); + }); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" + "BeforeRead"); + // Should see only data before ingestion + for (size_t i = 0; i != handles_.size(); ++i) { + const auto& orig_data = data_before_ingestion[i]; + for (; iters[i]->Valid(); iters[i]->Next()) { + const std::string& key = iters[i]->key().ToString(); + const std::string& value = iters[i]->value().ToString(); + std::map::const_iterator it = + orig_data.find(key); + ASSERT_NE(orig_data.end(), it); + ASSERT_EQ(it->second, value); + } + } + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:" + "AfterRead"); + ingest_thread.join(); + for (auto* iter : iters) { + delete iter; + } + iters.clear(); + dbfull()->ReleaseSnapshot(read_opts.snapshot); + + Close(); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + // Should see consistent state after ingestion for all column families even + // without snapshot. 
+ ASSERT_EQ(2, handles_.size()); + int cf = 0; + for (const auto& verify_map : true_data) { + for (const auto& elem : verify_map) { + const std::string& key = elem.first; + const std::string& value = elem.second; + ASSERT_EQ(value, Get(cf, key)); + } + ++cf; + } + Close(); + Destroy(options, true /* delete_cf_paths */); +} + +TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0", + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:" + "0"}, + {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:" + "1", + "DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + CreateAndReopenWithCF({"pikachu"}, options); + std::vector column_families; + column_families.push_back(handles_[0]); + column_families.push_back(handles_[1]); + std::vector ifos(column_families.size()); + for (auto& ifo : ifos) { + ifo.allow_global_seqno = true; // Always allow global_seqno + // May or may not write global_seqno + ifo.write_global_seqno = std::get<0>(GetParam()); + // Whether to verify block checksums before ingest + ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); + } + std::vector>> data; + data.push_back( + {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); + data.push_back( + {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); + // Resize the true_data vector upon construction to avoid re-alloc + std::vector> true_data( + column_families.size()); + port::Thread ingest_thread([&]() { + Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, + -1, true, true_data); + ASSERT_NOK(s); + }); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:" + "0"); + fault_injection_env->SetFilesystemActive(false); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:" + "1"); + ingest_thread.join(); + + fault_injection_env->SetFilesystemActive(true); + Close(); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + int cf = 0; + for (const auto& verify_map : true_data) { + for (const auto& elem : verify_map) { + const std::string& key = elem.first; + ASSERT_EQ("NOT_FOUND", Get(cf, key)); + } + ++cf; + } + Close(); + Destroy(options, true /* delete_cf_paths */); +} + +TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::IngestExternalFiles:BeforeJobsRun:0", + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" + "0"}, + {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" + "1", + "DBImpl::IngestExternalFiles:BeforeJobsRun:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + CreateAndReopenWithCF({"pikachu"}, options); + std::vector column_families; + 
column_families.push_back(handles_[0]); + column_families.push_back(handles_[1]); + std::vector ifos(column_families.size()); + for (auto& ifo : ifos) { + ifo.allow_global_seqno = true; // Always allow global_seqno + // May or may not write global_seqno + ifo.write_global_seqno = std::get<0>(GetParam()); + // Whether to verify block checksums before ingestion + ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); + } + std::vector>> data; + data.push_back( + {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); + data.push_back( + {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); + // Resize the true_data vector upon construction to avoid re-alloc + std::vector> true_data( + column_families.size()); + port::Thread ingest_thread([&]() { + Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, + -1, true, true_data); + ASSERT_NOK(s); + }); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" + "0"); + fault_injection_env->SetFilesystemActive(false); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:" + "1"); + ingest_thread.join(); + + fault_injection_env->SetFilesystemActive(true); + Close(); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + int cf = 0; + for (const auto& verify_map : true_data) { + for (const auto& elem : verify_map) { + const std::string& key = elem.first; + ASSERT_EQ("NOT_FOUND", Get(cf, key)); + } + ++cf; + } + Close(); + Destroy(options, true /* delete_cf_paths */); +} + +TEST_P(ExternalSSTFileTest, + IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + + CreateAndReopenWithCF({"pikachu"}, options); + + SyncPoint::GetInstance()->ClearTrace(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0", + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" + "PartialManifestWriteFail:0"}, + {"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" + "PartialManifestWriteFail:1", + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + std::vector column_families; + column_families.push_back(handles_[0]); + column_families.push_back(handles_[1]); + std::vector ifos(column_families.size()); + for (auto& ifo : ifos) { + ifo.allow_global_seqno = true; // Always allow global_seqno + // May or may not write global_seqno + ifo.write_global_seqno = std::get<0>(GetParam()); + // Whether to verify block checksums before ingestion + ifo.verify_checksums_before_ingest = std::get<1>(GetParam()); + } + std::vector>> data; + data.push_back( + {std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")}); + data.push_back( + {std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")}); + // Resize the true_data vector upon construction to avoid re-alloc + std::vector> true_data( + column_families.size()); + port::Thread ingest_thread([&]() { + Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data, + -1, true, true_data); + ASSERT_NOK(s); + }); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" + "PartialManifestWriteFail:0"); + 
fault_injection_env->SetFilesystemActive(false); + TEST_SYNC_POINT( + "ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_" + "PartialManifestWriteFail:1"); + ingest_thread.join(); + + fault_injection_env->DropUnsyncedFileData(); + fault_injection_env->SetFilesystemActive(true); + Close(); + ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + int cf = 0; + for (const auto& verify_map : true_data) { + for (const auto& elem : verify_map) { + const std::string& key = elem.first; + ASSERT_EQ("NOT_FOUND", Get(cf, key)); + } + ++cf; + } + Close(); + Destroy(options, true /* delete_cf_paths */); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, testing::Values(std::make_tuple(false, false), std::make_tuple(false, true), diff --git a/db/version_set.cc b/db/version_set.cc index 231771bdc..074077b0a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3060,6 +3060,9 @@ Status VersionSet::ProcessManifestWrites( // Write new records to MANIFEST log if (s.ok()) { +#ifndef NDEBUG + size_t idx = 0; +#endif for (auto& e : batch_edits) { std::string record; if (!e->EncodeTo(&record)) { @@ -3069,6 +3072,15 @@ } TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord", rocksdb_kill_odds * REDUCE_ODDS2); +#ifndef NDEBUG + if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) { + TEST_SYNC_POINT( + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0"); + TEST_SYNC_POINT( + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"); + } + ++idx; +#endif /* !NDEBUG */ s = descriptor_log_->AddRecord(record); if (!s.ok()) { break; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 53fb52c94..fa2a63731 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -110,6 +110,12 @@ struct RangePtr { RangePtr(const Slice* s, const Slice* l) : start(s), limit(l) { } }; +struct IngestExternalFileArg { + ColumnFamilyHandle* column_family = nullptr; + std::vector<std::string> external_files; + IngestExternalFileOptions options; +}; + // A collections of table properties objects, where // key: is the table's file name. // value: the table properties object of the given table. @@ -1051,6 +1057,22 @@ class DB { return IngestExternalFile(DefaultColumnFamily(), external_files, options); } + // IngestExternalFiles() will ingest files for multiple column families, and + // record the result atomically to the MANIFEST. + // If this function returns OK, all column families have successfully + // ingested the files. If this function returns a non-OK status, or the + // process crashes, then none of the files will have been ingested into the + // database after recovery. + // Note that it is possible for applications to observe a mixed state during + // the execution of this function. If the user performs range scans over the + // column families with iterators, an iterator on one column family may + // return ingested data while an iterator on another column family still + // returns old data. Users can use a snapshot to obtain a consistent view of + // the data. + // + // REQUIRES: each arg corresponds to a different column family: namely, for + // 0 <= i < j < len(args), args[i].column_family != args[j].column_family.
+ virtual Status IngestExternalFiles( + const std::vector& args) = 0; + virtual Status VerifyChecksum() = 0; // AddFile() is deprecated, please use IngestExternalFile() diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 1242befed..2d4d1e9f6 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -107,6 +107,12 @@ class StackableDB : public DB { return db_->IngestExternalFile(column_family, external_files, options); } + using DB::IngestExternalFiles; + virtual Status IngestExternalFiles( + const std::vector& args) override { + return db_->IngestExternalFiles(args); + } + virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } using DB::KeyMayExist; diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc index 64e9da1aa..9cad23871 100644 --- a/util/fault_injection_test_env.cc +++ b/util/fault_injection_test_env.cc @@ -126,6 +126,7 @@ Status TestWritableFile::Append(const Slice& data) { Status s = target_->Append(data); if (s.ok()) { state_.pos_ += data.size(); + env_->WritableFileAppended(state_); } return s; } @@ -153,6 +154,7 @@ Status TestWritableFile::Sync() { } // No need to actual sync. state_.pos_at_last_sync_ = state_.pos_; + env_->WritableFileSynced(state_); return Status::OK(); } @@ -277,6 +279,28 @@ void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) { } } +void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + if (db_file_state_.find(state.filename_) == db_file_state_.end()) { + db_file_state_.insert(std::make_pair(state.filename_, state)); + } else { + db_file_state_[state.filename_] = state; + } + } +} + +void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + if (db_file_state_.find(state.filename_) == db_file_state_.end()) { + db_file_state_.insert(std::make_pair(state.filename_, state)); + } else { + db_file_state_[state.filename_] = state; + } + } +} + // For every file that is not fully synced, make a call to `func` with // FileState of the file as the parameter. Status FaultInjectionTestEnv::DropFileData( diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index d3775d3a3..7c5a080f7 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -135,6 +135,10 @@ class FaultInjectionTestEnv : public EnvWrapper { void WritableFileClosed(const FileState& state); + void WritableFileSynced(const FileState& state); + + void WritableFileAppended(const FileState& state); + // For every file that is not fully synced, make a call to `func` with // FileState of the file as the parameter. Status DropFileData(std::function func);
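Usage sketch (illustrative only, not part of the patch): the new DB::IngestExternalFiles() API added in include/rocksdb/db.h can be called as below to ingest one file into each of two column families atomically. The column family handles and SST file paths are hypothetical, and the files are assumed to have been produced beforehand with SstFileWriter.

#include <vector>
#include "rocksdb/db.h"

// Atomically ingest one pre-built SST file into each of two column families.
// Either both column families expose the ingested data after a successful
// return (and after crash recovery), or neither does.
rocksdb::Status AtomicIngestExample(rocksdb::DB* db,
                                    rocksdb::ColumnFamilyHandle* cf_meta,
                                    rocksdb::ColumnFamilyHandle* cf_data) {
  rocksdb::IngestExternalFileArg meta_arg;
  meta_arg.column_family = cf_meta;
  meta_arg.external_files = {"/tmp/meta.sst"};  // hypothetical path
  meta_arg.options.allow_global_seqno = true;

  rocksdb::IngestExternalFileArg data_arg;
  data_arg.column_family = cf_data;
  data_arg.external_files = {"/tmp/data.sst"};  // hypothetical path
  data_arg.options.allow_global_seqno = true;

  // Each arg must refer to a distinct column family, per the REQUIRES clause.
  return db->IngestExternalFiles({meta_arg, data_arg});
}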
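The db.h comment above also notes that readers may observe a mixed state across column families while ingestion is in flight, and that a snapshot provides a consistent view. A brief sketch of that pattern, again with hypothetical handles and keys:

#include <string>
#include "rocksdb/db.h"

// Pin reads to a snapshot taken before ingestion starts; such reads see none
// of the data being added by a concurrent IngestExternalFiles() call, rather
// than a per-column-family mix of old and new data.
void ConsistentReadDuringIngest(rocksdb::DB* db,
                                rocksdb::ColumnFamilyHandle* cf_meta,
                                rocksdb::ColumnFamilyHandle* cf_data) {
  rocksdb::ReadOptions read_opts;
  read_opts.snapshot = db->GetSnapshot();

  std::string value;
  db->Get(read_opts, cf_meta, "some_key", &value);     // hypothetical key
  db->Get(read_opts, cf_data, "another_key", &value);  // hypothetical key

  db->ReleaseSnapshot(read_opts.snapshot);
}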