diff --git a/db/db_impl.cc b/db/db_impl.cc index a061da835..061e1caec 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -182,8 +182,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) unable_to_flush_oldest_log_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( - env_options_, - immutable_db_options_)), + env_options_, immutable_db_options_)), num_running_ingest_file_(0), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_), @@ -195,7 +194,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) opened_successfully_(false), concurrent_prepare_(options.concurrent_prepare), manual_wal_flush_(options.manual_wal_flush), - seq_per_batch_(options.seq_per_batch) { + seq_per_batch_(options.seq_per_batch), + // TODO(myabandeh): revise this when we change options.seq_per_batch + use_custom_gc_(options.seq_per_batch) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. diff --git a/db/db_impl.h b/db/db_impl.h index 99abe2d34..5992288a5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1300,6 +1300,7 @@ class DBImpl : public DB { const bool concurrent_prepare_; const bool manual_wal_flush_; const bool seq_per_batch_; + const bool use_custom_gc_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 7f7e23eaa..cbd860b3e 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -86,10 +86,14 @@ Status DBImpl::FlushMemTableToOutputFile( std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + auto snapshot_checker = snapshot_checker_.get(); + if (use_custom_gc_ && snapshot_checker == nullptr) { + snapshot_checker = DisableGCSnapshotChecker::Instance(); + } FlushJob flush_job( dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, - snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker_.get(), + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, @@ -531,13 +535,17 @@ Status DBImpl::CompactFilesImpl( auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); + auto snapshot_checker = snapshot_checker_.get(); + if (use_custom_gc_ && snapshot_checker == nullptr) { + snapshot_checker = DisableGCSnapshotChecker::Instance(); + } assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, snapshot_seqs, - earliest_write_conflict_snapshot, snapshot_checker_.get(), table_cache_, + earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, nullptr); // Here we pass a nullptr for CompactionJobStats because @@ -1678,6 +1686,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + auto snapshot_checker = snapshot_checker_.get(); + if (use_custom_gc_ && snapshot_checker == nullptr) { + snapshot_checker = DisableGCSnapshotChecker::Instance(); + } assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), immutable_db_options_, @@ -1685,7 +1697,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker_.get(), table_cache_, &event_logger_, + snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, &compaction_job_stats); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 8981f35ea..3ed27c717 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -883,10 +883,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, SequenceNumber earliest_write_conflict_snapshot; std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); - // Only TransactionDB passes snapshot_checker and it creates it after db - // open. Just pass nullptr here. - SnapshotChecker* snapshot_checker = nullptr; - + auto snapshot_checker = snapshot_checker_.get(); + if (use_custom_gc_ && snapshot_checker == nullptr) { + snapshot_checker = DisableGCSnapshotChecker::Instance(); + } s = BuildTable( dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_for_compaction_, cfd->table_cache(), iter.get(), diff --git a/db/repair.cc b/db/repair.cc index 6b58c195c..f88720878 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -400,11 +400,7 @@ class Repairer { int64_t _current_time = 0; status = env_->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); - // Only TransactionDB make use of snapshot_checker and repair doesn't - // currently support TransactionDB with uncommitted prepared keys in WAL. - // TODO(yiwu) Support repairing TransactionDB. - SnapshotChecker* snapshot_checker = nullptr; - + SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance(); status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_, iter.get(), diff --git a/db/snapshot_checker.h b/db/snapshot_checker.h index baea76e96..8a8738a5a 100644 --- a/db/snapshot_checker.h +++ b/db/snapshot_checker.h @@ -8,16 +8,40 @@ namespace rocksdb { +// Callback class that control GC of duplicate keys in flush/compaction +class SnapshotChecker { + public: + virtual ~SnapshotChecker() {} + virtual bool IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const = 0; +}; + +class DisableGCSnapshotChecker : public SnapshotChecker { + public: + virtual ~DisableGCSnapshotChecker() {} + virtual bool IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const { + // By returning false, we prevent all the values from being GCed + return false; + } + static DisableGCSnapshotChecker* Instance() { return &instance_; } + + protected: + static DisableGCSnapshotChecker instance_; + explicit DisableGCSnapshotChecker() {} +}; + class WritePreparedTxnDB; // Callback class created by WritePreparedTxnDB to check if a key // is visible by a snapshot. -class SnapshotChecker { +class WritePreparedSnapshotChecker : public SnapshotChecker { public: - explicit SnapshotChecker(WritePreparedTxnDB* txn_db); + explicit WritePreparedSnapshotChecker(WritePreparedTxnDB* txn_db); + virtual ~WritePreparedSnapshotChecker() {} - bool IsInSnapshot(SequenceNumber sequence, - SequenceNumber snapshot_sequence) const; + virtual bool IsInSnapshot(SequenceNumber sequence, + SequenceNumber snapshot_sequence) const override; private: #ifndef ROCKSDB_LITE diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 4690cd298..534325292 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -148,7 +148,7 @@ Status WritePreparedTxnDB::Initialize( SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); AdvanceMaxEvictedSeq(prev_max, last_seq); - db_impl_->SetSnapshotChecker(new SnapshotChecker(this)); + db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, handles); diff --git a/utilities/transactions/snapshot_checker.cc b/utilities/transactions/snapshot_checker.cc index 724f7430a..6a718fa8e 100644 --- a/utilities/transactions/snapshot_checker.cc +++ b/utilities/transactions/snapshot_checker.cc @@ -14,10 +14,11 @@ namespace rocksdb { #ifdef ROCKSDB_LITE -SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) {} +WritePreparedSnapshotChecker::WritePreparedSnapshotChecker( + WritePreparedTxnDB* txn_db) {} -bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, - SequenceNumber snapshot_sequence) const { +bool WritePreparedSnapshotChecker::IsInSnapshot( + SequenceNumber sequence, SequenceNumber snapshot_sequence) const { // Should never be called in LITE mode. assert(false); return true; @@ -25,13 +26,16 @@ bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, #else -SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) +WritePreparedSnapshotChecker::WritePreparedSnapshotChecker( + WritePreparedTxnDB* txn_db) : txn_db_(txn_db){}; -bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, - SequenceNumber snapshot_sequence) const { +bool WritePreparedSnapshotChecker::IsInSnapshot( + SequenceNumber sequence, SequenceNumber snapshot_sequence) const { return txn_db_->IsInSnapshot(sequence, snapshot_sequence); } + #endif // ROCKSDB_LITE +DisableGCSnapshotChecker DisableGCSnapshotChecker::instance_; } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 78141ce21..37b25450a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1349,6 +1349,28 @@ TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) { } } +TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) { + // Use large buffer to avoid memtable flush after 1024 insertions + options.write_buffer_size = 1024 * 1024; + ReOpen(); + std::vector versions; + for (uint64_t i = 1; i <= 1024; i++) { + std::string v = "bar" + ToString(i); + ASSERT_OK(db->Put(WriteOptions(), "foo", v)); + VerifyKeys({{"foo", v}}); + KeyVersion kv = {"foo", v, i, kTypeValue}; + versions.emplace_back(kv); + } + std::reverse(std::begin(versions), std::end(versions)); + VerifyInternalKeys(versions); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + db_impl->FlushWAL(true); + // Use small buffer to ensure memtable flush during recovery + options.write_buffer_size = 1024; + ReOpenNoDelete(); + VerifyInternalKeys(versions); +} + TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) { ASSERT_OK(db->Put(WriteOptions(), "foo", "bar")); VerifyKeys({{"foo", "bar"}});