[unit test] CompactRange should fail if we don't have space

Summary:
See t5106397.

Also, few more changes:
1. in unit tests, the assumption is that writes will be dropped when there is no space left on device. I changed the wording around it.
2. InvalidArgument() errors are only when user-provided arguments are invalid. When the file is corrupted, we need to return Status::Corruption

Test Plan: make check

Reviewers: sdong, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23145
main
Igor Canadi 11 years ago
parent dd641b2117
commit 059e584dd3
  1. 44
      db/db_test.cc
  2. 8
      table/format.cc

@ -123,6 +123,9 @@ class SpecialEnv : public EnvWrapper {
// sstable Sync() calls are blocked while this pointer is non-nullptr. // sstable Sync() calls are blocked while this pointer is non-nullptr.
port::AtomicPointer delay_sstable_sync_; port::AtomicPointer delay_sstable_sync_;
// Drop writes on the floor while this pointer is non-nullptr.
port::AtomicPointer drop_writes_;
// Simulate no-space errors while this pointer is non-nullptr. // Simulate no-space errors while this pointer is non-nullptr.
port::AtomicPointer no_space_; port::AtomicPointer no_space_;
@ -150,6 +153,7 @@ class SpecialEnv : public EnvWrapper {
explicit SpecialEnv(Env* base) : EnvWrapper(base) { explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(nullptr); delay_sstable_sync_.Release_Store(nullptr);
drop_writes_.Release_Store(nullptr);
no_space_.Release_Store(nullptr); no_space_.Release_Store(nullptr);
non_writable_.Release_Store(nullptr); non_writable_.Release_Store(nullptr);
count_random_reads_ = false; count_random_reads_ = false;
@ -173,9 +177,11 @@ class SpecialEnv : public EnvWrapper {
base_(std::move(base)) { base_(std::move(base)) {
} }
Status Append(const Slice& data) { Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != nullptr) { if (env_->drop_writes_.Acquire_Load() != nullptr) {
// Drop writes on the floor // Drop writes on the floor
return Status::OK(); return Status::OK();
} else if (env_->no_space_.Acquire_Load() != nullptr) {
return Status::IOError("No space left on device");
} else { } else {
env_->bytes_written_ += data.size(); env_->bytes_written_ += data.size();
return base_->Append(data); return base_->Append(data);
@ -5573,8 +5579,8 @@ TEST(DBTest, DestroyDBMetaDatabase) {
ASSERT_TRUE(!(DB::Open(opts, metametadbname, &db)).ok()); ASSERT_TRUE(!(DB::Open(opts, metametadbname, &db)).ok());
} }
// Check that number of files does not grow when we are out of space // Check that number of files does not grow when writes are dropped
TEST(DBTest, NoSpace) { TEST(DBTest, DropWrites) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;
@ -5585,7 +5591,7 @@ TEST(DBTest, NoSpace) {
ASSERT_EQ("v1", Get("foo")); ASSERT_EQ("v1", Get("foo"));
Compact("a", "z"); Compact("a", "z");
const int num_files = CountFiles(); const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors env_->drop_writes_.Release_Store(env_); // Force out-of-space errors
env_->sleep_counter_.Reset(); env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
for (int level = 0; level < dbfull()->NumberLevels()-1; level++) { for (int level = 0; level < dbfull()->NumberLevels()-1; level++) {
@ -5597,7 +5603,7 @@ TEST(DBTest, NoSpace) {
ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value)); ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
ASSERT_EQ("5", property_value); ASSERT_EQ("5", property_value);
env_->no_space_.Release_Store(nullptr); env_->drop_writes_.Release_Store(nullptr);
ASSERT_LT(CountFiles(), num_files + 3); ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors // Check that compaction attempts slept after errors
@ -5606,7 +5612,7 @@ TEST(DBTest, NoSpace) {
} }
// Check background error counter bumped on flush failures. // Check background error counter bumped on flush failures.
TEST(DBTest, NoSpaceFlush) { TEST(DBTest, DropWritesFlush) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;
@ -5614,7 +5620,7 @@ TEST(DBTest, NoSpaceFlush) {
Reopen(&options); Reopen(&options);
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));
env_->no_space_.Release_Store(env_); // Force out-of-space errors env_->drop_writes_.Release_Store(env_); // Force out-of-space errors
std::string property_value; std::string property_value;
// Background error count is 0 now. // Background error count is 0 now.
@ -5638,6 +5644,30 @@ TEST(DBTest, NoSpaceFlush) {
} }
ASSERT_EQ("1", property_value); ASSERT_EQ("1", property_value);
env_->drop_writes_.Release_Store(nullptr);
} while (ChangeCompactOptions());
}
// Check that CompactRange() returns failure if there is not enough space left
// on device
TEST(DBTest, NoSpaceCompactRange) {
do {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
Reopen(&options);
// generate 5 tables
for (int i = 0; i < 5; ++i) {
ASSERT_OK(Put(Key(i), Key(i) + "v"));
ASSERT_OK(Flush());
}
env_->no_space_.Release_Store(env_); // Force out-of-space errors
Status s = db_->CompactRange(nullptr, nullptr);
ASSERT_TRUE(s.IsIOError());
env_->no_space_.Release_Store(nullptr); env_->no_space_.Release_Store(nullptr);
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }

@ -135,7 +135,7 @@ Status Footer::DecodeFrom(Slice* input) {
snprintf(buffer, sizeof(buffer) - 1, snprintf(buffer, sizeof(buffer) - 1,
"not an sstable (bad magic number --- %lx)", "not an sstable (bad magic number --- %lx)",
(long)magic); (long)magic);
return Status::InvalidArgument(buffer); return Status::Corruption(buffer);
} }
} else { } else {
set_table_magic_number(magic); set_table_magic_number(magic);
@ -156,7 +156,7 @@ Status Footer::DecodeFrom(Slice* input) {
// It consists of the checksum type, two block handles, padding, // It consists of the checksum type, two block handles, padding,
// a version number, and a magic number // a version number, and a magic number
if (input->size() < kVersion1EncodedLength) { if (input->size() < kVersion1EncodedLength) {
return Status::InvalidArgument("input is too short to be an sstable"); return Status::Corruption("input is too short to be an sstable");
} else { } else {
input->remove_prefix(input->size() - kVersion1EncodedLength); input->remove_prefix(input->size() - kVersion1EncodedLength);
} }
@ -183,7 +183,7 @@ Status ReadFooterFromFile(RandomAccessFile* file,
uint64_t file_size, uint64_t file_size,
Footer* footer) { Footer* footer) {
if (file_size < Footer::kMinEncodedLength) { if (file_size < Footer::kMinEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable"); return Status::Corruption("file is too short to be an sstable");
} }
char footer_space[Footer::kMaxEncodedLength]; char footer_space[Footer::kMaxEncodedLength];
@ -198,7 +198,7 @@ Status ReadFooterFromFile(RandomAccessFile* file,
// Check that we actually read the whole footer from the file. It may be // Check that we actually read the whole footer from the file. It may be
// that size isn't correct. // that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) { if (footer_input.size() < Footer::kMinEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable"); return Status::Corruption("file is too short to be an sstable");
} }
return footer->DecodeFrom(&footer_input); return footer->DecodeFrom(&footer_input);

Loading…
Cancel
Save