diff --git a/db/db_impl.cc b/db/db_impl.cc index a583d3392..393b2d2e9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -216,6 +216,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // as well. use_custom_gc_(seq_per_batch), shutdown_initiated_(false), + own_sfm_(options.sst_file_manager == nullptr), preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(this, immutable_db_options_, &mutex_) { @@ -520,6 +521,17 @@ Status DBImpl::CloseHelper() { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); LogFlush(immutable_db_options_.info_log); +#ifndef ROCKSDB_LITE + // If the sst_file_manager was allocated by us during DB::Open(), ccall + // Close() on it before closing the info_log. Otherwise, background thread + // in SstFileManagerImpl might try to log something + if (immutable_db_options_.sst_file_manager && own_sfm_) { + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + sfm->Close(); + } +#endif // ROCKSDB_LITE + if (immutable_db_options_.info_log && own_info_log_) { Status s = immutable_db_options_.info_log->Close(); if (ret.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index ed8591990..ca1072442 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1541,6 +1541,9 @@ class DBImpl : public DB { // is set a little later during the shutdown after scheduling memtable // flushes bool shutdown_initiated_; + // Flag to indicate whether sst_file_manager object was allocated in + // DB::Open() or passed to us + bool own_sfm_; // Clients must periodically call SetPreserveDeletesSequenceNumber() // to advance this seqnum. Default value is 0 which means ALL deletes are diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc index 6efba2987..d33e19df5 100644 --- a/db/error_handler_test.cc +++ b/db/error_handler_test.cc @@ -48,7 +48,8 @@ class ErrorHandlerListener : public EventListener { file_count_(0), fault_env_(nullptr) {} - void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) { + void OnTableFileCreationStarted( + const TableFileCreationBriefInfo& /*ti*/) override { InstrumentedMutexLock l(&mutex_); file_creation_started_ = true; if (file_count_ > 0) { @@ -61,13 +62,14 @@ class ErrorHandlerListener : public EventListener { } void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, - Status /*bg_error*/, bool* auto_recovery) { + Status /*bg_error*/, + bool* auto_recovery) override { if (*auto_recovery && no_auto_recovery_) { *auto_recovery = false; } } - void OnErrorRecoveryCompleted(Status /*old_bg_error*/) { + void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override { InstrumentedMutexLock l(&mutex_); recovery_complete_ = true; cv_.SignalAll(); @@ -237,7 +239,6 @@ TEST_F(DBErrorHandlingTest, CorruptionError) { Destroy(options); } -#ifndef TRAVIS TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) { std::unique_ptr fault_env( new FaultInjectionTestEnv(Env::Default())); @@ -307,17 +308,16 @@ TEST_F(DBErrorHandlingTest, WALWriteError) { options.env = fault_env.get(); options.listeners.emplace_back(listener); Status s; + Random rnd(301); listener->EnableAutoRecovery(); DestroyAndReopen(options); { WriteBatch batch; - char val[1024]; for (auto i = 0; i<100; ++i) { - sprintf(val, "%d", i); - batch.Put(Key(i), Slice(val, sizeof(val))); + batch.Put(Key(i), RandomString(&rnd, 1024)); } WriteOptions wopts; @@ -327,12 +327,10 @@ TEST_F(DBErrorHandlingTest, WALWriteError) { { WriteBatch batch; - char val[1024]; int write_error = 0; for (auto i = 100; i<199; ++i) { - sprintf(val, "%d", i); - batch.Put(Key(i), Slice(val, sizeof(val))); + batch.Put(Key(i), RandomString(&rnd, 1024)); } SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { @@ -378,18 +376,17 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) { options.env = fault_env.get(); options.listeners.emplace_back(listener); Status s; + Random rnd(301); listener->EnableAutoRecovery(); CreateAndReopenWithCF({"one", "two", "three"}, options); { WriteBatch batch; - char val[1024]; for (auto i = 1; i < 4; ++i) { for (auto j = 0; j < 100; ++j) { - sprintf(val, "%d", j); - batch.Put(handles_[i], Key(j), Slice(val, sizeof(val))); + batch.Put(handles_[i], Key(j), RandomString(&rnd, 1024)); } } @@ -400,13 +397,11 @@ TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) { { WriteBatch batch; - char val[1024]; int write_error = 0; // Write to one CF for (auto i = 100; i < 199; ++i) { - sprintf(val, "%d", i); - batch.Put(handles_[2], Key(i), Slice(val, sizeof(val))); + batch.Put(handles_[2], Key(i), RandomString(&rnd, 1024)); } SyncPoint::GetInstance()->SetCallBack( @@ -462,6 +457,7 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) { std::vector db; std::shared_ptr sfm(NewSstFileManager(def_env)); int kNumDbInstances = 3; + Random rnd(301); for (auto i = 0; i < kNumDbInstances; ++i) { listener.emplace_back(new ErrorHandlerListener()); @@ -489,11 +485,9 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) { for (auto i = 0; i < kNumDbInstances; ++i) { WriteBatch batch; - char val[1024]; for (auto j = 0; j <= 100; ++j) { - sprintf(val, "%d", j); - batch.Put(Key(j), Slice(val, sizeof(val))); + batch.Put(Key(j), RandomString(&rnd, 1024)); } WriteOptions wopts; @@ -505,12 +499,10 @@ TEST_F(DBErrorHandlingTest, MultiDBCompactionError) { def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); for (auto i = 0; i < kNumDbInstances; ++i) { WriteBatch batch; - char val[1024]; // Write to one CF for (auto j = 100; j < 199; ++j) { - sprintf(val, "%d", j); - batch.Put(Key(j), Slice(val, sizeof(val))); + batch.Put(Key(j), RandomString(&rnd, 1024)); } WriteOptions wopts; @@ -561,6 +553,7 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { std::vector db; std::shared_ptr sfm(NewSstFileManager(def_env)); int kNumDbInstances = 3; + Random rnd(301); for (auto i = 0; i < kNumDbInstances; ++i) { listener.emplace_back(new ErrorHandlerListener()); @@ -600,11 +593,9 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { for (auto i = 0; i < kNumDbInstances; ++i) { WriteBatch batch; - char val[1024]; for (auto j = 0; j <= 100; ++j) { - sprintf(val, "%d", j); - batch.Put(Key(j), Slice(val, sizeof(val))); + batch.Put(Key(j), RandomString(&rnd, 1024)); } WriteOptions wopts; @@ -616,12 +607,10 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { def_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); for (auto i = 0; i < kNumDbInstances; ++i) { WriteBatch batch; - char val[1024]; // Write to one CF for (auto j = 100; j < 199; ++j) { - sprintf(val, "%d", j); - batch.Put(Key(j), Slice(val, sizeof(val))); + batch.Put(Key(j), RandomString(&rnd, 1024)); } WriteOptions wopts; @@ -682,7 +671,6 @@ TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) { options.clear(); delete def_env; } -#endif } // namespace rocksdb diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index a1280622b..ee1394bc9 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -39,8 +39,15 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr logger, } SstFileManagerImpl::~SstFileManagerImpl() { + Close(); +} + +void SstFileManagerImpl::Close() { { MutexLock l(&mu_); + if (closing_) { + return; + } closing_ = true; cv_.SignalAll(); } diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index 8c8015310..d11035df8 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -121,6 +121,10 @@ class SstFileManagerImpl : public SstFileManager { DeleteScheduler* delete_scheduler() { return &delete_scheduler_; } + // Stop the error recovery background thread. This should be called only + // once in the object's lifetime, and before the destructor + void Close(); + private: // REQUIRES: mutex locked void OnAddFileImpl(const std::string& file_path, uint64_t file_size,