diff --git a/db/db_impl.cc b/db/db_impl.cc index 3aa2a2256..c0c4fe190 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -831,6 +831,9 @@ void DBImpl::PurgeObsoleteWALFiles() { Log(options_.info_log, "Can't delete file: %s: %s", file_path.c_str(), s.ToString().c_str()); continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(number); } continue; } @@ -853,6 +856,9 @@ void DBImpl::PurgeObsoleteWALFiles() { Log(options_.info_log, "Can't delete file: %s: %s", file_path.c_str(), s.ToString().c_str()); continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(number); } } } @@ -887,6 +893,9 @@ void DBImpl::PurgeObsoleteWALFiles() { Log(options_.info_log, "Can't delete file: %s: %s", file_path.c_str(), s.ToString().c_str()); continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(archived_logs[i]->LogNumber()); } } } @@ -914,14 +923,15 @@ Status DBImpl::GetSortedWalsOfType(const std::string& path, uint64_t number; FileType type; if (ParseFileName(f, &number, &type) && type == kLogFile) { - WriteBatch batch; - Status s = ReadFirstRecord(log_type, number, &batch); + SequenceNumber sequence; + Status s = ReadFirstRecord(log_type, number, &sequence); if (!s.ok()) { - if (CheckWalFileExistsAndEmpty(log_type, number)) { - continue; - } return s; } + if (sequence == 0) { + // empty file + continue; + } uint64_t size_bytes; s = env_->GetFileSize(LogFileName(path, number), &size_bytes); @@ -930,8 +940,7 @@ Status DBImpl::GetSortedWalsOfType(const std::string& path, } log_files.push_back(std::move(unique_ptr( - new LogFileImpl(number, log_type, - WriteBatchInternal::Sequence(&batch), size_bytes)))); + new LogFileImpl(number, log_type, sequence, size_bytes)))); } } CompareLogByPointer compare_log_files; @@ -963,43 +972,46 @@ Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, return Status::OK(); } -bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type, - const uint64_t number) { - const std::string fname = (type == kAliveLogFile) - ? LogFileName(options_.wal_dir, number) - : ArchivedLogFileName(options_.wal_dir, number); - uint64_t file_size; - Status s = env_->GetFileSize(fname, &file_size); - return (s.ok() && (file_size == 0)); -} - Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, - WriteBatch* const result) { + SequenceNumber* sequence) { + if (type != kAliveLogFile && type != kArchivedLogFile) { + return Status::NotSupported("File Type Not Known " + std::to_string(type)); + } + { + MutexLock l(&read_first_record_cache_mutex_); + auto itr = read_first_record_cache_.find(number); + if (itr != read_first_record_cache_.end()) { + *sequence = itr->second; + return Status::OK(); + } + } + Status s; if (type == kAliveLogFile) { std::string fname = LogFileName(options_.wal_dir, number); - Status status = ReadFirstLine(fname, result); - if (status.ok() || env_->FileExists(fname)) { - // return OK or any error that is not caused non-existing file - return status; + s = ReadFirstLine(fname, sequence); + if (env_->FileExists(fname) && !s.ok()) { + // return any error that is not caused by non-existing file + return s; } + } + if (type == kArchivedLogFile || !s.ok()) { // check if the file got moved to archive. std::string archived_file = ArchivedLogFileName(options_.wal_dir, number); - Status s = ReadFirstLine(archived_file, result); - if (s.ok() || env_->FileExists(archived_file)) { - return s; - } - return Status::NotFound("Log File has been deleted: " + archived_file); - } else if (type == kArchivedLogFile) { - std::string fname = ArchivedLogFileName(options_.wal_dir, number); - Status status = ReadFirstLine(fname, result); - return status; + s = ReadFirstLine(archived_file, sequence); + } + + if (s.ok() && *sequence != 0) { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.insert({number, *sequence}); } - return Status::NotSupported("File Type Not Known: " + std::to_string(type)); + return s; } +// the function returns status.ok() and sequence == 0 if the file exists, but is +// empty Status DBImpl::ReadFirstLine(const std::string& fname, - WriteBatch* const batch) { + SequenceNumber* sequence) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -1043,15 +1055,16 @@ Status DBImpl::ReadFirstLine(const std::string& fname, Status::Corruption("log record too small")); // TODO read record's till the first no corrupt entry? } else { - WriteBatchInternal::SetContents(batch, record); + WriteBatch batch; + WriteBatchInternal::SetContents(&batch, record); + *sequence = WriteBatchInternal::Sequence(&batch); return Status::OK(); } } - // ReadRecord returns false on EOF, which is deemed as OK() by Reader - if (status.ok()) { - status = Status::Corruption("eof reached"); - } + // ReadRecord returns false on EOF, which means that the log file is empty. we + // return status.ok() in that case and set sequence number to 0 + *sequence = 0; return status; } diff --git a/db/db_impl.h b/db/db_impl.h index b66d4e558..0fa91f29a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -191,6 +191,10 @@ class DBImpl : public DB { void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family, std::vector>* metadata); + Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number, + SequenceNumber* sequence); + + Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); #endif // NDEBUG // needed for CleanupIteratorState @@ -408,14 +412,11 @@ class DBImpl : public DB { // Greater Than or Equal to the requested SequenceNumber. Status RetainProbableWalFiles(VectorLogPtr& all_logs, const SequenceNumber target); - // return true if - bool CheckWalFileExistsAndEmpty(const WalFileType type, - const uint64_t number); Status ReadFirstRecord(const WalFileType type, const uint64_t number, - WriteBatch* const result); + SequenceNumber* sequence); - Status ReadFirstLine(const std::string& fname, WriteBatch* const batch); + Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence); #endif // ROCKSDB_LITE void PrintStatistics(); @@ -459,6 +460,10 @@ class DBImpl : public DB { SnapshotList snapshots_; + // cache for ReadFirstRecord() calls + std::unordered_map read_first_record_cache_; + port::Mutex read_first_record_cache_mutex_; + // Set of table files to protect from deletion because they are // part of ongoing compactions. std::set pending_outputs_; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 3dcde6c40..d6551b45a 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -117,5 +117,16 @@ Status DBImpl::TEST_WaitForCompact() { } return bg_error_; } + +Status DBImpl::TEST_ReadFirstRecord(const WalFileType type, + const uint64_t number, + SequenceNumber* sequence) { + return ReadFirstRecord(type, number, sequence); +} + +Status DBImpl::TEST_ReadFirstLine(const std::string& fname, + SequenceNumber* sequence) { + return ReadFirstLine(fname, sequence); +} } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_test.cc b/db/db_test.cc index 20fdbd290..8a6e1836d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -125,6 +125,9 @@ class SpecialEnv : public EnvWrapper { bool count_random_reads_; anon::AtomicCounter random_read_counter_; + bool count_sequential_reads_; + anon::AtomicCounter sequential_read_counter_; + anon::AtomicCounter sleep_counter_; explicit SpecialEnv(Env* base) : EnvWrapper(base) { @@ -132,6 +135,7 @@ class SpecialEnv : public EnvWrapper { no_space_.Release_Store(nullptr); non_writable_.Release_Store(nullptr); count_random_reads_ = false; + count_sequential_reads_ = false; manifest_sync_error_.Release_Store(nullptr); manifest_write_error_.Release_Store(nullptr); log_write_error_.Release_Store(nullptr); @@ -252,6 +256,31 @@ class SpecialEnv : public EnvWrapper { return s; } + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) { + class CountingFile : public SequentialFile { + private: + unique_ptr target_; + anon::AtomicCounter* counter_; + + public: + CountingFile(unique_ptr&& target, + anon::AtomicCounter* counter) + : target_(std::move(target)), counter_(counter) {} + virtual Status Read(size_t n, Slice* result, char* scratch) { + counter_->Increment(); + return target_->Read(n, result, scratch); + } + virtual Status Skip(uint64_t n) { return target_->Skip(n); } + }; + + Status s = target()->NewSequentialFile(f, r, soptions); + if (s.ok() && count_sequential_reads_) { + r->reset(new CountingFile(std::move(*r), &sequential_read_counter_)); + } + return s; + } + virtual void SleepForMicroseconds(int micros) { sleep_counter_.Increment(); target()->SleepForMicroseconds(micros); @@ -5694,6 +5723,44 @@ TEST(DBTest, TransactionLogIteratorBlobs) { handler.seen); } +TEST(DBTest, ReadFirstRecordCache) { + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + DestroyAndReopen(&options); + + std::string path = dbname_ + "/000001.log"; + unique_ptr file; + ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); + + SequenceNumber s; + ASSERT_OK(dbfull()->TEST_ReadFirstLine(path, &s)); + ASSERT_EQ(s, 0); + + ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 0); + + log::Writer writer(std::move(file)); + WriteBatch batch; + batch.Put("foo", "bar"); + WriteBatchInternal::SetSequence(&batch, 10); + writer.AddRecord(WriteBatchInternal::Contents(&batch)); + + env_->count_sequential_reads_ = true; + // sequential_read_counter_ sanity test + ASSERT_EQ(env_->sequential_read_counter_.Read(), 0); + + ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 10); + // did a read + ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); + + ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 10); + // no new reads since the value is cached + ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); +} + TEST(DBTest, ReadCompaction) { std::string value(4096, '4'); // a string of size 4K {