diff --git a/db/db_impl.cc b/db/db_impl.cc
index d1810bc88..7b737f66f 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -740,6 +740,34 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
   }
 }
 
+uint64_t DBImpl::MinLogNumberToKeep() {
+  uint64_t log_number = versions_->MinLogNumber();
+
+  if (allow_2pc()) {
+    // if we are in 2pc we must consider logs containing prepared
+    // sections of outstanding transactions.
+    //
+    // We must check min logs with outstanding prep before we check
+    // logs referenced by memtables because a log referenced by the
+    // first data structure could transition to the second under us.
+    //
+    // TODO(horuff): iterating over all column families under db mutex.
+    // should find more optimal solution
+    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
+
+    if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
+      log_number = min_log_in_prep_heap;
+    }
+
+    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
+
+    if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
+      log_number = min_log_refed_by_mem;
+    }
+  }
+  return log_number;
+}
+
 // * Returns the list of live files in 'sst_live'
 // If it's doing full scan:
 // * Returns the list of all files in the filesystem in
@@ -798,32 +826,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   job_context->manifest_file_number = versions_->manifest_file_number();
   job_context->pending_manifest_file_number =
       versions_->pending_manifest_file_number();
-  job_context->log_number = versions_->MinLogNumber();
-
-  if (allow_2pc()) {
-    // if are 2pc we must consider logs containing prepared
-    // sections of outstanding transactions.
-    //
-    // We must check min logs with outstanding prep before we check
-    // logs referneces by memtables because a log referenced by the
-    // first data structure could transition to the second under us.
-    //
-    // TODO(horuff): iterating over all column families under db mutex.
-    // should find more optimial solution
-    auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
-
-    if (min_log_in_prep_heap != 0 &&
-        min_log_in_prep_heap < job_context->log_number) {
-      job_context->log_number = min_log_in_prep_heap;
-    }
-
-    auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
-
-    if (min_log_refed_by_mem != 0 &&
-        min_log_refed_by_mem < job_context->log_number) {
-      job_context->log_number = min_log_refed_by_mem;
-    }
-  }
+  job_context->log_number = MinLogNumberToKeep();
 
   job_context->prev_log_number = versions_->prev_log_number();
 
diff --git a/db/db_impl.h b/db/db_impl.h
index 90ed602d7..dfd5f1482 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -380,6 +380,8 @@ class DBImpl : public DB {
   // schedule a purge
   void ScheduleBgLogWriterClose(JobContext* job_context);
 
+  uint64_t MinLogNumberToKeep();
+
   // Returns the list of live files in 'live' and the list
   // of all files in the filesystem in 'candidate_files'.
   // If force == false and the last call was less than
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index 6ecd258a1..953d9e9a7 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -221,6 +221,7 @@ static const std::string num_live_versions = "num-live-versions";
 static const std::string current_version_number =
     "current-super-version-number";
 static const std::string estimate_live_data_size = "estimate-live-data-size";
+static const std::string min_log_number_to_keep = "min-log-number-to-keep";
 static const std::string base_level = "base-level";
 static const std::string total_sst_files_size = "total-sst-files-size";
 static const std::string estimate_pending_comp_bytes =
@@ -285,6 +286,8 @@ const std::string DB::Properties::kCurrentSuperVersionNumber =
     rocksdb_prefix + current_version_number;
 const std::string DB::Properties::kEstimateLiveDataSize =
     rocksdb_prefix + estimate_live_data_size;
+const std::string DB::Properties::kMinLogNumberToKeep =
+    rocksdb_prefix + min_log_number_to_keep;
 const std::string DB::Properties::kTotalSstFilesSize =
     rocksdb_prefix + total_sst_files_size;
 const std::string DB::Properties::kBaseLevel = rocksdb_prefix + base_level;
@@ -368,6 +371,8 @@ const std::unordered_map
       nullptr}},
     {DB::Properties::kEstimateLiveDataSize,
      {true, nullptr, &InternalStats::HandleEstimateLiveDataSize, nullptr}},
