Flush data at object destruction if disableWAL is used.

Summary:
Added a conditional flush in ~DBImpl so that, when any write was issued with
disableWAL, the memtable is flushed on a clean shutdown.
There is still a chance of writes not being persisted if there is a
crash (not a clean shutdown) before the DBImpl instance is destroyed.
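
For illustration, a minimal usage sketch of the behavior this enables (the header
path, namespace, and database path are assumptions about this tree, not part of
the change; error handling trimmed):

#include <cassert>
#include "leveldb/db.h"

int main() {
  leveldb::DB* db;
  leveldb::Options options;
  options.create_if_missing = true;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/flush_on_destroy_demo", &db);
  assert(s.ok());

  leveldb::WriteOptions write_options;
  write_options.disableWAL = true;              // skip the write-ahead log
  s = db->Put(write_options, "foo", "v1");      // lives only in the memtable
  assert(s.ok());

  // A clean shutdown (deleting the handle) now flushes the memtable in
  // ~DBImpl, so "foo" survives a reopen. A crash before this point can
  // still lose it, as noted above.
  delete db;
  return 0;
}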

Test Plan: modified db_test to meet the new expectations.

Reviewers: dhruba, heyongqiang

Differential Revision: https://reviews.facebook.net/D6519
Branch: main
Author: Abhishek Kona
Parent: aa42c66814
Commit: 4e413df3d0
Files changed:
  db/db_impl.cc (18)
  db/db_impl.h (4)
  db/db_test.cc (29)

db/db_impl.cc:
@@ -193,7 +193,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       stall_memtable_compaction_(0),
       stall_level0_num_files_(0),
       stall_leveln_slowdown_(0),
-      started_at_(options.env->NowMicros()) {
+      started_at_(options.env->NowMicros()),
+      flush_on_destroy_(false) {
   mem_->Ref();
   has_imm_.Release_Store(NULL);
@@ -226,7 +227,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
 DBImpl::~DBImpl() {
   // Wait for background work to finish
+  if (flush_on_destroy_) {
+    FlushMemTable(FlushOptions());
+  }
   mutex_.Lock();
   shutting_down_.Release_Store(this);  // Any non-NULL value is ok
   while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
     bg_cv_.Wait();
@@ -315,7 +319,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
   // delete_obsolete_files_period_micros.
   if (options_.delete_obsolete_files_period_micros != 0) {
     const uint64_t now_micros = env_->NowMicros();
     if (delete_obsolete_files_last_run_ +
         options_.delete_obsolete_files_period_micros > now_micros) {
       return;
     }
@@ -1144,8 +1148,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
             ikey.sequence < compact->smallest_snapshot) {
           // If the user has specified a compaction filter, then invoke
           // it. If this key is not visible via any snapshot and the
           // return value of the compaction filter is true and then
           // drop this key from the output.
           drop = options_.CompactionFilter(compact->compaction->level(),
                      ikey.user_key, value, &compaction_filter_value);
@@ -1414,6 +1418,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
     // into mem_.
     {
       mutex_.Unlock();
+      if (options.disableWAL) {
+        flush_on_destroy_ = true;
+      }
       if (!options.disableWAL) {
         status = log_->AddRecord(WriteBatchInternal::Contents(updates));
         if (status.ok() && options.sync) {
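
The two db_impl.cc hunks above form a simple "flush on destroy" latch: any unlogged
write sets the flag, and the destructor flushes buffered data on a clean shutdown.
A condensed, self-contained sketch of the same pattern (the names TinyDb, Write,
and Flush are illustrative stand-ins, not the real DBImpl members):

#include <iostream>
#include <string>
#include <vector>

class TinyDb {
 public:
  void Write(const std::string& record, bool disable_wal) {
    if (disable_wal) {
      flush_on_destroy_ = true;   // durability now depends on a later flush
    } else {
      wal_.push_back(record);     // with the WAL, the write is already durable
    }
    memtable_.push_back(record);
  }

  ~TinyDb() {
    if (flush_on_destroy_) {      // mirrors the new check in ~DBImpl
      Flush();
    }
  }

 private:
  void Flush() {
    std::cout << "persisting " << memtable_.size() << " buffered records\n";
    memtable_.clear();
  }

  std::vector<std::string> memtable_;
  std::vector<std::string> wal_;
  bool flush_on_destroy_ = false;
};

int main() {
  TinyDb db;
  db.Write("foo=v1", /*disable_wal=*/true);   // sets the latch
}  // ~TinyDb flushes here, just as ~DBImpl now calls FlushMemTable()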

db/db_impl.h:
@@ -51,7 +51,7 @@ class DBImpl : public DB {
   virtual Status Flush(const FlushOptions& options);
   virtual Status DisableFileDeletions();
   virtual Status EnableFileDeletions();
   virtual Status GetLiveFiles(std::vector<std::string>&,
                               uint64_t* manifest_file_size);

   // Extra methods (for testing) that are not in the public DB interface
@@ -222,6 +222,8 @@ class DBImpl : public DB {
   // Time at which this instance was started.
   const uint64_t started_at_;

+  bool flush_on_destroy_; // Used when disableWAL is true.
+
   // Per level compaction stats. stats_[level] stores the stats for
   // compactions that produced data for the specified "level".
   struct CompactionStats {

db/db_test.cc:
@@ -858,8 +858,8 @@ TEST(DBTest, WAL) {
   ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
   Reopen();
-  ASSERT_EQ("NOT_FOUND", Get("foo"));
-  ASSERT_EQ("NOT_FOUND", Get("bar"));
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_EQ("v1", Get("bar"));
   writeOpt.disableWAL = false;
   ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2"));
@@ -867,10 +867,9 @@ TEST(DBTest, WAL) {
   ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v2"));
   Reopen();
-  // We guarantee the 'bar' will be there
-  // because its put has WAL enabled.
-  // But 'foo' may or may not be there.
+  // Both values should be present.
   ASSERT_EQ("v2", Get("bar"));
+  ASSERT_EQ("v2", Get("foo"));
   writeOpt.disableWAL = true;
   ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3"));
@@ -878,9 +877,9 @@ TEST(DBTest, WAL) {
   ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v3"));
   Reopen();
-  // 'foo' should be there because its put
-  // has WAL enabled.
+  // Again, both values should be present.
   ASSERT_EQ("v3", Get("foo"));
+  ASSERT_EQ("v3", Get("bar"));
 }

 TEST(DBTest, CheckLock) {
@@ -895,13 +894,13 @@ TEST(DBTest, FLUSH) {
   WriteOptions writeOpt = WriteOptions();
   writeOpt.disableWAL = true;
   ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
-  // this will not flush the last 2 writes
+  // this will now also flush the last 2 writes
   dbfull()->Flush(FlushOptions());
   ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));

   Reopen();
   ASSERT_EQ("v1", Get("foo"));
-  ASSERT_EQ("NOT_FOUND", Get("bar"));
+  ASSERT_EQ("v1", Get("bar"));

   writeOpt.disableWAL = true;
   ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2"));
@@ -1201,12 +1200,12 @@ static int cfilter_count;
 static std::string NEW_VALUE = "NewValue";

 static bool keep_filter(int level, const Slice& key,
                         const Slice& value, Slice** new_value) {
   cfilter_count++;
   return false;
 }

 static bool delete_filter(int level, const Slice& key,
                           const Slice& value, Slice** new_value) {
   cfilter_count++;
   return true;
 }

 static bool change_filter(int level, const Slice& key,
@@ -1223,8 +1222,8 @@ TEST(DBTest, CompactionFilter) {
   options.CompactionFilter = keep_filter;
   Reopen(&options);

   // Write 100K+1 keys, these are written to a few files
   // in L0. We do this so that the current snapshot points
   // to the 100001 key.The compaction filter is not invoked
   // on keys that are visible via a snapshot because we
   // anyways cannot delete it.
@@ -1324,8 +1323,8 @@ TEST(DBTest, CompactionFilterWithValueChange) {
   options.CompactionFilter = change_filter;
   Reopen(&options);

   // Write 100K+1 keys, these are written to a few files
   // in L0. We do this so that the current snapshot points
   // to the 100001 key.The compaction filter is not invoked
   // on keys that are visible via a snapshot because we
   // anyways cannot delete it.
