Cache result of ReadFirstRecord()

Summary:
ReadFirstRecord() reads the actual log file from disk on every call. This diff introduces a cache layer on top of ReadFirstRecord(), which should significantly speed up repeated calls to GetUpdatesSince().

I also cleaned up some stuff, but the whole TransactionLogIterator could use some refactoring, especially if we see increased usage.

Test Plan: make check

Reviewers: haobo, sdong, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18387
main
Igor Canadi 11 years ago
parent de825e7307
commit dd9eb7a7d5
  1. 89
      db/db_impl.cc
  2. 15
      db/db_impl.h
  3. 11
      db/db_impl_debug.cc
  4. 67
      db/db_test.cc

@ -831,6 +831,9 @@ void DBImpl::PurgeObsoleteWALFiles() {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
continue;
}
@ -853,6 +856,9 @@ void DBImpl::PurgeObsoleteWALFiles() {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
}
}
@ -887,6 +893,9 @@ void DBImpl::PurgeObsoleteWALFiles() {
Log(options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(archived_logs[i]->LogNumber());
}
}
}
@ -914,14 +923,15 @@ Status DBImpl::GetSortedWalsOfType(const std::string& path,
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
WriteBatch batch;
Status s = ReadFirstRecord(log_type, number, &batch);
SequenceNumber sequence;
Status s = ReadFirstRecord(log_type, number, &sequence);
if (!s.ok()) {
if (CheckWalFileExistsAndEmpty(log_type, number)) {
continue;
}
return s;
}
if (sequence == 0) {
// empty file
continue;
}
uint64_t size_bytes;
s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
@ -930,8 +940,7 @@ Status DBImpl::GetSortedWalsOfType(const std::string& path,
}
log_files.push_back(std::move(unique_ptr<LogFile>(
new LogFileImpl(number, log_type,
WriteBatchInternal::Sequence(&batch), size_bytes))));
new LogFileImpl(number, log_type, sequence, size_bytes))));
}
}
CompareLogByPointer compare_log_files;
@ -963,43 +972,46 @@ Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
return Status::OK();
}
bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
const uint64_t number) {
const std::string fname = (type == kAliveLogFile)
? LogFileName(options_.wal_dir, number)
: ArchivedLogFileName(options_.wal_dir, number);
uint64_t file_size;
Status s = env_->GetFileSize(fname, &file_size);
return (s.ok() && (file_size == 0));
}
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
WriteBatch* const result) {
SequenceNumber* sequence) {
if (type != kAliveLogFile && type != kArchivedLogFile) {
return Status::NotSupported("File Type Not Known " + std::to_string(type));
}
{
MutexLock l(&read_first_record_cache_mutex_);
auto itr = read_first_record_cache_.find(number);
if (itr != read_first_record_cache_.end()) {
*sequence = itr->second;
return Status::OK();
}
}
Status s;
if (type == kAliveLogFile) {
std::string fname = LogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
if (status.ok() || env_->FileExists(fname)) {
// return OK or any error that is not caused non-existing file
return status;
s = ReadFirstLine(fname, sequence);
if (env_->FileExists(fname) && !s.ok()) {
// return any error that is not caused by non-existing file
return s;
}
}
if (type == kArchivedLogFile || !s.ok()) {
// check if the file got moved to archive.
std::string archived_file = ArchivedLogFileName(options_.wal_dir, number);
Status s = ReadFirstLine(archived_file, result);
if (s.ok() || env_->FileExists(archived_file)) {
return s;
}
return Status::NotFound("Log File has been deleted: " + archived_file);
} else if (type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
return status;
s = ReadFirstLine(archived_file, sequence);
}
if (s.ok() && *sequence != 0) {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.insert({number, *sequence});
}
return Status::NotSupported("File Type Not Known: " + std::to_string(type));
return s;
}
// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status DBImpl::ReadFirstLine(const std::string& fname,
WriteBatch* const batch) {
SequenceNumber* sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -1043,15 +1055,16 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
Status::Corruption("log record too small"));
// TODO read record's till the first no corrupt entry?
} else {
WriteBatchInternal::SetContents(batch, record);
WriteBatch batch;
WriteBatchInternal::SetContents(&batch, record);
*sequence = WriteBatchInternal::Sequence(&batch);
return Status::OK();
}
}
// ReadRecord returns false on EOF, which is deemed as OK() by Reader
if (status.ok()) {
status = Status::Corruption("eof reached");
}
// ReadRecord returns false on EOF, which means that the log file is empty. we
// return status.ok() in that case and set sequence number to 0
*sequence = 0;
return status;
}

