diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 1e3830636..b7efec182 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -841,8 +841,8 @@ TEST_F(PerfContextTest, CPUTimer) { ASSERT_EQ(value, "v0"); if (FLAGS_verbose) { - std::cout << "Get CPU time nanos: " - << get_perf_context()->get_cpu_nanos << "ns\n"; + std::cout << "Get CPU time nanos: " << get_perf_context()->get_cpu_nanos + << "ns\n"; } // Iter diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 4f55a30d3..f92d563eb 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -25,13 +25,13 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( options_(options), read_options_(read_options), soptions_(soptions), - startingSequenceNumber_(seq), + starting_sequence_number_(seq), files_(std::move(files)), started_(false), - isValid_(false), - currentFileIndex_(0), - currentBatchSeq_(0), - currentLastSeq_(0), + is_valid_(false), + current_file_index_(0), + current_batch_seq_(0), + current_last_seq_(0), versions_(versions), seq_per_batch_(seq_per_batch) { assert(files_ != nullptr); @@ -43,23 +43,23 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( } Status TransactionLogIteratorImpl::OpenLogFile( - const LogFile* logFile, + const LogFile* log_file, std::unique_ptr* file_reader) { Env* env = options_->env; std::unique_ptr file; std::string fname; Status s; EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_); - if (logFile->Type() == kArchivedLogFile) { - fname = ArchivedLogFileName(dir_, logFile->LogNumber()); + if (log_file->Type() == kArchivedLogFile) { + fname = ArchivedLogFileName(dir_, log_file->LogNumber()); s = env->NewSequentialFile(fname, &file, optimized_env_options); } else { - fname = LogFileName(dir_, logFile->LogNumber()); + fname = LogFileName(dir_, log_file->LogNumber()); s = env->NewSequentialFile(fname, &file, optimized_env_options); if (!s.ok()) { // If cannot open file in DB directory. // Try the archive dir, as it could have moved in the meanwhile. - fname = ArchivedLogFileName(dir_, logFile->LogNumber()); + fname = ArchivedLogFileName(dir_, log_file->LogNumber()); s = env->NewSequentialFile(fname, &file, optimized_env_options); } } @@ -70,45 +70,41 @@ Status TransactionLogIteratorImpl::OpenLogFile( } BatchResult TransactionLogIteratorImpl::GetBatch() { - assert(isValid_); // cannot call in a non valid state. + assert(is_valid_); // cannot call in a non valid state. BatchResult result; - result.sequence = currentBatchSeq_; - result.writeBatchPtr = std::move(currentBatch_); + result.sequence = current_batch_seq_; + result.writeBatchPtr = std::move(current_batch_); return result; } -Status TransactionLogIteratorImpl::status() { - return currentStatus_; -} +Status TransactionLogIteratorImpl::status() { return current_status_; } -bool TransactionLogIteratorImpl::Valid() { - return started_ && isValid_; -} +bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; } bool TransactionLogIteratorImpl::RestrictedRead( Slice* record, std::string* scratch) { // Don't read if no more complete entries to read from logs - if (currentLastSeq_ >= versions_->LastSequence()) { + if (current_last_seq_ >= versions_->LastSequence()) { return false; } - return currentLogReader_->ReadRecord(record, scratch); + return current_log_reader_->ReadRecord(record, scratch); } -void TransactionLogIteratorImpl::SeekToStartSequence( - uint64_t startFileIndex, - bool strict) { +void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index, + bool strict) { std::string scratch; Slice record; started_ = false; - isValid_ = false; - if (files_->size() <= startFileIndex) { + is_valid_ = false; + if (files_->size() <= start_file_index) { return; } - Status s = OpenLogReader(files_->at(static_cast(startFileIndex)).get()); + Status s = + OpenLogReader(files_->at(static_cast(start_file_index)).get()); if (!s.ok()) { - currentStatus_ = s; - reporter_.Info(currentStatus_.ToString().c_str()); + current_status_ = s; + reporter_.Info(current_status_.ToString().c_str()); return; } while (RestrictedRead(&record, &scratch)) { @@ -118,21 +114,22 @@ void TransactionLogIteratorImpl::SeekToStartSequence( continue; } UpdateCurrentWriteBatch(record); - if (currentLastSeq_ >= startingSequenceNumber_) { - if (strict && currentBatchSeq_ != startingSequenceNumber_) { - currentStatus_ = Status::Corruption("Gap in sequence number. Could not " - "seek to required sequence number"); - reporter_.Info(currentStatus_.ToString().c_str()); + if (current_last_seq_ >= starting_sequence_number_) { + if (strict && current_batch_seq_ != starting_sequence_number_) { + current_status_ = Status::Corruption( + "Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(current_status_.ToString().c_str()); return; } else if (strict) { reporter_.Info("Could seek required sequence number. Iterator will " "continue."); } - isValid_ = true; + is_valid_ = true; started_ = true; // set started_ as we could seek till starting sequence return; } else { - isValid_ = false; + is_valid_ = false; } } @@ -141,13 +138,15 @@ void TransactionLogIteratorImpl::SeekToStartSequence( // If strict is set, we want to seek exactly till the start sequence and it // should have been present in the file we scanned above if (strict) { - currentStatus_ = Status::Corruption("Gap in sequence number. Could not " - "seek to required sequence number"); - reporter_.Info(currentStatus_.ToString().c_str()); + current_status_ = Status::Corruption( + "Gap in sequence number. Could not " + "seek to required sequence number"); + reporter_.Info(current_status_.ToString().c_str()); } else if (files_->size() != 1) { - currentStatus_ = Status::Corruption("Start sequence was not found, " - "skipping to the next available"); - reporter_.Info(currentStatus_.ToString().c_str()); + current_status_ = Status::Corruption( + "Start sequence was not found, " + "skipping to the next available"); + reporter_.Info(current_status_.ToString().c_str()); // Let NextImpl find the next available entry. started_ remains false // because we don't want to check for gaps while moving to start sequence NextImpl(true); @@ -161,15 +160,15 @@ void TransactionLogIteratorImpl::Next() { void TransactionLogIteratorImpl::NextImpl(bool internal) { std::string scratch; Slice record; - isValid_ = false; + is_valid_ = false; if (!internal && !started_) { // Runs every time until we can seek to the start sequence return SeekToStartSequence(); } while(true) { - assert(currentLogReader_); - if (currentLogReader_->IsEOF()) { - currentLogReader_->UnmarkEOF(); + assert(current_log_reader_); + if (current_log_reader_->IsEOF()) { + current_log_reader_->UnmarkEOF(); } while (RestrictedRead(&record, &scratch)) { if (record.size() < WriteBatchInternal::kHeader) { @@ -190,20 +189,20 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { } // Open the next file - if (currentFileIndex_ < files_->size() - 1) { - ++currentFileIndex_; - Status s = OpenLogReader(files_->at(currentFileIndex_).get()); + if (current_file_index_ < files_->size() - 1) { + ++current_file_index_; + Status s = OpenLogReader(files_->at(current_file_index_).get()); if (!s.ok()) { - isValid_ = false; - currentStatus_ = s; + is_valid_ = false; + current_status_ = s; return; } } else { - isValid_ = false; - if (currentLastSeq_ == versions_->LastSequence()) { - currentStatus_ = Status::OK(); + is_valid_ = false; + if (current_last_seq_ == versions_->LastSequence()) { + current_status_ = Status::OK(); } else { - currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); + current_status_ = Status::Corruption("NO MORE DATA LEFT"); } return; } @@ -211,17 +210,16 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { } bool TransactionLogIteratorImpl::IsBatchExpected( - const WriteBatch* batch, - const SequenceNumber expectedSeq) { + const WriteBatch* batch, const SequenceNumber expected_seq) { assert(batch); SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch); - if (batchSeq != expectedSeq) { + if (batchSeq != expected_seq) { char buf[200]; snprintf(buf, sizeof(buf), "Discontinuity in log records. Got seq=%" PRIu64 ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64 ".Log iterator will reseek the correct batch.", - batchSeq, expectedSeq, versions_->LastSequence()); + batchSeq, expected_seq, versions_->LastSequence()); reporter_.Info(buf); return false; } @@ -232,25 +230,25 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { std::unique_ptr batch(new WriteBatch()); WriteBatchInternal::SetContents(batch.get(), record); - SequenceNumber expectedSeq = currentLastSeq_ + 1; + SequenceNumber expected_seq = current_last_seq_ + 1; // If the iterator has started, then confirm that we get continuous batches - if (started_ && !IsBatchExpected(batch.get(), expectedSeq)) { + if (started_ && !IsBatchExpected(batch.get(), expected_seq)) { // Seek to the batch having expected sequence number - if (expectedSeq < files_->at(currentFileIndex_)->StartSequence()) { + if (expected_seq < files_->at(current_file_index_)->StartSequence()) { // Expected batch must lie in the previous log file // Avoid underflow. - if (currentFileIndex_ != 0) { - currentFileIndex_--; + if (current_file_index_ != 0) { + current_file_index_--; } } - startingSequenceNumber_ = expectedSeq; + starting_sequence_number_ = expected_seq; // currentStatus_ will be set to Ok if reseek succeeds // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode // that allows gaps in the WAL since it will still skip over the gap. - currentStatus_ = Status::NotFound("Gap in sequence numbers"); + current_status_ = Status::NotFound("Gap in sequence numbers"); // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode // should be disabled - return SeekToStartSequence(currentFileIndex_, !seq_per_batch_); + return SeekToStartSequence(current_file_index_, !seq_per_batch_); } struct BatchCounter : public WriteBatch::Handler { @@ -289,33 +287,33 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { Status MarkRollback(const Slice&) override { return Status::OK(); } }; - currentBatchSeq_ = WriteBatchInternal::Sequence(batch.get()); + current_batch_seq_ = WriteBatchInternal::Sequence(batch.get()); if (seq_per_batch_) { - BatchCounter counter(currentBatchSeq_); + BatchCounter counter(current_batch_seq_); batch->Iterate(&counter); - currentLastSeq_ = counter.sequence_; + current_last_seq_ = counter.sequence_; } else { - currentLastSeq_ = - currentBatchSeq_ + WriteBatchInternal::Count(batch.get()) - 1; + current_last_seq_ = + current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1; } // currentBatchSeq_ can only change here - assert(currentLastSeq_ <= versions_->LastSequence()); + assert(current_last_seq_ <= versions_->LastSequence()); - currentBatch_ = std::move(batch); - isValid_ = true; - currentStatus_ = Status::OK(); + current_batch_ = std::move(batch); + is_valid_ = true; + current_status_ = Status::OK(); } -Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { +Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) { std::unique_ptr file; - Status s = OpenLogFile(logFile, &file); + Status s = OpenLogFile(log_file, &file); if (!s.ok()) { return s; } assert(file); - currentLogReader_.reset( + current_log_reader_.reset( new log::Reader(options_->info_log, std::move(file), &reporter_, - read_options_.verify_checksums_, logFile->LogNumber())); + read_options_.verify_checksums_, log_file->LogNumber())); return Status::OK(); } } // namespace rocksdb diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 3c27a8f37..6382b61a5 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -78,15 +78,15 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const ImmutableDBOptions* options_; const TransactionLogIterator::ReadOptions read_options_; const EnvOptions& soptions_; - SequenceNumber startingSequenceNumber_; + SequenceNumber starting_sequence_number_; std::unique_ptr files_; bool started_; - bool isValid_; // not valid when it starts of. - Status currentStatus_; - size_t currentFileIndex_; - std::unique_ptr currentBatch_; - std::unique_ptr currentLogReader_; - Status OpenLogFile(const LogFile* logFile, + bool is_valid_; // not valid when it starts of. + Status current_status_; + size_t current_file_index_; + std::unique_ptr current_batch_; + std::unique_ptr current_log_reader_; + Status OpenLogFile(const LogFile* log_file, std::unique_ptr* file); struct LogReporter : public log::Reader::Reporter { @@ -99,8 +99,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { virtual void Info(const char* s) { ROCKS_LOG_INFO(info_log, "%s", s); } } reporter_; - SequenceNumber currentBatchSeq_; // sequence number at start of current batch - SequenceNumber currentLastSeq_; // last sequence in the current batch + SequenceNumber + current_batch_seq_; // sequence number at start of current batch + SequenceNumber current_last_seq_; // last sequence in the current batch // Used only to get latest seq. num // TODO(icanadi) can this be just a callback? VersionSet const* const versions_; @@ -109,14 +110,14 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { bool RestrictedRead(Slice* record, std::string* scratch); // Seeks to startingSequenceNumber reading from startFileIndex in files_. // If strict is set,then must get a batch starting with startingSequenceNumber - void SeekToStartSequence(uint64_t startFileIndex = 0, bool strict = false); + void SeekToStartSequence(uint64_t start_file_index = 0, bool strict = false); // Implementation of Next. SeekToStartSequence calls it internally with // internal=true to let it find next entry even if it has to jump gaps because // the iterator may start off from the first available entry but promises to // be continuous after that void NextImpl(bool internal = false); // Check if batch is expected, else return false - bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expectedSeq); + bool IsBatchExpected(const WriteBatch* batch, SequenceNumber expected_seq); // Update current batch if a continuous batch is found, else return false void UpdateCurrentWriteBatch(const Slice& record); Status OpenLogReader(const LogFile* file);