diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc index 26ea0fe0e..b3232093e 100644 --- a/util/fault_injection_test_env.cc +++ b/util/fault_injection_test_env.cc @@ -197,6 +197,27 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, return s; } +Status FaultInjectionTestEnv::ReopenWritableFile( + const std::string& fname, unique_ptr* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = target()->ReopenWritableFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestWritableFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + Status FaultInjectionTestEnv::NewRandomAccessFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) { diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index 563986e29..d88a49155 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -110,6 +110,10 @@ class FaultInjectionTestEnv : public EnvWrapper { unique_ptr* result, const EnvOptions& soptions) override; + Status ReopenWritableFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& soptions) override; + Status NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) override; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index e1045aa9e..bdec65462 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -404,82 +404,91 @@ std::shared_ptr BlobDBImpl::FindBlobFileLocked( return (b1 || b2) ? nullptr : (*finditr); } -std::shared_ptr BlobDBImpl::CheckOrCreateWriterLocked( - const std::shared_ptr& bfile) { - std::shared_ptr writer = bfile->GetWriter(); - if (writer) return writer; - - Status s = CreateWriterLocked(bfile); - if (!s.ok()) return nullptr; - - writer = bfile->GetWriter(); - return writer; +Status BlobDBImpl::CheckOrCreateWriterLocked( + const std::shared_ptr& blob_file, + std::shared_ptr* writer) { + assert(writer != nullptr); + *writer = blob_file->GetWriter(); + if (*writer != nullptr) { + return Status::OK(); + } + Status s = CreateWriterLocked(blob_file); + if (s.ok()) { + *writer = blob_file->GetWriter(); + } + return s; } -std::shared_ptr BlobDBImpl::SelectBlobFile() { +Status BlobDBImpl::SelectBlobFile(std::shared_ptr* blob_file) { + assert(blob_file != nullptr); { ReadLock rl(&mutex_); if (open_non_ttl_file_ != nullptr) { - return open_non_ttl_file_; + *blob_file = open_non_ttl_file_; + return Status::OK(); } } // CHECK again WriteLock wl(&mutex_); if (open_non_ttl_file_ != nullptr) { - return open_non_ttl_file_; + *blob_file = open_non_ttl_file_; + return Status::OK(); } - std::shared_ptr bfile = NewBlobFile("SelectBlobFile"); - assert(bfile); + *blob_file = NewBlobFile("SelectBlobFile"); + assert(*blob_file != nullptr); // file not visible, hence no lock - std::shared_ptr writer = CheckOrCreateWriterLocked(bfile); - if (!writer) { + std::shared_ptr writer; + Status s = CheckOrCreateWriterLocked(*blob_file, &writer); + if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to get writer from blob file: %s", - bfile->PathName().c_str()); - return nullptr; + "Failed to get writer from blob file: %s, error: %s", + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; } - bfile->file_size_ = BlobLogHeader::kSize; - bfile->header_.compression = bdb_options_.compression; - bfile->header_.has_ttl = false; - bfile->header_.column_family_id = + (*blob_file)->file_size_ = BlobLogHeader::kSize; + (*blob_file)->header_.compression = bdb_options_.compression; + (*blob_file)->header_.has_ttl = false; + (*blob_file)->header_.column_family_id = reinterpret_cast(DefaultColumnFamily())->GetID(); - bfile->header_valid_ = true; - bfile->SetColumnFamilyId(bfile->header_.column_family_id); - bfile->SetHasTTL(false); - bfile->SetCompression(bdb_options_.compression); + (*blob_file)->header_valid_ = true; + (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id); + (*blob_file)->SetHasTTL(false); + (*blob_file)->SetCompression(bdb_options_.compression); - Status s = writer->WriteHeader(bfile->header_); + s = writer->WriteHeader((*blob_file)->header_); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to write header to new blob file: %s" " status: '%s'", - bfile->PathName().c_str(), s.ToString().c_str()); - return nullptr; + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; } - blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); - open_non_ttl_file_ = bfile; + blob_files_.insert( + std::make_pair((*blob_file)->BlobFileNumber(), *blob_file)); + open_non_ttl_file_ = *blob_file; total_blob_size_ += BlobLogHeader::kSize; - return bfile; + return s; } -std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { +Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file) { + assert(blob_file != nullptr); assert(expiration != kNoExpiration); uint64_t epoch_read = 0; - std::shared_ptr bfile; { ReadLock rl(&mutex_); - bfile = FindBlobFileLocked(expiration); + *blob_file = FindBlobFileLocked(expiration); epoch_read = epoch_of_.load(); } - if (bfile) { - assert(!bfile->Immutable()); - return bfile; + if (*blob_file != nullptr) { + assert(!(*blob_file)->Immutable()); + return Status::OK(); } uint64_t exp_low = @@ -487,61 +496,66 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; ExpirationRange expiration_range = std::make_pair(exp_low, exp_high); - bfile = NewBlobFile("SelectBlobFileTTL"); - assert(bfile); + *blob_file = NewBlobFile("SelectBlobFileTTL"); + assert(*blob_file != nullptr); ROCKS_LOG_INFO(db_options_.info_log, "New blob file TTL range: %s %d %d", - bfile->PathName().c_str(), exp_low, exp_high); + (*blob_file)->PathName().c_str(), exp_low, exp_high); LogFlush(db_options_.info_log); // we don't need to take lock as no other thread is seeing bfile yet - std::shared_ptr writer = CheckOrCreateWriterLocked(bfile); - if (!writer) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to get writer from blob file with TTL: %s", - bfile->PathName().c_str()); - return nullptr; + std::shared_ptr writer; + Status s = CheckOrCreateWriterLocked(*blob_file, &writer); + if (!s.ok()) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to get writer from blob file with TTL: %s, error: %s", + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; } - bfile->header_.expiration_range = expiration_range; - bfile->header_.compression = bdb_options_.compression; - bfile->header_.has_ttl = true; - bfile->header_.column_family_id = + (*blob_file)->header_.expiration_range = expiration_range; + (*blob_file)->header_.compression = bdb_options_.compression; + (*blob_file)->header_.has_ttl = true; + (*blob_file)->header_.column_family_id = reinterpret_cast(DefaultColumnFamily())->GetID(); - ; - bfile->header_valid_ = true; - bfile->SetColumnFamilyId(bfile->header_.column_family_id); - bfile->SetHasTTL(true); - bfile->SetCompression(bdb_options_.compression); - bfile->file_size_ = BlobLogHeader::kSize; + (*blob_file)->header_valid_ = true; + (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id); + (*blob_file)->SetHasTTL(true); + (*blob_file)->SetCompression(bdb_options_.compression); + (*blob_file)->file_size_ = BlobLogHeader::kSize; // set the first value of the range, since that is // concrete at this time. also necessary to add to open_ttl_files_ - bfile->expiration_range_ = expiration_range; + (*blob_file)->expiration_range_ = expiration_range; WriteLock wl(&mutex_); // in case the epoch has shifted in the interim, then check // check condition again - should be rare. if (epoch_of_.load() != epoch_read) { - auto bfile2 = FindBlobFileLocked(expiration); - if (bfile2) return bfile2; + std::shared_ptr blob_file2 = FindBlobFileLocked(expiration); + if (blob_file2 != nullptr) { + *blob_file = std::move(blob_file2); + return Status::OK(); + } } - Status s = writer->WriteHeader(bfile->header_); + s = writer->WriteHeader((*blob_file)->header_); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to write header to new blob file: %s" " status: '%s'", - bfile->PathName().c_str(), s.ToString().c_str()); - return nullptr; + (*blob_file)->PathName().c_str(), s.ToString().c_str()); + return s; } - blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); - open_ttl_files_.insert(bfile); + blob_files_.insert( + std::make_pair((*blob_file)->BlobFileNumber(), *blob_file)); + open_ttl_files_.insert(*blob_file); total_blob_size_ += BlobLogHeader::kSize; epoch_of_++; - return bfile; + return s; } class BlobDBImpl::BlobInserter : public WriteBatch::Handler { @@ -695,36 +709,41 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, return s; } - std::shared_ptr bfile = (expiration != kNoExpiration) - ? SelectBlobFileTTL(expiration) - : SelectBlobFile(); - assert(bfile != nullptr); - assert(bfile->compression() == bdb_options_.compression); - - s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, - &index_entry); - if (expiration == kNoExpiration) { - RecordTick(statistics_, BLOB_DB_WRITE_BLOB); + std::shared_ptr blob_file; + if (expiration != kNoExpiration) { + s = SelectBlobFileTTL(expiration, &blob_file); } else { - RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL); + s = SelectBlobFile(&blob_file); + } + if (s.ok()) { + assert(blob_file != nullptr); + assert(blob_file->compression() == bdb_options_.compression); + s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration, + &index_entry); } - if (s.ok()) { if (expiration != kNoExpiration) { - bfile->ExtendExpirationRange(expiration); + blob_file->ExtendExpirationRange(expiration); } - s = CloseBlobFileIfNeeded(bfile); - if (s.ok()) { - s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, - index_entry); + s = CloseBlobFileIfNeeded(blob_file); + } + if (s.ok()) { + s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, + index_entry); + } + if (s.ok()) { + if (expiration == kNoExpiration) { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB); + } else { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL); } } else { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to append blob to FILE: %s: KEY: %s VALSZ: %d" " status: '%s' blob_file: '%s'", - bfile->PathName().c_str(), key.ToString().c_str(), + blob_file->PathName().c_str(), key.ToString().c_str(), value.size(), s.ToString().c_str(), - bfile->DumpState().c_str()); + blob_file->DumpState().c_str()); } } @@ -867,9 +886,10 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, uint64_t key_offset = 0; { WriteLock lockbfile_w(&bfile->mutex_); - std::shared_ptr writer = CheckOrCreateWriterLocked(bfile); - if (!writer) { - return Status::IOError("Failed to create blob writer"); + std::shared_ptr writer; + s = CheckOrCreateWriterLocked(bfile, &writer); + if (!s.ok()) { + return s; } // write the blob to the blob log. @@ -1574,7 +1594,13 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, reason += bfptr->PathName(); newfile = NewBlobFile(reason); - new_writer = CheckOrCreateWriterLocked(newfile); + s = CheckOrCreateWriterLocked(newfile, &new_writer); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to open file %s for writer, error: %s", + newfile->PathName().c_str(), s.ToString().c_str()); + break; + } // Can't use header beyond this point newfile->header_ = std::move(header); newfile->header_valid_ = true; diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 4296d5c6a..4809f006e 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -255,10 +255,11 @@ class BlobDBImpl : public BlobDB { // find an existing blob log file based on the expiration unix epoch // if such a file does not exist, return nullptr - std::shared_ptr SelectBlobFileTTL(uint64_t expiration); + Status SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr* blob_file); // find an existing blob log file to append the value to - std::shared_ptr SelectBlobFile(); + Status SelectBlobFile(std::shared_ptr* blob_file); std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; @@ -309,8 +310,8 @@ class BlobDBImpl : public BlobDB { // returns a Writer object for the file. If writer is not // already present, creates one. Needs Write Mutex to be held - std::shared_ptr CheckOrCreateWriterLocked( - const std::shared_ptr& bfile); + Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, + std::shared_ptr* writer); // Iterate through keys and values on Blob and write into // separate file the remaining blobs and delete/update pointers diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index cf8f1217a..1c1867e4e 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -374,6 +374,19 @@ TEST_F(BlobDBTest, GetIOError) { fault_injection_env_->SetFilesystemActive(true); } +TEST_F(BlobDBTest, PutIOError) { + Options options; + options.env = fault_injection_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; // Make sure value write to blob file + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + ASSERT_TRUE(Put("foo", "v1").IsIOError()); + fault_injection_env_->SetFilesystemActive(true, Status::IOError()); + ASSERT_OK(Put("bar", "v1")); +} + TEST_F(BlobDBTest, WriteBatch) { Random rnd(301); BlobDBOptions bdb_options;