API for getting archived log files

Summary: Also expanded class LogFile to have startSequene and FileSize and exposed it publicly

Test Plan: make all check

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D12087
main
Mayank Agarwal 12 years ago
parent e1346968d8
commit 8a3547d38e
  1. 48
      db/db_filesnapshot.cc
  2. 149
      db/db_impl.cc
  3. 30
      db/db_impl.h
  4. 8
      db/db_test.cc
  5. 7
      db/filename.cc
  6. 48
      db/log_file.h
  7. 43
      db/transaction_log_impl.cc
  8. 48
      db/transaction_log_impl.h
  9. 10
      include/leveldb/db.h
  10. 36
      include/leveldb/transaction_log.h
  11. 2
      include/leveldb/types.h
  12. 9
      include/utilities/stackable_db.h
  13. 71
      tools/db_repl_stress.cc
  14. 8
      utilities/ttl/db_ttl.cc
  15. 4
      utilities/ttl/db_ttl.h

@ -2,10 +2,11 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. // found in the LICENSE file.
#include "db/db_impl.h" #include <algorithm>
#include "db/filename.h"
#include <string> #include <string>
#include <stdint.h> #include <stdint.h>
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -66,4 +67,47 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
return Status::OK(); return Status::OK();
} }
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in archive dir, then append sorted files from main
// dir to maintain sorted order
// list wal files in archive dir.
Status s;
std::string archivedir = ArchivalDirectory(dbname_);
if (env_->FileExists(archivedir)) {
s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
// list wal files in main db dir.
s = AppendSortedWalsOfType(dbname_, files, kAliveLogFile);
if (!s.ok()) {
return s;
}
return s;
}
Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) {
Status s;
std::string archivedir = ArchivalDirectory(dbname_);
std::string files_not_deleted;
for (const auto& wal : files) {
/* Try deleting in archive dir. If fails, try deleting in main db dir.
* This is efficient because all except for very few wal files will be in
* archive. Checking for WalType is not much helpful because alive wal could
be archived now.
*/
if (!env_->DeleteFile(archivedir + "/" + wal->Filename()).ok() &&
!env_->DeleteFile(dbname_ + "/" + wal->Filename()).ok()) {
files_not_deleted.append(wal->Filename());
}
}
if (!files_not_deleted.empty()) {
return Status::IOError("Deleted all requested files except: " +
files_not_deleted);
}
return Status::OK();
}
} }

