diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 3247036b5..bcf803e14 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -212,6 +212,7 @@ CompactionJob::CompactionJob(
     const EnvOptions& env_options, VersionSet* versions,
     std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
     Directory* db_directory, Directory* output_directory, Statistics* stats,
+    InstrumentedMutex* db_mutex, Status* db_bg_error,
     std::vector<SequenceNumber> existing_snapshots,
     SequenceNumber earliest_write_conflict_snapshot,
     std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
@@ -231,6 +232,8 @@ CompactionJob::CompactionJob(
       db_directory_(db_directory),
       output_directory_(output_directory),
       stats_(stats),
+      db_mutex_(db_mutex),
+      db_bg_error_(db_bg_error),
       existing_snapshots_(std::move(existing_snapshots)),
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
       table_cache_(std::move(table_cache)),
@@ -499,16 +502,11 @@ Status CompactionJob::Run() {
   }
 
   TablePropertiesCollection tp;
-  auto sfm =
-      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
   for (const auto& state : compact_->sub_compact_states) {
     for (const auto& output : state.outputs) {
       auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
                               output.meta.fd.GetPathId());
       tp[fn] = output.table_properties;
-      if (sfm && output.meta.fd.GetPathId() == 0) {
-        sfm->OnAddFile(fn);
-      }
     }
   }
   compact_->compaction->SetOutputTableProperties(std::move(tp));
@@ -524,18 +522,17 @@ Status CompactionJob::Run() {
   return status;
 }
 
-Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options,
-                              InstrumentedMutex* db_mutex) {
+Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
   AutoThreadOperationStageUpdater stage_updater(
       ThreadStatus::STAGE_COMPACTION_INSTALL);
-  db_mutex->AssertHeld();
+  db_mutex_->AssertHeld();
   Status status = compact_->status;
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
   cfd->internal_stats()->AddCompactionStats(
       compact_->compaction->output_level(), compaction_stats_);
 
   if (status.ok()) {
-    status = InstallCompactionResults(mutable_cf_options, db_mutex);
+    status = InstallCompactionResults(mutable_cf_options);
   }
   VersionStorageInfo::LevelSummaryStorage tmp;
   auto vstorage = cfd->current()->storage_info();
@@ -861,13 +858,33 @@ Status CompactionJob::FinishCompactionOutputFile(
           event_logger_, cfd->ioptions()->listeners, meta->fd, info);
     }
   }
+
+  // Report new file to SstFileManagerImpl
+  auto sfm =
+      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
+  if (sfm && meta->fd.GetPathId() == 0) {
+    ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+    auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
+                            meta->fd.GetPathId());
+    sfm->OnAddFile(fn);
+    if (sfm->IsMaxAllowedSpaceReached()) {
+      InstrumentedMutexLock l(db_mutex_);
+      if (db_bg_error_->ok()) {
+        s = Status::IOError("Max allowed space was reached");
+        *db_bg_error_ = s;
+        TEST_SYNC_POINT(
+            "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
+      }
+    }
+  }
+
   sub_compact->builder.reset();
   return s;
 }
 
 Status CompactionJob::InstallCompactionResults(
-    const MutableCFOptions& mutable_cf_options, InstrumentedMutex* db_mutex) {
-  db_mutex->AssertHeld();
+    const MutableCFOptions& mutable_cf_options) {
+  db_mutex_->AssertHeld();
   auto* compaction = compact_->compaction;
 
   // paranoia: verify that the files that we started with
@@ -902,7 +919,7 @@ Status CompactionJob::InstallCompactionResults(
   }
   return versions_->LogAndApply(compaction->column_family_data(),
                                 mutable_cf_options, compaction->edit(),
-                                db_mutex, db_directory_);
+                                db_mutex_, db_directory_);
 }
 
 void CompactionJob::RecordCompactionIOStats() {
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 125dc8fe4..c6edefbe0 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -56,7 +56,8 @@ class CompactionJob {
                 const EnvOptions& env_options, VersionSet* versions,
                 std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
                 Directory* db_directory, Directory* output_directory,
-                Statistics* stats,
+                Statistics* stats, InstrumentedMutex* db_mutex,
+                Status* db_bg_error,
                 std::vector<SequenceNumber> existing_snapshots,
                 SequenceNumber earliest_write_conflict_snapshot,
                 std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
@@ -77,8 +78,7 @@ class CompactionJob {
   Status Run();
 
   // REQUIRED: mutex held
-  Status Install(const MutableCFOptions& mutable_cf_options,
-                 InstrumentedMutex* db_mutex);
+  Status Install(const MutableCFOptions& mutable_cf_options);
 
  private:
   struct SubcompactionState;
@@ -95,8 +95,7 @@ class CompactionJob {
 
   Status FinishCompactionOutputFile(const Status& input_status,
                                     SubcompactionState* sub_compact);
-  Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options,
-                                  InstrumentedMutex* db_mutex);
+  Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
   void RecordCompactionIOStats();
   Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
   void CleanupCompaction();
@@ -130,6 +129,8 @@ class CompactionJob {
   Directory* db_directory_;
   Directory* output_directory_;
   Statistics* stats_;
+  InstrumentedMutex* db_mutex_;
+  Status* db_bg_error_;
   // If there were two snapshots with seq numbers s1 and
   // s2 and s1 < s2, and if we find two instances of a key k1 then lies
   // entirely within s1 and s2, then the earlier version of k1 can be safely
diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc
index fc3a6b9f8..f3bc4cca9 100644
--- a/db/compaction_job_test.cc
+++ b/db/compaction_job_test.cc
@@ -250,9 +250,9 @@ class CompactionJobTest : public testing::Test {
     EventLogger event_logger(db_options_.info_log.get());
     CompactionJob compaction_job(
         0, &compaction, db_options_, env_options_, versions_.get(),
-        &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, snapshots,
-        earliest_write_conflict_snapshot, table_cache_, &event_logger, false,
-        false, dbname_, &compaction_job_stats_);
+        &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, &mutex_,
+        &bg_error_, snapshots, earliest_write_conflict_snapshot, table_cache_,
+        &event_logger, false, false, dbname_, &compaction_job_stats_);
 
     VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
 
@@ -262,8 +262,7 @@ class CompactionJobTest : public testing::Test {
     s = compaction_job.Run();
     ASSERT_OK(s);
     mutex_.Lock();
-    ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions(),
-                                     &mutex_));
+    ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
     mutex_.Unlock();
 
     if (expected_results.size() == 0) {
@@ -295,6 +294,7 @@ class CompactionJobTest : public testing::Test {
   ColumnFamilyData* cfd_;
   std::unique_ptr<CompactionFilter> compaction_filter_;
   std::shared_ptr<MergeOperator> merge_op_;
+  Status bg_error_;
 };
 
 TEST_F(CompactionJobTest, Simple) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index c5349ef0c..afe1a9c9d 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1510,11 +1510,12 @@ Status DBImpl::FlushMemTableToOutputFile(
     bg_error_ = s;
   }
   RecordFlushIOStats();
-#ifndef ROCKSDB_LITE
   if (s.ok()) {
+#ifndef ROCKSDB_LITE
     // may temporarily unlock and lock the mutex.
     NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
                            job_context->job_id, flush_job.GetTableProperties());
+#endif  // ROCKSDB_LITE
     auto sfm =
         static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
     if (sfm) {
@@ -1522,9 +1523,13 @@ Status DBImpl::FlushMemTableToOutputFile(
       std::string file_path = MakeTableFileName(db_options_.db_paths[0].path,
                                                 file_meta.fd.GetNumber());
       sfm->OnAddFile(file_path);
+      if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
+        bg_error_ = Status::IOError("Max allowed space was reached");
+        TEST_SYNC_POINT(
+            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached");
+      }
     }
   }
-#endif  // ROCKSDB_LITE
   return s;
 }
 
@@ -1818,9 +1823,9 @@ Status DBImpl::CompactFilesImpl(
   CompactionJob compaction_job(
       job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
       &shutting_down_, log_buffer, directories_.GetDbDir(),
-      directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
-      earliest_write_conflict_snapshot, table_cache_, &event_logger_,
-      c->mutable_cf_options()->paranoid_file_checks,
+      directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
+      snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
+      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->compaction_measure_io_stats, dbname_, nullptr);
   // Here we pass a nullptr for CompactionJobStats because
   // CompactFiles does not trigger OnCompactionCompleted(),
@@ -1843,7 +1848,7 @@ Status DBImpl::CompactFilesImpl(
   compaction_job.Run();
 
   mutex_.Lock();
-  Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
+  Status status = compaction_job.Install(*c->mutable_cf_options());
   if (status.ok()) {
     InstallSuperVersionAndScheduleWorkWrapper(
         c->column_family_data(), job_context, *c->mutable_cf_options());
@@ -2994,8 +2999,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     CompactionJob compaction_job(
         job_context->job_id, c.get(), db_options_, env_options_,
         versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
-        directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
-        earliest_write_conflict_snapshot, table_cache_, &event_logger_,
+        directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
+        &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
+        table_cache_, &event_logger_,
         c->mutable_cf_options()->paranoid_file_checks,
         c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
         &compaction_job_stats);
@@ -3006,7 +3012,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
 
     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
     mutex_.Lock();
-    status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
+    status = compaction_job.Install(*c->mutable_cf_options());
     if (status.ok()) {
       InstallSuperVersionAndScheduleWorkWrapper(
           c->column_family_data(), job_context, *c->mutable_cf_options());
diff --git a/db/db_test.cc b/db/db_test.cc
index a1dd6fb18..c9c2a6392 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -8636,7 +8636,6 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) {
   EXPECT_GT(lognum2, lognum1);
 }
 
-#ifndef ROCKSDB_LITE
 TEST_F(DBTest, DBWithSstFileManager) {
   std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
   auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
@@ -8701,6 +8700,7 @@ TEST_F(DBTest, DBWithSstFileManager) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+#ifndef ROCKSDB_LITE
 TEST_F(DBTest, RateLimitedDelete) {
   rocksdb::SyncPoint::GetInstance()->LoadDependency({
"DeleteScheduler::BackgroundEmptyTrash"}, @@ -8873,6 +8873,102 @@ TEST_F(DBTest, DestroyDBWithRateLimitedDelete) { } #endif // ROCKSDB_LITE +TEST_F(DBTest, DBWithMaxSpaceAllowed) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + Random rnd(301); + + // Generate a file containing 100 keys. + for (int i = 0; i < 100; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 50))); + } + ASSERT_OK(Flush()); + + uint64_t first_file_size = 0; + auto files_in_db = GetAllSSTFiles(&first_file_size); + ASSERT_EQ(sfm->GetTotalSize(), first_file_size); + + // Set the maximum allowed space usage to the current total size + sfm->SetMaxAllowedSpaceUsage(first_file_size + 1); + + ASSERT_OK(Put("key1", "val1")); + // This flush will cause bg_error_ and will fail + ASSERT_NOK(Flush()); +} + +TEST_F(DBTest, DBWithMaxSpaceAllowedRandomized) { + // This test will set a maximum allowed space for the DB, then it will + // keep filling the DB until the limit is reached and bg_error_ is set. + // When bg_error_ is set we will verify that the DB size is greater + // than the limit. + + std::vector max_space_limits_mbs = {1, 2, 4, 8, 10}; + + bool bg_error_set = false; + uint64_t total_sst_files_size = 0; + + int reached_max_space_on_flush = 0; + int reached_max_space_on_compaction = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", + [&](void* arg) { + bg_error_set = true; + GetAllSSTFiles(&total_sst_files_size); + reached_max_space_on_flush++; + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached", + [&](void* arg) { + bg_error_set = true; + GetAllSSTFiles(&total_sst_files_size); + reached_max_space_on_compaction++; + }); + + for (auto limit_mb : max_space_limits_mbs) { + bg_error_set = false; + total_sst_files_size = 0; + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.write_buffer_size = 1024 * 512; // 512 Kb + DestroyAndReopen(options); + Random rnd(301); + + sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024); + + int keys_written = 0; + uint64_t estimated_db_size = 0; + while (true) { + auto s = Put(RandomString(&rnd, 10), RandomString(&rnd, 50)); + if (!s.ok()) { + break; + } + keys_written++; + // Check the estimated db size vs the db limit just to make sure we + // dont run into an infinite loop + estimated_db_size = keys_written * 60; // ~60 bytes per key + ASSERT_LT(estimated_db_size, limit_mb * 1024 * 1024 * 2); + } + ASSERT_TRUE(bg_error_set); + ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } + + ASSERT_GT(reached_max_space_on_flush, 0); + ASSERT_GT(reached_max_space_on_compaction, 0); +} + TEST_F(DBTest, UnsupportedManualSync) { DestroyAndReopen(CurrentOptions()); env_->is_wal_sync_thread_safe_.store(false); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index d601ec7eb..950941817 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1015,9 +1015,13 @@ void DBTestBase::CopyFile(const std::string& source, 
   ASSERT_OK(destfile->Close());
 }
 
-std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles() {
+std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles(
+    uint64_t* total_size) {
   std::unordered_map<std::string, uint64_t> res;
+  if (total_size) {
+    *total_size = 0;
+  }
   std::vector<std::string> files;
   env_->GetChildren(dbname_, &files);
   for (auto& file_name : files) {
@@ -1028,6 +1032,9 @@ std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles() {
       uint64_t file_size = 0;
       env_->GetFileSize(file_path, &file_size);
       res[file_path] = file_size;
+      if (total_size) {
+        *total_size += file_size;
+      }
     }
   }
   return res;
diff --git a/db/db_test_util.h b/db/db_test_util.h
index 9eedf8d35..ca2b466e9 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -754,7 +754,8 @@ class DBTestBase : public testing::Test {
   void CopyFile(const std::string& source, const std::string& destination,
                 uint64_t size = 0);
 
-  std::unordered_map<std::string, uint64_t> GetAllSSTFiles();
+  std::unordered_map<std::string, uint64_t> GetAllSSTFiles(
+      uint64_t* total_size = nullptr);
 };
 
 }  // namespace rocksdb
diff --git a/include/rocksdb/sst_file_manager.h b/include/rocksdb/sst_file_manager.h
index 56d28c69f..bee243e4a 100644
--- a/include/rocksdb/sst_file_manager.h
+++ b/include/rocksdb/sst_file_manager.h
@@ -23,6 +23,22 @@ class SstFileManager {
  public:
   virtual ~SstFileManager() {}
 
+  // Update the maximum allowed space that should be used by RocksDB. If
+  // the total size of the SST files exceeds max_allowed_space, writes to
+  // RocksDB will fail.
+  //
+  // Setting max_allowed_space to 0 will disable this feature; the maximum
+  // allowed space will be infinite (default value).
+  //
+  // thread-safe.
+  virtual void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) = 0;
+
+  // Return true if the total size of SST files exceeded the maximum allowed
+  // space usage.
+  //
+  // thread-safe.
+  virtual bool IsMaxAllowedSpaceReached() = 0;
+
   // Return the total size of all tracked files.
   // thread-safe
   virtual uint64_t GetTotalSize() = 0;
diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc
index b518bb7e5..8a29f1fec 100644
--- a/util/sst_file_manager_impl.cc
+++ b/util/sst_file_manager_impl.cc
@@ -56,6 +56,19 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
   return Status::OK();
 }
 
+void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
+  MutexLock l(&mu_);
+  max_allowed_space_ = max_allowed_space;
+}
+
+bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
+  MutexLock l(&mu_);
+  if (max_allowed_space_ <= 0) {
+    return false;
+  }
+  return total_files_size_ >= max_allowed_space_;
+}
+
 uint64_t SstFileManagerImpl::GetTotalSize() {
   MutexLock l(&mu_);
   return total_files_size_;
diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h
index 5f44d631c..ca9ddedba 100644
--- a/util/sst_file_manager_impl.h
+++ b/util/sst_file_manager_impl.h
@@ -37,6 +37,22 @@ class SstFileManagerImpl : public SstFileManager {
   // DB will call OnMoveFile whenever an sst file is move to a new path.
   Status OnMoveFile(const std::string& old_path, const std::string& new_path);
 
+  // Update the maximum allowed space that should be used by RocksDB. If
+  // the total size of the SST files exceeds max_allowed_space, writes to
+  // RocksDB will fail.
+  //
+  // Setting max_allowed_space to 0 will disable this feature; the maximum
+  // allowed space will be infinite (default value).
+  //
+  // thread-safe.
+  void SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) override;
+
+  // Return true if the total size of SST files exceeded the maximum allowed
+  // space usage.
+  //
+  // thread-safe.
+  bool IsMaxAllowedSpaceReached() override;
+
   // Return the total size of all tracked files.
   uint64_t GetTotalSize() override;
 
@@ -68,6 +84,8 @@ class SstFileManagerImpl : public SstFileManager {
   // A map containing all tracked files and there sizes
   //    file_path => file_size
   std::unordered_map<std::string, uint64_t> tracked_files_;
+  // The maximum allowed space (in bytes) for sst files.
+  uint64_t max_allowed_space_;
   // DeleteScheduler used to throttle file deletition, if SstFileManagerImpl was
   // created with rate_bytes_per_sec == 0 or trash_dir == "", delete_scheduler_
   // rate limiting will be disabled and will simply delete the files.
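
For reference, below is a minimal usage sketch of the API introduced above; it is illustrative commentary, not part of the patch. The DB path and the 1 GB cap are made-up values, while Options::sst_file_manager, NewSstFileManager(), SetMaxAllowedSpaceUsage(), IsMaxAllowedSpaceReached(), GetTotalSize(), and the "Max allowed space was reached" IOError come from the change itself. Once a flush or compaction pushes the tracked SST total past the cap, the DB's background error is set and subsequent writes fail.

#include <iostream>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_manager.h"

int main() {
  // Track SST files and cap their total size at 1 GB (illustrative value).
  std::shared_ptr<rocksdb::SstFileManager> sst_file_manager(
      rocksdb::NewSstFileManager(rocksdb::Env::Default()));
  sst_file_manager->SetMaxAllowedSpaceUsage(1ull << 30);

  rocksdb::Options options;
  options.create_if_missing = true;
  options.sst_file_manager = sst_file_manager;  // shared with the DB

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/space_capped_db", &db);  // made-up path
  if (!s.ok()) {
    std::cerr << "Open failed: " << s.ToString() << std::endl;
    return 1;
  }

  // After the cap is exceeded by a flush or compaction, the background error
  // is set and later writes return the IOError configured by this patch.
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  if (!s.ok()) {
    std::cerr << "Write failed: " << s.ToString()
              << " (total SST size: " << sst_file_manager->GetTotalSize()
              << " bytes, cap reached: "
              << sst_file_manager->IsMaxAllowedSpaceReached() << ")"
              << std::endl;
  }

  delete db;
  return 0;
}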