diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index 303cd81cf..e6fb8db12 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -18,9 +18,6 @@ namespace rocksdb { -static const std::string kProto = "hdfs://"; -static const std::string pathsep = "/"; - // Thrown during execution when there is an issue with the supplied // arguments. class HdfsUsageException : public std::exception { }; @@ -58,20 +55,23 @@ class HdfsEnv : public Env { } virtual Status NewSequentialFile(const std::string& fname, - SequentialFile** result); + std::unique_ptr* result, + const EnvOptions& options); virtual Status NewRandomAccessFile(const std::string& fname, - RandomAccessFile** result); + std::unique_ptr* result, + const EnvOptions& options); virtual Status NewWritableFile(const std::string& fname, - WritableFile** result); + std::unique_ptr* result, + const EnvOptions& options); virtual Status NewRandomRWFile(const std::string& fname, - unique_ptr* result, + std::unique_ptr* result, const EnvOptions& options); virtual Status NewDirectory(const std::string& name, - unique_ptr* result); + std::unique_ptr* result); virtual bool FileExists(const std::string& fname); @@ -97,7 +97,8 @@ class HdfsEnv : public Env { virtual Status UnlockFile(FileLock* lock); - virtual Status NewLogger(const std::string& fname, Logger** result); + virtual Status NewLogger(const std::string& fname, + std::shared_ptr* result); virtual void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW) { @@ -161,6 +162,9 @@ class HdfsEnv : public Env { // object here so that we can use posix timers, // posix threads, etc. + static const std::string kProto; + static const std::string pathsep; + /** * If the URI is specified of the form hdfs://server:port/path, * then connect to the specified cluster diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index c724b2302..eb2b12cba 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -18,6 +18,9 @@ #include "hdfs/hdfs.h" #include "hdfs/env_hdfs.h" +#define HDFS_EXISTS 0 +#define HDFS_DOESNT_EXIST 1 + // // This file defines an HDFS environment for rocksdb. It uses the libhdfs // api to access HDFS. All HDFS files created by one instance of rocksdb @@ -39,7 +42,8 @@ static Logger* mylog = nullptr; // Used for reading a file from HDFS. It implements both sequential-read // access methods as well as random read access methods. -class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAccessFile { +class HdfsReadableFile : virtual public SequentialFile, + virtual public RandomAccessFile { private: hdfsFS fileSys_; std::string filename_; @@ -73,17 +77,34 @@ class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAcce Status s; Log(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n", filename_.c_str(), n); - size_t bytes_read = hdfsRead(fileSys_, hfile_, scratch, (tSize)n); - Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str()); - *result = Slice(scratch, bytes_read); - if (bytes_read < n) { - if (feof()) { - // We leave status as ok if we hit the end of the file - } else { - // A partial read with an error: return a non-ok status - s = IOError(filename_, errno); + + char* buffer = scratch; + size_t total_bytes_read = 0; + tSize bytes_read = 0; + tSize remaining_bytes = (tSize)n; + + // Read a total of n bytes repeatedly until we hit error or eof + while (remaining_bytes > 0) { + bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes); + if (bytes_read <= 0) { + break; } + assert(bytes_read <= remaining_bytes); + + total_bytes_read += bytes_read; + remaining_bytes -= bytes_read; + buffer += bytes_read; + } + assert(total_bytes_read <= n); + + Log(mylog, "[hdfs] HdfsReadableFile read %s\n", filename_.c_str()); + + if (bytes_read < 0) { + s = IOError(filename_, errno); + } else { + *result = Slice(scratch, total_bytes_read); } + return s; } @@ -139,8 +160,7 @@ class HdfsReadableFile: virtual public SequentialFile, virtual public RandomAcce size = pFileInfo->mSize; hdfsFreeFileInfo(pFileInfo, 1); } else { - throw rocksdb::HdfsFatalException("fileSize on unknown file " + - filename_); + throw HdfsFatalException("fileSize on unknown file " + filename_); } return size; } @@ -236,9 +256,8 @@ class HdfsLogger : public Logger { uint64_t (*gettid_)(); // Return the thread id for the current thread public: - HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)(), - const InfoLogLevel log_level = InfoLogLevel::ERROR) - : Logger(log_level), file_(f), gettid_(gettid) { + HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) + : file_(f), gettid_(gettid) { Log(mylog, "[hdfs] HdfsLogger opened %s\n", file_->getName().c_str()); } @@ -324,40 +343,52 @@ class HdfsLogger : public Logger { // Finally, the hdfs environment +const std::string HdfsEnv::kProto = "hdfs://"; +const std::string HdfsEnv::pathsep = "/"; + // open a file for sequential reading Status HdfsEnv::NewSequentialFile(const std::string& fname, - SequentialFile** result) { + unique_ptr* result, + const EnvOptions& options) { + result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); - if (f == nullptr) { + if (f == nullptr || !f->isValid()) { + delete f; *result = nullptr; return IOError(fname, errno); } - *result = dynamic_cast(f); + result->reset(dynamic_cast(f)); return Status::OK(); } // open a file for random reading Status HdfsEnv::NewRandomAccessFile(const std::string& fname, - RandomAccessFile** result) { + unique_ptr* result, + const EnvOptions& options) { + result->reset(); HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); - if (f == nullptr) { + if (f == nullptr || !f->isValid()) { + delete f; *result = nullptr; return IOError(fname, errno); } - *result = dynamic_cast(f); + result->reset(dynamic_cast(f)); return Status::OK(); } // create a new file for writing Status HdfsEnv::NewWritableFile(const std::string& fname, - WritableFile** result) { + unique_ptr* result, + const EnvOptions& options) { + result->reset(); Status s; HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { + delete f; *result = nullptr; return IOError(fname, errno); } - *result = dynamic_cast(f); + result->reset(dynamic_cast(f)); return Status::OK(); } @@ -367,24 +398,30 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname, return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv"); } -virtual Status NewDirectory(const std::string& name, - unique_ptr* result) { - return Status::NotSupported("NewDirectory not yet supported on HdfsEnv"); +Status HdfsEnv::NewDirectory(const std::string& name, + unique_ptr* result) { + return Status::NotSupported("NewDirectory not supported on HdfsEnv"); } bool HdfsEnv::FileExists(const std::string& fname) { int value = hdfsExists(fileSys_, fname.c_str()); - if (value == 0) { + switch (value) { + case HDFS_EXISTS: return true; + case HDFS_DOESNT_EXIST: + return false; + default: // anything else should be an error + Log(mylog, "FileExists hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + std::to_string(value) + ".\n"); } - return false; } Status HdfsEnv::GetChildren(const std::string& path, std::vector* result) { int value = hdfsExists(fileSys_, path.c_str()); switch (value) { - case 0: { + case HDFS_EXISTS: { // directory exists int numEntries = 0; hdfsFileInfo* pHdfsFileInfo = 0; pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); @@ -402,15 +439,17 @@ Status HdfsEnv::GetChildren(const std::string& path, } else { // numEntries < 0 indicates error Log(mylog, "hdfsListDirectory call failed with error "); - throw HdfsFatalException("hdfsListDirectory call failed negative error.\n"); + throw HdfsFatalException( + "hdfsListDirectory call failed negative error.\n"); } break; } - case 1: // directory does not exist, exit + case HDFS_DOESNT_EXIST: // directory does not exist, exit break; default: // anything else should be an error - Log(mylog, "hdfsListDirectory call failed with error "); - throw HdfsFatalException("hdfsListDirectory call failed with error.\n"); + Log(mylog, "GetChildren hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + std::to_string(value) + ".\n"); } return Status::OK(); } @@ -432,10 +471,15 @@ Status HdfsEnv::CreateDir(const std::string& name) { Status HdfsEnv::CreateDirIfMissing(const std::string& name) { const int value = hdfsExists(fileSys_, name.c_str()); // Not atomic. state might change b/w hdfsExists and CreateDir. - if (value == 0) { + switch (value) { + case HDFS_EXISTS: return Status::OK(); - } else { + case HDFS_DOESNT_EXIST: return CreateDir(name); + default: // anything else should be an error + Log(mylog, "CreateDirIfMissing hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + std::to_string(value) + ".\n"); } }; @@ -492,11 +536,12 @@ Status HdfsEnv::NewLogger(const std::string& fname, shared_ptr* result) { HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); if (f == nullptr || !f->isValid()) { + delete f; *result = nullptr; return IOError(fname, errno); } HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid); - *result = h; + result->reset(h); if (mylog == nullptr) { // mylog = h; // uncomment this for detailed logging }