diff --git a/Makefile b/Makefile index 52019a17f..8642834b8 100644 --- a/Makefile +++ b/Makefile @@ -146,7 +146,8 @@ TESTS = \ cuckoo_table_reader_test \ cuckoo_table_db_test \ write_batch_with_index_test \ - flush_job_test + flush_job_test \ + wal_manager_test TOOLS = \ sst_dump \ @@ -421,6 +422,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +wal_manager_test: db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/wal_manager_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index eeee99c1b..48819e766 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -132,57 +132,8 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { - // First get sorted files in db dir, then get sorted files from archived - // dir, to avoid a race condition where a log file is moved to archived - // dir in between. - Status s; - // list wal files in main db dir. - VectorLogPtr logs; - s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile); - if (!s.ok()) { - return s; - } - - // Reproduce the race condition where a log file is moved - // to archived dir, between these two sync points, used in - // (DBTest,TransactionLogIteratorRace) - TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1"); - TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2"); - - files.clear(); - // list wal files in archive dir. - std::string archivedir = ArchivalDirectory(db_options_.wal_dir); - if (env_->FileExists(archivedir)) { - s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); - if (!s.ok()) { - return s; - } - } - - uint64_t latest_archived_log_number = 0; - if (!files.empty()) { - latest_archived_log_number = files.back()->LogNumber(); - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "Latest Archived log: %" PRIu64, latest_archived_log_number); - } - - files.reserve(files.size() + logs.size()); - for (auto& log : logs) { - if (log->LogNumber() > latest_archived_log_number) { - files.push_back(std::move(log)); - } else { - // When the race condition happens, we could see the - // same log in both db dir and archived dir. Simply - // ignore the one in db dir. Note that, if we read - // archived dir first, we would have missed the log file. - Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, - "%s already moved to archive", log->PathName().c_str()); - } - } - - return s; + return wal_manager_.GetSortedWalFiles(files); } - } #endif // ROCKSDB_LITE diff --git a/db/db_impl.cc b/db/db_impl.cc index 345188703..78fb4ce13 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -342,11 +342,12 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) manual_compaction_(nullptr), disable_delete_obsolete_files_(0), delete_obsolete_files_last_run_(options.env->NowMicros()), - purge_wal_files_last_run_(0), last_stats_dump_time_microsec_(0), - default_interval_to_delete_obsolete_WAL_(600), flush_on_destroy_(false), env_options_(options), +#ifndef ROCKSDB_LITE + wal_manager_(db_options_, env_options_), +#endif // ROCKSDB_LITE bg_work_gate_closed_(false), refitting_level_(false), opened_successfully_(false) { @@ -738,23 +739,20 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { db_options_.wal_dir : dbname_) + "/" + to_delete; } - if (type == kLogFile && - (db_options_.WAL_ttl_seconds > 0 || - db_options_.WAL_size_limit_MB > 0)) { - auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number); - // The sync point below is used in (DBTest,TransactionLogIteratorRace) - TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1"); - Status s = env_->RenameFile(fname, archived_log_name); - // The sync point below is used in (DBTest,TransactionLogIteratorRace) - TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2"); - Log(db_options_.info_log, - "Move log file %s to %s -- %s\n", - fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); +#ifdef ROCKSDB_LITE + Status s = env_->DeleteFile(fname); + Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n", + fname.c_str(), type, number, s.ToString().c_str()); +#else // not ROCKSDB_LITE + if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 || + db_options_.WAL_size_limit_MB > 0)) { + wal_manager_.ArchiveWALFile(fname, number); } else { Status s = env_->DeleteFile(fname); Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n", fname.c_str(), type, number, s.ToString().c_str()); } +#endif // ROCKSDB_LITE } // Delete old info log files. @@ -775,7 +773,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { } } } - PurgeObsoleteWALFiles(); +#ifndef ROCKSDB_LITE + wal_manager_.PurgeObsoleteWALFiles(); +#endif // ROCKSDB_LITE LogFlush(db_options_.info_log); } @@ -788,324 +788,6 @@ void DBImpl::DeleteObsoleteFiles() { } } -#ifndef ROCKSDB_LITE -// 1. Go through all archived files and -// a. if ttl is enabled, delete outdated files -// b. if archive size limit is enabled, delete empty files, -// compute file number and size. -// 2. If size limit is enabled: -// a. compute how many files should be deleted -// b. get sorted non-empty archived logs -// c. delete what should be deleted -void DBImpl::PurgeObsoleteWALFiles() { - bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0; - bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0; - if (!ttl_enabled && !size_limit_enabled) { - return; - } - - int64_t current_time; - Status s = env_->GetCurrentTime(¤t_time); - if (!s.ok()) { - Log(db_options_.info_log, "Can't get current time: %s", - s.ToString().c_str()); - assert(false); - return; - } - uint64_t const now_seconds = static_cast(current_time); - uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ? - db_options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_; - - if (purge_wal_files_last_run_ + time_to_check > now_seconds) { - return; - } - - purge_wal_files_last_run_ = now_seconds; - - std::string archival_dir = ArchivalDirectory(db_options_.wal_dir); - std::vector files; - s = env_->GetChildren(archival_dir, &files); - if (!s.ok()) { - Log(db_options_.info_log, "Can't get archive files: %s", - s.ToString().c_str()); - assert(false); - return; - } - - size_t log_files_num = 0; - uint64_t log_file_size = 0; - - for (auto& f : files) { - uint64_t number; - FileType type; - if (ParseFileName(f, &number, &type) && type == kLogFile) { - std::string const file_path = archival_dir + "/" + f; - if (ttl_enabled) { - uint64_t file_m_time; - Status const s = env_->GetFileModificationTime(file_path, - &file_m_time); - if (!s.ok()) { - Log(db_options_.info_log, "Can't get file mod time: %s: %s", - file_path.c_str(), s.ToString().c_str()); - continue; - } - if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) { - Status const s = env_->DeleteFile(file_path); - if (!s.ok()) { - Log(db_options_.info_log, "Can't delete file: %s: %s", - file_path.c_str(), s.ToString().c_str()); - continue; - } else { - MutexLock l(&read_first_record_cache_mutex_); - read_first_record_cache_.erase(number); - } - continue; - } - } - - if (size_limit_enabled) { - uint64_t file_size; - Status const s = env_->GetFileSize(file_path, &file_size); - if (!s.ok()) { - Log(db_options_.info_log, "Can't get file size: %s: %s", - file_path.c_str(), s.ToString().c_str()); - return; - } else { - if (file_size > 0) { - log_file_size = std::max(log_file_size, file_size); - ++log_files_num; - } else { - Status s = env_->DeleteFile(file_path); - if (!s.ok()) { - Log(db_options_.info_log, "Can't delete file: %s: %s", - file_path.c_str(), s.ToString().c_str()); - continue; - } else { - MutexLock l(&read_first_record_cache_mutex_); - read_first_record_cache_.erase(number); - } - } - } - } - } - } - - if (0 == log_files_num || !size_limit_enabled) { - return; - } - - size_t const files_keep_num = db_options_.WAL_size_limit_MB * - 1024 * 1024 / log_file_size; - if (log_files_num <= files_keep_num) { - return; - } - - size_t files_del_num = log_files_num - files_keep_num; - VectorLogPtr archived_logs; - GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); - - if (files_del_num > archived_logs.size()) { - Log(db_options_.info_log, "Trying to delete more archived log files than " - "exist. Deleting all"); - files_del_num = archived_logs.size(); - } - - for (size_t i = 0; i < files_del_num; ++i) { - std::string const file_path = archived_logs[i]->PathName(); - Status const s = DeleteFile(file_path); - if (!s.ok()) { - Log(db_options_.info_log, "Can't delete file: %s: %s", - file_path.c_str(), s.ToString().c_str()); - continue; - } else { - MutexLock l(&read_first_record_cache_mutex_); - read_first_record_cache_.erase(archived_logs[i]->LogNumber()); - } - } -} - -namespace { -struct CompareLogByPointer { - bool operator()(const unique_ptr& a, const unique_ptr& b) { - LogFileImpl* a_impl = dynamic_cast(a.get()); - LogFileImpl* b_impl = dynamic_cast(b.get()); - return *a_impl < *b_impl; - } -}; -} - -Status DBImpl::GetSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType log_type) { - std::vector all_files; - const Status status = env_->GetChildren(path, &all_files); - if (!status.ok()) { - return status; - } - log_files.reserve(all_files.size()); - for (const auto& f : all_files) { - uint64_t number; - FileType type; - if (ParseFileName(f, &number, &type) && type == kLogFile) { - SequenceNumber sequence; - Status s = ReadFirstRecord(log_type, number, &sequence); - if (!s.ok()) { - return s; - } - if (sequence == 0) { - // empty file - continue; - } - - // Reproduce the race condition where a log file is moved - // to archived dir, between these two sync points, used in - // (DBTest,TransactionLogIteratorRace) - TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:1"); - TEST_SYNC_POINT("DBImpl::GetSortedWalsOfType:2"); - - uint64_t size_bytes; - s = env_->GetFileSize(LogFileName(path, number), &size_bytes); - // re-try in case the alive log file has been moved to archive. - if (!s.ok() && log_type == kAliveLogFile && - env_->FileExists(ArchivedLogFileName(path, number))) { - s = env_->GetFileSize(ArchivedLogFileName(path, number), &size_bytes); - } - if (!s.ok()) { - return s; - } - - log_files.push_back(std::move(unique_ptr( - new LogFileImpl(number, log_type, sequence, size_bytes)))); - } - } - CompareLogByPointer compare_log_files; - std::sort(log_files.begin(), log_files.end(), compare_log_files); - return status; -} - -Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs, - const SequenceNumber target) { - int64_t start = 0; // signed to avoid overflow when target is < first file. - int64_t end = static_cast(all_logs.size()) - 1; - // Binary Search. avoid opening all files. - while (end >= start) { - int64_t mid = start + (end - start) / 2; // Avoid overflow. - SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence(); - if (current_seq_num == target) { - end = mid; - break; - } else if (current_seq_num < target) { - start = mid + 1; - } else { - end = mid - 1; - } - } - // end could be -ve. - size_t start_index = std::max(static_cast(0), end); - // The last wal file is always included - all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); - return Status::OK(); -} - -Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number, - SequenceNumber* sequence) { - if (type != kAliveLogFile && type != kArchivedLogFile) { - return Status::NotSupported("File Type Not Known " + std::to_string(type)); - } - { - MutexLock l(&read_first_record_cache_mutex_); - auto itr = read_first_record_cache_.find(number); - if (itr != read_first_record_cache_.end()) { - *sequence = itr->second; - return Status::OK(); - } - } - Status s; - if (type == kAliveLogFile) { - std::string fname = LogFileName(db_options_.wal_dir, number); - s = ReadFirstLine(fname, sequence); - if (env_->FileExists(fname) && !s.ok()) { - // return any error that is not caused by non-existing file - return s; - } - } - - if (type == kArchivedLogFile || !s.ok()) { - // check if the file got moved to archive. - std::string archived_file = - ArchivedLogFileName(db_options_.wal_dir, number); - s = ReadFirstLine(archived_file, sequence); - } - - if (s.ok() && *sequence != 0) { - MutexLock l(&read_first_record_cache_mutex_); - read_first_record_cache_.insert({number, *sequence}); - } - return s; -} - -// the function returns status.ok() and sequence == 0 if the file exists, but is -// empty -Status DBImpl::ReadFirstLine(const std::string& fname, - SequenceNumber* sequence) { - struct LogReporter : public log::Reader::Reporter { - Env* env; - Logger* info_log; - const char* fname; - - Status* status; - bool ignore_error; // true if db_options_.paranoid_checks==false - virtual void Corruption(size_t bytes, const Status& s) { - Log(info_log, "%s%s: dropping %d bytes; %s", - (this->ignore_error ? "(ignoring error) " : ""), fname, - static_cast(bytes), s.ToString().c_str()); - if (this->status->ok()) { - // only keep the first error - *this->status = s; - } - } - }; - - unique_ptr file; - Status status = env_->NewSequentialFile(fname, &file, env_options_); - - if (!status.ok()) { - return status; - } - - LogReporter reporter; - reporter.env = env_; - reporter.info_log = db_options_.info_log.get(); - reporter.fname = fname.c_str(); - reporter.status = &status; - reporter.ignore_error = !db_options_.paranoid_checks; - log::Reader reader(std::move(file), &reporter, true /*checksum*/, - 0 /*initial_offset*/); - std::string scratch; - Slice record; - - if (reader.ReadRecord(&record, &scratch) && - (status.ok() || !db_options_.paranoid_checks)) { - if (record.size() < 12) { - reporter.Corruption(record.size(), - Status::Corruption("log record too small")); - // TODO read record's till the first no corrupt entry? - } else { - WriteBatch batch; - WriteBatchInternal::SetContents(&batch, record); - *sequence = WriteBatchInternal::Sequence(&batch); - return Status::OK(); - } - } - - // ReadRecord returns false on EOF, which means that the log file is empty. we - // return status.ok() in that case and set sequence number to 0 - *sequence = 0; - return status; -} - -#endif // ROCKSDB_LITE - Status DBImpl::Recover( const std::vector& column_families, bool read_only, bool error_if_log_file_exist) { @@ -4304,23 +3986,7 @@ Status DBImpl::GetUpdatesSince( if (seq > versions_->LastSequence()) { return Status::NotFound("Requested sequence not yet written in the db"); } - // Get all sorted Wal Files. - // Do binary search and open files and find the seq number. - - std::unique_ptr wal_files(new VectorLogPtr); - Status s = GetSortedWalFiles(*wal_files); - if (!s.ok()) { - return s; - } - - s = RetainProbableWalFiles(*wal_files, seq); - if (!s.ok()) { - return s; - } - iter->reset(new TransactionLogIteratorImpl(db_options_.wal_dir, &db_options_, - read_options, env_options_, - seq, std::move(wal_files), this)); - return (*iter)->status(); + return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get()); } Status DBImpl::DeleteFile(std::string name) { diff --git a/db/db_impl.h b/db/db_impl.h index 15205d90b..547a85da5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -21,6 +21,7 @@ #include "db/snapshot.h" #include "db/column_family.h" #include "db/version_edit.h" +#include "db/wal_manager.h" #include "memtable_list.h" #include "port/port.h" #include "rocksdb/db.h" @@ -193,25 +194,12 @@ class DBImpl : public DB { // Return the current manifest file no. uint64_t TEST_Current_Manifest_FileNo(); - // Trigger's a background call for testing. - void TEST_PurgeObsoleteteWAL(); - // get total level0 file size. Only for testing. uint64_t TEST_GetLevel0TotalSize(); - void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL) - { - default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL; - } - void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family, std::vector>* metadata); - Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number, - SequenceNumber* sequence); - - Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); - void TEST_LockMutex(); void TEST_UnlockMutex(); @@ -355,30 +343,6 @@ class DBImpl : public DB { void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); -#ifdef ROCKSDB_LITE - void PurgeObsoleteWALFiles() { - // this function is used for archiving WAL files. we don't need this in - // ROCKSDB_LITE - } -#else - void PurgeObsoleteWALFiles(); - - Status GetSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType type); - - // Requires: all_logs should be sorted with earliest log file first - // Retains all log files in all_logs which contain updates with seq no. - // Greater Than or Equal to the requested SequenceNumber. - Status RetainProbableWalFiles(VectorLogPtr& all_logs, - const SequenceNumber target); - - Status ReadFirstRecord(const WalFileType type, const uint64_t number, - SequenceNumber* sequence); - - Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence); -#endif // ROCKSDB_LITE - void PrintStatistics(); // dump rocksdb.stats to LOG @@ -453,10 +417,6 @@ class DBImpl : public DB { SnapshotList snapshots_; - // cache for ReadFirstRecord() calls - std::unordered_map read_first_record_cache_; - port::Mutex read_first_record_cache_mutex_; - // Set of table files to protect from deletion because they are // part of ongoing compactions. // map from pending file number ID to their path IDs. @@ -506,16 +466,9 @@ class DBImpl : public DB { // last time when DeleteObsoleteFiles was invoked uint64_t delete_obsolete_files_last_run_; - // last time when PurgeObsoleteWALFiles ran. - uint64_t purge_wal_files_last_run_; - // last time stats were dumped to LOG std::atomic last_stats_dump_time_microsec_; - // obsolete files will be deleted every this seconds if ttl deletion is - // enabled and archive size_limit is disabled. - uint64_t default_interval_to_delete_obsolete_WAL_; - bool flush_on_destroy_; // Used when disableWAL is true. static const int KEEP_LOG_FILE_NUM = 1000; @@ -524,6 +477,10 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions env_options_; +#ifndef ROCKSDB_LITE + WalManager wal_manager_; +#endif // ROCKSDB_LITE + // A value of true temporarily disables scheduling of background work bool bg_work_gate_closed_; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index a7be59313..2d67167ba 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -13,8 +13,6 @@ namespace rocksdb { -void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); } - uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes( @@ -122,17 +120,6 @@ Status DBImpl::TEST_WaitForCompact() { return bg_error_; } -Status DBImpl::TEST_ReadFirstRecord(const WalFileType type, - const uint64_t number, - SequenceNumber* sequence) { - return ReadFirstRecord(type, number, sequence); -} - -Status DBImpl::TEST_ReadFirstLine(const std::string& fname, - SequenceNumber* sequence) { - return ReadFirstLine(fname, sequence); -} - void DBImpl::TEST_LockMutex() { mutex_.Lock(); } diff --git a/db/db_test.cc b/db/db_test.cc index 477c6c812..59b611c65 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -6438,10 +6438,6 @@ std::vector ListSpecificFiles( return std::move(file_numbers); } -std::vector ListLogFiles(Env* env, const std::string& path) { - return ListSpecificFiles(env, path, kLogFile); -} - std::vector ListTableFiles(Env* env, const std::string& path) { return ListSpecificFiles(env, path, kTableFile); } @@ -6593,114 +6589,6 @@ TEST(DBTest, RecoverCheckFileAmount) { } } -TEST(DBTest, WALArchivalTtl) { - do { - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 1000; - DestroyAndReopen(options); - - // TEST : Create DB with a ttl and no size limit. - // Put some keys. Count the log files present in the DB just after insert. - // Re-open db. Causes deletion/archival to take place. - // Assert that the files moved under "/archive". - // Reopen db with small ttl. - // Assert that archive was removed. - - std::string archiveDir = ArchivalDirectory(dbname_); - - for (int i = 0; i < 10; ++i) { - for (int j = 0; j < 10; ++j) { - ASSERT_OK(Put(Key(10 * i + j), DummyString(1024))); - } - - std::vector log_files = ListLogFiles(env_, dbname_); - - options.create_if_missing = false; - Reopen(options); - - std::vector logs = ListLogFiles(env_, archiveDir); - std::set archivedFiles(logs.begin(), logs.end()); - - for (auto& log : log_files) { - ASSERT_TRUE(archivedFiles.find(log) != archivedFiles.end()); - } - } - - std::vector log_files = ListLogFiles(env_, archiveDir); - ASSERT_TRUE(log_files.size() > 0); - - options.WAL_ttl_seconds = 1; - env_->SleepForMicroseconds(2 * 1000 * 1000); - Reopen(options); - - log_files = ListLogFiles(env_, archiveDir); - ASSERT_TRUE(log_files.empty()); - } while (ChangeCompactOptions()); -} - -namespace { -uint64_t GetLogDirSize(std::string dir_path, SpecialEnv* env) { - uint64_t dir_size = 0; - std::vector files; - env->GetChildren(dir_path, &files); - for (auto& f : files) { - uint64_t number; - FileType type; - if (ParseFileName(f, &number, &type) && type == kLogFile) { - std::string const file_path = dir_path + "/" + f; - uint64_t file_size; - env->GetFileSize(file_path, &file_size); - dir_size += file_size; - } - } - return dir_size; -} -} // namespace - -TEST(DBTest, WALArchivalSizeLimit) { - do { - Options options = CurrentOptions(); - options.create_if_missing = true; - options.WAL_ttl_seconds = 0; - options.WAL_size_limit_MB = 1000; - - // TEST : Create DB with huge size limit and no ttl. - // Put some keys. Count the archived log files present in the DB - // just after insert. Assert that there are many enough. - // Change size limit. Re-open db. - // Assert that archive is not greater than WAL_size_limit_MB. - // Set ttl and time_to_check_ to small values. Re-open db. - // Assert that there are no archived logs left. - - DestroyAndReopen(options); - for (int i = 0; i < 128 * 128; ++i) { - ASSERT_OK(Put(Key(i), DummyString(1024))); - } - Reopen(options); - - std::string archive_dir = ArchivalDirectory(dbname_); - std::vector log_files = ListLogFiles(env_, archive_dir); - ASSERT_TRUE(log_files.size() > 2); - - options.WAL_size_limit_MB = 8; - Reopen(options); - dbfull()->TEST_PurgeObsoleteteWAL(); - - uint64_t archive_size = GetLogDirSize(archive_dir, env_); - ASSERT_TRUE(archive_size <= options.WAL_size_limit_MB * 1024 * 1024); - - options.WAL_ttl_seconds = 1; - dbfull()->TEST_SetDefaultTimeToCheck(1); - env_->SleepForMicroseconds(2 * 1000 * 1000); - Reopen(options); - dbfull()->TEST_PurgeObsoleteteWAL(); - - log_files = ListLogFiles(env_, archive_dir); - ASSERT_TRUE(log_files.empty()); - } while (ChangeCompactOptions()); -} - TEST(DBTest, PurgeInfoLogs) { Options options = CurrentOptions(); options.keep_log_file_num = 5; @@ -6804,11 +6692,13 @@ TEST(DBTest, TransactionLogIterator) { #ifndef NDEBUG // sync point is not included with DNDEBUG build TEST(DBTest, TransactionLogIteratorRace) { static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; - static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = - { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1", - "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" }, - { "DBImpl::GetSortedWalsOfType:1", "DBImpl::PurgeObsoleteFiles:1", - "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalsOfType:2" }}; + static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { + {"WalManager::GetSortedWalFiles:1", "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"}, + {"WalManager::GetSortedWalsOfType:1", + "WalManager::PurgeObsoleteFiles:1", + "WalManager::PurgeObsoleteFiles:2", + "WalManager::GetSortedWalsOfType:2"}}; for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) { // Setup sync point dependency to reproduce the race condition of // a log file moved to archived dir, in the middle of GetSortedWalFiles @@ -6856,24 +6746,6 @@ TEST(DBTest, TransactionLogIteratorRace) { } #endif -TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - // Do a plain Reopen. - Put(1, "key1", DummyString(1024)); - // Two reopens should create a zero record WAL file. - ReopenWithColumnFamilies({"default", "pikachu"}, options); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - Put(1, "key2", DummyString(1024)); - - auto iter = OpenTransactionLogIter(0); - ExpectRecords(2, iter); - } while (ChangeCompactOptions()); -} - TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { do { Options options = OptionsForLogIterTest(); @@ -6892,17 +6764,6 @@ TEST(DBTest, TransactionLogIteratorStallAtLastRecord) { } while (ChangeCompactOptions()); } -TEST(DBTest, TransactionLogIteratorJustEmptyFile) { - do { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - unique_ptr iter; - Status status = dbfull()->GetUpdatesSince(0, &iter); - // Check that an empty iterator is returned - ASSERT_TRUE(!iter->Valid()); - } while (ChangeCompactOptions()); -} - TEST(DBTest, TransactionLogIteratorCheckAfterRestart) { do { Options options = OptionsForLogIterTest(); @@ -7013,44 +6874,6 @@ TEST(DBTest, TransactionLogIteratorBlobs) { handler.seen); } -TEST(DBTest, ReadFirstRecordCache) { - Options options = CurrentOptions(); - options.env = env_; - options.create_if_missing = true; - DestroyAndReopen(options); - - std::string path = dbname_ + "/000001.log"; - unique_ptr file; - ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); - - SequenceNumber s; - ASSERT_OK(dbfull()->TEST_ReadFirstLine(path, &s)); - ASSERT_EQ(s, 0U); - - ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); - ASSERT_EQ(s, 0U); - - log::Writer writer(std::move(file)); - WriteBatch batch; - batch.Put("foo", "bar"); - WriteBatchInternal::SetSequence(&batch, 10); - writer.AddRecord(WriteBatchInternal::Contents(&batch)); - - env_->count_sequential_reads_ = true; - // sequential_read_counter_ sanity test - ASSERT_EQ(env_->sequential_read_counter_.Read(), 0); - - ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); - ASSERT_EQ(s, 10U); - // did a read - ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); - - ASSERT_OK(dbfull()->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); - ASSERT_EQ(s, 10U); - // no new reads since the value is cached - ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); -} - // Multi-threaded test: namespace { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index bfcf7b328..6fc9fbaae 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -4,6 +4,11 @@ // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include "db/transaction_log_impl.h" #include "db/write_batch_internal.h" @@ -13,7 +18,7 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( const std::string& dir, const DBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seq, - std::unique_ptr files, DBImpl const* const dbimpl) + std::unique_ptr files, VersionSet const* const versions) : dir_(dir), options_(options), read_options_(read_options), @@ -25,9 +30,9 @@ TransactionLogIteratorImpl::TransactionLogIteratorImpl( currentFileIndex_(0), currentBatchSeq_(0), currentLastSeq_(0), - dbimpl_(dbimpl) { + versions_(versions) { assert(files_ != nullptr); - assert(dbimpl_ != nullptr); + assert(versions_ != nullptr); reporter_.env = options_->env; reporter_.info_log = options_->info_log.get(); @@ -74,7 +79,7 @@ bool TransactionLogIteratorImpl::RestrictedRead( Slice* record, std::string* scratch) { // Don't read if no more complete entries to read from logs - if (currentLastSeq_ >= dbimpl_->GetLatestSequenceNumber()) { + if (currentLastSeq_ >= versions_->LastSequence()) { return false; } return currentLogReader_->ReadRecord(record, scratch); @@ -185,7 +190,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) { } } else { isValid_ = false; - if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) { + if (currentLastSeq_ == versions_->LastSequence()) { currentStatus_ = Status::OK(); } else { currentStatus_ = Status::Corruption("NO MORE DATA LEFT"); @@ -203,12 +208,10 @@ bool TransactionLogIteratorImpl::IsBatchExpected( if (batchSeq != expectedSeq) { char buf[200]; snprintf(buf, sizeof(buf), - "Discontinuity in log records. Got seq=%lu, Expected seq=%lu, " - "Last flushed seq=%lu.Log iterator will reseek the correct " - "batch.", - (unsigned long)batchSeq, - (unsigned long)expectedSeq, - (unsigned long)dbimpl_->GetLatestSequenceNumber()); + "Discontinuity in log records. Got seq=%" PRIu64 + ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64 + ".Log iterator will reseek the correct batch.", + batchSeq, expectedSeq, versions_->LastSequence()); reporter_.Info(buf); return false; } @@ -240,7 +243,7 @@ void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) { currentLastSeq_ = currentBatchSeq_ + WriteBatchInternal::Count(batch.get()) - 1; // currentBatchSeq_ can only change here - assert(currentLastSeq_ <= dbimpl_->GetLatestSequenceNumber()); + assert(currentLastSeq_ <= versions_->LastSequence()); currentBatch_ = move(batch); isValid_ = true; diff --git a/db/transaction_log_impl.h b/db/transaction_log_impl.h index 1c7ab78d9..a0b7c9d3c 100644 --- a/db/transaction_log_impl.h +++ b/db/transaction_log_impl.h @@ -11,7 +11,7 @@ #include "rocksdb/options.h" #include "rocksdb/types.h" #include "rocksdb/transaction_log.h" -#include "db/db_impl.h" +#include "db/version_set.h" #include "db/log_reader.h" #include "db/filename.h" @@ -73,7 +73,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { const std::string& dir, const DBOptions* options, const TransactionLogIterator::ReadOptions& read_options, const EnvOptions& soptions, const SequenceNumber seqNum, - std::unique_ptr files, DBImpl const* const dbimpl); + std::unique_ptr files, VersionSet const* const versions); virtual bool Valid(); @@ -100,7 +100,9 @@ class TransactionLogIteratorImpl : public TransactionLogIterator { LogReporter reporter_; SequenceNumber currentBatchSeq_; // sequence number at start of current batch SequenceNumber currentLastSeq_; // last sequence in the current batch - DBImpl const * const dbimpl_; // The db on whose log files this iterates + // Used only to get latest seq. num + // TODO(icanadi) can this be just a callback? + VersionSet const* const versions_; // Reads from transaction log only if the writebatch record has been written bool RestrictedRead(Slice* record, std::string* scratch); diff --git a/db/wal_manager.cc b/db/wal_manager.cc new file mode 100644 index 000000000..c08b3b220 --- /dev/null +++ b/db/wal_manager.cc @@ -0,0 +1,445 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/wal_manager.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include + +#include "db/filename.h" +#include "db/transaction_log_impl.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/write_batch_internal.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/write_batch.h" +#include "util/coding.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/sync_point.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE + +Status WalManager::GetSortedWalFiles(VectorLogPtr& files) { + // First get sorted files in db dir, then get sorted files from archived + // dir, to avoid a race condition where a log file is moved to archived + // dir in between. + Status s; + // list wal files in main db dir. + VectorLogPtr logs; + s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile); + if (!s.ok()) { + return s; + } + + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1"); + TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2"); + + files.clear(); + // list wal files in archive dir. + std::string archivedir = ArchivalDirectory(db_options_.wal_dir); + if (env_->FileExists(archivedir)) { + s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); + if (!s.ok()) { + return s; + } + } + + uint64_t latest_archived_log_number = 0; + if (!files.empty()) { + latest_archived_log_number = files.back()->LogNumber(); + Log(db_options_.info_log, "Latest Archived log: %" PRIu64, + latest_archived_log_number); + } + + files.reserve(files.size() + logs.size()); + for (auto& log : logs) { + if (log->LogNumber() > latest_archived_log_number) { + files.push_back(std::move(log)); + } else { + // When the race condition happens, we could see the + // same log in both db dir and archived dir. Simply + // ignore the one in db dir. Note that, if we read + // archived dir first, we would have missed the log file. + Log(db_options_.info_log, "%s already moved to archive", + log->PathName().c_str()); + } + } + + return s; +} + +Status WalManager::GetUpdatesSince( + SequenceNumber seq, std::unique_ptr* iter, + const TransactionLogIterator::ReadOptions& read_options, + VersionSet* version_set) { + + // Get all sorted Wal Files. + // Do binary search and open files and find the seq number. + + std::unique_ptr wal_files(new VectorLogPtr); + Status s = GetSortedWalFiles(*wal_files); + if (!s.ok()) { + return s; + } + + s = RetainProbableWalFiles(*wal_files, seq); + if (!s.ok()) { + return s; + } + iter->reset(new TransactionLogIteratorImpl( + db_options_.wal_dir, &db_options_, read_options, env_options_, seq, + std::move(wal_files), version_set)); + return (*iter)->status(); +} + +// 1. Go through all archived files and +// a. if ttl is enabled, delete outdated files +// b. if archive size limit is enabled, delete empty files, +// compute file number and size. +// 2. If size limit is enabled: +// a. compute how many files should be deleted +// b. get sorted non-empty archived logs +// c. delete what should be deleted +void WalManager::PurgeObsoleteWALFiles() { + bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0; + bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0; + if (!ttl_enabled && !size_limit_enabled) { + return; + } + + int64_t current_time; + Status s = env_->GetCurrentTime(¤t_time); + if (!s.ok()) { + Log(db_options_.info_log, "Can't get current time: %s", + s.ToString().c_str()); + assert(false); + return; + } + uint64_t const now_seconds = static_cast(current_time); + uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) + ? db_options_.WAL_ttl_seconds / 2 + : kDefaultIntervalToDeleteObsoleteWAL; + + if (purge_wal_files_last_run_ + time_to_check > now_seconds) { + return; + } + + purge_wal_files_last_run_ = now_seconds; + + std::string archival_dir = ArchivalDirectory(db_options_.wal_dir); + std::vector files; + s = env_->GetChildren(archival_dir, &files); + if (!s.ok()) { + Log(db_options_.info_log, "Can't get archive files: %s", + s.ToString().c_str()); + assert(false); + return; + } + + size_t log_files_num = 0; + uint64_t log_file_size = 0; + + for (auto& f : files) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kLogFile) { + std::string const file_path = archival_dir + "/" + f; + if (ttl_enabled) { + uint64_t file_m_time; + Status const s = env_->GetFileModificationTime(file_path, &file_m_time); + if (!s.ok()) { + Log(db_options_.info_log, "Can't get file mod time: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; + } + if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) { + Status const s = env_->DeleteFile(file_path); + if (!s.ok()) { + Log(db_options_.info_log, "Can't delete file: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(number); + } + continue; + } + } + + if (size_limit_enabled) { + uint64_t file_size; + Status const s = env_->GetFileSize(file_path, &file_size); + if (!s.ok()) { + Log(db_options_.info_log, "Can't get file size: %s: %s", + file_path.c_str(), s.ToString().c_str()); + return; + } else { + if (file_size > 0) { + log_file_size = std::max(log_file_size, file_size); + ++log_files_num; + } else { + Status s = env_->DeleteFile(file_path); + if (!s.ok()) { + Log(db_options_.info_log, "Can't delete file: %s: %s", + file_path.c_str(), s.ToString().c_str()); + continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(number); + } + } + } + } + } + } + + if (0 == log_files_num || !size_limit_enabled) { + return; + } + + size_t const files_keep_num = + db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size; + if (log_files_num <= files_keep_num) { + return; + } + + size_t files_del_num = log_files_num - files_keep_num; + VectorLogPtr archived_logs; + GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + + if (files_del_num > archived_logs.size()) { + Log(db_options_.info_log, + "Trying to delete more archived log files than " + "exist. Deleting all"); + files_del_num = archived_logs.size(); + } + + for (size_t i = 0; i < files_del_num; ++i) { + std::string const file_path = archived_logs[i]->PathName(); + Status const s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path); + if (!s.ok()) { + Log(db_options_.info_log, "Can't delete file: %s: %s", file_path.c_str(), + s.ToString().c_str()); + continue; + } else { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.erase(archived_logs[i]->LogNumber()); + } + } +} + +void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) { + auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1"); + Status s = env_->RenameFile(fname, archived_log_name); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2"); + Log(db_options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(), + archived_log_name.c_str(), s.ToString().c_str()); +} + +namespace { +struct CompareLogByPointer { + bool operator()(const std::unique_ptr& a, + const std::unique_ptr& b) { + LogFileImpl* a_impl = dynamic_cast(a.get()); + LogFileImpl* b_impl = dynamic_cast(b.get()); + return *a_impl < *b_impl; + } +}; +} + +Status WalManager::GetSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType log_type) { + std::vector all_files; + const Status status = env_->GetChildren(path, &all_files); + if (!status.ok()) { + return status; + } + log_files.reserve(all_files.size()); + for (const auto& f : all_files) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kLogFile) { + SequenceNumber sequence; + Status s = ReadFirstRecord(log_type, number, &sequence); + if (!s.ok()) { + return s; + } + if (sequence == 0) { + // empty file + continue; + } + + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1"); + TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2"); + + uint64_t size_bytes; + s = env_->GetFileSize(LogFileName(path, number), &size_bytes); + // re-try in case the alive log file has been moved to archive. + if (!s.ok() && log_type == kAliveLogFile && + env_->FileExists(ArchivedLogFileName(path, number))) { + s = env_->GetFileSize(ArchivedLogFileName(path, number), &size_bytes); + } + if (!s.ok()) { + return s; + } + + log_files.push_back(std::move(std::unique_ptr( + new LogFileImpl(number, log_type, sequence, size_bytes)))); + } + } + CompareLogByPointer compare_log_files; + std::sort(log_files.begin(), log_files.end(), compare_log_files); + return status; +} + +Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target) { + int64_t start = 0; // signed to avoid overflow when target is < first file. + int64_t end = static_cast(all_logs.size()) - 1; + // Binary Search. avoid opening all files. + while (end >= start) { + int64_t mid = start + (end - start) / 2; // Avoid overflow. + SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence(); + if (current_seq_num == target) { + end = mid; + break; + } else if (current_seq_num < target) { + start = mid + 1; + } else { + end = mid - 1; + } + } + // end could be -ve. + size_t start_index = std::max(static_cast(0), end); + // The last wal file is always included + all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); + return Status::OK(); +} + +Status WalManager::ReadFirstRecord(const WalFileType type, + const uint64_t number, + SequenceNumber* sequence) { + if (type != kAliveLogFile && type != kArchivedLogFile) { + return Status::NotSupported("File Type Not Known " + std::to_string(type)); + } + { + MutexLock l(&read_first_record_cache_mutex_); + auto itr = read_first_record_cache_.find(number); + if (itr != read_first_record_cache_.end()) { + *sequence = itr->second; + return Status::OK(); + } + } + Status s; + if (type == kAliveLogFile) { + std::string fname = LogFileName(db_options_.wal_dir, number); + s = ReadFirstLine(fname, sequence); + if (env_->FileExists(fname) && !s.ok()) { + // return any error that is not caused by non-existing file + return s; + } + } + + if (type == kArchivedLogFile || !s.ok()) { + // check if the file got moved to archive. + std::string archived_file = + ArchivedLogFileName(db_options_.wal_dir, number); + s = ReadFirstLine(archived_file, sequence); + } + + if (s.ok() && *sequence != 0) { + MutexLock l(&read_first_record_cache_mutex_); + read_first_record_cache_.insert({number, *sequence}); + } + return s; +} + +// the function returns status.ok() and sequence == 0 if the file exists, but is +// empty +Status WalManager::ReadFirstLine(const std::string& fname, + SequenceNumber* sequence) { + struct LogReporter : public log::Reader::Reporter { + Env* env; + Logger* info_log; + const char* fname; + + Status* status; + bool ignore_error; // true if db_options_.paranoid_checks==false + virtual void Corruption(size_t bytes, const Status& s) { + Log(info_log, "%s%s: dropping %d bytes; %s", + (this->ignore_error ? "(ignoring error) " : ""), fname, + static_cast(bytes), s.ToString().c_str()); + if (this->status->ok()) { + // only keep the first error + *this->status = s; + } + } + }; + + std::unique_ptr file; + Status status = env_->NewSequentialFile(fname, &file, env_options_); + + if (!status.ok()) { + return status; + } + + LogReporter reporter; + reporter.env = env_; + reporter.info_log = db_options_.info_log.get(); + reporter.fname = fname.c_str(); + reporter.status = &status; + reporter.ignore_error = !db_options_.paranoid_checks; + log::Reader reader(std::move(file), &reporter, true /*checksum*/, + 0 /*initial_offset*/); + std::string scratch; + Slice record; + + if (reader.ReadRecord(&record, &scratch) && + (status.ok() || !db_options_.paranoid_checks)) { + if (record.size() < 12) { + reporter.Corruption(record.size(), + Status::Corruption("log record too small")); + // TODO read record's till the first no corrupt entry? + } else { + WriteBatch batch; + WriteBatchInternal::SetContents(&batch, record); + *sequence = WriteBatchInternal::Sequence(&batch); + return Status::OK(); + } + } + + // ReadRecord returns false on EOF, which means that the log file is empty. we + // return status.ok() in that case and set sequence number to 0 + *sequence = 0; + return status; +} + +#endif // ROCKSDB_LITE +} // namespace rocksdb diff --git a/db/wal_manager.h b/db/wal_manager.h new file mode 100644 index 000000000..493c426e3 --- /dev/null +++ b/db/wal_manager.h @@ -0,0 +1,95 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "port/port.h" + +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/status.h" + +#include "db/version_set.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE +class WalManager { + public: + WalManager(const DBOptions& db_options, const EnvOptions& env_options) + : db_options_(db_options), + env_options_(env_options), + env_(db_options.env), + purge_wal_files_last_run_(0) {} + + virtual Status GetSortedWalFiles(VectorLogPtr& files); + + virtual Status GetUpdatesSince( + SequenceNumber seq_number, std::unique_ptr* iter, + const TransactionLogIterator::ReadOptions& read_options, + VersionSet* version_set); + + void PurgeObsoleteWALFiles(); + + void ArchiveWALFile(const std::string& fname, uint64_t number); + + Status TEST_ReadFirstRecord(const WalFileType type, const uint64_t number, + SequenceNumber* sequence) { + return ReadFirstRecord(type, number, sequence); + } + + Status TEST_ReadFirstLine(const std::string& fname, + SequenceNumber* sequence) { + return ReadFirstLine(fname, sequence); + } + + private: + Status GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files, + WalFileType type); + // Requires: all_logs should be sorted with earliest log file first + // Retains all log files in all_logs which contain updates with seq no. + // Greater Than or Equal to the requested SequenceNumber. + Status RetainProbableWalFiles(VectorLogPtr& all_logs, + const SequenceNumber target); + + Status ReadFirstRecord(const WalFileType type, const uint64_t number, + SequenceNumber* sequence); + + Status ReadFirstLine(const std::string& fname, SequenceNumber* sequence); + + // ------- state from DBImpl ------ + const DBOptions& db_options_; + const EnvOptions& env_options_; + Env* env_; + + // ------- WalManager state ------- + // cache for ReadFirstRecord() calls + std::unordered_map read_first_record_cache_; + port::Mutex read_first_record_cache_mutex_; + + // last time when PurgeObsoleteWALFiles ran. + uint64_t purge_wal_files_last_run_; + + // obsolete files will be deleted every this seconds if ttl deletion is + // enabled and archive size_limit is disabled. + static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600; +}; + +#endif // ROCKSDB_LITE +} // namespace rocksdb diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc new file mode 100644 index 000000000..1f609d083 --- /dev/null +++ b/db/wal_manager_test.cc @@ -0,0 +1,279 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/write_batch.h" + +#include "db/wal_manager.h" +#include "db/log_writer.h" +#include "db/column_family.h" +#include "db/version_set.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "table/mock_table.h" +#include "db/db_impl.h" + +namespace rocksdb { + +// TODO(icanadi) mock out VersionSet +// TODO(icanadi) move other WalManager-specific tests from db_test here +class WalManagerTest { + public: + WalManagerTest() + : env_(Env::Default()), + dbname_(test::TmpDir() + "/wal_manager_test"), + table_cache_(NewLRUCache(50000, 16, 8)), + current_log_number_(0) { + DestroyDB(dbname_, Options()); + } + + void Init() { + ASSERT_OK(env_->CreateDirIfMissing(dbname_)); + ASSERT_OK(env_->CreateDirIfMissing(ArchivalDirectory(dbname_))); + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits::max()); + db_options_.wal_dir = dbname_; + + versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, + table_cache_.get(), &write_controller_)); + + wal_manager_.reset(new WalManager(db_options_, env_options_)); + } + + void Reopen() { + wal_manager_.reset(new WalManager(db_options_, env_options_)); + } + + // NOT thread safe + void Put(const std::string& key, const std::string& value) { + assert(current_log_writer_.get() != nullptr); + uint64_t seq = versions_->LastSequence() + 1; + WriteBatch batch; + batch.Put(key, value); + WriteBatchInternal::SetSequence(&batch, seq); + current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); + versions_->SetLastSequence(seq); + } + + // NOT thread safe + void RollTheLog(bool archived) { + current_log_number_++; + std::string fname = ArchivedLogFileName(dbname_, current_log_number_); + unique_ptr file; + ASSERT_OK(env_->NewWritableFile(fname, &file, env_options_)); + current_log_writer_.reset(new log::Writer(std::move(file))); + } + + void CreateArchiveLogs(int num_logs, int entries_per_log) { + for (int i = 1; i <= num_logs; ++i) { + RollTheLog(true); + for (int k = 0; k < entries_per_log; ++k) { + Put(std::to_string(k), std::string(1024, 'a')); + } + } + } + + std::unique_ptr OpenTransactionLogIter( + const SequenceNumber seq) { + unique_ptr iter; + Status status = wal_manager_->GetUpdatesSince( + seq, &iter, TransactionLogIterator::ReadOptions(), versions_.get()); + ASSERT_OK(status); + return std::move(iter); + } + + Env* env_; + std::string dbname_; + WriteController write_controller_; + EnvOptions env_options_; + std::shared_ptr table_cache_; + DBOptions db_options_; + std::unique_ptr versions_; + std::unique_ptr wal_manager_; + + std::unique_ptr current_log_writer_; + uint64_t current_log_number_; +}; + +TEST(WalManagerTest, ReadFirstRecordCache) { + Init(); + std::string path = dbname_ + "/000001.log"; + unique_ptr file; + ASSERT_OK(env_->NewWritableFile(path, &file, EnvOptions())); + + SequenceNumber s; + ASSERT_OK(wal_manager_->TEST_ReadFirstLine(path, &s)); + ASSERT_EQ(s, 0U); + + ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 0U); + + log::Writer writer(std::move(file)); + WriteBatch batch; + batch.Put("foo", "bar"); + WriteBatchInternal::SetSequence(&batch, 10); + writer.AddRecord(WriteBatchInternal::Contents(&batch)); + + // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here. + // Waiting for lei to finish with db_test + // env_->count_sequential_reads_ = true; + // sequential_read_counter_ sanity test + // ASSERT_EQ(env_->sequential_read_counter_.Read(), 0); + + ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 10U); + // did a read + // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here + // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); + + ASSERT_OK(wal_manager_->TEST_ReadFirstRecord(kAliveLogFile, 1, &s)); + ASSERT_EQ(s, 10U); + // no new reads since the value is cached + // TODO(icanadi) move SpecialEnv outside of db_test, so we can reuse it here + // ASSERT_EQ(env_->sequential_read_counter_.Read(), 1); +} + +namespace { +uint64_t GetLogDirSize(std::string dir_path, Env* env) { + uint64_t dir_size = 0; + std::vector files; + env->GetChildren(dir_path, &files); + for (auto& f : files) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kLogFile) { + std::string const file_path = dir_path + "/" + f; + uint64_t file_size; + env->GetFileSize(file_path, &file_size); + dir_size += file_size; + } + } + return dir_size; +} +std::vector ListSpecificFiles( + Env* env, const std::string& path, const FileType expected_file_type) { + std::vector files; + std::vector file_numbers; + env->GetChildren(path, &files); + uint64_t number; + FileType type; + for (size_t i = 0; i < files.size(); ++i) { + if (ParseFileName(files[i], &number, &type)) { + if (type == expected_file_type) { + file_numbers.push_back(number); + } + } + } + return std::move(file_numbers); +} + +int CountRecords(TransactionLogIterator* iter) { + int count = 0; + SequenceNumber lastSequence = 0; + BatchResult res; + while (iter->Valid()) { + res = iter->GetBatch(); + ASSERT_TRUE(res.sequence > lastSequence); + ++count; + lastSequence = res.sequence; + ASSERT_OK(iter->status()); + iter->Next(); + } + return count; +} +} // namespace + +TEST(WalManagerTest, WALArchivalSizeLimit) { + db_options_.WAL_ttl_seconds = 0; + db_options_.WAL_size_limit_MB = 1000; + Init(); + + // TEST : Create WalManager with huge size limit and no ttl. + // Create some archived files and call PurgeObsoleteWALFiles(). + // Count the archived log files that survived. + // Assert that all of them did. + // Change size limit. Re-open WalManager. + // Assert that archive is not greater than WAL_size_limit_MB after + // PurgeObsoleteWALFiles() + // Set ttl and time_to_check_ to small values. Re-open db. + // Assert that there are no archived logs left. + + std::string archive_dir = ArchivalDirectory(dbname_); + CreateArchiveLogs(20, 5000); + + std::vector log_files = + ListSpecificFiles(env_, archive_dir, kLogFile); + ASSERT_EQ(log_files.size(), 20U); + + db_options_.WAL_size_limit_MB = 8; + Reopen(); + wal_manager_->PurgeObsoleteWALFiles(); + + uint64_t archive_size = GetLogDirSize(archive_dir, env_); + ASSERT_TRUE(archive_size <= db_options_.WAL_size_limit_MB * 1024 * 1024); + + db_options_.WAL_ttl_seconds = 1; + env_->SleepForMicroseconds(2 * 1000 * 1000); + Reopen(); + wal_manager_->PurgeObsoleteWALFiles(); + + log_files = ListSpecificFiles(env_, archive_dir, kLogFile); + ASSERT_TRUE(log_files.empty()); +} + +TEST(WalManagerTest, WALArchivalTtl) { + db_options_.WAL_ttl_seconds = 1000; + Init(); + + // TEST : Create WalManager with a ttl and no size limit. + // Create some archived log files and call PurgeObsoleteWALFiles(). + // Assert that files are not deleted + // Reopen db with small ttl. + // Assert that all archived logs was removed. + + std::string archive_dir = ArchivalDirectory(dbname_); + CreateArchiveLogs(20, 5000); + + std::vector log_files = + ListSpecificFiles(env_, archive_dir, kLogFile); + ASSERT_GT(log_files.size(), 0U); + + db_options_.WAL_ttl_seconds = 1; + env_->SleepForMicroseconds(3 * 1000 * 1000); + Reopen(); + wal_manager_->PurgeObsoleteWALFiles(); + + log_files = ListSpecificFiles(env_, archive_dir, kLogFile); + ASSERT_TRUE(log_files.empty()); +} + +TEST(WalManagerTest, TransactionLogIteratorMoveOverZeroFiles) { + Init(); + RollTheLog(false); + Put("key1", std::string(1024, 'a')); + // Create a zero record WAL file. + RollTheLog(false); + RollTheLog(false); + + Put("key2", std::string(1024, 'a')); + + auto iter = OpenTransactionLogIter(0); + ASSERT_EQ(2, CountRecords(iter.get())); +} + +TEST(WalManagerTest, TransactionLogIteratorJustEmptyFile) { + Init(); + RollTheLog(false); + auto iter = OpenTransactionLogIter(0); + // Check that an empty iterator is returned + ASSERT_TRUE(!iter->Valid()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }