Add manual_wal_flush, FlushWAL() to stress/crash test (#10698)

Summary:
**Context/Summary:**
Introduce `manual_wal_flush_one_in` as titled.
- When `manual_wal_flush_one_in > 0`, we also need tracing to correctly verify recovery, because WAL data can be lost in this case when `FlushWAL()` is not explicitly called by users of RocksDB (in our case, db stress), and the recovery from such potential WAL data loss is a prefix recovery that requires tracing to verify. As another consequence, we need to disable features that can't run under unsynced data loss when `manual_wal_flush_one_in > 0`.

Incompatibilities fixed along the way:
```
db_stress: db/db_impl/db_impl_open.cc:2063: static rocksdb::Status rocksdb::DBImpl::Open(const rocksdb::DBOptions&, const string&, const std::vector<rocksdb::ColumnFamilyDescriptor>&, std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**, bool, bool): Assertion `impl->TEST_WALBufferIsEmpty()' failed.
```
 - It turns out that `Writer::AddCompressionTypeRecord`, which runs before this assertion, calls `EmitPhysicalRecord(kSetCompressionType, encode.data(), encode.size());` but does not trigger a flush if `manual_wal_flush` is set. This leads to `impl->TEST_WALBufferIsEmpty()` being false.
    - As suggested, the assertion is removed and the violation case is handled by `FlushWAL(sync=true)`, along with renaming `TEST_WALBufferIsEmpty()` to `WALBufferIsEmpty()` since it is now used in production code.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10698

Test Plan:
- Locally running `python3 tools/db_crashtest.py blackbox --manual_wal_flush_one_in=1 --manual_wal_flush=1 --sync_wal_one_in=100 --atomic_flush=1 --flush_one_in=100 --column_families=3`
- Combined with https://github.com/facebook/rocksdb/pull/10624 in automated CI testing across all RocksDB stress/crash test jobs

Reviewed By: ajkr

Differential Revision: D39593752

Pulled By: ajkr

fbshipit-source-id: 3a2135bb792c52d2ffa60257d4fbc557fb04d2ce
main
Hui Xiao 2 years ago committed by Facebook GitHub Bot
parent 793fd09783
commit 3b8164912e
  1. 12
      db/db_impl/db_impl.cc
  2. 2
      db/db_impl/db_impl.h
  3. 12
      db/db_impl/db_impl_debug.cc
  4. 6
      db/db_impl/db_impl_open.cc
  5. 16
      db/db_write_test.cc
  6. 2
      db/log_writer.cc
  7. 2
      db/log_writer.h
  8. 1
      db_stress_tool/db_stress_common.h
  9. 7
      db_stress_tool/db_stress_gflags.cc
  10. 25
      db_stress_tool/db_stress_test_base.cc
  11. 2
      file/writable_file_writer.h
  12. 4
      tools/db_crashtest.py

@ -1453,6 +1453,18 @@ Status DBImpl::FlushWAL(bool sync) {
return SyncWAL();
}
bool DBImpl::WALBufferIsEmpty(bool lock) {
if (lock) {
log_write_mutex_.Lock();
}
log::Writer* cur_log_writer = logs_.back().writer;
auto res = cur_log_writer->BufferIsEmpty();
if (lock) {
log_write_mutex_.Unlock();
}
return res;
}
Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
autovector<log::Writer*, 1> logs_to_sync;

@ -423,7 +423,7 @@ class DBImpl : public DB {
const FlushOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
virtual Status FlushWAL(bool sync) override;
bool TEST_WALBufferIsEmpty(bool lock = true);
bool WALBufferIsEmpty(bool lock = true);
virtual Status SyncWAL() override;
virtual Status LockWAL() override;
virtual Status UnlockWAL() override;

@ -31,18 +31,6 @@ Status DBImpl::TEST_SwitchWAL() {
return s;
}
bool DBImpl::TEST_WALBufferIsEmpty(bool lock) {
if (lock) {
log_write_mutex_.Lock();
}
log::Writer* cur_log_writer = logs_.back().writer;
auto res = cur_log_writer->TEST_BufferIsEmpty();
if (lock) {
log_write_mutex_.Unlock();
}
return res;
}
uint64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
ColumnFamilyHandle* column_family) {
ColumnFamilyData* cfd;

@ -2060,9 +2060,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
impl);
LogFlush(impl->immutable_db_options_.info_log);
assert(impl->TEST_WALBufferIsEmpty());
// If the assert above fails then we need to FlushWAL before returning
// control back to the user.
if (!impl->WALBufferIsEmpty()) {
impl->FlushWAL(true /* sync */);
}
if (!persist_options_status.ok()) {
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",

@ -453,15 +453,15 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) {
Reopen(options);
// try the 1st WAL created during open
ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
// try the 2nd wal created during SwitchWAL
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
}
TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
@ -609,16 +609,16 @@ TEST_P(DBWriteTest, LockWalInEffect) {
Reopen(options);
// try the 1st WAL created during open
ASSERT_OK(Put("key" + std::to_string(0), "value"));
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_OK(dbfull()->LockWAL());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
ASSERT_OK(dbfull()->UnlockWAL());
// try the 2nd wal created during SwitchWAL
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(Put("key" + std::to_string(0), "value"));
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_OK(dbfull()->LockWAL());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
ASSERT_OK(dbfull()->UnlockWAL());
}

@ -194,7 +194,7 @@ IOStatus Writer::AddCompressionTypeRecord() {
return s;
}
// Test-only query: forwards to the destination file writer and returns
// whatever its TEST_BufferIsEmpty() reports.
bool Writer::TEST_BufferIsEmpty() {
  const bool dest_buffer_empty = dest_->TEST_BufferIsEmpty();
  return dest_buffer_empty;
}
// Forwards to the destination file writer and returns whatever its
// BufferIsEmpty() reports.
bool Writer::BufferIsEmpty() {
  const bool dest_buffer_empty = dest_->BufferIsEmpty();
  return dest_buffer_empty;
}
IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n,
Env::IOPriority rate_limiter_priority) {

@ -96,7 +96,7 @@ class Writer {
IOStatus Close();
bool TEST_BufferIsEmpty();
bool BufferIsEmpty();
private:
std::unique_ptr<WritableFileWriter> dest_;

@ -86,6 +86,7 @@ DECLARE_string(options_file);
DECLARE_int64(active_width);
DECLARE_bool(test_batches_snapshots);
DECLARE_bool(atomic_flush);
DECLARE_int32(manual_wal_flush_one_in);
DECLARE_bool(test_cf_consistency);
DECLARE_bool(test_multi_ops_txns);
DECLARE_int32(threads);

@ -85,6 +85,13 @@ DEFINE_bool(test_batches_snapshots, false,
DEFINE_bool(atomic_flush, false,
"If set, enables atomic flush in the options.\n");
DEFINE_int32(
manual_wal_flush_one_in, 0,
"If non-zero, then `FlushWAL(bool sync)`, where `bool sync` is randomly "
"decided, will be explictly called in db stress once for every N ops "
"on average. Setting `manual_wal_flush_one_in` to be greater than 0 "
"implies `Options::manual_wal_flush = true` is set.");
DEFINE_bool(test_cf_consistency, false,
"If set, runs the stress test dedicated to verifying writes to "
"multiple column families are consistent. Setting this implies "

@ -334,7 +334,14 @@ void StressTest::FinishInitDb(SharedState* shared) {
}
void StressTest::TrackExpectedState(SharedState* shared) {
if ((FLAGS_sync_fault_injection || FLAGS_disable_wal) && IsStateTracked()) {
// When `FLAGS_manual_wal_flush_one_in > 0`, WAL data can be lost if
// `FlushWAL()` is not explicitly called by users of RocksDB (in our case,
// db stress). Therefore recovery from such potential WAL data loss is a
// prefix recovery that requires tracing.
if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
FLAGS_manual_wal_flush_one_in > 0) &&
IsStateTracked()) {
Status s = shared->SaveAtAndAfter(db_);
if (!s.ok()) {
fprintf(stderr, "Error enabling history tracing: %s\n",
@ -777,6 +784,15 @@ void StressTest::OperateDb(ThreadState* thread) {
MaybeClearOneColumnFamily(thread);
if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) {
bool sync = thread->rand.OneIn(2) ? true : false;
Status s = db_->FlushWAL(sync);
if (!s.ok()) {
fprintf(stderr, "FlushWAL(sync=%s) failed: %s\n",
(sync ? "true" : "false"), s.ToString().c_str());
}
}
if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
Status s = db_->SyncWAL();
if (!s.ok() && !s.IsNotSupported()) {
@ -2293,6 +2309,8 @@ void StressTest::PrintEnv() const {
FLAGS_read_only ? "true" : "false");
fprintf(stdout, "Atomic flush : %s\n",
FLAGS_atomic_flush ? "true" : "false");
fprintf(stdout, "Manual WAL flush : %s\n",
FLAGS_manual_wal_flush_one_in > 0 ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
@ -2801,7 +2819,9 @@ void StressTest::Reopen(ThreadState* thread) {
clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
Open(thread->shared);
if ((FLAGS_sync_fault_injection || FLAGS_disable_wal) && IsStateTracked()) {
if ((FLAGS_sync_fault_injection || FLAGS_disable_wal ||
FLAGS_manual_wal_flush_one_in > 0) &&
IsStateTracked()) {
Status s = thread->shared->SaveAtAndAfter(db_);
if (!s.ok()) {
fprintf(stderr, "Error enabling history tracing: %s\n",
@ -3094,6 +3114,7 @@ void InitializeOptionsFromFlags(
options.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
options.atomic_flush = FLAGS_atomic_flush;
options.manual_wal_flush = FLAGS_manual_wal_flush_one_in > 0 ? true : false;
options.avoid_unnecessary_blocking_io = FLAGS_avoid_unnecessary_blocking_io;
options.write_dbid_to_manifest = FLAGS_write_dbid_to_manifest;
options.avoid_flush_during_recovery = FLAGS_avoid_flush_during_recovery;

@ -286,7 +286,7 @@ class WritableFileWriter {
bool use_direct_io() { return writable_file_->use_direct_io(); }
// Test-only query: true exactly when `buf_` reports a current size of zero.
bool TEST_BufferIsEmpty() {
  const bool nothing_buffered = (buf_.CurrentSize() == 0);
  return nothing_buffered;
}
// True exactly when `buf_` reports a current size of zero.
bool BufferIsEmpty() {
  const bool nothing_buffered = (buf_.CurrentSize() == 0);
  return nothing_buffered;
}
void TEST_SetFileChecksumGenerator(
FileChecksumGenerator* checksum_generator) {

@ -77,6 +77,7 @@ default_params = {
"expected_values_dir": lambda: setup_expected_values_dir(),
"fail_if_options_file_error": lambda: random.randint(0, 1),
"flush_one_in": 1000000,
"manual_wal_flush_one_in": lambda: random.choice([0, 0, 1000, 1000000]),
"file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
"get_live_files_one_in": 1000000,
# Note: the following two are intentionally disabled as the corresponding
@ -525,6 +526,7 @@ def finalize_and_sanitize(src_params):
if (
dest_params.get("disable_wal") == 1
or dest_params.get("sync_fault_injection") == 1
or dest_params.get("manual_wal_flush_one_in") > 0
):
# File ingestion does not guarantee prefix-recoverability when unsynced
# data can be lost. Ingesting a file syncs data immediately that is
@ -603,7 +605,7 @@ def finalize_and_sanitize(src_params):
# compatible with only write committed policy
if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0):
dest_params["sync_fault_injection"] = 0
dest_params["manual_wal_flush_one_in"] = 0
# PutEntity is currently not supported with Merge
if dest_params["use_put_entity_one_in"] != 0:
dest_params["use_merge"] = 0

Loading…
Cancel
Save