db_stress: preserve all historic manifest files (#6142)

Summary:
compaction history is stored in manifest files. Preserve all of them in db_stress would help debugging.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6142

Test Plan: Run db_stress and observe that manifest files are preserved. Run whole crash_test and see how DB directory looks like.

Differential Revision: D19047026

fbshipit-source-id: f83c3e0bb5332b1b4768be5dcee56a24f9b760a9
main
sdong 5 years ago committed by Facebook Github Bot
parent fbda25f57a
commit 35126dd874
  1. 9
      db_stress_tool/cf_consistency_stress.cc
  2. 5
      db_stress_tool/db_stress_common.cc
  3. 5
      db_stress_tool/db_stress_common.h
  4. 34
      db_stress_tool/db_stress_env_wrapper.h
  5. 10
      db_stress_tool/db_stress_test_base.cc
  6. 18
      db_stress_tool/db_stress_tool.cc

@ -292,7 +292,12 @@ class CfConsistencyStressTest : public StressTest {
const std::vector<int64_t>& /* rand_keys */) override { const std::vector<int64_t>& /* rand_keys */) override {
std::string checkpoint_dir = std::string checkpoint_dir =
FLAGS_db + "/.checkpoint" + ToString(thread->tid); FLAGS_db + "/.checkpoint" + ToString(thread->tid);
DestroyDB(checkpoint_dir, options_);
// We need to clear DB including manifest files, so make a copy
Options opt_copy = options_;
opt_copy.env = FLAGS_env->target();
DestroyDB(checkpoint_dir, opt_copy);
Checkpoint* checkpoint = nullptr; Checkpoint* checkpoint = nullptr;
Status s = Checkpoint::Create(db_, &checkpoint); Status s = Checkpoint::Create(db_, &checkpoint);
if (s.ok()) { if (s.ok()) {
@ -326,7 +331,7 @@ class CfConsistencyStressTest : public StressTest {
delete checkpoint_db; delete checkpoint_db;
checkpoint_db = nullptr; checkpoint_db = nullptr;
} }
DestroyDB(checkpoint_dir, options_); DestroyDB(checkpoint_dir, opt_copy);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "A checkpoint operation failed with: %s\n", fprintf(stderr, "A checkpoint operation failed with: %s\n",
s.ToString().c_str()); s.ToString().c_str());

@ -12,7 +12,7 @@
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
#include <cmath> #include <cmath>
rocksdb::Env* FLAGS_env = rocksdb::Env::Default(); rocksdb::DbStressEnvWrapper* FLAGS_env = nullptr;
enum rocksdb::CompressionType FLAGS_compression_type_e = enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression; rocksdb::kSnappyCompression;
enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c; enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;
@ -100,7 +100,8 @@ void PoolSizeChangeThread(void* v) {
if (new_thread_pool_size < 1) { if (new_thread_pool_size < 1) {
new_thread_pool_size = 1; new_thread_pool_size = 1;
} }
FLAGS_env->SetBackgroundThreads(new_thread_pool_size); FLAGS_env->SetBackgroundThreads(new_thread_pool_size,
rocksdb::Env::Priority::LOW);
// Sleep up to 3 seconds // Sleep up to 3 seconds
FLAGS_env->SleepForMicroseconds( FLAGS_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *

@ -36,6 +36,7 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db_stress_tool/db_stress_env_wrapper.h"
#include "db_stress_tool/db_stress_listener.h" #include "db_stress_tool/db_stress_listener.h"
#include "db_stress_tool/db_stress_shared_state.h" #include "db_stress_tool/db_stress_shared_state.h"
#include "db_stress_tool/db_stress_test_base.h" #include "db_stress_tool/db_stress_test_base.h"
@ -207,8 +208,8 @@ const long KB = 1024;
const int kRandomValueMaxFactor = 3; const int kRandomValueMaxFactor = 3;
const int kValueMaxLen = 100; const int kValueMaxLen = 100;
// posix or hdfs environment // wrapped posix or hdfs environment
extern rocksdb::Env* FLAGS_env; extern rocksdb::DbStressEnvWrapper* FLAGS_env;
extern enum rocksdb::CompressionType FLAGS_compression_type_e; extern enum rocksdb::CompressionType FLAGS_compression_type_e;
extern enum rocksdb::ChecksumType FLAGS_checksum_type_e; extern enum rocksdb::ChecksumType FLAGS_checksum_type_e;

@ -0,0 +1,34 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifdef GFLAGS
#pragma once
#include "db_stress_tool/db_stress_common.h"
namespace rocksdb {
class DbStressEnvWrapper : public EnvWrapper {
public:
explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {}
Status DeleteFile(const std::string& f) override {
// We determine whether it is a manifest file by searching a strong,
// so that there will be false positive if the directory path contains the
// keyword but it is unlikely.
if (!if_preserve_all_manifests ||
f.find("MANIFEST-") == std::string::npos) {
return target()->DeleteFile(f);
}
return Status::OK();
}
// If true, all manifest files will not be delted in DeleteFile().
bool if_preserve_all_manifests = true;
};
} // namespace rocksdb
#endif // GFLAGS

@ -38,7 +38,8 @@ StressTest::StressTest()
} }
} }
Options options; Options options;
options.env = FLAGS_env; // Remove files without preserving manfiest files
options.env = FLAGS_env->target();
Status s = DestroyDB(FLAGS_db, options); Status s = DestroyDB(FLAGS_db, options);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str()); fprintf(stderr, "Cannot destroy original db: %s\n", s.ToString().c_str());
@ -1220,7 +1221,10 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
FLAGS_db + "/.checkpoint" + ToString(thread->tid); FLAGS_db + "/.checkpoint" + ToString(thread->tid);
Options tmp_opts(options_); Options tmp_opts(options_);
tmp_opts.listeners.clear(); tmp_opts.listeners.clear();
tmp_opts.env = FLAGS_env->target();
DestroyDB(checkpoint_dir, tmp_opts); DestroyDB(checkpoint_dir, tmp_opts);
Checkpoint* checkpoint = nullptr; Checkpoint* checkpoint = nullptr;
Status s = Checkpoint::Create(db_, &checkpoint); Status s = Checkpoint::Create(db_, &checkpoint);
if (s.ok()) { if (s.ok()) {
@ -1276,7 +1280,9 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
delete checkpoint_db; delete checkpoint_db;
checkpoint_db = nullptr; checkpoint_db = nullptr;
} }
DestroyDB(checkpoint_dir, tmp_opts); DestroyDB(checkpoint_dir, tmp_opts);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "A checkpoint operation failed with: %s\n", fprintf(stderr, "A checkpoint operation failed with: %s\n",
s.ToString().c_str()); s.ToString().c_str());
@ -1556,7 +1562,7 @@ void StressTest::Open() {
std::vector<ColumnFamilyDescriptor> cf_descriptors; std::vector<ColumnFamilyDescriptor> cf_descriptors;
Status s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_options, Status s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_options,
&cf_descriptors); &cf_descriptors);
db_options.env = FLAGS_env; db_options.env = new DbStressEnvWrapper(FLAGS_env);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Unable to load options file %s --- %s\n", fprintf(stderr, "Unable to load options file %s --- %s\n",
FLAGS_options_file.c_str(), s.ToString().c_str()); FLAGS_options_file.c_str(), s.ToString().c_str());

