Fix bugs in WAL trash file handling (#5520)

Summary:
1. Cleanup WAL trash files on open
2. Don't apply deletion rate limit if WAL dir is different from db dir
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5520

Test Plan: Add new unit tests and make check

Differential Revision: D16096750

Pulled By: anand1976

fbshipit-source-id: 6f07858ad864b754b711db416f0389c45ede599b
main
anand76 6 years ago committed by Facebook Github Bot
parent 2de61d9129
commit e0d9d57750
  1. 2
      HISTORY.md
  2. 14
      db/db_impl/db_impl.cc
  3. 2
      db/db_impl/db_impl.h
  4. 3
      db/db_impl/db_impl_files.cc
  5. 23
      db/db_impl/db_impl_open.cc
  6. 105
      db/db_sst_test.cc
  7. 9
      db/wal_manager.cc
  8. 6
      db/wal_manager.h
  9. 17
      file/file_util.cc
  10. 6
      file/file_util.h
  11. 6
      utilities/blob_db/blob_db_impl.cc

@ -14,6 +14,7 @@
* The semantics of the per-block-type block read counts in the performance context now match those of the generic block_read_count.
* Add C bindings for secondary instance, i.e. DBImplSecondary.
* db_bench adds a "benchmark" stats_history, which prints out the whole stats history.
* Rate limited deletion of WALs is only enabled if DBOptions::wal_dir is not set, or explicitly set to db_name passed to DB::Open and DBOptions::db_paths is empty, or same as db_paths[0].path
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
@ -40,6 +41,7 @@
* Fix ingested file and directory not being fsync.
* Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator.
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
* On DB open, delete WAL trash files left behind in wal_dir
## 6.2.0 (4/30/2019)
### New Features

@ -3137,6 +3137,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
@ -3159,7 +3160,9 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile || type == kLogFile) {
del = DeleteDBFile(&soptions, path_to_delete, dbname);
del =
DeleteDBFile(&soptions, path_to_delete, dbname,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -3193,7 +3196,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(fname, &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, table_path, dbname);
Status del = DeleteDBFile(&soptions, table_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3220,7 +3224,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
for (const auto& file : archiveFiles) {
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir);
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}
@ -3235,7 +3240,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del =
DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
soptions.wal_dir);
soptions.wal_dir, /*force_bg=*/false,
/*force_fg=*/!wal_in_db_path);
if (result.ok() && !del.ok()) {
result = del;
}

@ -1893,6 +1893,8 @@ class DBImpl : public DB {
// results sequentially. Flush results of memtables with lower IDs get
// installed to MANIFEST first.
InstrumentedCondVar atomic_flush_install_cv_;
bool wal_in_db_path_;
};
extern Options SanitizeOptions(const std::string& db, const Options& src);

@ -258,7 +258,8 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
Status file_deletion_status;
if (type == kTableFile || type == kLogFile) {
file_deletion_status =
DeleteDBFile(&immutable_db_options_, fname, path_to_sync);
DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
} else {
file_deletion_status = env_->DeleteFile(fname);
}

@ -122,6 +122,25 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
}
#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler)
// Do this first so even if we end up calling
// DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe
std::vector<std::string> filenames;
result.env->GetChildren(result.wal_dir, &filenames);
for (std::string& filename : filenames) {
if (filename.find(".log.trash",
filename.length() - std::string(".log.trash").length()) !=
std::string::npos) {
std::string trash_file = result.wal_dir + "/" + filename;
result.env->DeleteFile(trash_file);
}
}
}
// When the DB is stopped, it's possible that there are some .trash files that
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
@ -1294,6 +1313,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
delete impl;
return s;
}
impl->wal_in_db_path_ =
IsWalDirSameAsDBPath(&impl->immutable_db_options_);
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
s = impl->Recover(column_families);

