Fix the bug where compaction does not fail when RocksDB can't create a new file.

Summary:
This diff has two fixes.

1. Fix the bug where compaction does not fail when RocksDB can't create a new file.
2. When NewWritableFiles() fails in OpenCompactionOutputFiles(), previously such fail-to-created file will be still be included as a compaction output.  This patch also fixes this bug.
3. Allow VersionEdit::EncodeTo() to return Status and add basic check.

Test Plan:
./version_edit_test
export ROCKSDB_TESTS=FileCreationRandomFailure
./db_test

Reviewers: ljin, sdong, nkg-, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D25581
main
Yueh-Hsuan Chiang 10 years ago
parent c49dedbe04
commit 3772a3d09d
  1. 31
      db/db_impl.cc
  2. 168
      db/db_test.cc
  3. 7
      db/version_edit.cc
  4. 3
      db/version_edit.h
  5. 10
      db/version_edit_test.cc
  6. 29
      db/version_set.cc

@ -2347,7 +2347,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
compact->builder->Abandon();
compact->builder.reset();
} else {
assert(compact->outfile == nullptr);
assert(!status.ok() || compact->outfile == nullptr);
}
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
@ -2402,20 +2402,28 @@ Status DBImpl::OpenCompactionOutputFile(
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
mutex_.Unlock();
}
// Make the output file
std::string fname = TableFileName(db_options_.db_paths, file_number,
compact->compaction->GetOutputPathId());
Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[%s] OpenCompactionOutputFiles for table #%" PRIu64 " "
"fails at NewWritableFile with status %s",
compact->compaction->column_family_data()->GetName().c_str(),
file_number, s.ToString().c_str());
LogFlush(db_options_.info_log);
return s;
}
CompactionState::Output out;
out.number = file_number;
out.path_id = compact->compaction->GetOutputPathId();
out.smallest.Clear();
out.largest.Clear();
out.smallest_seqno = out.largest_seqno = 0;
compact->outputs.push_back(out);
// Make the output file
std::string fname = TableFileName(db_options_.db_paths, file_number,
compact->compaction->GetOutputPathId());
Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);
if (s.ok()) {
compact->outputs.push_back(out);
compact->outfile->SetIOPriority(Env::IO_LOW);
compact->outfile->SetPreallocationBlockSize(
compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
@ -2425,7 +2433,6 @@ Status DBImpl::OpenCompactionOutputFile(
*cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
compact->compaction->OutputCompressionType(),
cfd->ioptions()->compression_opts));
}
LogFlush(db_options_.info_log);
return s;
}
@ -2616,7 +2623,7 @@ Status DBImpl::ProcessKeyValueCompaction(
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
!cfd->IsDropped() && status.ok()) {
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
@ -2891,8 +2898,8 @@ Status DBImpl::ProcessKeyValueCompaction(
// Only had one item to begin with (Put/Delete)
break;
}
}
}
} // while (true)
} // if (!drop)
// MergeUntil has moved input to the next entry
if (!current_entry_is_merging) {

@ -120,6 +120,8 @@ static std::string Key(int i) {
// Special Env used to delay background operations
class SpecialEnv : public EnvWrapper {
public:
Random rnd_;
// sstable Sync() calls are blocked while this pointer is non-nullptr.
std::atomic<bool> delay_sstable_sync_;
@ -153,7 +155,13 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int> sync_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
std::atomic<uint32_t> non_writeable_rate_;
std::atomic<uint32_t> new_writable_count_;
std::atomic<uint32_t> periodic_non_writable_;
explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301) {
delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release);
no_space_.store(false, std::memory_order_release);
@ -165,6 +173,9 @@ class SpecialEnv : public EnvWrapper {
log_write_error_.store(false, std::memory_order_release);
bytes_written_ = 0;
sync_counter_ = 0;
non_writeable_rate_ = 0;
new_writable_count_ = 0;
periodic_non_writable_ = 0;
}
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
@ -250,8 +261,19 @@ class SpecialEnv : public EnvWrapper {
}
};
if (non_writable_.load(std::memory_order_acquire)) {
return Status::IOError("simulated write error");
if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
auto random_number = rnd_.Uniform(100);
if (random_number < non_writeable_rate_.load()) {
return Status::IOError("simulated random write error");
}
}
new_writable_count_++;
auto periodic_fail = periodic_non_writable_.load();
if (periodic_fail > 0 &&
new_writable_count_.load() % periodic_fail == 0) {
return Status::IOError("simulated periodic write error");
}
Status s = target()->NewWritableFile(f, r, soptions);
@ -5871,8 +5893,7 @@ TEST(DBTest, NonWritableFileSystem) {
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
// Force errors for new files
env_->non_writable_.store(true, std::memory_order_release);
env_->non_writeable_rate_.store(100);
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
@ -5882,7 +5903,7 @@ TEST(DBTest, NonWritableFileSystem) {
}
}
ASSERT_GT(errors, 0);
env_->non_writable_.store(false, std::memory_order_release);
env_->non_writeable_rate_.store(0);
} while (ChangeCompactOptions());
}
@ -8962,6 +8983,141 @@ TEST(DBTest, DynamicCompactionOptions) {
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
}
TEST(DBTest, FileCreationRandomFailure) {
Options options;
options.env = env_;
options.create_if_missing = true;
options.write_buffer_size = 100000; // Small write buffer
options.target_file_size_base = 200000;
options.max_bytes_for_level_base = 1000000;
options.max_bytes_for_level_multiplier = 2;
DestroyAndReopen(&options);
Random rnd(301);
const int kTestSize = kCDTKeysPerBuffer * 4096;
const int kTotalIteration = 100;
// the second half of the test involves in random failure
// of file creation.
const int kRandomFailureTest = kTotalIteration / 2;
std::vector<std::string> values;
for (int i = 0; i < kTestSize; ++i) {
values.push_back("NOT_FOUND");
}
for (int j = 0; j < kTotalIteration; ++j) {
if (j == kRandomFailureTest) {
env_->non_writeable_rate_.store(90);
}
for (int k = 0; k < kTestSize; ++k) {
// here we expect some of the Put fails.
std::string value = RandomString(&rnd, 100);
Status s = Put(Key(k), Slice(value));
if (s.ok()) {
// update the latest successful put
values[k] = value;
}
// But everything before we simulate the failure-test should succeed.
if (j < kRandomFailureTest) {
ASSERT_OK(s);
}
}
}
// If rocksdb does not do the correct job, internal assert will fail here.
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// verify we have the latest successful update
for (int k = 0; k < kTestSize; ++k) {
auto v = Get(Key(k));
ASSERT_EQ(v, values[k]);
}
// reopen and reverify we have the latest successful update
env_->non_writeable_rate_.store(0);
Reopen(&options);
for (int k = 0; k < kTestSize; ++k) {
auto v = Get(Key(k));
ASSERT_EQ(v, values[k]);
}
}
TEST(DBTest, PartialCompactionFailure) {
Options options;
const int kKeySize = 16;
const int kKvSize = 1000;
const int kKeysPerBuffer = 100;
const int kNumL1Files = 5;
options.create_if_missing = true;
options.write_buffer_size = kKeysPerBuffer * kKvSize;
options.max_write_buffer_number = 2;
options.target_file_size_base =
options.write_buffer_size *
(options.max_write_buffer_number - 1);
options.level0_file_num_compaction_trigger = kNumL1Files;
options.max_bytes_for_level_base =
options.level0_file_num_compaction_trigger *
options.target_file_size_base;
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
// The number of NewWritableFiles calls required by each operation.
const int kNumInitialNewWritableFiles = 4;
const int kNumLevel0FlushNewWritableFiles =
options.level0_file_num_compaction_trigger * 2;
const int kNumLevel1NewWritableFiles =
options.level0_file_num_compaction_trigger + 1;
// This setting will make one of the file-creation fail
// in the first L0 -> L1 compaction while making sure
// all flushes succeeed.
env_->periodic_non_writable_ =
kNumInitialNewWritableFiles + kNumLevel0FlushNewWritableFiles +
kNumLevel1NewWritableFiles - 3;
options.env = env_;
DestroyAndReopen(&options);
const int kNumKeys =
options.level0_file_num_compaction_trigger *
(options.max_write_buffer_number - 1) *
kKeysPerBuffer * 1.0;
Random rnd(301);
std::vector<std::string> keys;
std::vector<std::string> values;
for (int k = 0; k < kNumKeys; ++k) {
keys.emplace_back(RandomString(&rnd, kKeySize));
values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
}
dbfull()->TEST_WaitForFlushMemTable();
// Make sure the number of L0 files can trigger compaction.
ASSERT_GE(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
auto previous_num_level0_files = NumTableFilesAtLevel(0);
// Expect compaction to fail here as one file will fail its
// creation.
dbfull()->TEST_WaitForCompact();
// Verify L0 -> L1 compaction does fail.
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
// Verify all L0 files are still there.
ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
// All key-values must exist after compaction fails.
for (int k = 0; k < kNumKeys; ++k) {
ASSERT_EQ(values[k], Get(keys[k]));
}
// Make sure RocksDB will not get into corrupted state.
Reopen(&options);
// Verify again after reopen.
for (int k = 0; k < kNumKeys; ++k) {
ASSERT_EQ(values[k], Get(keys[k]));
}
}
TEST(DBTest, DynamicMiscOptions) {
// Test max_sequential_skip_in_iterations
Options options;

@ -9,6 +9,7 @@
#include "db/version_edit.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "util/coding.h"
#include "rocksdb/slice.h"
@ -64,7 +65,7 @@ void VersionEdit::Clear() {
column_family_name_.clear();
}
void VersionEdit::EncodeTo(std::string* dst) const {
bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_comparator_) {
PutVarint32(dst, kComparator);
PutLengthPrefixedSlice(dst, comparator_);
@ -111,6 +112,9 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, f.fd.GetPathId());
}
PutVarint64(dst, f.fd.GetFileSize());
if (!f.smallest.Valid() || !f.largest.Valid()) {
return false;
}
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64(dst, f.smallest_seqno);
@ -131,6 +135,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
if (is_column_family_drop_) {
PutVarint32(dst, kColumnFamilyDrop);
}
return true;
}
static bool GetInternalKey(Slice* input, InternalKey* dst) {

@ -213,7 +213,8 @@ class VersionEdit {
is_column_family_drop_ = true;
}
void EncodeTo(std::string* dst) const;
// return true on success.
bool EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
std::string DebugString(bool hex_key = false) const;

@ -44,6 +44,16 @@ TEST(VersionEditTest, EncodeDecode) {
TestEncodeDecode(edit);
}
TEST(VersionEditTest, EncodeEmptyFile) {
VersionEdit edit;
edit.AddFile(0, 0, 0, 0,
InternalKey(),
InternalKey(),
0, 0);
std::string buffer;
ASSERT_TRUE(!edit.EncodeTo(&buffer));
}
TEST(VersionEditTest, ColumnFamilyTest) {
VersionEdit edit;
edit.SetColumnFamily(2);

@ -1854,7 +1854,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (s.ok()) {
for (auto& e : batch_edits) {
std::string record;
e->EncodeTo(&record);
if (!e->EncodeTo(&record)) {
s = Status::Corruption(
"Unable to Encode VersionEdit:" + e->DebugString(true));
break;
}
s = descriptor_log_->AddRecord(record);
if (!s.ok()) {
break;
@ -1872,19 +1876,24 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
}
if (!s.ok()) {
Log(db_options_->info_log, "MANIFEST write: %s\n",
s.ToString().c_str());
Log(InfoLogLevel::ERROR_LEVEL, db_options_->info_log,
"MANIFEST write: %s\n", s.ToString().c_str());
bool all_records_in = true;
for (auto& e : batch_edits) {
std::string record;
e->EncodeTo(&record);
if (!e->EncodeTo(&record)) {
s = Status::Corruption(
"Unable to Encode VersionEdit:" + e->DebugString(true));
all_records_in = false;
break;
}
if (!ManifestContains(pending_manifest_file_number_, record)) {
all_records_in = false;
break;
}
}
if (all_records_in) {
Log(db_options_->info_log,
Log(InfoLogLevel::WARN_LEVEL, db_options_->info_log,
"MANIFEST contains log record despite error; advancing to new "
"version to prevent mismatch between in-memory and logged state"
" If paranoid is set, then the db is now in readonly mode.");
@ -2661,7 +2670,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
edit.SetComparatorName(
cfd->internal_comparator().user_comparator()->Name());
std::string record;
edit.EncodeTo(&record);
if (!edit.EncodeTo(&record)) {
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;
@ -2682,7 +2694,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
edit.SetLogNumber(cfd->GetLogNumber());
std::string record;
edit.EncodeTo(&record);
if (!edit.EncodeTo(&record)) {
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;

Loading…
Cancel
Save