@ -27,6 +27,7 @@
namespace rocksdb { namespace rocksdb {
namespace { namespace {
static std::shared_ptr<rocksdb::Env> env_guard; static std::shared_ptr<rocksdb::Env> env_guard;
static std::shared_ptr<rocksdb::DbStressEnvWrapper> env_wrapper_guard;
} // namespace } // namespace
int db_stress_tool(int argc, char** argv) { int db_stress_tool(int argc, char** argv) {
@ -47,24 +48,33 @@ int db_stress_tool(int argc, char** argv) {
FLAGS_compression_type_e = FLAGS_compression_type_e =
StringToCompressionType(FLAGS_compression_type.c_str()); StringToCompressionType(FLAGS_compression_type.c_str());
FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str()); FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str());
Env* raw_env;
if (!FLAGS_hdfs.empty()) { if (!FLAGS_hdfs.empty()) {
if (!FLAGS_env_uri.empty()) { if (!FLAGS_env_uri.empty()) {
fprintf(stderr, "Cannot specify both --hdfs and --env_uri.\n"); fprintf(stderr, "Cannot specify both --hdfs and --env_uri.\n");
exit(1); exit(1);
} }
FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs); raw_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
} else if (!FLAGS_env_uri.empty()) { } else if (!FLAGS_env_uri.empty()) {
Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard); Status s = Env::LoadEnv(FLAGS_env_uri, &raw_env, &env_guard);
if (FLAGS_env == nullptr) { if (raw_env == nullptr) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1); exit(1);
} }
} else {
raw_env = Env::Default();
} }
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
FLAGS_env = env_wrapper_guard.get();
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// The number of background threads should be at least as much the // The number of background threads should be at least as much the
// max number of concurrent compactions. // max number of concurrent compactions.
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions,
rocksdb::Env::Priority::LOW);
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads, FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
rocksdb::Env::Priority::BOTTOM); rocksdb::Env::Priority::BOTTOM);
if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size < 0) { if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size < 0) {

Loading…
Cancel
Save