@ -470,6 +470,111 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
class DBWALTestWithParam
: public DBSSTTest,
public testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
DBWALTestWithParam() {
wal_dir_ = std::get<0>(GetParam());
wal_dir_same_as_dbname_ = std::get<1>(GetParam());
}
std::string wal_dir_;
bool wal_dir_same_as_dbname_;
};
TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
Status DeleteFile(const std::string& fname) {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
return target()->DeleteFile(fname);
}
void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
private:
bool fake_log_delete;
};
std::unique_ptr<MyEnv> env(new MyEnv(Env::Default()));
Destroy(last_options_);
env->set_fake_log_delete(true);
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.compression = kNoCompression;
options.env = env.get();
options.wal_dir = dbname_ + wal_dir_;
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.sst_file_manager.reset(
NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
ASSERT_OK(s);
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
Close();
options.sst_file_manager.reset();
std::vector<std::string> filenames;
int trash_log_count = 0;
if (!wal_dir_same_as_dbname_) {
// Forcibly create some trash log files
std::unique_ptr<WritableFile> result;
env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
EnvOptions());
result.reset();
}
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_GE(trash_log_count, 1);
env->set_fake_log_delete(false);
ASSERT_OK(TryReopen(options));
filenames.clear();
trash_log_count = 0;
env->GetChildren(options.wal_dir, &filenames);
for (const std::string& fname : filenames) {
if (fname.find(".log.trash") != std::string::npos) {
trash_log_count++;
}
}
ASSERT_EQ(trash_log_count, 0);
Close();
}
INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
::testing::Values(std::make_tuple("", true),
std::make_tuple("_wal_dir", false)));
TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
Options options = CurrentOptions();

@ -187,7 +187,8 @@ void WalManager::PurgeObsoleteWALFiles() {
continue;
}
if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());
@ -213,7 +214,8 @@ void WalManager::PurgeObsoleteWALFiles() {
log_file_size = std::max(log_file_size, file_size);
++log_files_num;
} else {
s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Unable to delete file: %s: %s", file_path.c_str(),
@ -253,7 +255,8 @@ void WalManager::PurgeObsoleteWALFiles() {
for (size_t i = 0; i < files_del_num; ++i) {
std::string const file_path = archived_logs[i]->PathName();
s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
db_options_.wal_dir, false);
db_options_.wal_dir, false,
/*force_fg=*/!wal_in_db_path_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
file_path.c_str(), s.ToString().c_str());

@ -18,6 +18,7 @@
#include <memory>
#include "db/version_set.h"
#include "file/file_util.h"
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
@ -40,7 +41,8 @@ class WalManager {
env_options_(env_options),
env_(db_options.env),
purge_wal_files_last_run_(0),
seq_per_batch_(seq_per_batch) {}
seq_per_batch_(seq_per_batch),
wal_in_db_path_(IsWalDirSameAsDBPath(&db_options)) {}
Status GetSortedWalFiles(VectorLogPtr& files);
@ -97,6 +99,8 @@ class WalManager {
bool seq_per_batch_;
bool wal_in_db_path_;
// obsolete files will be deleted every this seconds if ttl deletion is
// enabled and archive size_limit is disabled.
static const uint64_t kDefaultIntervalToDeleteObsoleteWAL = 600;

@ -88,12 +88,12 @@ Status CreateFile(Env* env, const std::string& destination,
}
Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg) {
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg, const bool force_fg) {
#ifndef ROCKSDB_LITE
SstFileManagerImpl* sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
if (sfm) {
if (sfm && !force_fg) {
return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
} else {
return db_options->env->DeleteFile(fname);
@ -101,10 +101,21 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
#else
(void)dir_to_sync;
(void)force_bg;
(void)force_fg;
// SstFileManager is not supported in ROCKSDB_LITE
// Delete file immediately
return db_options->env->DeleteFile(fname);
#endif
}
bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) {
bool same = false;
Status s = db_options->env->AreFilesSame(db_options->wal_dir,
db_options->db_paths[0].path, &same);
if (s.IsNotSupported()) {
same = db_options->wal_dir == db_options->db_paths[0].path;
}
return same;
}
} // namespace rocksdb

@ -24,7 +24,9 @@ extern Status CreateFile(Env* env, const std::string& destination,
extern Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& path_to_sync,
const bool force_bg = false);
const std::string& path_to_sync, const bool force_bg,
const bool force_fg);
extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);
} // namespace rocksdb

@ -1758,7 +1758,8 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
blob_files_.erase(bfile->BlobFileNumber());
Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
bfile->PathName(), blob_dir_, true);
bfile->PathName(), blob_dir_, true,
/*force_fg=*/false);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File failed to be deleted as obsolete %s",
@ -1848,7 +1849,8 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
uint64_t number;
FileType type;
if (ParseFileName(f, &number, &type) && type == kBlobFile) {
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true);
Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
/*force_fg=*/false);
if (status.ok() && !del.ok()) {
status = del;
}

Loading…
Cancel
Save