Fix for the weird behaviour encountered by ldb Get where it could read only the second-latest value

Summary:
Changed the Get and Scan options with openForReadOnly mode to have access to the memtable.
Changed the visibility of NewInternalIterator in db_impl from private to protected so that
the derived class db_impl_read_only can call that in its NewIterator function for the
scan case. The previous approach which changed the default for flush_on_destroy_ from false to true
caused many problems in the unit tests due to empty sst files that it created. All
unit tests pass now.

Test Plan: make clean; make all check; ldb put and get and scans

Reviewers: dhruba, heyongqiang, sheki

Reviewed By: dhruba

CC: kosievdmerwe, zshao, dilipj, kailiu

Differential Revision: https://reviews.facebook.net/D8697
main
amayank 12 years ago
parent fe10200ddc
commit b2c50f1c3f
  1. 17
      db/db_impl.h
  2. 16
      db/db_impl_readonly.cc
  3. 18
      util/ldb_cmd.h

@ -97,25 +97,26 @@ class DBImpl : public DB {
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
MemTable* GetMemTable() {
return mem_;
}
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
private:
friend class DB;
struct CompactionState;
struct Writer;
struct DeletionState;
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot);
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit, MemTable* external_table = NULL,
Status Recover(VersionEdit* edit, MemTable* external_table = nullptr,
bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const;
@ -127,7 +128,7 @@ class DBImpl : public DB {
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable(bool* madeProgress = NULL);
Status CompactMemTable(bool* madeProgress = nullptr);
Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
@ -206,7 +207,7 @@ class DBImpl : public DB {
// table_cache_ provides its own synchronization
unique_ptr<TableCache> table_cache_;
// Lock over the persistent DB state. Non-NULL iff successfully acquired.
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;
// State below is protected by mutex_
@ -241,8 +242,8 @@ class DBImpl : public DB {
int level;
bool done;
bool in_progress; // compaction request being processed?
const InternalKey* begin; // NULL means beginning of key range
const InternalKey* end; // NULL means end of key range
const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
ManualCompaction* manual_compaction_;

@ -50,28 +50,32 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
Status s;
MemTable* mem = GetMemTable();
Version* current = versions_->current();
SequenceNumber snapshot = versions_->LastSequence();
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
} else {
Version::GetStats stats;
s = current->Get(options, lkey, value, &stats);
}
return s;
}
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) {
std::vector<Iterator*> list;
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
return NewDBIterator(
&dbname_, env_, user_comparator(), internal_iter,
reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_);
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DB** dbptr, bool error_if_log_file_exist) {
*dbptr = NULL;
*dbptr = nullptr;
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();

@ -61,9 +61,9 @@ public:
}
virtual ~LDBCommand() {
if (db_ != NULL) {
if (db_ != nullptr) {
delete db_;
db_ = NULL;
db_ = nullptr;
}
}
@ -73,7 +73,7 @@ public:
return;
}
if (db_ == NULL && !NoDBOpen()) {
if (db_ == nullptr && !NoDBOpen()) {
OpenDB();
if (!exec_state_.IsNotStarted()) {
return;
@ -85,7 +85,7 @@ public:
exec_state_ = LDBCommandExecuteResult::SUCCEED("");
}
if (db_ != NULL) {
if (db_ != nullptr) {
CloseDB ();
}
}
@ -165,7 +165,7 @@ protected:
LDBCommand(const map<string, string>& options, const vector<string>& flags,
bool is_read_only, const vector<string>& valid_cmd_line_options) :
db_(NULL),
db_(nullptr),
is_read_only_(is_read_only),
is_key_hex_(false),
is_value_hex_(false),
@ -190,9 +190,7 @@ protected:
// Open the DB.
leveldb::Status st;
if (is_read_only_) {
//st = leveldb::DB::OpenForReadOnly(opt, db_path_, &db_);
// Could not get this to work
st = leveldb::DB::Open(opt, db_path_, &db_);
st = leveldb::DB::OpenForReadOnly(opt, db_path_, &db_);
} else {
st = leveldb::DB::Open(opt, db_path_, &db_);
}
@ -203,9 +201,9 @@ protected:
}
void CloseDB () {
if (db_ != NULL) {
if (db_ != nullptr) {
delete db_;
db_ = NULL;
db_ = nullptr;
}
}

Loading…
Cancel
Save