diff --git a/db/db_impl.cc b/db/db_impl.cc index 6876e891b..697296576 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -700,8 +700,21 @@ void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); if (!shutting_down_.Acquire_Load()) { - BackgroundCompaction(); + Status s = BackgroundCompaction(); + if (!s.ok()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + Log(options_.info_log, "Waiting after background compaction error: %s", + s.ToString().c_str()); + mutex_.Unlock(); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); + } } + bg_compaction_scheduled_ = false; MaybeScheduleLogDBDeployStats(); @@ -712,12 +725,11 @@ void DBImpl::BackgroundCall() { bg_cv_.SignalAll(); } -void DBImpl::BackgroundCompaction() { +Status DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != NULL) { - CompactMemTable(); - return; + return CompactMemTable(); } Compaction* c; @@ -792,6 +804,7 @@ void DBImpl::BackgroundCompaction() { } manual_compaction_ = NULL; } + return status; } void DBImpl::CleanupCompaction(CompactionState* compact) { @@ -1368,6 +1381,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else if (imm_ != NULL) { // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. + Log(options_.info_log, "wait for memtable compaction...\n"); bg_cv_.Wait(); } else if (versions_->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { diff --git a/db/db_impl.h b/db/db_impl.h index b618af809..3f64d58d3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -121,7 +121,7 @@ class DBImpl : public DB { void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); - void BackgroundCompaction(); + Status BackgroundCompaction(); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); diff --git a/db/db_test.cc b/db/db_test.cc index 674667c4d..a4f171c66 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -56,12 +56,18 @@ class SpecialEnv : public EnvWrapper { // Simulate no-space errors while this pointer is non-NULL. port::AtomicPointer no_space_; + // Simulate non-writable file system while this pointer is non-NULL + port::AtomicPointer non_writable_; + bool count_random_reads_; AtomicCounter random_read_counter_; + AtomicCounter sleep_counter_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(NULL); no_space_.Release_Store(NULL); + non_writable_.Release_Store(NULL); count_random_reads_ = false; } @@ -95,6 +101,10 @@ class SpecialEnv : public EnvWrapper { } }; + if (non_writable_.Acquire_Load() != NULL) { + return Status::IOError("simulated write error"); + } + Status s = target()->NewWritableFile(f, r); if (s.ok()) { if (strstr(f.c_str(), ".sst") != NULL) { @@ -127,6 +137,11 @@ class SpecialEnv : public EnvWrapper { } return s; } + + virtual void SleepForMicroseconds(int micros) { + sleep_counter_.Increment(); + target()->SleepForMicroseconds(micros); + } }; class DBTest { @@ -1568,13 +1583,37 @@ TEST(DBTest, NoSpace) { Compact("a", "z"); const int num_files = CountFiles(); env_->no_space_.Release_Store(env_); // Force out-of-space errors - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 5; i++) { for (int level = 0; level < dbfull()->NumberLevels()-1; level++) { dbfull()->TEST_CompactRange(level, NULL, NULL); } } env_->no_space_.Release_Store(NULL); - ASSERT_LT(CountFiles(), num_files + 5); + ASSERT_LT(CountFiles(), num_files + 3); + + // Check that compaction attempts slept after errors + ASSERT_GE(env_->sleep_counter_.Read(), 5); +} + +TEST(DBTest, NonWritableFileSystem) +{ + Options options = CurrentOptions(); + options.write_buffer_size = 1000; + options.env = env_; + Reopen(&options); + ASSERT_OK(Put("foo", "v1")); + env_->non_writable_.Release_Store(env_); // Force errors for new files + std::string big(100000, 'x'); + int errors = 0; + for (int i = 0; i < 20; i++) { + fprintf(stderr, "iter %d; errors %d\n", i, errors); + if (!Put("foo", big).ok()) { + errors++; + env_->SleepForMicroseconds(100000); + } + } + ASSERT_GT(errors, 0); + env_->non_writable_.Release_Store(NULL); } TEST(DBTest, FilesDeletedAfterCompaction) {