@ -12,13 +12,16 @@
# include <algorithm>
# include <algorithm>
# include <chrono>
# include <chrono>
# include "util/rate_limiter.h"
# include "util/rate_limiter.h"
# include "util/random.h"
# include "util/murmurhash.h"
namespace rocksdb {
namespace rocksdb {
class MemFile {
class MemFile {
public :
public :
explicit MemFile ( const std : : string & fn ) :
explicit MemFile ( const std : : string & fn ) :
fn_ ( fn ) , refs_ ( 0 ) , size_ ( 0 ) , modified_time_ ( Now ( ) ) { }
fn_ ( fn ) , refs_ ( 0 ) , size_ ( 0 ) , modified_time_ ( Now ( ) ) ,
rnd_ ( ( uint32_t ) MurmurHash ( fn . data ( ) , fn . size ( ) , 0 ) ) , fsynced_bytes_ ( 0 ) { }
void Ref ( ) {
void Ref ( ) {
MutexLock lock ( & mutex_ ) ;
MutexLock lock ( & mutex_ ) ;
@ -53,6 +56,19 @@ class MemFile {
}
}
}
}
void CorruptBuffer ( ) {
if ( fsynced_bytes_ > = size_ ) {
return ;
}
uint64_t buffered_bytes = size_ - fsynced_bytes_ ;
uint64_t start = fsynced_bytes_ + rnd_ . Uniform ( buffered_bytes ) ;
uint64_t end = std : : min ( start + 512 , size_ . load ( ) ) ;
MutexLock lock ( & mutex_ ) ;
for ( uint64_t pos = start ; pos < end ; + + pos ) {
data_ [ pos ] = static_cast < char > ( rnd_ . Uniform ( 256 ) ) ;
}
}
Status Read ( uint64_t offset , size_t n , Slice * result , char * scratch ) const {
Status Read ( uint64_t offset , size_t n , Slice * result , char * scratch ) const {
MutexLock lock ( & mutex_ ) ;
MutexLock lock ( & mutex_ ) ;
if ( offset > Size ( ) ) {
if ( offset > Size ( ) ) {
@ -84,6 +100,7 @@ class MemFile {
}
}
Status Fsync ( ) {
Status Fsync ( ) {
fsynced_bytes_ = size_ . load ( ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
@ -110,9 +127,14 @@ class MemFile {
mutable port : : Mutex mutex_ ;
mutable port : : Mutex mutex_ ;
int refs_ ;
int refs_ ;
// Data written into this file, all bytes before fsynced_bytes are
// persistent.
std : : string data_ ;
std : : string data_ ;
std : : atomic < uint64_t > size_ ;
std : : atomic < uint64_t > size_ ;
std : : atomic < uint64_t > modified_time_ ;
std : : atomic < uint64_t > modified_time_ ;
Random rnd_ ;
std : : atomic < uint64_t > fsynced_bytes_ ;
} ;
} ;
namespace {
namespace {
@ -197,7 +219,7 @@ class WritableFileImpl : public WritableFile {
}
}
virtual Status Close ( ) {
virtual Status Close ( ) {
return Status : : OK ( ) ;
return file_ - > Fsync ( ) ;
}
}
virtual Status Flush ( ) {
virtual Status Flush ( ) {
@ -581,7 +603,7 @@ Status MockEnv::GetTestDirectory(std::string* path) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
// Non-virtual functions, specific to MockEnv
// Non-virtual functions, specific to MockEnv
Status MockEnv : : Truncate ( const std : : string & fname , size_t size ) {
Status MockEnv : : Truncate ( const std : : string & fname , size_t size ) {
auto fn = NormalizePath ( fname ) ;
auto fn = NormalizePath ( fname ) ;
MutexLock lock ( & mutex_ ) ;
MutexLock lock ( & mutex_ ) ;
@ -593,6 +615,17 @@ Status MockEnv::Truncate(const std::string& fname, size_t size) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
Status MockEnv : : CorruptBuffer ( const std : : string & fname ) {
auto fn = NormalizePath ( fname ) ;
MutexLock lock ( & mutex_ ) ;
auto iter = file_map_ . find ( fn ) ;
if ( iter = = file_map_ . end ( ) ) {
return Status : : IOError ( fn , " File not found " ) ;
}
iter - > second - > CorruptBuffer ( ) ;
return Status : : OK ( ) ;
}
std : : string MockEnv : : NormalizePath ( const std : : string path ) {
std : : string MockEnv : : NormalizePath ( const std : : string path ) {
std : : string dst ;
std : : string dst ;
for ( auto c : path ) {
for ( auto c : path ) {