@ -27,7 +27,7 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "db/transaction_log_iterator_impl.h" #include "db/transaction_log_impl.h"
#include "leveldb/compaction_filter.h" #include "leveldb/compaction_filter.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
@ -1046,34 +1046,24 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() {
Status DBImpl::GetUpdatesSince(SequenceNumber seq, Status DBImpl::GetUpdatesSince(SequenceNumber seq,
unique_ptr<TransactionLogIterator>* iter) { unique_ptr<TransactionLogIterator>* iter) {
// Get All Log Files. if (seq > last_flushed_sequence_) {
// Sort Files return Status::IOError("Requested sequence not yet written in the db");
// Get the first entry from each file. }
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number. // Do binary search and open files and find the seq number.
std::vector<LogFile> walFiles; std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
// list wal files in main db dir. Status s = GetSortedWalFiles(*wal_files);
Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
// list wal files in archive dir.
std::string archivedir = ArchivalDirectory(dbname_);
if (env_->FileExists(archivedir)) {
s = ListAllWALFiles(archivedir, &walFiles, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
if (walFiles.empty()) { if (wal_files->empty()) {
return Status::IOError(" NO WAL Files present in the db"); return Status::IOError(" NO WAL Files present in the db");
} }
// std::shared_ptr would have been useful here. // std::shared_ptr would have been useful here.
std::unique_ptr<std::vector<LogFile>> probableWALFiles( s = RetainProbableWalFiles(*wal_files, seq);
new std::vector<LogFile>());
s = FindProbableWALFiles(&walFiles, probableWALFiles.get(), seq);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -1082,90 +1072,61 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
&options_, &options_,
storage_options_, storage_options_,
seq, seq,
std::move(probableWALFiles), std::move(wal_files),
&last_flushed_sequence_)); &last_flushed_sequence_));
iter->get()->Next(); iter->get()->Next();
return iter->get()->status(); return iter->get()->status();
} }
Status DBImpl::FindProbableWALFiles(std::vector<LogFile>* const allLogs, Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
std::vector<LogFile>* const result, const SequenceNumber target) {
const SequenceNumber target) {
assert(allLogs != nullptr);
assert(result != nullptr);
std::sort(allLogs->begin(), allLogs->end());
long start = 0; // signed to avoid overflow when target is < first file. long start = 0; // signed to avoid overflow when target is < first file.
long end = static_cast<long>(allLogs->size()) - 1; long end = static_cast<long>(all_logs.size()) - 1;
// Binary Search. avoid opening all files. // Binary Search. avoid opening all files.
while (end >= start) { while (end >= start) {
long mid = start + (end - start) / 2; // Avoid overflow. long mid = start + (end - start) / 2; // Avoid overflow.
WriteBatch batch; SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
Status s = ReadFirstRecord(allLogs->at(mid), &batch); if (current_seq_num == target) {
if (!s.ok()) {
if (CheckFileExistsAndEmpty(allLogs->at(mid))) {
allLogs->erase(allLogs->begin() + mid);
--end;
continue;
}
return s;
}
SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch);
if (currentSeqNum == target) {
start = mid;
end = mid; end = mid;
break; break;
} else if (currentSeqNum < target) { } else if (current_seq_num < target) {
start = mid + 1; start = mid + 1;
} else { } else {
end = mid - 1; end = mid - 1;
} }
} }
size_t startIndex = std::max(0l, end); // end could be -ve. size_t start_index = std::max(0l, end); // end could be -ve.
for (size_t i = startIndex; i < allLogs->size(); ++i) { // The last wal file is always included
result->push_back(allLogs->at(i)); all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
}
if (result->empty()) {
return Status::IOError(
"No probable files. Check if the db contains log files");
}
return Status::OK(); return Status::OK();
} }
bool DBImpl::CheckFileExistsAndEmpty(const LogFile& file) { bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
if (file.type == kAliveLogFile) { const uint64_t number) {
const std::string fname = LogFileName(dbname_, file.logNumber); const std::string fname = (type == kAliveLogFile) ?
uint64_t file_size; LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number);
Status s = env_->GetFileSize(fname, &file_size);
if (s.ok() && file_size == 0) {
return true;
}
}
const std::string fname = ArchivedLogFileName(dbname_, file.logNumber);
uint64_t file_size; uint64_t file_size;
Status s = env_->GetFileSize(fname, &file_size); Status s = env_->GetFileSize(fname, &file_size);
if (s.ok() && file_size == 0) { return (s.ok() && (file_size == 0));
return true;
}
return false;
} }
Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) { Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
WriteBatch* const result) {
if (file.type == kAliveLogFile) { if (type == kAliveLogFile) {
std::string fname = LogFileName(dbname_, file.logNumber); std::string fname = LogFileName(dbname_, number);
Status status = ReadFirstLine(fname, result); Status status = ReadFirstLine(fname, result);
if (!status.ok()) { if (!status.ok()) {
// check if the file got moved to archive. // check if the file got moved to archive.
std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber); std::string archived_file = ArchivedLogFileName(dbname_, number);
Status s = ReadFirstLine(archivedFile, result); Status s = ReadFirstLine(archived_file, result);
if (!s.ok()) { if (!s.ok()) {
return Status::IOError("Log File Has been deleted"); return Status::IOError("Log File has been deleted");
} }
} }
return Status::OK(); return Status::OK();
} else if (file.type == kArchivedLogFile) { } else if (type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, file.logNumber); std::string fname = ArchivedLogFileName(dbname_, number);
Status status = ReadFirstLine(fname, result); Status status = ReadFirstLine(fname, result);
return status; return status;
} }
@ -1204,6 +1165,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
0/*initial_offset*/); 0/*initial_offset*/);
std::string scratch; std::string scratch;
Slice record; Slice record;
if (reader.ReadRecord(&record, &scratch) && status.ok()) { if (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) { if (record.size() < 12) {
reporter.Corruption( reporter.Corruption(
@ -1217,22 +1179,49 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
return Status::IOError("Error reading from file " + fname); return Status::IOError("Error reading from file " + fname);
} }
Status DBImpl::ListAllWALFiles(const std::string& path, struct CompareLogByPointer {
std::vector<LogFile>* const logFiles, bool operator() (const unique_ptr<LogFile>& a,
WalFileType logType) { const unique_ptr<LogFile>& b) {
assert(logFiles != nullptr); LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
std::vector<std::string> allFiles; LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
const Status status = env_->GetChildren(path, &allFiles); return *a_impl < *b_impl;
}
};
Status DBImpl::AppendSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files, WalFileType log_type) {
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
for (const auto& f : allFiles) { log_files.reserve(log_files.size() + all_files.size());
for (const auto& f : all_files) {
uint64_t number; uint64_t number;
FileType type; FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile){ if (ParseFileName(f, &number, &type) && type == kLogFile){
logFiles->push_back(LogFile(number, logType));
WriteBatch batch;
Status s = ReadFirstRecord(log_type, number, &batch);
if (!s.ok()) {
if (CheckWalFileExistsAndEmpty(log_type, number)) {
continue;
}
return s;
}
uint64_t size_bytes;
s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
if (!s.ok()) {
return s;
}
log_files.push_back(std::move(unique_ptr<LogFile>(new LogFileImpl(
number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes))));
} }
} }
CompareLogByPointer compare_log_files;
std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status; return status;
} }

@ -10,15 +10,15 @@
#include <set> #include <set>
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_file.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/snapshot.h" #include "db/snapshot.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/memtablerep.h"
#include "leveldb/transaction_log.h"
#include "port/port.h" #include "port/port.h"
#include "util/stats_logger.h" #include "util/stats_logger.h"
#include "memtablelist.h" #include "memtablelist.h"
#include "leveldb/memtablerep.h"
#ifdef USE_SCRIBE #ifdef USE_SCRIBE
#include "scribe/scribe_logger.h" #include "scribe/scribe_logger.h"
@ -73,6 +73,8 @@ class DBImpl : public DB {
virtual Status EnableFileDeletions(); virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&, virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size); uint64_t* manifest_file_size);
virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual Status DeleteWalFiles(const VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber(); virtual SequenceNumber GetLatestSequenceNumber();
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter); unique_ptr<TransactionLogIterator>* iter);
@ -183,7 +185,7 @@ class DBImpl : public DB {
void MaybeScheduleCompaction(); void MaybeScheduleCompaction();
static void BGWork(void* db); static void BGWork(void* db);
void BackgroundCall(); void BackgroundCall();
Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
void CleanupCompaction(CompactionState* compact); void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact); Status DoCompactionWork(CompactionState* compact);
@ -208,19 +210,21 @@ class DBImpl : public DB {
void PurgeObsoleteWALFiles(); void PurgeObsoleteWALFiles();
Status ListAllWALFiles(const std::string& path, Status AppendSortedWalsOfType(const std::string& path,
std::vector<LogFile>* logFiles, VectorLogPtr& log_files,
WalFileType type); WalFileType type);
// Find's all the log files which contain updates with seq no. // Requires: all_logs should be sorted with earliest log file first
// Greater Than or Equal to the requested SequenceNumber // Retains all log files in all_logs which contain updates with seq no.
Status FindProbableWALFiles(std::vector<LogFile>* const allLogs, // Greater Than or Equal to the requested SequenceNumber.
std::vector<LogFile>* const result, Status RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target); const SequenceNumber target);
// return true if // return true if
bool CheckFileExistsAndEmpty(const LogFile& file); bool CheckWalFileExistsAndEmpty(const WalFileType type,
const uint64_t number);
Status ReadFirstRecord(const LogFile& file, WriteBatch* const result); Status ReadFirstRecord(const WalFileType type, const uint64_t number,
WriteBatch* const result);
Status ReadFirstLine(const std::string& fname, WriteBatch* const batch); Status ReadFirstLine(const std::string& fname, WriteBatch* const batch);

