Summary: Decoupling code that deals with archived log files outside of DBImpl. That will make this code easier to reason about and test. It will also make the code easier to improve, because an improver doesn't have to understand DBImpl code in entirety.

Test Plan: added test

Reviewers: ljin, yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27873
main
Igor Canadi 10 years ago
parent fd95745a59
commit 635905481d
  1. 6
      Makefile
  2. 51
      db/db_filesnapshot.cc
  3. 366
      db/db_impl.cc
  4. 53
      db/db_impl.h
  5. 13
      db/db_impl_debug.cc
  6. 191
      db/db_test.cc
  7. 27
      db/transaction_log_impl.cc
  8. 8
      db/transaction_log_impl.h
  9. 445
      db/wal_manager.cc
  10. 95
      db/wal_manager.h
  11. 279
      db/wal_manager_test.cc

@ -146,7 +146,8 @@ TESTS = \
cuckoo_table_reader_test \
cuckoo_table_db_test \
write_batch_with_index_test \
flush_job_test
flush_job_test \
wal_manager_test
TOOLS = \
sst_dump \
@ -421,6 +422,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i
flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -132,57 +132,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
}
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1");
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2");
files.clear();
// list wal files in archive dir.
std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
if (env_->FileExists(archivedir)) {
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
uint64_t latest_archived_log_number = 0;
if (!files.empty()) {
latest_archived_log_number = files.back()->LogNumber();
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Latest Archived log: %" PRIu64, latest_archived_log_number);
}
files.reserve(files.size() + logs.size());
for (auto& log : logs) {
if (log->LogNumber() > latest_archived_log_number) {
files.push_back(std::move(log));
} else {
// When the race condition happens, we could see the
// same log in both db dir and archived dir. Simply
// ignore the one in db dir. Note that, if we read
// archived dir first, we would have missed the log file.
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"%s already moved to archive", log->PathName().c_str());
}
}
return s;
return wal_manager_.GetSortedWalFiles(files);
}
}
#endif // ROCKSDB_LITE

