Enhance ReadOnly mode to process all committed transactions.

Summary:
Leveldb has an api OpenForReadOnly() that opens the database
in readonly mode. This call had an option to not process the
transaction log.  This patch removes this option and always
processes all transactions that had been committed. It has
been done in such a way that it does not create/write to
any new files in the process. The invariant of "no-writes"
to the leveldb data directory is still true.

This enhancement allows multiple threads to open the same database
in readonly mode and access all transactions that were committed right
up to the OpenForReadOnly call.

I changed the public API to match the new semantics because
no users are currently using this API.

Test Plan: make clean check

Reviewers: sheki

Reviewed By: sheki

CC: leveldb

Differential Revision: https://reviews.facebook.net/D7479
main
Dhruba Borthakur 12 years ago
parent be9b862d47
commit f4c2b7cf97
  1. 25
      db/db_impl.cc
  2. 12
      db/db_impl.h
  3. 8
      db/db_impl_readonly.cc
  4. 2
      include/leveldb/db.h

@ -505,7 +505,9 @@ void DBImpl::PurgeObsoleteWALFiles() {
} }
} }
Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory, // If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode.
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
bool error_if_log_file_exist) { bool error_if_log_file_exist) {
mutex_.AssertHeld(); mutex_.AssertHeld();
@ -513,7 +515,7 @@ Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory,
// committed only when the descriptor is created, and this directory // committed only when the descriptor is created, and this directory
// may already exist from a previous failed creation attempt. // may already exist from a previous failed creation attempt.
assert(db_lock_ == NULL); assert(db_lock_ == NULL);
if (!no_log_recory) { if (!external_table) {
env_->CreateDir(dbname_); env_->CreateDir(dbname_);
Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
if (!s.ok()) { if (!s.ok()) {
@ -573,14 +575,10 @@ Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory,
"flag but a log file already exists"); "flag but a log file already exists");
} }
if (no_log_recory) {
return s;
}
// Recover in the order in which the logs were generated // Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end()); std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) { for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence); s = RecoverLogFile(logs[i], edit, &max_sequence, external_table);
// The previous incarnation may not have written any MANIFEST // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually // records after allocating this log number. So we manually
@ -600,7 +598,8 @@ Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory,
Status DBImpl::RecoverLogFile(uint64_t log_number, Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit, VersionEdit* edit,
SequenceNumber* max_sequence) { SequenceNumber* max_sequence,
MemTable* external_table) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
Logger* info_log; Logger* info_log;
@ -645,6 +644,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
MemTable* mem = NULL; MemTable* mem = NULL;
if (external_table) {
mem = external_table;
}
while (reader.ReadRecord(&record, &scratch) && while (reader.ReadRecord(&record, &scratch) &&
status.ok()) { status.ok()) {
if (record.size() < 12) { if (record.size() < 12) {
@ -670,7 +672,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
*max_sequence = last_seq; *max_sequence = last_seq;
} }
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { if (!external_table &&
mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem, edit); status = WriteLevel0TableForRecovery(mem, edit);
if (!status.ok()) { if (!status.ok()) {
// Reflect errors immediately so that conditions like full // Reflect errors immediately so that conditions like full
@ -682,13 +685,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
} }
} }
if (status.ok() && mem != NULL) { if (status.ok() && mem != NULL && !external_table) {
status = WriteLevel0TableForRecovery(mem, edit); status = WriteLevel0TableForRecovery(mem, edit);
// Reflect errors immediately so that conditions like full // Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail. // file-systems cause the DB::Open() to fail.
} }
if (mem != NULL) mem->Unref(); if (mem != NULL && !external_table) mem->Unref();
delete file; delete file;
return status; return status;
} }

@ -85,8 +85,7 @@ class DBImpl : public DB {
// Simulate a db crash, no elegant closing of database. // Simulate a db crash, no elegant closing of database.
void TEST_Destroy_DBImpl(); void TEST_Destroy_DBImpl();
protected: protected:
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
VersionSet* versions_; VersionSet* versions_;
@ -96,6 +95,9 @@ protected:
const Comparator* user_comparator() const { const Comparator* user_comparator() const {
return internal_comparator_.user_comparator(); return internal_comparator_.user_comparator();
} }
MemTable* GetMemTable() {
return mem_;
}
private: private:
friend class DB; friend class DB;
@ -111,8 +113,7 @@ protected:
// Recover the descriptor from persistent storage. May do a significant // Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to // amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit. // be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit, Status Recover(VersionEdit* edit, MemTable* external_table = NULL,
bool no_log_recory = false,
bool error_if_log_file_exist = false); bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const; void MaybeIgnoreError(Status* s) const;
@ -128,7 +129,8 @@ protected:
Status RecoverLogFile(uint64_t log_number, Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit, VersionEdit* edit,
SequenceNumber* max_sequence); SequenceNumber* max_sequence,
MemTable* external_table);
// The following two methods are used to flush a memtable to // The following two methods are used to flush a memtable to
// storage. The first one is used atdatabase RecoveryTime (when the // storage. The first one is used atdatabase RecoveryTime (when the

@ -70,16 +70,14 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) {
Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DB** dbptr, bool no_log_recory, bool error_if_log_file_exist) { DB** dbptr, bool error_if_log_file_exist) {
*dbptr = NULL; *dbptr = NULL;
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock(); impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels()); VersionEdit edit(impl->NumberLevels());
Status s = impl->Recover(&edit, no_log_recory, error_if_log_file_exist); Status s = impl->Recover(&edit, impl->GetMemTable(),
if (s.ok() && !no_log_recory) { error_if_log_file_exist);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
impl->mutex_.Unlock(); impl->mutex_.Unlock();
if (s.ok()) { if (s.ok()) {
*dbptr = impl; *dbptr = impl;

@ -62,7 +62,7 @@ class DB {
// will happen. // will happen.
static Status OpenForReadOnly(const Options& options, static Status OpenForReadOnly(const Options& options,
const std::string& name, DB** dbptr, const std::string& name, DB** dbptr,
bool no_log_recory = true, bool error_if_log_file_exist = false); bool error_if_log_file_exist = false);
DB() { } DB() { }
virtual ~DB(); virtual ~DB();

Loading…
Cancel
Save