Atomic ingest (#4895)

Summary:
Make file ingestion atomic.

 as title.
Ingesting external SST files into multiple column families should be atomic. If
a crash occurs and db reopens, either all column families have successfully
ingested the files before the crash, or non of the ingestions have any effect
on the state of the db.

Also add unit tests for atomic ingestion.

Note that the unit test here does not cover the case of incomplete atomic group
in the MANIFEST, which is covered in VersionSetTest already.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4895

Differential Revision: D13718245

Pulled By: riversand963

fbshipit-source-id: 7df97cc483af73ad44dd6993008f99b083852198
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 33b33235ff
commit a69d4deefb
  1. 1
      HISTORY.md
  2. 350
      db/db_impl.cc
  3. 13
      db/db_impl.h
  4. 6
      db/db_test.cc
  5. 11
      db/external_sst_file_ingestion_job.cc
  6. 7
      db/external_sst_file_ingestion_job.h
  7. 450
      db/external_sst_file_test.cc
  8. 12
      db/version_set.cc
  9. 22
      include/rocksdb/db.h
  10. 6
      include/rocksdb/utilities/stackable_db.h
  11. 24
      util/fault_injection_test_env.cc
  12. 4
      util/fault_injection_test_env.h

@ -24,6 +24,7 @@
* Remove PlainTable's store_index_in_file feature. When opening an existing DB with index in SST files, the index and bloom filter will still be rebuild while SST files are opened, in the same way as there is no index in the file. * Remove PlainTable's store_index_in_file feature. When opening an existing DB with index in SST files, the index and bloom filter will still be rebuild while SST files are opened, in the same way as there is no index in the file.
* Remove CuckooHash memtable. * Remove CuckooHash memtable.
* The counter stat `number.block.not_compressed` now also counts blocks not compressed due to poor compression ratio. * The counter stat `number.block.not_compressed` now also counts blocks not compressed due to poor compression ratio.
* Support SST file ingestion across multiple column families via DB::IngestExternalFiles. See the function's comment about atomicity.
### Bug Fixes ### Bug Fixes
* Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls. * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.

@ -3112,74 +3112,124 @@ Status DBImpl::IngestExternalFile(
ColumnFamilyHandle* column_family, ColumnFamilyHandle* column_family,
const std::vector<std::string>& external_files, const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) { const IngestExternalFileOptions& ingestion_options) {
if (external_files.empty()) { IngestExternalFileArg arg;
return Status::InvalidArgument("external_files is empty"); arg.column_family = column_family;
} arg.external_files = external_files;
arg.options = ingestion_options;
Status status; return IngestExternalFiles({arg});
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); }
auto cfd = cfh->cfd();
// Ingest should immediately fail if ingest_behind is requested, Status DBImpl::IngestExternalFiles(
// but the DB doesn't support it. const std::vector<IngestExternalFileArg>& args) {
if (ingestion_options.ingest_behind) { if (args.empty()) {
if (!immutable_db_options_.allow_ingest_behind) { return Status::InvalidArgument("ingestion arg list is empty");
}
{
std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
for (const auto& arg : args) {
if (arg.column_family == nullptr) {
return Status::InvalidArgument("column family handle is null");
} else if (unique_cfhs.count(arg.column_family) > 0) {
return Status::InvalidArgument(
"ingestion args have duplicate column families");
}
unique_cfhs.insert(arg.column_family);
}
}
// Ingest multiple external SST files atomically.
size_t num_cfs = args.size();
for (size_t i = 0; i != num_cfs; ++i) {
if (args[i].external_files.empty()) {
char err_msg[128] = {0};
snprintf(err_msg, 128, "external_files[%zu] is empty", i);
return Status::InvalidArgument(err_msg);
}
}
for (const auto& arg : args) {
const IngestExternalFileOptions& ingest_opts = arg.options;
if (ingest_opts.ingest_behind &&
!immutable_db_options_.allow_ingest_behind) {
return Status::InvalidArgument( return Status::InvalidArgument(
"Can't ingest_behind file in DB with allow_ingest_behind=false"); "can't ingest_behind file in DB with allow_ingest_behind=false");
} }
} }
ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd, // TODO (yanqin) maybe handle the case in which column_families have
immutable_db_options_, env_options_, // duplicates
&snapshots_, ingestion_options);
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
uint64_t next_file_number = 0;
std::list<uint64_t>::iterator pending_output_elem; std::list<uint64_t>::iterator pending_output_elem;
{ size_t total = 0;
InstrumentedMutexLock l(&mutex_); for (const auto& arg : args) {
if (error_handler_.IsDBStopped()) { total += arg.external_files.size();
// Don't ingest files when there is a bg_error
return error_handler_.GetBGError();
}
// Make sure that bg cleanup wont delete the files that we are ingesting
pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
next_file_number = versions_->FetchAddFileNumber(external_files.size());
auto cf_options = cfd->GetLatestMutableCFOptions();
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
} }
dummy_sv_ctx.Clean(); uint64_t next_file_number = 0;
Status status = ReserveFileNumbersBeforeIngestion(
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
&pending_output_elem, &next_file_number);
if (!status.ok()) { if (!status.ok()) {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem); ReleaseFileNumberFromPendingOutputs(pending_output_elem);
return status; return status;
} }
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
status = for (const auto& arg : args) {
ingestion_job.Prepare(external_files, next_file_number, super_version); auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
CleanupSuperVersion(super_version); ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
immutable_db_options_, env_options_,
&snapshots_, arg.options);
}
std::vector<std::pair<bool, Status>> exec_results;
for (size_t i = 0; i != num_cfs; ++i) {
exec_results.emplace_back(false, Status::OK());
}
// TODO(yanqin) maybe make jobs run in parallel
for (size_t i = 1; i != num_cfs; ++i) {
uint64_t start_file_number =
next_file_number + args[i - 1].external_files.size();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
exec_results[i].second = ingestion_jobs[i].Prepare(
args[i].external_files, start_file_number, super_version);
exec_results[i].first = true;
CleanupSuperVersion(super_version);
}
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
{
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
exec_results[0].second = ingestion_jobs[0].Prepare(
args[0].external_files, next_file_number, super_version);
exec_results[0].first = true;
CleanupSuperVersion(super_version);
}
for (const auto& exec_result : exec_results) {
if (!exec_result.second.ok()) {
status = exec_result.second;
break;
}
}
if (!status.ok()) { if (!status.ok()) {
for (size_t i = 0; i != num_cfs; ++i) {
if (exec_results[i].first) {
ingestion_jobs[i].Cleanup(status);
}
}
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
ReleaseFileNumberFromPendingOutputs(pending_output_elem); ReleaseFileNumberFromPendingOutputs(pending_output_elem);
return status; return status;
} }
SuperVersionContext sv_context(/* create_superversion */ true); std::vector<SuperVersionContext> sv_ctxs;
for (size_t i = 0; i != num_cfs; ++i) {
sv_ctxs.emplace_back(true /* create_superversion */);
}
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
TEST_SYNC_POINT("DBImpl::AddFile:Start"); TEST_SYNC_POINT("DBImpl::AddFile:Start");
{ {
// Lock db mutex
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
@ -3191,55 +3241,130 @@ Status DBImpl::IngestExternalFile(
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
} }
num_running_ingest_file_++; num_running_ingest_file_ += static_cast<int>(num_cfs);
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
// We cannot ingest a file into a dropped CF bool at_least_one_cf_need_flush = false;
if (cfd->IsDropped()) { std::vector<bool> need_flush(num_cfs, false);
status = Status::InvalidArgument( for (size_t i = 0; i != num_cfs; ++i) {
"Cannot ingest an external file into a dropped CF"); auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (cfd->IsDropped()) {
// TODO (yanqin) investigate whether we should abort ingestion or
// proceed with other non-dropped column families.
status = Status::InvalidArgument(
"cannot ingest an external file into a dropped CF");
break;
}
bool tmp = false;
status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
need_flush[i] = tmp;
at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
if (!status.ok()) {
break;
}
} }
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
&at_least_one_cf_need_flush);
// Figure out if we need to flush the memtable first if (status.ok() && at_least_one_cf_need_flush) {
if (status.ok()) { FlushOptions flush_opts;
bool need_flush = false; flush_opts.allow_write_stall = true;
status = ingestion_job.NeedsFlush(&need_flush, cfd->GetSuperVersion()); if (immutable_db_options_.atomic_flush) {
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", autovector<ColumnFamilyData*> cfds_to_flush;
&need_flush); SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
if (status.ok() && need_flush) { mutex_.Unlock();
FlushOptions flush_opts; status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
flush_opts.allow_write_stall = true; FlushReason::kExternalFileIngestion,
if (immutable_db_options_.atomic_flush) { true /* writes_stopped */);
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
} else {
mutex_.Unlock();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
}
mutex_.Lock(); mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {
if (need_flush[i]) {
mutex_.Unlock();
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
->cfd();
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
mutex_.Lock();
if (!status.ok()) {
break;
}
}
}
} }
} }
// Run ingestion jobs.
// Run the ingestion job
if (status.ok()) { if (status.ok()) {
status = ingestion_job.Run(); for (size_t i = 0; i != num_cfs; ++i) {
status = ingestion_jobs[i].Run();
if (!status.ok()) {
break;
}
}
} }
// Install job edit [Mutex will be unlocked here]
auto mutable_cf_options = cfd->GetLatestMutableCFOptions();
if (status.ok()) { if (status.ok()) {
bool should_increment_last_seqno =
ingestion_jobs[0].ShouldIncrementLastSequence();
#ifndef NDEBUG
for (size_t i = 1; i != num_cfs; ++i) {
assert(should_increment_last_seqno ==
ingestion_jobs[i].ShouldIncrementLastSequence());
}
#endif
if (should_increment_last_seqno) {
const SequenceNumber last_seqno = versions_->LastSequence();
versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastPublishedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1);
}
autovector<ColumnFamilyData*> cfds_to_commit;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<autovector<VersionEdit*>> edit_lists;
uint32_t num_entries = 0;
for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (cfd->IsDropped()) {
continue;
}
cfds_to_commit.push_back(cfd);
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
autovector<VersionEdit*> edit_list;
edit_list.push_back(ingestion_jobs[i].edit());
edit_lists.push_back(edit_list);
++num_entries;
}
// Mark the version edits as an atomic group
for (auto& edits : edit_lists) {
assert(edits.size() == 1);
edits[0]->MarkAtomicGroup(--num_entries);
}
assert(0 == num_entries);
status = status =
versions_->LogAndApply(cfd, *mutable_cf_options, ingestion_job.edit(), versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
&mutex_, directories_.GetDbDir()); edit_lists, &mutex_, directories_.GetDbDir());
} }
if (status.ok()) { if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options); for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
#ifndef NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
TEST_SYNC_POINT(
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
}
#endif // !NDEBUG
}
}
} }
// Resume writes to the DB // Resume writes to the DB
@ -3248,30 +3373,36 @@ Status DBImpl::IngestExternalFile(
} }
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
// Update stats
if (status.ok()) { if (status.ok()) {
ingestion_job.UpdateStats(); for (auto& job : ingestion_jobs) {
job.UpdateStats();
}
} }
ReleaseFileNumberFromPendingOutputs(pending_output_elem); ReleaseFileNumberFromPendingOutputs(pending_output_elem);
num_running_ingest_file_ -= static_cast<int>(num_cfs);
num_running_ingest_file_--; if (0 == num_running_ingest_file_) {
if (num_running_ingest_file_ == 0) {
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
} }
// mutex_ is unlocked here // mutex_ is unlocked here
// Cleanup // Cleanup
sv_context.Clean(); for (size_t i = 0; i != num_cfs; ++i) {
ingestion_job.Cleanup(status); sv_ctxs[i].Clean();
// This may rollback jobs that have completed successfully. This is
// intended for atomicity.
ingestion_jobs[i].Cleanup(status);
}
if (status.ok()) { if (status.ok()) {
NotifyOnExternalFileIngested(cfd, ingestion_job); for (size_t i = 0; i != num_cfs; ++i) {
auto* cfd =
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
}
}
} }
return status; return status;
} }
@ -3391,6 +3522,35 @@ Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
return s; return s;
} }
Status DBImpl::ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
uint64_t* next_file_number) {
Status s;
SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
assert(nullptr != pending_output_elem);
assert(nullptr != next_file_number);
InstrumentedMutexLock l(&mutex_);
if (error_handler_.IsDBStopped()) {
// Do not ingest files when there is a bg_error
return error_handler_.GetBGError();
}
*pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
*next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
auto cf_options = cfd->GetLatestMutableCFOptions();
VersionEdit dummy_edit;
// If crash happen after a hard link established, Recover function may
// reuse the file number that has already assigned to the internal file,
// and this will overwrite the external file. To protect the external
// file, we have to make sure the file number will never being reused.
s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
}
dummy_sv_ctx.Clean();
return s;
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -346,6 +346,10 @@ class DBImpl : public DB {
const std::vector<std::string>& external_files, const std::vector<std::string>& external_files,
const IngestExternalFileOptions& ingestion_options) override; const IngestExternalFileOptions& ingestion_options) override;
using DB::IngestExternalFiles;
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) override;
virtual Status VerifyChecksum() override; virtual Status VerifyChecksum() override;
using DB::StartTrace; using DB::StartTrace;
@ -1588,7 +1592,14 @@ class DBImpl : public DB {
const CompactionJobStats& compaction_job_stats, const CompactionJobStats& compaction_job_stats,
const int job_id, const Version* current, const int job_id, const Version* current,
CompactionJobInfo* compaction_job_info) const; CompactionJobInfo* compaction_job_info) const;
#endif // Reserve the next 'num' file numbers for to-be-ingested external SST files,
// and return the current file_number in 'next_file_number'.
// Write a version edit to the MANIFEST.
Status ReserveFileNumbersBeforeIngestion(
ColumnFamilyData* cfd, uint64_t num,
std::list<uint64_t>::iterator* pending_output_elem,
uint64_t* next_file_number);
#endif //! ROCKSDB_LITE
bool ShouldPurge(uint64_t file_number) const; bool ShouldPurge(uint64_t file_number) const;
void MarkAsGrabbedForPurge(uint64_t file_number); void MarkAsGrabbedForPurge(uint64_t file_number);

