diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 5fd2af861..ae64392e5 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -2,10 +2,11 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -#include "db/db_impl.h" -#include "db/filename.h" +#include #include #include +#include "db/db_impl.h" +#include "db/filename.h" #include "db/version_set.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -66,4 +67,47 @@ Status DBImpl::GetLiveFiles(std::vector& ret, return Status::OK(); } +Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { + // First get sorted files in archive dir, then append sorted files from main + // dir to maintain sorted order + + // list wal files in archive dir. + Status s; + std::string archivedir = ArchivalDirectory(dbname_); + if (env_->FileExists(archivedir)) { + s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + if (!s.ok()) { + return s; + } + } + // list wal files in main db dir. + s = AppendSortedWalsOfType(dbname_, files, kAliveLogFile); + if (!s.ok()) { + return s; + } + return s; +} + +Status DBImpl::DeleteWalFiles(const VectorLogPtr& files) { + Status s; + std::string archivedir = ArchivalDirectory(dbname_); + std::string files_not_deleted; + for (const auto& wal : files) { + /* Try deleting in archive dir. If fails, try deleting in main db dir. + * This is efficient because all except for very few wal files will be in + * archive. Checking for WalType is not much helpful because alive wal could + be archived now. + */ + if (!env_->DeleteFile(archivedir + "/" + wal->Filename()).ok() && + !env_->DeleteFile(dbname_ + "/" + wal->Filename()).ok()) { + files_not_deleted.append(wal->Filename()); + } + } + if (!files_not_deleted.empty()) { + return Status::IOError("Deleted all requested files except: " + + files_not_deleted); + } + return Status::OK(); +} + } diff --git a/db/db_impl.cc b/db/db_impl.cc index f2351037b..1326148bb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -27,7 +27,7 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" -#include "db/transaction_log_iterator_impl.h" +#include "db/transaction_log_impl.h" #include "leveldb/compaction_filter.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -1046,34 +1046,24 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() { Status DBImpl::GetUpdatesSince(SequenceNumber seq, unique_ptr* iter) { - // Get All Log Files. - // Sort Files - // Get the first entry from each file. + if (seq > last_flushed_sequence_) { + return Status::IOError("Requested sequence not yet written in the db"); + } + // Get all sorted Wal Files. // Do binary search and open files and find the seq number. - std::vector walFiles; - // list wal files in main db dir. - Status s = ListAllWALFiles(dbname_, &walFiles, kAliveLogFile); + std::unique_ptr wal_files(new VectorLogPtr); + Status s = GetSortedWalFiles(*wal_files); if (!s.ok()) { return s; } - // list wal files in archive dir. - std::string archivedir = ArchivalDirectory(dbname_); - if (env_->FileExists(archivedir)) { - s = ListAllWALFiles(archivedir, &walFiles, kArchivedLogFile); - if (!s.ok()) { - return s; - } - } - if (walFiles.empty()) { + if (wal_files->empty()) { return Status::IOError(" NO WAL Files present in the db"); } // std::shared_ptr would have been useful here. - std::unique_ptr> probableWALFiles( - new std::vector()); - s = FindProbableWALFiles(&walFiles, probableWALFiles.get(), seq); + s = RetainProbableWalFiles(*wal_files, seq); if (!s.ok()) { return s; } @@ -1082,90 +1072,61 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq, &options_, storage_options_, seq, - std::move(probableWALFiles), + std::move(wal_files), &last_flushed_sequence_)); iter->get()->Next(); return iter->get()->status(); } -Status DBImpl::FindProbableWALFiles(std::vector* const allLogs, - std::vector* const result, - const SequenceNumber target) { - assert(allLogs != nullptr); - assert(result != nullptr); - - std::sort(allLogs->begin(), allLogs->end()); +Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target) { long start = 0; // signed to avoid overflow when target is < first file. - long end = static_cast(allLogs->size()) - 1; + long end = static_cast(all_logs.size()) - 1; // Binary Search. avoid opening all files. while (end >= start) { long mid = start + (end - start) / 2; // Avoid overflow. - WriteBatch batch; - Status s = ReadFirstRecord(allLogs->at(mid), &batch); - if (!s.ok()) { - if (CheckFileExistsAndEmpty(allLogs->at(mid))) { - allLogs->erase(allLogs->begin() + mid); - --end; - continue; - } - return s; - } - SequenceNumber currentSeqNum = WriteBatchInternal::Sequence(&batch); - if (currentSeqNum == target) { - start = mid; + SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence(); + if (current_seq_num == target) { end = mid; break; - } else if (currentSeqNum < target) { + } else if (current_seq_num < target) { start = mid + 1; } else { end = mid - 1; } } - size_t startIndex = std::max(0l, end); // end could be -ve. - for (size_t i = startIndex; i < allLogs->size(); ++i) { - result->push_back(allLogs->at(i)); - } - if (result->empty()) { - return Status::IOError( - "No probable files. Check if the db contains log files"); - } + size_t start_index = std::max(0l, end); // end could be -ve. + // The last wal file is always included + all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); return Status::OK(); } -bool DBImpl::CheckFileExistsAndEmpty(const LogFile& file) { - if (file.type == kAliveLogFile) { - const std::string fname = LogFileName(dbname_, file.logNumber); - uint64_t file_size; - Status s = env_->GetFileSize(fname, &file_size); - if (s.ok() && file_size == 0) { - return true; - } - } - const std::string fname = ArchivedLogFileName(dbname_, file.logNumber); +bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type, + const uint64_t number) { + const std::string fname = (type == kAliveLogFile) ? + LogFileName(dbname_, number) : ArchivedLogFileName(dbname_, number); uint64_t file_size; Status s = env_->GetFileSize(fname, &file_size); - if (s.ok() && file_size == 0) { - return true; - } - return false; + return (s.ok() && (file_size == 0)); } -Status DBImpl::ReadFirstRecord(const LogFile& file, WriteBatch* const result) { +Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, + WriteBatch* const result) { - if (file.type == kAliveLogFile) { - std::string fname = LogFileName(dbname_, file.logNumber); + if (type == kAliveLogFile) { + std::string fname = LogFileName(dbname_, number); Status status = ReadFirstLine(fname, result); if (!status.ok()) { // check if the file got moved to archive. - std::string archivedFile = ArchivedLogFileName(dbname_, file.logNumber); - Status s = ReadFirstLine(archivedFile, result); + std::string archived_file = ArchivedLogFileName(dbname_, number); + Status s = ReadFirstLine(archived_file, result); if (!s.ok()) { - return Status::IOError("Log File Has been deleted"); + return Status::IOError("Log File has been deleted"); } } return Status::OK(); - } else if (file.type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, file.logNumber); + } else if (type == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, number); Status status = ReadFirstLine(fname, result); return status; } @@ -1204,6 +1165,7 @@ Status DBImpl::ReadFirstLine(const std::string& fname, 0/*initial_offset*/); std::string scratch; Slice record; + if (reader.ReadRecord(&record, &scratch) && status.ok()) { if (record.size() < 12) { reporter.Corruption( @@ -1217,22 +1179,49 @@ Status DBImpl::ReadFirstLine(const std::string& fname, return Status::IOError("Error reading from file " + fname); } -Status DBImpl::ListAllWALFiles(const std::string& path, - std::vector* const logFiles, - WalFileType logType) { - assert(logFiles != nullptr); - std::vector allFiles; - const Status status = env_->GetChildren(path, &allFiles); +struct CompareLogByPointer { + bool operator() (const unique_ptr& a, + const unique_ptr& b) { + LogFileImpl* a_impl = dynamic_cast(a.get()); + LogFileImpl* b_impl = dynamic_cast(b.get()); + return *a_impl < *b_impl; + } +}; + +Status DBImpl::AppendSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, WalFileType log_type) { + std::vector all_files; + const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - for (const auto& f : allFiles) { + log_files.reserve(log_files.size() + all_files.size()); + for (const auto& f : all_files) { uint64_t number; FileType type; if (ParseFileName(f, &number, &type) && type == kLogFile){ - logFiles->push_back(LogFile(number, logType)); + + WriteBatch batch; + Status s = ReadFirstRecord(log_type, number, &batch); + if (!s.ok()) { + if (CheckWalFileExistsAndEmpty(log_type, number)) { + continue; + } + return s; + } + + uint64_t size_bytes; + s = env_->GetFileSize(LogFileName(path, number), &size_bytes); + if (!s.ok()) { + return s; + } + + log_files.push_back(std::move(unique_ptr(new LogFileImpl( + number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes)))); } } + CompareLogByPointer compare_log_files; + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index c7f90b3cc..cea8041e4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,15 +10,15 @@ #include #include #include "db/dbformat.h" -#include "db/log_file.h" #include "db/log_writer.h" #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/memtablerep.h" +#include "leveldb/transaction_log.h" #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" -#include "leveldb/memtablerep.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -73,6 +73,8 @@ class DBImpl : public DB { virtual Status EnableFileDeletions(); virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size); + virtual Status GetSortedWalFiles(VectorLogPtr& files); + virtual Status DeleteWalFiles(const VectorLogPtr& files); virtual SequenceNumber GetLatestSequenceNumber(); virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter); @@ -183,7 +185,7 @@ class DBImpl : public DB { void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); - Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state); + Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); @@ -208,19 +210,21 @@ class DBImpl : public DB { void PurgeObsoleteWALFiles(); - Status ListAllWALFiles(const std::string& path, - std::vector* logFiles, - WalFileType type); + Status AppendSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); - // Find's all the log files which contain updates with seq no. - // Greater Than or Equal to the requested SequenceNumber - Status FindProbableWALFiles(std::vector* const allLogs, - std::vector* const result, - const SequenceNumber target); + // Requires: all_logs should be sorted with earliest log file first + // Retains all log files in all_logs which contain updates with seq no. + // Greater Than or Equal to the requested SequenceNumber. + Status RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target); // return true if - bool CheckFileExistsAndEmpty(const LogFile& file); + bool CheckWalFileExistsAndEmpty(const WalFileType type, + const uint64_t number); - Status ReadFirstRecord(const LogFile& file, WriteBatch* const result); + Status ReadFirstRecord(const WalFileType type, const uint64_t number, + WriteBatch* const result); Status ReadFirstLine(const std::string& fname, WriteBatch* const batch); diff --git a/db/db_test.cc b/db/db_test.cc index 8cd1dda6b..a087edf2c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3461,6 +3461,14 @@ class ModelDB: public DB { return Status::OK(); } + virtual Status GetSortedWalFiles(VectorLogPtr& files) { + return Status::OK(); + } + + virtual Status DeleteWalFiles(const VectorLogPtr& files) { + return Status::OK(); + } + virtual SequenceNumber GetLatestSequenceNumber() { return 0; } diff --git a/db/filename.cc b/db/filename.cc index 81a142153..794410cf3 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -46,10 +46,13 @@ extern Status WriteStringToFileSync(Env* env, const Slice& data, static std::string MakeFileName(const std::string& name, uint64_t number, const char* suffix) { char buf[100]; - snprintf(buf, sizeof(buf), "/%06llu.%s", + snprintf(buf, sizeof(buf), "%06llu.%s", static_cast(number), suffix); - return name + buf; + if (name.empty()) { + return buf; + } + return name + "/" + buf; } std::string LogFileName(const std::string& name, uint64_t number) { diff --git a/db/log_file.h b/db/log_file.h deleted file mode 100644 index da7808e65..000000000 --- a/db/log_file.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2008-present Facebook. All Rights Reserved. - -#ifndef STORAGE_LEVELDB_DB_LOG_FILE_H_ -#define STORAGE_LEVELDB_DB_LOG_FILE_H_ - -namespace leveldb { - -enum WalFileType { - kArchivedLogFile = 0, - kAliveLogFile = 1 -} ; - -class LogFile { - - public: - uint64_t logNumber; - WalFileType type; - - LogFile(uint64_t logNum,WalFileType logType) : - logNumber(logNum), - type(logType) {} - - LogFile(const LogFile& that) { - logNumber = that.logNumber; - type = that.type; - } - - bool operator < (const LogFile& that) const { - return logNumber < that.logNumber; - } - - std::string ToString() const { - char response[100]; - const char* typeOfLog; - if (type == kAliveLogFile) { - typeOfLog = "Alive Log"; - } else { - typeOfLog = "Archived Log"; - } - sprintf(response, - "LogNumber : %ld LogType : %s", - logNumber, - typeOfLog); - return std::string(response); - } -}; -} // namespace leveldb -#endif // STORAGE_LEVELDB_DB_LOG_FILE_H_ diff --git a/db/transaction_log_iterator_impl.cc b/db/transaction_log_impl.cc similarity index 81% rename from db/transaction_log_iterator_impl.cc rename to db/transaction_log_impl.cc index f8e4f8b26..6de0bd9cf 100644 --- a/db/transaction_log_iterator_impl.cc +++ b/db/transaction_log_impl.cc @@ -1,6 +1,5 @@ -#include "db/transaction_log_iterator_impl.h" +#include "db/transaction_log_impl.h" #include "db/write_batch_internal.h" -#include "db/filename.h" namespace leveldb { @@ -8,39 +7,39 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const std::string& dbname, const Options* options, const EnvOptions& soptions, - SequenceNumber& seq, - std::unique_ptr> files, + const SequenceNumber seq, + std::unique_ptr files, SequenceNumber const * const lastFlushedSequence) : - dbname_(dbname), - options_(options), - soptions_(soptions), - startingSequenceNumber_(seq), - files_(std::move(files)), - started_(false), - isValid_(false), - currentFileIndex_(0), - lastFlushedSequence_(lastFlushedSequence) { + dbname_(dbname), + options_(options), + soptions_(soptions), + startingSequenceNumber_(seq), + files_(std::move(files)), + started_(false), + isValid_(false), + currentFileIndex_(0), + lastFlushedSequence_(lastFlushedSequence) { + assert(startingSequenceNumber_ <= *lastFlushedSequence_); assert(files_.get() != nullptr); - assert(lastFlushedSequence_); reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); } Status TransactionLogIteratorImpl::OpenLogFile( - const LogFile& logFile, + const LogFile* logFile, unique_ptr* file) { Env* env = options_->env; - if (logFile.type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(dbname_, logFile.logNumber); + if (logFile->Type() == kArchivedLogFile) { + std::string fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); return env->NewSequentialFile(fname, file, soptions_); } else { - std::string fname = LogFileName(dbname_, logFile.logNumber); + std::string fname = LogFileName(dbname_, logFile->LogNumber()); Status status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { // If cannot open file in DB directory. // Try the archive dir, as it could have moved in the meanwhile. - fname = ArchivedLogFileName(dbname_, logFile.logNumber); + fname = ArchivedLogFileName(dbname_, logFile->LogNumber()); status = env->NewSequentialFile(fname, file, soptions_); if (!status.ok()) { return Status::IOError(" Requested file not present in the dir"); @@ -67,7 +66,7 @@ bool TransactionLogIteratorImpl::Valid() { } void TransactionLogIteratorImpl::Next() { - LogFile currentLogFile = files_.get()->at(currentFileIndex_); + LogFile* currentLogFile = files_.get()->at(currentFileIndex_).get(); // First seek to the given seqNo. in the current file. std::string scratch; @@ -129,7 +128,7 @@ void TransactionLogIteratorImpl::Next() { if (openNextFile) { if (currentFileIndex_ < files_.get()->size() - 1) { ++currentFileIndex_; - Status status = OpenLogReader(files_.get()->at(currentFileIndex_)); + Status status =OpenLogReader(files_.get()->at(currentFileIndex_).get()); if (!status.ok()) { isValid_ = false; currentStatus_ = status; @@ -157,7 +156,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { currentStatus_ = Status::OK(); } -Status TransactionLogIteratorImpl::OpenLogReader(const LogFile& logFile) { +Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { unique_ptr file; Status status = OpenLogFile(logFile, &file); if (!status.ok()) { diff --git a/db/transaction_log_iterator_impl.h b/db/transaction_log_impl.h similarity index 58% rename from db/transaction_log_iterator_impl.h rename to db/transaction_log_impl.h index faf07f43f..27ab1936c 100644 --- a/db/transaction_log_iterator_impl.h +++ b/db/transaction_log_impl.h @@ -7,9 +7,9 @@ #include "leveldb/env.h" #include "leveldb/options.h" #include "leveldb/types.h" -#include "leveldb/transaction_log_iterator.h" -#include "db/log_file.h" +#include "leveldb/transaction_log.h" #include "db/log_reader.h" +#include "db/filename.h" namespace leveldb { @@ -21,13 +21,45 @@ struct LogReporter : public log::Reader::Reporter { } }; +class LogFileImpl : public LogFile { + public: + LogFileImpl(uint64_t logNum, WalFileType logType, SequenceNumber startSeq, + uint64_t sizeBytes) : + logNumber_(logNum), + type_(logType), + startSequence_(startSeq), + sizeFileBytes_(sizeBytes) { + } + + std::string Filename() const { return LogFileName("", logNumber_); } + + uint64_t LogNumber() const { return logNumber_; } + + WalFileType Type() const { return type_; } + + SequenceNumber StartSequence() const { return startSequence_; } + + uint64_t SizeFileBytes() const { return sizeFileBytes_; } + + bool operator < (const LogFile& that) const { + return LogNumber() < that.LogNumber(); + } + + private: + uint64_t logNumber_; + WalFileType type_; + SequenceNumber startSequence_; + uint64_t sizeFileBytes_; + +}; + class TransactionLogIteratorImpl : public TransactionLogIterator { public: TransactionLogIteratorImpl(const std::string& dbname, const Options* options, const EnvOptions& soptions, - SequenceNumber& seqNum, - std::unique_ptr> files, + const SequenceNumber seqNum, + std::unique_ptr files, SequenceNumber const * const lastFlushedSequence); virtual bool Valid(); @@ -42,22 +74,22 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dbname_; const Options* options_; const EnvOptions& soptions_; - const uint64_t startingSequenceNumber_; - std::unique_ptr> files_; + const SequenceNumber startingSequenceNumber_; + std::unique_ptr files_; bool started_; bool isValid_; // not valid when it starts of. Status currentStatus_; size_t currentFileIndex_; std::unique_ptr currentBatch_; unique_ptr currentLogReader_; - Status OpenLogFile(const LogFile& logFile, unique_ptr* file); + Status OpenLogFile(const LogFile* logFile, unique_ptr* file); LogReporter reporter_; SequenceNumber const * const lastFlushedSequence_; // represents the sequence number being read currently. SequenceNumber currentSequence_; void UpdateCurrentWriteBatch(const Slice& record); - Status OpenLogReader(const LogFile& file); + Status OpenLogReader(const LogFile* file); }; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 0c056c362..d0316ee4f 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -12,7 +12,7 @@ #include "leveldb/iterator.h" #include "leveldb/options.h" #include "leveldb/types.h" -#include "leveldb/transaction_log_iterator.h" +#include "leveldb/transaction_log.h" namespace leveldb { @@ -232,6 +232,14 @@ class DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size) = 0; + // Retrieve the sorted list of all wal files with earliest file first + virtual Status GetSortedWalFiles(VectorLogPtr& files) = 0; + + // Delete wal files in files. These can be either live or archived. + // Returns Status::OK if all files could be deleted, otherwise Status::IOError + // which contains information about files that could not be deleted. + virtual Status DeleteWalFiles(const VectorLogPtr& files) = 0; + // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() = 0; diff --git a/include/leveldb/transaction_log_iterator.h b/include/leveldb/transaction_log.h similarity index 54% rename from include/leveldb/transaction_log_iterator.h rename to include/leveldb/transaction_log.h index d755d829c..a834afa86 100644 --- a/include/leveldb/transaction_log_iterator.h +++ b/include/leveldb/transaction_log.h @@ -3,10 +3,46 @@ #define STORAGE_LEVELDB_INCLUDE_TRANSACTION_LOG_ITERATOR_H_ #include "leveldb/status.h" +#include "leveldb/types.h" #include "leveldb/write_batch.h" namespace leveldb { +class LogFile; +typedef std::vector> VectorLogPtr; + +enum WalFileType { + /* Indicates that WAL file is in archive directory. WAL files are moved from + * the main db directory to archive directory once they are not live and stay + * there for a duration of WAL_ttl_seconds which can be set in Options + */ + kArchivedLogFile = 0, + + /* Indicates that WAL file is live and resides in the main db directory */ + kAliveLogFile = 1 +} ; + +class LogFile { + public: + LogFile() {} + virtual ~LogFile() {} + + // Returns log file's name excluding the db path + virtual std::string Filename() const = 0; + + // Primary identifier for log file. + // This is directly proportional to creation time of the log file + virtual uint64_t LogNumber() const = 0; + + // Log file can be either alive or archived + virtual WalFileType Type() const = 0; + + // Starting sequence number of writebatch written in this log file + virtual SequenceNumber StartSequence() const = 0; + + // Size of log file on disk in Bytes + virtual uint64_t SizeFileBytes() const = 0; +}; struct BatchResult { SequenceNumber sequence; diff --git a/include/leveldb/types.h b/include/leveldb/types.h index 6ec058c2e..1d9109461 100644 --- a/include/leveldb/types.h +++ b/include/leveldb/types.h @@ -7,7 +7,7 @@ namespace leveldb { // Define all public custom types here. -// Represents a sequence number in a WAL file. +// Represents a sequence number in a WAL file. typedef uint64_t SequenceNumber; } // namespace leveldb diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 916496e04..08930d194 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -139,6 +139,15 @@ class StackableDB : public DB { return sdb_->GetLatestSequenceNumber(); } + virtual Status GetSortedWalFiles(VectorLogPtr& files) override { + return sdb_->GetSortedWalFiles(files); + } + + virtual Status DeleteWalFiles(const VectorLogPtr& files) + override{ + return sdb_->DeleteWalFiles(files); + } + virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) override { diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 012a3df01..010fa573d 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -1,4 +1,3 @@ - #include #include "db/write_batch_internal.h" @@ -19,7 +18,6 @@ using namespace leveldb; struct DataPumpThread { size_t no_records; DB* db; // Assumption DB is Open'ed already. - volatile bool is_running; }; static std::string RandomString(Random* rnd, int len) { @@ -33,59 +31,50 @@ static void DataPumpThreadBody(void* arg) { DB* db = t->db; Random rnd(301); size_t i = 0; - t->is_running = true; - while( i < t->no_records ) { - db->Put(WriteOptions(), - Slice(RandomString(&rnd, 50)), - Slice(RandomString(&rnd, 500))); - ++i; + while(i++ < t->no_records) { + if(!db->Put(WriteOptions(), Slice(RandomString(&rnd, 500)), + Slice(RandomString(&rnd, 500))).ok()) { + fprintf(stderr, "Error in put\n"); + exit(1); + } } - t->is_running = false; } struct ReplicationThread { port::AtomicPointer stop; DB* db; - volatile SequenceNumber latest; volatile size_t no_read; - volatile bool has_more; }; static void ReplicationThreadBody(void* arg) { ReplicationThread* t = reinterpret_cast(arg); DB* db = t->db; unique_ptr iter; - SequenceNumber currentSeqNum = 0; + SequenceNumber currentSeqNum = 1; while (t->stop.Acquire_Load() != nullptr) { - if (!iter) { - db->GetUpdatesSince(currentSeqNum, &iter); - fprintf(stdout, "Refreshing iterator\n"); - iter->Next(); - while(iter->Valid()) { - BatchResult res = iter->GetBatch(); - if (res.sequence != currentSeqNum +1 - && res.sequence != currentSeqNum) { - fprintf(stderr, - "Missed a seq no. b/w %ld and %ld\n", - currentSeqNum, - res.sequence); - exit(1); - } - currentSeqNum = res.sequence; - t->latest = res.sequence; - iter->Next(); - t->no_read++; + iter.reset(); + Status s; + while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { + if (t->stop.Acquire_Load() == nullptr) { + return; + } + } + fprintf(stderr, "Refreshing iterator\n"); + for(;iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) { + BatchResult res = iter->GetBatch(); + if (res.sequence != currentSeqNum) { + fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n", currentSeqNum, + res.sequence); + exit(1); } } - iter.reset(); } } - int main(int argc, const char** argv) { - long FLAGS_num_inserts = 1000; - long FLAGS_WAL_ttl_seconds = 1000; + uint64_t FLAGS_num_inserts = 1000; + uint64_t FLAGS_WAL_ttl_seconds = 1000; char junk; long l; @@ -108,36 +97,34 @@ int main(int argc, const char** argv) { options.create_if_missing = true; options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds; DB* db; + DestroyDB(default_db_path, options); Status s = DB::Open(options, default_db_path, &db); if (!s.ok()) { fprintf(stderr, "Could not open DB due to %s\n", s.ToString().c_str()); + exit(1); } DataPumpThread dataPump; dataPump.no_records = FLAGS_num_inserts; dataPump.db = db; - dataPump.is_running = true; env->StartThread(DataPumpThreadBody, &dataPump); ReplicationThread replThread; replThread.db = db; replThread.no_read = 0; - replThread.has_more = true; replThread.stop.Release_Store(env); // store something to make it non-null. env->StartThread(ReplicationThreadBody, &replThread); - while(dataPump.is_running) { - continue; - } + while(replThread.no_read < FLAGS_num_inserts); replThread.stop.Release_Store(nullptr); - if ( replThread.no_read < dataPump.no_records ) { + if (replThread.no_read < dataPump.no_records) { // no. read should be => than inserted. fprintf(stderr, "No. of Record's written and read not same\nRead : %ld" - " Written : %ld", replThread.no_read, dataPump.no_records); + " Written : %ld\n", replThread.no_read, dataPump.no_records); exit(1); } + fprintf(stderr, "Successful!\n"); exit(0); - fprintf(stdout, "ALL IS FINE"); } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index b6235ba4a..9babecd0f 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -266,6 +266,14 @@ SequenceNumber DBWithTTL::GetLatestSequenceNumber() { return db_->GetLatestSequenceNumber(); } +Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) { + return db_->GetSortedWalFiles(files); +} + +Status DBWithTTL::DeleteWalFiles(const VectorLogPtr& files){ + return db_->DeleteWalFiles(files); +} + Status DBWithTTL::GetUpdatesSince( SequenceNumber seq_number, unique_ptr* iter) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 1285b448e..dbba5c63a 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -77,6 +77,10 @@ class DBWithTTL : public StackableDB { virtual Status GetLiveFiles(std::vector& vec, uint64_t* mfs); + virtual Status GetSortedWalFiles(VectorLogPtr& files); + + virtual Status DeleteWalFiles(const VectorLogPtr& files); + virtual SequenceNumber GetLatestSequenceNumber(); virtual Status GetUpdatesSince(SequenceNumber seq_number,