diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 3465ab2d4..1bbd2c2d8 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -548,9 +548,9 @@ TEST_F(ColumnFamilyTest, FlushTest) { for (int i = 0; i < 3; ++i) { uint64_t max_total_in_memory_state = - dbfull()->TEST_max_total_in_memory_state(); + dbfull()->TEST_MaxTotalInMemoryState(); Flush(i); - ASSERT_EQ(dbfull()->TEST_max_total_in_memory_state(), + ASSERT_EQ(dbfull()->TEST_MaxTotalInMemoryState(), max_total_in_memory_state); } ASSERT_OK(Put(1, "foofoo", "bar")); diff --git a/db/db_impl.cc b/db/db_impl.cc index cfd66736a..74ab6657b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -91,16 +91,12 @@ void DumpRocksDBBuildVersion(Logger * log); struct DBImpl::WriteContext { autovector superversions_to_free_; - autovector logs_to_free_; bool schedule_bg_work_ = false; ~WriteContext() { for (auto& sv : superversions_to_free_) { delete sv; } - for (auto& log : logs_to_free_) { - delete log; - } } }; @@ -355,6 +351,10 @@ DBImpl::~DBImpl() { job_context.Clean(); } + for (auto l : logs_to_free_) { + delete l; + } + // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); @@ -1994,6 +1994,10 @@ void DBImpl::BackgroundCallFlush() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + // We're just cleaning up for DB::Write() + job_context.logs_to_free = logs_to_free_; + logs_to_free_.clear(); + // If flush failed, we want to delete all temporary files that we might have // created. 
Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); @@ -2060,6 +2064,10 @@ void DBImpl::BackgroundCallCompaction() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); + // We're just cleaning up for DB::Write() + job_context.logs_to_free = logs_to_free_; + logs_to_free_.clear(); + // If compaction failed, we want to delete all temporary files that we might // have created (they might not be all recorded in job_context in case of a // failure). Thus, we force full scan in FindObsoleteFiles() @@ -3394,7 +3402,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, if (creating_new_log) { logfile_number_ = new_log_number; assert(new_log != nullptr); - context->logs_to_free_.push_back(log_.release()); + logs_to_free_.push_back(log_.release()); log_.reset(new_log); log_empty_ = true; alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); diff --git a/db/db_impl.h b/db/db_impl.h index 3099d1da9..617f631f8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -243,10 +243,12 @@ class DBImpl : public DB { // pass the pointer that you got from TEST_BeginWrite() void TEST_EndWrite(void* w); - uint64_t TEST_max_total_in_memory_state() { + uint64_t TEST_MaxTotalInMemoryState() const { return max_total_in_memory_state_; } + size_t TEST_LogsToFreeSize(); + #endif // ROCKSDB_LITE // Returns the list of live files in 'live' and the list @@ -461,6 +463,9 @@ class DBImpl : public DB { // If true, we have only one (default) column family. We use this to optimize // some code-paths bool single_column_family_mode_; + // If this is non-empty, we need to delete these log files in background + // threads. Protected by db mutex. 
+ autovector logs_to_free_; bool is_snapshot_supported_; diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index efa209a2b..5c7a353f5 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -141,5 +141,10 @@ void DBImpl::TEST_EndWrite(void* w) { delete writer; } +size_t DBImpl::TEST_LogsToFreeSize() { + InstrumentedMutexLock l(&mutex_); + return logs_to_free_.size(); +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_test.cc b/db/db_test.cc index 57558dbf6..3e3b74c91 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11952,6 +11952,21 @@ TEST_F(DBTest, FilterCompactionTimeTest) { delete itr; } +TEST_F(DBTest, TestLogCleanup) { + Options options = CurrentOptions(); + options.write_buffer_size = 64 * 1024; // very small + // only two memtables allowed ==> only two log files + options.max_write_buffer_number = 2; + Reopen(options); + + for (int i = 0; i < 100000; ++i) { + Put(Key(i), "val"); + // only 2 memtables will be alive, so logs_to_free must never hold more + // than 2 entries + ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3)); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/job_context.h b/db/job_context.h index befa974eb..d0281443e 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -13,6 +13,7 @@ #include #include "db/column_family.h" +#include "db/log_writer.h" namespace rocksdb { @@ -22,7 +23,8 @@ struct JobContext { inline bool HaveSomethingToDelete() const { return full_scan_candidate_files.size() || sst_delete_files.size() || log_delete_files.size() || new_superversion != nullptr || - superversions_to_free.size() > 0 || memtables_to_free.size() > 0; + superversions_to_free.size() > 0 || memtables_to_free.size() > 0 || + logs_to_free.size() > 0; } // Structure to store information for candidate files to delete. 
@@ -59,6 +61,8 @@ struct JobContext { autovector superversions_to_free; + autovector logs_to_free; + SuperVersion* new_superversion; // if nullptr no new superversion // the current manifest_file_number, log_number and prev_log_number @@ -88,12 +92,16 @@ struct JobContext { for (auto s : superversions_to_free) { delete s; } + for (auto l : logs_to_free) { + delete l; + } // if new_superversion was not used, it will be non-nullptr and needs // to be freed here delete new_superversion; memtables_to_free.clear(); superversions_to_free.clear(); + logs_to_free.clear(); new_superversion = nullptr; }