@ -2462,6 +2462,12 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented."); return Status::NotSupported("Not implemented.");
} }
using DB::IngestExternalFiles;
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& /*args*/) override {
return Status::NotSupported("Not implemented");
}
virtual Status VerifyChecksum() override { virtual Status VerifyChecksum() override {
return Status::NotSupported("Not implemented."); return Status::NotSupported("Not implemented.");
} }

@ -167,7 +167,6 @@ Status ExternalSstFileIngestionJob::Run() {
assert(status.ok() && need_flush == false); assert(status.ok() && need_flush == false);
#endif #endif
bool consumed_seqno = false;
bool force_global_seqno = false; bool force_global_seqno = false;
if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) { if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
@ -197,7 +196,7 @@ Status ExternalSstFileIngestionJob::Run() {
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno); &assigned_seqno);
if (assigned_seqno == last_seqno + 1) { if (assigned_seqno == last_seqno + 1) {
consumed_seqno = true; consumed_seqno_ = true;
} }
if (!status.ok()) { if (!status.ok()) {
return status; return status;
@ -207,13 +206,6 @@ Status ExternalSstFileIngestionJob::Run() {
f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno, f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
false); false);
} }
if (consumed_seqno) {
versions_->SetLastAllocatedSequence(last_seqno + 1);
versions_->SetLastPublishedSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1);
}
return status; return status;
} }
@ -269,6 +261,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
f.internal_file_path.c_str(), s.ToString().c_str()); f.internal_file_path.c_str(), s.ToString().c_str());
} }
} }
consumed_seqno_ = false;
} else if (status.ok() && ingestion_options_.move_files) { } else if (status.ok() && ingestion_options_.move_files) {
// The files were moved and added successfully, remove original file links // The files were moved and added successfully, remove original file links
for (IngestedFileInfo& f : files_to_ingest_) { for (IngestedFileInfo& f : files_to_ingest_) {

@ -85,7 +85,8 @@ class ExternalSstFileIngestionJob {
env_options_(env_options), env_options_(env_options),
db_snapshots_(db_snapshots), db_snapshots_(db_snapshots),
ingestion_options_(ingestion_options), ingestion_options_(ingestion_options),
job_start_time_(env_->NowMicros()) {} job_start_time_(env_->NowMicros()),
consumed_seqno_(false) {}
// Prepare the job by copying external files into the DB. // Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths, Status Prepare(const std::vector<std::string>& external_files_paths,
@ -118,6 +119,9 @@ class ExternalSstFileIngestionJob {
return files_to_ingest_; return files_to_ingest_;
} }
// Whether to increment VersionSet's seqno after this job runs
bool ShouldIncrementLastSequence() const { return consumed_seqno_; }
private: private:
// Open the external file and populate `file_to_ingest` with all the // Open the external file and populate `file_to_ingest` with all the
// external information we need to ingest this file. // external information we need to ingest this file.
@ -159,6 +163,7 @@ class ExternalSstFileIngestionJob {
const IngestExternalFileOptions& ingestion_options_; const IngestExternalFileOptions& ingestion_options_;
VersionEdit edit_; VersionEdit edit_;
uint64_t job_start_time_; uint64_t job_start_time_;
bool consumed_seqno_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -10,6 +10,7 @@
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/sst_file_writer.h" #include "rocksdb/sst_file_writer.h"
#include "util/fault_injection_test_env.h"
#include "util/filename.h" #include "util/filename.h"
#include "util/testutil.h" #include "util/testutil.h"
@ -29,6 +30,55 @@ class ExternalSSTFileTest
env_->CreateDir(sst_files_dir_); env_->CreateDir(sst_files_dir_);
} }
Status GenerateOneExternalFile(
const Options& options, ColumnFamilyHandle* cfh,
std::vector<std::pair<std::string, std::string>>& data, int file_id,
bool sort_data, std::string* external_file_path,
std::map<std::string, std::string>* true_data) {
// Generate a file id if not provided
if (-1 == file_id) {
file_id = (++last_file_id_);
}
// Sort data if asked to do so
if (sort_data) {
std::sort(data.begin(), data.end(),
[&](const std::pair<std::string, std::string>& e1,
const std::pair<std::string, std::string>& e2) {
return options.comparator->Compare(e1.first, e2.first) < 0;
});
auto uniq_iter = std::unique(
data.begin(), data.end(),
[&](const std::pair<std::string, std::string>& e1,
const std::pair<std::string, std::string>& e2) {
return options.comparator->Compare(e1.first, e2.first) == 0;
});
data.resize(uniq_iter - data.begin());
}
std::string file_path = sst_files_dir_ + ToString(file_id);
SstFileWriter sst_file_writer(EnvOptions(), options, cfh);
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
return s;
}
for (const auto& entry : data) {
s = sst_file_writer.Put(entry.first, entry.second);
if (!s.ok()) {
sst_file_writer.Finish();
return s;
}
}
s = sst_file_writer.Finish();
if (s.ok() && external_file_path != nullptr) {
*external_file_path = file_path;
}
if (s.ok() && nullptr != true_data) {
for (const auto& entry : data) {
true_data->insert({entry.first, entry.second});
}
}
return s;
}
Status GenerateAndAddExternalFile( Status GenerateAndAddExternalFile(
const Options options, const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1, std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
@ -154,6 +204,41 @@ class ExternalSSTFileTest
return s; return s;
} }
Status GenerateAndAddExternalFiles(
const Options& options,
const std::vector<ColumnFamilyHandle*>& column_families,
const std::vector<IngestExternalFileOptions>& ifos,
std::vector<std::vector<std::pair<std::string, std::string>>>& data,
int file_id, bool sort_data,
std::vector<std::map<std::string, std::string>>& true_data) {
if (-1 == file_id) {
file_id = (++last_file_id_);
}
// Generate external SST files, one for each column family
size_t num_cfs = column_families.size();
assert(ifos.size() == num_cfs);
assert(data.size() == num_cfs);
Status s;
std::vector<IngestExternalFileArg> args(num_cfs);
for (size_t i = 0; i != num_cfs; ++i) {
std::string external_file_path;
s = GenerateOneExternalFile(
options, column_families[i], data[i], file_id, sort_data,
&external_file_path,
true_data.size() == num_cfs ? &true_data[i] : nullptr);
if (!s.ok()) {
return s;
}
++file_id;
args[i].column_family = column_families[i];
args[i].external_files.push_back(external_file_path);
args[i].options = ifos[i];
}
s = db_->IngestExternalFiles(args);
return s;
}
Status GenerateAndAddExternalFile( Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data, const Options options, std::vector<std::pair<int, std::string>> data,
int file_id = -1, bool allow_global_seqno = false, int file_id = -1, bool allow_global_seqno = false,
@ -2233,6 +2318,371 @@ TEST_F(ExternalSSTFileTest, SkipBloomFilter) {
} }
} }
TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_Success) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
// May or may not write global_seqno
ifo.write_global_seqno = std::get<0>(GetParam());
// Whether to verify checksums before ingestion
ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
}
std::vector<std::vector<std::pair<std::string, std::string>>> data;
data.push_back(
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_OK(s);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
const std::string& key = elem.first;
const std::string& value = elem.second;
ASSERT_EQ(value, Get(cf, key));
}
++cf;
}
Close();
Destroy(options, true /* delete_cf_paths */);
}
TEST_P(ExternalSSTFileTest,
IngestFilesIntoMultipleColumnFamilies_NoMixedStateWithSnapshot) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::IngestExternalFiles:InstallSVForFirstCF:0",
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
"BeforeRead"},
{"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
"AfterRead",
"DBImpl::IngestExternalFiles:InstallSVForFirstCF:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
const std::vector<std::map<std::string, std::string>> data_before_ingestion =
{{{"foo1", "fv1_0"}, {"foo2", "fv2_0"}, {"foo3", "fv3_0"}},
{{"bar1", "bv1_0"}, {"bar2", "bv2_0"}, {"bar3", "bv3_0"}}};
for (size_t i = 0; i != handles_.size(); ++i) {
int cf = static_cast<int>(i);
const auto& orig_data = data_before_ingestion[i];
for (const auto& kv : orig_data) {
ASSERT_OK(Put(cf, kv.first, kv.second));
}
ASSERT_OK(Flush(cf));
}
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
// May or may not write global_seqno
ifo.write_global_seqno = std::get<0>(GetParam());
// Whether to verify checksums before ingestion
ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
}
std::vector<std::vector<std::pair<std::string, std::string>>> data;
data.push_back(
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
// Take snapshot before ingestion starts
ReadOptions read_opts;
read_opts.total_order_seek = true;
read_opts.snapshot = dbfull()->GetSnapshot();
std::vector<Iterator*> iters(handles_.size());
// Range scan checks first kv of each CF before ingestion starts.
for (size_t i = 0; i != handles_.size(); ++i) {
iters[i] = dbfull()->NewIterator(read_opts, handles_[i]);
iters[i]->SeekToFirst();
ASSERT_TRUE(iters[i]->Valid());
const std::string& key = iters[i]->key().ToString();
const std::string& value = iters[i]->value().ToString();
const std::map<std::string, std::string>& orig_data =
data_before_ingestion[i];
std::map<std::string, std::string>::const_iterator it = orig_data.find(key);
ASSERT_NE(orig_data.end(), it);
ASSERT_EQ(it->second, value);
iters[i]->Next();
}
port::Thread ingest_thread([&]() {
ASSERT_OK(GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data));
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
"BeforeRead");
// Should see only data before ingestion
for (size_t i = 0; i != handles_.size(); ++i) {
const auto& orig_data = data_before_ingestion[i];
for (; iters[i]->Valid(); iters[i]->Next()) {
const std::string& key = iters[i]->key().ToString();
const std::string& value = iters[i]->value().ToString();
std::map<std::string, std::string>::const_iterator it =
orig_data.find(key);
ASSERT_NE(orig_data.end(), it);
ASSERT_EQ(it->second, value);
}
}
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_MixedState:"
"AfterRead");
ingest_thread.join();
for (auto* iter : iters) {
delete iter;
}
iters.clear();
dbfull()->ReleaseSnapshot(read_opts.snapshot);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
// Should see consistent state after ingestion for all column families even
// without snapshot.
ASSERT_EQ(2, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
const std::string& key = elem.first;
const std::string& value = elem.second;
ASSERT_EQ(value, Get(cf, key));
}
++cf;
}
Close();
Destroy(options, true /* delete_cf_paths */);
}
TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_PrepareFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0",
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
"0"},
{"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
"1",
"DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
// May or may not write global_seqno
ifo.write_global_seqno = std::get<0>(GetParam());
// Whether to verify block checksums before ingest
ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
}
std::vector<std::vector<std::pair<std::string, std::string>>> data;
data.push_back(
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_PrepareFail:"
"0");
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies:PrepareFail:"
"1");
ingest_thread.join();
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
const std::string& key = elem.first;
ASSERT_EQ("NOT_FOUND", Get(cf, key));
}
++cf;
}
Close();
Destroy(options, true /* delete_cf_paths */);
}
TEST_P(ExternalSSTFileTest, IngestFilesIntoMultipleColumnFamilies_CommitFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::IngestExternalFiles:BeforeJobsRun:0",
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
"0"},
{"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
"1",
"DBImpl::IngestExternalFiles:BeforeJobsRun:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
// May or may not write global_seqno
ifo.write_global_seqno = std::get<0>(GetParam());
// Whether to verify block checksums before ingestion
ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
}
std::vector<std::vector<std::pair<std::string, std::string>>> data;
data.push_back(
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
"0");
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_CommitFail:"
"1");
ingest_thread.join();
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
const std::string& key = elem.first;
ASSERT_EQ("NOT_FOUND", Get(cf, key));
}
++cf;
}
Close();
Destroy(options, true /* delete_cf_paths */);
}
TEST_P(ExternalSSTFileTest,
IngestFilesIntoMultipleColumnFamilies_PartialManifestWriteFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
Options options = CurrentOptions();
options.env = fault_injection_env.get();
CreateAndReopenWithCF({"pikachu"}, options);
SyncPoint::GetInstance()->ClearTrace();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency({
{"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
"PartialManifestWriteFail:0"},
{"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
"PartialManifestWriteFail:1",
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
std::vector<ColumnFamilyHandle*> column_families;
column_families.push_back(handles_[0]);
column_families.push_back(handles_[1]);
std::vector<IngestExternalFileOptions> ifos(column_families.size());
for (auto& ifo : ifos) {
ifo.allow_global_seqno = true; // Always allow global_seqno
// May or may not write global_seqno
ifo.write_global_seqno = std::get<0>(GetParam());
// Whether to verify block checksums before ingestion
ifo.verify_checksums_before_ingest = std::get<1>(GetParam());
}
std::vector<std::vector<std::pair<std::string, std::string>>> data;
data.push_back(
{std::make_pair("foo1", "fv1"), std::make_pair("foo2", "fv2")});
data.push_back(
{std::make_pair("bar1", "bv1"), std::make_pair("bar2", "bv2")});
// Resize the true_data vector upon construction to avoid re-alloc
std::vector<std::map<std::string, std::string>> true_data(
column_families.size());
port::Thread ingest_thread([&]() {
Status s = GenerateAndAddExternalFiles(options, column_families, ifos, data,
-1, true, true_data);
ASSERT_NOK(s);
});
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
"PartialManifestWriteFail:0");
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT(
"ExternalSSTFileTest::IngestFilesIntoMultipleColumnFamilies_"
"PartialManifestWriteFail:1");
ingest_thread.join();
fault_injection_env->DropUnsyncedFileData();
fault_injection_env->SetFilesystemActive(true);
Close();
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
ASSERT_EQ(2, handles_.size());
int cf = 0;
for (const auto& verify_map : true_data) {
for (const auto& elem : verify_map) {
const std::string& key = elem.first;
ASSERT_EQ("NOT_FOUND", Get(cf, key));
}
++cf;
}
Close();
Destroy(options, true /* delete_cf_paths */);
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest,
testing::Values(std::make_tuple(false, false), testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true), std::make_tuple(false, true),

@ -3060,6 +3060,9 @@ Status VersionSet::ProcessManifestWrites(
// Write new records to MANIFEST log // Write new records to MANIFEST log
if (s.ok()) { if (s.ok()) {
#ifndef NDEBUG
size_t idx = 0;
#endif
for (auto& e : batch_edits) { for (auto& e : batch_edits) {
std::string record; std::string record;
if (!e->EncodeTo(&record)) { if (!e->EncodeTo(&record)) {
@ -3069,6 +3072,15 @@ Status VersionSet::ProcessManifestWrites(
} }
TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord", TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
rocksdb_kill_odds * REDUCE_ODDS2); rocksdb_kill_odds * REDUCE_ODDS2);
#ifndef NDEBUG
if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
TEST_SYNC_POINT(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0");
TEST_SYNC_POINT(
"VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
}
++idx;
#endif /* !NDEBUG */
s = descriptor_log_->AddRecord(record); s = descriptor_log_->AddRecord(record);
if (!s.ok()) { if (!s.ok()) {
break; break;

@ -110,6 +110,12 @@ struct RangePtr {
RangePtr(const Slice* s, const Slice* l) : start(s), limit(l) { } RangePtr(const Slice* s, const Slice* l) : start(s), limit(l) { }
}; };
struct IngestExternalFileArg {
ColumnFamilyHandle* column_family = nullptr;
std::vector<std::string> external_files;
IngestExternalFileOptions options;
};
// A collections of table properties objects, where // A collections of table properties objects, where
// key: is the table's file name. // key: is the table's file name.
// value: the table properties object of the given table. // value: the table properties object of the given table.
@ -1051,6 +1057,22 @@ class DB {
return IngestExternalFile(DefaultColumnFamily(), external_files, options); return IngestExternalFile(DefaultColumnFamily(), external_files, options);
} }
// IngestExternalFiles() will ingest files for multiple column families, and
// record the result atomically to the MANIFEST.
// If this function returns OK, all column families' ingestion must succeed.
// If this function returns NOK, or the process crashes, then non-of the
// files will be ingested into the database after recovery.
// Note that it is possible for application to observe a mixed state during
// the execution of this function. If the user performs range scan over the
// column families with iterators, iterator on one column family may return
// ingested data, while iterator on other column family returns old data.
// Users can use snapshot for a consistent view of data.
//
// REQUIRES: each arg corresponds to a different column family: namely, for
// 0 <= i < j < len(args), args[i].column_family != args[j].column_family.
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) = 0;
virtual Status VerifyChecksum() = 0; virtual Status VerifyChecksum() = 0;
// AddFile() is deprecated, please use IngestExternalFile() // AddFile() is deprecated, please use IngestExternalFile()

@ -107,6 +107,12 @@ class StackableDB : public DB {
return db_->IngestExternalFile(column_family, external_files, options); return db_->IngestExternalFile(column_family, external_files, options);
} }
using DB::IngestExternalFiles;
virtual Status IngestExternalFiles(
const std::vector<IngestExternalFileArg>& args) override {
return db_->IngestExternalFiles(args);
}
virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); } virtual Status VerifyChecksum() override { return db_->VerifyChecksum(); }
using DB::KeyMayExist; using DB::KeyMayExist;

@ -126,6 +126,7 @@ Status TestWritableFile::Append(const Slice& data) {
Status s = target_->Append(data); Status s = target_->Append(data);
if (s.ok()) { if (s.ok()) {
state_.pos_ += data.size(); state_.pos_ += data.size();
env_->WritableFileAppended(state_);
} }
return s; return s;
} }
@ -153,6 +154,7 @@ Status TestWritableFile::Sync() {
} }
// No need to actual sync. // No need to actual sync.
state_.pos_at_last_sync_ = state_.pos_; state_.pos_at_last_sync_ = state_.pos_;
env_->WritableFileSynced(state_);
return Status::OK(); return Status::OK();
} }
@ -277,6 +279,28 @@ void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) {
} }
} }
void FaultInjectionTestEnv::WritableFileSynced(const FileState& state) {
MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state));
} else {
db_file_state_[state.filename_] = state;
}
}
}
void FaultInjectionTestEnv::WritableFileAppended(const FileState& state) {
MutexLock l(&mutex_);
if (open_files_.find(state.filename_) != open_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state));
} else {
db_file_state_[state.filename_] = state;
}
}
}
// For every file that is not fully synced, make a call to `func` with // For every file that is not fully synced, make a call to `func` with
// FileState of the file as the parameter. // FileState of the file as the parameter.
Status FaultInjectionTestEnv::DropFileData( Status FaultInjectionTestEnv::DropFileData(

@ -135,6 +135,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
void WritableFileClosed(const FileState& state); void WritableFileClosed(const FileState& state);
void WritableFileSynced(const FileState& state);
void WritableFileAppended(const FileState& state);
// For every file that is not fully synced, make a call to `func` with // For every file that is not fully synced, make a call to `func` with
// FileState of the file as the parameter. // FileState of the file as the parameter.
Status DropFileData(std::function<Status(Env*, FileState)> func); Status DropFileData(std::function<Status(Env*, FileState)> func);

Loading…
Cancel
Save