Write min_log_number_to_keep to MANIFEST during atomic flush under 2 phase commit (#7570)

Summary:
When 2 phase commit is enabled, if there are prepared data in a WAL, the WAL should be kept, the minimum log number for such a WAL is written to MANIFEST during flush. In atomic flush, such information is not written to MANIFEST.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7570

Test Plan: Added a new unit test `DBAtomicFlushTest.ManualFlushUnder2PC`, this test fails in atomic flush without this PR, after this PR, it succeeds.

Reviewed By: riversand963

Differential Revision: D24394222

Pulled By: cheng-chang

fbshipit-source-id: 60ce74b21b704804943be40c8de01b41269cf116
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent ac2f90d6f9
commit 70f2e0916a
  1. 1
      HISTORY.md
  2. 4
      db/column_family.cc
  3. 100
      db/db_flush_test.cc
  4. 10
      db/db_impl/db_impl.h
  5. 2
      db/db_impl/db_impl_compaction_flush.cc
  6. 67
      db/db_impl/db_impl_files.cc
  7. 5
      db/flush_job_test.cc
  8. 37
      db/memtable_list.cc
  9. 13
      db/memtable_list.h
  10. 5
      db/memtable_list_test.cc

@ -9,6 +9,7 @@
### Bug Fixes
* Fixed the logic of populating native data structure for `read_amp_bytes_per_bit` during OPTIONS file parsing on big-endian architecture. Without this fix, original code introduced in PR7659, when running on big-endian machine, can mistakenly store read_amp_bytes_per_bit (an uint32) in little endian format. Future access to `read_amp_bytes_per_bit` will give wrong values. Little endian architecture is not affected.
* Fixed prefix extractor with timestamp issues.
* Fixed a bug in atomic flush: in two-phase commit mode, the minimum WAL log number to keep is incorrect.
### New Features
* User defined timestamp feature supports `CompactRange` and `GetApproximateSizes`.

@ -708,9 +708,7 @@ uint64_t ColumnFamilyData::OldestLogToKeep() {
auto current_log = GetLogNumber();
if (allow_2pc_) {
autovector<MemTable*> empty_list;
auto imm_prep_log =
imm()->PrecomputeMinLogContainingPrepSection(empty_list);
auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) {

@ -14,6 +14,7 @@
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
@ -620,6 +621,96 @@ TEST_P(DBFlushTestBlobError, FlushError) {
#endif // ROCKSDB_LITE
}
#ifndef ROCKSDB_LITE
TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.allow_2pc = true;
options.atomic_flush = GetParam();
// 64MB so that memtable flush won't be trigger by the small writes.
options.write_buffer_size = (static_cast<size_t>(64) << 20);
// Destroy the DB to recreate as a TransactionDB.
Close();
Destroy(options, true);
// Create a TransactionDB.
TransactionDB* txn_db = nullptr;
TransactionDBOptions txn_db_opts;
txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
ASSERT_NE(txn_db, nullptr);
db_ = txn_db;
// Create two more columns other than default CF.
std::vector<std::string> cfs = {"puppy", "kitty"};
CreateColumnFamilies(cfs, options);
ASSERT_EQ(handles_.size(), 2);
ASSERT_EQ(handles_[0]->GetName(), cfs[0]);
ASSERT_EQ(handles_[1]->GetName(), cfs[1]);
const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1;
WriteOptions wopts;
TransactionOptions txn_opts;
// txn1 only prepare, but does not commit.
// The WAL containing the prepared but uncommitted data must be kept.
Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
// txn2 not only prepare, but also commit.
Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
ASSERT_NE(txn1, nullptr);
ASSERT_NE(txn2, nullptr);
for (size_t i = 0; i < kNumCfToFlush; i++) {
ASSERT_OK(txn1->Put(handles_[i], "k1", "v1"));
ASSERT_OK(txn2->Put(handles_[i], "k2", "v2"));
}
// A txn must be named before prepare.
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn2->SetName("txn2"));
// Prepare writes to WAL, but not to memtable. (WriteCommitted)
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn2->Prepare());
// Commit writes to memtable.
ASSERT_OK(txn2->Commit());
delete txn1;
delete txn2;
// There are still data in memtable not flushed.
// But since data is small enough to reside in the active memtable,
// there are no immutable memtable.
for (size_t i = 0; i < kNumCfToFlush; i++) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
}
// Atomic flush memtables,
// the min log with prepared data should be written to MANIFEST.
std::vector<ColumnFamilyHandle*> cfs_to_flush(kNumCfToFlush);
for (size_t i = 0; i < kNumCfToFlush; i++) {
cfs_to_flush[i] = handles_[i];
}
ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush));
// There are no remaining data in memtable after flush.
for (size_t i = 0; i < kNumCfToFlush; i++) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
}
// The recovered min log number with prepared data should be non-zero.
// In 2pc mode, MinLogNumberToKeep returns the
// VersionSet::min_log_number_to_keep_2pc recovered from MANIFEST, if it's 0,
// it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
cfs.push_back(kDefaultColumnFamilyName);
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
ASSERT_TRUE(db_impl->allow_2pc());
ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
}
#endif // ROCKSDB_LITE
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
@ -634,13 +725,22 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
for (size_t i = 0; i != num_cfs; ++i) {
ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
}
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
}
std::vector<int> cf_ids;
for (size_t i = 0; i != num_cfs; ++i) {
cf_ids.emplace_back(static_cast<int>(i));
}
ASSERT_OK(Flush(cf_ids));
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}

