Protect external file when ingesting (#4099)

Summary:
If crash happen after a hard link established, Recover function may reuse the file number that has already assigned to the internal file, and this will overwrite the external file. To protect the external file, we have to make sure the file number will never being reused.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/4099

Differential Revision: D9034092

Pulled By: riversand963

fbshipit-source-id: 3f1a737440b86aa2ef01673e5013aacbb7c33e28
main
DorianZheng 6 years ago committed by Facebook Github Bot
parent c33b32671e
commit f5e46354d2
  1. 26
      db/db_impl.cc
  2. 3
      db/db_impl_compaction_flush.cc
  3. 6
      db/external_sst_file_ingestion_job.cc
  4. 1
      db/external_sst_file_ingestion_job.h
  5. 5
      db/version_set.h
  6. 10
      tools/db_stress.cc

@ -2879,6 +2879,10 @@ Status DBImpl::IngestExternalFile(
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const std::vector<std::string>& external_files, const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) { const IngestExternalFileOptions& ingestion_options) {
if (external_files.empty()) {
return Status::InvalidArgument("external_files is empty");
}
Status status; Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
@ -2896,6 +2900,9 @@ Status DBImpl::IngestExternalFile(
immutable_db_options_, env_options_, immutable_db_options_, env_options_,
&snapshots_, ingestion_options); &snapshots_, ingestion_options);
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem; std::list<uint64_t>::iterator pending_output_elem;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
@ -2906,10 +2913,27 @@ Status DBImpl::IngestExternalFile(
// Make sure that bg cleanup wont delete the files that we are ingesting // Make sure that bg cleanup wont delete the files that we are ingesting
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs(); pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number = versions_->FetchAddFileNumber(external_files.size());
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
}
dummy_sv_ctx.Clean();
if (!status.ok()) {
return status;
} }
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
status = ingestion_job.Prepare(external_files, super_version); status = ingestion_job.Prepare(external_files, next_file_number,
super_version);
CleanupSuperVersion(super_version); CleanupSuperVersion(super_version);
if (!status.ok()) { if (!status.ok()) {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);

@ -2091,7 +2091,8 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
old_sv->mutable_cf_options.max_write_buffer_number; old_sv->mutable_cf_options.max_write_buffer_number;
} }
if (sv_context->new_superversion == nullptr) { // this branch is unlikely to step in
if (UNLIKELY(sv_context->new_superversion == nullptr)) {
sv_context->NewSuperVersion(); sv_context->NewSuperVersion();
} }
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

@ -29,7 +29,9 @@
namespace rocksdb { namespace rocksdb {
Status ExternalSstFileIngestionJob::Prepare( Status ExternalSstFileIngestionJob::Prepare(
const std::vector<std::string>& external_files_paths, SuperVersion* sv) { const std::vector<std::string>& external_files_paths,
uint64_t next_file_number,
SuperVersion* sv) {
Status status; Status status;
// Read the information of files we are ingesting // Read the information of files we are ingesting
@ -90,7 +92,7 @@ Status ExternalSstFileIngestionJob::Prepare(
// Copy/Move external files into DB // Copy/Move external files into DB
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {
f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size); f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
const std::string path_outside_db = f.external_file_path; const std::string path_outside_db = f.external_file_path;
const std::string path_inside_db = const std::string path_inside_db =

@ -89,6 +89,7 @@ class ExternalSstFileIngestionJob {
// Prepare the job by copying external files into the DB. // Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths, Status Prepare(const std::vector<std::string>& external_files_paths,
uint64_t next_file_number,
SuperVersion* sv); SuperVersion* sv);
// Check if we need to flush the memtable before running the ingestion job // Check if we need to flush the memtable before running the ingestion job

@ -832,6 +832,11 @@ class VersionSet {
// Allocate and return a new file number // Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
// Fetch And Add n new file number
uint64_t FetchAddFileNumber(uint64_t n) {
return next_file_number_.fetch_add(n);
}
// Return the last sequence number. // Return the last sequence number.
uint64_t LastSequence() const { uint64_t LastSequence() const {
return last_sequence_.load(std::memory_order_acquire); return last_sequence_.load(std::memory_order_acquire);

@ -1627,11 +1627,15 @@ class StressTest {
if (!FLAGS_verbose) { if (!FLAGS_verbose) {
return; return;
} }
fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz); std::string tmp;
tmp.reserve(sz * 2 + 16);
char buf[4];
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
fprintf(stdout, "%X", value[i]); snprintf(buf, 4, "%X", value[i]);
tmp.append(buf);
} }
fprintf(stdout, "\n"); fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n",
cf, key, sz, tmp.c_str());
} }
static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {

Loading…
Cancel
Save