@ -52,18 +52,22 @@ class HdfsReadableFile : virtual public SequentialFile,
public :
public :
HdfsReadableFile ( hdfsFS fileSys , const std : : string & fname )
HdfsReadableFile ( hdfsFS fileSys , const std : : string & fname )
: fileSys_ ( fileSys ) , filename_ ( fname ) , hfile_ ( nullptr ) {
: fileSys_ ( fileSys ) , filename_ ( fname ) , hfile_ ( nullptr ) {
Log ( mylog , " [hdfs] HdfsReadableFile opening file %s \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile opening file %s \n " ,
filename_ . c_str ( ) ) ;
filename_ . c_str ( ) ) ;
hfile_ = hdfsOpenFile ( fileSys_ , filename_ . c_str ( ) , O_RDONLY , 0 , 0 , 0 ) ;
hfile_ = hdfsOpenFile ( fileSys_ , filename_ . c_str ( ) , O_RDONLY , 0 , 0 , 0 ) ;
Log ( mylog , " [hdfs] HdfsReadableFile opened file %s hfile_=0x%p \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
filename_ . c_str ( ) , hfile_ ) ;
" [hdfs] HdfsReadableFile opened file %s hfile_=0x%p \n " ,
filename_ . c_str ( ) , hfile_ ) ;
}
}
virtual ~ HdfsReadableFile ( ) {
virtual ~ HdfsReadableFile ( ) {
Log ( mylog , " [hdfs] HdfsReadableFile closing file %s \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
filename_ . c_str ( ) ) ;
" [hdfs] HdfsReadableFile closing file %s \n " ,
filename_ . c_str ( ) ) ;
hdfsCloseFile ( fileSys_ , hfile_ ) ;
hdfsCloseFile ( fileSys_ , hfile_ ) ;
Log ( mylog , " [hdfs] HdfsReadableFile closed file %s \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile closed file %s \n " ,
filename_ . c_str ( ) ) ;
filename_ . c_str ( ) ) ;
hfile_ = nullptr ;
hfile_ = nullptr ;
}
}
@ -75,7 +79,8 @@ class HdfsReadableFile : virtual public SequentialFile,
// sequential access, read data at current offset in file
// sequential access, read data at current offset in file
virtual Status Read ( size_t n , Slice * result , char * scratch ) {
virtual Status Read ( size_t n , Slice * result , char * scratch ) {
Status s ;
Status s ;
Log ( mylog , " [hdfs] HdfsReadableFile reading %s %ld \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile reading %s %ld \n " ,
filename_ . c_str ( ) , n ) ;
filename_ . c_str ( ) , n ) ;
char * buffer = scratch ;
char * buffer = scratch ;
@ -97,7 +102,8 @@ class HdfsReadableFile : virtual public SequentialFile,
}
}
assert ( total_bytes_read < = n ) ;
assert ( total_bytes_read < = n ) ;
Log ( mylog , " [hdfs] HdfsReadableFile read %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile read %s \n " , filename_ . c_str ( ) ) ;
if ( bytes_read < 0 ) {
if ( bytes_read < 0 ) {
s = IOError ( filename_ , errno ) ;
s = IOError ( filename_ , errno ) ;
@ -112,10 +118,12 @@ class HdfsReadableFile : virtual public SequentialFile,
virtual Status Read ( uint64_t offset , size_t n , Slice * result ,
virtual Status Read ( uint64_t offset , size_t n , Slice * result ,
char * scratch ) const {
char * scratch ) const {
Status s ;
Status s ;
Log ( mylog , " [hdfs] HdfsReadableFile preading %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile preading %s \n " , filename_ . c_str ( ) ) ;
ssize_t bytes_read = hdfsPread ( fileSys_ , hfile_ , offset ,
ssize_t bytes_read = hdfsPread ( fileSys_ , hfile_ , offset ,
( void * ) scratch , ( tSize ) n ) ;
( void * ) scratch , ( tSize ) n ) ;
Log ( mylog , " [hdfs] HdfsReadableFile pread %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile pread %s \n " , filename_ . c_str ( ) ) ;
* result = Slice ( scratch , ( bytes_read < 0 ) ? 0 : bytes_read ) ;
* result = Slice ( scratch , ( bytes_read < 0 ) ? 0 : bytes_read ) ;
if ( bytes_read < 0 ) {
if ( bytes_read < 0 ) {
// An error: return a non-ok status
// An error: return a non-ok status
@ -125,7 +133,8 @@ class HdfsReadableFile : virtual public SequentialFile,
}
}
virtual Status Skip ( uint64_t n ) {
virtual Status Skip ( uint64_t n ) {
Log ( mylog , " [hdfs] HdfsReadableFile skip %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile skip %s \n " , filename_ . c_str ( ) ) ;
// get current offset from file
// get current offset from file
tOffset current = hdfsTell ( fileSys_ , hfile_ ) ;
tOffset current = hdfsTell ( fileSys_ , hfile_ ) ;
if ( current < 0 ) {
if ( current < 0 ) {
@ -144,7 +153,8 @@ class HdfsReadableFile : virtual public SequentialFile,
// returns true if we are at the end of file, false otherwise
// returns true if we are at the end of file, false otherwise
bool feof ( ) {
bool feof ( ) {
Log ( mylog , " [hdfs] HdfsReadableFile feof %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile feof %s \n " , filename_ . c_str ( ) ) ;
if ( hdfsTell ( fileSys_ , hfile_ ) = = fileSize ( ) ) {
if ( hdfsTell ( fileSys_ , hfile_ ) = = fileSize ( ) ) {
return true ;
return true ;
}
}
@ -153,7 +163,8 @@ class HdfsReadableFile : virtual public SequentialFile,
// the current size of the file
// the current size of the file
tOffset fileSize ( ) {
tOffset fileSize ( ) {
Log ( mylog , " [hdfs] HdfsReadableFile fileSize %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsReadableFile fileSize %s \n " , filename_ . c_str ( ) ) ;
hdfsFileInfo * pFileInfo = hdfsGetPathInfo ( fileSys_ , filename_ . c_str ( ) ) ;
hdfsFileInfo * pFileInfo = hdfsGetPathInfo ( fileSys_ , filename_ . c_str ( ) ) ;
tOffset size = 0L ;
tOffset size = 0L ;
if ( pFileInfo ! = nullptr ) {
if ( pFileInfo ! = nullptr ) {
@ -176,16 +187,20 @@ class HdfsWritableFile: public WritableFile {
public :
public :
HdfsWritableFile ( hdfsFS fileSys , const std : : string & fname )
HdfsWritableFile ( hdfsFS fileSys , const std : : string & fname )
: fileSys_ ( fileSys ) , filename_ ( fname ) , hfile_ ( nullptr ) {
: fileSys_ ( fileSys ) , filename_ ( fname ) , hfile_ ( nullptr ) {
Log ( mylog , " [hdfs] HdfsWritableFile opening %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile opening %s \n " , filename_ . c_str ( ) ) ;
hfile_ = hdfsOpenFile ( fileSys_ , filename_ . c_str ( ) , O_WRONLY , 0 , 0 , 0 ) ;
hfile_ = hdfsOpenFile ( fileSys_ , filename_ . c_str ( ) , O_WRONLY , 0 , 0 , 0 ) ;
Log ( mylog , " [hdfs] HdfsWritableFile opened %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile opened %s \n " , filename_ . c_str ( ) ) ;
assert ( hfile_ ! = nullptr ) ;
assert ( hfile_ ! = nullptr ) ;
}
}
virtual ~ HdfsWritableFile ( ) {
virtual ~ HdfsWritableFile ( ) {
if ( hfile_ ! = nullptr ) {
if ( hfile_ ! = nullptr ) {
Log ( mylog , " [hdfs] HdfsWritableFile closing %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile closing %s \n " , filename_ . c_str ( ) ) ;
hdfsCloseFile ( fileSys_ , hfile_ ) ;
hdfsCloseFile ( fileSys_ , hfile_ ) ;
Log ( mylog , " [hdfs] HdfsWritableFile closed %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile closed %s \n " , filename_ . c_str ( ) ) ;
hfile_ = nullptr ;
hfile_ = nullptr ;
}
}
}
}
@ -202,11 +217,13 @@ class HdfsWritableFile: public WritableFile {
}
}
virtual Status Append ( const Slice & data ) {
virtual Status Append ( const Slice & data ) {
Log ( mylog , " [hdfs] HdfsWritableFile Append %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile Append %s \n " , filename_ . c_str ( ) ) ;
const char * src = data . data ( ) ;
const char * src = data . data ( ) ;
size_t left = data . size ( ) ;
size_t left = data . size ( ) ;
size_t ret = hdfsWrite ( fileSys_ , hfile_ , src , left ) ;
size_t ret = hdfsWrite ( fileSys_ , hfile_ , src , left ) ;
Log ( mylog , " [hdfs] HdfsWritableFile Appended %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile Appended %s \n " , filename_ . c_str ( ) ) ;
if ( ret ! = left ) {
if ( ret ! = left ) {
return IOError ( filename_ , errno ) ;
return IOError ( filename_ , errno ) ;
}
}
@ -219,14 +236,16 @@ class HdfsWritableFile: public WritableFile {
virtual Status Sync ( ) {
virtual Status Sync ( ) {
Status s ;
Status s ;
Log ( mylog , " [hdfs] HdfsWritableFile Sync %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile Sync %s \n " , filename_ . c_str ( ) ) ;
if ( hdfsFlush ( fileSys_ , hfile_ ) = = - 1 ) {
if ( hdfsFlush ( fileSys_ , hfile_ ) = = - 1 ) {
return IOError ( filename_ , errno ) ;
return IOError ( filename_ , errno ) ;
}
}
if ( hdfsHSync ( fileSys_ , hfile_ ) = = - 1 ) {
if ( hdfsHSync ( fileSys_ , hfile_ ) = = - 1 ) {
return IOError ( filename_ , errno ) ;
return IOError ( filename_ , errno ) ;
}
}
Log ( mylog , " [hdfs] HdfsWritableFile Synced %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile Synced %s \n " , filename_ . c_str ( ) ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
@ -239,11 +258,13 @@ class HdfsWritableFile: public WritableFile {
}
}
virtual Status Close ( ) {
virtual Status Close ( ) {
Log ( mylog , " [hdfs] HdfsWritableFile closing %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile closing %s \n " , filename_ . c_str ( ) ) ;
if ( hdfsCloseFile ( fileSys_ , hfile_ ) ! = 0 ) {
if ( hdfsCloseFile ( fileSys_ , hfile_ ) ! = 0 ) {
return IOError ( filename_ , errno ) ;
return IOError ( filename_ , errno ) ;
}
}
Log ( mylog , " [hdfs] HdfsWritableFile closed %s \n " , filename_ . c_str ( ) ) ;
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
" [hdfs] HdfsWritableFile closed %s \n " , filename_ . c_str ( ) ) ;
hfile_ = nullptr ;
hfile_ = nullptr ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
@ -258,13 +279,15 @@ class HdfsLogger : public Logger {
public :
public :
HdfsLogger ( HdfsWritableFile * f , uint64_t ( * gettid ) ( ) )
HdfsLogger ( HdfsWritableFile * f , uint64_t ( * gettid ) ( ) )
: file_ ( f ) , gettid_ ( gettid ) {
: file_ ( f ) , gettid_ ( gettid ) {
Log ( mylog , " [hdfs] HdfsLogger opened %s \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
file_ - > getName ( ) . c_str ( ) ) ;
" [hdfs] HdfsLogger opened %s \n " ,
file_ - > getName ( ) . c_str ( ) ) ;
}
}
virtual ~ HdfsLogger ( ) {
virtual ~ HdfsLogger ( ) {
Log ( mylog , " [hdfs] HdfsLogger closed %s \n " ,
Log ( InfoLogLevel : : DEBUG_LEVEL , mylog ,
file_ - > getName ( ) . c_str ( ) ) ;
" [hdfs] HdfsLogger closed %s \n " ,
file_ - > getName ( ) . c_str ( ) ) ;
delete file_ ;
delete file_ ;
if ( mylog ! = nullptr & & mylog = = this ) {
if ( mylog ! = nullptr & & mylog = = this ) {
mylog = nullptr ;
mylog = nullptr ;
@ -417,7 +440,8 @@ Status HdfsEnv::NewDirectory(const std::string& name,
result - > reset ( new HdfsDirectory ( 0 ) ) ;
result - > reset ( new HdfsDirectory ( 0 ) ) ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
default : // fail if the directory doesn't exist
default : // fail if the directory doesn't exist
Log ( mylog , " NewDirectory hdfsExists call failed " ) ;
Log ( InfoLogLevel : : FATAL_LEVEL ,
mylog , " NewDirectory hdfsExists call failed " ) ;
throw HdfsFatalException ( " hdfsExists call failed with error " +
throw HdfsFatalException ( " hdfsExists call failed with error " +
std : : to_string ( value ) + " on path " + name +
std : : to_string ( value ) + " on path " + name +
" . \n " ) ;
" . \n " ) ;
@ -433,7 +457,8 @@ bool HdfsEnv::FileExists(const std::string& fname) {
case HDFS_DOESNT_EXIST :
case HDFS_DOESNT_EXIST :
return false ;
return false ;
default : // anything else should be an error
default : // anything else should be an error
Log ( mylog , " FileExists hdfsExists call failed " ) ;
Log ( InfoLogLevel : : FATAL_LEVEL ,
mylog , " FileExists hdfsExists call failed " ) ;
throw HdfsFatalException ( " hdfsExists call failed with error " +
throw HdfsFatalException ( " hdfsExists call failed with error " +
std : : to_string ( value ) + " on path " + fname +
std : : to_string ( value ) + " on path " + fname +
" . \n " ) ;
" . \n " ) ;
@ -461,7 +486,8 @@ Status HdfsEnv::GetChildren(const std::string& path,
}
}
} else {
} else {
// numEntries < 0 indicates error
// numEntries < 0 indicates error
Log ( mylog , " hdfsListDirectory call failed with error " ) ;
Log ( InfoLogLevel : : FATAL_LEVEL , mylog ,
" hdfsListDirectory call failed with error " ) ;
throw HdfsFatalException (
throw HdfsFatalException (
" hdfsListDirectory call failed negative error. \n " ) ;
" hdfsListDirectory call failed negative error. \n " ) ;
}
}
@ -470,7 +496,8 @@ Status HdfsEnv::GetChildren(const std::string& path,
case HDFS_DOESNT_EXIST : // directory does not exist, exit
case HDFS_DOESNT_EXIST : // directory does not exist, exit
break ;
break ;
default : // anything else should be an error
default : // anything else should be an error
Log ( mylog , " GetChildren hdfsExists call failed " ) ;
Log ( InfoLogLevel : : FATAL_LEVEL , mylog ,
" GetChildren hdfsExists call failed " ) ;
throw HdfsFatalException ( " hdfsExists call failed with error " +
throw HdfsFatalException ( " hdfsExists call failed with error " +
std : : to_string ( value ) + " . \n " ) ;
std : : to_string ( value ) + " . \n " ) ;
}
}
@ -500,7 +527,8 @@ Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
case HDFS_DOESNT_EXIST :
case HDFS_DOESNT_EXIST :
return CreateDir ( name ) ;
return CreateDir ( name ) ;
default : // anything else should be an error
default : // anything else should be an error
Log ( mylog , " CreateDirIfMissing hdfsExists call failed " ) ;
Log ( InfoLogLevel : : FATAL_LEVEL , mylog ,
" CreateDirIfMissing hdfsExists call failed " ) ;
throw HdfsFatalException ( " hdfsExists call failed with error " +
throw HdfsFatalException ( " hdfsExists call failed with error " +
std : : to_string ( value ) + " . \n " ) ;
std : : to_string ( value ) + " . \n " ) ;
}
}