WritePrepared Txn: add stats

Summary:
Adding some stats that are helpful for monitoring whether the DB has gone into unlikely states that would hurt performance. These mostly correspond to the cases where we end up needing to acquire a mutex.
Closes https://github.com/facebook/rocksdb/pull/3683

Differential Revision: D7529393

Pulled By: maysamyabandeh

fbshipit-source-id: f7d36279a8f39bd84d8ddbf64b5c97f670c5d6d9
main
Maysam Yabandeh 7 years ago committed by Facebook Github Bot
parent eb5a295440
commit bde1c1a72a
Changed files:
  1. db/db_impl.h (22)
  2. db/memtable.cc (6)
  3. include/rocksdb/statistics.h (16)
  4. include/rocksdb/utilities/transaction_db.h (7)
  5. utilities/transactions/write_prepared_txn_db.cc (18)
  6. utilities/transactions/write_prepared_txn_db.h (8)

--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -691,14 +691,20 @@ class DBImpl : public DB {
   void EraseThreadStatusDbInfo() const;
   // If disable_memtable is set the application logic must guarantee that the
-  // batch will still be skipped from memtable during the recovery. In
-  // WriteCommitted it is guarnateed since disable_memtable is used for prepare
-  // batch which will be written to memtable later during the commit, and in
-  // WritePrepared it is guaranteed since it will be used only for WAL markers
-  // which will never be written to memtable.
-  // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates
-  // the number of sub-patches. A sub-patch is a subset of the write batch that
-  // does not have duplicate keys.
+  // batch will still be skipped from memtable during the recovery. An exception
+  // to this is seq_per_batch_ mode, in which, since each batch already takes one
+  // seq, it is ok for the batch to write to memtable during recovery as long as
+  // it only takes one sequence number: i.e., no duplicate keys.
+  // In WriteCommitted it is guaranteed since disable_memtable is used for the
+  // prepare batch, which will be written to memtable later during the commit,
+  // and in WritePrepared it is guaranteed since it will be used only for WAL
+  // markers, which will never be written to memtable. If the commit marker is
+  // accompanied by a CommitTimeWriteBatch that is not written to memtable, then
+  // as long as it has no duplicate keys it does not violate the
+  // one-seq-per-batch policy.
+  // batch_cnt is expected to be non-zero in seq_per_batch mode and
+  // indicates the number of sub-batches. A sub-batch is a subset of the write
+  // batch that does not have duplicate keys.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
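A sub-batch, as described above, is the unit that gets its own sequence number when a write batch contains duplicate keys. Below is a minimal sketch (the class name is hypothetical, not RocksDB's internal counter) of how sub-batches can be counted by walking a WriteBatch with a WriteBatch::Handler, which mirrors what batch_cnt is expected to carry in seq_per_batch mode:

    // Illustrative only: counts duplicate-key-free sub-batches in a WriteBatch.
    #include <set>
    #include <string>
    #include <utility>
    #include "rocksdb/write_batch.h"

    class SubBatchCounterSketch : public rocksdb::WriteBatch::Handler {
     public:
      size_t BatchCount() const { return batch_cnt_; }

      rocksdb::Status PutCF(uint32_t cf, const rocksdb::Slice& key,
                            const rocksdb::Slice& /*value*/) override {
        Record(cf, key);
        return rocksdb::Status::OK();
      }
      rocksdb::Status DeleteCF(uint32_t cf, const rocksdb::Slice& key) override {
        Record(cf, key);
        return rocksdb::Status::OK();
      }

     private:
      void Record(uint32_t cf, const rocksdb::Slice& key) {
        if (batch_cnt_ == 0) {
          batch_cnt_ = 1;  // the first entry opens the first sub-batch
        }
        auto entry = std::make_pair(cf, key.ToString());
        if (!keys_.insert(entry).second) {
          // Duplicate (cf, key): close the current sub-batch, start a new one.
          ++batch_cnt_;
          keys_.clear();
          keys_.insert(entry);
        }
      }
      std::set<std::pair<uint32_t, std::string>> keys_;
      size_t batch_cnt_ = 0;
      // Merge/SingleDelete callbacks omitted for brevity in this sketch.
    };

Running batch.Iterate(&counter) and then counter.BatchCount() yields the sub-batch count, much like the WritePreparedTxnDB::WriteInternal hunk further down does with its own counter (Status checks elided here).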

--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -479,12 +479,12 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
         insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
       Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
       bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
-      if (!res) {
+      if (UNLIKELY(!res)) {
         return res;
       }
     } else {
       bool res = table->InsertKey(handle);
-      if (!res) {
+      if (UNLIKELY(!res)) {
         return res;
       }
     }
@@ -520,7 +520,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
     UpdateFlushState();
   } else {
     bool res = table->InsertKeyConcurrently(handle);
-    if (!res) {
+    if (UNLIKELY(!res)) {
      return res;
    }
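The memtable change only wraps the rare insert-failure branch in UNLIKELY, a branch-prediction hint. RocksDB defines the macro in its port layer; the sketch below shows the common GCC/Clang idiom it typically expands to (the exact form here is an assumption, shown for context):

    #if defined(__GNUC__) || defined(__clang__)
    #define UNLIKELY(x) (__builtin_expect((x), 0))
    #define LIKELY(x) (__builtin_expect((x), 1))
    #else
    #define UNLIKELY(x) (x)
    #define LIKELY(x) (x)
    #endif

    // The hint lets the compiler lay out code so the common path (a successful
    // insert) falls through without a taken branch:
    bool AddOrFail(bool inserted) {
      if (UNLIKELY(!inserted)) {
        return false;  // rare path: e.g., a duplicate key in seq_per_batch mode
      }
      return true;
    }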

--- a/include/rocksdb/statistics.h
+++ b/include/rocksdb/statistics.h
@@ -309,6 +309,17 @@ enum Tickers : uint32_t {
   // # of bytes in the blob files evicted because of BlobDB is full.
   BLOB_DB_FIFO_BYTES_EVICTED,
+  // These counters indicate a performance issue in WritePrepared transactions.
+  // We should not see them ticking much.
+  // # of times prepare_mutex_ is acquired in the fast path.
+  TXN_PREPARE_MUTEX_OVERHEAD,
+  // # of times old_commit_map_mutex_ is acquired in the fast path.
+  TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD,
+  // # of times we checked a batch for duplicate keys.
+  TXN_DUPLICATE_KEY_OVERHEAD,
+  // # of times snapshot_mutex_ is acquired in the fast path.
+  TXN_SNAPSHOT_MUTEX_OVERHEAD,
   TICKER_ENUM_MAX
 };
@@ -455,6 +466,11 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
     {BLOB_DB_FIFO_NUM_FILES_EVICTED, "rocksdb.blobdb.fifo.num.files.evicted"},
     {BLOB_DB_FIFO_NUM_KEYS_EVICTED, "rocksdb.blobdb.fifo.num.keys.evicted"},
     {BLOB_DB_FIFO_BYTES_EVICTED, "rocksdb.blobdb.fifo.bytes.evicted"},
+    {TXN_PREPARE_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.prepare"},
+    {TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD,
+     "rocksdb.txn.overhead.mutex.old.commit.map"},
+    {TXN_DUPLICATE_KEY_OVERHEAD, "rocksdb.txn.overhead.duplicate.key"},
+    {TXN_SNAPSHOT_MUTEX_OVERHEAD, "rocksdb.txn.overhead.mutex.snapshot"},
 };
 /**
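Because the tickers are registered in TickersNameMap, they show up in statistics dumps under the rocksdb.txn.overhead.* names and can also be read programmatically. A minimal sketch of checking them on a WritePrepared TransactionDB (the path, workload, and error handling are placeholders):

    #include <iostream>
    #include "rocksdb/statistics.h"
    #include "rocksdb/utilities/transaction_db.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.statistics = rocksdb::CreateDBStatistics();  // collect tickers

      rocksdb::TransactionDBOptions txn_db_options;
      txn_db_options.write_policy = rocksdb::TxnDBWritePolicy::WRITE_PREPARED;

      rocksdb::TransactionDB* db = nullptr;
      rocksdb::Status s =
          rocksdb::TransactionDB::Open(options, txn_db_options, "/tmp/wp_db", &db);
      if (!s.ok()) return 1;

      // ... run the transactional workload here ...

      // These should stay near zero; steadily growing values mean the
      // mutex-acquiring slow paths are being hit.
      std::cout << options.statistics->getTickerCount(
                       rocksdb::TXN_PREPARE_MUTEX_OVERHEAD)
                << " "
                << options.statistics->getTickerCount(
                       rocksdb::TXN_SNAPSHOT_MUTEX_OVERHEAD)
                << "\n";
      delete db;
      return 0;
    }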

--- a/include/rocksdb/utilities/transaction_db.h
+++ b/include/rocksdb/utilities/transaction_db.h
@@ -98,9 +98,10 @@ struct TransactionOptions {
   bool deadlock_detect = false;
-  // If set, it states that the CommitTimeWriteBatch represents the latest state
-  // of the application and meant to be used later during recovery. It enables
-  // an optimization to postpone updating the memtable with CommitTimeWriteBatch
-  // to only SwitchMemtable or recovery.
+  // If set, it states that the CommitTimeWriteBatch represents the latest state
+  // of the application, has only one sub-batch, i.e., no duplicate keys, and is
+  // meant to be used later during recovery. It enables an optimization to
+  // postpone updating the memtable with CommitTimeWriteBatch to only
+  // SwitchMemtable or recovery.
   bool use_only_the_last_commit_time_batch_for_recovery = false;
   // TODO(agiardullo): TransactionDB does not yet support comparators that allow
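For context, the CommitTimeWriteBatch this option refers to is the batch returned by Transaction::GetCommitTimeWriteBatch(). A hedged usage sketch, assuming a TransactionDB* txn_db opened with the WritePrepared policy; keys and values are placeholders and Status checks are elided:

    rocksdb::WriteOptions write_options;
    rocksdb::TransactionOptions txn_options;
    txn_options.use_only_the_last_commit_time_batch_for_recovery = true;

    rocksdb::Transaction* txn =
        txn_db->BeginTransaction(write_options, txn_options);
    txn->Put("key", "value");
    // At most one entry per key: a duplicate key in this batch would break the
    // single-sub-batch assumption the option states.
    txn->GetCommitTimeWriteBatch()->Put("app_state", "latest");
    txn->Prepare();
    txn->Commit();
    delete txn;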

--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -131,9 +131,9 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
     auto s = batch->Iterate(&counter);
     assert(s.ok());
     batch_cnt = counter.BatchCount();
-    // TODO(myabandeh): replace me with a stat
-    ROCKS_LOG_WARN(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
-                   static_cast<uint64_t>(batch_cnt));
+    WPRecordTick(TXN_DUPLICATE_KEY_OVERHEAD);
+    ROCKS_LOG_DETAILS(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
+                      static_cast<uint64_t>(batch_cnt));
   }
   assert(batch_cnt);
@@ -506,7 +506,6 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
   while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
     auto to_be_popped = prepared_txns_.top();
     delayed_prepared_.insert(to_be_popped);
-    // TODO(myabandeh): also add a stat
     ROCKS_LOG_WARN(info_log_,
                    "prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
                    " new_max=%" PRIu64 " oldmax=%" PRIu64,
@@ -574,14 +573,14 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
   // old_commit_map_. Check and do garbage collection if that is the case.
   bool need_gc = false;
   {
-    // TODO(myabandeh): also add a stat
+    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
     ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
     ReadLock rl(&old_commit_map_mutex_);
     auto prep_set_entry = old_commit_map_.find(snap_seq);
     need_gc = prep_set_entry != old_commit_map_.end();
   }
   if (need_gc) {
-    // TODO(myabandeh): also add a stat
+    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
     ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
     WriteLock wl(&old_commit_map_mutex_);
     old_commit_map_.erase(snap_seq);
@@ -601,8 +600,7 @@ void WritePreparedTxnDB::UpdateSnapshots(
 #ifndef NDEBUG
   size_t sync_i = 0;
 #endif
-  // TODO(myabandeh): replace me with a stat
-  ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
+  ROCKS_LOG_DETAILS(info_log_, "snapshots_mutex_ overhead");
   WriteLock wl(&snapshots_mutex_);
   snapshots_version_ = version;
   // We update the list concurrently with the readers.
@@ -682,7 +680,7 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
   if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
                snapshot_seq < evicted.prep_seq)) {
     // Then access the less efficient list of snapshots_
-    // TODO(myabandeh): also add a stat
+    WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
     ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
     ReadLock rl(&snapshots_mutex_);
     // Items could have moved from the snapshots_ to snapshot_cache_ before
@@ -716,8 +714,8 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
   }
   // then snapshot_seq < commit_seq
   if (prep_seq <= snapshot_seq) {  // overlapping range
+    WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
     ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
-    // TODO(myabandeh): also add a stat
     WriteLock wl(&old_commit_map_mutex_);
     old_commit_map_empty_.store(false, std::memory_order_release);
     auto& vec = old_commit_map_[snapshot_seq];

--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -144,8 +144,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
     }
     if (!delayed_prepared_empty_.load(std::memory_order_acquire)) {
       // We should not normally reach here
+      WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD);
       ReadLock rl(&prepared_mutex_);
-      // TODO(myabandeh): also add a stat
       ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64,
                      static_cast<uint64_t>(delayed_prepared_.size()));
       if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) {
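The hunk above shows the shape these tickers instrument: an atomic emptiness flag is checked first, and only in the rare case does the code record the ticker and take the mutex. A self-contained sketch of that fast-path/slow-path pattern (generic, not the actual WritePreparedTxnDB members):

    #include <atomic>
    #include <cstdint>
    #include <mutex>
    #include <set>

    // The lock-free check succeeds almost always; the counter records how often
    // we fall through to the mutex (standing in for WPRecordTick).
    class DelayedSetSketch {
     public:
      bool Contains(uint64_t seq) {
        if (empty_.load(std::memory_order_acquire)) {
          return false;  // fast path: no mutex, no ticker
        }
        ++slow_path_ticks_;  // analogous to WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD)
        std::lock_guard<std::mutex> lock(mutex_);
        return delayed_.count(seq) != 0;
      }

      void Insert(uint64_t seq) {
        std::lock_guard<std::mutex> lock(mutex_);
        delayed_.insert(seq);
        empty_.store(false, std::memory_order_release);
      }

      uint64_t slow_path_ticks() const { return slow_path_ticks_.load(); }

     private:
      std::atomic<bool> empty_{true};
      std::mutex mutex_;
      std::set<uint64_t> delayed_;
      std::atomic<uint64_t> slow_path_ticks_{0};
    };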
@ -216,7 +216,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// We should not normally reach here unless sapshot_seq is old. This is a // We should not normally reach here unless sapshot_seq is old. This is a
// rare case and it is ok to pay the cost of mutex ReadLock for such old, // rare case and it is ok to pay the cost of mutex ReadLock for such old,
// reading transactions. // reading transactions.
// TODO(myabandeh): also add a stat WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq); auto prep_set_entry = old_commit_map_.find(snapshot_seq);
@@ -381,6 +381,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   void Init(const TransactionDBOptions& /* unused */);
+  void WPRecordTick(uint32_t ticker_type) const {
+    RecordTick(db_impl_->immutable_db_options_.statistics.get(), ticker_type);
+  }
   // A heap with the amortized O(1) complexity for erase. It uses one extra heap
   // to keep track of erased entries that are not yet on top of the main heap.
   class PreparedHeap {
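The PreparedHeap mentioned in the trailing context achieves amortized O(1) erase by parking erased entries in a second heap until they reach the top of the main one. A simplified sketch of that two-heap idea (not the actual RocksDB class):

    #include <cstdint>
    #include <functional>
    #include <queue>
    #include <vector>

    // Erased entries that are not yet at the top are deferred in erased_ and
    // lazily dropped once they surface on the main heap.
    class LazyEraseMinHeap {
     public:
      void push(uint64_t v) { heap_.push(v); }
      bool empty() const { return heap_.empty(); }
      uint64_t top() const { return heap_.top(); }

      void pop() {
        heap_.pop();
        Compact();
      }

      void erase(uint64_t v) {
        if (!heap_.empty() && v == heap_.top()) {
          pop();             // erasing the top is a plain pop
        } else {
          erased_.push(v);   // otherwise defer until it reaches the top
        }
      }

     private:
      // Drop entries from the main heap that were logically erased earlier.
      void Compact() {
        while (!heap_.empty() && !erased_.empty() &&
               heap_.top() == erased_.top()) {
          heap_.pop();
          erased_.pop();
        }
      }
      using MinHeap = std::priority_queue<uint64_t, std::vector<uint64_t>,
                                          std::greater<uint64_t>>;
      MinHeap heap_;
      MinHeap erased_;
    };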
