WritePrepared Txn: Disable GC during recovery

Summary:
Disables garbage collection (GC) of older key versions while a WritePrepared transaction DB is being recovered, so that uncommitted key values are not dropped during recovery.
Closes https://github.com/facebook/rocksdb/pull/2980

Differential Revision: D6000191

Pulled By: maysamyabandeh

fbshipit-source-id: fc4d522c643d24ebf043f811fe4ecd0dd0294675
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 7891af8b53
commit 7e38238981
  1. 7
      db/db_impl.cc
  2. 1
      db/db_impl.h
  3. 18
      db/db_impl_compaction_flush.cc
  4. 8
      db/db_impl_open.cc
  5. 6
      db/repair.cc
  6. 32
      db/snapshot_checker.h
  7. 2
      utilities/transactions/pessimistic_transaction_db.cc
  8. 16
      utilities/transactions/snapshot_checker.cc
  9. 22
      utilities/transactions/write_prepared_transaction_test.cc

@ -182,8 +182,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
unable_to_flush_oldest_log_(false), unable_to_flush_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, env_options_, immutable_db_options_)),
immutable_db_options_)),
num_running_ingest_file_(0), num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_), wal_manager_(immutable_db_options_, env_options_),
@ -195,7 +194,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
opened_successfully_(false), opened_successfully_(false),
concurrent_prepare_(options.concurrent_prepare), concurrent_prepare_(options.concurrent_prepare),
manual_wal_flush_(options.manual_wal_flush), manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(options.seq_per_batch) { seq_per_batch_(options.seq_per_batch),
// TODO(myabandeh): revise this when we change options.seq_per_batch
use_custom_gc_(options.seq_per_batch) {
env_->GetAbsolutePath(dbname, &db_absolute_path_); env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.

@ -1300,6 +1300,7 @@ class DBImpl : public DB {
const bool concurrent_prepare_; const bool concurrent_prepare_;
const bool manual_wal_flush_; const bool manual_wal_flush_;
const bool seq_per_batch_; const bool seq_per_batch_;
const bool use_custom_gc_;
}; };
extern Options SanitizeOptions(const std::string& db, extern Options SanitizeOptions(const std::string& db,

@ -86,10 +86,14 @@ Status DBImpl::FlushMemTableToOutputFile(
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options,
env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker_.get(), snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
job_context, log_buffer, directories_.GetDbDir(), job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U), directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
@ -531,13 +535,17 @@ Status DBImpl::CompactFilesImpl(
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer, env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()), directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()),
stats_, &mutex_, &bg_error_, snapshot_seqs, stats_, &mutex_, &bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker_.get(), table_cache_, earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks, &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because nullptr); // Here we pass a nullptr for CompactionJobStats because
@ -1678,6 +1686,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
auto snapshot_checker = snapshot_checker_.get();
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
assert(is_snapshot_supported_ || snapshots_.empty()); assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job( CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_, job_context->job_id, c.get(), immutable_db_options_,
@ -1685,7 +1697,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
log_buffer, directories_.GetDbDir(), log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker_.get(), table_cache_, &event_logger_, snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_, c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats); &compaction_job_stats);