@ -3461,6 +3461,14 @@ class ModelDB: public DB {
return Status::OK(); return Status::OK();
} }
virtual Status GetSortedWalFiles(VectorLogPtr& files) {
return Status::OK();
}
virtual Status DeleteWalFiles(const VectorLogPtr& files) {
return Status::OK();
}
virtual SequenceNumber GetLatestSequenceNumber() { virtual SequenceNumber GetLatestSequenceNumber() {
return 0; return 0;
} }

@ -46,10 +46,13 @@ extern Status WriteStringToFileSync(Env* env, const Slice& data,
static std::string MakeFileName(const std::string& name, uint64_t number, static std::string MakeFileName(const std::string& name, uint64_t number,
const char* suffix) { const char* suffix) {
char buf[100]; char buf[100];
snprintf(buf, sizeof(buf), "/%06llu.%s", snprintf(buf, sizeof(buf), "%06llu.%s",
static_cast<unsigned long long>(number), static_cast<unsigned long long>(number),
suffix); suffix);
return name + buf; if (name.empty()) {
return buf;
}
return name + "/" + buf;
} }
std::string LogFileName(const std::string& name, uint64_t number) { std::string LogFileName(const std::string& name, uint64_t number) {

@ -1,48 +0,0 @@
// Copyright 2008-present Facebook. All Rights Reserved.
#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_
#define STORAGE_LEVELDB_DB_LOG_FILE_H_
namespace leveldb {
enum WalFileType {
kArchivedLogFile = 0,
kAliveLogFile = 1
} ;
class LogFile {
public:
uint64_t logNumber;
WalFileType type;
LogFile(uint64_t logNum,WalFileType logType) :
logNumber(logNum),
type(logType) {}
LogFile(const LogFile& that) {
logNumber = that.logNumber;
type = that.type;
}
bool operator < (const LogFile& that) const {
return logNumber < that.logNumber;
}
std::string ToString() const {
char response[100];
const char* typeOfLog;
if (type == kAliveLogFile) {
typeOfLog = "Alive Log";
} else {
typeOfLog = "Archived Log";
}
sprintf(response,
"LogNumber : %ld LogType : %s",
logNumber,
typeOfLog);
return std::string(response);
}
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_

@ -1,6 +1,5 @@
#include "db/transaction_log_iterator_impl.h" #include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "db/filename.h"
namespace leveldb { namespace leveldb {
@ -8,39 +7,39 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dbname, const std::string& dbname,
const Options* options, const Options* options,
const EnvOptions& soptions, const EnvOptions& soptions,
SequenceNumber& seq, const SequenceNumber seq,
std::unique_ptr<std::vector<LogFile>> files, std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence) : SequenceNumber const * const lastFlushedSequence) :
dbname_(dbname), dbname_(dbname),
options_(options), options_(options),
soptions_(soptions), soptions_(soptions),
startingSequenceNumber_(seq), startingSequenceNumber_(seq),
files_(std::move(files)), files_(std::move(files)),
started_(false), started_(false),
isValid_(false), isValid_(false),
currentFileIndex_(0), currentFileIndex_(0),
lastFlushedSequence_(lastFlushedSequence) { lastFlushedSequence_(lastFlushedSequence) {
assert(startingSequenceNumber_ <= *lastFlushedSequence_);
assert(files_.get() != nullptr); assert(files_.get() != nullptr);
assert(lastFlushedSequence_);
reporter_.env = options_->env; reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get(); reporter_.info_log = options_->info_log.get();
} }
Status TransactionLogIteratorImpl::OpenLogFile( Status TransactionLogIteratorImpl::OpenLogFile(
const LogFile& logFile, const LogFile* logFile,
unique_ptr<SequentialFile>* file) { unique_ptr<SequentialFile>* file) {
Env* env = options_->env; Env* env = options_->env;
if (logFile.type == kArchivedLogFile) { if (logFile->Type() == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
return env->NewSequentialFile(fname, file, soptions_); return env->NewSequentialFile(fname, file, soptions_);
} else { } else {
std::string fname = LogFileName(dbname_, logFile.logNumber); std::string fname = LogFileName(dbname_, logFile->LogNumber());
Status status = env->NewSequentialFile(fname, file, soptions_); Status status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) { if (!status.ok()) {
// If cannot open file in DB directory. // If cannot open file in DB directory.
// Try the archive dir, as it could have moved in the meanwhile. // Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dbname_, logFile.logNumber); fname = ArchivedLogFileName(dbname_, logFile->LogNumber());
status = env->NewSequentialFile(fname, file, soptions_); status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) { if (!status.ok()) {
return Status::IOError(" Requested file not present in the dir"); return Status::IOError(" Requested file not present in the dir");
@ -67,7 +66,7 @@ bool TransactionLogIteratorImpl::Valid() {
} }
void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::Next() {
LogFile currentLogFile = files_.get()->at(currentFileIndex_); LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get();
// First seek to the given seqNo. in the current file. // First seek to the given seqNo. in the current file.
std::string scratch; std::string scratch;
@ -129,7 +128,7 @@ void TransactionLogIteratorImpl::Next() {
if (openNextFile) { if (openNextFile) {
if (currentFileIndex_ < files_.get()->size() - 1) { if (currentFileIndex_ < files_.get()->size() - 1) {
++currentFileIndex_; ++currentFileIndex_;
Status status = OpenLogReader(files_.get()->at(currentFileIndex_)); Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get());
if (!status.ok()) { if (!status.ok()) {
isValid_ = false; isValid_ = false;
currentStatus_ = status; currentStatus_ = status;
@ -157,7 +156,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
currentStatus_ = Status::OK(); currentStatus_ = Status::OK();
} }
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) { Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
Status status = OpenLogFile(logFile, &file); Status status = OpenLogFile(logFile, &file);
if (!status.ok()) { if (!status.ok()) {

@ -7,9 +7,9 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/types.h" #include "leveldb/types.h"
#include "leveldb/transaction_log_iterator.h" #include "leveldb/transaction_log.h"
#include "db/log_file.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/filename.h"
namespace leveldb { namespace leveldb {
@ -21,13 +21,45 @@ struct LogReporter : public log::Reader::Reporter {
} }
}; };
class LogFileImpl : public LogFile {
public:
LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq,
uint64_t sizeBytes) :
logNumber_(logNum),
type_(logType),
startSequence_(startSeq),
sizeFileBytes_(sizeBytes) {
}
std::string Filename() const { return LogFileName("", logNumber_); }
uint64_t LogNumber() const { return logNumber_; }
WalFileType Type() const { return type_; }
SequenceNumber StartSequence() const { return startSequence_; }
uint64_t SizeFileBytes() const { return sizeFileBytes_; }
bool operator < (const LogFile& that) const {
return LogNumber() < that.LogNumber();
}
private:
uint64_t logNumber_;
WalFileType type_;
SequenceNumber startSequence_;
uint64_t sizeFileBytes_;
};
class TransactionLogIteratorImpl : public TransactionLogIterator { class TransactionLogIteratorImpl : public TransactionLogIterator {
public: public:
TransactionLogIteratorImpl(const std::string& dbname, TransactionLogIteratorImpl(const std::string& dbname,
const Options* options, const Options* options,
const EnvOptions& soptions, const EnvOptions& soptions,
SequenceNumber& seqNum, const SequenceNumber seqNum,
std::unique_ptr<std::vector<LogFile>> files, std::unique_ptr<VectorLogPtr> files,
SequenceNumber const * const lastFlushedSequence); SequenceNumber const * const lastFlushedSequence);
virtual bool Valid(); virtual bool Valid();
@ -42,22 +74,22 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dbname_; const std::string& dbname_;
const Options* options_; const Options* options_;
const EnvOptions& soptions_; const EnvOptions& soptions_;
const uint64_t startingSequenceNumber_; const SequenceNumber startingSequenceNumber_;
std::unique_ptr<std::vector<LogFile>> files_; std::unique_ptr<VectorLogPtr> files_;
bool started_; bool started_;
bool isValid_; // not valid when it starts of. bool isValid_; // not valid when it starts of.
Status currentStatus_; Status currentStatus_;
size_t currentFileIndex_; size_t currentFileIndex_;
std::unique_ptr<WriteBatch> currentBatch_; std::unique_ptr<WriteBatch> currentBatch_;
unique_ptr<log::Reader> currentLogReader_; unique_ptr<log::Reader> currentLogReader_;
Status OpenLogFile(const LogFile& logFile, unique_ptr<SequentialFile>* file); Status OpenLogFile(const LogFile* logFile, unique_ptr<SequentialFile>* file);
LogReporter reporter_; LogReporter reporter_;
SequenceNumber const * const lastFlushedSequence_; SequenceNumber const * const lastFlushedSequence_;
// represents the sequence number being read currently. // represents the sequence number being read currently.
SequenceNumber currentSequence_; SequenceNumber currentSequence_;
void UpdateCurrentWriteBatch(const Slice& record); void UpdateCurrentWriteBatch(const Slice& record);
Status OpenLogReader(const LogFile& file); Status OpenLogReader(const LogFile* file);
}; };

@ -12,7 +12,7 @@
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "leveldb/options.h" #include "leveldb/options.h"
#include "leveldb/types.h" #include "leveldb/types.h"
#include "leveldb/transaction_log_iterator.h" #include "leveldb/transaction_log.h"
namespace leveldb { namespace leveldb {
@ -232,6 +232,14 @@ class DB {
virtual Status GetLiveFiles(std::vector<std::string>&, virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) = 0; uint64_t* manifest_file_size) = 0;
// Retrieve the sorted list of all wal files with earliest file first
virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0;
// Delete wal files in files. These can be either live or archived.
// Returns Status::OK if all files could be deleted, otherwise Status::IOError
// which contains information about files that could not be deleted.
virtual Status DeleteWalFiles(const VectorLogPtr& files) = 0;
// The sequence number of the most recent transaction. // The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() = 0; virtual SequenceNumber GetLatestSequenceNumber() = 0;

@ -3,10 +3,46 @@
#define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ #define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/types.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
namespace leveldb { namespace leveldb {
class LogFile;
typedef std::vector<std::unique_ptr<LogFile>> VectorLogPtr;
enum WalFileType {
/* Indicates that WAL file is in archive directory. WAL files are moved from
* the main db directory to archive directory once they are not live and stay
* there for a duration of WAL_ttl_seconds which can be set in Options
*/
kArchivedLogFile = 0,
/* Indicates that WAL file is live and resides in the main db directory */
kAliveLogFile = 1
} ;
class LogFile {
public:
LogFile() {}
virtual ~LogFile() {}
// Returns log file's name excluding the db path
virtual std::string Filename() const = 0;
// Primary identifier for log file.
// This is directly proportional to creation time of the log file
virtual uint64_t LogNumber() const = 0;
// Log file can be either alive or archived
virtual WalFileType Type() const = 0;
// Starting sequence number of writebatch written in this log file
virtual SequenceNumber StartSequence() const = 0;
// Size of log file on disk in Bytes
virtual uint64_t SizeFileBytes() const = 0;
};
struct BatchResult { struct BatchResult {
SequenceNumber sequence; SequenceNumber sequence;

@ -7,7 +7,7 @@ namespace leveldb {
// Define all public custom types here. // Define all public custom types here.
// Represents a sequence number in a WAL file. // Represents a sequence number in a WAL file.
typedef uint64_t SequenceNumber; typedef uint64_t SequenceNumber;
} // namespace leveldb } // namespace leveldb

@ -139,6 +139,15 @@ class StackableDB : public DB {
return sdb_->GetLatestSequenceNumber(); return sdb_->GetLatestSequenceNumber();
} }
virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
return sdb_->GetSortedWalFiles(files);
}
virtual Status DeleteWalFiles(const VectorLogPtr& files)
override{
return sdb_->DeleteWalFiles(files);
}
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) unique_ptr<TransactionLogIterator>* iter)
override { override {

@ -1,4 +1,3 @@
#include <cstdio> #include <cstdio>
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
@ -19,7 +18,6 @@ using namespace leveldb;
struct DataPumpThread { struct DataPumpThread {
size_t no_records; size_t no_records;
DB* db; // Assumption DB is Open'ed already. DB* db; // Assumption DB is Open'ed already.
volatile bool is_running;
}; };
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random* rnd, int len) {
@ -33,59 +31,50 @@ static void DataPumpThreadBody(void* arg) {
DB* db = t->db; DB* db = t->db;
Random rnd(301); Random rnd(301);
size_t i = 0; size_t i = 0;
t->is_running = true; while(i++ < t->no_records) {
while( i < t->no_records ) { if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)),
db->Put(WriteOptions(), Slice(RandomString(&rnd, 500))).ok()) {
Slice(RandomString(&rnd, 50)), fprintf(stderr, "Error in put\n");
Slice(RandomString(&rnd, 500))); exit(1);
++i; }
} }
t->is_running = false;
} }
struct ReplicationThread { struct ReplicationThread {
port::AtomicPointer stop; port::AtomicPointer stop;
DB* db; DB* db;
volatile SequenceNumber latest;
volatile size_t no_read; volatile size_t no_read;
volatile bool has_more;
}; };
static void ReplicationThreadBody(void* arg) { static void ReplicationThreadBody(void* arg) {
ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg); ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
DB* db = t->db; DB* db = t->db;
unique_ptr<TransactionLogIterator> iter; unique_ptr<TransactionLogIterator> iter;
SequenceNumber currentSeqNum = 0; SequenceNumber currentSeqNum = 1;
while (t->stop.Acquire_Load() != nullptr) { while (t->stop.Acquire_Load() != nullptr) {
if (!iter) { iter.reset();
db->GetUpdatesSince(currentSeqNum, &iter); Status s;
fprintf(stdout, "Refreshing iterator\n"); while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
iter->Next(); if (t->stop.Acquire_Load() == nullptr) {
while(iter->Valid()) { return;
BatchResult res = iter->GetBatch(); }
if (res.sequence != currentSeqNum +1 }
&& res.sequence != currentSeqNum) { fprintf(stderr, "Refreshing iterator\n");
fprintf(stderr, for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
"Missed a seq no. b/w %ld and %ld\n", BatchResult res = iter->GetBatch();
currentSeqNum, if (res.sequence != currentSeqNum) {
res.sequence); fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum,
exit(1); res.sequence);
} exit(1);
currentSeqNum = res.sequence;
t->latest = res.sequence;
iter->Next();
t->no_read++;
} }
} }
iter.reset();
} }
} }
int main(int argc, const char** argv) { int main(int argc, const char** argv) {
long FLAGS_num_inserts = 1000; uint64_t FLAGS_num_inserts = 1000;
long FLAGS_WAL_ttl_seconds = 1000; uint64_t FLAGS_WAL_ttl_seconds = 1000;
char junk; char junk;
long l; long l;
@ -108,36 +97,34 @@ int main(int argc, const char** argv) {
options.create_if_missing = true; options.create_if_missing = true;
options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
DB* db; DB* db;
DestroyDB(default_db_path, options);
Status s = DB::Open(options, default_db_path, &db); Status s = DB::Open(options, default_db_path, &db);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str());
exit(1);
} }
DataPumpThread dataPump; DataPumpThread dataPump;
dataPump.no_records = FLAGS_num_inserts; dataPump.no_records = FLAGS_num_inserts;
dataPump.db = db; dataPump.db = db;
dataPump.is_running = true;
env->StartThread(DataPumpThreadBody, &dataPump); env->StartThread(DataPumpThreadBody, &dataPump);
ReplicationThread replThread; ReplicationThread replThread;
replThread.db = db; replThread.db = db;
replThread.no_read = 0; replThread.no_read = 0;
replThread.has_more = true;
replThread.stop.Release_Store(env); // store something to make it non-null. replThread.stop.Release_Store(env); // store something to make it non-null.
env->StartThread(ReplicationThreadBody, &replThread); env->StartThread(ReplicationThreadBody, &replThread);
while(dataPump.is_running) { while(replThread.no_read < FLAGS_num_inserts);
continue;
}
replThread.stop.Release_Store(nullptr); replThread.stop.Release_Store(nullptr);
if ( replThread.no_read < dataPump.no_records ) { if (replThread.no_read < dataPump.no_records) {
// no. read should be => than inserted. // no. read should be => than inserted.
fprintf(stderr, "No. of Record's written and read not same\nRead : %ld" fprintf(stderr, "No. of Record's written and read not same\nRead : %ld"
" Written : %ld", replThread.no_read, dataPump.no_records); " Written : %ld\n", replThread.no_read, dataPump.no_records);
exit(1); exit(1);
} }
fprintf(stderr, "Successful!\n");
exit(0); exit(0);
fprintf(stdout, "ALL IS FINE");
} }

@ -266,6 +266,14 @@ SequenceNumber DBWithTTL::GetLatestSequenceNumber() {
return db_->GetLatestSequenceNumber(); return db_->GetLatestSequenceNumber();
} }
Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) {
return db_->GetSortedWalFiles(files);
}
Status DBWithTTL::DeleteWalFiles(const VectorLogPtr& files){
return db_->DeleteWalFiles(files);
}
Status DBWithTTL::GetUpdatesSince( Status DBWithTTL::GetUpdatesSince(
SequenceNumber seq_number, SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter) { unique_ptr<TransactionLogIterator>* iter) {

@ -77,6 +77,10 @@ class DBWithTTL : public StackableDB {
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs); virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs);
virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual Status DeleteWalFiles(const VectorLogPtr& files);
virtual SequenceNumber GetLatestSequenceNumber(); virtual SequenceNumber GetLatestSequenceNumber();
virtual Status GetUpdatesSince(SequenceNumber seq_number, virtual Status GetUpdatesSince(SequenceNumber seq_number,

Loading…
Cancel
Save