Fix the Logger::Close() and DBImpl::Close() design pattern

Summary:
The recent Logger::Close() and DBImpl::Close() implementation rely on
calling the CloseImpl() virtual function from the destructor, which will
not work. Refactor the implementation to have a private close helper
function in derived classes that can be called by both CloseImpl() and
the destructor.
Closes https://github.com/facebook/rocksdb/pull/3528

Reviewed By: gfosco

Differential Revision: D7049303

Pulled By: anand1976

fbshipit-source-id: 76a64cbf403209216dfe4864ecf96b5d7f3db9f4
main
Anand Ananthabhotla 7 years ago committed by Facebook Github Bot
parent 30649dc6a1
commit dfbe52e099
  1. 48
      db/db_basic_test.cc
  2. 13
      db/db_impl.cc
  3. 6
      db/db_impl.h
  4. 7
      env/env.cc
  5. 11
      env/env_hdfs.cc
  6. 68
      env/env_test.cc
  7. 15
      env/posix_logger.h
  8. 6
      include/rocksdb/db.h
  9. 10
      include/rocksdb/env.h
  10. 22
      util/auto_roll_logger.h

@ -850,23 +850,43 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
class TestEnv : public EnvWrapper { class TestEnv : public EnvWrapper {
public: public:
explicit TestEnv(Env* base) : EnvWrapper(base) { }; explicit TestEnv() : EnvWrapper(Env::Default()),
close_count(0) { }
class TestLogger : public Logger { class TestLogger : public Logger {
public: public:
using Logger::Logv; using Logger::Logv;
TestLogger(TestEnv *env_ptr) : Logger() { env = env_ptr; }
~TestLogger() {
if (!closed_) {
CloseHelper();
}
}
virtual void Logv(const char *format, va_list ap) override { }; virtual void Logv(const char *format, va_list ap) override { };
private: protected:
virtual Status CloseImpl() override { virtual Status CloseImpl() override {
return Status::NotSupported(); return CloseHelper();
}
private:
Status CloseHelper() {
env->CloseCountInc();;
return Status::IOError();
} }
TestEnv *env;
}; };
void CloseCountInc() { close_count++; }
int GetCloseCount() { return close_count; }
virtual Status NewLogger(const std::string& fname, virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) { shared_ptr<Logger>* result) {
result->reset(new TestLogger()); result->reset(new TestLogger(this));
return Status::OK(); return Status::OK();
} }
private:
int close_count;
}; };
TEST_F(DBBasicTest, DBClose) { TEST_F(DBBasicTest, DBClose) {
@ -875,19 +895,29 @@ TEST_F(DBBasicTest, DBClose) {
ASSERT_OK(DestroyDB(dbname, options)); ASSERT_OK(DestroyDB(dbname, options));
DB* db = nullptr; DB* db = nullptr;
TestEnv *env = new TestEnv();
options.create_if_missing = true; options.create_if_missing = true;
options.env = new TestEnv(Env::Default()); options.env = env;
Status s = DB::Open(options, dbname, &db); Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_TRUE(db != nullptr); ASSERT_TRUE(db != nullptr);
s = db->Close(); s = db->Close();
ASSERT_EQ(s, Status::NotSupported()); ASSERT_EQ(env->GetCloseCount(), 1);
ASSERT_EQ(s, Status::IOError());
delete db; delete db;
ASSERT_EQ(env->GetCloseCount(), 1);
// Do not call DB::Close() and ensure our logger Close() still gets called
s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_TRUE(db != nullptr);
delete db;
ASSERT_EQ(env->GetCloseCount(), 2);
// Provide our own logger and ensure DB::Close() does not close it // Provide our own logger and ensure DB::Close() does not close it
options.info_log.reset(new TestEnv::TestLogger()); options.info_log.reset(new TestEnv::TestLogger(env));
options.create_if_missing = false; options.create_if_missing = false;
s = DB::Open(options, dbname, &db); s = DB::Open(options, dbname, &db);
ASSERT_OK(s); ASSERT_OK(s);
@ -896,6 +926,10 @@ TEST_F(DBBasicTest, DBClose) {
s = db->Close(); s = db->Close();
ASSERT_EQ(s, Status::OK()); ASSERT_EQ(s, Status::OK());
delete db; delete db;
ASSERT_EQ(env->GetCloseCount(), 2);
options.info_log.reset();
ASSERT_EQ(env->GetCloseCount(), 3);
delete options.env; delete options.env;
} }

@ -278,7 +278,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
} }
} }
Status DBImpl::CloseImpl() { Status DBImpl::CloseHelper() {
// CancelAllBackgroundWork called with false means we just set the shutdown // CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work // marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true)) // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
@ -404,7 +404,16 @@ Status DBImpl::CloseImpl() {
return ret; return ret;
} }
DBImpl::~DBImpl() { Close(); } Status DBImpl::CloseImpl() {
return CloseHelper();
}
DBImpl::~DBImpl() {
if (!closed_) {
closed_ = true;
CloseHelper();
}
}
void DBImpl::MaybeIgnoreError(Status* s) const { void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || immutable_db_options_.paranoid_checks) { if (s->ok() || immutable_db_options_.paranoid_checks) {

@ -704,6 +704,9 @@ class DBImpl : public DB {
// The writer must be the leader in write_thread_ and holding mutex_ // The writer must be the leader in write_thread_ and holding mutex_
Status WriteRecoverableState(); Status WriteRecoverableState();
// Actual implementation of Close()
Status CloseImpl();
private: private:
friend class DB; friend class DB;
friend class InternalStats; friend class InternalStats;
@ -930,8 +933,7 @@ class DBImpl : public DB {
uint64_t GetMaxTotalWalSize() const; uint64_t GetMaxTotalWalSize() const;
// Actual implementation of Close() Status CloseHelper();
virtual Status CloseImpl();
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;

7
env/env.cc vendored

@ -73,17 +73,18 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() { WritableFile::~WritableFile() {
} }
Logger::~Logger() { Close(); } Logger::~Logger() { }
Status Logger::Close() { Status Logger::Close() {
if (!closed_) { if (!closed_) {
closed_ = true; closed_ = true;
return CloseImpl(); return CloseImpl();
} } else {
return Status::OK(); return Status::OK();
}
} }
Status Logger::CloseImpl() { return Status::OK(); } Status Logger::CloseImpl() { return Status::NotSupported(); }
FileLock::~FileLock() { FileLock::~FileLock() {
} }

11
env/env_hdfs.cc vendored

@ -277,7 +277,7 @@ class HdfsLogger : public Logger {
HdfsWritableFile* file_; HdfsWritableFile* file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
virtual Status CloseImpl() { Status HdfsCloseHelper() {
ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
file_->getName().c_str()); file_->getName().c_str());
Status s = file_->Close(); Status s = file_->Close();
@ -287,6 +287,11 @@ class HdfsLogger : public Logger {
return s; return s;
} }
protected:
virtual Status CloseImpl() override {
return HdfsCloseHelper();
}
public: public:
HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
: file_(f), gettid_(gettid) { : file_(f), gettid_(gettid) {
@ -295,6 +300,10 @@ class HdfsLogger : public Logger {
} }
virtual ~HdfsLogger() { virtual ~HdfsLogger() {
if (!closed_) {
closed_ = true;
HdfsCloseHelper();
}
} }
virtual void Logv(const char* format, va_list ap) { virtual void Logv(const char* format, va_list ap) {

68
env/env_test.cc vendored

@ -1475,6 +1475,74 @@ TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) {
env_->DeleteFile(path); env_->DeleteFile(path);
} }
class TestEnv : public EnvWrapper {
public:
explicit TestEnv() : EnvWrapper(Env::Default()),
close_count(0) { }
class TestLogger : public Logger {
public:
using Logger::Logv;
TestLogger(TestEnv *env_ptr) : Logger() { env = env_ptr; }
~TestLogger() {
if (!closed_) {
CloseHelper();
}
}
virtual void Logv(const char *format, va_list ap) override { };
protected:
virtual Status CloseImpl() override {
return CloseHelper();
}
private:
Status CloseHelper() {
env->CloseCountInc();;
return Status::OK();
}
TestEnv *env;
};
void CloseCountInc() { close_count++; }
int GetCloseCount() { return close_count; }
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) {
result->reset(new TestLogger(this));
return Status::OK();
}
private:
int close_count;
};
class EnvTest : public testing::Test {
};
TEST_F(EnvTest, Close) {
TestEnv *env = new TestEnv();
std::shared_ptr<Logger> logger;
Status s;
s = env->NewLogger("", &logger);
ASSERT_EQ(s, Status::OK());
logger.get()->Close();
ASSERT_EQ(env->GetCloseCount(), 1);
// Call Close() again. CloseHelper() should not be called again
logger.get()->Close();
ASSERT_EQ(env->GetCloseCount(), 1);
logger.reset();
ASSERT_EQ(env->GetCloseCount(), 1);
s = env->NewLogger("", &logger);
ASSERT_EQ(s, Status::OK());
logger.reset();
ASSERT_EQ(env->GetCloseCount(), 2);
delete env;
}
INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam, INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
::testing::Values(std::pair<Env*, bool>(Env::Default(), ::testing::Values(std::pair<Env*, bool>(Env::Default(),
false))); false)));

15
env/posix_logger.h vendored

@ -33,7 +33,7 @@ namespace rocksdb {
class PosixLogger : public Logger { class PosixLogger : public Logger {
private: private:
virtual Status CloseImpl() override { Status PosixCloseHelper() {
int ret; int ret;
ret = fclose(file_); ret = fclose(file_);
@ -50,6 +50,12 @@ class PosixLogger : public Logger {
std::atomic_uint_fast64_t last_flush_micros_; std::atomic_uint_fast64_t last_flush_micros_;
Env* env_; Env* env_;
std::atomic<bool> flush_pending_; std::atomic<bool> flush_pending_;
protected:
virtual Status CloseImpl() override {
return PosixCloseHelper();
}
public: public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
@ -61,7 +67,12 @@ class PosixLogger : public Logger {
last_flush_micros_(0), last_flush_micros_(0),
env_(env), env_(env),
flush_pending_(false) {} flush_pending_(false) {}
virtual ~PosixLogger() { Close(); } virtual ~PosixLogger() {
if (!closed_) {
closed_ = true;
PosixCloseHelper();
}
}
virtual void Flush() override { virtual void Flush() override {
TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); TEST_SYNC_POINT("PosixLogger::Flush:Begin1");
TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); TEST_SYNC_POINT("PosixLogger::Flush:Begin2");

@ -175,8 +175,10 @@ class DB {
// called before calling the desctructor so that the caller can get back a // called before calling the desctructor so that the caller can get back a
// status in case there are any errors. This will not fsync the WAL files. // status in case there are any errors. This will not fsync the WAL files.
// If syncing is required, the caller must first call SyncWAL. // If syncing is required, the caller must first call SyncWAL.
// Regardless of the return status, the DB must be freed // Regardless of the return status, the DB must be freed. If the return
virtual Status Close() { return Status::OK(); } // status is NotSupported(), then the DB implementation does cleanup in the
// destructor
virtual Status Close() { return Status::NotSupported(); }
// ListColumnFamilies will open the DB specified by argument name // ListColumnFamilies will open the DB specified by argument name
// and return the list of all column families in that DB // and return the list of all column families in that DB

@ -822,7 +822,9 @@ class Logger {
: closed_(false), log_level_(log_level) {} : closed_(false), log_level_(log_level) {}
virtual ~Logger(); virtual ~Logger();
// Close the log file. Must be called before destructor // Close the log file. Must be called before destructor. If the return
// status is NotSupported(), it means the implementation does cleanup in
// the destructor
virtual Status Close(); virtual Status Close();
// Write a header to the log file with the specified format // Write a header to the log file with the specified format
@ -851,12 +853,14 @@ class Logger {
log_level_ = log_level; log_level_ = log_level;
} }
protected:
virtual Status CloseImpl();
bool closed_;
private: private:
// No copying allowed // No copying allowed
Logger(const Logger&); Logger(const Logger&);
void operator=(const Logger&); void operator=(const Logger&);
virtual Status CloseImpl();
bool closed_;
InfoLogLevel log_level_; InfoLogLevel log_level_;
}; };

@ -80,6 +80,9 @@ class AutoRollLogger : public Logger {
} }
virtual ~AutoRollLogger() { virtual ~AutoRollLogger() {
if (logger_ && !closed_) {
logger_->Close();
}
} }
void SetCallNowMicrosEveryNRecords(uint64_t call_NowMicros_every_N_records) { void SetCallNowMicrosEveryNRecords(uint64_t call_NowMicros_every_N_records) {
@ -93,6 +96,16 @@ class AutoRollLogger : public Logger {
uint64_t TEST_ctime() const { return ctime_; } uint64_t TEST_ctime() const { return ctime_; }
protected:
// Implementation of Close()
virtual Status CloseImpl() override {
if (logger_) {
return logger_->Close();
} else {
return Status::OK();
}
}
private: private:
bool LogExpired(); bool LogExpired();
Status ResetLogger(); Status ResetLogger();
@ -103,15 +116,6 @@ class AutoRollLogger : public Logger {
std::string ValistToString(const char* format, va_list args) const; std::string ValistToString(const char* format, va_list args) const;
// Write the logs marked as headers to the new log file // Write the logs marked as headers to the new log file
void WriteHeaderInfo(); void WriteHeaderInfo();
// Implementation of Close()
virtual Status CloseImpl() override {
if (logger_) {
return logger_->Close();
} else {
return Status::OK();
}
}
std::string log_fname_; // Current active info log's file name. std::string log_fname_; // Current active info log's file name.
std::string dbname_; std::string dbname_;
std::string db_log_dir_; std::string db_log_dir_;

Loading…
Cancel
Save