@ -883,10 +883,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs = std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot); snapshots_.GetAll(&earliest_write_conflict_snapshot);
// Only TransactionDB passes snapshot_checker and it creates it after db auto snapshot_checker = snapshot_checker_.get();
// open. Just pass nullptr here. if (use_custom_gc_ && snapshot_checker == nullptr) {
SnapshotChecker* snapshot_checker = nullptr; snapshot_checker = DisableGCSnapshotChecker::Instance();
}
s = BuildTable( s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options, dbname_, env_, *cfd->ioptions(), mutable_cf_options,
env_options_for_compaction_, cfd->table_cache(), iter.get(), env_options_for_compaction_, cfd->table_cache(), iter.get(),

@ -400,11 +400,7 @@ class Repairer {
int64_t _current_time = 0; int64_t _current_time = 0;
status = env_->GetCurrentTime(&_current_time); // ignore error status = env_->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time); const uint64_t current_time = static_cast<uint64_t>(_current_time);
// Only TransactionDB make use of snapshot_checker and repair doesn't SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
// currently support TransactionDB with uncommitted prepared keys in WAL.
// TODO(yiwu) Support repairing TransactionDB.
SnapshotChecker* snapshot_checker = nullptr;
status = BuildTable( status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
env_options_, table_cache_, iter.get(), env_options_, table_cache_, iter.get(),

@ -8,16 +8,40 @@
namespace rocksdb { namespace rocksdb {
// Callback interface that controls GC of duplicate keys in flush/compaction.
class SnapshotChecker {
 public:
  virtual ~SnapshotChecker() = default;

  // Reports whether the entry with sequence number `sequence` is considered
  // to be in the snapshot identified by `snapshot_sequence`. The exact rule
  // is defined by each implementation.
  virtual bool IsInSnapshot(SequenceNumber sequence,
                            SequenceNumber snapshot_sequence) const = 0;
};
class DisableGCSnapshotChecker : public SnapshotChecker {
public:
virtual ~DisableGCSnapshotChecker() {}
virtual bool IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const {
// By returning false, we prevent all the values from being GCed
return false;
}
static DisableGCSnapshotChecker* Instance() { return &instance_; }
protected:
static DisableGCSnapshotChecker instance_;
explicit DisableGCSnapshotChecker() {}
};
class WritePreparedTxnDB; class WritePreparedTxnDB;
// Callback class created by WritePreparedTxnDB to check if a key // Callback class created by WritePreparedTxnDB to check if a key
// is visible by a snapshot. // is visible by a snapshot.
class SnapshotChecker { class WritePreparedSnapshotChecker : public SnapshotChecker {
public: public:
explicit SnapshotChecker(WritePreparedTxnDB* txn_db); explicit WritePreparedSnapshotChecker(WritePreparedTxnDB* txn_db);
virtual ~WritePreparedSnapshotChecker() {}
bool IsInSnapshot(SequenceNumber sequence, virtual bool IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const; SequenceNumber snapshot_sequence) const override;
private: private:
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

@ -148,7 +148,7 @@ Status WritePreparedTxnDB::Initialize(
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq); AdvanceMaxEvictedSeq(prev_max, last_seq);
db_impl_->SetSnapshotChecker(new SnapshotChecker(this)); db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles); handles);

@ -14,10 +14,11 @@
namespace rocksdb { namespace rocksdb {
#ifdef ROCKSDB_LITE #ifdef ROCKSDB_LITE
SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) {} WritePreparedSnapshotChecker::WritePreparedSnapshotChecker(
WritePreparedTxnDB* txn_db) {}
bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, bool WritePreparedSnapshotChecker::IsInSnapshot(
SequenceNumber snapshot_sequence) const { SequenceNumber sequence, SequenceNumber snapshot_sequence) const {
// Should never be called in LITE mode. // Should never be called in LITE mode.
assert(false); assert(false);
return true; return true;
@ -25,13 +26,16 @@ bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence,
#else #else
SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) WritePreparedSnapshotChecker::WritePreparedSnapshotChecker(
WritePreparedTxnDB* txn_db)
: txn_db_(txn_db){}; : txn_db_(txn_db){};
bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence, bool WritePreparedSnapshotChecker::IsInSnapshot(
SequenceNumber snapshot_sequence) const { SequenceNumber sequence, SequenceNumber snapshot_sequence) const {
return txn_db_->IsInSnapshot(sequence, snapshot_sequence); return txn_db_->IsInSnapshot(sequence, snapshot_sequence);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Definition of the shared instance handed out by
// DisableGCSnapshotChecker::Instance(). It sits after the ROCKSDB_LITE
// #ifdef/#else/#endif block, so it is compiled in both configurations.
DisableGCSnapshotChecker DisableGCSnapshotChecker::instance_;
} // namespace rocksdb } // namespace rocksdb

@ -1349,6 +1349,28 @@ TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) {
} }
} }
// Checks that recovering the DB does not garbage-collect older versions of a
// key: writes 1024 versions of "foo" without flushing, syncs the WAL, reopens
// with a small write buffer so the memtable is flushed during recovery, and
// then verifies that every version is still present.
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecoveryTest) {
  // Use large buffer to avoid memtable flush after 1024 insertions
  options.write_buffer_size = 1024 * 1024;
  ReOpen();
  std::vector<KeyVersion> versions;
  for (uint64_t i = 1; i <= 1024; i++) {
    std::string v = "bar" + ToString(i);
    ASSERT_OK(db->Put(WriteOptions(), "foo", v));
    VerifyKeys({{"foo", v}});
    // The i-th write is recorded with sequence number i.
    KeyVersion kv = {"foo", v, i, kTypeValue};
    versions.emplace_back(kv);
  }
  // Versions were collected oldest-first; reverse so the newest comes first
  // before comparing against the internal keys.
  std::reverse(std::begin(versions), std::end(versions));
  VerifyInternalKeys(versions);
  // DBImpl derives from DB, so static_cast is the appropriate downcast.
  DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
  // Sync the WAL so that all writes are available to recovery. Fail right
  // here if that does not succeed instead of at the later verification.
  ASSERT_OK(db_impl->FlushWAL(true));
  // Use small buffer to ensure memtable flush during recovery
  options.write_buffer_size = 1024;
  ReOpenNoDelete();
  VerifyInternalKeys(versions);
}
TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) { TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
ASSERT_OK(db->Put(WriteOptions(), "foo", "bar")); ASSERT_OK(db->Put(WriteOptions(), "foo", "bar"));
VerifyKeys({{"foo", "bar"}}); VerifyKeys({{"foo", "bar"}});

Loading…
Cancel
Save