diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 6c7fbabbf..62f72f2b5 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -65,6 +65,7 @@ default_params = { "writepercent": 35, "format_version": lambda: random.randint(2, 4), "index_block_restart_interval": lambda: random.choice(range(1, 16)), + "use_multiget" : lambda: random.randint(0, 1), } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR' diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 2ecd2aa6d..97755fe96 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0, "If non-zero, then releases snapshots N operations after they're " "acquired."); +DEFINE_bool(use_multiget, false, + "If set, use the batched MultiGet API for reads"); + static bool ValidateInt32Percent(const char* flagname, int32_t value) { if (value < 0 || value>100) { fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", @@ -1725,6 +1728,27 @@ class StressTest { return base_key + thread->rand.Next() % FLAGS_active_width; } + static std::vector GenerateNKeys( + ThreadState* thread, + int num_keys, + uint64_t iteration) { + const double completed_ratio = + static_cast(iteration) / FLAGS_ops_per_thread; + const int64_t base_key = static_cast( + completed_ratio * (FLAGS_max_key - FLAGS_active_width)); + std::vector keys; + keys.reserve(num_keys); + int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width; + keys.push_back(next_key); + for (int i = 1; i < num_keys; ++i) { + // This may result in some duplicate keys + next_key = next_key + thread->rand.Next() % + (FLAGS_active_width - (next_key - base_key)); + keys.push_back(next_key); + } + return keys; + } + static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) { size_t value_sz = ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; @@ -2162,7 +2186,14 @@ class StressTest { int prob_op = thread->rand.Uniform(100); if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { // OPERATION read - TestGet(thread, read_opts, rand_column_families, rand_keys); + if (FLAGS_use_multiget) { + int num_keys = thread->rand.Uniform(64); + rand_keys = GenerateNKeys(thread, num_keys, i); + TestMultiGet(thread, read_opts, rand_column_families, rand_keys); + i += num_keys - 1; + } else { + TestGet(thread, read_opts, rand_column_families, rand_keys); + } } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { // OPERATION prefix scan // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are @@ -2211,6 +2242,11 @@ class StressTest { const std::vector& rand_column_families, const std::vector& rand_keys) = 0; + virtual std::vector TestMultiGet(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) = 0; + virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts, const std::vector& rand_column_families, @@ -2546,6 +2582,8 @@ class StressTest { fprintf(stdout, "Checksum type : %s\n", checksum.c_str()); fprintf(stdout, "Max subcompactions : %" PRIu64 "\n", FLAGS_subcompactions); + fprintf(stdout, "Use MultiGet : %s\n", + FLAGS_use_multiget ? "true" : "false"); const char* memtablerep = ""; switch (FLAGS_rep_factory) { @@ -3012,6 +3050,38 @@ class NonBatchedOpsStressTest : public StressTest { return s; } + virtual std::vector TestMultiGet(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + size_t num_keys = rand_keys.size(); + std::vector key_str; + std::vector keys; + std::vector values(num_keys); + std::vector statuses(num_keys); + ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; + + for (size_t i = 0; i < num_keys; ++i) { + key_str.emplace_back(Key(rand_keys[i])); + keys.emplace_back(key_str.back()); + } + db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), + statuses.data()); + for (const auto& s : statuses) { + if (s.ok()) { + // found case + thread->stats.AddGets(1, 1); + } else if (s.IsNotFound()) { + // not found case + thread->stats.AddGets(1, 0); + } else { + // errors case + thread->stats.AddErrors(1); + } + } + return statuses; + } + virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts, const std::vector& rand_column_families, @@ -3532,6 +3602,70 @@ class BatchedOpsStressTest : public StressTest { return s; } + virtual std::vector TestMultiGet(ThreadState* thread, + const ReadOptions& readoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + int num_keys = rand_keys.size(); + std::vector statuses(num_keys); + std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + for (int key = 0; key < 10; ++key) { + std::vector key_slices; + std::vector values(num_keys); + ReadOptions readoptionscopy = readoptions; + readoptionscopy.snapshot = db_->GetSnapshot(); + std::vector key_str; + std::string from_db; + ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; + + for (int rand_key = 0; rand_key < num_keys; ++rand_key) { + key_str.emplace_back(keys[key] + Key(rand_keys[rand_key])); + key_slices.emplace_back(key_str.back()); + } + db_->MultiGet(readoptionscopy, cfh, num_keys, key_slices.data(), + values.data(), statuses.data()); + for (int i = 0; i < num_keys; i++) { + Status s = statuses[i]; + if (!s.ok() && !s.IsNotFound()) { + fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + // we continue after error rather than exiting so that we can + // find more errors if any + } else if (s.IsNotFound()) { + thread->stats.AddGets(1, 0); + } else { + char expected_prefix = (keys[key])[0]; + char actual_prefix = (values[i])[0]; + if (actual_prefix != expected_prefix) { + fprintf(stderr, "error expected prefix = %c actual = %c\n", + expected_prefix, actual_prefix); + } + std::string str; + str.assign(values[i].data(), values[i].size()); + values[i].Reset(); + str[0] = ' '; // blank out the differing character + values[i].PinSelf(str); + thread->stats.AddGets(1, 1); + } + } + db_->ReleaseSnapshot(readoptionscopy.snapshot); + + // Now that we retrieved all values, check that they all match + for (int i = 1; i < num_keys; i++) { + if (values[i] != values[0]) { + fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n", + key_str[i].c_str(), + StringToHex(values[0].ToString()).c_str(), + StringToHex(values[i].ToString()).c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any + } + } + } + + return statuses; + } + // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes // of the key. Each of these 10 scans returns a series of values; @@ -3747,6 +3881,37 @@ class AtomicFlushStressTest : public StressTest { return s; } + virtual std::vector TestMultiGet(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + int num_keys = rand_keys.size(); + std::vector key_str; + std::vector keys; + std::vector values(num_keys); + std::vector statuses(num_keys); + ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; + + for (int i = 0; i < num_keys; ++i) { + key_str.emplace_back(Key(rand_keys[i])); + keys.emplace_back(key_str.back()); + } + db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data()); + for (auto s : statuses) { + if (s.ok()) { + // found case + thread->stats.AddGets(1, 1); + } else if (s.IsNotFound()) { + // not found case + thread->stats.AddGets(1, 0); + } else { + // errors case + thread->stats.AddErrors(1); + } + } + return statuses; + } + virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, const std::vector& rand_column_families,