@ -28,7 +28,8 @@
bool useOsBuffer = 1 ; // cache data in OS buffers
bool useOsBuffer = 1 ; // cache data in OS buffers
bool useFsReadAhead = 1 ; // allow filesystem to do readaheads
bool useFsReadAhead = 1 ; // allow filesystem to do readaheads
bool useMmapRead = 0 ;
bool useMmapRead = 0 ; // do not use mmaps for reading files
bool useMmapWrite = 1 ; // use mmaps for appending to files
namespace leveldb {
namespace leveldb {
@ -331,6 +332,131 @@ class PosixMmapFile : public WritableFile {
}
}
} ;
} ;
// Use posix write to write data to a file.
class PosixWritableFile : public WritableFile {
private :
const std : : string filename_ ;
int fd_ ;
size_t cursize_ ; // current size of cached data in buf_
size_t capacity_ ; // max size of buf_
char * buf_ ; // a buffer to cache writes
uint64_t filesize_ ;
bool pending_sync_ ;
bool pending_fsync_ ;
public :
PosixWritableFile ( const std : : string & fname , int fd , size_t capacity ) :
filename_ ( fname ) ,
fd_ ( fd ) ,
cursize_ ( 0 ) ,
capacity_ ( capacity ) ,
buf_ ( new char [ capacity ] ) ,
filesize_ ( 0 ) ,
pending_sync_ ( false ) ,
pending_fsync_ ( false ) {
}
~ PosixWritableFile ( ) {
if ( fd_ > = 0 ) {
PosixWritableFile : : Close ( ) ;
}
delete buf_ ;
buf_ = 0 ;
}
virtual Status Append ( const Slice & data ) {
char * src = ( char * ) data . data ( ) ;
size_t left = data . size ( ) ;
Status s ;
pending_sync_ = true ;
pending_fsync_ = true ;
// if there is no space in the cache, then flush
if ( cursize_ + left > capacity_ ) {
s = Flush ( ) ;
if ( ! s . ok ( ) ) {
return s ;
}
// Increase the buffer size, but capped at 1MB
if ( capacity_ < ( 1 < < 20 ) ) {
delete buf_ ;
capacity_ * = 2 ;
buf_ = new char [ capacity_ ] ;
}
assert ( cursize_ = = 0 ) ;
}
// if the write fits into the cache, then write to cache
// otherwise do a write() syscall to write to OS buffers.
if ( cursize_ + left < = capacity_ ) {
memcpy ( buf_ + cursize_ , src , left ) ;
cursize_ + = left ;
} else {
while ( left ! = 0 ) {
size_t done = write ( fd_ , src , left ) ;
if ( done < 0 ) {
return IOError ( filename_ , errno ) ;
}
left - = done ;
src + = done ;
}
}
filesize_ + = data . size ( ) ;
return Status : : OK ( ) ;
}
virtual Status Close ( ) {
Status s ;
s = Flush ( ) ; // flush cache to OS
if ( ! s . ok ( ) ) {
}
if ( close ( fd_ ) < 0 ) {
if ( s . ok ( ) ) {
s = IOError ( filename_ , errno ) ;
}
}
fd_ = - 1 ;
return s ;
}
// write out the cached data to the OS cache
virtual Status Flush ( ) {
size_t left = cursize_ ;
char * src = buf_ ;
while ( left ! = 0 ) {
size_t done = write ( fd_ , src , left ) ;
if ( done < 0 ) {
return IOError ( filename_ , errno ) ;
}
left - = done ;
src + = done ;
}
cursize_ = 0 ;
return Status : : OK ( ) ;
}
virtual Status Sync ( ) {
if ( pending_sync_ & & fdatasync ( fd_ ) < 0 ) {
return IOError ( filename_ , errno ) ;
}
pending_sync_ = false ;
return Status : : OK ( ) ;
}
virtual Status Fsync ( ) {
if ( pending_fsync_ & & fsync ( fd_ ) < 0 ) {
return IOError ( filename_ , errno ) ;
}
pending_fsync_ = false ;
pending_sync_ = false ;
return Status : : OK ( ) ;
}
virtual uint64_t GetFileSize ( ) {
return filesize_ ;
}
} ;
static int LockOrUnlock ( const std : : string & fname , int fd , bool lock ) {
static int LockOrUnlock ( const std : : string & fname , int fd , bool lock ) {
mutex_lockedFiles . Lock ( ) ;
mutex_lockedFiles . Lock ( ) ;
if ( lock ) {
if ( lock ) {
@ -431,7 +557,11 @@ class PosixEnv : public Env {
* result = NULL ;
* result = NULL ;
s = IOError ( fname , errno ) ;
s = IOError ( fname , errno ) ;
} else {
} else {
if ( useMmapWrite ) {
* result = new PosixMmapFile ( fname , fd , page_size_ ) ;
* result = new PosixMmapFile ( fname , fd , page_size_ ) ;
} else {
* result = new PosixWritableFile ( fname , fd , 65536 ) ;
}
}
}
return s ;
return s ;
}
}