Optionally create DuplicateDetector

Summary:
Address issue https://github.com/facebook/rocksdb/issues/3579
Closes https://github.com/facebook/rocksdb/pull/3589

Differential Revision: D7221161

Pulled By: yiwu-arbug

fbshipit-source-id: bd875ab0aa0e414dfa98b1bf036ba9b4ed351361
main
Dmitri Smirnov 7 years ago committed by Facebook Github Bot
parent e003d22526
commit 6f7b7f91b5
  1. 37
      db/write_batch.cc

@ -1009,7 +1009,9 @@ class MemTableInserter : public WriteBatch::Handler {
bool seq_per_batch_; bool seq_per_batch_;
// Whether the memtable write will be done only after the commit // Whether the memtable write will be done only after the commit
bool write_after_commit_; bool write_after_commit_;
DuplicateDetector duplicate_detector_; using DupDetector = std::aligned_storage<sizeof(DuplicateDetector)>::type;
DupDetector duplicate_detector_;
bool dup_dectector_on_;
MemPostInfoMap& GetPostMap() { MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_); assert(concurrent_memtable_writes_);
@ -1020,6 +1022,17 @@ class MemTableInserter : public WriteBatch::Handler {
return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_); return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
} }
bool IsDuplicateKeySeq(uint32_t column_family_id, const Slice& key) {
assert(!write_after_commit_);
assert(rebuilding_trx_ != nullptr);
if (!dup_dectector_on_) {
new (&duplicate_detector_) DuplicateDetector(db_);
dup_dectector_on_ = true;
}
return reinterpret_cast<DuplicateDetector*>
(&duplicate_detector_)->IsDuplicateKeySeq(column_family_id, key, sequence_);
}
protected: protected:
virtual bool WriteAfterCommit() const override { return write_after_commit_; } virtual bool WriteAfterCommit() const override { return write_after_commit_; }
@ -1048,11 +1061,16 @@ class MemTableInserter : public WriteBatch::Handler {
// batch). So seq_per_batch being false indicates write_after_commit // batch). So seq_per_batch being false indicates write_after_commit
// approach. // approach.
write_after_commit_(!seq_per_batch), write_after_commit_(!seq_per_batch),
duplicate_detector_(db_) { duplicate_detector_(),
dup_dectector_on_(false) {
assert(cf_mems_); assert(cf_mems_);
} }
~MemTableInserter() { ~MemTableInserter() {
if (dup_dectector_on_) {
reinterpret_cast<DuplicateDetector*>
(&duplicate_detector_)->~DuplicateDetector();
}
if (post_info_created_) { if (post_info_created_) {
reinterpret_cast<MemPostInfoMap*> reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap(); (&mem_post_info_map_)->~MemPostInfoMap();
@ -1153,8 +1171,7 @@ class MemTableInserter : public WriteBatch::Handler {
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value); WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, batch_boundry = IsDuplicateKeySeq(column_family_id, key);
key, sequence_);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
return seek_status; return seek_status;
@ -1281,8 +1298,7 @@ class MemTableInserter : public WriteBatch::Handler {
// The CF is probably flushed and hence no need for insert but we still // The CF is probably flushed and hence no need for insert but we still
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key); WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, batch_boundry = IsDuplicateKeySeq(column_family_id, key);
key, sequence_);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
return seek_status; return seek_status;
@ -1317,8 +1333,7 @@ class MemTableInserter : public WriteBatch::Handler {
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id,
key); key);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, batch_boundry = IsDuplicateKeySeq(column_family_id, key);
key, sequence_);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
return seek_status; return seek_status;
@ -1358,8 +1373,7 @@ class MemTableInserter : public WriteBatch::Handler {
begin_key, end_key); begin_key, end_key);
// TODO(myabandeh): when transactional DeleteRange support is added, // TODO(myabandeh): when transactional DeleteRange support is added,
// check if end_key must also be added. // check if end_key must also be added.
batch_boundry = duplicate_detector_.IsDuplicateKeySeq( batch_boundry = IsDuplicateKeySeq(column_family_id, begin_key);
column_family_id, begin_key, sequence_);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
return seek_status; return seek_status;
@ -1410,8 +1424,7 @@ class MemTableInserter : public WriteBatch::Handler {
// need to keep track of the keys for upcoming rollback/commit. // need to keep track of the keys for upcoming rollback/commit.
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key,
value); value);
batch_boundry = duplicate_detector_.IsDuplicateKeySeq(column_family_id, batch_boundry = IsDuplicateKeySeq(column_family_id, key);
key, sequence_);
} }
MaybeAdvanceSeq(batch_boundry); MaybeAdvanceSeq(batch_boundry);
return seek_status; return seek_status;

Loading…
Cancel
Save