WritePrepared: Fix bug in searching in non-cached snapshots (#4639)

Summary:
When evicting an entry form the commit_cache, it is verified against the list of old snapshots to see if it overlaps with any. The list of old snapshots is split into two lists: an efficient concurrent cache and an slow vector protected by a lock. The patch fixes a bug that would stop the search in the cache if it finds any and yet would not include the larger snapshots in the slower list.
An extra info log entry is also removed. The condition to trigger that although very rare is still feasible and should not spam the LOG when that happens.
Fixes #4621
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4639

Differential Revision: D12934989

Pulled By: maysamyabandeh

fbshipit-source-id: 4e0fe8147ba292b554ae78e94c21c2ef31e03e2d
main
Maysam Yabandeh 6 years ago committed by Facebook Github Bot
parent fffac43cfb
commit 2b5b7bc795
  1. 1
      HISTORY.md
  2. 52
      utilities/transactions/write_prepared_transaction_test.cc
  3. 33
      utilities/transactions/write_prepared_txn_db.cc
  4. 1
      utilities/transactions/write_prepared_txn_db.h

@ -15,6 +15,7 @@
* Fix slow flush/compaction when DB contains many snapshots. The problem became noticeable to us in DBs with 100,000+ snapshots, though it will affect others at different thresholds. * Fix slow flush/compaction when DB contains many snapshots. The problem became noticeable to us in DBs with 100,000+ snapshots, though it will affect others at different thresholds.
* Fix the bug that WriteBatchWithIndex's SeekForPrev() doesn't see the entries with the same key. * Fix the bug that WriteBatchWithIndex's SeekForPrev() doesn't see the entries with the same key.
* Fix the bug where user comparator was sometimes fed with InternalKey instead of the user key. The bug manifests when during GenerateBottommostFiles. * Fix the bug where user comparator was sometimes fed with InternalKey instead of the user key. The bug manifests when during GenerateBottommostFiles.
* Fix a bug in WritePrepared txns where if the number of old snapshots goes beyond the snapshot cache size (128 default) the rest will not be checked when evicting a commit entry from the commit cache.
## 5.17.1 (10/30/2018) ## 5.17.1 (10/30/2018)
## Unreleased ## Unreleased

@ -816,6 +816,7 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l, std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
600l, 700l, 800l, 900l}; 600l, 700l, 800l, 900l};
const size_t snapshot_cache_bits = 2; const size_t snapshot_cache_bits = 2;
const uint64_t cache_size = 1ul << snapshot_cache_bits;
// Safety check to express the intended size in the test. Can be adjusted if // Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed. // the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size()); assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
@ -843,6 +844,57 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
commit_entry.prep_seq <= snapshots.back(); commit_entry.prep_seq <= snapshots.back();
ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_); ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
} }
// Test that search will include multiple snapshot from snapshot cache
{
// exclude first and last item in the cache
CommitEntry commit_entry = {snapshots.front() + 1,
snapshots[cache_size - 1] - 1};
wp_db->old_commit_map_empty_ = true; // reset
wp_db->old_commit_map_.clear();
wp_db->CheckAgainstSnapshots(commit_entry);
ASSERT_EQ(wp_db->old_commit_map_.size(), cache_size - 2);
}
// Test that search will include multiple snapshot from old snapshots
{
// include two in the middle
CommitEntry commit_entry = {snapshots[cache_size] + 1,
snapshots[cache_size + 2] + 1};
wp_db->old_commit_map_empty_ = true; // reset
wp_db->old_commit_map_.clear();
wp_db->CheckAgainstSnapshots(commit_entry);
ASSERT_EQ(wp_db->old_commit_map_.size(), 2);
}
// Test that search will include both snapshot cache and old snapshots
// Case 1: includes all in snapshot cache
{
CommitEntry commit_entry = {snapshots.front() - 1, snapshots.back() + 1};
wp_db->old_commit_map_empty_ = true; // reset
wp_db->old_commit_map_.clear();
wp_db->CheckAgainstSnapshots(commit_entry);
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size());
}
// Case 2: includes all snapshot caches except the smallest
{
CommitEntry commit_entry = {snapshots.front() + 1, snapshots.back() + 1};
wp_db->old_commit_map_empty_ = true; // reset
wp_db->old_commit_map_.clear();
wp_db->CheckAgainstSnapshots(commit_entry);
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - 1);
}
// Case 3: includes only the largest of snapshot cache
{
CommitEntry commit_entry = {snapshots[cache_size - 1] - 1,
snapshots.back() + 1};
wp_db->old_commit_map_empty_ = true; // reset
wp_db->old_commit_map_.clear();
wp_db->CheckAgainstSnapshots(commit_entry);
ASSERT_EQ(wp_db->old_commit_map_.size(), snapshots.size() - cache_size + 1);
}
} }
// This test is too slow for travis // This test is too slow for travis

