Clean up old log files in background threads

Summary:
Cleaning up log files can do heavy IO, since we call ftruncate() in the destructor. We don't want to call ftruncate() in user threads.

This diff moves the cleanup of old log files to background threads (flush and compaction).

Test Plan: make check, will also run valgrind

Reviewers: yhchiang, rven, MarkCallaghan, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D36177
main
Igor Canadi 10 years ago
parent 99ec2412e5
commit fd3dbef22b
  1. 4
      db/column_family_test.cc
  2. 18
      db/db_impl.cc
  3. 7
      db/db_impl.h
  4. 5
      db/db_impl_debug.cc
  5. 15
      db/db_test.cc
  6. 10
      db/job_context.h

@ -548,9 +548,9 @@ TEST_F(ColumnFamilyTest, FlushTest) {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
uint64_t max_total_in_memory_state = uint64_t max_total_in_memory_state =
dbfull()->TEST_max_total_in_memory_state(); dbfull()->TEST_MaxTotalInMemoryState();
Flush(i); Flush(i);
ASSERT_EQ(dbfull()->TEST_max_total_in_memory_state(), ASSERT_EQ(dbfull()->TEST_MaxTotalInMemoryState(),
max_total_in_memory_state); max_total_in_memory_state);
} }
ASSERT_OK(Put(1, "foofoo", "bar")); ASSERT_OK(Put(1, "foofoo", "bar"));

@ -91,16 +91,12 @@ void DumpRocksDBBuildVersion(Logger * log);
struct DBImpl::WriteContext { struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_; autovector<SuperVersion*> superversions_to_free_;
autovector<log::Writer*> logs_to_free_;
bool schedule_bg_work_ = false; bool schedule_bg_work_ = false;
~WriteContext() { ~WriteContext() {
for (auto& sv : superversions_to_free_) { for (auto& sv : superversions_to_free_) {
delete sv; delete sv;
} }
for (auto& log : logs_to_free_) {
delete log;
}
} }
}; };
@ -355,6 +351,10 @@ DBImpl::~DBImpl() {
job_context.Clean(); job_context.Clean();
} }
for (auto l : logs_to_free_) {
delete l;
}
// versions need to be destroyed before table_cache since it can hold // versions need to be destroyed before table_cache since it can hold
// references to table_cache. // references to table_cache.
versions_.reset(); versions_.reset();
@ -1994,6 +1994,10 @@ void DBImpl::BackgroundCallFlush() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If flush failed, we want to delete all temporary files that we might have // If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles() // created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@ -2060,6 +2064,10 @@ void DBImpl::BackgroundCallCompaction() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If compaction failed, we want to delete all temporary files that we might // If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a // have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles() // failure). Thus, we force full scan in FindObsoleteFiles()
@ -3394,7 +3402,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
if (creating_new_log) { if (creating_new_log) {
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
assert(new_log != nullptr); assert(new_log != nullptr);
context->logs_to_free_.push_back(log_.release()); logs_to_free_.push_back(log_.release());
log_.reset(new_log); log_.reset(new_log);
log_empty_ = true; log_empty_ = true;
alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); alive_log_files_.push_back(LogFileNumberSize(logfile_number_));

@ -243,10 +243,12 @@ class DBImpl : public DB {
// pass the pointer that you got from TEST_BeginWrite() // pass the pointer that you got from TEST_BeginWrite()
void TEST_EndWrite(void* w); void TEST_EndWrite(void* w);
uint64_t TEST_max_total_in_memory_state() { uint64_t TEST_MaxTotalInMemoryState() const {
return max_total_in_memory_state_; return max_total_in_memory_state_;
} }
size_t TEST_LogsToFreeSize();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list // Returns the list of live files in 'live' and the list
@ -461,6 +463,9 @@ class DBImpl : public DB {
// If true, we have only one (default) column family. We use this to optimize // If true, we have only one (default) column family. We use this to optimize
// some code-paths // some code-paths
bool single_column_family_mode_; bool single_column_family_mode_;
// If this is non-empty, we need to delete these log files in background
// threads. Protected by db mutex.
autovector<log::Writer*> logs_to_free_;
bool is_snapshot_supported_; bool is_snapshot_supported_;

@ -141,5 +141,10 @@ void DBImpl::TEST_EndWrite(void* w) {
delete writer; delete writer;
} }
// Test-only accessor: reports how many entries are currently queued in
// logs_to_free_. The size is read while mutex_ is held.
size_t DBImpl::TEST_LogsToFreeSize() {
  InstrumentedMutexLock guard(&mutex_);
  const size_t pending_logs = logs_to_free_.size();
  return pending_logs;
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -11952,6 +11952,21 @@ TEST_F(DBTest, FilterCompactionTimeTest) {
delete itr; delete itr;
} }
// Verifies that the queue of old log writers waiting to be freed stays bounded
// while many small writes are issued against a tiny write buffer.
TEST_F(DBTest, TestLogCleanup) {
  Options options = CurrentOptions();
  options.write_buffer_size = 64 * 1024;  // very small
  // only two memtables allowed ==> only two log files
  options.max_write_buffer_number = 2;
  Reopen(options);

  for (int i = 0; i < 100000; ++i) {
    // Fail fast if a write is rejected instead of silently dropping the
    // returned Status.
    ASSERT_OK(Put(Key(i), "val"));
    // only 2 memtables will be alive, so logs_to_free must never hold more
    // than 2 entries (i.e. its size always stays below 3)
    ASSERT_LT(dbfull()->TEST_LogsToFreeSize(), static_cast<size_t>(3));
  }
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -13,6 +13,7 @@
#include <vector> #include <vector>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/log_writer.h"
namespace rocksdb { namespace rocksdb {
@ -22,7 +23,8 @@ struct JobContext {
inline bool HaveSomethingToDelete() const { inline bool HaveSomethingToDelete() const {
return full_scan_candidate_files.size() || sst_delete_files.size() || return full_scan_candidate_files.size() || sst_delete_files.size() ||
log_delete_files.size() || new_superversion != nullptr || log_delete_files.size() || new_superversion != nullptr ||
superversions_to_free.size() > 0 || memtables_to_free.size() > 0; superversions_to_free.size() > 0 || memtables_to_free.size() > 0 ||
logs_to_free.size() > 0;
} }
// Structure to store information for candidate files to delete. // Structure to store information for candidate files to delete.
@ -59,6 +61,8 @@ struct JobContext {
autovector<SuperVersion*> superversions_to_free; autovector<SuperVersion*> superversions_to_free;
autovector<log::Writer*> logs_to_free;
SuperVersion* new_superversion; // if nullptr no new superversion SuperVersion* new_superversion; // if nullptr no new superversion
// the current manifest_file_number, log_number and prev_log_number // the current manifest_file_number, log_number and prev_log_number
@ -88,12 +92,16 @@ struct JobContext {
for (auto s : superversions_to_free) { for (auto s : superversions_to_free) {
delete s; delete s;
} }
for (auto l : logs_to_free) {
delete l;
}
// if new_superversion was not used, it will be non-nullptr and needs // if new_superversion was not used, it will be non-nullptr and needs
// to be freed here // to be freed here
delete new_superversion; delete new_superversion;
memtables_to_free.clear(); memtables_to_free.clear();
superversions_to_free.clear(); superversions_to_free.clear();
logs_to_free.clear();
new_superversion = nullptr; new_superversion = nullptr;
} }

Loading…
Cancel
Save