diff --git a/db/db_impl.cc b/db/db_impl.cc
index c53a4bd92..a47668763 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2347,7 +2347,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
     compact->builder->Abandon();
     compact->builder.reset();
   } else {
-    assert(compact->outfile == nullptr);
+    assert(!status.ok() || compact->outfile == nullptr);
   }
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
@@ -2402,30 +2402,37 @@ Status DBImpl::OpenCompactionOutputFile(
     pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
     mutex_.Unlock();
   }
+  // Make the output file
+  std::string fname = TableFileName(db_options_.db_paths, file_number,
+                                    compact->compaction->GetOutputPathId());
+  Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
+
+  if (!s.ok()) {
+    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
+        "[%s] OpenCompactionOutputFiles for table #%" PRIu64 " "
+        "fails at NewWritableFile with status %s",
+        compact->compaction->column_family_data()->GetName().c_str(),
+        file_number, s.ToString().c_str());
+    LogFlush(db_options_.info_log);
+    return s;
+  }
   CompactionState::Output out;
   out.number = file_number;
   out.path_id = compact->compaction->GetOutputPathId();
   out.smallest.Clear();
   out.largest.Clear();
   out.smallest_seqno = out.largest_seqno = 0;
-  compact->outputs.push_back(out);
-
-  // Make the output file
-  std::string fname = TableFileName(db_options_.db_paths, file_number,
-                                    compact->compaction->GetOutputPathId());
-  Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
-
-  if (s.ok()) {
-    compact->outfile->SetIOPriority(Env::IO_LOW);
-    compact->outfile->SetPreallocationBlockSize(
-        compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
+  compact->outputs.push_back(out);
+  compact->outfile->SetIOPriority(Env::IO_LOW);
+  compact->outfile->SetPreallocationBlockSize(
+      compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
 
-    ColumnFamilyData* cfd = compact->compaction->column_family_data();
-    compact->builder.reset(NewTableBuilder(
-        *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
-        compact->compaction->OutputCompressionType(),
-        cfd->ioptions()->compression_opts));
-  }
+  ColumnFamilyData* cfd = compact->compaction->column_family_data();
+  compact->builder.reset(NewTableBuilder(
+      *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
+      compact->compaction->OutputCompressionType(),
+      cfd->ioptions()->compression_opts));
   LogFlush(db_options_.info_log);
   return s;
 }
@@ -2616,7 +2623,7 @@ Status DBImpl::ProcessKeyValueCompaction(
   int64_t key_drop_obsolete = 0;
   int64_t loop_cnt = 0;
   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire) &&
-         !cfd->IsDropped()) {
+         !cfd->IsDropped() && status.ok()) {
     if (++loop_cnt > 1000) {
       if (key_drop_user > 0) {
         RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
@@ -2891,8 +2898,8 @@ Status DBImpl::ProcessKeyValueCompaction(
           // Only had one item to begin with (Put/Delete)
           break;
         }
-      }
-    }
+      }  // while (true)
+    }  // if (!drop)
 
     // MergeUntil has moved input to the next entry
     if (!current_entry_is_merging) {
diff --git a/db/db_test.cc b/db/db_test.cc
index 3ded0ec97..927d97ed4 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -120,6 +120,8 @@ static std::string Key(int i) {
 // Special Env used to delay background operations
 class SpecialEnv : public EnvWrapper {
  public:
+  Random rnd_;
+
   // sstable Sync() calls are blocked while this pointer is non-nullptr.
   std::atomic<bool> delay_sstable_sync_;
 
@@ -153,7 +155,13 @@ class SpecialEnv : public EnvWrapper {
 
   std::atomic<int> sync_counter_;
 
-  explicit SpecialEnv(Env* base) : EnvWrapper(base) {
+  std::atomic<uint32_t> non_writeable_rate_;
+
+  std::atomic<uint32_t> new_writable_count_;
+
+  std::atomic<uint32_t> periodic_non_writable_;
+
+  explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
     delay_sstable_sync_.store(false, std::memory_order_release);
     drop_writes_.store(false, std::memory_order_release);
     no_space_.store(false, std::memory_order_release);
@@ -165,6 +173,9 @@ class SpecialEnv : public EnvWrapper {
     log_write_error_.store(false, std::memory_order_release);
     bytes_written_ = 0;
     sync_counter_ = 0;
+    non_writeable_rate_ = 0;
+    new_writable_count_ = 0;
+    periodic_non_writable_ = 0;
   }
 
   Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@@ -250,8 +261,19 @@ class SpecialEnv : public EnvWrapper {
       }
     };
 
-    if (non_writable_.load(std::memory_order_acquire)) {
-      return Status::IOError("simulated write error");
+    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
+      auto random_number = rnd_.Uniform(100);
+      if (random_number < non_writeable_rate_.load()) {
+        return Status::IOError("simulated random write error");
+      }
+    }
+
+    new_writable_count_++;
+
+    auto periodic_fail = periodic_non_writable_.load();
+    if (periodic_fail > 0 &&
+        new_writable_count_.load() % periodic_fail == 0) {
+      return Status::IOError("simulated periodic write error");
     }
 
     Status s = target()->NewWritableFile(f, r, soptions);
@@ -5871,8 +5893,7 @@ TEST(DBTest, NonWritableFileSystem) {
     options.env = env_;
     Reopen(&options);
     ASSERT_OK(Put("foo", "v1"));
-    // Force errors for new files
-    env_->non_writable_.store(true, std::memory_order_release);
+    env_->non_writeable_rate_.store(100);
     std::string big(100000, 'x');
     int errors = 0;
     for (int i = 0; i < 20; i++) {
@@ -5882,7 +5903,7 @@ TEST(DBTest, NonWritableFileSystem) {
       }
     }
     ASSERT_GT(errors, 0);
-    env_->non_writable_.store(false, std::memory_order_release);
+    env_->non_writeable_rate_.store(0);
   } while (ChangeCompactOptions());
 }
 
@@ -8962,6 +8983,141 @@ TEST(DBTest, DynamicCompactionOptions) {
   ASSERT_EQ(NumTableFilesAtLevel(2), 1);
 }
 
+TEST(DBTest, FileCreationRandomFailure) {
+  Options options;
+  options.env = env_;
+  options.create_if_missing = true;
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.target_file_size_base = 200000;
+  options.max_bytes_for_level_base = 1000000;
+  options.max_bytes_for_level_multiplier = 2;
+
+  DestroyAndReopen(&options);
+  Random rnd(301);
+
+  const int kTestSize = kCDTKeysPerBuffer * 4096;
+  const int kTotalIteration = 100;
+  // The second half of the test involves random failures in
+  // file creation.
+  const int kRandomFailureTest = kTotalIteration / 2;
+  std::vector<std::string> values;
+  for (int i = 0; i < kTestSize; ++i) {
+    values.push_back("NOT_FOUND");
+  }
+  for (int j = 0; j < kTotalIteration; ++j) {
+    if (j == kRandomFailureTest) {
+      env_->non_writeable_rate_.store(90);
+    }
+    for (int k = 0; k < kTestSize; ++k) {
+      // Here we expect some of the Puts to fail.
+      std::string value = RandomString(&rnd, 100);
+      Status s = Put(Key(k), Slice(value));
+      if (s.ok()) {
+        // Update the latest successful put.
+        values[k] = value;
+      }
+      // But everything before we start simulating failures should succeed.
+      if (j < kRandomFailureTest) {
+        ASSERT_OK(s);
+      }
+    }
+  }
+
+  // If rocksdb does not do the correct job, an internal assert will fail here.
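+  // (The assert in question is the one in DBImpl::CleanupCompaction: once an
+  // output-file creation fails, the compaction must finish with a non-ok
+  // status instead of leaving a dangling output file behind.)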
+  dbfull()->TEST_WaitForFlushMemTable();
+  dbfull()->TEST_WaitForCompact();
+
+  // Verify we have the latest successful update.
+  for (int k = 0; k < kTestSize; ++k) {
+    auto v = Get(Key(k));
+    ASSERT_EQ(v, values[k]);
+  }
+
+  // Reopen and re-verify we have the latest successful update.
+  env_->non_writeable_rate_.store(0);
+  Reopen(&options);
+  for (int k = 0; k < kTestSize; ++k) {
+    auto v = Get(Key(k));
+    ASSERT_EQ(v, values[k]);
+  }
+}
+
+TEST(DBTest, PartialCompactionFailure) {
+  Options options;
+  const int kKeySize = 16;
+  const int kKvSize = 1000;
+  const int kKeysPerBuffer = 100;
+  const int kNumL1Files = 5;
+  options.create_if_missing = true;
+  options.write_buffer_size = kKeysPerBuffer * kKvSize;
+  options.max_write_buffer_number = 2;
+  options.target_file_size_base =
+      options.write_buffer_size *
+      (options.max_write_buffer_number - 1);
+  options.level0_file_num_compaction_trigger = kNumL1Files;
+  options.max_bytes_for_level_base =
+      options.level0_file_num_compaction_trigger *
+      options.target_file_size_base;
+  options.max_bytes_for_level_multiplier = 2;
+  options.compression = kNoCompression;
+
+  // The number of NewWritableFile calls required by each operation.
+  const int kNumInitialNewWritableFiles = 4;
+  const int kNumLevel0FlushNewWritableFiles =
+      options.level0_file_num_compaction_trigger * 2;
+  const int kNumLevel1NewWritableFiles =
+      options.level0_file_num_compaction_trigger + 1;
+  // This setting will make one of the file creations fail
+  // in the first L0 -> L1 compaction while making sure
+  // all flushes succeed.
+  env_->periodic_non_writable_ =
+      kNumInitialNewWritableFiles + kNumLevel0FlushNewWritableFiles +
+      kNumLevel1NewWritableFiles - 3;
+  options.env = env_;
+
+  DestroyAndReopen(&options);
+
+  const int kNumKeys =
+      options.level0_file_num_compaction_trigger *
+      (options.max_write_buffer_number - 1) *
+      kKeysPerBuffer * 1.0;
+
+  Random rnd(301);
+  std::vector<std::string> keys;
+  std::vector<std::string> values;
+  for (int k = 0; k < kNumKeys; ++k) {
+    keys.emplace_back(RandomString(&rnd, kKeySize));
+    values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
+    ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
+  }
+
+  dbfull()->TEST_WaitForFlushMemTable();
+  // Make sure the number of L0 files can trigger compaction.
+  ASSERT_GE(NumTableFilesAtLevel(0),
+            options.level0_file_num_compaction_trigger);
+  auto previous_num_level0_files = NumTableFilesAtLevel(0);
+  // Expect compaction to fail here, as the creation of one of its
+  // output files will fail.
+  dbfull()->TEST_WaitForCompact();
+  // Verify the L0 -> L1 compaction does fail.
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  // Verify all L0 files are still there.
+  ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
+
+  // All key-values must exist after compaction fails.
+  for (int k = 0; k < kNumKeys; ++k) {
+    ASSERT_EQ(values[k], Get(keys[k]));
+  }
+
+  // Make sure RocksDB will not get into a corrupted state.
+  Reopen(&options);
+
+  // Verify again after reopen.
+  for (int k = 0; k < kNumKeys; ++k) {
+    ASSERT_EQ(values[k], Get(keys[k]));
+  }
+}
+
 TEST(DBTest, DynamicMiscOptions) {
   // Test max_sequential_skip_in_iterations
   Options options;
diff --git a/db/version_edit.cc b/db/version_edit.cc
index 271016aaf..1252759aa 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -9,6 +9,7 @@
 
 #include "db/version_edit.h"
 
+#include "db/filename.h"
 #include "db/version_set.h"
 #include "util/coding.h"
 #include "rocksdb/slice.h"
@@ -64,7 +65,7 @@ void VersionEdit::Clear() {
   column_family_name_.clear();
 }
 
-void VersionEdit::EncodeTo(std::string* dst) const {
+bool VersionEdit::EncodeTo(std::string* dst) const {
   if (has_comparator_) {
     PutVarint32(dst, kComparator);
     PutLengthPrefixedSlice(dst, comparator_);
@@ -111,6 +112,9 @@ void VersionEdit::EncodeTo(std::string* dst) const {
       PutVarint32(dst, f.fd.GetPathId());
     }
     PutVarint64(dst, f.fd.GetFileSize());
+    if (!f.smallest.Valid() || !f.largest.Valid()) {
+      return false;
+    }
     PutLengthPrefixedSlice(dst, f.smallest.Encode());
     PutLengthPrefixedSlice(dst, f.largest.Encode());
     PutVarint64(dst, f.smallest_seqno);
@@ -131,6 +135,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
   if (is_column_family_drop_) {
     PutVarint32(dst, kColumnFamilyDrop);
   }
+  return true;
 }
 
 static bool GetInternalKey(Slice* input, InternalKey* dst) {
diff --git a/db/version_edit.h b/db/version_edit.h
index fbe7e02d1..3317b11c4 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -213,7 +213,8 @@ class VersionEdit {
     is_column_family_drop_ = true;
   }
 
-  void EncodeTo(std::string* dst) const;
+  // Returns true on success.
+  bool EncodeTo(std::string* dst) const;
   Status DecodeFrom(const Slice& src);
 
   std::string DebugString(bool hex_key = false) const;
diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc
index 850f242c1..fe663c766 100644
--- a/db/version_edit_test.cc
+++ b/db/version_edit_test.cc
@@ -44,6 +44,16 @@ TEST(VersionEditTest, EncodeDecode) {
   TestEncodeDecode(edit);
 }
 
+TEST(VersionEditTest, EncodeEmptyFile) {
+  VersionEdit edit;
+  edit.AddFile(0, 0, 0, 0,
+               InternalKey(),
+               InternalKey(),
+               0, 0);
+  std::string buffer;
+  ASSERT_TRUE(!edit.EncodeTo(&buffer));
+}
+
 TEST(VersionEditTest, ColumnFamilyTest) {
   VersionEdit edit;
   edit.SetColumnFamily(2);
diff --git a/db/version_set.cc b/db/version_set.cc
index b47578a4a..6a68c373e 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1854,7 +1854,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
     if (s.ok()) {
       for (auto& e : batch_edits) {
         std::string record;
-        e->EncodeTo(&record);
+        if (!e->EncodeTo(&record)) {
+          s = Status::Corruption(
+              "Unable to Encode VersionEdit: " + e->DebugString(true));
+          break;
+        }
         s = descriptor_log_->AddRecord(record);
         if (!s.ok()) {
           break;
@@ -1872,19 +1876,24 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
       }
     }
     if (!s.ok()) {
-      Log(db_options_->info_log, "MANIFEST write: %s\n",
-          s.ToString().c_str());
+      Log(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
+          "MANIFEST write: %s\n", s.ToString().c_str());
       bool all_records_in = true;
       for (auto& e : batch_edits) {
         std::string record;
-        e->EncodeTo(&record);
+        if (!e->EncodeTo(&record)) {
+          s = Status::Corruption(
+              "Unable to Encode VersionEdit: " + e->DebugString(true));
+          all_records_in = false;
+          break;
+        }
         if (!ManifestContains(pending_manifest_file_number_, record)) {
           all_records_in = false;
           break;
         }
       }
       if (all_records_in) {
-        Log(db_options_->info_log,
+        Log(InfoLogLevel::WARN_LEVEL, db_options_->info_log,
            "MANIFEST contains log record despite error; advancing to new "
            "version to prevent mismatch between in-memory and logged state"
            " If paranoid is set, then the db is now in readonly mode.");
@@ -2661,7 +2670,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
       edit.SetComparatorName(
           cfd->internal_comparator().user_comparator()->Name());
       std::string record;
-      edit.EncodeTo(&record);
+      if (!edit.EncodeTo(&record)) {
+        return Status::Corruption(
+            "Unable to Encode VersionEdit: " + edit.DebugString(true));
+      }
       Status s = log->AddRecord(record);
       if (!s.ok()) {
         return s;
@@ -2682,7 +2694,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
       }
       edit.SetLogNumber(cfd->GetLogNumber());
       std::string record;
-      edit.EncodeTo(&record);
+      if (!edit.EncodeTo(&record)) {
+        return Status::Corruption(
+            "Unable to Encode VersionEdit: " + edit.DebugString(true));
+      }
       Status s = log->AddRecord(record);
       if (!s.ok()) {
         return s;