Make RocksDB secondary instance respect atomic groups in version edits. (#5411)

Summary:
With this commit, RocksDB secondary instance respects atomic groups in version edits.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5411

Differential Revision: D15617512

Pulled By: HaoyuHuang

fbshipit-source-id: 913f4ede391d772dcaf5649e3cd2099fa292d120
main
haoyuhuang 6 years ago committed by Facebook Github Bot
parent ebe89ef9d8
commit 227b5d52df
  1. 2
      db/db_impl/db_secondary_test.cc
  2. 1
      db/version_edit.h
  3. 403
      db/version_set.cc
  4. 47
      db/version_set.h
  5. 542
      db/version_set_test.cc

@ -373,7 +373,7 @@ TEST_F(DBSecondaryTest, MissingTableFile) {
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", "ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
[&](void* arg) { [&](void* arg) {
Status s = *reinterpret_cast<Status*>(arg); Status s = *reinterpret_cast<Status*>(arg);
if (s.IsPathNotFound()) { if (s.IsPathNotFound()) {

@ -316,6 +316,7 @@ class VersionEdit {
friend class ReactiveVersionSet; friend class ReactiveVersionSet;
friend class VersionSet; friend class VersionSet;
friend class Version; friend class Version;
friend class AtomicGroupReadBuffer;
bool GetLevel(Slice* input, int* level, const char** msg); bool GetLevel(Slice* input, int* level, const char** msg);

@ -3313,6 +3313,51 @@ struct VersionSet::ManifestWriter {
edit_list(e) {} edit_list(e) {}
}; };
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
assert(edit);
if (edit->is_in_atomic_group_) {
TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
if (replay_buffer_.empty()) {
replay_buffer_.resize(edit->remaining_entries_ + 1);
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
}
read_edits_in_atomic_group_++;
if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
static_cast<uint32_t>(replay_buffer_.size())) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
return Status::Corruption("corrupted atomic group");
}
replay_buffer_[read_edits_in_atomic_group_ - 1] = std::move(*edit);
if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
return Status::OK();
}
return Status::OK();
}
// A normal edit.
if (!replay_buffer().empty()) {
TEST_SYNC_POINT_CALLBACK(
"AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
return Status::Corruption("corrupted atomic group");
}
return Status::OK();
}
bool AtomicGroupReadBuffer::IsFull() const {
return read_edits_in_atomic_group_ == replay_buffer_.size();
}
bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
void AtomicGroupReadBuffer::Clear() {
read_edits_in_atomic_group_ = 0;
replay_buffer_.clear();
}
VersionSet::VersionSet(const std::string& dbname, VersionSet::VersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options, const ImmutableDBOptions* _db_options,
const EnvOptions& storage_options, Cache* table_cache, const EnvOptions& storage_options, Cache* table_cache,
@ -4071,6 +4116,74 @@ Status VersionSet::GetCurrentManifestPath(const std::string& dbname, Env* env,
return Status::OK(); return Status::OK();
} }
Status VersionSet::ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
assert(reader != nullptr);
assert(read_buffer != nullptr);
Status s;
Slice record;
std::string scratch;
size_t recovered_edits = 0;
while (reader->ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
s = read_buffer->AddEdit(&edit);
if (!s.ok()) {
break;
}
if (edit.is_in_atomic_group_) {
if (read_buffer->IsFull()) {
// Apply edits in an atomic group when we have read all edits in the
// group.
for (auto& e : read_buffer->replay_buffer()) {
s = ApplyOneVersionEditToBuilder(
e, name_to_options, column_families_not_found, builders,
have_log_number, log_number, have_prev_log_number,
previous_log_number, have_next_file, next_file,
have_last_sequence, last_sequence, min_log_number_to_keep,
max_column_family);
if (!s.ok()) {
break;
}
recovered_edits++;
}
if (!s.ok()) {
break;
}
read_buffer->Clear();
}
} else {
// Apply a normal edit immediately.
s = ApplyOneVersionEditToBuilder(
edit, name_to_options, column_families_not_found, builders,
have_log_number, log_number, have_prev_log_number,
previous_log_number, have_next_file, next_file, have_last_sequence,
last_sequence, min_log_number_to_keep, max_column_family);
if (s.ok()) {
recovered_edits++;
}
}
}
if (!s.ok()) {
// Clear the buffer if we fail to decode/apply an edit.
read_buffer->Clear();
}
TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
&recovered_edits);
return s;
}
Status VersionSet::Recover( Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only) { bool read_only) {
@ -4148,66 +4261,12 @@ Status VersionSet::Recover(
true /* checksum */, 0 /* log_number */); true /* checksum */, 0 /* log_number */);
Slice record; Slice record;
std::string scratch; std::string scratch;
std::vector<VersionEdit> replay_buffer; AtomicGroupReadBuffer read_buffer;
size_t num_entries_decoded = 0; s = ReadAndRecover(
while (reader.ReadRecord(&record, &scratch) && s.ok()) { &reader, &read_buffer, cf_name_to_options, column_families_not_found,
VersionEdit edit; builders, &have_log_number, &log_number, &have_prev_log_number,
s = edit.DecodeFrom(record); &previous_log_number, &have_next_file, &next_file, &have_last_sequence,
if (!s.ok()) { &last_sequence, &min_log_number_to_keep, &max_column_family);
break;
}
if (edit.is_in_atomic_group_) {
if (replay_buffer.empty()) {
replay_buffer.resize(edit.remaining_entries_ + 1);
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
&edit);
}
++num_entries_decoded;
if (num_entries_decoded + edit.remaining_entries_ !=
static_cast<uint32_t>(replay_buffer.size())) {
TEST_SYNC_POINT_CALLBACK(
"VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
s = Status::Corruption("corrupted atomic group");
break;
}
replay_buffer[num_entries_decoded - 1] = std::move(edit);
if (num_entries_decoded == replay_buffer.size()) {
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
&edit);
for (auto& e : replay_buffer) {
s = ApplyOneVersionEditToBuilder(
e, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
if (!s.ok()) {
break;
}
}
replay_buffer.clear();
num_entries_decoded = 0;
}
TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
} else {
if (!replay_buffer.empty()) {
TEST_SYNC_POINT_CALLBACK(
"VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
s = Status::Corruption("corrupted atomic group");
break;
}
s = ApplyOneVersionEditToBuilder(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
}
if (!s.ok()) {
break;
}
}
} }
if (s.ok()) { if (s.ok()) {
@ -5218,19 +5277,11 @@ Status ReactiveVersionSet::Recover(
assert(reader != nullptr); assert(reader != nullptr);
Slice record; Slice record;
std::string scratch; std::string scratch;
while (s.ok() && reader->ReadRecord(&record, &scratch)) { s = ReadAndRecover(
VersionEdit edit; reader, &read_buffer_, cf_name_to_options, column_families_not_found,
s = edit.DecodeFrom(record); builders, &have_log_number, &log_number, &have_prev_log_number,
if (!s.ok()) { &previous_log_number, &have_next_file, &next_file, &have_last_sequence,
break; &last_sequence, &min_log_number_to_keep, &max_column_family);
}
s = ApplyOneVersionEditToBuilder(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
}
if (s.ok()) { if (s.ok()) {
bool enough = have_next_file && have_log_number && have_last_sequence; bool enough = have_next_file && have_log_number && have_last_sequence;
if (enough) { if (enough) {
@ -5350,7 +5401,7 @@ Status ReactiveVersionSet::ReadAndApply(
uint64_t previous_log_number = 0; uint64_t previous_log_number = 0;
uint32_t max_column_family = 0; uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0; uint64_t min_log_number_to_keep = 0;
uint64_t applied_edits = 0;
while (s.ok()) { while (s.ok()) {
Slice record; Slice record;
std::string scratch; std::string scratch;
@ -5362,73 +5413,46 @@ Status ReactiveVersionSet::ReadAndApply(
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
ColumnFamilyData* cfd =
column_family_set_->GetColumnFamily(edit.column_family_); s = read_buffer_.AddEdit(&edit);
// If we cannot find this column family in our column family set, then it
// may be a new column family created by the primary after the secondary
// starts. Ignore it for now.
if (nullptr == cfd) {
continue;
}
if (active_version_builders_.find(edit.column_family_) ==
active_version_builders_.end()) {
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
new BaseReferencedVersionBuilder(cfd));
active_version_builders_.insert(
std::make_pair(edit.column_family_, std::move(builder_guard)));
}
s = ApplyOneVersionEditToBuilder(
edit, &have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
auto builder_iter = active_version_builders_.find(edit.column_family_); if (edit.is_in_atomic_group_) {
assert(builder_iter != active_version_builders_.end()); if (read_buffer_.IsFull()) {
auto builder = builder_iter->second->version_builder(); // Apply edits in an atomic group when we have read all edits in the
assert(builder != nullptr); // group.
s = builder->LoadTableHandlers( for (auto& e : read_buffer_.replay_buffer()) {
cfd->internal_stats(), db_options_->max_file_opening_threads, s = ApplyOneVersionEditToBuilder(
false /* prefetch_index_and_filter_in_cache */, e, cfds_changed, &have_log_number, &log_number,
false /* is_initial_load */, &have_prev_log_number, &previous_log_number, &have_next_file,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); &next_file, &have_last_sequence, &last_sequence,
TEST_SYNC_POINT_CALLBACK( &min_log_number_to_keep, &max_column_family);
"ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s); if (!s.ok()) {
if (!s.ok() && !s.IsPathNotFound()) { break;
break; }
} else if (s.IsPathNotFound()) { applied_edits++;
s = Status::OK(); }
} else { // s.ok() == true if (!s.ok()) {
auto version = new Version(cfd, this, env_options_, break;
*cfd->GetLatestMutableCFOptions(), }
current_version_number_++); read_buffer_.Clear();
builder->SaveTo(version->storage_info()); }
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); } else {
AppendVersion(cfd, version); // Apply a normal edit immediately.
active_version_builders_.erase(builder_iter); s = ApplyOneVersionEditToBuilder(
if (cfds_changed->count(cfd) == 0) { edit, cfds_changed, &have_log_number, &log_number,
cfds_changed->insert(cfd); &have_prev_log_number, &previous_log_number, &have_next_file,
&next_file, &have_last_sequence, &last_sequence,
&min_log_number_to_keep, &max_column_family);
if (s.ok()) {
applied_edits++;
} }
} }
if (have_next_file) { }
next_file_number_.store(next_file + 1); if (!s.ok()) {
} // Clear the buffer if we fail to decode/apply an edit.
if (have_last_sequence) { read_buffer_.Clear();
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
}
if (have_prev_log_number) {
prev_log_number_ = previous_log_number;
MarkFileNumberUsed(previous_log_number);
}
if (have_log_number) {
MarkFileNumberUsed(log_number);
}
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
} }
// It's possible that: // It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted. // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
@ -5457,52 +5481,113 @@ Status ReactiveVersionSet::ReadAndApply(
} }
} }
} }
TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
&applied_edits);
return s; return s;
} }
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder( Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
VersionEdit& edit, bool* have_log_number, uint64_t* log_number, VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
bool* have_prev_log_number, uint64_t* previous_log_number, bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, bool* have_last_sequence, SequenceNumber* last_sequence,
uint32_t* max_column_family) { uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
ColumnFamilyData* cfd = nullptr; ColumnFamilyData* cfd =
Status status; column_family_set_->GetColumnFamily(edit.column_family_);
// If we cannot find this column family in our column family set, then it
// may be a new column family created by the primary after the secondary
// starts. It is also possible that the secondary instance opens only a subset
// of column families. Ignore it for now.
if (nullptr == cfd) {
return Status::OK();
}
if (active_version_builders_.find(edit.column_family_) ==
active_version_builders_.end()) {
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
new BaseReferencedVersionBuilder(cfd));
active_version_builders_.insert(
std::make_pair(edit.column_family_, std::move(builder_guard)));
}
auto builder_iter = active_version_builders_.find(edit.column_family_);
assert(builder_iter != active_version_builders_.end());
auto builder = builder_iter->second->version_builder();
assert(builder != nullptr);
if (edit.is_column_family_add_) { if (edit.is_column_family_add_) {
// TODO (yanqin) for now the secondary ignores column families created // TODO (yanqin) for now the secondary ignores column families created
// after Open. This also simplifies handling of switching to a new MANIFEST // after Open. This also simplifies handling of switching to a new MANIFEST
// and processing the snapshot of the system at the beginning of the // and processing the snapshot of the system at the beginning of the
// MANIFEST. // MANIFEST.
return Status::OK();
} else if (edit.is_column_family_drop_) { } else if (edit.is_column_family_drop_) {
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// Drop a CF created by primary after secondary starts? Then ignore
if (cfd == nullptr) {
return Status::OK();
}
// Drop the column family by setting it to be 'dropped' without destroying // Drop the column family by setting it to be 'dropped' without destroying
// the column family handle. // the column family handle.
// TODO (haoyu) figure out how to handle column faimly drop for
// secondary instance. (Is it possible that the ref count for cfd is 0 but
// the ref count for its versions is higher than 0?)
cfd->SetDropped(); cfd->SetDropped();
if (cfd->Unref()) { if (cfd->Unref()) {
delete cfd; delete cfd;
cfd = nullptr; cfd = nullptr;
} }
} else { } else {
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// Operation on a CF created after Open? Then ignore
if (cfd == nullptr) {
return Status::OK();
}
auto builder_iter = active_version_builders_.find(edit.column_family_);
assert(builder_iter != active_version_builders_.end());
auto builder = builder_iter->second->version_builder();
assert(builder != nullptr);
builder->Apply(&edit); builder->Apply(&edit);
} }
return ExtractInfoFromVersionEdit( Status s = ExtractInfoFromVersionEdit(
cfd, edit, have_log_number, log_number, have_prev_log_number, cfd, edit, have_log_number, log_number, have_prev_log_number,
previous_log_number, have_next_file, next_file, have_last_sequence, previous_log_number, have_next_file, next_file, have_last_sequence,
last_sequence, min_log_number_to_keep, max_column_family); last_sequence, min_log_number_to_keep, max_column_family);
if (!s.ok()) {
return s;
}
if (cfd != nullptr) {
s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
TEST_SYNC_POINT_CALLBACK(
"ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
"AfterLoadTableHandlers",
&s);
if (s.ok()) {
auto version = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
}
// Some other error has occurred during LoadTableHandlers.
}
if (have_next_file) {
next_file_number_.store(*next_file + 1);
}
if (have_last_sequence) {
last_allocated_sequence_ = *last_sequence;
last_published_sequence_ = *last_sequence;
last_sequence_ = *last_sequence;
}
if (have_prev_log_number) {
prev_log_number_ = *previous_log_number;
MarkFileNumberUsed(*previous_log_number);
}
if (have_log_number) {
MarkFileNumberUsed(*log_number);
}
column_family_set_->UpdateMaxColumnFamily(*max_column_family);
MarkMinLogNumberToKeep2PC(*min_log_number_to_keep);
return s;
} }
Status ReactiveVersionSet::MaybeSwitchManifest( Status ReactiveVersionSet::MaybeSwitchManifest(

@ -752,6 +752,23 @@ struct ObsoleteFileInfo {
class BaseReferencedVersionBuilder; class BaseReferencedVersionBuilder;
class AtomicGroupReadBuffer {
public:
Status AddEdit(VersionEdit* edit);
void Clear();
bool IsFull() const;
bool IsEmpty() const;
uint64_t TEST_read_edits_in_atomic_group() const {
return read_edits_in_atomic_group_;
}
std::vector<VersionEdit>& replay_buffer() { return replay_buffer_; }
private:
uint64_t read_edits_in_atomic_group_ = 0;
std::vector<VersionEdit> replay_buffer_;
};
// VersionSet is the collection of versions of all the column families of the // VersionSet is the collection of versions of all the column families of the
// database. Each database owns one VersionSet. A VersionSet has access to all // database. Each database owns one VersionSet. A VersionSet has access to all
// column families via ColumnFamilySet, i.e. set of the column families. // column families via ColumnFamilySet, i.e. set of the column families.
@ -1028,6 +1045,18 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit); VersionEdit* edit);
Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
const std::unordered_map<std::string, ColumnFamilyOptions>&
name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
// REQUIRES db mutex // REQUIRES db mutex
Status ApplyOneVersionEditToBuilder( Status ApplyOneVersionEditToBuilder(
VersionEdit& edit, VersionEdit& edit,
@ -1135,16 +1164,23 @@ class ReactiveVersionSet : public VersionSet {
std::unique_ptr<log::Reader::Reporter>* manifest_reporter, std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
std::unique_ptr<Status>* manifest_reader_status); std::unique_ptr<Status>* manifest_reader_status);
uint64_t TEST_read_edits_in_atomic_group() const {
return read_buffer_.TEST_read_edits_in_atomic_group();
}
std::vector<VersionEdit>& replay_buffer() {
return read_buffer_.replay_buffer();
}
protected: protected:
using VersionSet::ApplyOneVersionEditToBuilder; using VersionSet::ApplyOneVersionEditToBuilder;
// REQUIRES db mutex // REQUIRES db mutex
Status ApplyOneVersionEditToBuilder( Status ApplyOneVersionEditToBuilder(
VersionEdit& edit, bool* have_log_number, uint64_t* log_number, VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
bool* have_prev_log_number, uint64_t* previous_log_number, bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, bool* have_last_sequence, SequenceNumber* last_sequence,
uint32_t* max_column_family); uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
Status MaybeSwitchManifest( Status MaybeSwitchManifest(
log::Reader::Reporter* reporter, log::Reader::Reporter* reporter,
@ -1153,6 +1189,7 @@ class ReactiveVersionSet : public VersionSet {
private: private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>> std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_; active_version_builders_;
AtomicGroupReadBuffer read_buffer_;
using VersionSet::LogAndApply; using VersionSet::LogAndApply;
using VersionSet::Recover; using VersionSet::Recover;

@ -607,6 +607,7 @@ class VersionSetTestBase {
const static std::string kColumnFamilyName1; const static std::string kColumnFamilyName1;
const static std::string kColumnFamilyName2; const static std::string kColumnFamilyName2;
const static std::string kColumnFamilyName3; const static std::string kColumnFamilyName3;
int num_initial_edits_;
VersionSetTestBase() VersionSetTestBase()
: env_(Env::Default()), : env_(Env::Default()),
@ -618,6 +619,9 @@ class VersionSetTestBase {
versions_(new VersionSet(dbname_, &db_options_, env_options_, versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_, table_cache_.get(), &write_buffer_manager_,
&write_controller_)), &write_controller_)),
reactive_versions_(std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_)),
shutting_down_(false), shutting_down_(false),
mock_table_factory_(std::make_shared<mock::MockTableFactory>()) { mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_)); EXPECT_OK(env_->CreateDirIfMissing(dbname_));
@ -653,7 +657,7 @@ class VersionSetTestBase {
new_cfs.emplace_back(new_cf); new_cfs.emplace_back(new_cf);
} }
*last_seqno = last_seq; *last_seqno = last_seq;
num_initial_edits_ = static_cast<int>(new_cfs.size() + 1);
const std::string manifest = DescriptorFileName(dbname_, 1); const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file; std::unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile( Status s = env_->NewWritableFile(
@ -708,6 +712,7 @@ class VersionSetTestBase {
WriteController write_controller_; WriteController write_controller_;
WriteBufferManager write_buffer_manager_; WriteBufferManager write_buffer_manager_;
std::shared_ptr<VersionSet> versions_; std::shared_ptr<VersionSet> versions_;
std::shared_ptr<ReactiveVersionSet> reactive_versions_;
InstrumentedMutex mutex_; InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_; std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
@ -758,216 +763,388 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
EXPECT_EQ(kGroupSize - 1, count); EXPECT_EQ(kGroupSize - 1, count);
} }
TEST_F(VersionSetTest, HandleValidAtomicGroup) { class VersionSetAtomicGroupTest : public VersionSetTestBase,
std::vector<ColumnFamilyDescriptor> column_families; public testing::Test {
SequenceNumber last_seqno; public:
std::unique_ptr<log::Writer> log_writer; VersionSetAtomicGroupTest() : VersionSetTestBase() {}
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group void SetUp() override {
const int kAtomicGroupSize = 3; PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
std::vector<VersionEdit> edits(kAtomicGroupSize); SetupTestSyncPoints();
int remaining = kAtomicGroupSize;
for (size_t i = 0; i != edits.size(); ++i) {
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
edits[i].MarkAtomicGroup(--remaining);
edits[i].SetLastSequence(last_seqno++);
} }
Status s;
for (const auto& edit : edits) {
std::string record;
edit.EncodeTo(&record);
s = log_writer->AddRecord(record);
ASSERT_OK(s);
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
SyncPoint::GetInstance()->DisableProcessing(); void SetupValidAtomicGroup(int atomic_group_size) {
SyncPoint::GetInstance()->ClearAllCallBacks(); edits_.resize(atomic_group_size);
int remaining = atomic_group_size;
for (size_t i = 0; i != edits_.size(); ++i) {
edits_[i].SetLogNumber(0);
edits_[i].SetNextFile(2);
edits_[i].MarkAtomicGroup(--remaining);
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
}
bool first_in_atomic_group = false; void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
bool last_in_atomic_group = false; edits_.resize(atomic_group_size);
int remaining = atomic_group_size;
for (size_t i = 0; i != edits_.size(); ++i) {
edits_[i].SetLogNumber(0);
edits_[i].SetNextFile(2);
edits_[i].MarkAtomicGroup(--remaining);
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
}
SyncPoint::GetInstance()->SetCallBack( void SetupCorruptedAtomicGroup(int atomic_group_size) {
"VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { edits_.resize(atomic_group_size);
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); int remaining = atomic_group_size;
EXPECT_EQ(edits.front().DebugString(), for (size_t i = 0; i != edits_.size(); ++i) {
e->DebugString()); // compare based on value edits_[i].SetLogNumber(0);
first_in_atomic_group = true; edits_[i].SetNextFile(2);
}); if (i != ((size_t)atomic_group_size / 2)) {
SyncPoint::GetInstance()->SetCallBack( edits_[i].MarkAtomicGroup(--remaining);
"VersionSet::Recover:LastInAtomicGroup", [&](void* arg) { }
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); edits_[i].SetLastSequence(last_seqno_++);
EXPECT_EQ(edits.back().DebugString(), }
e->DebugString()); // compare based on value ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
EXPECT_TRUE(first_in_atomic_group); }
last_in_atomic_group = true;
});
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_OK(versions_->Recover(column_families, false)); void SetupIncorrectAtomicGroup(int atomic_group_size) {
EXPECT_EQ(column_families.size(), edits_.resize(atomic_group_size);
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); int remaining = atomic_group_size;
EXPECT_TRUE(first_in_atomic_group); for (size_t i = 0; i != edits_.size(); ++i) {
EXPECT_TRUE(last_in_atomic_group); edits_[i].SetLogNumber(0);
} edits_[i].SetNextFile(2);
if (i != 1) {
edits_[i].MarkAtomicGroup(--remaining);
} else {
edits_[i].MarkAtomicGroup(remaining--);
}
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
}
TEST_F(VersionSetTest, HandleIncompleteTrailingAtomicGroup) { void SetupTestSyncPoints() {
std::vector<ColumnFamilyDescriptor> column_families; SyncPoint::GetInstance()->DisableProcessing();
SequenceNumber last_seqno; SyncPoint::GetInstance()->ClearAllCallBacks();
std::unique_ptr<log::Writer> log_writer; SyncPoint::GetInstance()->SetCallBack(
PrepareManifest(&column_families, &last_seqno, &log_writer); "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits_.front().DebugString(),
e->DebugString()); // compare based on value
first_in_atomic_group_ = true;
});
SyncPoint::GetInstance()->SetCallBack(
"AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", [&](void* arg) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg);
EXPECT_EQ(edits_.back().DebugString(),
e->DebugString()); // compare based on value
EXPECT_TRUE(first_in_atomic_group_);
last_in_atomic_group_ = true;
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
num_recovered_edits_ = *reinterpret_cast<int*>(arg);
});
SyncPoint::GetInstance()->SetCallBack(
"ReactiveVersionSet::ReadAndApply:AppliedEdits",
[&](void* arg) { num_applied_edits_ = *reinterpret_cast<int*>(arg); });
SyncPoint::GetInstance()->SetCallBack(
"AtomicGroupReadBuffer::AddEdit:AtomicGroup",
[&](void* /* arg */) { ++num_edits_in_atomic_group_; });
SyncPoint::GetInstance()->SetCallBack(
"AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits",
[&](void* arg) {
corrupted_edit_ = *reinterpret_cast<VersionEdit*>(arg);
});
SyncPoint::GetInstance()->SetCallBack(
"AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize",
[&](void* arg) {
edit_with_incorrect_group_size_ =
*reinterpret_cast<VersionEdit*>(arg);
});
SyncPoint::GetInstance()->EnableProcessing();
}
// Append multiple version edits that form an atomic group void AddNewEditsToLog(int num_edits) {
const int kAtomicGroupSize = 4; for (int i = 0; i < num_edits; i++) {
const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1; std::string record;
std::vector<VersionEdit> edits(kNumberOfPersistedVersionEdits); edits_[i].EncodeTo(&record);
int remaining = kAtomicGroupSize; ASSERT_OK(log_writer_->AddRecord(record));
for (size_t i = 0; i != edits.size(); ++i) { }
edits[i].SetLogNumber(0);
edits[i].SetNextFile(2);
edits[i].MarkAtomicGroup(--remaining);
edits[i].SetLastSequence(last_seqno++);
} }
Status s;
for (const auto& edit : edits) { void TearDown() override {
std::string record; SyncPoint::GetInstance()->DisableProcessing();
edit.EncodeTo(&record); SyncPoint::GetInstance()->ClearAllCallBacks();
s = log_writer->AddRecord(record); log_writer_.reset();
ASSERT_OK(s);
} }
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr); protected:
ASSERT_OK(s); std::vector<ColumnFamilyDescriptor> column_families_;
SequenceNumber last_seqno_;
std::vector<VersionEdit> edits_;
bool first_in_atomic_group_ = false;
bool last_in_atomic_group_ = false;
int num_edits_in_atomic_group_ = 0;
int num_recovered_edits_ = 0;
int num_applied_edits_ = 0;
VersionEdit corrupted_edit_;
VersionEdit edit_with_incorrect_group_size_;
std::unique_ptr<log::Writer> log_writer_;
};
SyncPoint::GetInstance()->DisableProcessing(); TEST_F(VersionSetAtomicGroupTest, HandleValidAtomicGroupWithVersionSetRecover) {
SyncPoint::GetInstance()->ClearAllCallBacks(); const int kAtomicGroupSize = 3;
SetupValidAtomicGroup(kAtomicGroupSize);
AddNewEditsToLog(kAtomicGroupSize);
EXPECT_OK(versions_->Recover(column_families_, false));
EXPECT_EQ(column_families_.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_TRUE(last_in_atomic_group_);
EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
EXPECT_EQ(0, num_applied_edits_);
}
bool first_in_atomic_group = false; TEST_F(VersionSetAtomicGroupTest,
bool last_in_atomic_group = false; HandleValidAtomicGroupWithReactiveVersionSetRecover) {
size_t num = 0; const int kAtomicGroupSize = 3;
SetupValidAtomicGroup(kAtomicGroupSize);
AddNewEditsToLog(kAtomicGroupSize);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
std::unique_ptr<log::Reader::Reporter> manifest_reporter;
std::unique_ptr<Status> manifest_reader_status;
EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
&manifest_reporter,
&manifest_reader_status));
EXPECT_EQ(column_families_.size(),
reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_TRUE(last_in_atomic_group_);
// The recover should clean up the replay buffer.
EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
EXPECT_EQ(num_initial_edits_ + kAtomicGroupSize, num_recovered_edits_);
EXPECT_EQ(0, num_applied_edits_);
}
SyncPoint::GetInstance()->SetCallBack( TEST_F(VersionSetAtomicGroupTest,
"VersionSet::Recover:FirstInAtomicGroup", [&](void* arg) { HandleValidAtomicGroupWithReactiveVersionSetReadAndApply) {
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); const int kAtomicGroupSize = 3;
EXPECT_EQ(edits.front().DebugString(), SetupValidAtomicGroup(kAtomicGroupSize);
e->DebugString()); // compare based on value std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
first_in_atomic_group = true; std::unique_ptr<log::Reader::Reporter> manifest_reporter;
}); std::unique_ptr<Status> manifest_reader_status;
SyncPoint::GetInstance()->SetCallBack( EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
"VersionSet::Recover:LastInAtomicGroup", &manifest_reporter,
[&](void* /* arg */) { last_in_atomic_group = true; }); &manifest_reader_status));
SyncPoint::GetInstance()->SetCallBack("VersionSet::Recover:AtomicGroup", AddNewEditsToLog(kAtomicGroupSize);
[&](void* /* arg */) { ++num; }); InstrumentedMutex mu;
SyncPoint::GetInstance()->EnableProcessing(); std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_TRUE(last_in_atomic_group_);
// The recover should clean up the replay buffer.
EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
EXPECT_OK(versions_->Recover(column_families, false)); TEST_F(VersionSetAtomicGroupTest,
EXPECT_EQ(column_families.size(), HandleIncompleteTrailingAtomicGroupWithVersionSetRecover) {
const int kAtomicGroupSize = 4;
const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
AddNewEditsToLog(kNumberOfPersistedVersionEdits);
EXPECT_OK(versions_->Recover(column_families_, false));
EXPECT_EQ(column_families_.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(first_in_atomic_group); EXPECT_TRUE(first_in_atomic_group_);
EXPECT_FALSE(last_in_atomic_group); EXPECT_FALSE(last_in_atomic_group_);
EXPECT_EQ(kNumberOfPersistedVersionEdits, num); EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
EXPECT_EQ(0, num_applied_edits_);
} }
TEST_F(VersionSetTest, HandleCorruptedAtomicGroup) { TEST_F(VersionSetAtomicGroupTest,
std::vector<ColumnFamilyDescriptor> column_families; HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetRecover) {
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
// Append multiple version edits that form an atomic group
const int kAtomicGroupSize = 4; const int kAtomicGroupSize = 4;
std::vector<VersionEdit> edits(kAtomicGroupSize); const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
int remaining = kAtomicGroupSize; SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
for (size_t i = 0; i != edits.size(); ++i) { AddNewEditsToLog(kNumberOfPersistedVersionEdits);
edits[i].SetLogNumber(0); std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
edits[i].SetNextFile(2); std::unique_ptr<log::Reader::Reporter> manifest_reporter;
if (i != (kAtomicGroupSize / 2)) { std::unique_ptr<Status> manifest_reader_status;
edits[i].MarkAtomicGroup(--remaining); EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
} &manifest_reporter,
edits[i].SetLastSequence(last_seqno++); &manifest_reader_status));
} EXPECT_EQ(column_families_.size(),
Status s; reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
for (const auto& edit : edits) { EXPECT_TRUE(first_in_atomic_group_);
std::string record; EXPECT_FALSE(last_in_atomic_group_);
edit.EncodeTo(&record); EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
s = log_writer->AddRecord(record); // Reactive version set should store the edits in the replay buffer.
ASSERT_OK(s); EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
} kNumberOfPersistedVersionEdits);
log_writer.reset(); EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
// Write the last record. The reactive version set should now apply all
s = SetCurrentFile(env_, dbname_, 1, nullptr); // edits.
ASSERT_OK(s); std::string last_record;
edits_[kAtomicGroupSize - 1].EncodeTo(&last_record);
EXPECT_OK(log_writer_->AddRecord(last_record));
InstrumentedMutex mu;
std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
// Reactive version set should be empty now.
EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
EXPECT_TRUE(reactive_versions_->replay_buffer().size() == 0);
EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
EXPECT_EQ(kAtomicGroupSize, num_applied_edits_);
}
SyncPoint::GetInstance()->DisableProcessing(); TEST_F(VersionSetAtomicGroupTest,
SyncPoint::GetInstance()->ClearAllCallBacks(); HandleIncompleteTrailingAtomicGroupWithReactiveVersionSetReadAndApply) {
const int kAtomicGroupSize = 4;
const int kNumberOfPersistedVersionEdits = kAtomicGroupSize - 1;
SetupIncompleteTrailingAtomicGroup(kAtomicGroupSize);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
std::unique_ptr<log::Reader::Reporter> manifest_reporter;
std::unique_ptr<Status> manifest_reader_status;
// No edits in an atomic group.
EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
&manifest_reporter,
&manifest_reader_status));
EXPECT_EQ(column_families_.size(),
reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
// Write a few edits in an atomic group.
AddNewEditsToLog(kNumberOfPersistedVersionEdits);
InstrumentedMutex mu;
std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_FALSE(last_in_atomic_group_);
EXPECT_EQ(kNumberOfPersistedVersionEdits, num_edits_in_atomic_group_);
// Reactive version set should store the edits in the replay buffer.
EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() ==
kNumberOfPersistedVersionEdits);
EXPECT_TRUE(reactive_versions_->replay_buffer().size() == kAtomicGroupSize);
EXPECT_EQ(num_initial_edits_, num_recovered_edits_);
EXPECT_EQ(0, num_applied_edits_);
}
bool mixed = false; TEST_F(VersionSetAtomicGroupTest,
SyncPoint::GetInstance()->SetCallBack( HandleCorruptedAtomicGroupWithVersionSetRecover) {
"VersionSet::Recover:AtomicGroupMixedWithNormalEdits", [&](void* arg) { const int kAtomicGroupSize = 4;
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); SetupCorruptedAtomicGroup(kAtomicGroupSize);
EXPECT_EQ(edits[kAtomicGroupSize / 2].DebugString(), e->DebugString()); AddNewEditsToLog(kAtomicGroupSize);
mixed = true; EXPECT_NOK(versions_->Recover(column_families_, false));
}); EXPECT_EQ(column_families_.size(),
SyncPoint::GetInstance()->EnableProcessing();
EXPECT_NOK(versions_->Recover(column_families, false));
EXPECT_EQ(column_families.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_TRUE(mixed); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
corrupted_edit_.DebugString());
} }
TEST_F(VersionSetTest, HandleIncorrectAtomicGroupSize) { TEST_F(VersionSetAtomicGroupTest,
std::vector<ColumnFamilyDescriptor> column_families; HandleCorruptedAtomicGroupWithReactiveVersionSetRecover) {
SequenceNumber last_seqno; const int kAtomicGroupSize = 4;
std::unique_ptr<log::Writer> log_writer; SetupCorruptedAtomicGroup(kAtomicGroupSize);
PrepareManifest(&column_families, &last_seqno, &log_writer); AddNewEditsToLog(kAtomicGroupSize);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
std::unique_ptr<log::Reader::Reporter> manifest_reporter;
std::unique_ptr<Status> manifest_reader_status;
EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
&manifest_reporter,
&manifest_reader_status));
EXPECT_EQ(column_families_.size(),
reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
corrupted_edit_.DebugString());
}
// Append multiple version edits that form an atomic group TEST_F(VersionSetAtomicGroupTest,
HandleCorruptedAtomicGroupWithReactiveVersionSetReadAndApply) {
const int kAtomicGroupSize = 4; const int kAtomicGroupSize = 4;
std::vector<VersionEdit> edits(kAtomicGroupSize); SetupCorruptedAtomicGroup(kAtomicGroupSize);
int remaining = kAtomicGroupSize; InstrumentedMutex mu;
for (size_t i = 0; i != edits.size(); ++i) { std::unordered_set<ColumnFamilyData*> cfds_changed;
edits[i].SetLogNumber(0); std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
edits[i].SetNextFile(2); std::unique_ptr<log::Reader::Reporter> manifest_reporter;
if (i != 1) { std::unique_ptr<Status> manifest_reader_status;
edits[i].MarkAtomicGroup(--remaining); EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
} else { &manifest_reporter,
edits[i].MarkAtomicGroup(remaining--); &manifest_reader_status));
} // Write the corrupted edits.
edits[i].SetLastSequence(last_seqno++); AddNewEditsToLog(kAtomicGroupSize);
} mu.Lock();
Status s; EXPECT_OK(
for (const auto& edit : edits) { reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
std::string record; mu.Unlock();
edit.EncodeTo(&record); EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
s = log_writer->AddRecord(record); corrupted_edit_.DebugString());
ASSERT_OK(s); }
}
log_writer.reset();
s = SetCurrentFile(env_, dbname_, 1, nullptr); TEST_F(VersionSetAtomicGroupTest,
ASSERT_OK(s); HandleIncorrectAtomicGroupSizeWithVersionSetRecover) {
const int kAtomicGroupSize = 4;
SetupIncorrectAtomicGroup(kAtomicGroupSize);
AddNewEditsToLog(kAtomicGroupSize);
EXPECT_NOK(versions_->Recover(column_families_, false));
EXPECT_EQ(column_families_.size(),
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_EQ(edits_[1].DebugString(),
edit_with_incorrect_group_size_.DebugString());
}
SyncPoint::GetInstance()->DisableProcessing(); TEST_F(VersionSetAtomicGroupTest,
SyncPoint::GetInstance()->ClearAllCallBacks(); HandleIncorrectAtomicGroupSizeWithReactiveVersionSetRecover) {
const int kAtomicGroupSize = 4;
SetupIncorrectAtomicGroup(kAtomicGroupSize);
AddNewEditsToLog(kAtomicGroupSize);
std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
std::unique_ptr<log::Reader::Reporter> manifest_reporter;
std::unique_ptr<Status> manifest_reader_status;
EXPECT_NOK(reactive_versions_->Recover(column_families_, &manifest_reader,
&manifest_reporter,
&manifest_reader_status));
EXPECT_EQ(column_families_.size(),
reactive_versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
EXPECT_EQ(edits_[1].DebugString(),
edit_with_incorrect_group_size_.DebugString());
}
bool incorrect_group_size = false; TEST_F(VersionSetAtomicGroupTest,
SyncPoint::GetInstance()->SetCallBack( HandleIncorrectAtomicGroupSizeWithReactiveVersionSetReadAndApply) {
"VersionSet::Recover:IncorrectAtomicGroupSize", [&](void* arg) { const int kAtomicGroupSize = 4;
VersionEdit* e = reinterpret_cast<VersionEdit*>(arg); SetupIncorrectAtomicGroup(kAtomicGroupSize);
EXPECT_EQ(edits[1].DebugString(), e->DebugString()); InstrumentedMutex mu;
incorrect_group_size = true; std::unordered_set<ColumnFamilyData*> cfds_changed;
}); std::unique_ptr<log::FragmentBufferedReader> manifest_reader;
SyncPoint::GetInstance()->EnableProcessing(); std::unique_ptr<log::Reader::Reporter> manifest_reporter;
EXPECT_NOK(versions_->Recover(column_families, false)); std::unique_ptr<Status> manifest_reader_status;
EXPECT_EQ(column_families.size(), EXPECT_OK(reactive_versions_->Recover(column_families_, &manifest_reader,
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); &manifest_reporter,
EXPECT_TRUE(incorrect_group_size); &manifest_reader_status));
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),
edit_with_incorrect_group_size_.DebugString());
} }
class VersionSetTestDropOneCF : public VersionSetTestBase, class VersionSetTestDropOneCF : public VersionSetTestBase,
@ -1088,7 +1265,6 @@ INSTANTIATE_TEST_CASE_P(
testing::Values(VersionSetTestBase::kColumnFamilyName1, testing::Values(VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2, VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3)); VersionSetTestBase::kColumnFamilyName3));
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save