@ -2234,6 +2234,12 @@ extern uint64_t PrecomputeMinLogNumberToKeep2PC(
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// For atomic flush.
extern uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// In non-2PC mode, WALs with log number < the returned number can be
// deleted after the cfd_to_flush column family is flushed successfully.
@ -2251,6 +2257,10 @@ extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
const autovector<MemTable*>& memtables_to_flush);
// For atomic flush.
extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<const autovector<MemTable*>*>& memtables_to_flush);
// Fix user-supplied options to be reasonable
template <class T, class V>

@ -584,7 +584,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, tmp_file_meta,
versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
}

@ -659,13 +659,15 @@ uint64_t FindMinPrepLogReferencedByMemTable(
// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
std::unordered_set<MemTable*> memtables_to_flush_set(
memtables_to_flush.begin(), memtables_to_flush.end());
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
continue;
}
auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
memtables_to_flush);
&memtables_to_flush_set);
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
@ -681,6 +683,37 @@ uint64_t FindMinPrepLogReferencedByMemTable(
return min_log;
}
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<const autovector<MemTable*>*>& memtables_to_flush) {
uint64_t min_log = 0;
std::unordered_set<ColumnFamilyData*> cfds_to_flush_set(cfds_to_flush.begin(),
cfds_to_flush.end());
std::unordered_set<MemTable*> memtables_to_flush_set;
for (const autovector<MemTable*>* memtables : memtables_to_flush) {
memtables_to_flush_set.insert(memtables->begin(), memtables->end());
}
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || cfds_to_flush_set.count(loop_cfd)) {
continue;
}
auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
&memtables_to_flush_set);
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
log = loop_cfd->mem()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list) {
@ -788,6 +821,38 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}
uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
const autovector<autovector<VersionEdit*>>& edit_lists,
const autovector<const autovector<MemTable*>*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
assert(cfds_to_flush.size() == edit_lists.size());
assert(cfds_to_flush.size() == memtables_to_flush.size());
uint64_t min_log_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds_to_flush, edit_lists);
uint64_t min_log_in_prep_heap =
prep_tracker->FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 &&
min_log_in_prep_heap < min_log_number_to_keep) {
min_log_number_to_keep = min_log_in_prep_heap;
}
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
vset, cfds_to_flush, memtables_to_flush);
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) {
min_log_number_to_keep = min_log_refed_by_mem;
}
return min_log_number_to_keep;
}
Status DBImpl::SetDBId() {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true

@ -412,8 +412,9 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
versions_.get(), nullptr /* prep_tracker */, &mutex_, file_meta_ptrs,
&job_context.memtables_to_free, nullptr /* db_directory */,
nullptr /* log_buffer */);
ASSERT_OK(s);
mutex_.Unlock();

@ -674,20 +674,11 @@ void MemTableList::RemoveMemTablesOrRestoreFlags(
}
uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const autovector<MemTable*>& memtables_to_flush) {
const std::unordered_set<MemTable*>* memtables_to_flush) {
uint64_t min_log = 0;
for (auto& m : current_->memlist_) {
// Assume the list is very short, we can live with O(m*n). We can optimize
// if the performance has some problem.
bool should_skip = false;
for (MemTable* m_to_flush : memtables_to_flush) {
if (m == m_to_flush) {
should_skip = true;
break;
}
}
if (should_skip) {
if (memtables_to_flush && memtables_to_flush->count(m)) {
continue;
}
@ -707,7 +698,8 @@ Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_metas,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
@ -752,14 +744,21 @@ Status InstallMemtableAtomicFlushResults(
edit_lists.emplace_back(edits);
}
// TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc
// here.
WalNumber min_wal_number_to_keep = 0;
if (vset->db_options()->allow_2pc) {
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, cfds, edit_lists, mems_list, prep_tracker);
edit_lists.back().back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
}
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
uint64_t min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
if (vset->db_options()->track_and_verify_wals_in_manifest &&
!vset->GetWalSet().GetWals().empty()) {
if (!vset->db_options()->allow_2pc) {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
}
if (min_wal_number_to_keep > vset->GetWalSet().GetWals().begin()->first) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_lists.back().push_back(wal_deletion.get());

@ -138,8 +138,8 @@ class MemTableListVersion {
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
@ -335,7 +335,7 @@ class MemTableList {
// Returns the min log containing the prep section after memtables listsed in
// `memtables_to_flush` are flushed and their status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection(
const autovector<MemTable*>& memtables_to_flush);
const std::unordered_set<MemTable*>* memtables_to_flush = nullptr);
uint64_t GetEarliestMemTableID() const {
auto& memlist = current_->memlist_;
@ -381,8 +381,8 @@ class MemTableList {
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
VersionSet* vset, LogsWithPrepTracker* prep_tracker,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
@ -431,7 +431,8 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
LogsWithPrepTracker* prep_tracker, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer);
} // namespace ROCKSDB_NAMESPACE

@ -185,8 +185,9 @@ class MemTableListTest : public testing::Test {
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_meta_ptrs, to_delete, nullptr, &log_buffer);
&lists, cfds, mutable_cf_options_list, mems_list, &versions,
nullptr /* prep_tracker */, &mutex, file_meta_ptrs, to_delete, nullptr,
&log_buffer);
}
};

Loading…
Cancel
Save