Add test to check if DB can handle atomic group (#4433)

Summary:
Add unit tests to demonstrate that `VersionSet::Recover` is able to detect and handle cases in which the MANIFEST has valid atomic group, incomplete trailing atomic group, atomic group mixed with normal version edits and atomic group with incorrect size.
With this capability, RocksDB identifies non-valid groups of version edits and do not apply them, thus guaranteeing that the db is restored to a state consistent with the most recent successful atomic flush before applying WAL.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4433

Differential Revision: D10079202

Pulled By: riversand963

fbshipit-source-id: a0e0b8bf4da1cf68e044d397588c121b66c68876
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent eaaf1a6f05
commit d1118f6f19
  1. 15
      db/version_set.cc
  2. 264
      db/version_set_test.cc

@ -3474,14 +3474,21 @@ Status VersionSet::Recover(
if (edit.is_in_atomic_group_) { if (edit.is_in_atomic_group_) {
if (replay_buffer.empty()) { if (replay_buffer.empty()) {
replay_buffer.resize(edit.remaining_entries_ + 1); replay_buffer.resize(edit.remaining_entries_ + 1);
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
&edit);
} }
++num_entries_decoded; ++num_entries_decoded;
if (num_entries_decoded + edit.remaining_entries_ != if (num_entries_decoded + edit.remaining_entries_ !=
static_cast<uint32_t>(replay_buffer.size())) { static_cast<uint32_t>(replay_buffer.size())) {
return Status::Corruption("corrupted atomic group"); TEST_SYNC_POINT_CALLBACK(
"VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
s = Status::Corruption("corrupted atomic group");
break;
} }
replay_buffer[num_entries_decoded - 1] = std::move(edit); replay_buffer[num_entries_decoded - 1] = std::move(edit);
if (num_entries_decoded == replay_buffer.size()) { if (num_entries_decoded == replay_buffer.size()) {
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
&edit);
for (auto& e : replay_buffer) { for (auto& e : replay_buffer) {
s = ApplyOneVersionEdit( s = ApplyOneVersionEdit(
e, cf_name_to_options, column_families_not_found, builders, e, cf_name_to_options, column_families_not_found, builders,
@ -3496,9 +3503,13 @@ Status VersionSet::Recover(
replay_buffer.clear(); replay_buffer.clear();
num_entries_decoded = 0; num_entries_decoded = 0;
} }
TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
} else { } else {
if (!replay_buffer.empty()) { if (!replay_buffer.empty()) {
return Status::Corruption("corrupted atomic group"); TEST_SYNC_POINT_CALLBACK(
"VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
s = Status::Corruption("corrupted atomic group");
break;
} }
s = ApplyOneVersionEdit( s = ApplyOneVersionEdit(
edit, cf_name_to_options, column_families_not_found, builders, edit, cf_name_to_options, column_families_not_found, builders,

@ -605,9 +605,9 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) {
ASSERT_TRUE(Overlaps("600", "700")); ASSERT_TRUE(Overlaps("600", "700"));
} }
class ManifestWriterTest : public testing::Test { class VersionSetTest : public testing::Test {
public: public:
ManifestWriterTest() VersionSetTest()
: env_(Env::Default()), : env_(Env::Default()),
dbname_(test::PerThreadDBPath("version_set_test")), dbname_(test::PerThreadDBPath("version_set_test")),
db_options_(), db_options_(),
@ -624,8 +624,12 @@ class ManifestWriterTest : public testing::Test {
std::numeric_limits<uint64_t>::max()); std::numeric_limits<uint64_t>::max());
} }
// Create DB with 3 column families. void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
void NewDB() { SequenceNumber* last_seqno,
std::unique_ptr<log::Writer>* log_writer) {
assert(column_families != nullptr);
assert(last_seqno != nullptr);
assert(log_writer != nullptr);
VersionEdit new_db; VersionEdit new_db;
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
@ -646,6 +650,7 @@ class ManifestWriterTest : public testing::Test {
new_cf.SetLastSequence(last_seq++); new_cf.SetLastSequence(last_seq++);
new_cfs.emplace_back(new_cf); new_cfs.emplace_back(new_cf);
} }
*last_seqno = last_seq;
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file; unique_ptr<WritableFile> file;
@ -655,32 +660,40 @@ class ManifestWriterTest : public testing::Test {
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), manifest, env_options_)); new WritableFileWriter(std::move(file), manifest, env_options_));
{ {
log::Writer log(std::move(file_writer), 0, false); log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
std::string record; std::string record;
new_db.EncodeTo(&record); new_db.EncodeTo(&record);
s = log.AddRecord(record); s = (*log_writer)->AddRecord(record);
for (const auto& e : new_cfs) { for (const auto& e : new_cfs) {
record.clear();
e.EncodeTo(&record); e.EncodeTo(&record);
s = log.AddRecord(record); s = (*log_writer)->AddRecord(record);
ASSERT_OK(s); ASSERT_OK(s);
} }
} }
ASSERT_OK(s); ASSERT_OK(s);
// Make "CURRENT" file point to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, nullptr);
std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_; cf_options_.table_factory = mock_table_factory_;
for (const auto& cf_name : cf_names) { for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_); column_families->emplace_back(cf_name, cf_options_);
} }
}
// Create DB with 3 column families.
void NewDB() {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
log_writer.reset();
// Make "CURRENT" file point to the new manifest file.
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
EXPECT_OK(versions_->Recover(column_families, false)); EXPECT_OK(versions_->Recover(column_families, false));
EXPECT_EQ(kInitialNumOfCfs, EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfds_.emplace_back(cfd);
}
} }
Env* env_; Env* env_;
@ -692,14 +705,13 @@ class ManifestWriterTest : public testing::Test {
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;
WriteController write_controller_; WriteController write_controller_;
WriteBufferManager write_buffer_manager_; WriteBufferManager write_buffer_manager_;
std::unique_ptr<VersionSet> versions_; std::shared_ptr<VersionSet> versions_;
InstrumentedMutex mutex_; InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_; std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
std::vector<ColumnFamilyData*> cfds_;
}; };
TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) { TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
NewDB(); NewDB();
const int kGroupSize = 5; const int kGroupSize = 5;
autovector<VersionEdit> edits; autovector<VersionEdit> edits;
@ -710,13 +722,15 @@ TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) {
autovector<const MutableCFOptions*> all_mutable_cf_options; autovector<const MutableCFOptions*> all_mutable_cf_options;
autovector<autovector<VersionEdit*>> edit_lists; autovector<autovector<VersionEdit*>> edit_lists;
for (int i = 0; i != kGroupSize; ++i) { for (int i = 0; i != kGroupSize; ++i) {
cfds.emplace_back(cfds_[0]); cfds.emplace_back(versions_->GetColumnFamilySet()->GetDefault());
all_mutable_cf_options.emplace_back(&mutable_cf_options_); all_mutable_cf_options.emplace_back(&mutable_cf_options_);
autovector<VersionEdit*> edit_list; autovector<VersionEdit*> edit_list;
edit_list.emplace_back(&edits[i]); edit_list.emplace_back(&edits[i]);
edit_lists.emplace_back(edit_list); edit_lists.emplace_back(edit_list);
} }
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
int count = 0; int count = 0;
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) { "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) {
@ -732,6 +746,218 @@ TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) {
EXPECT_OK(s); EXPECT_OK(s);
EXPECT_EQ(kGroupSize - 1, count); EXPECT_EQ(kGroupSize - 1, count);
} }
TEST_F(VersionSetTest, HandleValidAtomicGroup) {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group
const int kAtomicGroupSize = 3;
std::vector<VersionEdit> edits(kAtomicGroupSize);
int remaining = kAtomicGroupSize;
for (size_t i = 0; i != edits.size(); ++i) {
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
edits[i].MarkAtomicGroup(--remaining);
edits[i].SetLastSequence(last_seqno++);
}
Status s;
for (const auto& edit : edits) {
std::string record;
edit.EncodeTo(&record);
s = log_writer->AddRecord(record);
ASSERT_OK(s);
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
bool first_in_atomic_group = false;
bool last_in_atomic_group = false;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits.front().DebugString(),
e->DebugString()); // compare based on value
first_in_atomic_group = true;
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:LastInAtomicGroup", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits.back().DebugString(),
e->DebugString()); // compare based on value
EXPECT_TRUE(first_in_atomic_group);
last_in_atomic_group = true;
});
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_OK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(first_in_atomic_group);
EXPECT_TRUE(last_in_atomic_group);
}
TEST_F(VersionSetTest, HandleIncompleteTrailingAtomicGroup) {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group
const int kAtomicGroupSize = 4;
const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
std::vector<VersionEdit> edits(kNumberOfPersistedVersionEdits);
int remaining = kAtomicGroupSize;
for (size_t i = 0; i != edits.size(); ++i) {
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
edits[i].MarkAtomicGroup(--remaining);
edits[i].SetLastSequence(last_seqno++);
}
Status s;
for (const auto& edit : edits) {
std::string record;
edit.EncodeTo(&record);
s = log_writer->AddRecord(record);
ASSERT_OK(s);
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
bool first_in_atomic_group = false;
bool last_in_atomic_group = false;
size_t num = 0;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits.front().DebugString(),
e->DebugString()); // compare based on value
first_in_atomic_group = true;
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:LastInAtomicGroup",
[&](void* /* arg */) { last_in_atomic_group = true; });
SyncPoint::GetInstance()->SetCallBack("VersionSet::Recover:AtomicGroup",
[&](void* /* arg */) { ++num; });
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_OK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(first_in_atomic_group);
EXPECT_FALSE(last_in_atomic_group);
EXPECT_EQ(kNumberOfPersistedVersionEdits, num);
}
TEST_F(VersionSetTest, HandleCorruptedAtomicGroup) {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group
const int kAtomicGroupSize = 4;
std::vector<VersionEdit> edits(kAtomicGroupSize);
int remaining = kAtomicGroupSize;
for (size_t i = 0; i != edits.size(); ++i) {
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
if (i != (kAtomicGroupSize / 2)) {
edits[i].MarkAtomicGroup(--remaining);
}
edits[i].SetLastSequence(last_seqno++);
}
Status s;
for (const auto& edit : edits) {
std::string record;
edit.EncodeTo(&record);
s = log_writer->AddRecord(record);
ASSERT_OK(s);
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
bool mixed = false;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:AtomicGroupMixedWithNormalEdits", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits[kAtomicGroupSize / 2].DebugString(), e->DebugString());
mixed = true;
});
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_NOK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(mixed);
}
TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) {
std::vector<ColumnFamilyDescriptor> column_families;
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group
const int kAtomicGroupSize = 4;
std::vector<VersionEdit> edits(kAtomicGroupSize);
int remaining = kAtomicGroupSize;
for (size_t i = 0; i != edits.size(); ++i) {
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
if (i != 1) {
edits[i].MarkAtomicGroup(--remaining);
} else {
edits[i].MarkAtomicGroup(remaining--);
}
edits[i].SetLastSequence(last_seqno++);
}
Status s;
for (const auto& edit : edits) {
std::string record;
edit.EncodeTo(&record);
s = log_writer->AddRecord(record);
ASSERT_OK(s);
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
bool incorrect_group_size = false;
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::Recover:IncorrectAtomicGroupSize", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits[1].DebugString(), e->DebugString());
incorrect_group_size = true;
});
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_NOK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(incorrect_group_size);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save