Always truncate the latest WAL file on DB Open (#8122)

Summary:
Currently, we only truncate the latest alive WAL files when the DB is opened. If the latest WAL file is empty or was flushed during Open, its not truncated since the file will be deleted later on in the Open path. However, before deletion, a new WAL file is created, and if the process crash loops between the new WAL file creation and deletion of the old WAL file, the preallocated space will keep accumulating and eventually use up all disk space. To prevent this, always truncate the latest WAL file, even if its empty or the data was flushed.

Tests:
Add unit tests to db_wal_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8122

Reviewed By: riversand963

Differential Revision: D27366132

Pulled By: anand1976

fbshipit-source-id: f923cc03ef033ccb32b140d36c6a63a8152f0e8e
main
anand76 4 years ago committed by Facebook GitHub Bot
parent 0a5d23944d
commit 7d7f14480e
  1. 7
      db/db_impl/db_impl.h
  2. 70
      db/db_impl/db_impl_open.cc
  3. 157
      db/db_wal_test.cc
  4. 1
      env/io_posix.cc
  5. 3
      env/mock_env.cc

@ -1308,6 +1308,7 @@ class DBImpl : public DB {
struct LogFileNumberSize { struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {} explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
LogFileNumberSize() {}
void AddSize(uint64_t new_size) { size += new_size; } void AddSize(uint64_t new_size) { size += new_size; }
uint64_t number; uint64_t number;
uint64_t size = 0; uint64_t size = 0;
@ -1507,6 +1508,12 @@ class DBImpl : public DB {
Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit); MemTable* mem, VersionEdit* edit);
// Get the size of a log file and, if truncate is true, truncate the
// log file to its actual size, thereby freeing preallocated space.
// Return success even if truncate fails
Status GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
LogFileNumberSize* log);
// Restore alive_log_files_ and total_log_size_ after recovery. // Restore alive_log_files_ and total_log_size_ after recovery.
// It needs to run only when there's no flush during recovery // It needs to run only when there's no flush during recovery
// (e.g. avoid_flush_during_recovery=true). May also trigger flush // (e.g. avoid_flush_during_recovery=true). May also trigger flush

@ -1229,8 +1229,16 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
} }
} }
if (status.ok() && data_seen && !flushed) { if (status.ok()) {
if (data_seen && !flushed) {
status = RestoreAliveLogFiles(wal_numbers); status = RestoreAliveLogFiles(wal_numbers);
} else {
// If there's no data in the WAL, or we flushed all the data, still
// truncate the log file. If the process goes into a crash loop before
// the file is deleted, the preallocated space will never get freed.
GetLogSizeAndMaybeTruncate(wal_numbers.back(), true, nullptr)
.PermitUncheckedError();
}
} }
event_logger_.Log() << "job" << job_id << "event" event_logger_.Log() << "job" << job_id << "event"
@ -1239,34 +1247,14 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
return status; return status;
} }
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) { Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
if (wal_numbers.empty()) { LogFileNumberSize* log_ptr) {
return Status::OK();
}
Status s;
mutex_.AssertHeld();
assert(immutable_db_options_.avoid_flush_during_recovery);
if (two_write_queues_) {
log_write_mutex_.Lock();
}
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
for (auto wal_number : wal_numbers) {
LogFileNumberSize log(wal_number); LogFileNumberSize log(wal_number);
std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number); std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
Status s;
// This gets the appear size of the wals, not including preallocated space. // This gets the appear size of the wals, not including preallocated space.
s = env_->GetFileSize(fname, &log.size); s = env_->GetFileSize(fname, &log.size);
if (!s.ok()) { if (s.ok() && truncate) {
break;
}
total_log_size_ += log.size;
alive_log_files_.push_back(log);
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
if (wal_number == wal_numbers.back()) {
std::unique_ptr<FSWritableFile> last_log; std::unique_ptr<FSWritableFile> last_log;
Status truncate_status = fs_->ReopenWritableFile( Status truncate_status = fs_->ReopenWritableFile(
fname, fname,
@ -1287,6 +1275,38 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
truncate_status.ToString().c_str()); truncate_status.ToString().c_str());
} }
} }
if (log_ptr) {
*log_ptr = log;
}
return s;
}
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
if (wal_numbers.empty()) {
return Status::OK();
}
Status s;
mutex_.AssertHeld();
assert(immutable_db_options_.avoid_flush_during_recovery);
if (two_write_queues_) {
log_write_mutex_.Lock();
}
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
for (auto wal_number : wal_numbers) {
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
LogFileNumberSize log;
s = GetLogSizeAndMaybeTruncate(
wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
if (!s.ok()) {
break;
}
total_log_size_ += log.size;
alive_log_files_.push_back(log);
} }
if (two_write_queues_) { if (two_write_queues_) {
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();

@ -24,13 +24,37 @@ class DBWALTestBase : public DBTestBase {
#if defined(ROCKSDB_PLATFORM_POSIX) #if defined(ROCKSDB_PLATFORM_POSIX)
public: public:
#if defined(ROCKSDB_FALLOCATE_PRESENT)
bool IsFallocateSupported() {
// Test fallocate support of running file system.
// Skip this test if fallocate is not supported.
std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
int fd = -1;
do {
fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
} while (fd < 0 && errno == EINTR);
assert(fd > 0);
int alloc_status = fallocate(fd, 0, 0, 1);
int err_number = errno;
close(fd);
assert(env_->DeleteFile(fname_test_fallocate) == Status::OK());
if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
fprintf(stderr, "Skipped preallocated space check: %s\n",
errnoStr(err_number).c_str());
return false;
}
assert(alloc_status == 0);
return true;
}
#endif // ROCKSDB_FALLOCATE_PRESENT
uint64_t GetAllocatedFileSize(std::string file_name) { uint64_t GetAllocatedFileSize(std::string file_name) {
struct stat sbuf; struct stat sbuf;
int err = stat(file_name.c_str(), &sbuf); int err = stat(file_name.c_str(), &sbuf);
assert(err == 0); assert(err == 0);
return sbuf.st_blocks * 512; return sbuf.st_blocks * 512;
} }
#endif #endif // ROCKSDB_PLATFORM_POSIX
}; };
class DBWALTest : public DBWALTestBase { class DBWALTest : public DBWALTestBase {
@ -1849,24 +1873,9 @@ TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
return; return;
} }
// Test fallocate support of running file system. if (!IsFallocateSupported()) {
// Skip this test if fallocate is not supported.
std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
int fd = -1;
do {
fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
} while (fd < 0 && errno == EINTR);
ASSERT_GT(fd, 0);
int alloc_status = fallocate(fd, 0, 0, 1);
int err_number = errno;
close(fd);
ASSERT_OK(options.env->DeleteFile(fname_test_fallocate));
if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
fprintf(stderr, "Skipped preallocated space check: %s\n",
errnoStr(err_number).c_str());
return; return;
} }
ASSERT_EQ(0, alloc_status);
DestroyAndReopen(options); DestroyAndReopen(options);
size_t preallocated_size = size_t preallocated_size =
@ -1889,6 +1898,120 @@ TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()), ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
preallocated_size); preallocated_size);
} }
// Tests that we will truncate the preallocated space of the last log from
// previous.
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) {
constexpr size_t kKB = 1024;
Options options = CurrentOptions();
options.env = env_;
options.avoid_flush_during_recovery = false;
options.avoid_flush_during_shutdown = true;
if (mem_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
return;
}
if (!IsFallocateSupported()) {
return;
}
DestroyAndReopen(options);
size_t preallocated_size =
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
ASSERT_OK(Put("foo", "v1"));
VectorLogPtr log_files_before;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
ASSERT_EQ(1, log_files_before.size());
auto& file_before = log_files_before[0];
ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
preallocated_size);
// The log file has preallocated space.
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::PurgeObsoleteFiles:Begin",
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
{"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
"DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
port::Thread reopen_thread([&]() { Reopen(options); });
TEST_SYNC_POINT(
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
// After the flush during Open, the log file should get deleted. However,
// if the process is in a crash loop, the log file may not get
// deleted and thte preallocated space will keep accumulating. So we need
// to ensure it gets trtuncated.
EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
preallocated_size);
TEST_SYNC_POINT(
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
reopen_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) {
Options options = CurrentOptions();
options.env = env_;
options.avoid_flush_during_recovery = false;
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment");
return;
}
if (!IsFallocateSupported()) {
return;
}
DestroyAndReopen(options);
size_t preallocated_size =
dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
Close();
std::vector<std::string> filenames;
std::string last_log;
uint64_t last_log_num = 0;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
for (auto fname : filenames) {
uint64_t number;
FileType type;
if (ParseFileName(fname, &number, &type, nullptr)) {
if (type == kWalFile && number > last_log_num) {
last_log = fname;
}
}
}
ASSERT_NE(last_log, "");
last_log = dbname_ + '/' + last_log;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::PurgeObsoleteFiles:Begin",
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"},
{"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate",
"DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"PosixWritableFile::Close",
[](void* arg) { *(reinterpret_cast<size_t*>(arg)) = 0; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Preallocate space for the empty log file. This could happen if WAL data
// was buffered in memory and the process crashed.
std::unique_ptr<WritableFile> log_file;
ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions()));
log_file->SetPreallocationBlockSize(preallocated_size);
log_file->PrepareWrite(0, 4096);
log_file.reset();
ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size);
port::Thread reopen_thread([&]() { Reopen(options); });
TEST_SYNC_POINT(
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover");
// The preallocated space should be truncated.
EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size);
TEST_SYNC_POINT(
"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate");
reopen_thread.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_FALLOCATE_PRESENT #endif // ROCKSDB_FALLOCATE_PRESENT
#endif // ROCKSDB_PLATFORM_POSIX #endif // ROCKSDB_PLATFORM_POSIX

