@ -7,8 +7,10 @@
# include "db_stress_tool/expected_state.h"
# include "db_stress_tool/expected_state.h"
# include "db_stress_tool/db_stress_common.h"
# include "db_stress_tool/db_stress_shared_state.h"
# include "db_stress_tool/db_stress_shared_state.h"
# include "rocksdb/trace_reader_writer.h"
# include "rocksdb/trace_reader_writer.h"
# include "rocksdb/trace_record_result.h"
namespace ROCKSDB_NAMESPACE {
namespace ROCKSDB_NAMESPACE {
@ -318,13 +320,220 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* /* db */) {
}
}
# endif // ROCKSDB_LITE
# endif // ROCKSDB_LITE
bool FileExpectedStateManager : : HasHistory ( ) {
return saved_seqno_ ! = kMaxSequenceNumber ;
}
# ifndef ROCKSDB_LITE
// An `ExpectedStateTraceRecordHandler` applies a configurable number of
// write operation trace records to the configured expected state. It is used in
// `FileExpectedStateManager::Restore()` to sync the expected state with the
// DB's post-recovery state.
class ExpectedStateTraceRecordHandler : public TraceRecord : : Handler ,
public WriteBatch : : Handler {
public :
ExpectedStateTraceRecordHandler ( uint64_t max_write_ops , ExpectedState * state )
: max_write_ops_ ( max_write_ops ) , state_ ( state ) { }
~ ExpectedStateTraceRecordHandler ( ) {
assert ( num_write_ops_ = = max_write_ops_ ) ;
}
Status Handle ( const WriteQueryTraceRecord & record ,
std : : unique_ptr < TraceRecordResult > * /* result */ ) override {
if ( num_write_ops_ = = max_write_ops_ ) {
return Status : : OK ( ) ;
}
WriteBatch batch ( record . GetWriteBatchRep ( ) . ToString ( ) ) ;
return batch . Iterate ( this ) ;
}
// Ignore reads.
Status Handle ( const GetQueryTraceRecord & /* record */ ,
std : : unique_ptr < TraceRecordResult > * /* result */ ) override {
return Status : : OK ( ) ;
}
// Ignore reads.
Status Handle ( const IteratorSeekQueryTraceRecord & /* record */ ,
std : : unique_ptr < TraceRecordResult > * /* result */ ) override {
return Status : : OK ( ) ;
}
// Ignore reads.
Status Handle ( const MultiGetQueryTraceRecord & /* record */ ,
std : : unique_ptr < TraceRecordResult > * /* result */ ) override {
return Status : : OK ( ) ;
}
// Below are the WriteBatch::Handler overrides. We could use a separate
// object, but it's convenient and works to share state with the
// `TraceRecord::Handler`.
Status PutCF ( uint32_t column_family_id , const Slice & key ,
const Slice & value ) override {
uint64_t key_id ;
if ( ! GetIntVal ( key . ToString ( ) , & key_id ) ) {
return Status : : Corruption ( " unable to parse key " , key . ToString ( ) ) ;
}
uint32_t value_id = GetValueBase ( value ) ;
state_ - > Put ( column_family_id , static_cast < int64_t > ( key_id ) , value_id ,
false /* pending */ ) ;
+ + num_write_ops_ ;
return Status : : OK ( ) ;
}
Status DeleteCF ( uint32_t column_family_id , const Slice & key ) override {
uint64_t key_id ;
if ( ! GetIntVal ( key . ToString ( ) , & key_id ) ) {
return Status : : Corruption ( " unable to parse key " , key . ToString ( ) ) ;
}
state_ - > Delete ( column_family_id , static_cast < int64_t > ( key_id ) ,
false /* pending */ ) ;
+ + num_write_ops_ ;
return Status : : OK ( ) ;
}
Status SingleDeleteCF ( uint32_t column_family_id , const Slice & key ) override {
return DeleteCF ( column_family_id , key ) ;
}
Status DeleteRangeCF ( uint32_t column_family_id , const Slice & begin_key ,
const Slice & end_key ) override {
uint64_t begin_key_id , end_key_id ;
if ( ! GetIntVal ( begin_key . ToString ( ) , & begin_key_id ) ) {
return Status : : Corruption ( " unable to parse begin key " ,
begin_key . ToString ( ) ) ;
}
if ( ! GetIntVal ( end_key . ToString ( ) , & end_key_id ) ) {
return Status : : Corruption ( " unable to parse end key " , end_key . ToString ( ) ) ;
}
state_ - > DeleteRange ( column_family_id , static_cast < int64_t > ( begin_key_id ) ,
static_cast < int64_t > ( end_key_id ) , false /* pending */ ) ;
+ + num_write_ops_ ;
return Status : : OK ( ) ;
}
Status MergeCF ( uint32_t column_family_id , const Slice & key ,
const Slice & value ) override {
return PutCF ( column_family_id , key , value ) ;
}
private :
uint64_t num_write_ops_ = 0 ;
uint64_t max_write_ops_ ;
ExpectedState * state_ ;
} ;
// Syncs the latest expected state with `db`'s post-recovery state.
//
// The saved "`saved_seqno_`.state" file is copied to a temp file, and the write
// operations from the saved trace file are replayed on top of it. The number
// of operations replayed is the DB's latest sequence number minus
// `saved_seqno_`. The temp file is then renamed over the latest state file,
// `latest_` is reopened on it, and the saved state and trace files are
// deleted.
//
// Requires `HasHistory()`. Returns `Status::Corruption` when the DB's latest
// sequence number is older than `saved_seqno_`; otherwise returns the first
// error hit by any step, including errors reported while applying a trace
// record.
Status FileExpectedStateManager::Restore(DB* db) {
  assert(HasHistory());
  SequenceNumber seqno = db->GetLatestSequenceNumber();
  if (seqno < saved_seqno_) {
    return Status::Corruption("DB is older than any restorable expected state");
  }

  std::string state_filename = ToString(saved_seqno_) + kStateFilenameSuffix;
  std::string state_file_path = GetPathForFilename(state_filename);

  std::string latest_file_temp_path =
      GetTempPathForFilename(kLatestBasename + kStateFilenameSuffix);
  std::string latest_file_path =
      GetPathForFilename(kLatestBasename + kStateFilenameSuffix);

  std::string trace_filename = ToString(saved_seqno_) + kTraceFilenameSuffix;
  std::string trace_file_path = GetPathForFilename(trace_filename);

  std::unique_ptr<TraceReader> trace_reader;
  Status s = NewFileTraceReader(Env::Default(), EnvOptions(), trace_file_path,
                                &trace_reader);

  if (s.ok()) {
    // We are going to replay on top of "`seqno`.state" to create a new
    // "LATEST.state". Start off by creating a tempfile so we can later make the
    // new "LATEST.state" appear atomically using `RenameFile()`.
    s = CopyFile(FileSystem::Default(), state_file_path, latest_file_temp_path,
                 0 /* size */, false /* use_fsync */);
  }

  {
    // Scoped so that the replayer, the temp expected state, and the handler
    // are all destroyed before the temp file is renamed below.
    std::unique_ptr<Replayer> replayer;
    std::unique_ptr<ExpectedState> state;
    std::unique_ptr<TraceRecord::Handler> handler;
    if (s.ok()) {
      state.reset(new FileExpectedState(latest_file_temp_path, max_key_,
                                        num_column_families_));
      s = state->Open(false /* create */);
    }
    if (s.ok()) {
      handler.reset(new ExpectedStateTraceRecordHandler(seqno - saved_seqno_,
                                                        state.get()));
      // TODO(ajkr): An API limitation requires we provide `handles` although
      // they will be unused since we only use the replayer for reading records.
      // Just give a default CFH for now to satisfy the requirement.
      s = db->NewDefaultReplayer({db->DefaultColumnFamily()} /* handles */,
                                 std::move(trace_reader), &replayer);
    }

    if (s.ok()) {
      s = replayer->Prepare();
    }
    // Replay only when every step above succeeded. Otherwise `replayer` and
    // `handler` may be null, and the earlier error must be preserved.
    while (s.ok()) {
      std::unique_ptr<TraceRecord> record;
      s = replayer->Next(&record);
      if (s.IsIncomplete()) {
        // OK because `Status::Incomplete` is expected upon finishing all the
        // trace records.
        s = Status::OK();
        break;
      }
      if (!s.ok()) {
        break;
      }
      std::unique_ptr<TraceRecordResult> res;
      // Propagate handler failures (e.g., an unparsable key) instead of
      // silently producing an expected state that is missing writes.
      s = record->Accept(handler.get(), &res);
    }
  }

  if (s.ok()) {
    s = FileSystem::Default()->RenameFile(latest_file_temp_path,
                                          latest_file_path, IOOptions(),
                                          nullptr /* dbg */);
  }
  if (s.ok()) {
    latest_.reset(new FileExpectedState(latest_file_path, max_key_,
                                        num_column_families_));
    s = latest_->Open(false /* create */);
  }

  // Delete old state/trace files. We must delete the state file first.
  // Otherwise, a crash-recovery immediately after deleting the trace file could
  // lead to `Restore()` unable to replay to `seqno`.
  if (s.ok()) {
    s = Env::Default()->DeleteFile(state_file_path);
  }
  if (s.ok()) {
    // The saved state file is gone, so there is no history left even if
    // deleting the trace file below fails.
    saved_seqno_ = kMaxSequenceNumber;
    s = Env::Default()->DeleteFile(trace_file_path);
  }
  return s;
}
# else // ROCKSDB_LITE
// Stub used when `ROCKSDB_LITE` is defined: always reports `NotSupported`.
Status FileExpectedStateManager::Restore(DB* /* unused */) {
  return Status::NotSupported();
}
# endif // ROCKSDB_LITE
Status FileExpectedStateManager : : Clean ( ) {
Status FileExpectedStateManager : : Clean ( ) {
std : : vector < std : : string > expected_state_dir_children ;
std : : vector < std : : string > expected_state_dir_children ;
Status s = Env : : Default ( ) - > GetChildren ( expected_state_dir_path_ ,
Status s = Env : : Default ( ) - > GetChildren ( expected_state_dir_path_ ,
& expected_state_dir_children ) ;
& expected_state_dir_children ) ;
// An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left
// An incomplete `Open()` or incomplete `SaveAtAndAfter()` could have left
// behind invalid temporary files. An incomplete `SaveAtAndAfter()` could have
// behind invalid temporary files. An incomplete `SaveAtAndAfter()` could have
// also left behind stale state/trace files.
// also left behind stale state/trace files. An incomplete `Restore()` could
// have left behind stale trace files.
for ( size_t i = 0 ; s . ok ( ) & & i < expected_state_dir_children . size ( ) ; + + i ) {
for ( size_t i = 0 ; s . ok ( ) & & i < expected_state_dir_children . size ( ) ; + + i ) {
const auto & filename = expected_state_dir_children [ i ] ;
const auto & filename = expected_state_dir_children [ i ] ;
if ( filename . rfind ( kTempFilenamePrefix , 0 /* pos */ ) = = 0 & &
if ( filename . rfind ( kTempFilenamePrefix , 0 /* pos */ ) = = 0 & &
@ -349,7 +558,6 @@ Status FileExpectedStateManager::Clean() {
ParseUint64 ( filename . substr (
ParseUint64 ( filename . substr (
0 , filename . size ( ) - kTraceFilenameSuffix . size ( ) ) ) <
0 , filename . size ( ) - kTraceFilenameSuffix . size ( ) ) ) <
saved_seqno_ ) {
saved_seqno_ ) {
assert ( saved_seqno_ ! = kMaxSequenceNumber ) ;
// Delete stale trace files.
// Delete stale trace files.
s = Env : : Default ( ) - > DeleteFile ( GetPathForFilename ( filename ) ) ;
s = Env : : Default ( ) - > DeleteFile ( GetPathForFilename ( filename ) ) ;
}
}