BlobDB: handle IO error on read (#4410)

Summary:
Fix IO error on read not being handle and crashing the DB. With the fix we properly return the error.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4410

Differential Revision: D9979246

Pulled By: yiwu-arbug

fbshipit-source-id: 111a85675067a29c03cb60e9a34103f4ff636694
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 72712f4e28
commit 04d373b260
  1. 9
      util/fault_injection_test_env.cc
  2. 4
      util/fault_injection_test_env.h
  3. 23
      utilities/blob_db/blob_db_impl.cc
  4. 7
      utilities/blob_db/blob_db_impl.h
  5. 21
      utilities/blob_db/blob_db_test.cc
  6. 26
      utilities/blob_db/blob_file.cc
  7. 7
      utilities/blob_db/blob_file.h

@ -197,6 +197,15 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
return s;
}
Status FaultInjectionTestEnv::NewRandomAccessFile(
const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return GetError();
}
return target()->NewRandomAccessFile(fname, result, soptions);
}
Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
if (!IsFilesystemActive()) {
return GetError();

@ -110,6 +110,10 @@ class FaultInjectionTestEnv : public EnvWrapper {
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
virtual Status DeleteFile(const std::string& f) override;
virtual Status RenameFile(const std::string& s,

@ -304,13 +304,17 @@ void BlobDBImpl::CloseRandomAccessLocked(
open_file_count_--;
}
std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options) {
Status BlobDBImpl::GetBlobFileReader(
const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<RandomAccessFileReader>* reader) {
assert(reader != nullptr);
bool fresh_open = false;
auto rar = bfile->GetOrOpenRandomAccessReader(env, env_options, &fresh_open);
if (fresh_open) open_file_count_++;
return rar;
Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
if (s.ok() && fresh_open) {
assert(*reader != nullptr);
open_file_count_++;
}
return s;
}
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
@ -998,8 +1002,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
}
// takes locks when called
std::shared_ptr<RandomAccessFileReader> reader =
GetOrOpenRandomAccessReader(bfile, env_, env_options_);
std::shared_ptr<RandomAccessFileReader> reader;
s = GetBlobFileReader(bfile, &reader);
if (!s.ok()) {
return s;
}
assert(blob_index.offset() > key.size() + sizeof(uint32_t));
uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);

@ -296,11 +296,8 @@ class BlobDBImpl : public BlobDB {
// Open all blob files found in blob_dir.
Status OpenAllBlobFiles();
// hold write mutex on file and call
// creates a Random Access reader for GET call
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
const std::shared_ptr<BlobFile>& bfile, Env* env,
const EnvOptions& env_options);
Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<RandomAccessFileReader>* reader);
// hold write mutex on file and call.
// Close the above Random Access reader

@ -16,6 +16,7 @@
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "util/cast_util.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
@ -40,6 +41,7 @@ class BlobDBTest : public testing::Test {
BlobDBTest()
: dbname_(test::PerThreadDBPath("blob_db_test")),
mock_env_(new MockTimeEnv(Env::Default())),
fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
blob_db_(nullptr) {
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
assert(s.ok());
@ -236,6 +238,7 @@ class BlobDBTest : public testing::Test {
const std::string dbname_;
std::unique_ptr<MockTimeEnv> mock_env_;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
BlobDB *blob_db_;
}; // class BlobDBTest
@ -354,6 +357,23 @@ TEST_F(BlobDBTest, GetExpiration) {
ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
TEST_F(BlobDBTest, GetIOError) {
Options options;
options.env = fault_injection_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0; // Make sure value write to blob file
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
PinnableSlice value;
ASSERT_OK(Put("foo", "bar"));
fault_injection_env_->SetFilesystemActive(false, Status::IOError());
Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
ASSERT_TRUE(s.IsIOError());
// Reactivate file system to allow test to close DB.
fault_injection_env_->SetFilesystemActive(true);
}
TEST_F(BlobDBTest, WriteBatch) {
Random rnd(301);
BlobDBOptions bdb_options;
@ -461,7 +481,6 @@ TEST_F(BlobDBTest, DecompressAfterReopen) {
Reopen(bdb_options);
VerifyDB(data);
}
#endif
TEST_F(BlobDBTest, MultipleWriters) {

@ -191,36 +191,48 @@ void BlobFile::CloseRandomAccessLocked() {
last_access_ = -1;
}
std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open) {
Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open) {
assert(reader != nullptr);
assert(fresh_open != nullptr);
*fresh_open = false;
int64_t current_time = 0;
env->GetCurrentTime(&current_time);
last_access_.store(current_time);
Status s;
{
ReadLock lockbfile_r(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
}
WriteLock lockbfile_w(&mutex_);
if (ra_file_reader_) return ra_file_reader_;
// Double check.
if (ra_file_reader_) {
*reader = ra_file_reader_;
return s;
}
std::unique_ptr<RandomAccessFile> rfile;
Status s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
if (!s.ok()) {
ROCKS_LOG_ERROR(info_log_,
"Failed to open blob file for random-read: %s status: '%s'"
" exists: '%s'",
PathName().c_str(), s.ToString().c_str(),
env->FileExists(PathName()).ToString().c_str());
return nullptr;
return s;
}
ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
PathName());
*reader = ra_file_reader_;
*fresh_open = true;
return ra_file_reader_;
return s;
}
Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {

@ -181,6 +181,10 @@ class BlobFile {
// footer_valid_ to false and return Status::OK.
Status ReadMetadata(Env* env, const EnvOptions& env_options);
Status GetReader(Env* env, const EnvOptions& env_options,
std::shared_ptr<RandomAccessFileReader>* reader,
bool* fresh_open);
private:
std::shared_ptr<Reader> OpenRandomAccessReader(
Env* env, const DBOptions& db_options,
@ -190,9 +194,6 @@ class BlobFile {
Status WriteFooterAndCloseLocked();
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
Env* env, const EnvOptions& env_options, bool* fresh_open);
void CloseRandomAccessLocked();
// this is used, when you are reading only the footer of a

Loading…
Cancel
Save