Fix data race to VersionSet::io_status_ (#7034)

Summary:
After https://github.com/facebook/rocksdb/issues/6949 , VersionSet::io_status_ can be concurrently accessed by multiple
threads without lock, causing tsan test to fail. For example, a bg flush thread
resets io_status_ before calling LogAndApply(), while another thread already in
the process of LogAndApply() reads io_status_. This is a bug.

We do not have to reset io_status_ each time we call LogAndApply(). io_status_
is part of the state of VersionSet, and it indicates the outcome of preceding
MANIFEST/CURRENT files IO operations. Its value should be updated only when:

1. MANIFEST/CURRENT files IO fail for the first time.
2. MANIFEST/CURRENT files IO succeed as part of recovering from a prior
   failure without process restart, e.g. calling Resume().

Test Plan (devserver):
COMPILE_WITH_TSAN=1 make check
COMPILE_WITH_TSAN=1 make db_test2
./db_test2 --gtest_filter=DBTest2.CompactionStall
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7034

Reviewed By: zhichao-cao

Differential Revision: D22247137

Pulled By: riversand963

fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216
main
Yanqin Jin 5 years ago committed by Facebook GitHub Bot
parent b9d51b8684
commit d47c871190
  1. 1
      db/compaction/compaction_job.cc
  2. 2
      db/db_impl/db_impl_compaction_flush.cc
  3. 1
      db/memtable_list.cc
  4. 49
      db/version_set.cc
  5. 7
      db/version_set.h

@ -724,7 +724,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats( cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_); compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatus(IOStatus::OK());
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options); status = InstallCompactionResults(mutable_cf_options);
} }

@ -2705,7 +2705,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) { for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
} }
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
@ -2763,7 +2762,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} }
} }
versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());

@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
} }
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
vset->SetIOStatus(IOStatus::OK());
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory); db_directory);
*io_s = vset->io_status(); *io_s = vset->io_status();

@ -3878,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
} }
#endif // NDEBUG #endif // NDEBUG
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
assert(pending_manifest_file_number_ == 0); assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ || if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) { manifest_file_size_ > db_options_->max_manifest_file_size) {
@ -3911,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
} }
} }
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
{ {
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_); FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
mu->Unlock(); mu->Unlock();
@ -3947,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
std::string descriptor_fname = std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_); DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<FSWritableFile> descriptor_file; std::unique_ptr<FSWritableFile> descriptor_file;
s = NewWritableFile(fs_, descriptor_fname, &descriptor_file, io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
opt_file_opts); opt_file_opts);
if (s.ok()) { if (io_s.ok()) {
descriptor_file->SetPreallocationBlockSize( descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size); db_options_->manifest_preallocation_size);
@ -3958,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
nullptr, db_options_->listeners)); nullptr, db_options_->listeners));
descriptor_log_.reset( descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false)); new log::Writer(std::move(file_writer), 0, false));
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get()); s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
io_s);
} else {
s = io_s;
} }
} }
@ -3994,7 +3996,6 @@ Status VersionSet::ProcessManifestWrites(
#endif /* !NDEBUG */ #endif /* !NDEBUG */
io_s = descriptor_log_->AddRecord(record); io_s = descriptor_log_->AddRecord(record);
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s;
s = io_s; s = io_s;
break; break;
} }
@ -4005,12 +4006,9 @@ Status VersionSet::ProcessManifestWrites(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s); "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
} }
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s;
s = io_s; s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str()); s.ToString().c_str());
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
} }
} }
@ -4020,10 +4018,7 @@ Status VersionSet::ProcessManifestWrites(
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_, io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
db_directory); db_directory);
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s;
s = io_s; s = io_s;
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
} }
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
} }
@ -4044,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
mu->Lock(); mu->Lock();
} }
if (!io_s.ok()) {
if (io_status_.ok()) {
io_status_ = io_s;
}
} else if (!io_status_.ok()) {
io_status_ = io_s;
}
// Append the old manifest file to the obsolete_manifest_ list to be deleted // Append the old manifest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later. // by PurgeObsoleteFiles later.
if (s.ok() && new_descriptor_log) { if (s.ok() && new_descriptor_log) {
@ -5297,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
Status VersionSet::WriteCurrentStateToManifest( Status VersionSet::WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state, const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log) { log::Writer* log, IOStatus& io_s) {
// TODO: Break up into multiple records to reduce memory usage on recovery? // TODO: Break up into multiple records to reduce memory usage on recovery?
// WARNING: This method doesn't hold a mutex!! // WARNING: This method doesn't hold a mutex!!
@ -5306,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// LogAndApply. Column family manipulations can only happen within LogAndApply // LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate. // (the same single thread), so we're safe to iterate.
assert(io_s.ok());
if (db_options_->write_dbid_to_manifest) { if (db_options_->write_dbid_to_manifest) {
VersionEdit edit_for_db_id; VersionEdit edit_for_db_id;
assert(!db_id_.empty()); assert(!db_id_.empty());
@ -5315,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption("Unable to Encode VersionEdit:" + return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true)); edit_for_db_id.DebugString(true));
} }
IOStatus io_s = log->AddRecord(db_id_record); io_s = log->AddRecord(db_id_record);
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s; return io_s;
return std::move(io_s);
} }
} }
@ -5345,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption( return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true)); "Unable to Encode VersionEdit:" + edit.DebugString(true));
} }
IOStatus io_s = log->AddRecord(record); io_s = log->AddRecord(record);
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s; return io_s;
return std::move(io_s);
} }
} }
@ -5398,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption( return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true)); "Unable to Encode VersionEdit:" + edit.DebugString(true));
} }
IOStatus io_s = log->AddRecord(record); io_s = log->AddRecord(record);
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s; return io_s;
return std::move(io_s);
} }
} }
} }

@ -1159,10 +1159,7 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions); static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
// Get the IO Status returned by written Manifest. // Get the IO Status returned by written Manifest.
IOStatus io_status() const { return io_status_; } const IOStatus& io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatus(const IOStatus& s) { io_status_ = s; }
protected: protected:
using VersionBuilderMap = using VersionBuilderMap =
@ -1205,7 +1202,7 @@ class VersionSet {
// Save current contents to *log // Save current contents to *log
Status WriteCurrentStateToManifest( Status WriteCurrentStateToManifest(
const std::unordered_map<uint32_t, MutableCFState>& curr_state, const std::unordered_map<uint32_t, MutableCFState>& curr_state,
log::Writer* log); log::Writer* log, IOStatus& io_s);
void AppendVersion(ColumnFamilyData* column_family_data, Version* v); void AppendVersion(ColumnFamilyData* column_family_data, Version* v);

Loading…
Cancel
Save