Carry over min_log_number_to_keep_2pc in new MANIFEST (#7747)

Summary:
When two phase commit is enabled, `VersionSet::min_log_number_to_keep_2pc` is set during flush.
But when a new MANIFEST is created, the `min_log_number_to_keep_2pc` is not carried over to the new MANIFEST. So if a new MANIFEST is created and then DB is reopened, the `min_log_number_to_keep_2pc` will be lost.  This may cause DB recovery errors.
The bug is reproduced in a new unit test in `version_set_test.cc`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7747

Test Plan: The new unit test in `version_set_test.cc` should pass.

Reviewed By: jay-zhuang

Differential Revision: D25350661

Pulled By: cheng-chang

fbshipit-source-id: eee890d5b19f15769069670692e270ae31044ece
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 8a1488efbf
commit 80159f6e0b
  1. 11
      db/version_set.cc
  2. 2
      db/version_set.h
  3. 29
      db/version_set_test.cc

@ -5285,6 +5285,17 @@ Status VersionSet::WriteCurrentStateToManifest(
assert(iter != curr_state.end()); assert(iter != curr_state.end());
uint64_t log_number = iter->second.log_number; uint64_t log_number = iter->second.log_number;
edit.SetLogNumber(log_number); edit.SetLogNumber(log_number);
if (cfd->GetID() == 0) {
// min_log_number_to_keep is for the whole db, not for specific column family.
// So it does not need to be set for every column family, just need to be set once.
// Since default CF can never be dropped, we set the min_log to the default CF here.
uint64_t min_log = min_log_number_to_keep_2pc();
if (min_log != 0) {
edit.SetMinLogNumberToKeep(min_log);
}
}
const std::string& full_history_ts_low = iter->second.full_history_ts_low; const std::string& full_history_ts_low = iter->second.full_history_ts_low;
if (!full_history_ts_low.empty()) { if (!full_history_ts_low.empty()) {
edit.SetFullHistoryTsLow(full_history_ts_low); edit.SetFullHistoryTsLow(full_history_ts_low);

@ -1332,7 +1332,7 @@ class VersionSet {
std::string db_id_; std::string db_id_;
const ImmutableDBOptions* const db_options_; const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_; std::atomic<uint64_t> next_file_number_;
// Any log number equal or lower than this should be ignored during recovery, // Any WAL number smaller than this should be ignored during recovery,
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this // and is qualified for being deleted in 2PC mode. In non-2PC mode, this
// number is ignored. // number is ignored.
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0}; std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};

@ -826,6 +826,14 @@ class VersionSetTestBase {
versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
} }
void ReopenDB() {
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
EXPECT_OK(versions_->Recover(column_families_, false));
}
void VerifyManifest(std::string* manifest_path) const { void VerifyManifest(std::string* manifest_path) const {
assert(manifest_path != nullptr); assert(manifest_path != nullptr);
uint64_t manifest_file_number = 0; uint64_t manifest_file_number = 0;
@ -3001,6 +3009,27 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
} }
} }
TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
NewDB();
SstInfo sst(100, kDefaultColumnFamilyName, "a");
std::vector<FileMetaData> file_metas;
CreateDummyTableFiles({sst}, &file_metas);
constexpr WalNumber kMinWalNumberToKeep2PC = 10;
VersionEdit edit;
edit.AddFile(0, file_metas[0]);
edit.SetMinLogNumberToKeep(kMinWalNumberToKeep2PC);
ASSERT_OK(LogAndApplyToDefaultCF(edit));
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
for (int i = 0; i < 3; i++) {
CreateNewManifest();
ReopenDB();
ASSERT_EQ(versions_->min_log_number_to_keep_2pc(), kMinWalNumberToKeep2PC);
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save