@ -191,6 +191,10 @@ class DBImpl : public DB {
void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
std::vector<std::vector<FileMetaData>>* metadata);
Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence);
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
#endif // NDEBUG
// needed for CleanupIteratorState
@ -408,14 +412,11 @@ class DBImpl : public DB {
// Greater Than or Equal to the requested SequenceNumber.
Status RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target);
// return true if
bool CheckWalFileExistsAndEmpty(const WalFileType type,
const uint64_t number);
Status ReadFirstRecord(const WalFileType type, const uint64_t number,
WriteBatch* const result);
SequenceNumber* sequence);
Status ReadFirstLine(const std::string& fname, WriteBatch* const batch);
Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
#endif // ROCKSDB_LITE
void PrintStatistics();
@ -459,6 +460,10 @@ class DBImpl : public DB {
SnapshotList snapshots_;
// cache for ReadFirstRecord() calls
std::unordered_map<uint64_t, SequenceNumber> read_first_record_cache_;
port::Mutex read_first_record_cache_mutex_;
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
std::set<uint64_t> pending_outputs_;

@ -117,5 +117,16 @@ Status DBImpl::TEST_WaitForCompact() {
}
return bg_error_;
}
Status DBImpl::TEST_ReadFirstRecord(const WalFileType type,
const uint64_t number,
SequenceNumber* sequence) {
return ReadFirstRecord(type, number, sequence);
}
Status DBImpl::TEST_ReadFirstLine(const std::string& fname,
SequenceNumber* sequence) {
return ReadFirstLine(fname, sequence);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -125,6 +125,9 @@ class SpecialEnv : public EnvWrapper {
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
bool count_sequential_reads_;
anon::AtomicCounter sequential_read_counter_;
anon::AtomicCounter sleep_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
@ -132,6 +135,7 @@ class SpecialEnv : public EnvWrapper {
no_space_.Release_Store(nullptr);
non_writable_.Release_Store(nullptr);
count_random_reads_ = false;
count_sequential_reads_ = false;
manifest_sync_error_.Release_Store(nullptr);
manifest_write_error_.Release_Store(nullptr);
log_write_error_.Release_Store(nullptr);
@ -252,6 +256,31 @@ class SpecialEnv : public EnvWrapper {
return s;
}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) {
class CountingFile : public SequentialFile {
private:
unique_ptr<SequentialFile> target_;
anon::AtomicCounter* counter_;
public:
CountingFile(unique_ptr<SequentialFile>&& target,
anon::AtomicCounter* counter)
: target_(std::move(target)), counter_(counter) {}
virtual Status Read(size_t n, Slice* result, char* scratch) {
counter_->Increment();
return target_->Read(n, result, scratch);
}
virtual Status Skip(uint64_t n) { return target_->Skip(n); }
};
Status s = target()->NewSequentialFile(f, r, soptions);
if (s.ok() && count_sequential_reads_) {
r->reset(new CountingFile(std::move(*r), &sequential_read_counter_));
}
return s;
}
virtual void SleepForMicroseconds(int micros) {
sleep_counter_.Increment();
target()->SleepForMicroseconds(micros);
@ -5694,6 +5723,44 @@ TEST(DBTest, TransactionLogIteratorBlobs) {
handler.seen);
}
TEST(DBTest, ReadFirstRecordCache) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
DestroyAndReopen(&options);
std::string path = dbname_ + "/000001.log";
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
SequenceNumber s;
ASSERT_OK(dbfull()->TEST_ReadFirstLine(path, &s));
ASSERT_EQ(s, 0);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 0);
log::Writer writer(std::move(file));
WriteBatch batch;
batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10);
writer.AddRecord(WriteBatchInternal::Contents(&batch));
env_->count_sequential_reads_ = true;
// sequential_read_counter_ sanity test
ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10);
// did a read
ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10);
// no new reads since the value is cached
ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
}
TEST(DBTest, ReadCompaction) {
std::string value(4096, '4'); // a string of size 4K
{

Loading…
Cancel
Save