diff --git a/db/version_edit.cc b/db/version_edit.cc
index 447fbf378..adeca134d 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -40,6 +40,10 @@ enum Tag : uint32_t {
   kColumnFamilyAdd = 201,
   kColumnFamilyDrop = 202,
   kMaxColumnFamily = 203,
+
+  // Marks the edit as a member of an atomic group. The tag is followed by a
+  // varint32 holding the number of edits that remain in the group.
+  kInAtomicGroup = 300,
 };
 
 enum CustomTag : uint32_t {
@@ -83,6 +87,8 @@ void VersionEdit::Clear() {
   is_column_family_add_ = 0;
   is_column_family_drop_ = 0;
   column_family_name_.clear();
+  is_in_atomic_group_ = false;
+  remaining_entries_ = 0;
 }
 
 bool VersionEdit::EncodeTo(std::string* dst) const {
@@ -200,6 +206,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
   if (is_column_family_drop_) {
     PutVarint32(dst, kColumnFamilyDrop);
   }
+
+  // Atomic-group marker: the tag first, then the remaining-entries count.
+  if (is_in_atomic_group_) {
+    PutVarint32(dst, kInAtomicGroup);
+    PutVarint32(dst, remaining_entries_);
+  }
   return true;
 }
 
@@ -482,6 +494,15 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
         is_column_family_drop_ = true;
         break;
 
+      case kInAtomicGroup:
+        is_in_atomic_group_ = true;
+        if (!GetVarint32(&input, &remaining_entries_)) {
+          if (!msg) {
+            msg = "remaining entries";
+          }
+        }
+        break;
+
       default:
         msg = "unknown tag";
         break;
@@ -560,6 +581,11 @@ std::string VersionEdit::DebugString(bool hex_key) const {
     r.append("\n MaxColumnFamily: ");
     AppendNumberTo(&r, max_column_family_);
   }
+  if (is_in_atomic_group_) {
+    r.append("\n AtomicGroup: ");
+    AppendNumberTo(&r, remaining_entries_);
+    r.append(" entries remains");
+  }
   r.append("\n}\n");
   return r;
 }
@@ -632,6 +658,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
   if (has_min_log_number_to_keep_) {
     jw << "MinLogNumberToKeep" << min_log_number_to_keep_;
   }
+  if (is_in_atomic_group_) {
+    jw << "AtomicGroup" << remaining_entries_;
+  }
 
   jw.EndObject();
 
diff --git a/db/version_edit.h b/db/version_edit.h
index 28028d1e4..808be58bd 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -27,7 +27,7 @@ const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
 extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
 
 // A copyable structure contains information needed to read data from an SST
-// file. It can contains a pointer to a table reader opened for the file, or
+// file. It can contain a pointer to a table reader opened for the file, or
 // file number and size, which can be used to create a new table reader for it.
 // The behavior is undefined when a copied of the structure is used when the
 // file is not in any live version any more.
@@ -300,6 +300,13 @@ class VersionEdit {
     return new_files_;
   }
 
+  // Flags this edit as part of an atomic group and records how many edits of
+  // the group are still to come after it.
+  void MarkAtomicGroup(uint32_t remaining_entries) {
+    is_in_atomic_group_ = true;
+    remaining_entries_ = remaining_entries;
+  }
+
   std::string DebugString(bool hex_key = false) const;
   std::string DebugJSON(int edit_num, bool hex_key = false) const;
 
@@ -329,7 +336,7 @@ class VersionEdit {
   DeletedFileSet deleted_files_;
   std::vector<std::pair<int, FileMetaData>> new_files_;
 
-  // Each version edit record should have column_family_id set
+  // Each version edit record should have column_family_ set
   // If it's not set, it is default (0)
   uint32_t column_family_;
   // a version edit can be either column_family add or
@@ -338,6 +345,11 @@ class VersionEdit {
   bool is_column_family_drop_;
   bool is_column_family_add_;
   std::string column_family_name_;
+
+  // True when this edit is a member of an atomic group.
+  bool is_in_atomic_group_;
+  // Number of edits of the atomic group still to come after this one.
+  uint32_t remaining_entries_;
 };
 
 }  // namespace rocksdb
diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc
index 0dd6a76ca..0668de7aa 100644
--- a/db/version_edit_test.cc
+++ b/db/version_edit_test.cc
@@ -191,6 +191,13 @@ TEST_F(VersionEditTest, MinLogNumberToKeep) {
   TestEncodeDecode(edit);
 }
 
+// Runs TestEncodeDecode on an edit marked as part of an atomic group.
+TEST_F(VersionEditTest, AtomicGroupTest) {
+  VersionEdit edit;
+  edit.MarkAtomicGroup(1);
+  TestEncodeDecode(edit);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/version_set.cc b/db/version_set.cc
index d6c83c45d..ede91c613 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3208,6 +3208,141 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
   builder->Apply(edit);
 }
 
+// Applies a single decoded manifest record to the recovery state owned by the
+// caller. Column family add/drop records update `builders` and
+// `column_families_not_found`; any other record for a column family that has
+// a builder is forwarded to that builder. Log, file and sequence numbers
+// carried by the edit are written through the out-pointers together with
+// their have_* flags; the log_number out-pointer is unused. Returns
+// Corruption for a duplicate add, a drop of an unknown column family or a
+// record for an unknown column family, InvalidArgument on comparator mismatch.
+Status VersionSet::ApplyOneVersionEdit(
+    VersionEdit& edit,
+    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
+    std::unordered_map<int, std::string>& column_families_not_found,
+    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+    bool* have_log_number, uint64_t* /* log_number */,
+    bool* have_prev_log_number, uint64_t* previous_log_number,
+    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
+    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
+    uint32_t* max_column_family) {
+  // Not found means that user didn't supply that column
+  // family option AND we encountered column family add
+  // record. Once we encounter column family drop record,
+  // we will delete the column family from
+  // column_families_not_found.
+  bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
+                          column_families_not_found.end());
+  // in builders means that user supplied that column family
+  // option AND that we encountered column family add record
+  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
+
+  // they can't both be true
+  assert(!(cf_in_not_found && cf_in_builders));
+
+  ColumnFamilyData* cfd = nullptr;
+
+  if (edit.is_column_family_add_) {
+    if (cf_in_builders || cf_in_not_found) {
+      return Status::Corruption(
+          "Manifest adding the same column family twice: " +
+          edit.column_family_name_);
+    }
+    auto cf_options = name_to_options.find(edit.column_family_name_);
+    if (cf_options == name_to_options.end()) {
+      column_families_not_found.insert(
+          {edit.column_family_, edit.column_family_name_});
+    } else {
+      cfd = CreateColumnFamily(cf_options->second, &edit);
+      cfd->set_initialized();
+      builders.insert(
+          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
+    }
+  } else if (edit.is_column_family_drop_) {
+    if (cf_in_builders) {
+      auto builder = builders.find(edit.column_family_);
+      assert(builder != builders.end());
+      delete builder->second;
+      builders.erase(builder);
+      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+      assert(cfd != nullptr);
+      if (cfd->Unref()) {
+        delete cfd;
+        cfd = nullptr;
+      } else {
+        // who else can have reference to cfd!?
+        assert(false);
+      }
+    } else if (cf_in_not_found) {
+      column_families_not_found.erase(edit.column_family_);
+    } else {
+      return Status::Corruption(
+          "Manifest - dropping non-existing column family");
+    }
+  } else if (!cf_in_not_found) {
+    if (!cf_in_builders) {
+      return Status::Corruption(
+          "Manifest record referencing unknown column family");
+    }
+
+    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
+    // this should never happen since cf_in_builders is true
+    assert(cfd != nullptr);
+
+    // if it is not column family add or column family drop,
+    // then it's a file add/delete, which should be forwarded
+    // to builder
+    auto builder = builders.find(edit.column_family_);
+    assert(builder != builders.end());
+    builder->second->version_builder()->Apply(&edit);
+  }
+
+  if (cfd != nullptr) {
+    if (edit.has_log_number_) {
+      if (cfd->GetLogNumber() > edit.log_number_) {
+        ROCKS_LOG_WARN(
+            db_options_->info_log,
+            "MANIFEST corruption detected, but ignored - Log numbers in "
+            "records NOT monotonically increasing");
+      } else {
+        cfd->SetLogNumber(edit.log_number_);
+        *have_log_number = true;
+      }
+    }
+    if (edit.has_comparator_ &&
+        edit.comparator_ != cfd->user_comparator()->Name()) {
+      return Status::InvalidArgument(
+          cfd->user_comparator()->Name(),
+          "does not match existing comparator " + edit.comparator_);
+    }
+  }
+
+  if (edit.has_prev_log_number_) {
+    *previous_log_number = edit.prev_log_number_;
+    *have_prev_log_number = true;
+  }
+
+  if (edit.has_next_file_number_) {
+    *next_file = edit.next_file_number_;
+    *have_next_file = true;
+  }
+
+  if (edit.has_max_column_family_) {
+    *max_column_family = edit.max_column_family_;
+  }
+
+  if (edit.has_min_log_number_to_keep_) {
+    *min_log_number_to_keep =
+        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
+  }
+
+  if (edit.has_last_sequence_) {
+    *last_sequence = edit.last_sequence_;
+    *have_last_sequence = true;
+  }
+  return Status::OK();
+}
+
 Status VersionSet::Recover(
     const std::vector<ColumnFamilyDescriptor>& column_families,
     bool read_only) {
@@ -3296,6 +3431,11 @@ Status VersionSet::Recover(
                        true /*checksum*/, 0 /*initial_offset*/, 0);
     Slice record;
     std::string scratch;
+    // Edits of an atomic group are buffered here and only applied once the
+    // whole group has been read.
+    std::vector<VersionEdit> replay_buffer;
+    // Number of edits of the current atomic group decoded so far.
+    size_t num_entries_decoded = 0;
     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
       VersionEdit edit;
       s = edit.DecodeFrom(record);
@@ -3303,123 +3443,50 @@ Status VersionSet::Recover(
         break;
       }
 
-      // Not found means that user didn't supply that column
-      // family option AND we encountered column family add
-      // record. Once we encounter column family drop record,
-      // we will delete the column family from
-      // column_families_not_found.
-      bool cf_in_not_found =
-          column_families_not_found.find(edit.column_family_) !=
-          column_families_not_found.end();
-      // in builders means that user supplied that column family
-      // option AND that we encountered column family add record
-      bool cf_in_builders =
-          builders.find(edit.column_family_) != builders.end();
-
-      // they can't both be true
-      assert(!(cf_in_not_found && cf_in_builders));
-
-      ColumnFamilyData* cfd = nullptr;
-
-      if (edit.is_column_family_add_) {
-        if (cf_in_builders || cf_in_not_found) {
-          s = Status::Corruption(
-              "Manifest adding the same column family twice");
-          break;
-        }
-        auto cf_options = cf_name_to_options.find(edit.column_family_name_);
-        if (cf_options == cf_name_to_options.end()) {
-          column_families_not_found.insert(
-              {edit.column_family_, edit.column_family_name_});
-        } else {
-          cfd = CreateColumnFamily(cf_options->second, &edit);
-          cfd->set_initialized();
-          builders.insert(
-              {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
-        }
-      } else if (edit.is_column_family_drop_) {
-        if (cf_in_builders) {
-          auto builder = builders.find(edit.column_family_);
-          assert(builder != builders.end());
-          delete builder->second;
-          builders.erase(builder);
-          cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-          if (cfd->Unref()) {
-            delete cfd;
-            cfd = nullptr;
-          } else {
-            // who else can have reference to cfd!?
-            assert(false);
-          }
-        } else if (cf_in_not_found) {
-          column_families_not_found.erase(edit.column_family_);
-        } else {
-          s = Status::Corruption(
-              "Manifest - dropping non-existing column family");
-          break;
+      if (edit.is_in_atomic_group_) {
+        if (replay_buffer.empty()) {
+          // First edit of a group: its count covers every edit after it.
+          replay_buffer.resize(edit.remaining_entries_ + 1);
         }
-      } else if (!cf_in_not_found) {
-        if (!cf_in_builders) {
-          s = Status::Corruption(
-              "Manifest record referencing unknown column family");
-          break;
+        ++num_entries_decoded;
+        // Every edit must agree with the group size fixed by the first one.
+        if (num_entries_decoded + edit.remaining_entries_ !=
+            static_cast<uint32_t>(replay_buffer.size())) {
+          // Report through `s` and break, like every other error in this loop.
+          s = Status::Corruption("corrupted atomic group");
+          break;
         }
-
-        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
-        // this should never happen since cf_in_builders is true
-        assert(cfd != nullptr);
-
-        // if it is not column family add or column family drop,
-        // then it's a file add/delete, which should be forwarded
-        // to builder
-        auto builder = builders.find(edit.column_family_);
-        assert(builder != builders.end());
-        builder->second->version_builder()->Apply(&edit);
-      }
-
-      if (cfd != nullptr) {
-        if (edit.has_log_number_) {
-          if (cfd->GetLogNumber() > edit.log_number_) {
-            ROCKS_LOG_WARN(
-                db_options_->info_log,
-                "MANIFEST corruption detected, but ignored - Log numbers in "
-                "records NOT monotonically increasing");
-          } else {
-            cfd->SetLogNumber(edit.log_number_);
-            have_log_number = true;
+        replay_buffer[num_entries_decoded - 1] = std::move(edit);
+        if (num_entries_decoded == replay_buffer.size()) {
+          for (auto& e : replay_buffer) {
+            s = ApplyOneVersionEdit(
+                e, cf_name_to_options, column_families_not_found, builders,
+                &have_log_number, &log_number, &have_prev_log_number,
+                &previous_log_number, &have_next_file, &next_file,
+                &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+                &max_column_family);
+            if (!s.ok()) {
+              break;
+            }
           }
+          replay_buffer.clear();
+          num_entries_decoded = 0;
         }
-        if (edit.has_comparator_ &&
-            edit.comparator_ != cfd->user_comparator()->Name()) {
-          s = Status::InvalidArgument(
-              cfd->user_comparator()->Name(),
-              "does not match existing comparator " + edit.comparator_);
-          break;
+      } else {
+        // A plain edit must not arrive while a group is still incomplete.
+        if (!replay_buffer.empty()) {
+          s = Status::Corruption("corrupted atomic group");
+          break;
         }
+        s = ApplyOneVersionEdit(
+            edit, cf_name_to_options, column_families_not_found, builders,
+            &have_log_number, &log_number, &have_prev_log_number,
+            &previous_log_number, &have_next_file, &next_file,
+            &have_last_sequence, &last_sequence, &min_log_number_to_keep,
+            &max_column_family);
       }
-
-      if (edit.has_prev_log_number_) {
-        previous_log_number = edit.prev_log_number_;
-        have_prev_log_number = true;
-      }
-
-      if (edit.has_next_file_number_) {
-        next_file = edit.next_file_number_;
-        have_next_file = true;
-      }
-
-      if (edit.has_max_column_family_) {
-        max_column_family = edit.max_column_family_;
-      }
-
-      if (edit.has_min_log_number_to_keep_) {
-        min_log_number_to_keep =
-            std::max(min_log_number_to_keep, edit.min_log_number_to_keep_);
-      }
-
-      if (edit.has_last_sequence_) {
-        last_sequence = edit.last_sequence_;
-        have_last_sequence = true;
+      if (!s.ok()) {
+        break;
       }
     }
   }
diff --git a/db/version_set.h b/db/version_set.h
index f57f84fb7..4e3b2bec2 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -728,6 +728,12 @@ struct ObsoleteFileInfo {
   }
 };
 
+// NOTE(review): this unnamed namespace lives in a header, so each including
+// translation unit sees its own BaseReferencedVersionBuilder - TODO confirm.
+namespace {
+class BaseReferencedVersionBuilder;
+}
+
 class VersionSet {
  public:
   VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
@@ -989,6 +995,18 @@ class VersionSet {
   ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                        VersionEdit* edit);
 
+  // Applies one decoded manifest record to the recovery state passed in by
+  // the caller and reports log/file/sequence numbers via the out-pointers.
+  Status ApplyOneVersionEdit(
+      VersionEdit& edit,
+      const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_opts,
+      std::unordered_map<int, std::string>& column_families_not_found,
+      std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
+      bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
+      uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
+      bool* have_last_sequence, SequenceNumber* last_sequence,
+      uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
+
   Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
                                InstrumentedMutex* mu, Directory* db_directory,
                                bool new_descriptor_log,