Update recovery code for version edits group commit. (#3945)

Summary:
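During recovery, RocksDB can now handle version edits that belong to an atomic group (group commit): edits tagged as being in an atomic group are buffered and applied only after every edit of the group has been read.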
This PR is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3945

Differential Revision: D8529122

Pulled By: riversand963

fbshipit-source-id: 57cb0f9cc55ecca684a837742d6626dc9c07f37e
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 90f744941d
commit d116a1725d
  1. 26
      db/version_edit.cc
  2. 12
      db/version_edit.h
  3. 6
      db/version_edit_test.cc
  4. 268
      db/version_set.cc
  5. 14
      db/version_set.h

@ -40,6 +40,8 @@ enum Tag : uint32_t {
kColumnFamilyAdd = 201, kColumnFamilyAdd = 201,
kColumnFamilyDrop = 202, kColumnFamilyDrop = 202,
kMaxColumnFamily = 203, kMaxColumnFamily = 203,
kInAtomicGroup = 300,
}; };
enum CustomTag : uint32_t { enum CustomTag : uint32_t {
@ -83,6 +85,8 @@ void VersionEdit::Clear() {
is_column_family_add_ = 0; is_column_family_add_ = 0;
is_column_family_drop_ = 0; is_column_family_drop_ = 0;
column_family_name_.clear(); column_family_name_.clear();
is_in_atomic_group_ = false;
remaining_entries_ = 0;
} }
bool VersionEdit::EncodeTo(std::string* dst) const { bool VersionEdit::EncodeTo(std::string* dst) const {
@ -200,6 +204,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (is_column_family_drop_) { if (is_column_family_drop_) {
PutVarint32(dst, kColumnFamilyDrop); PutVarint32(dst, kColumnFamilyDrop);
} }
if (is_in_atomic_group_) {
PutVarint32(dst, kInAtomicGroup);
PutVarint32(dst, remaining_entries_);
}
return true; return true;
} }
@ -482,6 +491,15 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
is_column_family_drop_ = true; is_column_family_drop_ = true;
break; break;
case kInAtomicGroup:
is_in_atomic_group_ = true;
if (!GetVarint32(&input, &remaining_entries_)) {
if (!msg) {
msg = "remaining entries";
}
}
break;
default: default:
msg = "unknown tag"; msg = "unknown tag";
break; break;
@ -560,6 +578,11 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n MaxColumnFamily: "); r.append("\n MaxColumnFamily: ");
AppendNumberTo(&r, max_column_family_); AppendNumberTo(&r, max_column_family_);
} }
if (is_in_atomic_group_) {
r.append("\n AtomicGroup: ");
AppendNumberTo(&r, remaining_entries_);
r.append(" entries remains");
}
r.append("\n}\n"); r.append("\n}\n");
return r; return r;
} }
@ -632,6 +655,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
if (has_min_log_number_to_keep_) { if (has_min_log_number_to_keep_) {
jw << "MinLogNumberToKeep" << min_log_number_to_keep_; jw << "MinLogNumberToKeep" << min_log_number_to_keep_;
} }
if (is_in_atomic_group_) {
jw << "AtomicGroup" << remaining_entries_;
}
jw.EndObject(); jw.EndObject();

@ -27,7 +27,7 @@ const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id); extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
// A copyable structure contains information needed to read data from an SST // A copyable structure contains information needed to read data from an SST
// file. It can contains a pointer to a table reader opened for the file, or // file. It can contain a pointer to a table reader opened for the file, or
// file number and size, which can be used to create a new table reader for it. // file number and size, which can be used to create a new table reader for it.
// The behavior is undefined when a copy of the structure is used when the // The behavior is undefined when a copy of the structure is used when the
// file is not in any live version any more. // file is not in any live version any more.
@ -300,6 +300,11 @@ class VersionEdit {
return new_files_; return new_files_;
} }
// Tags this edit as a member of an atomic group and records how many
// edits of the same group are still expected after this one.
void MarkAtomicGroup(uint32_t remaining_entries) {
  remaining_entries_ = remaining_entries;
  is_in_atomic_group_ = true;
}
std::string DebugString(bool hex_key = false) const; std::string DebugString(bool hex_key = false) const;
std::string DebugJSON(int edit_num, bool hex_key = false) const; std::string DebugJSON(int edit_num, bool hex_key = false) const;
@ -329,7 +334,7 @@ class VersionEdit {
DeletedFileSet deleted_files_; DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_; std::vector<std::pair<int, FileMetaData>> new_files_;
// Each version edit record should have column_family_id set // Each version edit record should have column_family_ set
// If it's not set, it is default (0) // If it's not set, it is default (0)
uint32_t column_family_; uint32_t column_family_;
// a version edit can be either column_family add or // a version edit can be either column_family add or
@ -338,6 +343,9 @@ class VersionEdit {
bool is_column_family_drop_; bool is_column_family_drop_;
bool is_column_family_add_; bool is_column_family_add_;
std::string column_family_name_; std::string column_family_name_;
bool is_in_atomic_group_;
uint32_t remaining_entries_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -191,6 +191,12 @@ TEST_F(VersionEditTest, MinLogNumberToKeep) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
} }
// Marks an edit as part of an atomic group with one edit still to follow and
// runs it through the fixture's encode/decode check.
TEST_F(VersionEditTest, AtomicGroupTest) {
  VersionEdit atomic_edit;
  atomic_edit.MarkAtomicGroup(1);
  TestEncodeDecode(atomic_edit);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -3208,6 +3208,133 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
builder->Apply(edit); builder->Apply(edit);
} }
// Applies one decoded version edit to the state being rebuilt by recovery.
//
// Depending on the edit this
//   * registers a column family (creating it and a version builder when
//     `name_to_options` has options for its name, otherwise remembering it in
//     `column_families_not_found`),
//   * drops a column family (destroying its builder and releasing the
//     ColumnFamilyData, or forgetting it from `column_families_not_found`), or
//   * forwards the edit to the builder of the column family it refers to.
// Afterwards the per-column-family log number / comparator checks are run and
// the bookkeeping outputs (`previous_log_number`, `next_file`,
// `max_column_family`, `min_log_number_to_keep`, `last_sequence` and the
// matching `have_*` flags) are updated from the fields present in `edit`.
//
// The `log_number` argument is accepted but not used by this function.
//
// Returns Status::Corruption for an add of an already known column family, a
// drop of an unknown one, or a record that references an unknown column
// family; Status::InvalidArgument when the edit's comparator name differs
// from the column family's comparator; Status::OK otherwise.
Status VersionSet::ApplyOneVersionEdit(
    VersionEdit& edit,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
    bool* have_log_number, uint64_t* /* log_number */,
    bool* have_prev_log_number, uint64_t* previous_log_number,
    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
    uint32_t* max_column_family) {
  // A column family is "not found" when its add record was seen but the user
  // supplied no options for it. A later drop record removes it from
  // column_families_not_found again.
  const bool seen_without_options =
      column_families_not_found.count(edit.column_family_) > 0;

  // A column family has a builder when its add record was seen AND the user
  // supplied options for it.
  const bool has_builder = builders.count(edit.column_family_) > 0;

  // At most one of the two can hold.
  assert(!seen_without_options || !has_builder);

  ColumnFamilyData* cfd = nullptr;

  if (edit.is_column_family_add_) {
    if (has_builder || seen_without_options) {
      return Status::Corruption(
          "Manifest adding the same column family twice: " +
          edit.column_family_name_);
    }
    const auto options_iter = name_to_options.find(edit.column_family_name_);
    if (options_iter != name_to_options.end()) {
      cfd = CreateColumnFamily(options_iter->second, &edit);
      cfd->set_initialized();
      builders.insert(
          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
    } else {
      column_families_not_found.insert(
          {edit.column_family_, edit.column_family_name_});
    }
  } else if (edit.is_column_family_drop_) {
    if (has_builder) {
      auto builder_iter = builders.find(edit.column_family_);
      assert(builder_iter != builders.end());
      // Destroy the builder before releasing the column family itself.
      delete builder_iter->second;
      builders.erase(builder_iter);
      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
      assert(cfd != nullptr);
      if (!cfd->Unref()) {
        // Nobody else is expected to hold a reference to cfd here.
        assert(false);
      } else {
        delete cfd;
        cfd = nullptr;
      }
    } else if (seen_without_options) {
      column_families_not_found.erase(edit.column_family_);
    } else {
      return Status::Corruption(
          "Manifest - dropping non-existing column family");
    }
  } else if (!seen_without_options) {
    if (!has_builder) {
      return Status::Corruption(
          "Manifest record referencing unknown column family");
    }

    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // Cannot be null: has_builder is true.
    assert(cfd != nullptr);

    // Neither an add nor a drop, so this is a file add/delete record; hand it
    // to the column family's builder.
    auto builder_iter = builders.find(edit.column_family_);
    assert(builder_iter != builders.end());
    builder_iter->second->version_builder()->Apply(&edit);
  }

  if (cfd != nullptr) {
    if (edit.has_log_number_) {
      if (cfd->GetLogNumber() <= edit.log_number_) {
        cfd->SetLogNumber(edit.log_number_);
        *have_log_number = true;
      } else {
        // A log number going backwards is only logged, never fatal.
        ROCKS_LOG_WARN(
            db_options_->info_log,
            "MANIFEST corruption detected, but ignored - Log numbers in "
            "records NOT monotonically increasing");
      }
    }
    const bool comparator_mismatch =
        edit.has_comparator_ &&
        edit.comparator_ != cfd->user_comparator()->Name();
    if (comparator_mismatch) {
      return Status::InvalidArgument(
          cfd->user_comparator()->Name(),
          "does not match existing comparator " + edit.comparator_);
    }
  }

  // Bookkeeping that does not depend on the column family.
  if (edit.has_prev_log_number_) {
    *previous_log_number = edit.prev_log_number_;
    *have_prev_log_number = true;
  }

  if (edit.has_next_file_number_) {
    *next_file = edit.next_file_number_;
    *have_next_file = true;
  }

  if (edit.has_max_column_family_) {
    *max_column_family = edit.max_column_family_;
  }

  if (edit.has_min_log_number_to_keep_) {
    // Only ever raised, never lowered.
    *min_log_number_to_keep =
        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
  }

  if (edit.has_last_sequence_) {
    *last_sequence = edit.last_sequence_;
    *have_last_sequence = true;
  }

  return Status::OK();
}
Status VersionSet::Recover( Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only) { bool read_only) {
@ -3296,6 +3423,8 @@ Status VersionSet::Recover(
true /*checksum*/, 0 /*initial_offset*/, 0); true /*checksum*/, 0 /*initial_offset*/, 0);
Slice record; Slice record;
std::string scratch; std::string scratch;
std::vector<VersionEdit> replay_buffer;
size_t num_entries_decoded = 0;
while (reader.ReadRecord(&record, &scratch) && s.ok()) { while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit; VersionEdit edit;
s = edit.DecodeFrom(record); s = edit.DecodeFrom(record);
@ -3303,125 +3432,46 @@ Status VersionSet::Recover(
break; break;
} }
// Not found means that user didn't supply that column if (edit.is_in_atomic_group_) {
// family option AND we encountered column family add if (replay_buffer.empty()) {
// record. Once we encounter column family drop record, replay_buffer.resize(edit.remaining_entries_ + 1);
// we will delete the column family from }
// column_families_not_found. ++num_entries_decoded;
bool cf_in_not_found = if (num_entries_decoded + edit.remaining_entries_ !=
column_families_not_found.find(edit.column_family_) != static_cast<uint32_t>(replay_buffer.size())) {
column_families_not_found.end(); return Status::Corruption("corrupted atomic group");
// in builders means that user supplied that column family }
// option AND that we encountered column family add record replay_buffer[num_entries_decoded - 1] = std::move(edit);
bool cf_in_builders = if (num_entries_decoded == replay_buffer.size()) {
builders.find(edit.column_family_) != builders.end(); for (auto& e : replay_buffer) {
s = ApplyOneVersionEdit(
// they can't both be true e, cf_name_to_options, column_families_not_found, builders,
assert(!(cf_in_not_found && cf_in_builders)); &have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
ColumnFamilyData* cfd = nullptr; &have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
if (edit.is_column_family_add_) { if (!s.ok()) {
if (cf_in_builders || cf_in_not_found) {
s = Status::Corruption(
"Manifest adding the same column family twice");
break; break;
} }
auto cf_options = cf_name_to_options.find(edit.column_family_name_);
if (cf_options == cf_name_to_options.end()) {
column_families_not_found.insert(
{edit.column_family_, edit.column_family_name_});
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
} }
} else if (edit.is_column_family_drop_) { replay_buffer.clear();
if (cf_in_builders) { num_entries_decoded = 0;
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
delete builder->second;
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
} else {
// who else can have reference to cfd!?
assert(false);
} }
} else if (cf_in_not_found) {
column_families_not_found.erase(edit.column_family_);
} else { } else {
s = Status::Corruption( if (!replay_buffer.empty()) {
"Manifest - dropping non-existing column family"); return Status::Corruption("corrupted atomic group");
break;
} }
} else if (!cf_in_not_found) { s = ApplyOneVersionEdit(
if (!cf_in_builders) { edit, cf_name_to_options, column_families_not_found, builders,
s = Status::Corruption( &have_log_number, &log_number, &have_prev_log_number,
"Manifest record referencing unknown column family"); &previous_log_number, &have_next_file, &next_file,
break; &have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
} }
if (!s.ok()) {
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
}
if (cfd != nullptr) {
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
ROCKS_LOG_WARN(
db_options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
}
}
if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break; break;
} }
} }
if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_max_column_family_) {
max_column_family = edit.max_column_family_;
}
if (edit.has_min_log_number_to_keep_) {
min_log_number_to_keep =
std::max(min_log_number_to_keep, edit.min_log_number_to_keep_);
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
} }
if (s.ok()) { if (s.ok()) {

@ -728,6 +728,10 @@ struct ObsoleteFileInfo {
} }
}; };
// Forward declaration so that VersionSet can name the builder type in the
// signature of ApplyOneVersionEdit without seeing its definition.
// NOTE(review): the file list suggests this hunk belongs to a header
// (db/version_set.h). An unnamed namespace in a header gives every including
// translation unit its own distinct BaseReferencedVersionBuilder type;
// presumably the definition lives in an unnamed namespace in version_set.cc.
// Confirm this is intended, and consider moving the type into a named
// namespace or its own header.
namespace {
class BaseReferencedVersionBuilder;
}
class VersionSet { class VersionSet {
public: public:
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options, VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
@ -989,6 +993,16 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit); VersionEdit* edit);
// Applies a single decoded version edit to the state accumulated while
// recovering.
//
//   edit                       the edit to apply; it is also handed on to the
//                              column family creation / version builder.
//   name_to_options            user-supplied options keyed by column family
//                              name.
//   column_families_not_found  column families whose add record was seen but
//                              for which no options were supplied (id -> name).
//   builders                   per-column-family version builders, keyed by
//                              column family id; entries are created on add
//                              and destroyed on drop.
//   have_* / value pointers    outputs updated from the fields present in
//                              `edit`. `log_number` is currently not used by
//                              the definition.
//
// Returns a non-OK status when the edit is inconsistent with the state seen
// so far (duplicate add, drop of or reference to an unknown column family,
// comparator mismatch).
Status ApplyOneVersionEdit(
    VersionEdit& edit,
    const std::unordered_map<std::string, ColumnFamilyOptions>&
        name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
    bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
    uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
    bool* have_last_sequence, SequenceNumber* last_sequence,
    uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers, Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
InstrumentedMutex* mu, Directory* db_directory, InstrumentedMutex* mu, Directory* db_directory,
bool new_descriptor_log, bool new_descriptor_log,

Loading…
Cancel
Save