1
env/io_posix.cc vendored

@ -1213,6 +1213,7 @@ IOStatus PosixWritableFile::Close(const IOOptions& /*opts*/,
size_t block_size; size_t block_size;
size_t last_allocated_block; size_t last_allocated_block;
GetPreallocationStatus(&block_size, &last_allocated_block); GetPreallocationStatus(&block_size, &last_allocated_block);
TEST_SYNC_POINT_CALLBACK("PosixWritableFile::Close", &last_allocated_block);
if (last_allocated_block > 0) { if (last_allocated_block > 0) {
// trim the extra space preallocated at the end of the file // trim the extra space preallocated at the end of the file
// NOTE(ljin): we probably don't want to surface failure as an IOError, // NOTE(ljin): we probably don't want to surface failure as an IOError,

3
env/mock_env.cc vendored

@ -724,11 +724,12 @@ IOStatus MockFileSystem::ReopenWritableFile(
MemFile* file = nullptr; MemFile* file = nullptr;
if (file_map_.find(fn) == file_map_.end()) { if (file_map_.find(fn) == file_map_.end()) {
file = new MemFile(env_, fn, false); file = new MemFile(env_, fn, false);
// Only take a reference when we create the file objectt
file->Ref();
file_map_[fn] = file; file_map_[fn] = file;
} else { } else {
file = file_map_[fn]; file = file_map_[fn];
} }
file->Ref();
if (file_opts.use_direct_writes && !supports_direct_io_) { if (file_opts.use_direct_writes && !supports_direct_io_) {
return IOStatus::NotSupported("Direct I/O Not Supported"); return IOStatus::NotSupported("Direct I/O Not Supported");
} else { } else {

Loading…
Cancel
Save