[wal changes 1/3] fixed unbounded wal growth in some workloads

Summary:
This fixes the following scenario we've hit:
 - we reached max_total_wal_size, created a new wal and scheduled flushing all memtables corresponding to the old one,
 - before the last of these flushes started its column family was dropped; the last background flush call was a no-op; no one removed the old wal from alive_logs_,
 - hours have passed and no flushes happened even though lots of data was written; data is written to different column families, compactions are disabled; old column families are dropped before memtable grows big enough to trigger a flush; the old wal still sits in alive_logs_ preventing max_total_wal_size limit from kicking in,
 - a few more hours pass and we run out disk space because of one huge .log file.

Test Plan: `make check`; backported the new test, checked that it fails without this diff

Reviewers: igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D40893
main
Mike Kolupaev 10 years ago
parent e70115e71b
commit 218487d8dc
  1. 39
      db/db_impl.cc
  2. 2
      db/db_impl.h
  3. 5
      db/db_impl_debug.cc
  4. 36
      db/db_test.cc
  5. 4
      db/version_set.h

@ -527,6 +527,24 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_->GetObsoleteFiles(&job_context->sst_delete_files, versions_->GetObsoleteFiles(&job_context->sst_delete_files,
job_context->min_pending_output); job_context->min_pending_output);
uint64_t min_log_number = versions_->MinLogNumber();
if (!alive_log_files_.empty()) {
// find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin();
job_context->log_delete_files.push_back(earliest.number);
total_log_size_ -= earliest.size;
alive_log_files_.pop_front();
// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert(alive_log_files_.size());
}
}
// We're just cleaning up for DB::Write().
job_context->logs_to_free = logs_to_free_;
logs_to_free_.clear();
// store the current filenum, lognum, etc // store the current filenum, lognum, etc
job_context->manifest_file_number = versions_->manifest_file_number(); job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number = job_context->pending_manifest_file_number =
@ -1309,17 +1327,6 @@ Status DBImpl::FlushMemTableToOutputFile(
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp)); cfd->current()->storage_info()->LevelSummary(&tmp));
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
while (alive_log_files_.size() &&
alive_log_files_.begin()->number < versions_->MinLogNumber()) {
const auto& earliest = *alive_log_files_.begin();
job_context->log_delete_files.push_back(earliest.number);
total_log_size_ -= earliest.size;
alive_log_files_.pop_front();
}
}
} }
if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks && if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
@ -2145,7 +2152,9 @@ void DBImpl::RecordFlushIOStats() {
void DBImpl::BGWorkFlush(void* db) { void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush(); reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
} }
void DBImpl::BGWorkCompaction(void* db) { void DBImpl::BGWorkCompaction(void* db) {
@ -2238,10 +2247,6 @@ void DBImpl::BackgroundCallFlush() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If flush failed, we want to delete all temporary files that we might have // If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles() // created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
@ -2308,10 +2313,6 @@ void DBImpl::BackgroundCallCompaction() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// We're just cleaning up for DB::Write()
job_context.logs_to_free = logs_to_free_;
logs_to_free_.clear();
// If compaction failed, we want to delete all temporary files that we might // If compaction failed, we want to delete all temporary files that we might
// have created (they might not be all recorded in job_context in case of a // have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles() // failure). Thus, we force full scan in FindObsoleteFiles()

@ -290,6 +290,8 @@ class DBImpl : public DB {
size_t TEST_LogsToFreeSize(); size_t TEST_LogsToFreeSize();
uint64_t TEST_LogfileNumber();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list // Returns the list of live files in 'live' and the list

@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() {
return logs_to_free_.size(); return logs_to_free_.size();
} }
uint64_t DBImpl::TEST_LogfileNumber() {
InstrumentedMutexLock l(&mutex_);
return logfile_number_;
}
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -8540,7 +8540,6 @@ TEST_F(DBTest, TransactionLogIterator) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
#ifndef NDEBUG // sync point is not included with DNDEBUG build
TEST_F(DBTest, TransactionLogIteratorRace) { TEST_F(DBTest, TransactionLogIteratorRace) {
static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
@ -8595,7 +8594,6 @@ TEST_F(DBTest, TransactionLogIteratorRace) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
} }
#endif
TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) { TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) {
do { do {
@ -14136,6 +14134,40 @@ TEST_F(DBTest, PrevAfterMerge) {
ASSERT_EQ("1", it->key().ToString()); ASSERT_EQ("1", it->key().ToString());
} }
TEST_F(DBTest, DeletingOldWalAfterDrop) {
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{ { "Test:AllowFlushes", "DBImpl::BGWorkFlush" },
{ "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} });
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = CurrentOptions();
options.max_total_wal_size = 8192;
options.compression = kNoCompression;
options.write_buffer_size = 1 << 20;
options.level0_file_num_compaction_trigger = (1<<30);
options.level0_slowdown_writes_trigger = (1<<30);
options.level0_stop_writes_trigger = (1<<30);
options.disable_auto_compactions = true;
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
CreateColumnFamilies({"cf1", "cf2"}, options);
ASSERT_OK(Put(0, "key1", DummyString(8192)));
ASSERT_OK(Put(0, "key2", DummyString(8192)));
// the oldest wal should now be getting_flushed
ASSERT_OK(db_->DropColumnFamily(handles_[0]));
// all flushes should now do nothing because their CF is dropped
TEST_SYNC_POINT("Test:AllowFlushes");
TEST_SYNC_POINT("Test:WaitForFlush");
uint64_t lognum1 = dbfull()->TEST_LogfileNumber();
ASSERT_OK(Put(1, "key3", DummyString(8192)));
ASSERT_OK(Put(1, "key4", DummyString(8192)));
// new wal should have been created
uint64_t lognum2 = dbfull()->TEST_LogfileNumber();
EXPECT_GT(lognum2, lognum1);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -612,7 +612,9 @@ class VersionSet {
uint64_t MinLogNumber() const { uint64_t MinLogNumber() const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max(); uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (min_log_num > cfd->GetLogNumber()) { // It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber(); min_log_num = cfd->GetLogNumber();
} }
} }

Loading…
Cancel
Save