diff --git a/db/db_impl.cc b/db/db_impl.cc index f6f7fcbc7..df561f758 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -193,7 +193,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stall_memtable_compaction_(0), stall_level0_num_files_(0), stall_leveln_slowdown_(0), - started_at_(options.env->NowMicros()) { + started_at_(options.env->NowMicros()), + flush_on_destroy_(false) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -226,7 +227,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) DBImpl::~DBImpl() { // Wait for background work to finish - mutex_.Lock(); + if (flush_on_destroy_) { + FlushMemTable(FlushOptions()); + } + mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { bg_cv_.Wait(); @@ -315,7 +319,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) { // delete_obsolete_files_period_micros. if (options_.delete_obsolete_files_period_micros != 0) { const uint64_t now_micros = env_->NowMicros(); - if (delete_obsolete_files_last_run_ + + if (delete_obsolete_files_last_run_ + options_.delete_obsolete_files_period_micros > now_micros) { return; } @@ -1144,8 +1148,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { ikey.sequence < compact->smallest_snapshot) { // If the user has specified a compaction filter, then invoke // it. If this key is not visible via any snapshot and the - // return value of the compaction filter is true and then - // drop this key from the output. + // return value of the compaction filter is true and then + // drop this key from the output. drop = options_.CompactionFilter(compact->compaction->level(), ikey.user_key, value, &compaction_filter_value); @@ -1414,6 +1418,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // into mem_. { mutex_.Unlock(); + if (options.disableWAL) { + flush_on_destroy_ = true; + } + if (!options.disableWAL) { status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { diff --git a/db/db_impl.h b/db/db_impl.h index 7264a9627..a12b94f7a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -51,7 +51,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options); virtual Status DisableFileDeletions(); virtual Status EnableFileDeletions(); - virtual Status GetLiveFiles(std::vector&, + virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size); // Extra methods (for testing) that are not in the public DB interface @@ -222,6 +222,8 @@ class DBImpl : public DB { // Time at which this instance was started. const uint64_t started_at_; + bool flush_on_destroy_; // Used when disableWAL is true. + // Per level compaction stats. stats_[level] stores the stats for // compactions that produced data for the specified "level". struct CompactionStats { diff --git a/db/db_test.cc b/db/db_test.cc index 3bef3eec3..0b5c96875 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -858,8 +858,8 @@ TEST(DBTest, WAL) { ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1")); Reopen(); - ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v1", Get("bar")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2")); @@ -867,10 +867,9 @@ TEST(DBTest, WAL) { ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v2")); Reopen(); - // We garantee the 'bar' will be there - // because its put has WAL enabled. - // But 'foo' may or may not be there. + // Both value's should be present. ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ("v2", Get("foo")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3")); @@ -878,9 +877,9 @@ TEST(DBTest, WAL) { ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v3")); Reopen(); - // 'foo' should be there because its put - // has WAL enabled. + // again both values should be present. ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v3", Get("bar")); } TEST(DBTest, CheckLock) { @@ -895,13 +894,13 @@ TEST(DBTest, FLUSH) { WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); - // this will not flush the last 2 writes + // this will now also flush the last 2 writes dbfull()->Flush(FlushOptions()); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1")); Reopen(); ASSERT_EQ("v1", Get("foo")); - ASSERT_EQ("NOT_FOUND", Get("bar")); + ASSERT_EQ("v1", Get("bar")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2")); @@ -1201,12 +1200,12 @@ static int cfilter_count; static std::string NEW_VALUE = "NewValue"; static bool keep_filter(int level, const Slice& key, const Slice& value, Slice** new_value) { - cfilter_count++; + cfilter_count++; return false; } static bool delete_filter(int level, const Slice& key, const Slice& value, Slice** new_value) { - cfilter_count++; + cfilter_count++; return true; } static bool change_filter(int level, const Slice& key, @@ -1223,8 +1222,8 @@ TEST(DBTest, CompactionFilter) { options.CompactionFilter = keep_filter; Reopen(&options); - // Write 100K+1 keys, these are written to a few files - // in L0. We do this so that the current snapshot points + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points // to the 100001 key.The compaction filter is not invoked // on keys that are visible via a snapshot because we // anyways cannot delete it. @@ -1324,8 +1323,8 @@ TEST(DBTest, CompactionFilterWithValueChange) { options.CompactionFilter = change_filter; Reopen(&options); - // Write 100K+1 keys, these are written to a few files - // in L0. We do this so that the current snapshot points + // Write 100K+1 keys, these are written to a few files + // in L0. We do this so that the current snapshot points // to the 100001 key.The compaction filter is not invoked // on keys that are visible via a snapshot because we // anyways cannot delete it.