add flush interface to DB

Summary: as subject. The flush will flush everything in the db.

Test Plan: new test in db_test.cc

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D4029
main
heyongqiang 13 years ago
parent a347d4ac0d
commit 22ee777f68
  1. 40
      db/db_impl.cc
  2. 7
      db/db_impl.h
  3. 43
      db/db_test.cc
  4. 4
      include/leveldb/db.h
  5. 11
      include/leveldb/options.h

@ -548,6 +548,11 @@ int DBImpl::Level0StopWriteTrigger() {
return options_.level0_stop_writes_trigger; return options_.level0_stop_writes_trigger;
} }
Status DBImpl::Flush(const FlushOptions& options) {
Status status = FlushMemTable(options);
return status;
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
assert(level >= 0); assert(level >= 0);
@ -582,27 +587,36 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
} }
} }
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::FlushMemTable(const FlushOptions& options) {
// NULL batch means just wait for earlier writes to be done // NULL batch means just wait for earlier writes to be done
Status s = Write(WriteOptions(), NULL); Status s = Write(WriteOptions(), NULL);
if (s.ok()) { if (s.ok() && options.wait) {
// Wait until the compaction completes // Wait until the compaction completes
s = TEST_WaitForCompactMemTable(); s = WaitForCompactMemTable();
} }
return s; return s;
} }
Status DBImpl::WaitForCompactMemTable() {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
return s;
}
Status DBImpl::TEST_CompactMemTable() {
return FlushMemTable(FlushOptions());
}
Status DBImpl::TEST_WaitForCompactMemTable() { Status DBImpl::TEST_WaitForCompactMemTable() {
Status s; return WaitForCompactMemTable();
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
return s;
} }
Status DBImpl::TEST_WaitForCompact() { Status DBImpl::TEST_WaitForCompact() {

@ -43,6 +43,7 @@ class DBImpl : public DB {
virtual int NumberLevels(); virtual int NumberLevels();
virtual int MaxMemCompactionLevel(); virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger(); virtual int Level0StopWriteTrigger();
virtual Status Flush(const FlushOptions& options);
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -100,6 +101,12 @@ class DBImpl : public DB {
Status MakeRoomForWrite(bool force /* compact even if there is room? */); Status MakeRoomForWrite(bool force /* compact even if there is room? */);
WriteBatch* BuildBatchGroup(Writer** last_writer); WriteBatch* BuildBatchGroup(Writer** last_writer);
// Force current memtable contents to be flushed.
Status FlushMemTable(const FlushOptions& options);
// Wait for memtable compaction
Status WaitForCompactMemTable();
void MaybeScheduleCompaction(); void MaybeScheduleCompaction();
static void BGWork(void* db); static void BGWork(void* db);
void BackgroundCall(); void BackgroundCall();

@ -784,11 +784,11 @@ TEST(DBTest, WAL) {
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true; writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
ASSERT_OK(dbfull()->Put(writeOpt, "baz", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
Reopen(); Reopen();
ASSERT_EQ("NOT_FOUND", Get("foo")); ASSERT_EQ("NOT_FOUND", Get("foo"));
ASSERT_EQ("NOT_FOUND", Get("baz")); ASSERT_EQ("NOT_FOUND", Get("bar"));
writeOpt.disableWAL = false; writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2")); ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2"));
@ -812,6 +812,40 @@ TEST(DBTest, WAL) {
ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v3", Get("foo"));
} }
TEST(DBTest, FLUSH) {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
// this will not flush the last 2 writes
dbfull()->Flush(FlushOptions());
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));
Reopen();
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("NOT_FOUND", Get("bar"));
writeOpt.disableWAL = true;
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2"));
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v2"));
dbfull()->Flush(FlushOptions());
Reopen();
ASSERT_EQ("v2", Get("bar"));
ASSERT_EQ("v2", Get("foo"));
writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3"));
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v3"));
dbfull()->Flush(FlushOptions());
Reopen();
// 'foo' should be there because its put
// has WAL enabled.
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v3", Get("bar"));
}
TEST(DBTest, RecoveryWithEmptyLog) { TEST(DBTest, RecoveryWithEmptyLog) {
do { do {
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));
@ -1758,6 +1792,11 @@ class ModelDB: public DB {
return -1; return -1;
} }
virtual Status Flush(const leveldb::FlushOptions& options) {
Status ret;
return ret;
}
private: private:
class ModelIter: public Iterator { class ModelIter: public Iterator {
public: public:

@ -19,6 +19,7 @@ static const int kMinorVersion = 4;
struct Options; struct Options;
struct ReadOptions; struct ReadOptions;
struct WriteOptions; struct WriteOptions;
struct FlushOptions;
class WriteBatch; class WriteBatch;
// Abstract handle to particular state of a DB. // Abstract handle to particular state of a DB.
@ -150,6 +151,9 @@ class DB {
// Number of files in level-0 that would stop writes. // Number of files in level-0 that would stop writes.
virtual int Level0StopWriteTrigger() = 0; virtual int Level0StopWriteTrigger() = 0;
// Flush all mem-table data.
virtual Status Flush(const FlushOptions& options) = 0;
private: private:
// No copying allowed // No copying allowed
DB(const DB&); DB(const DB&);

@ -254,6 +254,17 @@ struct WriteOptions {
} }
}; };
// Options that control flush operations
struct FlushOptions {
// If true, the flush will wait until the flush is done.
// Default: true
bool wait;
FlushOptions()
: wait(true) {
}
};
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_OPTIONS_H_ #endif // STORAGE_LEVELDB_INCLUDE_OPTIONS_H_

Loading…
Cancel
Save