diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index eccb9b554..577ff7db5 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -225,7 +225,7 @@ size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) { ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); (void)max_sz; - *((uint32_t*)v) = rand; + PutUnaligned(reinterpret_cast(v), rand); for (size_t i = sizeof(uint32_t); i < value_sz; i++) { v[i] = (char)(rand ^ i); } @@ -233,6 +233,13 @@ size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) { return value_sz; // the size of the value set. } +uint32_t GetValueBase(Slice s) { + assert(s.size() >= sizeof(uint32_t)); + uint32_t res; + GetUnaligned(reinterpret_cast(s.data()), &res); + return res; +} + std::string NowNanosStr() { uint64_t t = db_stress_env->NowNanos(); std::string ret; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index d2e02cdd1..6b64ad3fa 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -567,6 +567,7 @@ extern std::vector GenerateNKeys(ThreadState* thread, int num_keys, uint64_t iteration); extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz); +extern uint32_t GetValueBase(Slice s); extern StressTest* CreateCfConsistencyStressTest(); extern StressTest* CreateBatchedOpsStressTest(); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 7af49fc05..05d757e4d 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -818,7 +818,7 @@ DEFINE_bool(sync_fault_injection, false, "and unsynced data in DB will lost after crash. In such a case we " "track DB changes in a trace file (\"*.trace\") in " "--expected_values_dir for verifying there are no holes in the " - "recovered data (future work)."); + "recovered data."); DEFINE_bool(best_efforts_recovery, false, "If true, use best efforts recovery."); diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index b99df4c47..1f1542f36 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -254,6 +254,10 @@ class SharedState { return expected_state_manager_->SaveAtAndAfter(db); } + bool HasHistory() { return expected_state_manager_->HasHistory(); } + + Status Restore(DB* db) { return expected_state_manager_->Restore(db); } + // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return expected_state_manager_->ClearColumnFamily(cf); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index dcfbd4589..ec2f14523 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -307,7 +307,20 @@ void StressTest::FinishInitDb(SharedState* shared) { fprintf(stdout, "Compaction filter factory: %s\n", compaction_filter_factory->Name()); } - // TODO(ajkr): First restore if there's already a trace. + + if (shared->HasHistory() && IsStateTracked()) { + // The way it works right now is, if there's any history, that means the + // previous run mutating the DB had all its operations traced, in which case + // we should always be able to `Restore()` the expected values to match the + // `db_`'s current seqno. + Status s = shared->Restore(db_); + if (!s.ok()) { + fprintf(stderr, "Error restoring historical expected values: %s\n", + s.ToString().c_str()); + exit(1); + } + } + if (FLAGS_sync_fault_injection && IsStateTracked()) { Status s = shared->SaveAtAndAfter(db_); if (!s.ok()) { diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index b01d8c4ad..075517f6f 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -7,8 +7,10 @@ #include "db_stress_tool/expected_state.h" +#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" #include "rocksdb/trace_reader_writer.h" +#include "rocksdb/trace_record_result.h" namespace ROCKSDB_NAMESPACE { @@ -318,13 +320,220 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* /* db */) { } #endif // ROCKSDB_LITE +bool FileExpectedStateManager::HasHistory() { + return saved_seqno_ != kMaxSequenceNumber; +} + +#ifndef ROCKSDB_LITE + +// An `ExpectedStateTraceRecordHandler` applies a configurable number of +// write operation trace records to the configured expected state. It is used in +// `FileExpectedStateManager::Restore()` to sync the expected state with the +// DB's post-recovery state. +class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, + public WriteBatch::Handler { + public: + ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state) + : max_write_ops_(max_write_ops), state_(state) {} + + ~ExpectedStateTraceRecordHandler() { + assert(num_write_ops_ == max_write_ops_); + } + + Status Handle(const WriteQueryTraceRecord& record, + std::unique_ptr* /* result */) override { + if (num_write_ops_ == max_write_ops_) { + return Status::OK(); + } + WriteBatch batch(record.GetWriteBatchRep().ToString()); + return batch.Iterate(this); + } + + // Ignore reads. + Status Handle(const GetQueryTraceRecord& /* record */, + std::unique_ptr* /* result */) override { + return Status::OK(); + } + + // Ignore reads. + Status Handle(const IteratorSeekQueryTraceRecord& /* record */, + std::unique_ptr* /* result */) override { + return Status::OK(); + } + + // Ignore reads. + Status Handle(const MultiGetQueryTraceRecord& /* record */, + std::unique_ptr* /* result */) override { + return Status::OK(); + } + + // Below are the WriteBatch::Handler overrides. We could use a separate + // object, but it's convenient and works to share state with the + // `TraceRecord::Handler`. + + Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + uint64_t key_id; + if (!GetIntVal(key.ToString(), &key_id)) { + return Status::Corruption("unable to parse key", key.ToString()); + } + uint32_t value_id = GetValueBase(value); + + state_->Put(column_family_id, static_cast(key_id), value_id, + false /* pending */); + ++num_write_ops_; + return Status::OK(); + } + + Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + uint64_t key_id; + if (!GetIntVal(key.ToString(), &key_id)) { + return Status::Corruption("unable to parse key", key.ToString()); + } + + state_->Delete(column_family_id, static_cast(key_id), + false /* pending */); + ++num_write_ops_; + return Status::OK(); + } + + Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { + return DeleteCF(column_family_id, key); + } + + Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, + const Slice& end_key) override { + uint64_t begin_key_id, end_key_id; + if (!GetIntVal(begin_key.ToString(), &begin_key_id)) { + return Status::Corruption("unable to parse begin key", + begin_key.ToString()); + } + if (!GetIntVal(end_key.ToString(), &end_key_id)) { + return Status::Corruption("unable to parse end key", end_key.ToString()); + } + + state_->DeleteRange(column_family_id, static_cast(begin_key_id), + static_cast(end_key_id), false /* pending */); + ++num_write_ops_; + return Status::OK(); + } + + Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + return PutCF(column_family_id, key, value); + } + + private: + uint64_t num_write_ops_ = 0; + uint64_t max_write_ops_; + ExpectedState* state_; +}; + +Status FileExpectedStateManager::Restore(DB* db) { + assert(HasHistory()); + SequenceNumber seqno = db->GetLatestSequenceNumber(); + if (seqno < saved_seqno_) { + return Status::Corruption("DB is older than any restorable expected state"); + } + + std::string state_filename = ToString(saved_seqno_) + kStateFilenameSuffix; + std::string state_file_path = GetPathForFilename(state_filename); + + std::string latest_file_temp_path = + GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix); + std::string latest_file_path = + GetPathForFilename(kLatestBasename + kStateFilenameSuffix); + + std::string trace_filename = ToString(saved_seqno_) + kTraceFilenameSuffix; + std::string trace_file_path = GetPathForFilename(trace_filename); + + std::unique_ptr trace_reader; + Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path, + &trace_reader); + + if (s.ok()) { + // We are going to replay on top of "`seqno`.state" to create a new + // "LATEST.state". Start off by creating a tempfile so we can later make the + // new "LATEST.state" appear atomically using `RenameFile()`. + s = CopyFile(FileSystem::Default(), state_file_path, latest_file_temp_path, + 0 /* size */, false /* use_fsync */); + } + + { + std::unique_ptr replayer; + std::unique_ptr state; + std::unique_ptr handler; + if (s.ok()) { + state.reset(new FileExpectedState(latest_file_temp_path, max_key_, + num_column_families_)); + s = state->Open(false /* create */); + } + if (s.ok()) { + handler.reset(new ExpectedStateTraceRecordHandler(seqno - saved_seqno_, + state.get())); + // TODO(ajkr): An API limitation requires we provide `handles` although + // they will be unused since we only use the replayer for reading records. + // Just give a default CFH for now to satisfy the requirement. + s = db->NewDefaultReplayer({db->DefaultColumnFamily()} /* handles */, + std::move(trace_reader), &replayer); + } + + if (s.ok()) { + s = replayer->Prepare(); + } + for (;;) { + std::unique_ptr record; + s = replayer->Next(&record); + if (!s.ok()) { + break; + } + std::unique_ptr res; + record->Accept(handler.get(), &res); + } + if (s.IsIncomplete()) { + // OK because `Status::Incomplete` is expected upon finishing all the + // trace records. + s = Status::OK(); + } + } + + if (s.ok()) { + s = FileSystem::Default()->RenameFile(latest_file_temp_path, + latest_file_path, IOOptions(), + nullptr /* dbg */); + } + if (s.ok()) { + latest_.reset(new FileExpectedState(latest_file_path, max_key_, + num_column_families_)); + s = latest_->Open(false /* create */); + } + + // Delete old state/trace files. We must delete the state file first. + // Otherwise, a crash-recovery immediately after deleting the trace file could + // lead to `Restore()` unable to replay to `seqno`. + if (s.ok()) { + s = Env::Default()->DeleteFile(state_file_path); + } + if (s.ok()) { + saved_seqno_ = kMaxSequenceNumber; + s = Env::Default()->DeleteFile(trace_file_path); + } + return s; +} +#else // ROCKSDB_LITE +Status FileExpectedStateManager::Restore(DB* /* db */) { + return Status::NotSupported(); +} +#endif // ROCKSDB_LITE + Status FileExpectedStateManager::Clean() { std::vector expected_state_dir_children; Status s = Env::Default()->GetChildren(expected_state_dir_path_, &expected_state_dir_children); // An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left // behind invalid temporary files. An incomplete `SaveAtAndAfter()` could have - // also left behind stale state/trace files. + // also left behind stale state/trace files. An incomplete `Restore()` could + // have left behind stale trace files. for (size_t i = 0; s.ok() && i < expected_state_dir_children.size(); ++i) { const auto& filename = expected_state_dir_children[i]; if (filename.rfind(kTempFilenamePrefix, 0 /* pos */) == 0 && @@ -349,7 +558,6 @@ Status FileExpectedStateManager::Clean() { ParseUint64(filename.substr( 0, filename.size() - kTraceFilenameSuffix.size())) < saved_seqno_) { - assert(saved_seqno_ != kMaxSequenceNumber); // Delete stale trace files. s = Env::Default()->DeleteFile(GetPathForFilename(filename)); } diff --git a/db_stress_tool/expected_state.h b/db_stress_tool/expected_state.h index 1c3e4665c..41d747e76 100644 --- a/db_stress_tool/expected_state.h +++ b/db_stress_tool/expected_state.h @@ -133,13 +133,30 @@ class ExpectedStateManager { virtual Status Open() = 0; // Saves expected values for the current state of `db` and begins tracking - // changes. + // changes. Following a successful `SaveAtAndAfter()`, `Restore()` can be + // called on the same DB, as long as its state does not roll back to before + // its current state. // // Requires external locking preventing concurrent execution with any other // member function. Furthermore, `db` must not be mutated while this function // is executing. virtual Status SaveAtAndAfter(DB* db) = 0; + // Returns true if at least one state of historical expected values can be + // restored. + // + // Requires external locking preventing concurrent execution with any other + // member function. + virtual bool HasHistory() = 0; + + // Restores expected values according to the current state of `db`. See + // `SaveAtAndAfter()` for conditions where this can be called. + // + // Requires external locking preventing concurrent execution with any other + // member function. Furthermore, `db` must not be mutated while this function + // is executing. + virtual Status Restore(DB* db) = 0; + // Requires external locking covering all keys in `cf`. void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); } @@ -204,8 +221,21 @@ class FileExpectedStateManager : public ExpectedStateManager { // // This implementation makes a copy of "LATEST.state" into // ".state", and starts a trace in ".trace". + // Due to using external files, a following `Restore()` can happen even + // from a different process. Status SaveAtAndAfter(DB* db) override; + // See `ExpectedStateManager::HasHistory()` API doc. + bool HasHistory() override; + + // See `ExpectedStateManager::Restore()` API doc. + // + // Say `db->GetLatestSequenceNumber()` was `a` last time `SaveAtAndAfter()` + // was called and now it is `b`. Then this function replays `b - a` write + // operations from "`a`.trace" onto "`a`.state", and then copies the resulting + // file into "LATEST.state". + Status Restore(DB* db) override; + private: // Requires external locking preventing concurrent execution with any other // member function. @@ -238,6 +268,15 @@ class AnonExpectedStateManager : public ExpectedStateManager { return Status::NotSupported(); } + // See `ExpectedStateManager::HasHistory()` API doc. + bool HasHistory() override { return false; } + + // See `ExpectedStateManager::Restore()` API doc. + // + // This implementation returns `Status::NotSupported` since we do not + // currently have a need to keep history of expected state within a process. + Status Restore(DB* /* db */) override { return Status::NotSupported(); } + // Requires external locking preventing concurrent execution with any other // member function. Status Open() override;