+    {DB::Properties::kMinLogNumberToKeep,
+     {false, nullptr, &InternalStats::HandleMinLogNumberToKeep, nullptr}},
     {DB::Properties::kBaseLevel,
      {false, nullptr, &InternalStats::HandleBaseLevel, nullptr}},
     {DB::Properties::kTotalSstFilesSize,
@@ -705,6 +710,12 @@ bool InternalStats::HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
   return true;
 }
 
+bool InternalStats::HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db,
+                                             Version* version) {
+  *value = db->MinLogNumberToKeep();
+  return true;
+}
+
 void InternalStats::DumpDBStats(std::string* value) {
   char buf[1000];
   // DB-level stats, only available from default column family
diff --git a/db/internal_stats.h b/db/internal_stats.h
index e6c2365c2..aade28bb3 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -401,6 +401,7 @@ class InternalStats {
                             Version* version);
   bool HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
                                   Version* version);
+  bool HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db, Version* version);
 
   // Total number of background errors encountered. Every time a flush task
   // or compaction task fails, this counter is incremented. The failure can
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 419acd8a5..2a0304e47 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -500,6 +500,10 @@ class DB {
   // live data in bytes.
   static const std::string kEstimateLiveDataSize;
 
+  // "rocksdb.min-log-number-to-keep" - returns the minimum log number of the
+  // log files that should be kept.
+  static const std::string kMinLogNumberToKeep;
+
   // "rocksdb.total-sst-files-size" - returns total size (bytes) of all SST
   // files.
   // WARNING: may slow down online queries if there are too many files.
@@ -565,6 +569,7 @@ class DB {
   //  "rocksdb.num-live-versions"
   //  "rocksdb.current-super-version-number"
   //  "rocksdb.estimate-live-data-size"
+  //  "rocksdb.min-log-number-to-keep"
   //  "rocksdb.total-sst-files-size"
   //  "rocksdb.base-level"
   //  "rocksdb.estimate-pending-compaction-bytes"
diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc
index 8bc8dc84a..ea6a1d08f 100644
--- a/utilities/checkpoint/checkpoint.cc
+++ b/utilities/checkpoint/checkpoint.cc
@@ -62,6 +62,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
   Status s;
   std::vector<std::string> live_files;
   uint64_t manifest_file_size = 0;
+  bool allow_2pc = db_->GetDBOptions().allow_2pc;
+  uint64_t min_log_num = port::kMaxUint64;
   uint64_t sequence_number = db_->GetLatestSequenceNumber();
   bool same_fs = true;
   VectorLogPtr live_wal_files;
@@ -78,6 +80,35 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
   if (s.ok()) {
     // this will return live_files prefixed with "/"
     s = db_->GetLiveFiles(live_files, &manifest_file_size);
+
+    if (s.ok() && allow_2pc) {
+      // If 2PC is enabled, we need to get minimum log number after the flush.
+      // Need to refetch the live files to recapture the snapshot.
+      if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
+                               &min_log_num)) {
+        db_->EnableFileDeletions(false);
+        return Status::InvalidArgument(
+            "2PC enabled but cannot find the min log number to keep.");
+      }
+      // We need to refetch live files with flush to handle this case:
+      // A previous 000001.log contains the prepare record of transaction tnx1.
+      // The current log file is 000002.log, and sequence_number points to this
+      // file.
+      // After calling GetLiveFiles(), 000003.log is created.
+      // Then tnx1 is committed. The commit record is written to 000003.log.
+      // Now we fetch min_log_num, which will be 3.
+      // Then only 000002.log and 000003.log will be copied, and 000001.log will
+      // be skipped. 000003.log contains commit message of tnx1, but we don't
+      // have respective prepare record for it.
+      // In order to avoid this situation, we need to force flush to make sure
+      // all transactions committed before getting min_log_num will be flushed
+      // to SST files.
+      // We cannot get min_log_num before calling the GetLiveFiles() for the
+      // first time, because if we do that, all the logs files will be included,
+      // far more than needed.
+      s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
+    }
+
     TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
     TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
   }
