If a Put fails, fail all other puts

Summary:
When a Put fails, it can leave database in a messy state. We don't want to pretend that everything is OK when it may not be. We fail every write following the failed one.

I added checks for corruption to DBImpl::Write(). Is there anywhere else I need to add them?

Test Plan: Corruption unit test.

Reviewers: dhruba, haobo, kailiu

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13671
main
Igor Canadi 11 years ago
parent 1ca86f0391
commit 100fa8e013
  1. 13
      db/corruption_test.cc
  2. 3
      db/db_impl.cc
  3. 67
      db/db_test.cc
  4. 3
      include/rocksdb/options.h

@ -221,10 +221,15 @@ TEST(CorruptionTest, NewFileErrorDuringWrite) {
const int num = 3 + (Options().write_buffer_size / kValueSize);
std::string value_storage;
Status s;
for (int i = 0; s.ok() && i < num; i++) {
bool failed = false;
for (int i = 0; i < num; i++) {
WriteBatch batch;
batch.Put("a", Value(100, &value_storage));
s = db_->Write(WriteOptions(), &batch);
if (!s.ok()) {
failed = true;
}
ASSERT_TRUE(!failed || !s.ok());
}
ASSERT_TRUE(!s.ok());
ASSERT_GE(env_.num_writable_file_errors_, 1);
@ -338,8 +343,14 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) {
// Write must eventually fail because of corrupted table
Status s;
std::string tmp1, tmp2;
bool failed = false;
for (int i = 0; i < 10000 && s.ok(); i++) {
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
if (!s.ok()) {
failed = true;
}
// if one write failed, every subsequent write must fail, too
ASSERT_TRUE(!failed || !s.ok()) << "write did not fail in a corrupted db";
}
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
}

@ -2630,6 +2630,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
if (updates == &tmp_batch_) tmp_batch_.Clear();
}
if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
while (true) {
Writer* ready = writers_.front();

@ -101,6 +101,9 @@ class SpecialEnv : public EnvWrapper {
// Force write to manifest files to fail while this pointer is non-nullptr
port::AtomicPointer manifest_write_error_;
// Force write to log files to fail while this pointer is non-nullptr
port::AtomicPointer log_write_error_;
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
@ -113,6 +116,7 @@ class SpecialEnv : public EnvWrapper {
count_random_reads_ = false;
manifest_sync_error_.Release_Store(nullptr);
manifest_write_error_.Release_Store(nullptr);
log_write_error_.Release_Store(nullptr);
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@ -168,6 +172,24 @@ class SpecialEnv : public EnvWrapper {
}
}
};
class LogFile : public WritableFile {
private:
SpecialEnv* env_;
unique_ptr<WritableFile> base_;
public:
LogFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) { }
Status Append(const Slice& data) {
if (env_->log_write_error_.Acquire_Load() != nullptr) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() { return base_->Sync(); }
};
if (non_writable_.Acquire_Load() != nullptr) {
return Status::IOError("simulated write error");
@ -179,6 +201,8 @@ class SpecialEnv : public EnvWrapper {
r->reset(new SSTableFile(this, std::move(*r)));
} else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
r->reset(new ManifestFile(this, std::move(*r)));
} else if (strstr(f.c_str(), "log") != nullptr) {
r->reset(new LogFile(this, std::move(*r)));
}
}
return s;
@ -3295,6 +3319,49 @@ TEST(DBTest, ManifestWriteError) {
}
}
TEST(DBTest, PutFailsParanoid) {
// Test the following:
// (a) A random put fails in paranoid mode (simulate by sync fail)
// (b) All other puts have to fail, even if writes would succeed
// (c) All of that should happen ONLY if paranoid_checks = true
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.error_if_exists = false;
options.paranoid_checks = true;
DestroyAndReopen(&options);
Status s;
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put("foo1", "bar1"));
// simulate error
env_->log_write_error_.Release_Store(env_);
s = Put("foo2", "bar2");
ASSERT_TRUE(!s.ok());
env_->log_write_error_.Release_Store(nullptr);
s = Put("foo3", "bar3");
// the next put should fail, too
ASSERT_TRUE(!s.ok());
// but we're still able to read
ASSERT_EQ("bar", Get("foo"));
// do the same thing with paranoid checks off
options.paranoid_checks = false;
DestroyAndReopen(&options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Put("foo1", "bar1"));
// simulate error
env_->log_write_error_.Release_Store(env_);
s = Put("foo2", "bar2");
ASSERT_TRUE(!s.ok());
env_->log_write_error_.Release_Store(nullptr);
s = Put("foo3", "bar3");
// the next put should NOT fail
ASSERT_TRUE(s.ok());
}
TEST(DBTest, FilesDeletedAfterCompaction) {
do {
ASSERT_OK(Put("foo", "v2"));

@ -109,6 +109,9 @@ struct Options {
// errors. This may have unforeseen ramifications: for example, a
// corruption of one DB entry may cause a large number of entries to
// become unreadable or for the entire DB to become unopenable.
// If any of the writes to the database fails (Put, Delete, Merge, Write),
// the database will switch to read-only mode and fail all other
// Write operations.
// Default: false
bool paranoid_checks;

Loading…
Cancel
Save