@ -572,14 +572,16 @@ void WritePreparedTxnDB::ReleaseSnapshotInternal(
bool need_gc = false; bool need_gc = false;
{ {
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
snap_seq);
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snap_seq); auto prep_set_entry = old_commit_map_.find(snap_seq);
need_gc = prep_set_entry != old_commit_map_.end(); need_gc = prep_set_entry != old_commit_map_.end();
} }
if (need_gc) { if (need_gc) {
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead for %" PRIu64,
snap_seq);
WriteLock wl(&old_commit_map_mutex_); WriteLock wl(&old_commit_map_mutex_);
old_commit_map_.erase(snap_seq); old_commit_map_.erase(snap_seq);
old_commit_map_empty_.store(old_commit_map_.empty(), old_commit_map_empty_.store(old_commit_map_.empty(),
@ -654,13 +656,20 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
// place before gets overwritten the reader that reads bottom-up will // place before gets overwritten the reader that reads bottom-up will
// eventully see it. // eventully see it.
const bool next_is_larger = true; const bool next_is_larger = true;
SequenceNumber snapshot_seq = kMaxSequenceNumber; // We will set to true if the border line snapshot suggests that.
bool search_larger_list = false;
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE); size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
for (; 0 < ip1; ip1--) { for (; 0 < ip1; ip1--) {
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire); SequenceNumber snapshot_seq =
snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
++sync_i); ++sync_i);
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i); TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
if (ip1 == SNAPSHOT_CACHE_SIZE) { // border line snapshot
// snapshot_seq < commit_seq => larger_snapshot_seq <= commit_seq
// then later also continue the search to larger snapshots
search_larger_list = snapshot_seq < evicted.commit_seq;
}
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, !next_is_larger)) { snapshot_seq, !next_is_larger)) {
break; break;
@ -675,17 +684,20 @@ void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
#endif #endif
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end"); TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end"); TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE && if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && search_larger_list)) {
snapshot_seq < evicted.prep_seq)) {
// Then access the less efficient list of snapshots_ // Then access the less efficient list of snapshots_
WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD); WPRecordTick(TXN_SNAPSHOT_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead"); ROCKS_LOG_WARN(info_log_,
"snapshots_mutex_ overhead for <%" PRIu64 ",%" PRIu64
"> with %" ROCKSDB_PRIszt " snapshots",
evicted.prep_seq, evicted.commit_seq, cnt);
ReadLock rl(&snapshots_mutex_); ReadLock rl(&snapshots_mutex_);
// Items could have moved from the snapshots_ to snapshot_cache_ before // Items could have moved from the snapshots_ to snapshot_cache_ before
// accquiring the lock. To make sure that we do not miss a valid snapshot, // accquiring the lock. To make sure that we do not miss a valid snapshot,
// read snapshot_cache_ again while holding the lock. // read snapshot_cache_ again while holding the lock.
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) { for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire); SequenceNumber snapshot_seq =
snapshot_cache_[i].load(std::memory_order_acquire);
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq, if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
snapshot_seq, next_is_larger)) { snapshot_seq, next_is_larger)) {
break; break;
@ -713,7 +725,10 @@ bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
// then snapshot_seq < commit_seq // then snapshot_seq < commit_seq
if (prep_seq <= snapshot_seq) { // overlapping range if (prep_seq <= snapshot_seq) { // overlapping range
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead"); ROCKS_LOG_WARN(info_log_,
"old_commit_map_mutex_ overhead for %" PRIu64
" commit entry: <%" PRIu64 ",%" PRIu64 ">",
snapshot_seq, prep_seq, commit_seq);
WriteLock wl(&old_commit_map_mutex_); WriteLock wl(&old_commit_map_mutex_);
old_commit_map_empty_.store(false, std::memory_order_release); old_commit_map_empty_.store(false, std::memory_order_release);
auto& vec = old_commit_map_[snapshot_seq]; auto& vec = old_commit_map_[snapshot_seq];

@ -222,7 +222,6 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// rare case and it is ok to pay the cost of mutex ReadLock for such old, // rare case and it is ok to pay the cost of mutex ReadLock for such old,
// reading transactions. // reading transactions.
WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD); WPRecordTick(TXN_OLD_COMMIT_MAP_MUTEX_OVERHEAD);
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
ReadLock rl(&old_commit_map_mutex_); ReadLock rl(&old_commit_map_mutex_);
auto prep_set_entry = old_commit_map_.find(snapshot_seq); auto prep_set_entry = old_commit_map_.find(snapshot_seq);
bool found = prep_set_entry != old_commit_map_.end(); bool found = prep_set_entry != old_commit_map_.end();

Loading…
Cancel
Save