Add a new IOStatus subcode to indicate that writes are fenced off (#7374)

Summary:
In a distributed file system, directory ownership is enforced by fencing
off the previous owner once it has been preempted by a new owner. This PR
adds an IOStatus subcode for ```Status::Code::kIOError``` to indicate this.
Once this error is returned for a file write, the DB is put in read-only
mode and is not allowed to resume in read-write mode.
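
For illustration, here is a minimal sketch of how a caller might react to the new subcode. The wrapper function is hypothetical; only ```Status::IsIOFenced()``` (added by this PR) and ```DB::Resume()``` are real RocksDB APIs:

```cpp
#include <cassert>

#include "rocksdb/db.h"

// Hypothetical caller-side handling of a fenced write (not part of this PR).
void HandleWriteResult(rocksdb::DB* db, const rocksdb::Status& s) {
  if (s.IsIOFenced()) {
    // A new owner has fenced this instance off from its directory. The DB is
    // now read-only, and Resume() will refuse to return it to read-write
    // mode; it keeps reporting the fenced state.
    rocksdb::Status r = db->Resume();
    assert(r.IsIOFenced());
    // Safe options from here: keep serving reads, or close the DB.
  }
}
```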

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7374

Test Plan: Add new unit tests in ```error_handler_fs_test```

Reviewed By: riversand963

Differential Revision: D23687777

Pulled By: anand1976

fbshipit-source-id: bef948642089dc0af399057864d9a8ca339e8b2f

Branch: main
Author: anand76 (committed by Facebook GitHub Bot)
Commit: 18a3227b12 (parent 7e09750790)

Changed files (lines changed):
  db/db_impl/db_impl_write.cc   (7)
  db/error_handler.cc           (30)
  db/error_handler_fs_test.cc   (188)
  include/rocksdb/io_status.h   (5)
  include/rocksdb/status.h      (9)
  util/status.cc                (4)

db/db_impl/db_impl_write.cc
@@ -833,6 +833,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
   // Caller must hold mutex_.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
   mutex_.AssertHeld();
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
@@ -843,6 +844,7 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
 void DBImpl::WriteStatusCheck(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
     mutex_.Lock();
@@ -854,8 +856,9 @@ void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
 void DBImpl::IOStatusCheck(const IOStatus& io_status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
-  if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
-      !io_status.IsBusy() && !io_status.IsIncomplete()) {
+  if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
+       !io_status.IsBusy() && !io_status.IsIncomplete()) ||
+      io_status.IsIOFenced()) {
     mutex_.Lock();
     error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
     mutex_.Unlock();

db/error_handler.cc
@@ -32,6 +32,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kSpaceLimit,
                          true),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kCompaction,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kCompaction,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
         // Errors during BG flush
         {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                          Status::SubCode::kNoSpace, true),
@@ -42,6 +50,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
         {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
                          Status::SubCode::kSpaceLimit, true),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
+                         Status::SubCode::kIOFenced, true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
+                         Status::SubCode::kIOFenced, false),
+         Status::Severity::kFatalError},
         // Errors during Write
         {std::make_tuple(BackgroundErrorReason::kWriteCallback,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
@@ -51,6 +65,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
                          false),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kWriteCallback,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kWriteCallback,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
         // Errors during MANIFEST write
         {std::make_tuple(BackgroundErrorReason::kManifestWrite,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
@@ -60,6 +82,14 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
                          Status::Code::kIOError, Status::SubCode::kNoSpace,
                          false),
          Status::Severity::kHardError},
+        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         true),
+         Status::Severity::kFatalError},
+        {std::make_tuple(BackgroundErrorReason::kManifestWrite,
+                         Status::Code::kIOError, Status::SubCode::kIOFenced,
+                         false),
+         Status::Severity::kFatalError},
 };
 
 std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,

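The new entries above pin ```Status::SubCode::kIOFenced``` to ```Status::Severity::kFatalError``` for every background-error reason and for both values of paranoid_checks. As a hedged illustration of how a (reason, code, subcode, paranoid) severity table of this shape can be consulted, here is a hypothetical lookup helper; it is not the PR's code, which lives inside the error handler:

```cpp
#include <map>
#include <tuple>

#include "rocksdb/listener.h"  // BackgroundErrorReason
#include "rocksdb/status.h"

using ROCKSDB_NAMESPACE::BackgroundErrorReason;
using ROCKSDB_NAMESPACE::Status;

using SeverityMap =
    std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode,
                        bool>,
             Status::Severity>;

// Hypothetical lookup over a table shaped like ErrorSeverityMap above.
Status::Severity LookupSeverity(const SeverityMap& m,
                                BackgroundErrorReason reason, const Status& s,
                                bool paranoid_checks) {
  auto it =
      m.find(std::make_tuple(reason, s.code(), s.subcode(), paranoid_checks));
  if (it != m.end()) {
    return it->second;
  }
  // Fallback chosen for this sketch only; the real error handler applies
  // more nuanced defaults when no exact match exists.
  return Status::Severity::kFatalError;
}
```
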
db/error_handler_fs_test.cc
@@ -1962,6 +1962,194 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
   Close();
 }
 
+class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
+                                   public testing::WithParamInterface<bool> {};
+
+TEST_P(DBErrorHandlingFencingTest, FlushWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+
+  Put(Key(0), "val");
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Destroy(options);
+}
+
+TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  std::string old_manifest;
+  std::string new_manifest;
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+  old_manifest = GetManifestNameFromLiveFiles();
+
+  Put(Key(0), "val");
+  Flush();
+  Put(Key(1), "val");
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
+        fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Close();
+}
+
+TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  DestroyAndReopen(options);
+
+  Put(Key(0), "val");
+  Put(Key(2), "val");
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+
+  listener->EnableAutoRecovery(true);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0", [&](void*) {
+        fault_fs->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Put(Key(1), "val");
+  s = Flush();
+  ASSERT_EQ(s, Status::OK());
+
+  s = dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  ASSERT_TRUE(s.IsIOFenced());
+
+  fault_fs->SetFilesystemActive(true);
+  s = dbfull()->Resume();
+  ASSERT_TRUE(s.IsIOFenced());
+  Destroy(options);
+}
+
+TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_fs_env.get();
+  options.create_if_missing = true;
+  options.writable_file_max_buffer_size = 32768;
+  options.listeners.emplace_back(listener);
+  options.paranoid_checks = GetParam();
+  Status s;
+  Random rnd(301);
+
+  listener->EnableAutoRecovery(true);
+  DestroyAndReopen(options);
+
+  {
+    WriteBatch batch;
+
+    for (auto i = 0; i < 100; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    WriteOptions wopts;
+    wopts.sync = true;
+    ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
+  }
+
+  {
+    WriteBatch batch;
+    int write_error = 0;
+
+    for (auto i = 100; i < 199; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
+          write_error++;
+          if (write_error > 2) {
+            fault_fs->SetFilesystemActive(false,
+                                          IOStatus::IOFenced("IO fenced"));
+          }
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+    WriteOptions wopts;
+    wopts.sync = true;
+    s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsIOFenced());
+  }
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs->SetFilesystemActive(true);
+  {
+    WriteBatch batch;
+
+    for (auto i = 0; i < 100; ++i) {
+      batch.Put(Key(i), rnd.RandomString(1024));
+    }
+
+    WriteOptions wopts;
+    wopts.sync = true;
+    s = dbfull()->Write(wopts, &batch);
+    ASSERT_TRUE(s.IsIOFenced());
+  }
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
+                        ::testing::Bool());
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {

include/rocksdb/io_status.h
@@ -126,6 +126,11 @@ class IOStatus : public Status {
     return IOStatus(kIOError, kPathNotFound, msg, msg2);
   }
 
+  static IOStatus IOFenced() { return IOStatus(kIOError, kIOFenced); }
+  static IOStatus IOFenced(const Slice& msg, const Slice& msg2 = Slice()) {
+    return IOStatus(kIOError, kIOFenced, msg, msg2);
+  }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   // std::string ToString() const;
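
A FileSystem implementation signals fencing by returning this status from its write paths. Below is a hedged sketch of what that could look like; ```IsCurrentOwner()``` is a hypothetical lease/ownership check, and only ```IOStatus::IOFenced()``` itself comes from this PR:

```cpp
#include "rocksdb/io_status.h"

// Hypothetical write path inside a distributed FileSystem implementation.
ROCKSDB_NAMESPACE::IOStatus WriteBlock(const ROCKSDB_NAMESPACE::Slice& data) {
  extern bool IsCurrentOwner();  // hypothetical lease/ownership check
  if (!IsCurrentOwner()) {
    // This instance has been preempted; fail the write with the dedicated
    // subcode so RocksDB pins the DB in read-only mode.
    return ROCKSDB_NAMESPACE::IOStatus::IOFenced("preempted by new owner");
  }
  // ... perform the actual write using `data` here ...
  (void)data;
  return ROCKSDB_NAMESPACE::IOStatus::OK();
}
```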

include/rocksdb/status.h
@@ -113,6 +113,7 @@ class Status {
     kManualCompactionPaused = 11,
     kOverwritten = 12,
     kTxnNotPrepared = 13,
+    kIOFenced = 14,
     kMaxSubCode
   };
@@ -482,6 +483,14 @@ class Status {
     return (code() == kInvalidArgument) && (subcode() == kTxnNotPrepared);
   }
 
+  // Returns true iff the status indicates an IOFenced error.
+  bool IsIOFenced() const {
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+    checked_ = true;
+#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
+    return (code() == kIOError) && (subcode() == kIOFenced);
+  }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   std::string ToString() const;

util/status.cc
@@ -52,7 +52,9 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
     "Insufficient capacity for merge operands",
     // kManualCompactionPaused
    "Manual compaction paused",
-    " (overwritten)",  // kOverwritten, subcode of OK
+    " (overwritten)",    // kOverwritten, subcode of OK
+    "Txn not prepared",  // kTxnNotPrepared
+    "IO fenced off",     // kIOFenced
 };
 
 Status::Status(Code _code, SubCode _subcode, const Slice& msg,
