Add ReadOptions to TransactionLogIterator.

Summary:
Add an optional input parameter ReadOptions to DB::GetUpdateSince(),
which allows the verification of checksums to be disabled by setting
ReadOptions::verify_checksums to false.

Test Plan: Tests are done off-line and will not be included in the regular unit test.

Reviewers: igor

Reviewed By: igor

CC: leveldb, xjin, dhruba

Differential Revision: https://reviews.facebook.net/D16305
main
Yueh-Hsuan Chiang 11 years ago
parent 6ba1084f24
commit a77527f2af
  1. 15
      db/db_impl.cc
  2. 6
      db/db_impl.h
  3. 6
      db/db_test.cc
  4. 18
      db/transaction_log_impl.cc
  5. 12
      db/transaction_log_impl.h
  6. 6
      include/rocksdb/db.h
  7. 13
      include/rocksdb/transaction_log.h
  8. 8
      include/utilities/stackable_db.h

@ -1490,8 +1490,9 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence(); return versions_->LastSequence();
} }
Status DBImpl::GetUpdatesSince(SequenceNumber seq, Status DBImpl::GetUpdatesSince(
unique_ptr<TransactionLogIterator>* iter) { SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS); RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
if (seq > versions_->LastSequence()) { if (seq > versions_->LastSequence()) {
@ -1511,13 +1512,9 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
iter->reset( iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
new TransactionLogIteratorImpl(options_.wal_dir, read_options, storage_options_,
&options_, seq, std::move(wal_files), this));
storage_options_,
seq,
std::move(wal_files),
this));
return (*iter)->status(); return (*iter)->status();
} }

@ -85,8 +85,10 @@ class DBImpl : public DB {
bool flush_memtable = true); bool flush_memtable = true);
virtual Status GetSortedWalFiles(VectorLogPtr& files); virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber() const; virtual SequenceNumber GetLatestSequenceNumber() const;
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(
unique_ptr<TransactionLogIterator>* iter); SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions());
virtual Status DeleteFile(std::string name); virtual Status DeleteFile(std::string name);
virtual void GetLiveFilesMetaData( virtual void GetLiveFilesMetaData(

@ -5005,8 +5005,10 @@ class ModelDB: public DB {
virtual SequenceNumber GetLatestSequenceNumber() const { virtual SequenceNumber GetLatestSequenceNumber() const {
return 0; return 0;
} }
virtual Status GetUpdatesSince(rocksdb::SequenceNumber, virtual Status GetUpdatesSince(
unique_ptr<rocksdb::TransactionLogIterator>*) { rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) {
return Status::NotSupported("Not supported in Model DB"); return Status::NotSupported("Not supported in Model DB");
} }

@ -9,14 +9,13 @@
namespace rocksdb { namespace rocksdb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl( TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const std::string& dir, const Options* options,
const Options* options, const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const EnvOptions& soptions, const SequenceNumber seq,
const SequenceNumber seq, std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl)
std::unique_ptr<VectorLogPtr> files, : dir_(dir),
DBImpl const * const dbimpl) :
dir_(dir),
options_(options), options_(options),
read_options_(read_options),
soptions_(soptions), soptions_(soptions),
startingSequenceNumber_(seq), startingSequenceNumber_(seq),
files_(std::move(files)), files_(std::move(files)),
@ -253,9 +252,8 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
return status; return status;
} }
assert(file); assert(file);
currentLogReader_.reset( currentLogReader_.reset(new log::Reader(std::move(file), &reporter_,
new log::Reader(std::move(file), &reporter_, true, 0) read_options_.verify_checksums_, 0));
);
return Status::OK(); return Status::OK();
} }
} // namespace rocksdb } // namespace rocksdb

@ -66,12 +66,11 @@ class LogFileImpl : public LogFile {
class TransactionLogIteratorImpl : public TransactionLogIterator { class TransactionLogIteratorImpl : public TransactionLogIterator {
public: public:
TransactionLogIteratorImpl(const std::string& dir, TransactionLogIteratorImpl(
const Options* options, const std::string& dir, const Options* options,
const EnvOptions& soptions, const TransactionLogIterator::ReadOptions& read_options,
const SequenceNumber seqNum, const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl);
DBImpl const * const dbimpl);
virtual bool Valid(); virtual bool Valid();
@ -84,6 +83,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
private: private:
const std::string& dir_; const std::string& dir_;
const Options* options_; const Options* options_;
const TransactionLogIterator::ReadOptions read_options_;
const EnvOptions& soptions_; const EnvOptions& soptions_;
SequenceNumber startingSequenceNumber_; SequenceNumber startingSequenceNumber_;
std::unique_ptr<VectorLogPtr> files_; std::unique_ptr<VectorLogPtr> files_;

@ -300,8 +300,10 @@ class DB {
// use this api, else the WAL files will get // use this api, else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before // cleared aggressively and the iterator might keep getting invalid before
// an update is read. // an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(
unique_ptr<TransactionLogIterator>* iter) = 0; SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) = 0;
// Delete the file name from the db directory and update the internal state to // Delete the file name from the db directory and update the internal state to
// reflect that. Supports deletion of sst and log files only. 'name' must be // reflect that. Supports deletion of sst and log files only. 'name' must be

@ -85,6 +85,19 @@ class TransactionLogIterator {
// earliest transaction contained in the batch. // earliest transaction contained in the batch.
// ONLY use if Valid() is true and status() is OK. // ONLY use if Valid() is true and status() is OK.
virtual BatchResult GetBatch() = 0; virtual BatchResult GetBatch() = 0;
// The read options for TransactionLogIterator.
struct ReadOptions {
// If true, all data read from underlying storage will be
// verified against corresponding checksums.
// Default: true
bool verify_checksums_;
ReadOptions() : verify_checksums_(true) {}
explicit ReadOptions(bool verify_checksums)
: verify_checksums_(verify_checksums) {}
};
}; };
} // namespace rocksdb } // namespace rocksdb

@ -152,10 +152,10 @@ class StackableDB : public DB {
return db_->GetPropertiesOfAllTables(props); return db_->GetPropertiesOfAllTables(props);
} }
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(
unique_ptr<TransactionLogIterator>* iter) SequenceNumber seq_number, unique_ptr<TransactionLogIterator>* iter,
override { const TransactionLogIterator::ReadOptions& read_options) override {
return db_->GetUpdatesSince(seq_number, iter); return db_->GetUpdatesSince(seq_number, iter, read_options);
} }
protected: protected:

Loading…
Cancel
Save