From f5e46354d2ba96a6a080916cbf05c1df6f7d3275 Mon Sep 17 00:00:00 2001 From: DorianZheng Date: Fri, 27 Jul 2018 14:02:07 -0700 Subject: [PATCH] Protect external file when ingesting (#4099) Summary: If a crash happens after a hard link is established, the Recover function may reuse a file number that has already been assigned to the internal file, and this will overwrite the external file. To protect the external file, we have to make sure the file number will never be reused. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4099 Differential Revision: D9034092 Pulled By: riversand963 fbshipit-source-id: 3f1a737440b86aa2ef01673e5013aacbb7c33e28 --- db/db_impl.cc | 26 +++++++++++++++++++++++++- db/db_impl_compaction_flush.cc | 3 ++- db/external_sst_file_ingestion_job.cc | 6 ++++-- db/external_sst_file_ingestion_job.h | 1 + db/version_set.h | 5 +++++ tools/db_stress.cc | 10 +++++++--- 6 files changed, 44 insertions(+), 7 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 0088ee56d..0d6158a89 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2879,6 +2879,10 @@ Status DBImpl::IngestExternalFile( ColumnFamilyHandle* column_family, const std::vector& external_files, const IngestExternalFileOptions& ingestion_options) { + if (external_files.empty()) { + return Status::InvalidArgument("external_files is empty"); + } + Status status; auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); @@ -2896,6 +2900,9 @@ Status DBImpl::IngestExternalFile( immutable_db_options_, env_options_, &snapshots_, ingestion_options); + SuperVersionContext dummy_sv_ctx(/* create_superversion */ true); + VersionEdit dummy_edit; + uint64_t next_file_number = 0; std::list::iterator pending_output_elem; { InstrumentedMutexLock l(&mutex_); @@ -2906,10 +2913,27 @@ Status DBImpl::IngestExternalFile( // Make sure that bg cleanup wont delete the files that we are ingesting pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); + + // If crash happen after a hard link 
established, Recover function may + // reuse the file number that has already assigned to the internal file, + // and this will overwrite the external file. To protect the external + // file, we have to make sure the file number will never being reused. + next_file_number = versions_->FetchAddFileNumber(external_files.size()); + auto cf_options = cfd->GetLatestMutableCFOptions(); + status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options); + } + } + dummy_sv_ctx.Clean(); + if (!status.ok()) { + return status; } SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); - status = ingestion_job.Prepare(external_files, super_version); + status = ingestion_job.Prepare(external_files, next_file_number, + super_version); CleanupSuperVersion(super_version); if (!status.ok()) { InstrumentedMutexLock l(&mutex_); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index f4f1680f9..b71063818 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -2091,7 +2091,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork( old_sv->mutable_cf_options.max_write_buffer_number; } - if (sv_context->new_superversion == nullptr) { + // this branch is unlikely to step in + if (UNLIKELY(sv_context->new_superversion == nullptr)) { sv_context->NewSuperVersion(); } cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 2fc8b61c6..a3761e0d4 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -29,7 +29,9 @@ namespace rocksdb { Status ExternalSstFileIngestionJob::Prepare( - const std::vector& external_files_paths, SuperVersion* sv) { + const std::vector& external_files_paths, + uint64_t next_file_number, + SuperVersion* sv) { Status status; // Read the 
information of files we are ingesting @@ -90,7 +92,7 @@ Status ExternalSstFileIngestionJob::Prepare( // Copy/Move external files into DB for (IngestedFileInfo& f : files_to_ingest_) { - f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size); + f.fd = FileDescriptor(next_file_number++, 0, f.file_size); const std::string path_outside_db = f.external_file_path; const std::string path_inside_db = diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 4ba604d47..29981bc10 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -89,6 +89,7 @@ class ExternalSstFileIngestionJob { // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, + uint64_t next_file_number, SuperVersion* sv); // Check if we need to flush the memtable before running the ingestion job diff --git a/db/version_set.h b/db/version_set.h index 6fe205651..fe8f26339 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -832,6 +832,11 @@ class VersionSet { // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } + // Fetch And Add n new file number + uint64_t FetchAddFileNumber(uint64_t n) { + return next_file_number_.fetch_add(n); + } + // Return the last sequence number. 
uint64_t LastSequence() const { return last_sequence_.load(std::memory_order_acquire); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b0612ad2e..e599df93e 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1627,11 +1627,15 @@ class StressTest { if (!FLAGS_verbose) { return; } - fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz); + std::string tmp; + tmp.reserve(sz * 2 + 16); + char buf[4]; for (size_t i = 0; i < sz; i++) { - fprintf(stdout, "%X", value[i]); + snprintf(buf, 4, "%X", value[i]); + tmp.append(buf); } - fprintf(stdout, "\n"); + fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", + cf, key, sz, tmp.c_str()); } static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {