// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #include "file/delete_scheduler.h" #include <thread> #include <vector> #include "file/sst_file_manager_impl.h" #include "logging/logging.h" #include "port/port.h" #include "rocksdb/env.h" #include "test_util/sync_point.h" #include "util/mutexlock.h" namespace rocksdb { DeleteScheduler::DeleteScheduler(Env* env, int64_t rate_bytes_per_sec, Logger* info_log, SstFileManagerImpl* sst_file_manager, double max_trash_db_ratio, uint64_t bytes_max_delete_chunk) : env_(env), total_trash_size_(0), rate_bytes_per_sec_(rate_bytes_per_sec), pending_files_(0), bytes_max_delete_chunk_(bytes_max_delete_chunk), closing_(false), cv_(&mu_), info_log_(info_log), sst_file_manager_(sst_file_manager), max_trash_db_ratio_(max_trash_db_ratio) { assert(sst_file_manager != nullptr); assert(max_trash_db_ratio >= 0); bg_thread_.reset( new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); } DeleteScheduler::~DeleteScheduler() { { InstrumentedMutexLock l(&mu_); closing_ = true; cv_.SignalAll(); } if (bg_thread_) { bg_thread_->join(); } } Status DeleteScheduler::DeleteFile(const std::string& file_path, const std::string& dir_to_sync, const bool force_bg) { Status s; if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && total_trash_size_.load() > sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) { // Rate limiting is disabled or trash size makes up more than // max_trash_db_ratio_ (default 25%) of the total DB size TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); s = env_->DeleteFile(file_path); if (s.ok()) { sst_file_manager_->OnDeleteFile(file_path); } return s; } // Move file to trash std::string trash_file; s = MarkAsTrash(file_path, &trash_file); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash", file_path.c_str()); s = env_->DeleteFile(file_path); if (s.ok()) { sst_file_manager_->OnDeleteFile(file_path); } return s; } // Update the total trash size uint64_t trash_file_size = 0; env_->GetFileSize(trash_file, &trash_file_size); total_trash_size_.fetch_add(trash_file_size); // Add file to delete queue { InstrumentedMutexLock l(&mu_); queue_.emplace(trash_file, dir_to_sync); pending_files_++; if (pending_files_ == 1) { cv_.SignalAll(); } } return s; } std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() { InstrumentedMutexLock l(&mu_); return bg_errors_; } const std::string DeleteScheduler::kTrashExtension = ".trash"; bool DeleteScheduler::IsTrashFile(const std::string& file_path) { return (file_path.size() >= kTrashExtension.size() && file_path.rfind(kTrashExtension) == file_path.size() - kTrashExtension.size()); } Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm, const std::string& path) { Status s; // Check if there are any files marked as trash in this path std::vector<std::string> files_in_path; s = env->GetChildren(path, &files_in_path); if (!s.ok()) { return s; } for (const std::string& current_file : files_in_path) { if (!DeleteScheduler::IsTrashFile(current_file)) { // not a trash file, skip continue; } Status file_delete; std::string trash_file = path + "/" + current_file; if (sfm) { // We have an SstFileManager that will schedule the file delete sfm->OnAddFile(trash_file); file_delete = sfm->ScheduleFileDeletion(trash_file, path); } else { // Delete the file immediately file_delete = env->DeleteFile(trash_file); } if (s.ok() && !file_delete.ok()) { s = file_delete; } } return s; } Status DeleteScheduler::MarkAsTrash(const std::string& file_path, std::string* trash_file) { // Sanity check of the path size_t idx = file_path.rfind("/"); if (idx == std::string::npos || idx == file_path.size() - 1) { return Status::InvalidArgument("file_path is corrupted"); } Status s; if (DeleteScheduler::IsTrashFile(file_path)) { // This is already a trash file *trash_file = file_path; return s; } *trash_file = file_path + kTrashExtension; // TODO(tec) : Implement Env::RenameFileIfNotExist and remove // file_move_mu mutex. int cnt = 0; InstrumentedMutexLock l(&file_move_mu_); while (true) { s = env_->FileExists(*trash_file); if (s.IsNotFound()) { // We found a path for our file in trash s = env_->RenameFile(file_path, *trash_file); break; } else if (s.ok()) { // Name conflict, generate new random suffix *trash_file = file_path + std::to_string(cnt) + kTrashExtension; } else { // Error during FileExists call, we cannot continue break; } cnt++; } if (s.ok()) { sst_file_manager_->OnMoveFile(file_path, *trash_file); } return s; } void DeleteScheduler::BackgroundEmptyTrash() { TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); while (true) { InstrumentedMutexLock l(&mu_); while (queue_.empty() && !closing_) { cv_.Wait(); } if (closing_) { return; } // Delete all files in queue_ uint64_t start_time = env_->NowMicros(); uint64_t total_deleted_bytes = 0; int64_t current_delete_rate = rate_bytes_per_sec_.load(); while (!queue_.empty() && !closing_) { if (current_delete_rate != rate_bytes_per_sec_.load()) { // User changed the delete rate current_delete_rate = rate_bytes_per_sec_.load(); start_time = env_->NowMicros(); total_deleted_bytes = 0; } // Get new file to delete const FileAndDir& fad = queue_.front(); std::string path_in_trash = fad.fname; // We dont need to hold the lock while deleting the file mu_.Unlock(); uint64_t deleted_bytes = 0; bool is_complete = true; // Delete file from trash and update total_penlty value Status s = DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete); total_deleted_bytes += deleted_bytes; mu_.Lock(); if (is_complete) { queue_.pop(); } if (!s.ok()) { bg_errors_[path_in_trash] = s; } // Apply penlty if necessary uint64_t total_penlty; if (current_delete_rate > 0) { // rate limiting is enabled total_penlty = ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate); while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} } else { // rate limiting is disabled total_penlty = 0; } TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", &total_penlty); if (is_complete) { pending_files_--; } if (pending_files_ == 0) { // Unblock WaitForEmptyTrash since there are no more files waiting // to be deleted cv_.SignalAll(); } } } } Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, const std::string& dir_to_sync, uint64_t* deleted_bytes, bool* is_complete) { uint64_t file_size; Status s = env_->GetFileSize(path_in_trash, &file_size); *is_complete = true; TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); if (s.ok()) { bool need_full_delete = true; if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) { uint64_t num_hard_links = 2; // We don't have to worry aobut data race between linking a new // file after the number of file link check and ftruncte because // the file is now in trash and no hardlink is supposed to create // to trash files by RocksDB. Status my_status = env_->NumFileLinks(path_in_trash, &num_hard_links); if (my_status.ok()) { if (num_hard_links == 1) { std::unique_ptr<WritableFile> wf; my_status = env_->ReopenWritableFile(path_in_trash, &wf, EnvOptions()); if (my_status.ok()) { my_status = wf->Truncate(file_size - bytes_max_delete_chunk_); if (my_status.ok()) { TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync"); my_status = wf->Fsync(); } } if (my_status.ok()) { *deleted_bytes = bytes_max_delete_chunk_; need_full_delete = false; *is_complete = false; } else { ROCKS_LOG_WARN(info_log_, "Failed to partially delete %s from trash -- %s", path_in_trash.c_str(), my_status.ToString().c_str()); } } else { ROCKS_LOG_INFO(info_log_, "Cannot delete %s slowly through ftruncate from trash " "as it has other links", path_in_trash.c_str()); } } else if (!num_link_error_printed_) { ROCKS_LOG_INFO( info_log_, "Cannot delete files slowly through ftruncate from trash " "as Env::NumFileLinks() returns error: %s", my_status.ToString().c_str()); num_link_error_printed_ = true; } } if (need_full_delete) { s = env_->DeleteFile(path_in_trash); if (!dir_to_sync.empty()) { std::unique_ptr<Directory> dir_obj; if (s.ok()) { s = env_->NewDirectory(dir_to_sync, &dir_obj); } if (s.ok()) { s = dir_obj->Fsync(); TEST_SYNC_POINT_CALLBACK( "DeleteScheduler::DeleteTrashFile::AfterSyncDir", reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync))); } } *deleted_bytes = file_size; sst_file_manager_->OnDeleteFile(path_in_trash); } } if (!s.ok()) { // Error while getting file size or while deleting ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s", path_in_trash.c_str(), s.ToString().c_str()); *deleted_bytes = 0; } else { total_trash_size_.fetch_sub(*deleted_bytes); } return s; } void DeleteScheduler::WaitForEmptyTrash() { InstrumentedMutexLock l(&mu_); while (pending_files_ > 0 && !closing_) { cv_.Wait(); } } } // namespace rocksdb #endif // ROCKSDB_LITE