Flush job should release reference current version if sync log failed

Summary:
Fix the bug when sync log fail, FlushJob::Run() will not be execute and
reference to cfd->current() will not be release.
Closes https://github.com/facebook/rocksdb/pull/1792

Differential Revision: D4441316

Pulled By: yiwu-arbug

fbshipit-source-id: 5523e28
main
Yi Wu 8 years ago committed by Facebook Github Bot
parent da54d36a96
commit 9239103cd4
  1. 33
      db/db_flush_test.cc
  2. 6
      db/db_impl.cc
  3. 6
      db/flush_job.cc
  4. 4
      db/flush_job.h
  5. 2
      db/version_set.h
  6. 2
      util/fault_injection_test_env.cc

@ -9,6 +9,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h" #include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
TEST_F(DBFlushTest, SyncFail) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(Env::Default()));
Options options;
options.disable_auto_compactions = true;
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
Put("key", "value");
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("", FilesPerLevel()); // flush failed.
// Flush job should release ref count to current version.
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1842,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
} }
Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld(); mutex_.AssertHeld();
autovector<log::Writer*, 1> logs_to_sync; autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_; uint64_t current_log_number = logfile_number_;
@ -1878,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
MarkLogsSynced(current_log_number - 1, true, s); MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) { if (!s.ok()) {
bg_error_ = s; bg_error_ = s;
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s; return s;
} }
} }
@ -1928,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile(
// is unlocked by the current thread. // is unlocked by the current thread.
if (s.ok()) { if (s.ok()) {
s = flush_job.Run(&file_meta); s = flush_job.Run(&file_meta);
} else {
flush_job.Cancel();
} }
if (s.ok()) { if (s.ok()) {
@ -2762,7 +2766,7 @@ void DBImpl::MarkLogsSynced(
++it; ++it;
} }
} }
assert(logs_.empty() || logs_[0].number > up_to || assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced)); (logs_.size() == 1 && !logs_[0].getting_synced));
log_sync_cv_.SignalAll(); log_sync_cv_.SignalAll();
} }

@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) {
return s; return s;
} }
void FlushJob::Cancel() {
db_mutex_->AssertHeld();
assert(base_ != nullptr);
base_->Unref();
}
Status FlushJob::WriteLevel0Table() { Status FlushJob::WriteLevel0Table() {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0); ThreadStatus::STAGE_FLUSH_WRITE_L0);

@ -67,9 +67,11 @@ class FlushJob {
~FlushJob(); ~FlushJob();
// Require db_mutex held // Require db_mutex held.
// Once PickMemTable() is called, either Run() or Cancel() has to be call.
void PickMemTable(); void PickMemTable();
Status Run(FileMetaData* file_meta = nullptr); Status Run(FileMetaData* file_meta = nullptr);
void Cancel();
TableProperties GetTableProperties() const { return table_properties_; } TableProperties GetTableProperties() const { return table_properties_; }
private: private:

@ -520,6 +520,8 @@ class Version {
return next_; return next_;
} }
int TEST_refs() const { return refs_; }
VersionStorageInfo* storage_info() { return &storage_info_; } VersionStorageInfo* storage_info() { return &storage_info_; }
VersionSet* version_set() { return vset_; } VersionSet* version_set() { return vset_; }

@ -149,7 +149,7 @@ Status TestWritableFile::Flush() {
Status TestWritableFile::Sync() { Status TestWritableFile::Sync() {
if (!env_->IsFilesystemActive()) { if (!env_->IsFilesystemActive()) {
return Status::OK(); return Status::IOError("FaultInjectionTestEnv: not active");
} }
// No need to actual sync. // No need to actual sync.
state_.pos_at_last_sync_ = state_.pos_; state_.pos_at_last_sync_ = state_.pos_;

Loading…
Cancel
Save