diff --git a/HISTORY.md b/HISTORY.md index 2b10e8e04..417342159 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Fixed bug where `FlushWAL(true /* sync */)` (used by `GetLiveFilesStorageInfo()`, which is used by checkpoint and backup) could cause parallel writes at the tail of a WAL file to never be synced. * Fix periodic_task unable to re-register the same task type, which may cause `SetOptions()` fail to update periodical_task time like: `stats_dump_period_sec`, `stats_persist_period_sec`. * Fixed a bug in the rocksdb.prefetched.bytes.discarded stat. It was counting the prefetch buffer size, rather than the actual number of bytes discarded from the buffer. +* Fix bug where the directory containing CURRENT can left unsynced after CURRENT is updated to point to the latest MANIFEST, which leads to risk of unsync data loss of CURRENT. ### Public API changes * Add `rocksdb_column_family_handle_get_id`, `rocksdb_column_family_handle_get_name` to get name, id of column family in C API diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index d04b3fda6..79b7f7be2 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -377,7 +377,7 @@ class CompactionJobTestBase : public testing::Test { mutex_.Lock(); EXPECT_OK( versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options_, &edit, &mutex_)); + mutable_cf_options_, &edit, &mutex_, nullptr)); mutex_.Unlock(); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 08fa75597..755e67d9e 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1534,8 +1534,8 @@ Status DBImpl::SyncWAL() { Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { // not empty, write to MANIFEST. mutex_.AssertHeld(); - Status status = - versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_); + Status status = versions_->LogAndApplyToDefaultColumnFamily( + synced_wals, &mutex_, directories_.GetDbDir()); if (!status.ok() && versions_->io_status().IsIOError()) { status = error_handler_.SetBGError(versions_->io_status(), BackgroundErrorReason::kManifestWrite); @@ -3149,7 +3149,7 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, - &mutex_); + &mutex_, directories_.GetDbDir()); write_thread_.ExitUnbatched(&w); } if (s.ok()) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 150e52338..b6882df06 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -988,7 +988,7 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, } Status s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), - &edit, &mutex_); + &edit, &mutex_, directories_.GetDbDir()); if (!s.ok()) { return s; } diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 34880ceb7..7f400f5e3 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -2163,7 +2163,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { VersionEdit wal_deletion; wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); - s = versions_->LogAndApplyToDefaultColumnFamily(&wal_deletion, &mutex_); + s = versions_->LogAndApplyToDefaultColumnFamily(&wal_deletion, &mutex_, + directories_.GetDbDir()); if (!s.ok() && versions_->io_status().IsIOError()) { s = error_handler_.SetBGError(versions_->io_status(), BackgroundErrorReason::kManifestWrite); diff --git a/db/experimental.cc b/db/experimental.cc index e36efbb22..2d155fcdb 100644 --- a/db/experimental.cc +++ b/db/experimental.cc @@ -122,7 +122,11 @@ Status UpdateManifestForFilesState( } if (s.ok() && edit.NumEntries() > 0) { - s = w.LogAndApply(cfd, &edit); + std::unique_ptr db_dir; + s = fs->NewDirectory(db_name, IOOptions(), &db_dir, nullptr); + if (s.ok()) { + s = w.LogAndApply(cfd, &edit, db_dir.get()); + } if (s.ok()) { ++cfs_updated; } diff --git a/db/repair.cc b/db/repair.cc index 482c3af7d..a38b7bde9 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -162,9 +162,13 @@ class Repairer { edit.AddColumnFamily(cf_name); mutex_.Lock(); - Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, - nullptr /* db_directory */, - false /* new_descriptor_log */, cf_opts); + std::unique_ptr db_dir; + Status status = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(), + &db_dir, nullptr); + if (status.ok()) { + status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, db_dir.get(), + false /* new_descriptor_log */, cf_opts); + } mutex_.Unlock(); return status; } @@ -656,9 +660,14 @@ class Repairer { assert(next_file_number_ > 0); vset_.MarkFileNumberUsed(next_file_number_ - 1); mutex_.Lock(); - Status status = vset_.LogAndApply( - cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, - nullptr /* db_directory */, false /* new_descriptor_log */); + std::unique_ptr db_dir; + Status status = env_->GetFileSystem()->NewDirectory(dbname_, IOOptions(), + &db_dir, nullptr); + if (status.ok()) { + status = vset_.LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, db_dir.get(), + false /* new_descriptor_log */); + } mutex_.Unlock(); if (!status.ok()) { return status; diff --git a/db/version_set.cc b/db/version_set.cc index a3f37c85b..5e631556b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4562,7 +4562,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::ProcessManifestWrites( std::deque& writers, InstrumentedMutex* mu, - FSDirectory* db_directory, bool new_descriptor_log, + FSDirectory* dir_contains_current_file, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); assert(!writers.empty()); @@ -4893,7 +4893,7 @@ Status VersionSet::ProcessManifestWrites( } if (s.ok() && new_descriptor_log) { io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_, - db_directory); + dir_contains_current_file); if (!io_s.ok()) { s = io_s; } @@ -5120,8 +5120,8 @@ Status VersionSet::LogAndApply( const autovector& column_family_datas, const autovector& mutable_cf_options_list, const autovector>& edit_lists, - InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log, - const ColumnFamilyOptions* new_cf_options, + InstrumentedMutex* mu, FSDirectory* dir_contains_current_file, + bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options, const std::vector>& manifest_wcbs) { mu->AssertHeld(); int num_edits = 0; @@ -5195,9 +5195,8 @@ Status VersionSet::LogAndApply( } return Status::ColumnFamilyDropped(); } - - return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log, - new_cf_options); + return ProcessManifestWrites(writers, mu, dir_contains_current_file, + new_descriptor_log, new_cf_options); } void VersionSet::LogAndApplyCFHelper(VersionEdit* edit, diff --git a/db/version_set.h b/db/version_set.h index 90257b434..e6002f61c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1099,13 +1099,14 @@ class VersionSet { Status LogAndApplyToDefaultColumnFamily( VersionEdit* edit, InstrumentedMutex* mu, - FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, + FSDirectory* dir_contains_current_file, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr) { ColumnFamilyData* default_cf = GetColumnFamilySet()->GetDefault(); const MutableCFOptions* cf_options = default_cf->GetLatestMutableCFOptions(); - return LogAndApply(default_cf, *cf_options, edit, mu, db_directory, - new_descriptor_log, column_family_options); + return LogAndApply(default_cf, *cf_options, edit, mu, + dir_contains_current_file, new_descriptor_log, + column_family_options); } // Apply *edit to the current version to form a new descriptor that @@ -1117,7 +1118,7 @@ class VersionSet { Status LogAndApply( ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, VersionEdit* edit, - InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, + InstrumentedMutex* mu, FSDirectory* dir_contains_current_file, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr) { autovector cfds; @@ -1129,7 +1130,8 @@ class VersionSet { edit_list.emplace_back(edit); edit_lists.emplace_back(edit_list); return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, - db_directory, new_descriptor_log, column_family_options); + dir_contains_current_file, new_descriptor_log, + column_family_options); } // The batch version. If edit_list.size() > 1, caller must ensure that // no edit in the list column family add or drop @@ -1137,7 +1139,7 @@ class VersionSet { ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, const autovector& edit_list, InstrumentedMutex* mu, - FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, + FSDirectory* dir_contains_current_file, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr, const std::function& manifest_wcb = {}) { autovector cfds; @@ -1147,8 +1149,8 @@ class VersionSet { autovector> edit_lists; edit_lists.emplace_back(edit_list); return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, - db_directory, new_descriptor_log, column_family_options, - {manifest_wcb}); + dir_contains_current_file, new_descriptor_log, + column_family_options, {manifest_wcb}); } // The across-multi-cf batch version. If edit_lists contain more than @@ -1158,7 +1160,7 @@ class VersionSet { const autovector& cfds, const autovector& mutable_cf_options_list, const autovector>& edit_lists, - InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, + InstrumentedMutex* mu, FSDirectory* dir_contains_current_file, bool new_descriptor_log = false, const ColumnFamilyOptions* new_cf_options = nullptr, const std::vector>& manifest_wcbs = @@ -1574,7 +1576,8 @@ class VersionSet { private: // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, - InstrumentedMutex* mu, FSDirectory* db_directory, + InstrumentedMutex* mu, + FSDirectory* dir_contains_current_file, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options); @@ -1636,7 +1639,7 @@ class ReactiveVersionSet : public VersionSet { const autovector& /*cfds*/, const autovector& /*mutable_cf_options_list*/, const autovector>& /*edit_lists*/, - InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/, + InstrumentedMutex* /*mu*/, FSDirectory* /*dir_contains_current_file*/, bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/, const std::vector>& /*manifest_wcbs*/) override { diff --git a/db/version_set_test.cc b/db/version_set_test.cc index cedd6fe98..906938b3b 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1272,7 +1272,7 @@ class VersionSetTestBase { mutex_.Lock(); Status s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options_, &edit, &mutex_); + mutable_cf_options_, &edit, &mutex_, nullptr); mutex_.Unlock(); return s; } @@ -1286,7 +1286,7 @@ class VersionSetTestBase { mutex_.Lock(); Status s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options_, vedits, &mutex_); + mutable_cf_options_, vedits, &mutex_, nullptr); mutex_.Unlock(); return s; } @@ -1384,8 +1384,8 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) { }); SyncPoint::GetInstance()->EnableProcessing(); mutex_.Lock(); - Status s = - versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_); + Status s = versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, + &mutex_, nullptr); mutex_.Unlock(); EXPECT_OK(s); EXPECT_EQ(kGroupSize - 1, count); @@ -1587,7 +1587,7 @@ TEST_F(VersionSetTest, ObsoleteBlobFile) { mutex_.Lock(); Status s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), - mutable_cf_options_, &edit, &mutex_); + mutable_cf_options_, &edit, &mutex_, nullptr); mutex_.Unlock(); ASSERT_OK(s); @@ -2194,7 +2194,7 @@ class VersionSetWithTimestampTest : public VersionSetTest { Status s; mutex_.Lock(); s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()), - edits_, &mutex_); + edits_, &mutex_, nullptr); mutex_.Unlock(); ASSERT_OK(s); VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end())); @@ -2661,7 +2661,7 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { mutex_.Lock(); s = versions_->LogAndApply(cfd_to_drop, *cfd_to_drop->GetLatestMutableCFOptions(), - &drop_cf_edit, &mutex_); + &drop_cf_edit, &mutex_, nullptr); mutex_.Unlock(); ASSERT_OK(s); @@ -2710,8 +2710,8 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { }); SyncPoint::GetInstance()->EnableProcessing(); mutex_.Lock(); - s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists, - &mutex_); + s = versions_->LogAndApply(cfds, mutable_cf_options_list, edit_lists, &mutex_, + nullptr); mutex_.Unlock(); ASSERT_OK(s); ASSERT_EQ(1, called); diff --git a/db/version_util.h b/db/version_util.h index 84d151e6c..6a809834d 100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -31,12 +31,13 @@ class OfflineManifestWriter { return versions_.Recover(column_families); } - Status LogAndApply(ColumnFamilyData* cfd, VersionEdit* edit) { + Status LogAndApply(ColumnFamilyData* cfd, VersionEdit* edit, + FSDirectory* dir_contains_current_file) { // Use `mutex` to imitate a locked DB mutex when calling `LogAndApply()`. InstrumentedMutex mutex; mutex.Lock(); Status s = versions_.LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), - edit, &mutex, nullptr /* db_directory */, + edit, &mutex, dir_contains_current_file, false /* new_descriptor_log */); mutex.Unlock(); return s; diff --git a/file/filename.cc b/file/filename.cc index 703167c88..b771e0813 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -388,7 +388,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number, IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, uint64_t descriptor_number, - FSDirectory* directory_to_fsync) { + FSDirectory* dir_contains_current_file) { // Remove leading "dbname/" and add newline to manifest file name std::string manifest = DescriptorFileName(dbname, descriptor_number); Slice contents = manifest; @@ -404,8 +404,8 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, TEST_SYNC_POINT_CALLBACK("SetCurrentFile:AfterRename", &s); } if (s.ok()) { - if (directory_to_fsync != nullptr) { - s = directory_to_fsync->FsyncWithDirOptions( + if (dir_contains_current_file != nullptr) { + s = dir_contains_current_file->FsyncWithDirOptions( IOOptions(), nullptr, DirFsyncOptions(CurrentFileName(dbname))); } } else { diff --git a/file/filename.h b/file/filename.h index b2f1567af..6d45e5210 100644 --- a/file/filename.h +++ b/file/filename.h @@ -160,10 +160,12 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number, FileType* type, WalFileType* log_type = nullptr); // Make the CURRENT file point to the descriptor file with the -// specified number. +// specified number. On its success and when dir_contains_current_file is not +// nullptr, the function will fsync the directory containing the CURRENT file +// when extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, uint64_t descriptor_number, - FSDirectory* directory_to_fsync); + FSDirectory* dir_contains_current_file); // Make the IDENTITY file for the db extern Status SetIdentityFile(Env* env, const std::string& dbname, diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index fcede0621..48e6b0d02 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -4196,7 +4196,12 @@ void UnsafeRemoveSstFileCommand::DoCommand() { VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); edit.DeleteFile(level, sst_file_number_); - s = w.LogAndApply(cfd, &edit); + std::unique_ptr db_dir; + s = options_.env->GetFileSystem()->NewDirectory(db_path_, IOOptions(), + &db_dir, nullptr); + if (s.ok()) { + s = w.LogAndApply(cfd, &edit, db_dir.get()); + } } if (!s.ok()) {