GetUpdatesSince API to enable replication.

Summary:
How it works:
* GetUpdatesSince takes a SequenceNumber.
* A LogFile with the first SequenceNumber nearest and lesser than the requested Sequence Number is found.
* Seek in the logFile till the requested SeqNumber is found.
* Return an iterator which contains logic to return record's one by one.

Test Plan:
* Test case included to check the good code path.
* Will update with more test-cases.
* Feedback required on test-cases.

Reviewers: dhruba, emayanke

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D7119
main
Abhishek Kona 12 years ago
parent f69e9f3e04
commit 8055008909
  1. 162
      db/db_impl.cc
  2. 20
      db/db_impl.h
  3. 62
      db/db_test.cc
  4. 3
      db/dbformat.h
  5. 9
      db/filename.cc
  6. 5
      db/filename.h
  7. 48
      db/log_file.h
  8. 146
      db/transaction_log_iterator_impl.cc
  9. 66
      db/transaction_log_iterator_impl.h
  10. 1
      db/write_batch_internal.h
  11. 15
      include/leveldb/db.h
  12. 34
      include/leveldb/transaction_log_iterator.h
  13. 14
      include/leveldb/types.h

@ -23,6 +23,7 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/transaction_log_iterator_impl.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/statistics.h"
@ -484,7 +485,7 @@ void DBImpl::DeleteObsoleteFiles() {
void DBImpl::PurgeObsoleteWALFiles() {
if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
std::vector<std::string> WALFiles;
std::string archivalDir = dbname_ + "/" + ARCHIVAL_DIR;
std::string archivalDir = GetArchivalDirectoryName();
env_->GetChildren(archivalDir, &WALFiles);
int64_t currentTime;
const Status status = env_->GetCurrentTime(&currentTime);
@ -884,6 +885,165 @@ Status DBImpl::Flush(const FlushOptions& options) {
return status;
}
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
TransactionLogIterator** iter) {
// Get All Log Files.
// Sort Files
// Get the first entry from each file.
// Do binary search and open files and find the seq number.
std::vector<LogFile> walFiles;
// list wal files in main db dir.
Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile);
if (!s.ok()) {
return s;
}
// list wal files in archive dir.
s = ListAllWALFiles(GetArchivalDirectoryName(), &walFiles, kArchivedLogFile);
if (!s.ok()) {
return s;
}
if (walFiles.empty()) {
return Status::IOError(" NO WAL Files present in the db");
}
// std::shared_ptr would have been useful here.
std::vector<LogFile>* probableWALFiles = new std::vector<LogFile>();
FindProbableWALFiles(&walFiles, probableWALFiles, seq);
if (probableWALFiles->empty()) {
return Status::IOError(" No wal files for the given seqNo. found");
}
TransactionLogIteratorImpl* impl =
new TransactionLogIteratorImpl(dbname_, &options_, seq, probableWALFiles);
*iter = impl;
return Status::OK();
}
Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs,
std::vector<LogFile>* const result,
const SequenceNumber target) {
assert(allLogs != NULL);
assert(result != NULL);
std::sort(allLogs->begin(), allLogs->end());
size_t start = 0;
size_t end = allLogs->size();
// Binary Search. avoid opening all files.
while (start < end) {
int mid = (start + end) / 2;
WriteBatch batch;
Status s = ReadFirstRecord(allLogs->at(mid), &batch);
if (!s.ok()) {
return s;
}
SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch);
if (currentSeqNum == target) {
start = mid;
end = mid;
} else if (currentSeqNum < target) {
start = mid;
} else {
end = mid;
}
}
assert( start == end);
for( size_t i = start; i < allLogs->size(); ++i) {
result->push_back(allLogs->at(i));
}
return Status::OK();
}
Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) {
if (file.type == kAliveLogFile) {
std::string fname = LogFileName(dbname_, file.logNumber);
Status status = ReadFirstLine(fname, result);
if (!status.ok()) {
// check if the file got moved to archive.
std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber);
Status s = ReadFirstLine(archivedFile, result);
if (!s.ok()) {
return Status::IOError("Log File Has been deleted");
}
}
return Status::OK();
} else if (file.type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
Status status = ReadFirstLine(fname, result);
return status;
}
return Status::NotSupported("File Type Not Known");
}
Status DBImpl::ReadFirstLine(const std::string& fname,
WriteBatch* const batch) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status; // NULL if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == NULL ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != NULL && this->status->ok()) *this->status = s;
}
};
SequentialFile* file;
Status status = env_->NewSequentialFile(fname, &file);
if (!status.ok()) {
return status;
}
LogReporter reporter;
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : NULL);
log::Reader reader(file, &reporter, true/*checksum*/,
0/*initial_offset*/);
std::string scratch;
Slice record;
if (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
return Status::IOError("Corruption noted");
// TODO read record's till the first no corrupt entry?
}
WriteBatchInternal::SetContents(batch, record);
return Status::OK();
}
return Status::IOError("Error reading from file " + fname);
}
Status DBImpl::ListAllWALFiles(const std::string& path,
std::vector<LogFile>* const logFiles,
WalFileType logType) {
assert(logFiles != NULL);
std::vector<std::string> allFiles;
const Status status = env_->GetChildren(path, &allFiles);
if (!status.ok()) {
return status;
}
for(std::vector<std::string>::iterator it = allFiles.begin();
it != allFiles.end();
++it) {
uint64_t number;
FileType type;
if (ParseFileName(*it, &number, &type) && type == kLogFile){
logFiles->push_back(LogFile(number, logType));
}
}
return status;
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
assert(level >= 0);

@ -8,6 +8,7 @@
#include <deque>
#include <set>
#include "db/dbformat.h"
#include "db/log_file.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
@ -54,7 +55,8 @@ class DBImpl : public DB {
virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size);
virtual Status GetUpdatesSince(SequenceNumber seq_number,
TransactionLogIterator ** iter);
// Return's the path of the archival directory.
std::string GetArchivalDirectoryName();
@ -178,6 +180,22 @@ protected:
void EvictObsoleteFiles(DeletionState& deletion_state);
void PurgeObsoleteWALFiles();
Status ListAllWALFiles(const std::string& path,
std::vector<LogFile>* logFiles,
WalFileType type);
// Find's all the log files which contain updates with seq no.
// Greater Than or Equal to the requested SequenceNumber
Status FindProbableWALFiles(std::vector<LogFile>* const allLogs,
std::vector<LogFile>* const result,
const SequenceNumber target);
Status ReadFirstRecord(const LogFile& file, WriteBatch* const result);
Status ReadFirstLine(const std::string& fname, WriteBatch* const batch);
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
bool owns_info_log_;

@ -2264,6 +2264,63 @@ TEST(DBTest, WALArchival) {
}
TEST(DBTest, TransactionLogIterator) {
std::string value(1024, '1');
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options);
Put("key1", value);
Put("key2", value);
Put("key2", value);
{
TransactionLogIterator* iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid());
iter->Next();
int i = 0;
SequenceNumber lastSequence = 0;
while (iter->Valid()) {
WriteBatch batch;
iter->GetBatch(&batch);
SequenceNumber current = WriteBatchInternal::Sequence(&batch);
// ASSERT_TRUE(current > lastSequence);
++i;
lastSequence = current;
ASSERT_TRUE(iter->status().ok());
iter->Next();
}
ASSERT_EQ(i, 3);
}
Reopen(&options);
{
Put("key4", value);
Put("key5", value);
Put("key6", value);
}
{
TransactionLogIterator* iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(!iter->Valid());
iter->Next();
int i = 0;
SequenceNumber lastSequence = 0;
while (iter->Valid()) {
WriteBatch batch;
iter->GetBatch(&batch);
SequenceNumber current = WriteBatchInternal::Sequence(&batch);
ASSERT_TRUE(current > lastSequence);
lastSequence = current;
ASSERT_TRUE(iter->status().ok());
iter->Next();
++i;
}
ASSERT_EQ(i, 6);
}
}
TEST(DBTest, ReadCompaction) {
std::string value(4096, '4'); // a string of size 4K
{
@ -2526,6 +2583,11 @@ class ModelDB: public DB {
return Status::OK();
}
virtual Status GetUpdatesSince(leveldb::SequenceNumber,
leveldb::TransactionLogIterator**) {
return Status::NotSupported("Not supported in Model DB");
}
private:
class ModelIter: public Iterator {
public:

@ -11,6 +11,7 @@
#include "leveldb/filter_policy.h"
#include "leveldb/slice.h"
#include "leveldb/table_builder.h"
#include "leveldb/types.h"
#include "util/coding.h"
#include "util/logging.h"
@ -33,8 +34,6 @@ enum ValueType {
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeValue;
typedef uint64_t SequenceNumber;
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
static const SequenceNumber kMaxSequenceNumber =

@ -2,13 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/filename.h"
#include <ctype.h>
#include <stdio.h>
#include "db/filename.h"
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "util/logging.h"
#include <iostream>
namespace leveldb {
@ -57,6 +57,11 @@ std::string LogFileName(const std::string& name, uint64_t number) {
return MakeFileName(name, number, "log");
}
std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
assert(number > 0);
return MakeFileName(name + "/archive", number, "log");
}
std::string TableFileName(const std::string& name, uint64_t number) {
assert(number > 0);
return MakeFileName(name, number, "sst");

@ -32,6 +32,11 @@ enum FileType {
// "dbname".
extern std::string LogFileName(const std::string& dbname, uint64_t number);
// Return the name of the archived log file with the specified number
// in the db named by "dbname". The result will be prefixed with "dbname".
extern std::string ArchivedLogFileName(const std::string& dbname,
uint64_t num);
// Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".

@ -0,0 +1,48 @@
// Copyright 2008-present Facebook. All Rights Reserved.
#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_
#define STORAGE_LEVELDB_DB_LOG_FILE_H_
namespace leveldb {
enum WalFileType {
kArchivedLogFile = 0,
kAliveLogFile = 1
} ;
class LogFile {
public:
uint64_t logNumber;
WalFileType type;
LogFile(uint64_t logNum,WalFileType logType) :
logNumber(logNum),
type(logType) {}
LogFile(const LogFile& that) {
logNumber = that.logNumber;
type = that.type;
}
bool operator < (const LogFile& that) const {
return logNumber < that.logNumber;
}
std::string ToString() const {
char response[100];
const char* typeOfLog;
if (type == kAliveLogFile) {
typeOfLog = "Alive Log";
} else {
typeOfLog = "Archived Log";
}
sprintf(response,
"LogNumber : %ld LogType : %s",
logNumber,
typeOfLog);
return std::string(response);
}
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_

@ -0,0 +1,146 @@
#include "db/transaction_log_iterator_impl.h"
#include "db/write_batch_internal.h"
#include "db/filename.h"
namespace leveldb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dbname,
const Options* options,
SequenceNumber& seq,
std::vector<LogFile>* files) :
dbname_(dbname),
options_(options),
sequenceNumber_(seq),
files_(files),
started_(false),
isValid_(true),
currentFileIndex_(0),
currentLogReader_(NULL) {
assert( files_ != NULL);
}
LogReporter
TransactionLogIteratorImpl::NewLogReporter(const uint64_t logNumber) {
LogReporter reporter;
reporter.env = options_->env;
reporter.info_log = options_->info_log;
reporter.log_number = logNumber;
return reporter;
}
Status TransactionLogIteratorImpl::OpenLogFile(const LogFile& logFile,
SequentialFile** file) {
Env* env = options_->env;
if (logFile.type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber);
return env->NewSequentialFile(fname, file);
} else {
std::string fname = LogFileName(dbname_, logFile.logNumber);
Status status = env->NewSequentialFile(fname, file);
if (!status.ok()) {
// If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dbname_, logFile.logNumber);
status = env->NewSequentialFile(fname, file);
if (!status.ok()) {
// TODO stringprintf
return Status::IOError(" Requested file not present in the dir");
}
}
return status;
}
}
void TransactionLogIteratorImpl::GetBatch(WriteBatch* batch) {
assert(isValid_); // cannot call in a non valid state.
WriteBatchInternal::SetContents(batch, currentRecord_);
}
Status TransactionLogIteratorImpl::status() {
return currentStatus_;
}
bool TransactionLogIteratorImpl::Valid() {
return started_ && isValid_;
}
void TransactionLogIteratorImpl::Next() {
// First seek to the given seqNo. in the current file.
LogFile currentLogFile = files_->at(currentFileIndex_);
LogReporter reporter = NewLogReporter(currentLogFile.logNumber);
std::string scratch;
Slice record;
if (!started_) {
SequentialFile* file = NULL;
Status status = OpenLogFile(currentLogFile, &file);
if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
assert(file != NULL);
WriteBatch batch;
log::Reader* reader = new log::Reader(file, &reporter, true, 0);
assert(reader != NULL);
while (reader->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
SequenceNumber currentNum = WriteBatchInternal::Sequence(&batch);
if (currentNum >= sequenceNumber_) {
isValid_ = true;
currentRecord_ = record;
currentLogReader_ = reader;
break;
}
}
if (!isValid_) {
// TODO read the entire first file. and did not find the seq number.
// Error out.
currentStatus_ =
Status::NotFound("Did not find the Seq no. in first file");
}
started_ = true;
} else {
LOOK_NEXT_FILE:
assert(currentLogReader_ != NULL);
bool openNextFile = true;
while (currentLogReader_->ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
continue;
} else {
currentRecord_ = record;
openNextFile = false;
break;
}
}
if (openNextFile) {
if (currentFileIndex_ < files_->size() - 1) {
++currentFileIndex_;
delete currentLogReader_;
SequentialFile *file;
Status status = OpenLogFile(files_->at(currentFileIndex_), &file);
if (!status.ok()) {
isValid_ = false;
currentStatus_ = status;
return;
}
currentLogReader_ = new log::Reader(file, &reporter, true, 0);
goto LOOK_NEXT_FILE;
} else {
// LOOKED AT FILES. WE ARE DONE HERE.
isValid_ = false;
currentStatus_ = Status::IOError(" NO MORE DATA LEFT");
}
}
}
}
} // namespace leveldb

@ -0,0 +1,66 @@
// Copyright 2008-present Facebook. All Rights Reserved.
#ifndef STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
#define STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_
#include <vector>
#include "leveldb/env.h"
#include "leveldb/options.h"
#include "leveldb/types.h"
#include "leveldb/transaction_log_iterator.h"
#include "db/log_file.h"
#include "db/log_reader.h"
namespace leveldb {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
uint64_t log_number;
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%ld: dropping %d bytes; %s",
log_number, static_cast<int>(bytes), s.ToString().c_str());
}
};
class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(const std::string& dbname,
const Options* options,
SequenceNumber& seqNum,
std::vector<LogFile>* files);
virtual ~TransactionLogIteratorImpl() {
// TODO move to cc file.
if (currentLogReader_ != NULL) {
delete currentLogReader_;
}
delete files_;
}
virtual bool Valid();
virtual void Next();
virtual Status status();
virtual void GetBatch(WriteBatch* batch);
private:
const std::string& dbname_;
const Options* options_;
const uint64_t sequenceNumber_;
const std::vector<LogFile>* files_;
bool started_;
bool isValid_; // not valid when it starts of.
Status currentStatus_;
size_t currentFileIndex_;
Slice currentRecord_;
log::Reader* currentLogReader_;
Status OpenLogFile(const LogFile& logFile, SequentialFile** file);
LogReporter NewLogReporter(uint64_t logNumber);
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_WRITES_ITERATOR_IMPL_H_

@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#define STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#include "leveldb/types.h"
#include "leveldb/write_batch.h"
namespace leveldb {

@ -10,6 +10,8 @@
#include <vector>
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/types.h"
#include "leveldb/transaction_log_iterator.h"
namespace leveldb {
@ -180,6 +182,19 @@ class DB {
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) = 0;
// Return's an iterator for all writes since the sequence number
// Status::ok if iterator is valid.
// The iterator internally holds references to the available log files.
// It automatically takes care of closing a file with no-updates left, and
// opening the next one.
// If the sequence number is non existent. it returns an iterator at a seq_no
// just greater than the requested seq_no.
// Must set WAL_ttl_seconds to a large value to use this api.
// else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before
// an update is read.
virtual Status GetUpdatesSince(SequenceNumber seq_number,
TransactionLogIterator** iter) = 0;
private:
// No copying allowed
DB(const DB&);

@ -0,0 +1,34 @@
// Copyright 2008-present Facebook. All Rights Reserved.
#ifndef STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
namespace leveldb {
// A TransactionLogIterator is used to iterate over the Transaction's in a db.
class TransactionLogIterator {
public:
TransactionLogIterator() {}
virtual ~TransactionLogIterator() {}
// An iterator is either positioned at a WriteBatch or not valid.
// This method returns true if the iterator is valid.
virtual bool Valid() = 0;
// Moves the iterator to the next WriteBatch.
// REQUIRES: Valid() to be true.
virtual void Next() = 0;
// Return's ok if the iterator is in a valid stated.
// Return the Error Status when the iterator is not Valid.
virtual Status status() = 0;
// If valid return's the current write_batch.
virtual void GetBatch(WriteBatch* batch) = 0;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_

@ -0,0 +1,14 @@
#ifndef STORAGE_LEVELDB_INCLUDE_TYPES_H_
#define STORAGE_LEVELDB_INCLUDE_TYPES_H_
#include <stdint.h>
namespace leveldb {
// Define all public custom types here.
// Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber;
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_TYPES_H_
Loading…
Cancel
Save