Track WAL in MANIFEST: Update logic for computing min_log_number_to_keep in atomic flush (#7660)

Summary:
The logic for computing min_log_number_to_keep in atomic flush was incorrect.

For example, when all column families are flushed, the min_log_number_to_keep should be the latest new log. But the incorrect logic calls `PrecomputeMinLogNumberToKeepNon2PC` for each column family, and returns the minimum of them. However, `PrecomputeMinLogNumberToKeepNon2PC(cf)` assumes column families other than `cf` are flushed, but in case all column families are flushed, this assumption is incorrect.

Without this fix, the WAL referenced by the computed min_log_number_to_keep may actually contain no unflushed data, so the WAL might have actually been deleted from disk on recovery, then an incorrect error `Corruption: missing WAL` will be reported.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7660

Test Plan:
run `make crash_test_with_atomic_flush`  on devserver
added a unit test in `db_flush_test`

Reviewed By: riversand963

Differential Revision: D24906265

Pulled By: cheng-chang

fbshipit-source-id: 08deda62e71f67f59e3b7925cdd86dd09bd4f430
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 303d283420
commit 8c93b16f02
  1. 57
      db/db_flush_test.cc
  2. 10
      db/db_impl/db_impl.h
  3. 41
      db/db_impl/db_impl_files.cc
  4. 7
      db/memtable_list.cc
  5. 18
      db/version_set.h

@ -646,6 +646,63 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
}
}
TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
options.write_buffer_size = (static_cast<size_t>(64) << 20);
CreateAndReopenWithCF({"pikachu"}, options);
const size_t num_cfs = handles_.size();
ASSERT_EQ(num_cfs, 2);
WriteOptions wopts;
for (size_t i = 0; i != num_cfs; ++i) {
ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
}
{
// Flush the default CF only.
std::vector<int> cf_ids{0};
ASSERT_OK(Flush(cf_ids));
autovector<ColumnFamilyData*> flushed_cfds;
autovector<autovector<VersionEdit*>> flush_edits;
auto flushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[0]);
flushed_cfds.push_back(flushed_cfh->cfd());
flush_edits.push_back({});
auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);
ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(),
flushed_cfds, flush_edits),
unflushed_cfh->cfd()->GetLogNumber());
}
{
// Flush all CFs.
std::vector<int> cf_ids;
for (size_t i = 0; i != num_cfs; ++i) {
cf_ids.emplace_back(static_cast<int>(i));
}
ASSERT_OK(Flush(cf_ids));
uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber();
uint64_t min_log_number_to_keep = port::kMaxUint64;
autovector<ColumnFamilyData*> flushed_cfds;
autovector<autovector<VersionEdit*>> flush_edits;
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
flushed_cfds.push_back(cfh->cfd());
flush_edits.push_back({});
min_log_number_to_keep =
std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber());
}
ASSERT_EQ(min_log_number_to_keep, log_num_after_flush);
ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(),
flushed_cfds, flush_edits),
min_log_number_to_keep);
}
}
TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
Options options = CurrentOptions();
options.create_if_missing = true;

@ -1017,6 +1017,12 @@ class DBImpl : public DB {
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
uint64_t TEST_GetCurrentLogNumber() const {
InstrumentedMutexLock l(mutex());
assert(!logs_.empty());
return logs_.back().number;
}
const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
return files_grabbed_for_purge_;
}
@ -2225,6 +2231,10 @@ extern uint64_t PrecomputeMinLogNumberToKeep2PC(
extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list);
// For atomic flush.
extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists);
// `cfd_to_flush` is the column family whose memtable will be flushed and thus
// will not depend on any WAL file. nullptr means no memtable is being flushed.

@ -6,16 +6,17 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"
#include <cinttypes>
#include <set>
#include <unordered_set>
#include "db/db_impl/db_impl.h"
#include "db/event_helpers.h"
#include "db/memtable_list.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "util/autovector.h"
namespace ROCKSDB_NAMESPACE {
@ -710,6 +711,42 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC(
return min_log_number_to_keep;
}
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists) {
assert(vset != nullptr);
assert(!cfds_to_flush.empty());
assert(cfds_to_flush.size() == edit_lists.size());
uint64_t min_log_number_to_keep = port::kMaxUint64;
for (const auto& edit_list : edit_lists) {
uint64_t log = 0;
for (const auto& e : edit_list) {
if (e->HasLogNumber()) {
log = std::max(log, e->GetLogNumber());
}
}
if (log != 0) {
min_log_number_to_keep = std::min(min_log_number_to_keep, log);
}
}
if (min_log_number_to_keep == port::kMaxUint64) {
min_log_number_to_keep = cfds_to_flush[0]->GetLogNumber();
for (size_t i = 1; i < cfds_to_flush.size(); i++) {
min_log_number_to_keep =
std::min(min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber());
}
}
std::unordered_set<const ColumnFamilyData*> flushed_cfds(
cfds_to_flush.begin(), cfds_to_flush.end());
min_log_number_to_keep =
std::min(min_log_number_to_keep,
vset->PreComputeMinLogNumberWithUnflushedData(flushed_cfds));
return min_log_number_to_keep;
}
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,

@ -757,12 +757,7 @@ Status InstallMemtableAtomicFlushResults(
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
uint64_t min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]);
for (size_t i = 1; i < cfds.size(); i++) {
min_wal_number_to_keep = std::min(
min_wal_number_to_keep,
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i]));
}
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
wal_deletion.reset(new VersionEdit);

@ -26,6 +26,7 @@
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
@ -1136,6 +1137,23 @@ class VersionSet {
}
return min_log_num;
}
// Returns the minimum log number which still has data not flushed to any SST
// file, except data from `cfds_to_skip`.
uint64_t PreComputeMinLogNumberWithUnflushedData(
const std::unordered_set<const ColumnFamilyData*>& cfds_to_skip) const {
uint64_t min_log_num = port::kMaxUint64;
for (auto cfd : *column_family_set_) {
if (cfds_to_skip.count(cfd)) {
continue;
}
// It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
min_log_num = cfd->GetLogNumber();
}
}
return min_log_num;
}
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.

Loading…
Cancel
Save