From 5fc57eec2b44337289f25c1b5687beb54ad709a2 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Mon, 15 May 2023 15:34:22 -0700 Subject: [PATCH] Support parallel read and write/delete to same key in NonBatchedOpsStressTest (#11058) Summary: **Context:** Current `NonBatchedOpsStressTest` does not allow multi-thread read (i.e, Get, Iterator) and write (i.e, Put, Merge) or delete to the same key. Every read or write/delete operation will acquire lock (`GetLocksForKeyRange`) on the target key to gain exclusive access to it. This does not align with RocksDB's nature of allowing multi-thread read and write/delete to the same key, that is concurrent threads can issue read/write/delete to RocksDB without external locking. Therefore this is a gap in our testing coverage. To close the gap, biggest challenge remains in verifying db value against expected state in presence of parallel read and write/delete. The challenge is due to read/write/delete to the db and read/write to expected state is not within one atomic operation. Therefore we may not know the exact expected state of a certain db read, as by the time we read the expected state for that db read, another write to expected state for another db write to the same key might have changed the expected state. **Summary:** Credited to ajkr's idea, we now solve this challenge by breaking the 32-bits expected value of a key into different parts that can be read and write to in parallel. Basically we divide the 32-bits expected value into `value_base` (corresponding to the previous whole 32 bits but now with some shrinking in the value base range we allow), `pending_write` (i.e, whether there is an ongoing concurrent write), `del_counter` (i.e, number of times a value has been deleted, analogous to value_base for write), `pending_delete` (similar to pending_write) and `deleted` (i.e whether a key is deleted). Also, we need to use incremental `value_base` instead of random value base as before because we want to control the range of value base a correct db read result can possibly be in presence of parallel read and write. In that way, we can verify the correctness of the read against expected state more easily. This is at the cost of reducing the randomness of the value generated in NonBatchedOpsStressTest we are willing to accept. (For detailed algorithm of how to use these parts to infer expected state of a key, see the PR) Misc: hide value_base detail from callers of ExpectedState by abstracting related logics into ExpectedValue class Pull Request resolved: https://github.com/facebook/rocksdb/pull/11058 Test Plan: - Manual test of small number of keys (i.e, high chances of parallel read and write/delete to same key) with equally distributed read/write/deleted for 30 min ``` python3 tools/db_crashtest.py --simple {blackbox|whitebox} --sync_fault_injection=1 --skip_verifydb=0 --continuous_verification_interval=1000 --clear_column_family_one_in=0 --max_key=10 --column_families=1 --threads=32 --readpercent=25 --writepercent=25 --nooverwritepercent=0 --iterpercent=25 --verify_iterator_with_expected_state_one_in=1 --num_iterations=5 --delpercent=15 --delrangepercent=10 --range_deletion_width=5 --use_merge={0|1} --use_put_entity_one_in=0 --use_txn=0 --verify_before_write=0 --user_timestamp_size=0 --compact_files_one_in=1000 --compact_range_one_in=1000 --flush_one_in=1000 --get_property_one_in=1000 --ingest_external_file_one_in=100 --backup_one_in=100 --checkpoint_one_in=100 --approximate_size_one_in=0 --acquire_snapshot_one_in=100 --use_multiget=0 --prefixpercent=0 --get_live_files_one_in=1000 --manual_wal_flush_one_in=1000 --pause_background_one_in=1000 --target_file_size_base=524288 --write_buffer_size=524288 --verify_checksum_one_in=1000 --verify_db_one_in=1000 ``` - Rehearsal stress test for normal parameter and aggressive parameter to see if such change can find what existing stress test can find (i.e, no regression in testing capability) - [Ongoing]Try to find new bugs with this change that are not found by current NonBatchedOpsStressTest with no parallel read and write/delete to same key Reviewed By: ajkr Differential Revision: D42257258 Pulled By: hx235 fbshipit-source-id: e6fdc18f1fad3753e5ac91731483a644d9b5b6eb --- db_stress_tool/batched_ops_stress.cc | 3 +- db_stress_tool/db_stress_shared_state.h | 76 +++--- db_stress_tool/db_stress_test_base.cc | 10 +- db_stress_tool/expected_state.cc | 242 ++++++++++++++---- db_stress_tool/expected_state.h | 311 ++++++++++++++++++++---- db_stress_tool/no_batched_ops_stress.cc | 263 +++++++++++++------- 6 files changed, 674 insertions(+), 231 deletions(-) diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index 0317611e0..0872f2842 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -31,8 +31,7 @@ class BatchedOpsStressTest : public StressTest { const std::string key_body = Key(rand_keys[0]); - const uint32_t value_base = - thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; + const uint32_t value_base = thread->rand.Next(); const size_t sz = GenerateValue(value_base, value, sizeof(value)); const std::string value_body = Slice(value, sz).ToString(); diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 5565c6221..0137f0b2e 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -43,12 +43,6 @@ class StressTest; // State shared by all concurrent executions of the same benchmark. class SharedState { public: - // indicates a key may have any value (or not be present) as an operation on - // it is incomplete. - static constexpr uint32_t UNKNOWN_SENTINEL = 0xfffffffe; - // indicates a key should definitely be deleted - static constexpr uint32_t DELETION_SENTINEL = 0xffffffff; - // Errors when reading filter blocks are ignored, so we use a thread // local variable updated via sync points to keep track of errors injected // while reading filter blocks in order to ignore the Get/MultiGet result @@ -254,54 +248,70 @@ class SharedState { return expected_state_manager_->ClearColumnFamily(cf); } - // @param pending True if the update may have started but is not yet - // guaranteed finished. This is useful for crash-recovery testing when the - // process may crash before updating the expected values array. + // Prepare a Put that will be started but not finish yet + // This is useful for crash-recovery testing when the process may crash + // before updating the corresponding expected value // - // Requires external locking covering `key` in `cf`. - void Put(int cf, int64_t key, uint32_t value_base, bool pending) { - return expected_state_manager_->Put(cf, key, value_base, pending); + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PreparePut(int cf, int64_t key) { + return expected_state_manager_->PreparePut(cf, key); } - // Requires external locking covering `key` in `cf`. - uint32_t Get(int cf, int64_t key) const { + // Does not requires external locking. + ExpectedValue Get(int cf, int64_t key) { return expected_state_manager_->Get(cf, key); } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. + // Prepare a Delete that will be started but not finish yet + // This is useful for crash-recovery testing when the process may crash + // before updating the corresponding expected value // - // Requires external locking covering `key` in `cf`. - bool Delete(int cf, int64_t key, bool pending) { - return expected_state_manager_->Delete(cf, key, pending); + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PrepareDelete(int cf, int64_t key) { + return expected_state_manager_->PrepareDelete(cf, key); } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. - // - // Requires external locking covering `key` in `cf`. - bool SingleDelete(int cf, int64_t key, bool pending) { - return expected_state_manager_->Delete(cf, key, pending); + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) { + return expected_state_manager_->PrepareSingleDelete(cf, key); } - // @param pending See comment above Put() - // Returns number of keys deleted by the call. - // - // Requires external locking covering keys in `[begin_key, end_key)` in `cf`. - int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { - return expected_state_manager_->DeleteRange(cf, begin_key, end_key, - pending); + // Requires external locking covering keys in `[begin_key, end_key)` in `cf` + // to prevent concurrent write or delete to the same `key`. + std::vector PrepareDeleteRange(int cf, + int64_t begin_key, + int64_t end_key) { + return expected_state_manager_->PrepareDeleteRange(cf, begin_key, end_key); } bool AllowsOverwrite(int64_t key) const { return no_overwrite_ids_.find(key) == no_overwrite_ids_.end(); } - // Requires external locking covering `key` in `cf`. + // Requires external locking covering `key` in `cf` to prevent concurrent + // delete to the same `key`. bool Exists(int cf, int64_t key) { return expected_state_manager_->Exists(cf, key); } + // Sync the `value_base` to the corresponding expected value + void SyncPut(int cf, int64_t key, uint32_t value_base) { + return expected_state_manager_->SyncPut(cf, key, value_base); + } + + // Sync the corresponding expected value to be pending Put + void SyncPendingPut(int cf, int64_t key) { + return expected_state_manager_->SyncPendingPut(cf, key); + } + + // Sync the corresponding expected value to be deleted + void SyncDelete(int cf, int64_t key) { + return expected_state_manager_->SyncDelete(cf, key); + } + uint32_t GetSeed() const { return seed_; } void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 710c7687b..e05b47173 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -483,12 +483,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, for (int64_t k = 0; k != number_of_keys; ++k) { const std::string key = Key(k); - constexpr uint32_t value_base = 0; + PendingExpectedValue pending_expected_value = + shared->PreparePut(cf_idx, k); + const uint32_t value_base = pending_expected_value.GetFinalValueBase(); const size_t sz = GenerateValue(value_base, value, sizeof(value)); const Slice v(value, sz); - shared->Put(cf_idx, k, value_base, true /* pending */); std::string ts; if (FLAGS_user_timestamp_size > 0) { @@ -534,7 +535,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, } } - shared->Put(cf_idx, k, value_base, false /* pending */); + pending_expected_value.Commit(); if (!s.ok()) { break; } @@ -614,8 +615,7 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn, for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) { uint64_t key_val; if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) { - shared->Put(static_cast(i) /* cf_idx */, key_val, - 0 /* value_base */, true /* pending */); + shared->SyncPendingPut(static_cast(i) /* cf_idx */, key_val); } } } diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index 0d921c712..d1b8c57d0 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -3,17 +3,126 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include #ifdef GFLAGS -#include "db_stress_tool/expected_state.h" #include "db/wide/wide_column_serialization.h" #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" +#include "db_stress_tool/expected_state.h" #include "rocksdb/trace_reader_writer.h" #include "rocksdb/trace_record_result.h" namespace ROCKSDB_NAMESPACE { +void ExpectedValue::Put(bool pending) { + if (pending) { + SetPendingWrite(); + } else { + SetValueBase(NextValueBase()); + ClearDeleted(); + ClearPendingWrite(); + } +} + +bool ExpectedValue::Delete(bool pending) { + if (!Exists()) { + return false; + } + if (pending) { + SetPendingDel(); + } else { + SetDelCounter(NextDelCounter()); + SetDeleted(); + ClearPendingDel(); + } + return true; +} + +void ExpectedValue::SyncPut(uint32_t value_base) { + assert(ExpectedValue::IsValueBaseValid(value_base)); + + SetValueBase(value_base); + ClearDeleted(); + ClearPendingWrite(); + + // This is needed in case crash happens during a pending delete of the key + // assocated with this expected value + ClearPendingDel(); +} + +void ExpectedValue::SyncPendingPut() { Put(true /* pending */); } + +void ExpectedValue::SyncDelete() { + Delete(false /* pending */); + // This is needed in case crash happens during a pending write of the key + // assocated with this expected value + ClearPendingWrite(); +} + +uint32_t ExpectedValue::GetFinalValueBase() const { + return PendingWrite() ? NextValueBase() : GetValueBase(); +} + +uint32_t ExpectedValue::GetFinalDelCounter() const { + return PendingDelete() ? NextDelCounter() : GetDelCounter(); +} + +bool ExpectedValueHelper::MustHaveNotExisted( + ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value) { + const bool pre_read_expected_deleted = pre_read_expected_value.IsDeleted(); + + const uint32_t pre_read_expected_value_base = + pre_read_expected_value.GetValueBase(); + + const uint32_t post_read_expected_final_value_base = + post_read_expected_value.GetFinalValueBase(); + + const bool during_read_no_write_happened = + (pre_read_expected_value_base == post_read_expected_final_value_base); + return pre_read_expected_deleted && during_read_no_write_happened; +} + +bool ExpectedValueHelper::MustHaveExisted( + ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value) { + const bool pre_read_expected_not_deleted = + !pre_read_expected_value.IsDeleted(); + + const uint32_t pre_read_expected_del_counter = + pre_read_expected_value.GetDelCounter(); + const uint32_t post_read_expected_final_del_counter = + post_read_expected_value.GetFinalDelCounter(); + + const bool during_read_no_delete_happened = + (pre_read_expected_del_counter == post_read_expected_final_del_counter); + + return pre_read_expected_not_deleted && during_read_no_delete_happened; +} + +bool ExpectedValueHelper::InExpectedValueBaseRange( + uint32_t value_base, ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value) { + assert(ExpectedValue::IsValueBaseValid(value_base)); + + const uint32_t pre_read_expected_value_base = + pre_read_expected_value.GetValueBase(); + const uint32_t post_read_expected_final_value_base = + post_read_expected_value.GetFinalValueBase(); + + if (pre_read_expected_value_base <= post_read_expected_final_value_base) { + const uint32_t lower_bound = pre_read_expected_value_base; + const uint32_t upper_bound = post_read_expected_final_value_base; + return lower_bound <= value_base && value_base <= upper_bound; + } else { + const uint32_t upper_bound_1 = post_read_expected_final_value_base; + const uint32_t lower_bound_2 = pre_read_expected_value_base; + const uint32_t upper_bound_2 = ExpectedValue::GetValueBaseMask(); + return (value_base <= upper_bound_1) || + (lower_bound_2 <= value_base && value_base <= upper_bound_2); + } +} ExpectedState::ExpectedState(size_t max_key, size_t num_column_families) : max_key_(max_key), @@ -21,70 +130,107 @@ ExpectedState::ExpectedState(size_t max_key, size_t num_column_families) values_(nullptr) {} void ExpectedState::ClearColumnFamily(int cf) { - std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), - SharedState::DELETION_SENTINEL); + const uint32_t del_mask = ExpectedValue::GetDelMask(); + std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), del_mask); } -void ExpectedState::Put(int cf, int64_t key, uint32_t value_base, - bool pending) { - if (!pending) { - // prevent expected-value update from reordering before Write - std::atomic_thread_fence(std::memory_order_release); - } - Value(cf, key).store(pending ? SharedState::UNKNOWN_SENTINEL : value_base, - std::memory_order_relaxed); - if (pending) { - // prevent Write from reordering before expected-value update - std::atomic_thread_fence(std::memory_order_release); - } +void ExpectedState::Precommit(int cf, int64_t key, const ExpectedValue& value) { + Value(cf, key).store(value.Read()); + // To prevent low-level instruction reordering that results + // in db write happens before setting pending state in expected value + std::atomic_thread_fence(std::memory_order_release); } -uint32_t ExpectedState::Get(int cf, int64_t key) const { - return Value(cf, key); +PendingExpectedValue ExpectedState::PreparePut(int cf, int64_t key) { + ExpectedValue expected_value = Load(cf, key); + const ExpectedValue orig_expected_value = expected_value; + expected_value.Put(true /* pending */); + const ExpectedValue pending_expected_value = expected_value; + expected_value.Put(false /* pending */); + const ExpectedValue final_expected_value = expected_value; + Precommit(cf, key, pending_expected_value); + return PendingExpectedValue(&Value(cf, key), orig_expected_value, + final_expected_value); } -bool ExpectedState::Delete(int cf, int64_t key, bool pending) { - if (Value(cf, key) == SharedState::DELETION_SENTINEL) { - return false; - } - Put(cf, key, SharedState::DELETION_SENTINEL, pending); - return true; +ExpectedValue ExpectedState::Get(int cf, int64_t key) { return Load(cf, key); } + +PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key, + bool* prepared) { + ExpectedValue expected_value = Load(cf, key); + const ExpectedValue orig_expected_value = expected_value; + bool res = expected_value.Delete(true /* pending */); + if (prepared) { + *prepared = res; + } + if (!res) { + return PendingExpectedValue(&Value(cf, key), orig_expected_value, + orig_expected_value); + } + const ExpectedValue pending_expected_value = expected_value; + expected_value.Delete(false /* pending */); + const ExpectedValue final_expected_value = expected_value; + Precommit(cf, key, pending_expected_value); + return PendingExpectedValue(&Value(cf, key), orig_expected_value, + final_expected_value); } -bool ExpectedState::SingleDelete(int cf, int64_t key, bool pending) { - return Delete(cf, key, pending); +PendingExpectedValue ExpectedState::PrepareSingleDelete(int cf, int64_t key) { + return PrepareDelete(cf, key); } -int ExpectedState::DeleteRange(int cf, int64_t begin_key, int64_t end_key, - bool pending) { - int covered = 0; +std::vector ExpectedState::PrepareDeleteRange( + int cf, int64_t begin_key, int64_t end_key) { + std::vector pending_expected_values; for (int64_t key = begin_key; key < end_key; ++key) { - if (Delete(cf, key, pending)) { - ++covered; + bool prepared = false; + PendingExpectedValue pending_expected_value = + PrepareDelete(cf, key, &prepared); + if (prepared) { + pending_expected_values.push_back(pending_expected_value); } } - return covered; + return pending_expected_values; } bool ExpectedState::Exists(int cf, int64_t key) { - // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite - // is disallowed can't be accidentally added a second time, in which case - // SingleDelete wouldn't be able to properly delete the key. It does allow - // the case where a SingleDelete might be added which covers nothing, but - // that's not a correctness issue. - uint32_t expected_value = Value(cf, key).load(); - return expected_value != SharedState::DELETION_SENTINEL; + return Load(cf, key).Exists(); } void ExpectedState::Reset() { + const uint32_t del_mask = ExpectedValue::GetDelMask(); for (size_t i = 0; i < num_column_families_; ++i) { for (size_t j = 0; j < max_key_; ++j) { - Value(static_cast(i), j) - .store(SharedState::DELETION_SENTINEL, std::memory_order_relaxed); + Value(static_cast(i), j).store(del_mask, std::memory_order_relaxed); } } } +void ExpectedState::SyncPut(int cf, int64_t key, uint32_t value_base) { + ExpectedValue expected_value = Load(cf, key); + expected_value.SyncPut(value_base); + Value(cf, key).store(expected_value.Read()); +} + +void ExpectedState::SyncPendingPut(int cf, int64_t key) { + ExpectedValue expected_value = Load(cf, key); + expected_value.SyncPendingPut(); + Value(cf, key).store(expected_value.Read()); +} + +void ExpectedState::SyncDelete(int cf, int64_t key) { + ExpectedValue expected_value = Load(cf, key); + expected_value.SyncDelete(); + Value(cf, key).store(expected_value.Read()); +} + +void ExpectedState::SyncDeleteRange(int cf, int64_t begin_key, + int64_t end_key) { + for (int64_t key = begin_key; key < end_key; ++key) { + SyncDelete(cf, key); + } +} + FileExpectedState::FileExpectedState(std::string expected_state_file_path, size_t max_key, size_t num_column_families) : ExpectedState(max_key, num_column_families), @@ -385,7 +531,7 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, if (!GetIntVal(key.ToString(), &key_id)) { return Status::Corruption("unable to parse key", key.ToString()); } - uint32_t value_id = GetValueBase(value); + uint32_t value_base = GetValueBase(value); bool should_buffer_write = !(buffered_writes_ == nullptr); if (should_buffer_write) { @@ -393,8 +539,7 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, key, value); } - state_->Put(column_family_id, static_cast(key_id), value_id, - false /* pending */); + state_->SyncPut(column_family_id, static_cast(key_id), value_base); ++num_write_ops_; return Status::OK(); } @@ -431,8 +576,7 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, const uint32_t value_base = GetValueBase(columns.front().value()); - state_->Put(column_family_id, static_cast(key_id), value_base, - false /* pending */); + state_->SyncPut(column_family_id, static_cast(key_id), value_base); ++num_write_ops_; @@ -454,8 +598,7 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, column_family_id, key); } - state_->Delete(column_family_id, static_cast(key_id), - false /* pending */); + state_->SyncDelete(column_family_id, static_cast(key_id)); ++num_write_ops_; return Status::OK(); } @@ -499,8 +642,9 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, buffered_writes_.get(), column_family_id, begin_key, end_key); } - state_->DeleteRange(column_family_id, static_cast(begin_key_id), - static_cast(end_key_id), false /* pending */); + state_->SyncDeleteRange(column_family_id, + static_cast(begin_key_id), + static_cast(end_key_id)); ++num_write_ops_; return Status::OK(); } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index 41d747e76..aa78941c3 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -22,6 +22,174 @@ #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { +// This class is not thread-safe. +class ExpectedValue { + public: + static uint32_t GetValueBaseMask() { return VALUE_BASE_MASK; } + static uint32_t GetValueBaseDelta() { return VALUE_BASE_DELTA; } + static uint32_t GetDelCounterDelta() { return DEL_COUNTER_DELTA; } + static uint32_t GetDelMask() { return DEL_MASK; } + static bool IsValueBaseValid(uint32_t value_base) { + return IsValuePartValid(value_base, VALUE_BASE_MASK); + } + + explicit ExpectedValue(uint32_t expected_value) + : expected_value_(expected_value) {} + + bool Exists() const { return PendingWrite() || !IsDeleted(); } + + uint32_t Read() const { return expected_value_; } + + void Put(bool pending); + + bool Delete(bool pending); + + void SyncPut(uint32_t value_base); + + void SyncPendingPut(); + + void SyncDelete(); + + uint32_t GetValueBase() const { return GetValuePart(VALUE_BASE_MASK); } + + uint32_t NextValueBase() const { + return GetIncrementedValuePart(VALUE_BASE_MASK, VALUE_BASE_DELTA); + } + + void SetValueBase(uint32_t new_value_base) { + SetValuePart(VALUE_BASE_MASK, new_value_base); + } + + bool PendingWrite() const { + const uint32_t pending_write = GetValuePart(PENDING_WRITE_MASK); + return pending_write != 0; + } + + void SetPendingWrite() { + SetValuePart(PENDING_WRITE_MASK, PENDING_WRITE_MASK); + } + + void ClearPendingWrite() { ClearValuePart(PENDING_WRITE_MASK); } + + uint32_t GetDelCounter() const { return GetValuePart(DEL_COUNTER_MASK); } + + uint32_t NextDelCounter() const { + return GetIncrementedValuePart(DEL_COUNTER_MASK, DEL_COUNTER_DELTA); + } + + void SetDelCounter(uint32_t new_del_counter) { + SetValuePart(DEL_COUNTER_MASK, new_del_counter); + } + + bool PendingDelete() const { + const uint32_t pending_del = GetValuePart(PENDING_DEL_MASK); + return pending_del != 0; + } + + void SetPendingDel() { SetValuePart(PENDING_DEL_MASK, PENDING_DEL_MASK); } + + void ClearPendingDel() { ClearValuePart(PENDING_DEL_MASK); } + + bool IsDeleted() const { + const uint32_t deleted = GetValuePart(DEL_MASK); + return deleted != 0; + } + + void SetDeleted() { SetValuePart(DEL_MASK, DEL_MASK); } + + void ClearDeleted() { ClearValuePart(DEL_MASK); } + + uint32_t GetFinalValueBase() const; + + uint32_t GetFinalDelCounter() const; + + private: + static bool IsValuePartValid(uint32_t value_part, uint32_t value_part_mask) { + return (value_part & (~value_part_mask)) == 0; + } + + // The 32-bit expected_value_ is divided into following parts: + // Bit 0 - 14: value base + static constexpr uint32_t VALUE_BASE_MASK = 0x7fff; + static constexpr uint32_t VALUE_BASE_DELTA = 1; + // Bit 15: whether write to this value base is pending (0 equals `false`) + static constexpr uint32_t PENDING_WRITE_MASK = (uint32_t)1 << 15; + // Bit 16 - 29: deletion counter (i.e, number of times this value base has + // been deleted) + static constexpr uint32_t DEL_COUNTER_MASK = 0x3fff0000; + static constexpr uint32_t DEL_COUNTER_DELTA = (uint32_t)1 << 16; + // Bit 30: whether deletion of this value base is pending (0 equals `false`) + static constexpr uint32_t PENDING_DEL_MASK = (uint32_t)1 << 30; + // Bit 31: whether this value base is deleted (0 equals `false`) + static constexpr uint32_t DEL_MASK = (uint32_t)1 << 31; + + uint32_t GetValuePart(uint32_t value_part_mask) const { + return expected_value_ & value_part_mask; + } + + uint32_t GetIncrementedValuePart(uint32_t value_part_mask, + uint32_t value_part_delta) const { + uint32_t current_value_part = GetValuePart(value_part_mask); + ExpectedValue temp_expected_value(current_value_part + value_part_delta); + return temp_expected_value.GetValuePart(value_part_mask); + } + + void SetValuePart(uint32_t value_part_mask, uint32_t new_value_part) { + assert(IsValuePartValid(new_value_part, value_part_mask)); + ClearValuePart(value_part_mask); + expected_value_ |= new_value_part; + } + + void ClearValuePart(uint32_t value_part_mask) { + expected_value_ &= (~value_part_mask); + } + + uint32_t expected_value_; +}; + +class PendingExpectedValue { + public: + explicit PendingExpectedValue(std::atomic* value_ptr, + ExpectedValue orig_value, + ExpectedValue final_value) + : value_ptr_(value_ptr), + orig_value_(orig_value), + final_value_(final_value) {} + + void Commit() { + // To prevent low-level instruction reordering that results + // in setting expected value happens before db write + std::atomic_thread_fence(std::memory_order_release); + value_ptr_->store(final_value_.Read()); + } + + uint32_t GetFinalValueBase() { return final_value_.GetValueBase(); } + + private: + std::atomic* const value_ptr_; + const ExpectedValue orig_value_; + const ExpectedValue final_value_; +}; + +class ExpectedValueHelper { + public: + // Return whether value is expected not to exist from begining till the end + // of the read based on `pre_read_expected_value` and + // `pre_read_expected_value`. + static bool MustHaveNotExisted(ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value); + + // Return whether value is expected to exist from begining till the end of + // the read based on `pre_read_expected_value` and + // `pre_read_expected_value`. + static bool MustHaveExisted(ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value); + + // Return whether the `value_base` falls within the expected value base + static bool InExpectedValueBaseRange(uint32_t value_base, + ExpectedValue pre_read_expected_value, + ExpectedValue post_read_expected_value); +}; // An `ExpectedState` provides read/write access to expected values for every // key. @@ -38,43 +206,79 @@ class ExpectedState { // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf); - // @param pending True if the update may have started but is not yet - // guaranteed finished. This is useful for crash-recovery testing when the - // process may crash before updating the expected values array. + // Prepare a Put that will be started but not finished yet + // This is useful for crash-recovery testing when the process may crash + // before updating the corresponding expected value // - // Requires external locking covering `key` in `cf`. - void Put(int cf, int64_t key, uint32_t value_base, bool pending); + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PreparePut(int cf, int64_t key); - // Requires external locking covering `key` in `cf`. - uint32_t Get(int cf, int64_t key) const; + // Does not requires external locking. + ExpectedValue Get(int cf, int64_t key); - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. + // Prepare a Delete that will be started but not finished yet + // This is useful for crash-recovery testing when the process may crash + // before updating the corresponding expected value // - // Requires external locking covering `key` in `cf`. - bool Delete(int cf, int64_t key, bool pending); + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PrepareDelete(int cf, int64_t key, + bool* prepared = nullptr); + + // Requires external locking covering `key` in `cf` to prevent concurrent + // write or delete to the same `key`. + PendingExpectedValue PrepareSingleDelete(int cf, int64_t key); + + // Requires external locking covering keys in `[begin_key, end_key)` in `cf` + // to prevent concurrent write or delete to the same `key`. + std::vector PrepareDeleteRange(int cf, + int64_t begin_key, + int64_t end_key); + + // Update the expected value for start of an incomplete write or delete + // operation on the key assoicated with this expected value + void Precommit(int cf, int64_t key, const ExpectedValue& value); + + // Requires external locking covering `key` in `cf` to prevent concurrent + // delete to the same `key`. + bool Exists(int cf, int64_t key); - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. + // Sync the `value_base` to the corresponding expected value // - // Requires external locking covering `key` in `cf`. - bool SingleDelete(int cf, int64_t key, bool pending); + // Requires external locking covering `key` in `cf` or be in single thread + // to prevent concurrent write or delete to the same `key` + void SyncPut(int cf, int64_t key, uint32_t value_base); - // @param pending See comment above Put() - // Returns number of keys deleted by the call. + // Sync the corresponding expected value to be pending Put // - // Requires external locking covering keys in `[begin_key, end_key)` in `cf`. - int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending); + // Requires external locking covering `key` in `cf` or be in single thread + // to prevent concurrent write or delete to the same `key` + void SyncPendingPut(int cf, int64_t key); - // Requires external locking covering `key` in `cf`. - bool Exists(int cf, int64_t key); + // Sync the corresponding expected value to be deleted + // + // Requires external locking covering `key` in `cf` or be in single thread + // to prevent concurrent write or delete to the same `key` + void SyncDelete(int cf, int64_t key); + + // Sync the corresponding expected values to be deleted + // + // Requires external locking covering keys in `[begin_key, end_key)` in `cf` + // to prevent concurrent write or delete to the same `key` + void SyncDeleteRange(int cf, int64_t begin_key, int64_t end_key); private: - // Requires external locking covering `key` in `cf`. + // Does not requires external locking. std::atomic& Value(int cf, int64_t key) const { return values_[cf * max_key_ + key]; } + // Does not requires external locking + ExpectedValue Load(int cf, int64_t key) const { + return ExpectedValue(Value(cf, key).load()); + } + const size_t max_key_; const size_t num_column_families_; @@ -160,45 +364,52 @@ class ExpectedStateManager { // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); } - // @param pending True if the update may have started but is not yet - // guaranteed finished. This is useful for crash-recovery testing when the - // process may crash before updating the expected values array. - // - // Requires external locking covering `key` in `cf`. - void Put(int cf, int64_t key, uint32_t value_base, bool pending) { - return latest_->Put(cf, key, value_base, pending); + // See ExpectedState::PreparePut() + PendingExpectedValue PreparePut(int cf, int64_t key) { + return latest_->PreparePut(cf, key); } - // Requires external locking covering `key` in `cf`. - uint32_t Get(int cf, int64_t key) const { return latest_->Get(cf, key); } + // See ExpectedState::Get() + ExpectedValue Get(int cf, int64_t key) { return latest_->Get(cf, key); } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. - // - // Requires external locking covering `key` in `cf`. - bool Delete(int cf, int64_t key, bool pending) { - return latest_->Delete(cf, key, pending); + // See ExpectedState::PrepareDelete() + PendingExpectedValue PrepareDelete(int cf, int64_t key) { + return latest_->PrepareDelete(cf, key); } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. - // - // Requires external locking covering `key` in `cf`. - bool SingleDelete(int cf, int64_t key, bool pending) { - return latest_->SingleDelete(cf, key, pending); + // See ExpectedState::PrepareSingleDelete() + PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) { + return latest_->PrepareSingleDelete(cf, key); } - // @param pending See comment above Put() - // Returns number of keys deleted by the call. - // - // Requires external locking covering keys in `[begin_key, end_key)` in `cf`. - int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { - return latest_->DeleteRange(cf, begin_key, end_key, pending); + // See ExpectedState::PrepareDeleteRange() + std::vector PrepareDeleteRange(int cf, + int64_t begin_key, + int64_t end_key) { + return latest_->PrepareDeleteRange(cf, begin_key, end_key); } - // Requires external locking covering `key` in `cf`. + // See ExpectedState::Exists() bool Exists(int cf, int64_t key) { return latest_->Exists(cf, key); } + // See ExpectedState::SyncPut() + void SyncPut(int cf, int64_t key, uint32_t value_base) { + return latest_->SyncPut(cf, key, value_base); + } + + // See ExpectedState::SyncPendingPut() + void SyncPendingPut(int cf, int64_t key) { + return latest_->SyncPendingPut(cf, key); + } + + // See ExpectedState::SyncDelete() + void SyncDelete(int cf, int64_t key) { return latest_->SyncDelete(cf, key); } + + // See ExpectedState::SyncDeleteRange() + void SyncDeleteRange(int cf, int64_t begin_key, int64_t end_key) { + return latest_->SyncDeleteRange(cf, begin_key, end_key); + } + protected: const size_t max_key_; const size_t num_column_families_; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 8d97d4af4..966558243 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -7,6 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db_stress_tool/expected_state.h" #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" #include "rocksdb/utilities/transaction_db.h" @@ -121,8 +122,7 @@ class NonBatchedOpsStressTest : public StressTest { } VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, - /* msg_prefix */ "Iterator verification", s, - /* strict */ true); + /* msg_prefix */ "Iterator verification", s); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i), @@ -141,8 +141,7 @@ class NonBatchedOpsStressTest : public StressTest { Status s = db_->Get(options, column_families_[cf], key, &from_db); VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, - /* msg_prefix */ "Get verification", s, - /* strict */ true); + /* msg_prefix */ "Get verification", s); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i), @@ -178,8 +177,7 @@ class NonBatchedOpsStressTest : public StressTest { } VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, - /* msg_prefix */ "GetEntity verification", s, - /* strict */ true); + /* msg_prefix */ "GetEntity verification", s); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i), @@ -214,7 +212,7 @@ class NonBatchedOpsStressTest : public StressTest { VerifyOrSyncValue(static_cast(cf), i + j, options, shared, from_db, /* msg_prefix */ "MultiGet verification", - statuses[j], /* strict */ true); + statuses[j]); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i + j), @@ -264,10 +262,9 @@ class NonBatchedOpsStressTest : public StressTest { } } - VerifyOrSyncValue(static_cast(cf), i + j, options, shared, - from_db, - /* msg_prefix */ "MultiGetEntity verification", - statuses[j], /* strict */ true); + VerifyOrSyncValue( + static_cast(cf), i + j, options, shared, from_db, + /* msg_prefix */ "MultiGetEntity verification", statuses[j]); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i + j), @@ -319,8 +316,8 @@ class NonBatchedOpsStressTest : public StressTest { } VerifyOrSyncValue(static_cast(cf), i, options, shared, from_db, - /* msg_prefix */ "GetMergeOperands verification", s, - /* strict */ true); + /* msg_prefix */ "GetMergeOperands verification", + s); if (!from_db.empty()) { PrintKeyValue(static_cast(cf), static_cast(i), @@ -479,9 +476,6 @@ class NonBatchedOpsStressTest : public StressTest { SharedState::ignore_read_error = false; } - std::unique_ptr lock(new MutexLock( - thread->shared->GetMutexForKey(rand_column_families[0], rand_keys[0]))); - ReadOptions read_opts_copy = read_opts; std::string read_ts_str; Slice read_ts_slice; @@ -493,7 +487,11 @@ class NonBatchedOpsStressTest : public StressTest { bool read_older_ts = MaybeUseOlderTimestampForPointLookup( thread, read_ts_str, read_ts_slice, read_opts_copy); + const ExpectedValue pre_read_expected_value = + thread->shared->Get(rand_column_families[0], rand_keys[0]); Status s = db_->Get(read_opts_copy, cfh, key, &from_db); + const ExpectedValue post_read_expected_value = + thread->shared->Get(rand_column_families[0], rand_keys[0]); if (fault_fs_guard) { error_count = fault_fs_guard->GetAndResetErrorCount(); } @@ -512,23 +510,35 @@ class NonBatchedOpsStressTest : public StressTest { // found case thread->stats.AddGets(1, 1); // we only have the latest expected state - if (!FLAGS_skip_verifydb && !read_older_ts && - thread->shared->Get(rand_column_families[0], rand_keys[0]) == - SharedState::DELETION_SENTINEL) { - thread->shared->SetVerificationFailure(); - fprintf(stderr, - "error : inconsistent values for key %s: Get returns %s, " - "expected state does not have the key.\n", - key.ToString(true).c_str(), StringToHex(from_db).c_str()); + if (!FLAGS_skip_verifydb && !read_older_ts) { + if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value, + post_read_expected_value)) { + thread->shared->SetVerificationFailure(); + fprintf(stderr, + "error : inconsistent values for key %s: Get returns %s, " + "but expected state is \"deleted\".\n", + key.ToString(true).c_str(), StringToHex(from_db).c_str()); + } + Slice from_db_slice(from_db); + uint32_t value_base_from_db = GetValueBase(from_db_slice); + if (!ExpectedValueHelper::InExpectedValueBaseRange( + value_base_from_db, pre_read_expected_value, + post_read_expected_value)) { + thread->shared->SetVerificationFailure(); + fprintf(stderr, + "error : inconsistent values for key %s: Get returns %s with " + "value base %d that falls out of expected state's value base " + "range.\n", + key.ToString(true).c_str(), StringToHex(from_db).c_str(), + value_base_from_db); + } } } else if (s.IsNotFound()) { // not found case thread->stats.AddGets(1, 0); if (!FLAGS_skip_verifydb && !read_older_ts) { - auto expected = - thread->shared->Get(rand_column_families[0], rand_keys[0]); - if (expected != SharedState::DELETION_SENTINEL && - expected != SharedState::UNKNOWN_SENTINEL) { + if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value, + post_read_expected_value)) { thread->shared->SetVerificationFailure(); fprintf(stderr, "error : inconsistent values for key %s: expected state has " @@ -611,8 +621,7 @@ class NonBatchedOpsStressTest : public StressTest { switch (op) { case 0: case 1: { - uint32_t value_base = - thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; + const uint32_t value_base = 0; char value[100]; size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); @@ -803,15 +812,16 @@ class NonBatchedOpsStressTest : public StressTest { if (!FLAGS_skip_verifydb) { const WideColumns& columns = from_db.columns(); - + ExpectedValue expected = + shared->Get(rand_column_families[0], rand_keys[0]); if (!VerifyWideColumns(columns)) { shared->SetVerificationFailure(); fprintf(stderr, "error : inconsistent columns returned by GetEntity for key " "%s: %s\n", StringToHex(key).c_str(), WideColumnsToHex(columns).c_str()); - } else if (shared->Get(rand_column_families[0], rand_keys[0]) == - SharedState::DELETION_SENTINEL) { + } else if (ExpectedValueHelper::MustHaveNotExisted(expected, + expected)) { shared->SetVerificationFailure(); fprintf( stderr, @@ -824,9 +834,9 @@ class NonBatchedOpsStressTest : public StressTest { thread->stats.AddGets(1, 0); if (!FLAGS_skip_verifydb) { - auto expected = shared->Get(rand_column_families[0], rand_keys[0]); - if (expected != SharedState::DELETION_SENTINEL && - expected != SharedState::UNKNOWN_SENTINEL) { + ExpectedValue expected = + shared->Get(rand_column_families[0], rand_keys[0]); + if (ExpectedValueHelper::MustHaveExisted(expected, expected)) { shared->SetVerificationFailure(); fprintf(stderr, "error : inconsistent values for key %s: expected state has " @@ -1133,16 +1143,17 @@ class NonBatchedOpsStressTest : public StressTest { Status s = db_->Get(read_opts, cfh, k, &from_db); if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared, /* msg_prefix */ "Pre-Put Get verification", - from_db, s, /* strict */ true)) { + from_db, s)) { return s; } } - const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + PendingExpectedValue pending_expected_value = + shared->PreparePut(rand_column_family, rand_key); + const uint32_t value_base = pending_expected_value.GetFinalValueBase(); const size_t sz = GenerateValue(value_base, value, sizeof(value)); const Slice v(value, sz); - shared->Put(rand_column_family, rand_key, value_base, true /* pending */); Status s; @@ -1186,7 +1197,7 @@ class NonBatchedOpsStressTest : public StressTest { } } - shared->Put(rand_column_family, rand_key, value_base, false /* pending */); + pending_expected_value.Commit(); if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { @@ -1231,7 +1242,8 @@ class NonBatchedOpsStressTest : public StressTest { // otherwise. Status s; if (shared->AllowsOverwrite(rand_key)) { - shared->Delete(rand_column_family, rand_key, true /* pending */); + PendingExpectedValue pending_expected_value = + shared->PrepareDelete(rand_column_family, rand_key); if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { s = db_->Delete(write_opts, cfh, key); @@ -1248,7 +1260,8 @@ class NonBatchedOpsStressTest : public StressTest { } } } - shared->Delete(rand_column_family, rand_key, false /* pending */); + pending_expected_value.Commit(); + thread->stats.AddDeletes(1); if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { @@ -1266,7 +1279,8 @@ class NonBatchedOpsStressTest : public StressTest { } } } else { - shared->SingleDelete(rand_column_family, rand_key, true /* pending */); + PendingExpectedValue pending_expected_value = + shared->PrepareSingleDelete(rand_column_family, rand_key); if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { s = db_->SingleDelete(write_opts, cfh, key); @@ -1283,7 +1297,7 @@ class NonBatchedOpsStressTest : public StressTest { } } } - shared->SingleDelete(rand_column_family, rand_key, false /* pending */); + pending_expected_value.Commit(); thread->stats.AddSingleDeletes(1); if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { @@ -1328,10 +1342,10 @@ class NonBatchedOpsStressTest : public StressTest { shared->GetMutexForKey(rand_column_family, rand_key + j))); } } - shared->DeleteRange(rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, - true /* pending */); - + std::vector pending_expected_values = + shared->PrepareDeleteRange(rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width); + const int covered = static_cast(pending_expected_values.size()); std::string keystr = Key(rand_key); Slice key = keystr; auto cfh = column_families_[rand_column_family]; @@ -1361,9 +1375,10 @@ class NonBatchedOpsStressTest : public StressTest { std::terminate(); } } - int covered = shared->DeleteRange(rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, - false /* pending */); + for (PendingExpectedValue& pending_expected_value : + pending_expected_values) { + pending_expected_value.Commit(); + } thread->stats.AddRangeDeletions(1); thread->stats.AddCoveredByRangeDeletions(covered); return s; @@ -1393,6 +1408,8 @@ class NonBatchedOpsStressTest : public StressTest { keys.reserve(FLAGS_ingest_external_file_width); std::vector values; values.reserve(FLAGS_ingest_external_file_width); + std::vector pending_expected_values; + pending_expected_values.reserve(FLAGS_ingest_external_file_width); SharedState* shared = thread->shared; assert(FLAGS_nooverwritepercent < 100); @@ -1407,15 +1424,16 @@ class NonBatchedOpsStressTest : public StressTest { new MutexLock(shared->GetMutexForKey(column_family, key))); } if (!shared->AllowsOverwrite(key)) { - // We could alternatively include `key` on the condition its current - // value is `DELETION_SENTINEL`. + // We could alternatively include `key` that is deleted. continue; } keys.push_back(key); - uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + PendingExpectedValue pending_expected_value = + shared->PreparePut(column_family, key); + const uint32_t value_base = pending_expected_value.GetFinalValueBase(); values.push_back(value_base); - shared->Put(column_family, key, value_base, true /* pending */); + pending_expected_values.push_back(pending_expected_value); char value[100]; size_t value_len = GenerateValue(value_base, value, sizeof(value)); @@ -1438,8 +1456,9 @@ class NonBatchedOpsStressTest : public StressTest { fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str()); std::terminate(); } - for (size_t i = 0; i < keys.size(); ++i) { - shared->Put(column_family, keys[i], values[i], false /* pending */); + + for (size_t i = 0; i < pending_expected_values.size(); ++i) { + pending_expected_values[i].Commit(); } } @@ -1471,8 +1490,13 @@ class NonBatchedOpsStressTest : public StressTest { // Lock the whole range over which we might iterate to ensure it doesn't // change under us. const int rand_column_family = rand_column_families[0]; - std::vector> range_locks = - shared->GetLocksForKeyRange(rand_column_family, lb, ub); + + // Testing parallel read and write to the same key with user timestamp + // is not currently supported + std::vector> range_locks; + if (FLAGS_user_timestamp_size > 0) { + range_locks = shared->GetLocksForKeyRange(rand_column_family, lb, ub); + } ReadOptions ro(read_opts); ro.total_order_seek = true; @@ -1499,7 +1523,22 @@ class NonBatchedOpsStressTest : public StressTest { ColumnFamilyHandle* const cfh = column_families_[rand_column_family]; assert(cfh); + const std::size_t expected_values_size = static_cast(ub - lb); + std::vector pre_read_expected_values; + std::vector post_read_expected_values; + + for (int64_t i = 0; i < static_cast(expected_values_size); ++i) { + pre_read_expected_values.push_back( + shared->Get(rand_column_family, i + lb)); + } std::unique_ptr iter(db_->NewIterator(ro, cfh)); + for (int64_t i = 0; i < static_cast(expected_values_size); ++i) { + post_read_expected_values.push_back( + shared->Get(rand_column_family, i + lb)); + } + + assert(pre_read_expected_values.size() == expected_values_size && + pre_read_expected_values.size() == post_read_expected_values.size()); std::string op_logs; @@ -1529,10 +1568,15 @@ class NonBatchedOpsStressTest : public StressTest { auto check_no_key_in_range = [&](int64_t start, int64_t end) { for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) { - auto expected_value = - shared->Get(rand_column_family, static_cast(j)); - if (expected_value != shared->DELETION_SENTINEL && - expected_value != shared->UNKNOWN_SENTINEL) { + std::size_t index = static_cast(j - lb); + assert(index < pre_read_expected_values.size() && + index < post_read_expected_values.size()); + const ExpectedValue pre_read_expected_value = + pre_read_expected_values[index]; + const ExpectedValue post_read_expected_value = + post_read_expected_values[index]; + if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value, + post_read_expected_value)) { // Fail fast to preserve the DB state. thread->shared->SetVerificationFailure(); if (iter->Valid()) { @@ -1646,9 +1690,23 @@ class NonBatchedOpsStressTest : public StressTest { } if (thread->rand.OneIn(2)) { + pre_read_expected_values.clear(); + post_read_expected_values.clear(); // Refresh after forward/backward scan to allow higher chance of SV - // change. It is safe to refresh since the testing key range is locked. + // change. + for (int64_t i = 0; i < static_cast(expected_values_size); ++i) { + pre_read_expected_values.push_back( + shared->Get(rand_column_family, i + lb)); + } iter->Refresh(); + for (int64_t i = 0; i < static_cast(expected_values_size); ++i) { + post_read_expected_values.push_back( + shared->Get(rand_column_family, i + lb)); + } + + assert(pre_read_expected_values.size() == expected_values_size && + pre_read_expected_values.size() == + post_read_expected_values.size()); } // start from middle of [lb, ub) otherwise it is easy to iterate out of @@ -1690,9 +1748,19 @@ class NonBatchedOpsStressTest : public StressTest { iter->Prev(); op_logs += "P"; } else { - const uint32_t expected_value = - shared->Get(rand_column_family, static_cast(curr)); - if (expected_value == shared->DELETION_SENTINEL) { + const uint32_t value_base_from_db = GetValueBase(iter->value()); + std::size_t index = static_cast(curr - lb); + assert(index < pre_read_expected_values.size() && + index < post_read_expected_values.size()); + const ExpectedValue pre_read_expected_value = + pre_read_expected_values[index]; + const ExpectedValue post_read_expected_value = + post_read_expected_values[index]; + if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value, + post_read_expected_value) || + !ExpectedValueHelper::InExpectedValueBaseRange( + value_base_from_db, pre_read_expected_value, + post_read_expected_value)) { // Fail fast to preserve the DB state. thread->shared->SetVerificationFailure(); fprintf(stderr, "Iterator has key %s, but expected state does not.\n", @@ -1748,59 +1816,70 @@ class NonBatchedOpsStressTest : public StressTest { bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& /*opts*/, SharedState* shared, const std::string& value_from_db, - std::string msg_prefix, const Status& s, - bool strict = false) const { + std::string msg_prefix, const Status& s) const { if (shared->HasVerificationFailedYet()) { return false; } + const ExpectedValue expected_value = shared->Get(cf, key); - // compare value_from_db with the value in the shared state - uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::UNKNOWN_SENTINEL) { + if (expected_value.PendingWrite() || expected_value.PendingDelete()) { if (s.ok()) { // Value exists in db, update state to reflect that Slice slice(value_from_db); - value_base = GetValueBase(slice); - shared->Put(cf, key, value_base, false); + uint32_t value_base = GetValueBase(slice); + shared->SyncPut(cf, key, value_base); } else if (s.IsNotFound()) { // Value doesn't exist in db, update state to reflect that - shared->SingleDelete(cf, key, false); + shared->SyncDelete(cf, key); } return true; } - if (value_base == SharedState::DELETION_SENTINEL && !strict) { - return true; - } + // compare value_from_db with the value in the shared state if (s.ok()) { - char value[kValueMaxLen]; - if (value_base == SharedState::DELETION_SENTINEL) { + const Slice slice(value_from_db); + const uint32_t value_base_from_db = GetValueBase(slice); + if (ExpectedValueHelper::MustHaveNotExisted(expected_value, + expected_value) || + !ExpectedValueHelper::InExpectedValueBaseRange( + value_base_from_db, expected_value, expected_value)) { VerificationAbort(shared, msg_prefix + ": Unexpected value found", cf, key, value_from_db, ""); return false; } - size_t sz = GenerateValue(value_base, value, sizeof(value)); - if (value_from_db.length() != sz) { + char expected_value_data[kValueMaxLen]; + size_t expected_value_data_size = + GenerateValue(expected_value.GetValueBase(), expected_value_data, + sizeof(expected_value_data)); + if (value_from_db.length() != expected_value_data_size) { VerificationAbort(shared, msg_prefix + ": Length of value read is not equal", - cf, key, value_from_db, Slice(value, sz)); + cf, key, value_from_db, + Slice(expected_value_data, expected_value_data_size)); return false; } - if (memcmp(value_from_db.data(), value, sz) != 0) { + if (memcmp(value_from_db.data(), expected_value_data, + expected_value_data_size) != 0) { VerificationAbort(shared, msg_prefix + ": Contents of value read don't match", - cf, key, value_from_db, Slice(value, sz)); + cf, key, value_from_db, + Slice(expected_value_data, expected_value_data_size)); return false; } - } else { - if (value_base != SharedState::DELETION_SENTINEL) { - char value[kValueMaxLen]; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - VerificationAbort(shared, - msg_prefix + ": Value not found: " + s.ToString(), cf, - key, "", Slice(value, sz)); + } else if (s.IsNotFound()) { + if (ExpectedValueHelper::MustHaveExisted(expected_value, + expected_value)) { + char expected_value_data[kValueMaxLen]; + size_t expected_value_data_size = + GenerateValue(expected_value.GetValueBase(), expected_value_data, + sizeof(expected_value_data)); + VerificationAbort( + shared, msg_prefix + ": Value not found: " + s.ToString(), cf, key, + "", Slice(expected_value_data, expected_value_data_size)); return false; } + } else { + assert(false); } return true; }