Improve log handling when recover without flush (#4405)

Summary:
Improve log handling when avoid_flush_during_recovery=true.
1. restore total_log_size_ after recovery, by summing up existing log sizes. Fixes #4253.
2. truncate the last existing log, since this log can contain preallocated space and it will be a waste to keep the space. It avoids a crash loop of user application cause a lot of log with non-trivial size being created and ultimately take up all disk space.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4405

Differential Revision: D9953933

Pulled By: yiwu-arbug

fbshipit-source-id: 967780fee8acec7f358b6eb65190fb4684f82e56
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 17edc82a4b
commit dc813e4b85
  1. 7
      db/db_impl.h
  2. 6
      db/db_impl_debug.cc
  3. 86
      db/db_impl_open.cc
  4. 6
      db/db_test_util.h
  5. 102
      db/db_wal_test.cc

@ -464,6 +464,7 @@ class DBImpl : public DB {
int TEST_BGCompactionsAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
#endif // NDEBUG
@ -934,6 +935,12 @@ class DBImpl : public DB {
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit);
// Restore alive_log_files_ and total_log_size_ after recovery.
// It needs to run only when there's no flush during recovery
// (e.g. avoid_flush_during_recovery=true). May also trigger flush
// in case total_log_size > max_total_wal_size.
Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
// num_bytes: for slowdown case, delay time is calculated based on
// `num_bytes` going through.
Status DelayWrite(uint64_t num_bytes, const WriteOptions& write_options);

@ -237,5 +237,11 @@ SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
}
}
size_t DBImpl::TEST_GetWalPreallocateBlockSize(
uint64_t write_buffer_size) const {
InstrumentedMutexLock l(&mutex_);
return GetWalPreallocateBlockSize(write_buffer_size);
}
} // namespace rocksdb
#endif // NDEBUG

@ -396,6 +396,16 @@ Status DBImpl::Recover(
}
}
}
// Initial max_total_in_memory_state_ before recovery logs. Log recovery
// may check this value to decide whether to flush.
max_total_in_memory_state_ = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
}
if (s.ok()) {
SequenceNumber next_sequence(kMaxSequenceNumber);
default_cf_handle_ = new ColumnFamilyHandleImpl(
@ -468,14 +478,6 @@ Status DBImpl::Recover(
}
}
// Initial value
max_total_in_memory_state_ = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
}
return s;
}
@ -885,18 +887,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
}
if (data_seen && !flushed) {
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
if (two_write_queues_) {
log_write_mutex_.Lock();
}
for (auto log_number : log_numbers) {
alive_log_files_.push_back(LogFileNumberSize(log_number));
}
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
if (status.ok() && data_seen && !flushed) {
status = RestoreAliveLogFiles(log_numbers);
}
event_logger_.Log() << "job" << job_id << "event"
@ -905,6 +897,60 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
return status;
}
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers) {
if (log_numbers.empty()) {
return Status::OK();
}
Status s;
mutex_.AssertHeld();
assert(immutable_db_options_.avoid_flush_during_recovery);
if (two_write_queues_) {
log_write_mutex_.Lock();
}
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
for (auto log_number : log_numbers) {
LogFileNumberSize log(log_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
// This gets the appear size of the logs, not including preallocated space.
s = env_->GetFileSize(fname, &log.size);
if (!s.ok()) {
break;
}
total_log_size_ += log.size;
alive_log_files_.push_back(log);
// We preallocate space for logs, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
if (log_number == log_numbers.back()) {
std::unique_ptr<WritableFile> last_log;
Status truncate_status = env_->ReopenWritableFile(
fname, &last_log,
env_->OptimizeForLogWrite(
env_options_,
BuildDBOptions(immutable_db_options_, mutable_db_options_)));
if (truncate_status.ok()) {
truncate_status = last_log->Truncate(log.size);
}
if (truncate_status.ok()) {
truncate_status = last_log->Close();
}
// Not a critical error if fail to truncate.
if (!truncate_status.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Failed to truncate log #%" PRIu64 ": %s", log_number,
truncate_status.ToString().c_str());
}
}
}
if (two_write_queues_) {
log_write_mutex_.Unlock();
}
return s;
}
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();