@@ -156,7 +187,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
   // that has changes after the last flush.
   for (size_t i = 0; s.ok() && i < wal_size; ++i) {
     if ((live_wal_files[i]->Type() == kAliveLogFile) &&
-        (live_wal_files[i]->StartSequence() >= sequence_number)) {
+        (live_wal_files[i]->StartSequence() >= sequence_number ||
+         live_wal_files[i]->LogNumber() >= min_log_num)) {
       if (i + 1 == wal_size) {
         Log(db_->GetOptions().info_log, "Copying %s",
             live_wal_files[i]->PathName().c_str());
diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc
index 3336e5af5..3d6fa74de 100644
--- a/utilities/checkpoint/checkpoint_test.cc
+++ b/utilities/checkpoint/checkpoint_test.cc
@@ -21,6 +21,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/utilities/checkpoint.h"
+#include "rocksdb/utilities/transaction_db.h"
 #include "util/sync_point.h"
 #include "util/testharness.h"
 #include "util/xfunc.h"
@@ -390,6 +391,120 @@ TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
   snapshotDB = nullptr;
 }
 
+TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing2PC) {
+  const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
+  const std::string dbname = test::TmpDir() + "/transaction_testdb";
+  ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
+  ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
+  env_->DeleteDir(kSnapshotName);
+  env_->DeleteDir(dbname);
+  Close();
+
+  Options options = CurrentOptions();
+  // allow_2pc is implicitly set with tx prepare
+  // options.allow_2pc = true;
+  TransactionDBOptions txn_db_options;
+  TransactionDB* txdb;
+  Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
+  assert(s.ok());
+  ColumnFamilyHandle* cfa;
+  ColumnFamilyHandle* cfb;
+  ColumnFamilyOptions cf_options;
+  ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa));
+
+  WriteOptions write_options;
+  // Insert something into CFB so lots of log files will be kept
+  // before creating the checkpoint.
+  ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb));
+  ASSERT_OK(txdb->Put(write_options, cfb, "", ""));
+
+  ReadOptions read_options;
+  std::string value;
+  TransactionOptions txn_options;
+  Transaction* txn = txdb->BeginTransaction(write_options, txn_options);
+  s = txn->SetName("xid");
+  ASSERT_OK(s);
+  ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
+
+  s = txn->Put(Slice("foo"), Slice("bar"));
+  s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
+  ASSERT_OK(s);
+  // Writing prepare into middle of first WAL, then flush WALs many times
+  for (int i = 1; i <= 100000; i++) {
+    Transaction* tx = txdb->BeginTransaction(write_options, txn_options);
+    ASSERT_OK(tx->SetName("x"));
+    ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val")));
+    ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111")));
+    ASSERT_OK(tx->Prepare());
+    ASSERT_OK(tx->Commit());
+    if (i % 10000 == 0) {
+      txdb->Flush(FlushOptions());
+    }
+    if (i == 88888) {
+      ASSERT_OK(txn->Prepare());
+    }
+  }
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
+        "DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
+       {"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
+        "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  std::thread t([&]() {
+    Checkpoint* checkpoint;
+    ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
+    ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
+    delete checkpoint;
+  });
+  TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
+  ASSERT_OK(txn->Commit());
+  TEST_SYNC_POINT(
+      "DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
+  t.join();
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  // No more than three log files should exist.
+  std::vector<std::string> files;
+  env_->GetChildren(kSnapshotName, &files);
+  int num_log_files = 0;
+  for (auto& file : files) {
+    uint64_t num;
+    FileType type;
+    WalFileType log_type;
+    if (ParseFileName(file, &num, &type, &log_type) && type == kLogFile) {
+      num_log_files++;
+    }
+  }
+  // One flush after prepare + one outstanding file before checkpoint + one log
+  // file generated after checkpoint.
+  ASSERT_LE(num_log_files, 3);
+
+  TransactionDB* snapshotDB;
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
+  column_families.push_back(
+      ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
+  column_families.push_back(
+      ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
+  std::vector<ColumnFamilyHandle*> cf_handles;
+  ASSERT_OK(TransactionDB::Open(options, txn_db_options, kSnapshotName,
+                                column_families, &cf_handles, &snapshotDB));
+  ASSERT_OK(snapshotDB->Get(read_options, "foo", &value));
+  ASSERT_EQ(value, "bar");
+  ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value));
+  ASSERT_EQ(value, "barcfa");
+
+  delete cfa;
+  delete cfb;
+  delete cf_handles[0];
+  delete cf_handles[1];
+  delete cf_handles[2];
+  delete snapshotDB;
+  snapshotDB = nullptr;
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {