add `CompactionFilter` to stress/crash tests (#6988)

Summary:
Added a `CompactionFilter` that is aware of the stress test's expected state. It only drops key versions that are already covered according to the expected state. It is incompatible with snapshots (same as all `CompactionFilter`s), so disables all snapshot-related features when used in the crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6988

Test Plan:
running a minified blackbox crash test

```
$ TEST_TMPDIR=/dev/shm python tools/db_crashtest.py blackbox --max_key=1000000 -write_buffer_size=1048576 -max_bytes_for_level_base=4194304 -target_file_size_base=1048576 -value_size_mult=33 --interval=10 --duration=3600
```

Reviewed By: anand1976

Differential Revision: D22072888

Pulled By: ajkr

fbshipit-source-id: 727b9d7a90d5eab18be0ec6cd5a810712ac13320
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent 312f23c92d
commit 775dc623ad
  1. 40
      db_stress_tool/db_stress_common.h
  2. 78
      db_stress_tool/db_stress_compaction_filter.h
  3. 5
      db_stress_tool/db_stress_driver.cc
  4. 5
      db_stress_tool/db_stress_gflags.cc
  5. 22
      db_stress_tool/db_stress_test_base.cc
  6. 4
      db_stress_tool/db_stress_test_base.h
  7. 9
      db_stress_tool/db_stress_tool.cc
  8. 12
      tools/db_crashtest.py

@ -238,6 +238,7 @@ DECLARE_bool(sync_fault_injection);
DECLARE_bool(best_efforts_recovery);
DECLARE_bool(skip_verifydb);
DECLARE_bool(enable_compaction_filter);
const long KB = 1024;
const int kRandomValueMaxFactor = 3;
@ -439,19 +440,10 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t));
// Pad with zeros to make it a multiple of 8. This function may be called
// with a prefix, in which case we return the first index that falls
// inside or outside that prefix, dependeing on whether the prefix is
// the start of upper bound of a scan
unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
if (pad < sizeof(uint64_t)) {
big_endian_key.append(pad, '\0');
size_key += pad;
}
std::string little_endian_key;
little_endian_key.resize(size_key);
for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) {
for (size_t start = 0; start + sizeof(uint64_t) <= size_key;
start += sizeof(uint64_t)) {
size_t end = start + sizeof(uint64_t);
for (size_t i = 0; i < sizeof(uint64_t); ++i) {
little_endian_key[start + i] = big_endian_key[end - 1 - i];
@ -470,17 +462,41 @@ extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
uint64_t pfx = prefixes[i];
key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window +
pfx % key_gen_ctx.weights[i];
if (i < prefixes.size() - 1) {
// The encoding writes a `key_gen_ctx.weights[i] - 1` that counts for
// `key_gen_ctx.weights[i]` when there are more prefixes to come. So we
// need to add back the one here as we're at a non-last prefix.
++key;
}
}
*key_p = key;
return true;
}
// Given a string prefix, map it to the first corresponding index in the
// expected values buffer.
inline bool GetFirstIntValInPrefix(std::string big_endian_prefix,
uint64_t* key_p) {
size_t size_key = big_endian_prefix.size();
// Pad with zeros to make it a multiple of 8. This function may be called
// with a prefix, in which case we return the first index that falls
// inside or outside that prefix, dependeing on whether the prefix is
// the start of upper bound of a scan
unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
if (pad < sizeof(uint64_t)) {
big_endian_prefix.append(pad, '\0');
size_key += pad;
}
return GetIntVal(std::move(big_endian_prefix), key_p);
}
extern inline uint64_t GetPrefixKeyCount(const std::string& prefix,
const std::string& ub) {
uint64_t start = 0;
uint64_t end = 0;
if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) {
if (!GetFirstIntValInPrefix(prefix, &start) ||
!GetFirstIntValInPrefix(ub, &end)) {
return 0;
}

@ -0,0 +1,78 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/compaction_filter.h"
namespace ROCKSDB_NAMESPACE {
// DbStressCompactionFilter is safe to use with db_stress as it does not perform
// any mutation. It only makes `kRemove` decisions for keys that are already
// non-existent according to the `SharedState`.
class DbStressCompactionFilter : public CompactionFilter {
public:
DbStressCompactionFilter(SharedState* state, int cf_id)
: state_(state), cf_id_(cf_id) {}
Decision FilterV2(int /*level*/, const Slice& key, ValueType /*value_type*/,
const Slice& /*existing_value*/, std::string* /*new_value*/,
std::string* /*skip_until*/) const override {
if (state_ == nullptr) {
return Decision::kKeep;
}
if (key.empty() || ('0' <= key[0] && key[0] <= '9')) {
// It is likely leftover from a test_batches_snapshots run. Below this
// conditional, the test_batches_snapshots key format is not handled
// properly. Just keep it to be safe.
return Decision::kKeep;
}
uint64_t key_num = 0;
bool ok = GetIntVal(key.ToString(), &key_num);
assert(ok);
MutexLock key_lock(state_->GetMutexForKey(cf_id_, key_num));
if (!state_->Exists(cf_id_, key_num)) {
return Decision::kRemove;
}
return Decision::kKeep;
}
const char* Name() const override { return "DbStressCompactionFilter"; }
private:
SharedState* const state_;
const int cf_id_;
};
class DbStressCompactionFilterFactory : public CompactionFilterFactory {
public:
DbStressCompactionFilterFactory() : state_(nullptr) {}
void SetSharedState(SharedState* state) {
MutexLock state_mutex_guard(&state_mutex_);
state_ = state;
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
MutexLock state_mutex_guard(&state_mutex_);
return std::unique_ptr<CompactionFilter>(
new DbStressCompactionFilter(state_, context.column_family_id));
}
const char* Name() const override {
return "DbStressCompactionFilterFactory";
}
private:
port::Mutex state_mutex_;
SharedState* state_;
};
} // namespace ROCKSDB_NAMESPACE

@ -57,11 +57,8 @@ void ThreadBody(void* v) {
bool RunStressTest(StressTest* stress) {
stress->InitDb();
SharedState shared(db_stress_env, stress);
if (FLAGS_read_only) {
stress->InitReadonlyDb(&shared);
}
stress->FinishInitDb(&shared);
#ifndef NDEBUG
if (FLAGS_sync_fault_injection) {

@ -695,4 +695,9 @@ DEFINE_bool(sync_fault_injection, false,
DEFINE_bool(best_efforts_recovery, false,
"If true, use best efforts recovery.");
DEFINE_bool(skip_verifydb, false, "If true, skip VerifyDb() calls.");
DEFINE_bool(enable_compaction_filter, false,
"If true, configures a compaction filter that returns a kRemove "
"decision for deleted keys.");
#endif // GFLAGS

@ -10,6 +10,7 @@
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "rocksdb/convenience.h"
#include "rocksdb/sst_file_manager.h"
@ -195,11 +196,18 @@ void StressTest::InitDb() {
BuildOptionsTable();
}
void StressTest::InitReadonlyDb(SharedState* shared) {
uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
void StressTest::FinishInitDb(SharedState* shared) {
if (FLAGS_read_only) {
uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
}
if (FLAGS_enable_compaction_filter) {
reinterpret_cast<DbStressCompactionFilterFactory*>(
options_.compaction_filter_factory.get())
->SetSharedState(shared);
}
}
bool StressTest::VerifySecondaries() {
@ -1914,6 +1922,10 @@ void StressTest::Open() {
} else {
options_.merge_operator = MergeOperators::CreatePutOperator();
}
if (FLAGS_enable_compaction_filter) {
options_.compaction_filter_factory =
std::make_shared<DbStressCompactionFilterFactory>();
}
options_.best_efforts_recovery = FLAGS_best_efforts_recovery;

@ -27,7 +27,9 @@ class StressTest {
bool BuildOptionsTable();
void InitDb();
void InitReadonlyDb(SharedState*);
// The initialization work is split into two parts to avoid a circular
// dependency with `SharedState`.
void FinishInitDb(SharedState*);
// Return false if verification fails.
bool VerifySecondaries();

@ -236,6 +236,15 @@ int db_stress_tool(int argc, char** argv) {
exit(1);
}
}
if (FLAGS_enable_compaction_filter &&
(FLAGS_acquire_snapshot_one_in > 0 || FLAGS_compact_range_one_in > 0 ||
FLAGS_iterpercent > 0 || FLAGS_test_batches_snapshots > 0)) {
fprintf(
stderr,
"Error: acquire_snapshot_one_in, compact_range_one_in, iterpercent, "
"test_batches_snapshots must all be 0 when using compaction filter\n");
exit(1);
}
rocksdb_kill_odds = FLAGS_kill_random_test;
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);

@ -55,6 +55,7 @@ default_params = {
"delrangepercent": 1,
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
"enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
"expected_values_path": expected_values_file.name,
"flush_one_in": 1000000,
"get_live_files_one_in": 1000000,
@ -64,6 +65,7 @@ default_params = {
"get_current_wal_file_one_in": 0,
# Temporarily disable hash index
"index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
"iterpercent": 10,
"max_background_compactions": 20,
"max_bytes_for_level_base": 10485760,
"max_key": 100000000,
@ -275,6 +277,16 @@ def finalize_and_sanitize(src_params):
if dest_params.get("atomic_flush", 0) == 1:
# disable pipelined write when atomic flush is used.
dest_params["enable_pipelined_write"] = 0
if dest_params.get("enable_compaction_filter", 0) == 1:
# Compaction filter is incompatible with snapshots. Need to avoid taking
# snapshots, as well as avoid operations that use snapshots for
# verification.
dest_params["acquire_snapshot_one_in"] = 0
dest_params["compact_range_one_in"] = 0
# Give the iterator ops away to reads.
dest_params["readpercent"] += dest_params.get("iterpercent", 10)
dest_params["iterpercent"] = 0
dest_params["test_batches_snapshots"] = 0
return dest_params
def gen_cmd_params(args):

Loading…
Cancel
Save