@ -315,6 +315,9 @@ class SpecialEnv : public EnvWrapper {
}
}
uint64_t GetFileSize() override { return base_->GetFileSize(); }
Status Allocate(uint64_t offset, uint64_t len) override {
return base_->Allocate(offset, len);
}
private:
SpecialEnv* env_;
@ -368,6 +371,9 @@ class SpecialEnv : public EnvWrapper {
bool IsSyncThreadSafe() const override {
return env_->is_wal_sync_thread_safe_.load();
}
Status Allocate(uint64_t offset, uint64_t len) override {
return base_->Allocate(offset, len);
}
private:
SpecialEnv* env_;

@ -18,6 +18,15 @@ namespace rocksdb {
class DBWALTest : public DBTestBase {
public:
DBWALTest() : DBTestBase("/db_wal_test") {}
#if defined(ROCKSDB_PLATFORM_POSIX)
uint64_t GetAllocatedFileSize(std::string file_name) {
struct stat sbuf;
int err = stat(file_name.c_str(), &sbuf);
assert(err == 0);
return sbuf.st_blocks * 512;
}
#endif
};
// A SpecialEnv enriched to give more insight about deleted files
@ -1329,6 +1338,99 @@ TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
}
}
// Tests that total log size is recovered if we set
// avoid_flush_during_recovery=true.
// Flush should trigger if max_total_wal_size is reached.
TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
class TestFlushListener : public EventListener {
public:
std::atomic<int> count{0};
TestFlushListener() = default;
void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
count++;
assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
}
};
std::shared_ptr<TestFlushListener> test_listener =
std::make_shared<TestFlushListener>();
constexpr size_t kKB = 1024;
constexpr size_t kMB = 1024 * 1024;
Options options = CurrentOptions();
options.avoid_flush_during_recovery = true;
options.max_total_wal_size = 1 * kMB;
options.listeners.push_back(test_listener);
// Have to open DB in multi-CF mode to trigger flush when
// max_total_wal_size is reached.
CreateAndReopenWithCF({"one"}, options);
// Write some keys and we will end up with one log file which is slightly
// smaller than 1MB.
std::string value_100k(100 * kKB, 'v');
std::string value_300k(300 * kKB, 'v');
ASSERT_OK(Put(0, "foo", "v1"));
for (int i = 0; i < 9; i++) {
ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
}
// Get log files before reopen.
VectorLogPtr log_files_before;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
ASSERT_EQ(1, log_files_before.size());
uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
ASSERT_GT(log_size_before, 900 * kKB);
ASSERT_LT(log_size_before, 1 * kMB);
ReopenWithColumnFamilies({"default", "one"}, options);
// Write one more value to make log larger than 1MB.
ASSERT_OK(Put(1, "bar", value_300k));
// Get log files again. A new log file will be opened.
VectorLogPtr log_files_after_reopen;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
ASSERT_EQ(2, log_files_after_reopen.size());
ASSERT_EQ(log_files_before[0]->LogNumber(),
log_files_after_reopen[0]->LogNumber());
ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
log_files_after_reopen[1]->SizeFileBytes(),
1 * kMB);
// Write one more key to trigger flush.
ASSERT_OK(Put(0, "foo", "v2"));
dbfull()->TEST_WaitForFlushMemTable();
// Flushed two column families.
ASSERT_EQ(2, test_listener->count.load());
}
#if defined(ROCKSDB_PLATFORM_POSIX)
#if defined(ROCKSDB_FALLOCATE_PRESENT)
// Tests that we will truncate the preallocated space of the last log from
// previous.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
constexpr size_t kKB = 1024;
Options options = CurrentOptions();
options.avoid_flush_during_recovery = true;
DestroyAndReopen(options);
size_t preallocated_size =
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
ASSERT_OK(Put("foo", "v1"));
VectorLogPtr log_files_before;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
ASSERT_EQ(1, log_files_before.size());
auto& file_before = log_files_before[0];
ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
// The log file has preallocated space.
ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
preallocated_size);
Reopen(options);
VectorLogPtr log_files_after;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
ASSERT_EQ(1, log_files_after.size());
ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
// The preallocated space should be truncated.
ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
preallocated_size);
}
#endif // ROCKSDB_FALLOCATE_PRESENT
#endif // ROCKSDB_PLATFORM_POSIX
#endif // ROCKSDB_LITE
TEST_F(DBWALTest, WalTermTest) {

Loading…
Cancel
Save