@ -342,11 +342,12 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
manual_compaction_(nullptr),
disable_delete_obsolete_files_(0),
delete_obsolete_files_last_run_(options.env->NowMicros()),
purge_wal_files_last_run_(0),
last_stats_dump_time_microsec_(0),
default_interval_to_delete_obsolete_WAL_(600),
flush_on_destroy_(false),
env_options_(options),
#ifndef ROCKSDB_LITE
wal_manager_(db_options_, env_options_),
#endif // ROCKSDB_LITE
bg_work_gate_closed_(false),
refitting_level_(false),
opened_successfully_(false) {
@ -738,23 +739,20 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
db_options_.wal_dir : dbname_) + "/" + to_delete;
}
if (type == kLogFile &&
(db_options_.WAL_ttl_seconds > 0 ||
db_options_.WAL_size_limit_MB > 0)) {
auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2");
Log(db_options_.info_log,
"Move log file %s to %s -- %s\n",
fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
#ifdef ROCKSDB_LITE
Status s = env_->DeleteFile(fname);
Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n",
fname.c_str(), type, number, s.ToString().c_str());
#else // not ROCKSDB_LITE
if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 ||
db_options_.WAL_size_limit_MB > 0)) {
wal_manager_.ArchiveWALFile(fname, number);
} else {
Status s = env_->DeleteFile(fname);
Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n",
fname.c_str(), type, number, s.ToString().c_str());
}
#endif // ROCKSDB_LITE
}
// Delete old info log files.
@ -775,7 +773,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
}
}
}
PurgeObsoleteWALFiles();
#ifndef ROCKSDB_LITE
wal_manager_.PurgeObsoleteWALFiles();
#endif // ROCKSDB_LITE
LogFlush(db_options_.info_log);
}
@ -788,324 +788,6 @@ void DBImpl::DeleteObsoleteFiles() {
}
}
#ifndef ROCKSDB_LITE
// 1. Go through all archived files and
// a. if ttl is enabled, delete outdated files
// b. if archive size limit is enabled, delete empty files,
// compute file number and size.
// 2. If size limit is enabled:
// a. compute how many files should be deleted
// b. get sorted non-empty archived logs
// c. delete what should be deleted
void DBImpl::PurgeObsoleteWALFiles() {
bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
if (!ttl_enabled && !size_limit_enabled) {
return;
}
int64_t current_time;
Status s = env_->GetCurrentTime(&current_time);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get current time: %s",
s.ToString().c_str());
assert(false);
return;
}
uint64_t const now_seconds = static_cast<uint64_t>(current_time);
uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
db_options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;
if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
return;
}
purge_wal_files_last_run_ = now_seconds;
std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get archive files: %s",
s.ToString().c_str());
assert(false);
return;
}
size_t log_files_num = 0;
uint64_t log_file_size = 0;
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = archival_dir + "/" + f;
if (ttl_enabled) {
uint64_t file_m_time;
Status const s = env_->GetFileModificationTime(file_path,
&file_m_time);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get file mod time: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
Status const s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
continue;
}
}
if (size_limit_enabled) {
uint64_t file_size;
Status const s = env_->GetFileSize(file_path, &file_size);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get file size: %s: %s",
file_path.c_str(), s.ToString().c_str());
return;
} else {
if (file_size > 0) {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
Status s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
}
}
}
}
}
if (0 == log_files_num || !size_limit_enabled) {
return;
}
size_t const files_keep_num = db_options_.WAL_size_limit_MB *
1024 * 1024 / log_file_size;
if (log_files_num <= files_keep_num) {
return;
}
size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs;
GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
if (files_del_num > archived_logs.size()) {
Log(db_options_.info_log, "Trying to delete more archived log files than "
"exist. Deleting all");
files_del_num = archived_logs.size();
}
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
Status const s = DeleteFile(file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(archived_logs[i]->LogNumber());
}
}
}
namespace {
struct CompareLogByPointer {
bool operator()(const unique_ptr<LogFile>& a, const unique_ptr<LogFile>& b) {
LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
return *a_impl < *b_impl;
}
};
}
Status DBImpl::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType log_type) {
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) {
return status;
}
log_files.reserve(all_files.size());
for (const auto& f : all_files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
SequenceNumber sequence;
Status s = ReadFirstRecord(log_type, number, &sequence);
if (!s.ok()) {
return s;
}
if (sequence == 0) {
// empty file
continue;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:1");
TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:2");
uint64_t size_bytes;
s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
// re-try in case the alive log file has been moved to archive.
if (!s.ok() && log_type == kAliveLogFile &&
env_->FileExists(ArchivedLogFileName(path, number))) {
s = env_->GetFileSize(ArchivedLogFileName(path, number), &size_bytes);
}
if (!s.ok()) {
return s;
}
log_files.push_back(std::move(unique_ptr<LogFile>(
new LogFileImpl(number, log_type, sequence, size_bytes))));
}
}
CompareLogByPointer compare_log_files;
std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status;
}
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target) {
int64_t start = 0; // signed to avoid overflow when target is < first file.
int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
// Binary Search. avoid opening all files.
while (end >= start) {
int64_t mid = start + (end - start) / 2; // Avoid overflow.
SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
if (current_seq_num == target) {
end = mid;
break;
} else if (current_seq_num < target) {
start = mid + 1;
} else {
end = mid - 1;
}
}
// end could be -ve.
size_t start_index = std::max(static_cast<int64_t>(0), end);
// The last wal file is always included
all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
return Status::OK();
}
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence) {
if (type != kAliveLogFile && type != kArchivedLogFile) {
return Status::NotSupported("File Type Not Known " + std::to_string(type));
}
{
MutexLock l(&read_first_record_cache_mutex_);
auto itr = read_first_record_cache_.find(number);
if (itr != read_first_record_cache_.end()) {
*sequence = itr->second;
return Status::OK();
}
}
Status s;
if (type == kAliveLogFile) {
std::string fname = LogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(fname, sequence);
if (env_->FileExists(fname) && !s.ok()) {
// return any error that is not caused by non-existing file
return s;
}
}
if (type == kArchivedLogFile || !s.ok()) {
// check if the file got moved to archive.
std::string archived_file =
ArchivedLogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(archived_file, sequence);
}
if (s.ok() && *sequence != 0) {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.insert({number, *sequence});
}
return s;
}
// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status DBImpl::ReadFirstLine(const std::string& fname,
SequenceNumber* sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status;
bool ignore_error; // true if db_options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->ignore_error ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (this->status->ok()) {
// only keep the first error
*this->status = s;
}
}
};
unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
return status;
}
LogReporter reporter;
reporter.env = env_;
reporter.info_log = db_options_.info_log.get();
reporter.fname = fname.c_str();
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/);
std::string scratch;
Slice record;
if (reader.ReadRecord(&record, &scratch) &&
(status.ok() || !db_options_.paranoid_checks)) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
// TODO read record's till the first no corrupt entry?
} else {
WriteBatch batch;
WriteBatchInternal::SetContents(&batch, record);
*sequence = WriteBatchInternal::Sequence(&batch);
return Status::OK();
}
}
// ReadRecord returns false on EOF, which means that the log file is empty. we
// return status.ok() in that case and set sequence number to 0
*sequence = 0;
return status;
}
#endif // ROCKSDB_LITE
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_log_file_exist) {
@ -4304,23 +3986,7 @@ Status DBImpl::GetUpdatesSince(
if (seq > versions_->LastSequence()) {
return Status::NotFound("Requested sequence not yet written in the db");
}
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
Status s = GetSortedWalFiles(*wal_files);
if (!s.ok()) {
return s;
}
s = RetainProbableWalFiles(*wal_files, seq);
if (!s.ok()) {
return s;
}
iter->reset(new TransactionLogIteratorImpl(db_options_.wal_dir, &db_options_,
read_options, env_options_,
seq, std::move(wal_files), this));
return (*iter)->status();
return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}
Status DBImpl::DeleteFile(std::string name) {

@ -21,6 +21,7 @@
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
#include "memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
@ -193,25 +194,12 @@ class DBImpl : public DB {
// Return the current manifest file no.
uint64_t TEST_Current_Manifest_FileNo();
// Trigger's a background call for testing.
void TEST_PurgeObsoleteteWAL();
// get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize();
void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
{
default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
}
void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
std::vector<std::vector<FileMetaData>>* metadata);
Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence);
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
void TEST_LockMutex();
void TEST_UnlockMutex();
@ -355,30 +343,6 @@ class DBImpl : public DB {
void AllocateCompactionOutputFileNumbers(CompactionState* compact);
void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
#ifdef ROCKSDB_LITE
void PurgeObsoleteWALFiles() {
// this function is used for archiving WAL files. we don't need this in
// ROCKSDB_LITE
}
#else
void PurgeObsoleteWALFiles();
Status GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType type);
// Requires: all_logs should be sorted with earliest log file first
// Retains all log files in all_logs which contain updates with seq no.
// Greater Than or Equal to the requested SequenceNumber.
Status RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target);
Status ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence);
Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
#endif // ROCKSDB_LITE
void PrintStatistics();
// dump rocksdb.stats to LOG
@ -453,10 +417,6 @@ class DBImpl : public DB {
SnapshotList snapshots_;
// cache for ReadFirstRecord() calls
std::unordered_map<uint64_t, SequenceNumber> read_first_record_cache_;
port::Mutex read_first_record_cache_mutex_;
// Set of table files to protect from deletion because they are
// part of ongoing compactions.
// map from pending file number ID to their path IDs.
@ -506,16 +466,9 @@ class DBImpl : public DB {
// last time when DeleteObsoleteFiles was invoked
uint64_t delete_obsolete_files_last_run_;
// last time when PurgeObsoleteWALFiles ran.
uint64_t purge_wal_files_last_run_;
// last time stats were dumped to LOG
std::atomic<uint64_t> last_stats_dump_time_microsec_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
uint64_t default_interval_to_delete_obsolete_WAL_;
bool flush_on_destroy_; // Used when disableWAL is true.
static const int KEEP_LOG_FILE_NUM = 1000;
@ -524,6 +477,10 @@ class DBImpl : public DB {
// The options to access storage files
const EnvOptions env_options_;
#ifndef ROCKSDB_LITE
WalManager wal_manager_;
#endif // ROCKSDB_LITE
// A value of true temporarily disables scheduling of background work
bool bg_work_gate_closed_;

@ -13,8 +13,6 @@
namespace rocksdb {
void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); }
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes(
@ -122,17 +120,6 @@ Status DBImpl::TEST_WaitForCompact() {
return bg_error_;
}
Status DBImpl::TEST_ReadFirstRecord(const WalFileType type,
const uint64_t number,
SequenceNumber* sequence) {
return ReadFirstRecord(type, number, sequence);
}
Status DBImpl::TEST_ReadFirstLine(const std::string& fname,
SequenceNumber* sequence) {
return ReadFirstLine(fname, sequence);
}
void DBImpl::TEST_LockMutex() {
mutex_.Lock();
}

@ -6438,10 +6438,6 @@ std::vector<std::uint64_t> ListSpecificFiles(
return std::move(file_numbers);
}
std::vector<std::uint64_t> ListLogFiles(Env* env, const std::string& path) {
return ListSpecificFiles(env, path, kLogFile);
}
std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path) {
return ListSpecificFiles(env, path, kTableFile);
}
@ -6593,114 +6589,6 @@ TEST(DBTest, RecoverCheckFileAmount) {
}
}
TEST(DBTest, WALArchivalTtl) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(options);
// TEST : Create DB with a ttl and no size limit.
// Put some keys. Count the log files present in the DB just after insert.
// Re-open db. Causes deletion/archival to take place.
// Assert that the files moved under "/archive".
// Reopen db with small ttl.
// Assert that archive was removed.
std::string archiveDir = ArchivalDirectory(dbname_);
for (int i = 0; i < 10; ++i) {
for (int j = 0; j < 10; ++j) {
ASSERT_OK(Put(Key(10 * i + j), DummyString(1024)));
}
std::vector<uint64_t> log_files = ListLogFiles(env_, dbname_);
options.create_if_missing = false;
Reopen(options);
std::vector<uint64_t> logs = ListLogFiles(env_, archiveDir);
std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
for (auto& log : log_files) {
ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end());
}
}
std::vector<uint64_t> log_files = ListLogFiles(env_, archiveDir);
ASSERT_TRUE(log_files.size() > 0);
options.WAL_ttl_seconds = 1;
env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen(options);
log_files = ListLogFiles(env_, archiveDir);
ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions());
}
namespace {
uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) {
uint64_t dir_size = 0;
std::vector<std::string> files;
env->GetChildren(dir_path, &files);
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = dir_path + "/" + f;
uint64_t file_size;
env->GetFileSize(file_path, &file_size);
dir_size += file_size;
}
}
return dir_size;
}
} // namespace
TEST(DBTest, WALArchivalSizeLimit) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 0;
options.WAL_size_limit_MB = 1000;
// TEST : Create DB with huge size limit and no ttl.
// Put some keys. Count the archived log files present in the DB
// just after insert. Assert that there are many enough.
// Change size limit. Re-open db.
// Assert that archive is not greater than WAL_size_limit_MB.
// Set ttl and time_to_check_ to small values. Re-open db.
// Assert that there are no archived logs left.
DestroyAndReopen(options);
for (int i = 0; i < 128 * 128; ++i) {
ASSERT_OK(Put(Key(i), DummyString(1024)));
}
Reopen(options);
std::string archive_dir = ArchivalDirectory(dbname_);
std::vector<std::uint64_t> log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(log_files.size() > 2);
options.WAL_size_limit_MB = 8;
Reopen(options);
dbfull()->TEST_PurgeObsoleteteWAL();
uint64_t archive_size = GetLogDirSize(archive_dir, env_);
ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024);
options.WAL_ttl_seconds = 1;
dbfull()->TEST_SetDefaultTimeToCheck(1);
env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen(options);
dbfull()->TEST_PurgeObsoleteteWAL();
log_files = ListLogFiles(env_, archive_dir);
ASSERT_TRUE(log_files.empty());
} while (ChangeCompactOptions());
}
TEST(DBTest, PurgeInfoLogs) {
Options options = CurrentOptions();
options.keep_log_file_num = 5;
@ -6804,11 +6692,13 @@ TEST(DBTest, TransactionLogIterator) {
#ifndef NDEBUG // sync point is not included with DNDEBUG build
TEST(DBTest, TransactionLogIteratorRace) {
static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] =
{ { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1",
"DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" },
{ "DBImpl::GetSortedWalsOfType:1", "DBImpl::PurgeObsoleteFiles:1",
"DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalsOfType:2" }};
static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
{"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1",
"WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
{"WalManager::GetSortedWalsOfType:1",
"WalManager::PurgeObsoleteFiles:1",
"WalManager::PurgeObsoleteFiles:2",
"WalManager::GetSortedWalsOfType:2"}};
for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
// Setup sync point dependency to reproduce the race condition of
// a log file moved to archived dir, in the middle of GetSortedWalFiles
@ -6856,24 +6746,6 @@ TEST(DBTest, TransactionLogIteratorRace) {
}
#endif
TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
// Do a plain Reopen.
Put(1, "key1", DummyString(1024));
// Two reopens should create a zero record WAL file.
ReopenWithColumnFamilies({"default", "pikachu"}, options);
ReopenWithColumnFamilies({"default", "pikachu"}, options);
Put(1, "key2", DummyString(1024));
auto iter = OpenTransactionLogIter(0);
ExpectRecords(2, iter);
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
do {
Options options = OptionsForLogIterTest();
@ -6892,17 +6764,6 @@ TEST(DBTest, TransactionLogIteratorStallAtLastRecord) {
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorJustEmptyFile) {
do {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(0, &iter);
// Check that an empty iterator is returned
ASSERT_TRUE(!iter->Valid());
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorCheckAfterRestart) {
do {
Options options = OptionsForLogIterTest();
@ -7013,44 +6874,6 @@ TEST(DBTest, TransactionLogIteratorBlobs) {
handler.seen);
}
TEST(DBTest, ReadFirstRecordCache) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
DestroyAndReopen(options);
std::string path = dbname_ + "/000001.log";
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
SequenceNumber s;
ASSERT_OK(dbfull()->TEST_ReadFirstLine(path, &s));
ASSERT_EQ(s, 0U);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 0U);
log::Writer writer(std::move(file));
WriteBatch batch;
batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10);
writer.AddRecord(WriteBatchInternal::Contents(&batch));
env_->count_sequential_reads_ = true;
// sequential_read_counter_ sanity test
ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10U);
// did a read
ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10U);
// no new reads since the value is cached
ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
}
// Multi-threaded test:
namespace {

@ -4,6 +4,11 @@
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/transaction_log_impl.h"
#include "db/write_batch_internal.h"
@ -13,7 +18,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl)
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions)
: dir_(dir),
options_(options),
read_options_(read_options),
@ -25,9 +30,9 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl(
currentFileIndex_(0),
currentBatchSeq_(0),
currentLastSeq_(0),
dbimpl_(dbimpl) {
versions_(versions) {
assert(files_ != nullptr);
assert(dbimpl_ != nullptr);
assert(versions_ != nullptr);
reporter_.env = options_->env;
reporter_.info_log = options_->info_log.get();
@ -74,7 +79,7 @@ bool TransactionLogIteratorImpl::RestrictedRead(
Slice* record,
std::string* scratch) {
// Don't read if no more complete entries to read from logs
if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) {
if (currentLastSeq_ >= versions_->LastSequence()) {
return false;
}
return currentLogReader_->ReadRecord(record, scratch);
@ -185,7 +190,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
}
} else {
isValid_ = false;
if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) {
if (currentLastSeq_ == versions_->LastSequence()) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
@ -203,12 +208,10 @@ bool TransactionLogIteratorImpl::IsBatchExpected(
if (batchSeq != expectedSeq) {
char buf[200];
snprintf(buf, sizeof(buf),
"Discontinuity in log records. Got seq=%lu, Expected seq=%lu, "
"Last flushed seq=%lu.Log iterator will reseek the correct "
"batch.",
(unsigned long)batchSeq,
(unsigned long)expectedSeq,
(unsigned long)dbimpl_->GetLatestSequenceNumber());
"Discontinuity in log records. Got seq=%" PRIu64
", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
".Log iterator will reseek the correct batch.",
batchSeq, expectedSeq, versions_->LastSequence());
reporter_.Info(buf);
return false;
}
@ -240,7 +243,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
currentLastSeq_ = currentBatchSeq_ +
WriteBatchInternal::Count(batch.get()) - 1;
// currentBatchSeq_ can only change here
assert(currentLastSeq_ <= dbimpl_->GetLatestSequenceNumber());
assert(currentLastSeq_ <= versions_->LastSequence());
currentBatch_ = move(batch);
isValid_ = true;

@ -11,7 +11,7 @@
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/log_reader.h"
#include "db/filename.h"
@ -73,7 +73,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl);
std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions);
virtual bool Valid();
@ -100,7 +100,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
LogReporter reporter_;
SequenceNumber currentBatchSeq_; // sequence number at start of current batch
SequenceNumber currentLastSeq_; // last sequence in the current batch
DBImpl const * const dbimpl_; // The db on whose log files this iterates
// Used only to get latest seq. num
// TODO(icanadi) can this be just a callback?
VersionSet const* const versions_;
// Reads from transaction log only if the writebatch record has been written
bool RestrictedRead(Slice* record, std::string* scratch);

@ -0,0 +1,445 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/wal_manager.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <memory>
#include "db/filename.h"
#include "db/transaction_log_impl.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/write_batch.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
files.clear();
// list wal files in archive dir.
std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
if (env_->FileExists(archivedir)) {
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
uint64_t latest_archived_log_number = 0;
if (!files.empty()) {
latest_archived_log_number = files.back()->LogNumber();
Log(db_options_.info_log, "Latest Archived log: %" PRIu64,
latest_archived_log_number);
}
files.reserve(files.size() + logs.size());
for (auto& log : logs) {
if (log->LogNumber() > latest_archived_log_number) {
files.push_back(std::move(log));
} else {
// When the race condition happens, we could see the
// same log in both db dir and archived dir. Simply
// ignore the one in db dir. Note that, if we read
// archived dir first, we would have missed the log file.
Log(db_options_.info_log, "%s already moved to archive",
log->PathName().c_str());
}
}
return s;
}
Status WalManager::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options,
VersionSet* version_set) {
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
Status s = GetSortedWalFiles(*wal_files);
if (!s.ok()) {
return s;
}
s = RetainProbableWalFiles(*wal_files, seq);
if (!s.ok()) {
return s;
}
iter->reset(new TransactionLogIteratorImpl(
db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
std::move(wal_files), version_set));
return (*iter)->status();
}
// 1. Go through all archived files and
// a. if ttl is enabled, delete outdated files
// b. if archive size limit is enabled, delete empty files,
// compute file number and size.
// 2. If size limit is enabled:
// a. compute how many files should be deleted
// b. get sorted non-empty archived logs
// c. delete what should be deleted
void WalManager::PurgeObsoleteWALFiles() {
bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
if (!ttl_enabled && !size_limit_enabled) {
return;
}
int64_t current_time;
Status s = env_->GetCurrentTime(&current_time);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get current time: %s",
s.ToString().c_str());
assert(false);
return;
}
uint64_t const now_seconds = static_cast<uint64_t>(current_time);
uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
? db_options_.WAL_ttl_seconds / 2
: kDefaultIntervalToDeleteObsoleteWAL;
if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
return;
}
purge_wal_files_last_run_ = now_seconds;
std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
std::vector<std::string> files;
s = env_->GetChildren(archival_dir, &files);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get archive files: %s",
s.ToString().c_str());
assert(false);
return;
}
size_t log_files_num = 0;
uint64_t log_file_size = 0;
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = archival_dir + "/" + f;
if (ttl_enabled) {
uint64_t file_m_time;
Status const s = env_->GetFileModificationTime(file_path, &file_m_time);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get file mod time: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
}
if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
Status const s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
continue;
}
}
if (size_limit_enabled) {
uint64_t file_size;
Status const s = env_->GetFileSize(file_path, &file_size);
if (!s.ok()) {
Log(db_options_.info_log, "Can't get file size: %s: %s",
file_path.c_str(), s.ToString().c_str());
return;
} else {
if (file_size > 0) {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
Status s = env_->DeleteFile(file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(number);
}
}
}
}
}
}
if (0 == log_files_num || !size_limit_enabled) {
return;
}
size_t const files_keep_num =
db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size;
if (log_files_num <= files_keep_num) {
return;
}
size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs;
GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
if (files_del_num > archived_logs.size()) {
Log(db_options_.info_log,
"Trying to delete more archived log files than "
"exist. Deleting all");
files_del_num = archived_logs.size();
}
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
Status const s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path);
if (!s.ok()) {
Log(db_options_.info_log, "Can't delete file: %s: %s", file_path.c_str(),
s.ToString().c_str());
continue;
} else {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.erase(archived_logs[i]->LogNumber());
}
}
}
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
Log(db_options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(),
archived_log_name.c_str(), s.ToString().c_str());
}
namespace {
struct CompareLogByPointer {
bool operator()(const std::unique_ptr<LogFile>& a,
const std::unique_ptr<LogFile>& b) {
LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
return *a_impl < *b_impl;
}
};
}
Status WalManager::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files,
WalFileType log_type) {
std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) {
return status;
}
log_files.reserve(all_files.size());
for (const auto& f : all_files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
SequenceNumber sequence;
Status s = ReadFirstRecord(log_type, number, &sequence);
if (!s.ok()) {
return s;
}
if (sequence == 0) {
// empty file
continue;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
uint64_t size_bytes;
s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
// re-try in case the alive log file has been moved to archive.
if (!s.ok() && log_type == kAliveLogFile &&
env_->FileExists(ArchivedLogFileName(path, number))) {
s = env_->GetFileSize(ArchivedLogFileName(path, number), &size_bytes);
}
if (!s.ok()) {
return s;
}
log_files.push_back(std::move(std::unique_ptr<LogFile>(
new LogFileImpl(number, log_type, sequence, size_bytes))));
}
}
CompareLogByPointer compare_log_files;
std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status;
}
Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target) {
int64_t start = 0; // signed to avoid overflow when target is < first file.
int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
// Binary Search. avoid opening all files.
while (end >= start) {
int64_t mid = start + (end - start) / 2; // Avoid overflow.
SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
if (current_seq_num == target) {
end = mid;
break;
} else if (current_seq_num < target) {
start = mid + 1;
} else {
end = mid - 1;
}
}
// end could be -ve.
size_t start_index = std::max(static_cast<int64_t>(0), end);
// The last wal file is always included
all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
return Status::OK();
}
Status WalManager::ReadFirstRecord(const WalFileType type,
const uint64_t number,
SequenceNumber* sequence) {
if (type != kAliveLogFile && type != kArchivedLogFile) {
return Status::NotSupported("File Type Not Known " + std::to_string(type));
}
{
MutexLock l(&read_first_record_cache_mutex_);
auto itr = read_first_record_cache_.find(number);
if (itr != read_first_record_cache_.end()) {
*sequence = itr->second;
return Status::OK();
}
}
Status s;
if (type == kAliveLogFile) {
std::string fname = LogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(fname, sequence);
if (env_->FileExists(fname) && !s.ok()) {
// return any error that is not caused by non-existing file
return s;
}
}
if (type == kArchivedLogFile || !s.ok()) {
// check if the file got moved to archive.
std::string archived_file =
ArchivedLogFileName(db_options_.wal_dir, number);
s = ReadFirstLine(archived_file, sequence);
}
if (s.ok() && *sequence != 0) {
MutexLock l(&read_first_record_cache_mutex_);
read_first_record_cache_.insert({number, *sequence});
}
return s;
}
// the function returns status.ok() and sequence == 0 if the file exists, but is
// empty
Status WalManager::ReadFirstLine(const std::string& fname,
SequenceNumber* sequence) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
const char* fname;
Status* status;
bool ignore_error; // true if db_options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->ignore_error ? "(ignoring error) " : ""), fname,
static_cast<int>(bytes), s.ToString().c_str());
if (this->status->ok()) {
// only keep the first error
*this->status = s;
}
}
};
std::unique_ptr<SequentialFile> file;
Status status = env_->NewSequentialFile(fname, &file, env_options_);
if (!status.ok()) {
return status;
}
LogReporter reporter;
reporter.env = env_;
reporter.info_log = db_options_.info_log.get();
reporter.fname = fname.c_str();
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/);
std::string scratch;
Slice record;
if (reader.ReadRecord(&record, &scratch) &&
(status.ok() || !db_options_.paranoid_checks)) {
if (record.size() < 12) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
// TODO read record's till the first no corrupt entry?
} else {
WriteBatch batch;
WriteBatchInternal::SetContents(&batch, record);
*sequence = WriteBatchInternal::Sequence(&batch);
return Status::OK();
}
}
// ReadRecord returns false on EOF, which means that the log file is empty. we
// return status.ok() in that case and set sequence number to 0
*sequence = 0;
return status;
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -0,0 +1,95 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
#include <string>
#include <memory>
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/status.h"
#include "db/version_set.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
class WalManager {
public:
WalManager(const DBOptions& db_options, const EnvOptions& env_options)
: db_options_(db_options),
env_options_(env_options),
env_(db_options.env),
purge_wal_files_last_run_(0) {}
virtual Status GetSortedWalFiles(VectorLogPtr& files);
virtual Status GetUpdatesSince(
SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options,
VersionSet* version_set);
void PurgeObsoleteWALFiles();
void ArchiveWALFile(const std::string& fname, uint64_t number);
Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence) {
return ReadFirstRecord(type, number, sequence);
}
Status TEST_ReadFirstLine(const std::string& fname,
SequenceNumber* sequence) {
return ReadFirstLine(fname, sequence);
}
private:
Status GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files,
WalFileType type);
// Requires: all_logs should be sorted with earliest log file first
// Retains all log files in all_logs which contain updates with seq no.
// Greater Than or Equal to the requested SequenceNumber.
Status RetainProbableWalFiles(VectorLogPtr& all_logs,
const SequenceNumber target);
Status ReadFirstRecord(const WalFileType type, const uint64_t number,
SequenceNumber* sequence);
Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
// ------- state from DBImpl ------
const DBOptions& db_options_;
const EnvOptions& env_options_;
Env* env_;
// ------- WalManager state -------
// cache for ReadFirstRecord() calls
std::unordered_map<uint64_t, SequenceNumber> read_first_record_cache_;
port::Mutex read_first_record_cache_mutex_;
// last time when PurgeObsoleteWALFiles ran.
uint64_t purge_wal_files_last_run_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;
};
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -0,0 +1,279 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <map>
#include <string>
#include "rocksdb/cache.h"
#include "rocksdb/write_batch.h"
#include "db/wal_manager.h"
#include "db/log_writer.h"
#include "db/column_family.h"
#include "db/version_set.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "table/mock_table.h"
#include "db/db_impl.h"
namespace rocksdb {
// TODO(icanadi) mock out VersionSet
// TODO(icanadi) move other WalManager-specific tests from db_test here
class WalManagerTest {
public:
WalManagerTest()
: env_(Env::Default()),
dbname_(test::TmpDir() + "/wal_manager_test"),
table_cache_(NewLRUCache(50000, 16, 8)),
current_log_number_(0) {
DestroyDB(dbname_, Options());
}
void Init() {
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_)));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
db_options_.wal_dir = dbname_;
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_));
wal_manager_.reset(new WalManager(db_options_, env_options_));
}
void Reopen() {
wal_manager_.reset(new WalManager(db_options_, env_options_));
}
// NOT thread safe
void Put(const std::string& key, const std::string& value) {
assert(current_log_writer_.get() != nullptr);
uint64_t seq = versions_->LastSequence() + 1;
WriteBatch batch;
batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
versions_->SetLastSequence(seq);
}
// NOT thread safe
void RollTheLog(bool archived) {
current_log_number_++;
std::string fname = ArchivedLogFileName(dbname_, current_log_number_);
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_));
current_log_writer_.reset(new log::Writer(std::move(file)));
}
void CreateArchiveLogs(int num_logs, int entries_per_log) {
for (int i = 1; i <= num_logs; ++i) {
RollTheLog(true);
for (int k = 0; k < entries_per_log; ++k) {
Put(std::to_string(k), std::string(1024, 'a'));
}
}
}
std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
const SequenceNumber seq) {
unique_ptr<TransactionLogIterator> iter;
Status status = wal_manager_->GetUpdatesSince(
seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get());
ASSERT_OK(status);
return std::move(iter);
}
Env* env_;
std::string dbname_;
WriteController write_controller_;
EnvOptions env_options_;
std::shared_ptr<Cache> table_cache_;
DBOptions db_options_;
std::unique_ptr<VersionSet> versions_;
std::unique_ptr<WalManager> wal_manager_;
std::unique_ptr<log::Writer> current_log_writer_;
uint64_t current_log_number_;
};
TEST(WalManagerTest, ReadFirstRecordCache) {
Init();
std::string path = dbname_ + "/000001.log";
unique_ptr<WritableFile> file;
ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions()));
SequenceNumber s;
ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, &s));
ASSERT_EQ(s, 0U);
ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 0U);
log::Writer writer(std::move(file));
WriteBatch batch;
batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10);
writer.AddRecord(WriteBatchInternal::Contents(&batch));
// TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here.
// Waiting for lei to finish with db_test
// env_->count_sequential_reads_ = true;
// sequential_read_counter_ sanity test
// ASSERT_EQ(env_->sequential_read_counter_.Read(), 0);
ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10U);
// did a read
// TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
// ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s));
ASSERT_EQ(s, 10U);
// no new reads since the value is cached
// TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here
// ASSERT_EQ(env_->sequential_read_counter_.Read(), 1);
}
namespace {
uint64_t GetLogDirSize(std::string dir_path, Env* env) {
uint64_t dir_size = 0;
std::vector<std::string> files;
env->GetChildren(dir_path, &files);
for (auto& f : files) {
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kLogFile) {
std::string const file_path = dir_path + "/" + f;
uint64_t file_size;
env->GetFileSize(file_path, &file_size);
dir_size += file_size;
}
}
return dir_size;
}
std::vector<std::uint64_t> ListSpecificFiles(
Env* env, const std::string& path, const FileType expected_file_type) {
std::vector<std::string> files;
std::vector<uint64_t> file_numbers;
env->GetChildren(path, &files);
uint64_t number;
FileType type;
for (size_t i = 0; i < files.size(); ++i) {
if (ParseFileName(files[i], &number, &type)) {
if (type == expected_file_type) {
file_numbers.push_back(number);
}
}
}
return std::move(file_numbers);
}
int CountRecords(TransactionLogIterator* iter) {
int count = 0;
SequenceNumber lastSequence = 0;
BatchResult res;
while (iter->Valid()) {
res = iter->GetBatch();
ASSERT_TRUE(res.sequence > lastSequence);
++count;
lastSequence = res.sequence;
ASSERT_OK(iter->status());
iter->Next();
}
return count;
}
} // namespace
TEST(WalManagerTest, WALArchivalSizeLimit) {
db_options_.WAL_ttl_seconds = 0;
db_options_.WAL_size_limit_MB = 1000;
Init();
// TEST : Create WalManager with huge size limit and no ttl.
// Create some archived files and call PurgeObsoleteWALFiles().
// Count the archived log files that survived.
// Assert that all of them did.
// Change size limit. Re-open WalManager.
// Assert that archive is not greater than WAL_size_limit_MB after
// PurgeObsoleteWALFiles()
// Set ttl and time_to_check_ to small values. Re-open db.
// Assert that there are no archived logs left.
std::string archive_dir = ArchivalDirectory(dbname_);
CreateArchiveLogs(20, 5000);
std::vector<std::uint64_t> log_files =
ListSpecificFiles(env_, archive_dir, kLogFile);
ASSERT_EQ(log_files.size(), 20U);
db_options_.WAL_size_limit_MB = 8;
Reopen();
wal_manager_->PurgeObsoleteWALFiles();
uint64_t archive_size = GetLogDirSize(archive_dir, env_);
ASSERT_TRUE(archive_size <= db_options_.WAL_size_limit_MB * 1024 * 1024);
db_options_.WAL_ttl_seconds = 1;
env_->SleepForMicroseconds(2 * 1000 * 1000);
Reopen();
wal_manager_->PurgeObsoleteWALFiles();
log_files = ListSpecificFiles(env_, archive_dir, kLogFile);
ASSERT_TRUE(log_files.empty());
}
TEST(WalManagerTest, WALArchivalTtl) {
db_options_.WAL_ttl_seconds = 1000;
Init();
// TEST : Create WalManager with a ttl and no size limit.
// Create some archived log files and call PurgeObsoleteWALFiles().
// Assert that files are not deleted
// Reopen db with small ttl.
// Assert that all archived logs was removed.
std::string archive_dir = ArchivalDirectory(dbname_);
CreateArchiveLogs(20, 5000);
std::vector<uint64_t> log_files =
ListSpecificFiles(env_, archive_dir, kLogFile);
ASSERT_GT(log_files.size(), 0U);
db_options_.WAL_ttl_seconds = 1;
env_->SleepForMicroseconds(3 * 1000 * 1000);
Reopen();
wal_manager_->PurgeObsoleteWALFiles();
log_files = ListSpecificFiles(env_, archive_dir, kLogFile);
ASSERT_TRUE(log_files.empty());
}
TEST(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) {
Init();
RollTheLog(false);
Put("key1", std::string(1024, 'a'));
// Create a zero record WAL file.
RollTheLog(false);
RollTheLog(false);
Put("key2", std::string(1024, 'a'));
auto iter = OpenTransactionLogIter(0);
ASSERT_EQ(2, CountRecords(iter.get()));
}
TEST(WalManagerTest, TransactionLogIteratorJustEmptyFile) {
Init();
RollTheLog(false);
auto iter = OpenTransactionLogIter(0);
// Check that an empty iterator is returned
ASSERT_TRUE(!iter->Valid());
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
Loading…
Cancel
Save