WritePrepared Txn: Move DuplicateDetector to util

Summary:
Move DuplicateDetector and SetComparator to its own header file in util. It would also address a complaint in the unity test.
Closes https://github.com/facebook/rocksdb/pull/3567

Differential Revision: D7163268

Pulled By: maysamyabandeh

fbshipit-source-id: 6ddf82773473646dbbc1284ae601a78c4907c778
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent 9cb4856dbd
commit 62277e15c3
  1. 55
      db/write_batch.cc
  2. 48
      util/duplicate_detector.h
  3. 22
      util/set_comparator.h
  4. 14
      utilities/transactions/write_prepared_txn_db.h

@ -52,6 +52,7 @@
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/duplicate_detector.h"
#include "util/string_util.h"
namespace rocksdb {
@ -978,60 +979,6 @@ Status WriteBatch::PopSavePoint() {
return Status::OK();
}
// TODO(myabandeh): move it to util
namespace {
// During recovery if the memtable is flushed we cannot rely on its help on
// duplicate key detection and as key insert will not be attempted. This class
// will be used as a emulator of memtable to tell if insertion of a key/seq
// would have resulted in duplication.
class DuplicateDetector {
public:
explicit DuplicateDetector(DBImpl* db) : db_(db) {}
bool IsDuplicateKeySeq(uint32_t cf, const Slice& key, SequenceNumber seq) {
assert(seq >= batch_seq_);
if (batch_seq_ != seq) { // it is a new batch
keys_.clear();
}
batch_seq_ = seq;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
InitWithComp(cf);
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
keys_.clear();
InitWithComp(cf);
keys_[cf].insert(key);
return true;
}
return false;
}
private:
SequenceNumber batch_seq_ = 0;
DBImpl* db_;
// A comparator to be used in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
void InitWithComp(const uint32_t cf) {
auto cmp = db_->GetColumnFamilyHandle(cf)->GetComparator();
keys_[cf] = CFKeys(SetComparator(cmp));
}
};
} // anonymous namespace
class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;

@ -0,0 +1,48 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "util/set_comparator.h"
namespace rocksdb {
// During recovery if the memtable is flushed we cannot rely on its help on
// duplicate key detection and as key insert will not be attempted. This class
// will be used as a emulator of memtable to tell if insertion of a key/seq
// would have resulted in duplication.
class DuplicateDetector {
public:
explicit DuplicateDetector(DBImpl* db) : db_(db) {}
bool IsDuplicateKeySeq(uint32_t cf, const Slice& key, SequenceNumber seq) {
assert(seq >= batch_seq_);
if (batch_seq_ != seq) { // it is a new batch
keys_.clear();
}
batch_seq_ = seq;
CFKeys& cf_keys = keys_[cf];
if (cf_keys.size() == 0) { // just inserted
InitWithComp(cf);
}
auto it = cf_keys.insert(key);
if (it.second == false) { // second is false if a element already existed.
keys_.clear();
InitWithComp(cf);
keys_[cf].insert(key);
return true;
}
return false;
}
private:
SequenceNumber batch_seq_ = 0;
DBImpl* db_;
using CFKeys = std::set<Slice, SetComparator>;
std::map<uint32_t, CFKeys> keys_;
void InitWithComp(const uint32_t cf) {
auto cmp = db_->GetColumnFamilyHandle(cf)->GetComparator();
keys_[cf] = CFKeys(SetComparator(cmp));
}
};
} // namespace rocksdb

@ -0,0 +1,22 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
namespace rocksdb {
// A comparator to be used in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
} // namespace rocksdb

@ -20,6 +20,7 @@
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/set_comparator.h"
#include "util/string_util.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
@ -517,19 +518,6 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
bool includes_data_;
};
// A wrapper around Comparator to make it usable in std::set
struct SetComparator {
explicit SetComparator() : user_comparator_(BytewiseComparator()) {}
explicit SetComparator(const Comparator* user_comparator)
: user_comparator_(user_comparator ? user_comparator
: BytewiseComparator()) {}
bool operator()(const Slice& lhs, const Slice& rhs) const {
return user_comparator_->Compare(lhs, rhs) < 0;
}
private:
const Comparator* user_comparator_;
};
// Count the number of sub-batches inside a batch. A sub-batch does not have
// duplicate keys.
struct SubBatchCounter : public WriteBatch::Handler {

Loading…
Cancel
Save