diff --git a/HISTORY.md b/HISTORY.md index cf566b49d..d9b73500f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Remove CompactionEventListener. ### New Features +* SstFileManager now can cancel compactions if they will result in max space errors. SstFileManager users can also use SetCompactionBufferSize to specify how much space must be leftover during a compaction for auxiliary file functions such as logging and flushing. * Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables. * If `ColumnFamilyOptions::max_subcompactions` is set greater than one, we now parallelize large manual level-based compactions. * Add "rocksdb.live-sst-files-size" DB property to return total bytes of all SST files belong to the latest LSM tree. diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 264a4ac2c..31742d0bd 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -886,7 +886,7 @@ class TestEnv : public EnvWrapper { return Status::OK(); } - private: + private: int close_count; }; @@ -896,7 +896,7 @@ TEST_F(DBBasicTest, DBClose) { ASSERT_OK(DestroyDB(dbname, options)); DB* db = nullptr; - TestEnv *env = new TestEnv(); + TestEnv* env = new TestEnv(); options.create_if_missing = true; options.env = env; Status s = DB::Open(options, dbname, &db); diff --git a/db/db_impl.cc b/db/db_impl.cc index 81f331355..70a523b0d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -404,9 +404,7 @@ Status DBImpl::CloseHelper() { return ret; } -Status DBImpl::CloseImpl() { - return CloseHelper(); -} +Status DBImpl::CloseImpl() { return CloseHelper(); } DBImpl::~DBImpl() { if (!closed_) { diff --git a/db/db_impl.h b/db/db_impl.h index 3259189e5..dd31e1601 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -378,7 +378,9 @@ class DBImpl : public DB { Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); // Wait for any compaction - Status TEST_WaitForCompact(); + // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this + // is only for the special test of CancelledCompactions + Status TEST_WaitForCompact(bool waitUnscheduled = false); // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 47eed51fa..e9a5eee5e 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -1503,7 +1503,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, if (made_progress || (bg_compaction_scheduled_ == 0 && bg_bottom_compaction_scheduled_ == 0) || - HasPendingManualCompaction()) { + HasPendingManualCompaction() || unscheduled_compactions_ == 0) { // signal if // * made_progress -- need to wakeup DelayWrite // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl @@ -1566,6 +1566,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; +#ifndef ROCKSDB_LITE + bool sfm_bookkeeping = false; +#endif // ROCKSDB_LITE if (is_manual) { ManualCompactionState* m = manual_compaction; assert(m->in_progress); @@ -1628,27 +1631,65 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction"); c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); + + bool enough_room = true; if (c != nullptr) { - // update statistics - MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, - c->inputs(0)->size()); - // There are three things that can change compaction score: - // 1) When flush or compaction finish. This case is covered by - // InstallSuperVersionAndScheduleWork - // 2) When MutableCFOptions changes. This case is also covered by - // InstallSuperVersionAndScheduleWork, because this is when the new - // options take effect. - // 3) When we Pick a new compaction, we "remove" those files being - // compacted from the calculation, which then influences compaction - // score. Here we check if we need the new compaction even without the - // files that are currently being compacted. If we need another - // compaction, we might be able to execute it in parallel, so we add it - // to the queue and schedule a new thread. - if (cfd->NeedsCompaction()) { - // Yes, we need more compactions! +#ifndef ROCKSDB_LITE + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + if (sfm) { + enough_room = sfm->EnoughRoomForCompaction(c.get()); + if (enough_room) { + sfm_bookkeeping = true; + } + } +#endif // ROCKSDB_LITE + if (!enough_room) { + // Just in case tests want to change the value of enough_room + TEST_SYNC_POINT_CALLBACK( + "DBImpl::BackgroundCompaction():CancelledCompaction", + &enough_room); + } + if (!enough_room) { + // Then don't do the compaction + c->ReleaseCompactionFiles(status); + c->column_family_data() + ->current() + ->storage_info() + ->ComputeCompactionScore(*(c->immutable_cf_options()), + *(c->mutable_cf_options())); + + ROCKS_LOG_BUFFER(log_buffer, + "Cancelled compaction because not enough room"); AddToCompactionQueue(cfd); ++unscheduled_compactions_; - MaybeScheduleFlushOrCompaction(); + + c.reset(); + // Don't need to sleep here, because BackgroundCallCompaction + // will sleep if !s.ok() + status = Status::CompactionTooLarge(); + } else { + // update statistics + MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs(0)->size()); + // There are three things that can change compaction score: + // 1) When flush or compaction finish. This case is covered by + // InstallSuperVersionAndScheduleWork + // 2) When MutableCFOptions changes. This case is also covered by + // InstallSuperVersionAndScheduleWork, because this is when the new + // options take effect. + // 3) When we Pick a new compaction, we "remove" those files being + // compacted from the calculation, which then influences compaction + // score. Here we check if we need the new compaction even without the + // files that are currently being compacted. If we need another + // compaction, we might be able to execute it in parallel, so we add + // it to the queue and schedule a new thread. + if (cfd->NeedsCompaction()) { + // Yes, we need more compactions! + AddToCompactionQueue(cfd); + ++unscheduled_compactions_; + MaybeScheduleFlushOrCompaction(); + } } } } @@ -1808,6 +1849,16 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (c != nullptr) { c->ReleaseCompactionFiles(status); *made_progress = true; + +#ifndef ROCKSDB_LITE + // Need to make sure SstFileManager does its bookkeeping + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + if (sfm && sfm_bookkeeping) { + sfm->OnCompactionCompletion(c.get()); + } +#endif // ROCKSDB_LITE + NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); @@ -1815,7 +1866,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // this will unref its input_version and column_family_data c.reset(); - if (status.ok()) { + if (status.ok() || status.IsCompactionTooLarge()) { // Done } else if (status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 3ae271168..9d87f5c29 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -117,7 +117,7 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { return WaitForFlushMemTable(cfd); } -Status DBImpl::TEST_WaitForCompact() { +Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { // Wait until the compaction completes // TODO: a bug here. This function actually does not necessarily @@ -126,7 +126,8 @@ Status DBImpl::TEST_WaitForCompact() { InstrumentedMutexLock l(&mutex_); while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || - bg_flush_scheduled_) && + bg_flush_scheduled_ || + (wait_unscheduled && unscheduled_compactions_)) && bg_error_.ok()) { bg_cv_.Wait(); } diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index fbbbcfdf7..c48d789e6 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -540,6 +540,52 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) { ASSERT_NOK(Flush()); } +TEST_F(DBSSTTest, CancellingCompactionsWorks) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.level0_file_num_compaction_trigger = 2; + DestroyAndReopen(options); + + int cancelled_compaction = 0; + int completed_compactions = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) { + cancelled_compaction++; + sfm->SetMaxAllowedSpaceUsage(0); + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", + [&](void* arg) { completed_compactions++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + + // Generate a file containing 10 keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + uint64_t total_file_size = 0; + auto files_in_db = GetAllSSTFiles(&total_file_size); + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); + + // Generate another file to trigger compaction. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForCompact(true); + + ASSERT_GT(cancelled_compaction, 0); + ASSERT_GT(completed_compactions, 0); + ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { // This test will set a maximum allowed space for the DB, then it will // keep filling the DB until the limit is reached and bg_error_ is set. @@ -568,6 +614,12 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { estimate_multiplier++; // used in the main loop assert }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) { + bool* enough_room = static_cast(arg); + *enough_room = true; + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached", [&](void* arg) { diff --git a/db/malloc_stats.cc b/db/malloc_stats.cc index 00197d04f..02e895038 100644 --- a/db/malloc_stats.cc +++ b/db/malloc_stats.cc @@ -54,6 +54,5 @@ void DumpMallocStats(std::string* stats) { #else void DumpMallocStats(std::string*) {} #endif // ROCKSDB_JEMALLOC - } #endif // !ROCKSDB_LITE diff --git a/env/env.cc b/env/env.cc index f428697cb..9f165d6d1 100644 --- a/env/env.cc +++ b/env/env.cc @@ -73,7 +73,7 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } -Logger::~Logger() { } +Logger::~Logger() {} Status Logger::Close() { if (!closed_) { diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index c686d0824..1eaea3a1c 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -288,9 +288,7 @@ class HdfsLogger : public Logger { } protected: - virtual Status CloseImpl() override { - return HdfsCloseHelper(); - } + virtual Status CloseImpl() override { return HdfsCloseHelper(); } public: HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) diff --git a/env/env_test.cc b/env/env_test.cc index 054dcc451..bf19980a6 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1514,48 +1514,46 @@ class TestEnv : public EnvWrapper { explicit TestEnv() : EnvWrapper(Env::Default()), close_count(0) { } - class TestLogger : public Logger { - public: - using Logger::Logv; - TestLogger(TestEnv *env_ptr) : Logger() { env = env_ptr; } - ~TestLogger() { - if (!closed_) { - CloseHelper(); - } - } - virtual void Logv(const char* /*format*/, va_list /*ap*/) override{}; - - protected: - virtual Status CloseImpl() override { - return CloseHelper(); - } - private: - Status CloseHelper() { - env->CloseCountInc();; - return Status::OK(); - } - TestEnv *env; - }; - - void CloseCountInc() { close_count++; } + class TestLogger : public Logger { + public: + using Logger::Logv; + TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } + ~TestLogger() { + if (!closed_) { + CloseHelper(); + } + } + virtual void Logv(const char* format, va_list ap) override{}; - int GetCloseCount() { return close_count; } + protected: + virtual Status CloseImpl() override { return CloseHelper(); } - virtual Status NewLogger(const std::string& /*fname*/, - shared_ptr* result) { - result->reset(new TestLogger(this)); + private: + Status CloseHelper() { + env->CloseCountInc();; return Status::OK(); } + TestEnv* env; + }; - private: - int close_count; -}; + void CloseCountInc() { close_count++; } -class EnvTest : public testing::Test { + int GetCloseCount() { return close_count; } + + virtual Status NewLogger(const std::string& fname, + shared_ptr* result) { + result->reset(new TestLogger(this)); + return Status::OK(); + } + + private: + int close_count; }; +class EnvTest : public testing::Test {}; + TEST_F(EnvTest, Close) { - TestEnv *env = new TestEnv(); + TestEnv* env = new TestEnv(); std::shared_ptr logger; Status s; @@ -1577,7 +1575,6 @@ TEST_F(EnvTest, Close) { delete env; } - INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam, ::testing::Values(std::pair(Env::Default(), false))); diff --git a/env/posix_logger.h b/env/posix_logger.h index 121591e0d..e983ba704 100644 --- a/env/posix_logger.h +++ b/env/posix_logger.h @@ -52,9 +52,7 @@ class PosixLogger : public Logger { std::atomic flush_pending_; protected: - virtual Status CloseImpl() override { - return PosixCloseHelper(); - } + virtual Status CloseImpl() override { return PosixCloseHelper(); } public: PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, diff --git a/include/rocksdb/sst_file_manager.h b/include/rocksdb/sst_file_manager.h index cb626b1a6..fa18f836c 100644 --- a/include/rocksdb/sst_file_manager.h +++ b/include/rocksdb/sst_file_manager.h @@ -35,12 +35,22 @@ class SstFileManager { // thread-safe. virtual void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) = 0; + // Set the amount of buffer room each compaction should be able to leave. + // In other words, at its maximum disk space consumption, the compaction + // should still leave compaction_buffer_size available on the disk so that + // other background functions may continue, such as logging and flushing. + virtual void SetCompactionBufferSize(uint64_t compaction_buffer_size) = 0; + // Return true if the total size of SST files exceeded the maximum allowed // space usage. // // thread-safe. virtual bool IsMaxAllowedSpaceReached() = 0; + // Returns true if the total size of SST files as well as estimated size + // of ongoing compactions exceeds the maximums allowed space usage. + virtual bool IsMaxAllowedSpaceReachedIncludingCompactions() = 0; + // Return the total size of all tracked files. // thread-safe virtual uint64_t GetTotalSize() = 0; diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 709f38370..3573d37e3 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -58,7 +58,8 @@ class Status { kAborted = 10, kBusy = 11, kExpired = 12, - kTryAgain = 13 + kTryAgain = 13, + kCompactionTooLarge = 14 }; Code code() const { return code_; } @@ -162,6 +163,14 @@ class Status { return Status(kTryAgain, msg, msg2); } + static Status CompactionTooLarge(SubCode msg = kNone) { + return Status(kCompactionTooLarge, msg); + } + static Status CompactionTooLarge(const Slice& msg, + const Slice& msg2 = Slice()) { + return Status(kCompactionTooLarge, msg, msg2); + } + static Status NoSpace() { return Status(kIOError, kNoSpace); } static Status NoSpace(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, kNoSpace, msg, msg2); @@ -221,6 +230,9 @@ class Status { // re-attempted. bool IsTryAgain() const { return code() == kTryAgain; } + // Returns true iff the status indicates the proposed compaction is too large + bool IsCompactionTooLarge() const { return code() == kCompactionTooLarge; } + // Returns true iff the status indicates a NoSpace error // This is caused by an I/O error returning the specific "out of space" // error condition. Stricto sensu, an NoSpace error is an I/O error diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index 61b08f23a..435a8f357 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -22,6 +22,8 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr logger, : env_(env), logger_(logger), total_files_size_(0), + compaction_buffer_size_(0), + cur_compactions_reserved_size_(0), max_allowed_space_(0), delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this, max_trash_db_ratio) {} @@ -48,6 +50,18 @@ Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) { return Status::OK(); } +void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) { + MutexLock l(&mu_); + uint64_t size_added_by_compaction = 0; + for (size_t i = 0; i < c->num_input_levels(); i++) { + for (size_t j = 0; j < c->num_input_files(i); j++) { + FileMetaData* filemeta = c->input(i, j); + size_added_by_compaction += filemeta->fd.GetFileSize(); + } + } + cur_compactions_reserved_size_ -= size_added_by_compaction; +} + Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, const std::string& new_path, uint64_t* file_size) { @@ -68,6 +82,12 @@ void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) { max_allowed_space_ = max_allowed_space; } +void SstFileManagerImpl::SetCompactionBufferSize( + uint64_t compaction_buffer_size) { + MutexLock l(&mu_); + compaction_buffer_size_ = compaction_buffer_size; +} + bool SstFileManagerImpl::IsMaxAllowedSpaceReached() { MutexLock l(&mu_); if (max_allowed_space_ <= 0) { @@ -76,6 +96,43 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReached() { return total_files_size_ >= max_allowed_space_; } +bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() { + MutexLock l(&mu_); + if (max_allowed_space_ <= 0) { + return false; + } + return total_files_size_ + cur_compactions_reserved_size_ >= + max_allowed_space_; +} + +bool SstFileManagerImpl::EnoughRoomForCompaction(Compaction* c) { + MutexLock l(&mu_); + uint64_t size_added_by_compaction = 0; + // First check if we even have the space to do the compaction + for (size_t i = 0; i < c->num_input_levels(); i++) { + for (size_t j = 0; j < c->num_input_files(i); j++) { + FileMetaData* filemeta = c->input(i, j); + size_added_by_compaction += filemeta->fd.GetFileSize(); + } + } + + if (max_allowed_space_ != 0 && + (size_added_by_compaction + cur_compactions_reserved_size_ + + total_files_size_ + compaction_buffer_size_ > + max_allowed_space_)) { + return false; + } + // Update cur_compactions_reserved_size_ so concurrent compaction + // don't max out space + cur_compactions_reserved_size_ += size_added_by_compaction; + return true; +} + +uint64_t SstFileManagerImpl::GetCompactionsReservedSize() { + MutexLock l(&mu_); + return cur_compactions_reserved_size_; +} + uint64_t SstFileManagerImpl::GetTotalSize() { MutexLock l(&mu_); return total_files_size_; diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index 1cb1d4fb1..db2ba0823 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -11,6 +11,7 @@ #include "port/port.h" +#include "db/compaction.h" #include "rocksdb/sst_file_manager.h" #include "util/delete_scheduler.h" @@ -50,12 +51,29 @@ class SstFileManagerImpl : public SstFileManager { // thread-safe. void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) override; + void SetCompactionBufferSize(uint64_t compaction_buffer_size) override; + // Return true if the total size of SST files exceeded the maximum allowed // space usage. // // thread-safe. bool IsMaxAllowedSpaceReached() override; + bool IsMaxAllowedSpaceReachedIncludingCompactions() override; + + // Returns true is there is enough (approximate) space for the specified + // compaction. Space is approximate because this function conservatively + // estimates how much space is currently being used by compactions (i.e. + // if a compaction has started, this function bumps the used space by + // the full compaction size). + bool EnoughRoomForCompaction(Compaction* c); + + // Bookkeeping so total_file_sizes_ goes back to normal after compaction + // finishes + void OnCompactionCompletion(Compaction* c); + + uint64_t GetCompactionsReservedSize(); + // Return the total size of all tracked files. uint64_t GetTotalSize() override; @@ -95,6 +113,11 @@ class SstFileManagerImpl : public SstFileManager { port::Mutex mu_; // The summation of the sizes of all files in tracked_files_ map uint64_t total_files_size_; + // Compactions should only execute if they can leave at least + // this amount of buffer space for logs and flushes + uint64_t compaction_buffer_size_; + // Estimated size of the current ongoing compactions + uint64_t cur_compactions_reserved_size_; // A map containing all tracked files and there sizes // file_path => file_size std::unordered_map tracked_files_; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 714a8d465..3983a53f2 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -811,7 +811,7 @@ TEST_F(BackupableDBTest, NoDoubleCopy) { test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_); ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); std::vector should_have_written = { - "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", + "/shared/.00010.sst.tmp", "/shared/.00011.sst.tmp", "/private/1.tmp/CURRENT", "/private/1.tmp/MANIFEST-01", "/private/1.tmp/00011.log", "/meta/.1.tmp"}; AppendPath(backupdir_, should_have_written);