From d0f1b49ab600bdfb37e1226c03fed5ae4ab2da7b Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Tue, 16 Jan 2018 10:57:56 -0800 Subject: [PATCH] Add a Close() method to DB to return status when closing a db Summary: Currently, the only way to close an open DB is to destroy the DB object. There is no way for the caller to know the status. In one instance, the destructor encountered an error due to failure to close a log file on HDFS. In order to prevent silent failures, we add DB::Close() that calls CloseImpl() which must be implemented by its descendants. The main failure point in the destructor is closing the log file. This patch also adds a Close() entry point to Logger in order to get status. When DBOptions::info_log is allocated and owned by the DBImpl, it is explicitly closed by DBImpl::CloseImpl(). Closes https://github.com/facebook/rocksdb/pull/3348 Differential Revision: D6698158 Pulled By: anand1976 fbshipit-source-id: 9468e2892553eb09c4c41b8723f590c0dbd8ab7d --- db/db_basic_test.cc | 49 ++++++++++++++++++++++++ db/db_impl.cc | 24 ++++++++++-- db/db_impl.h | 10 +++++ db/db_test.cc | 2 + env/env.cc | 11 +++++- env/env_hdfs.cc | 16 +++++--- env/posix_logger.h | 14 +++++-- include/rocksdb/db.h | 6 +++ include/rocksdb/env.h | 7 +++- include/rocksdb/utilities/stackable_db.h | 2 + util/auto_roll_logger.h | 8 ++++ util/auto_roll_logger_test.cc | 39 +++++++++++++++++++ 12 files changed, 174 insertions(+), 14 deletions(-) diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 8f2679cae..c3ea831d7 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -847,6 +847,55 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) { } #endif +class TestEnv : public EnvWrapper { + public: + explicit TestEnv(Env* base) : EnvWrapper(base) { }; + + class TestLogger : public Logger { + public: + using Logger::Logv; + virtual void Logv(const char *format, va_list ap) override { }; + private: + virtual Status CloseImpl() override { + return Status::NotSupported(); + } + }; + + virtual Status NewLogger(const std::string& fname, + shared_ptr* result) { + result->reset(new TestLogger()); + return Status::OK(); + } +}; + +TEST_F(DBBasicTest, DBClose) { + Options options = GetDefaultOptions(); + std::string dbname = test::TmpDir(env_) + "/db_close_test"; + ASSERT_OK(DestroyDB(dbname, options)); + + DB* db = nullptr; + options.create_if_missing = true; + options.env = new TestEnv(Env::Default()); + Status s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_TRUE(db != nullptr); + + s = db->Close(); + ASSERT_EQ(s, Status::NotSupported()); + + delete db; + + // Provide our own logger and ensure DB::Close() does not close it + options.info_log.reset(new TestEnv::TestLogger()); + options.create_if_missing = false; + s = DB::Open(options, dbname, &db); + ASSERT_OK(s); + ASSERT_TRUE(db != nullptr); + + s = db->Close(); + ASSERT_EQ(s, Status::OK()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index f0bba0ae6..1bc8ce7b3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -141,6 +141,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, const bool seq_per_batch) : env_(options.env), dbname_(dbname), + own_info_log_(options.info_log == nullptr), initial_db_options_(SanitizeOptions(dbname, options)), immutable_db_options_(initial_db_options_), mutable_db_options_(initial_db_options_), @@ -212,7 +213,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // requires a custom gc for compaction, we use that to set use_custom_gc_ // as well. use_custom_gc_(seq_per_batch), - preserve_deletes_(options.preserve_deletes) { + preserve_deletes_(options.preserve_deletes), + closed_(false) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -275,7 +277,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { } } -DBImpl::~DBImpl() { +Status DBImpl::CloseImpl() { // CancelAllBackgroundWork called with false means we just set the shutdown // marker. After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) @@ -378,8 +380,16 @@ DBImpl::~DBImpl() { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); LogFlush(immutable_db_options_.info_log); + + Status s = Status::OK(); + if (immutable_db_options_.info_log && own_info_log_) { + s = immutable_db_options_.info_log->Close(); + } + return s; } +DBImpl::~DBImpl() { Close(); } + void DBImpl::MaybeIgnoreError(Status* s) const { if (s->ok() || immutable_db_options_.paranoid_checks) { // No change needed @@ -2320,7 +2330,15 @@ Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) { return Status::OK(); } -DB::~DB() { } +DB::~DB() {} + +Status DBImpl::Close() { + if (!closed_) { + closed_ = true; + return CloseImpl(); + } + return Status::OK(); +} Status DB::ListColumnFamilies(const DBOptions& db_options, const std::string& name, diff --git a/db/db_impl.h b/db/db_impl.h index 91f75701c..0d9cf83a6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -611,10 +611,14 @@ class DBImpl : public DB { std::vector* handles, DB** dbptr, const bool seq_per_batch); + virtual Status Close() override; + protected: Env* const env_; const std::string dbname_; unique_ptr versions_; + // Flag to check whether we allocated and own the info log file + bool own_info_log_; const DBOptions initial_db_options_; const ImmutableDBOptions immutable_db_options_; MutableDBOptions mutable_db_options_; @@ -912,6 +916,9 @@ class DBImpl : public DB { uint64_t GetMaxTotalWalSize() const; + // Actual implementation of Close() + virtual Status CloseImpl(); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; @@ -1359,6 +1366,9 @@ class DBImpl : public DB { // is set to false. std::atomic preserve_deletes_seqnum_; const bool preserve_deletes_; + + // Flag to check whether Close() has been called on this DB + bool closed_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_test.cc b/db/db_test.cc index b8d44622b..10941204f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2191,6 +2191,8 @@ class ModelDB : public DB { batch.Put(cf, k, v); return Write(o, &batch); } + using DB::Close; + virtual Status Close() { return Status::OK(); } using DB::Delete; virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf, const Slice& key) override { diff --git a/env/env.cc b/env/env.cc index 0b2a6cf98..aef98f99d 100644 --- a/env/env.cc +++ b/env/env.cc @@ -73,9 +73,18 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } -Logger::~Logger() { +Logger::~Logger() { Close(); } + +Status Logger::Close() { + if (!closed_) { + closed_ = true; + return CloseImpl(); + } + return Status::OK(); } +Status Logger::CloseImpl() { return Status::OK(); } + FileLock::~FileLock() { } diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index d98020c76..883359500 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -277,6 +277,16 @@ class HdfsLogger : public Logger { HdfsWritableFile* file_; uint64_t (*gettid_)(); // Return the thread id for the current thread + virtual Status CloseImpl() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", + file_->getName().c_str()); + Status s = file_->Close(); + if (mylog != nullptr && mylog == this) { + mylog = nullptr; + } + return s; + } + public: HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) : file_(f), gettid_(gettid) { @@ -285,12 +295,6 @@ class HdfsLogger : public Logger { } virtual ~HdfsLogger() { - ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", - file_->getName().c_str()); - delete file_; - if (mylog != nullptr && mylog == this) { - mylog = nullptr; - } } virtual void Logv(const char* format, va_list ap) { diff --git a/env/posix_logger.h b/env/posix_logger.h index 3ec6f574a..7cfcfe43d 100644 --- a/env/posix_logger.h +++ b/env/posix_logger.h @@ -24,6 +24,7 @@ #endif #include +#include "env/io_posix.h" #include "monitoring/iostats_context_imp.h" #include "rocksdb/env.h" #include "util/sync_point.h" @@ -32,6 +33,15 @@ namespace rocksdb { class PosixLogger : public Logger { private: + virtual Status CloseImpl() override { + int ret; + + ret = fclose(file_); + if (ret) { + return IOError("Unable to close log file", "", ret); + } + return Status::OK(); + } FILE* file_; uint64_t (*gettid_)(); // Return the thread id for the current thread std::atomic_size_t log_size_; @@ -51,9 +61,7 @@ class PosixLogger : public Logger { last_flush_micros_(0), env_(env), flush_pending_(false) {} - virtual ~PosixLogger() { - fclose(file_); - } + virtual ~PosixLogger() { Close(); } virtual void Flush() override { TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 1e54c9a27..738f1705c 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -163,6 +163,12 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + // Close the DB by releasing resources, closing files etc. This should be + // called before calling the desctructor so that the caller can get back a + // status in case there are any errors. Regardless of the return status, the + // DB must be freed + virtual Status Close() { return Status::OK(); } + // ListColumnFamilies will open the DB specified by argument name // and return the list of all column families in that DB // through column_families argument. The ordering of diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 81dc60a4d..a29bf35b8 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -819,9 +819,12 @@ class Logger { size_t kDoNotSupportGetLogFileSize = (std::numeric_limits::max)(); explicit Logger(const InfoLogLevel log_level = InfoLogLevel::INFO_LEVEL) - : log_level_(log_level) {} + : closed_(false), log_level_(log_level) {} virtual ~Logger(); + // Close the log file. Must be called before destructor + virtual Status Close(); + // Write a header to the log file with the specified format // It is recommended that you log all header information at the start of the // application. But it is not enforced. @@ -852,6 +855,8 @@ class Logger { // No copying allowed Logger(const Logger&); void operator=(const Logger&); + virtual Status CloseImpl(); + bool closed_; InfoLogLevel log_level_; }; diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 1035e6f5d..06bf908bd 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -25,6 +25,8 @@ class StackableDB : public DB { delete db_; } + virtual Status Close() override { return db_->Close(); } + virtual DB* GetBaseDB() { return db_; } diff --git a/util/auto_roll_logger.h b/util/auto_roll_logger.h index 2f1f943d6..19d2fe210 100644 --- a/util/auto_roll_logger.h +++ b/util/auto_roll_logger.h @@ -103,6 +103,14 @@ class AutoRollLogger : public Logger { std::string ValistToString(const char* format, va_list args) const; // Write the logs marked as headers to the new log file void WriteHeaderInfo(); + // Implementation of Close() + virtual Status CloseImpl() override { + if (logger_) { + return logger_->Close(); + } else { + return Status::OK(); + } + } std::string log_fname_; // Current active info log's file name. std::string dbname_; diff --git a/util/auto_roll_logger_test.cc b/util/auto_roll_logger_test.cc index 9b39748ce..3bc3463ab 100644 --- a/util/auto_roll_logger_test.cc +++ b/util/auto_roll_logger_test.cc @@ -354,6 +354,45 @@ TEST_F(AutoRollLoggerTest, InfoLogLevel) { inFile.close(); } +TEST_F(AutoRollLoggerTest, Close) { + InitTestDb(); + + size_t log_size = 8192; + size_t log_lines = 0; + AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0); + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + for (int log_type = InfoLogLevel::DEBUG_LEVEL; + log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) { + // log messages with log level smaller than log_level will not be + // logged. + LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); + } + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + + // again, messages with level smaller than log_level will not be logged. + ROCKS_LOG_HEADER(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_DEBUG(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_INFO(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_WARN(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_ERROR(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_FATAL(&logger, "%s", kSampleMessage.c_str()); + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + ASSERT_EQ(logger.Close(), Status::OK()); + + std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); + size_t lines = std::count(std::istreambuf_iterator(inFile), + std::istreambuf_iterator(), '\n'); + ASSERT_EQ(log_lines, lines); + inFile.close(); +} + // Test the logger Header function for roll over logs // We expect the new logs creates as roll over to carry the headers specified static std::vector GetOldFileNames(const std::string& path) {