From 7e327980a389f358032bc5fe2d00fab8e60f4dd3 Mon Sep 17 00:00:00 2001 From: Praveen Rao Date: Wed, 26 Aug 2015 18:51:18 -0700 Subject: [PATCH] Remove usage of C runtime API that has file handle limitation --- port/win/env_win.cc | 100 +++++++++++++++++------------------------ port/win/port_win.h | 1 + port/win/win_logger.cc | 33 +++++++++----- port/win/win_logger.h | 4 +- 4 files changed, 66 insertions(+), 72 deletions(-) diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 53d1ccf36..610de9a53 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -703,55 +703,69 @@ class AlignedBuffer { class WinSequentialFile : public SequentialFile { private: const std::string filename_; - FILE* file_; - int fd_; + HANDLE file_; + + // There is no equivalent of advising away buffered pages as in posix. + // To implement this flag we would need to do unbuffered reads which + // will need to be aligned (not sure there is a guarantee that the buffer + // passed in is aligned). + // Hence we currently ignore this flag. It is used only in a few cases + // which should not be perf critical. + // If perf evaluation finds this to be a problem, we can look into + // implementing this. bool use_os_buffer_; public: - WinSequentialFile(const std::string& fname, FILE* f, + WinSequentialFile(const std::string& fname, HANDLE f, const EnvOptions& options) : filename_(fname), file_(f), - fd_(fileno(f)), use_os_buffer_(options.use_os_buffer) {} virtual ~WinSequentialFile() { - assert(file_ != nullptr); - fclose(file_); + assert(file_ != INVALID_HANDLE_VALUE); + CloseHandle(file_); } virtual Status Read(size_t n, Slice* result, char* scratch) override { Status s; size_t r = 0; - // read() and fread() as well as write/fwrite do not guarantee - // to fullfil the entire request in one call thus the loop. - do { - r = fread(scratch, 1, n, file_); - } while (r == 0 && ferror(file_)); + // Windows ReadFile API accepts a DWORD. + // While it is possible to read in a loop if n is > UINT_MAX + // it is a highly unlikely case. + if (n > UINT_MAX) { + return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); + } + + DWORD bytesToRead = static_cast(n); //cast is safe due to the check above + DWORD bytesRead = 0; + BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL); + if (ret == TRUE) { + r = bytesRead; + } else { + return IOErrorFromWindowsError(filename_, GetLastError()); + } IOSTATS_ADD(bytes_read, r); *result = Slice(scratch, r); - if (r < n) { - if (feof(file_)) { - // We leave status as ok if we hit the end of the file - // We also clear the error so that the reads can continue - // if a new data is written to the file - clearerr(file_); - } else { - // A partial read with an error: return a non-ok status - s = Status::IOError(filename_, strerror(errno)); - } - } - return s; } virtual Status Skip(uint64_t n) override { - if (fseek(file_, n, SEEK_CUR)) { - return IOError(filename_, errno); + // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit + // integer. As such it is a highly unlikley case to have n so large. + if (n > _I64_MAX) { + return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); + } + + LARGE_INTEGER li; + li.QuadPart = static_cast(n); //cast is safe due to the check above + BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT); + if (ret == FALSE) { + return IOErrorFromWindowsError(filename_, GetLastError()); } return Status::OK(); } @@ -1314,7 +1328,7 @@ class WinEnv : public Env { // Corruption test needs to rename and delete files of these kind // while they are still open with another handle. For that reason we // allow share_write and delete(allows rename). - HANDLE hFile = 0; + HANDLE hFile = INVALID_HANDLE_VALUE; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA( @@ -1329,22 +1343,7 @@ class WinEnv : public Env { s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, lastError); } else { - int fd = _open_osfhandle(reinterpret_cast(hFile), 0); - if (fd == -1) { - auto code = errno; - CloseHandle(hFile); - s = IOError("Failed to _open_osfhandle for NewSequentialFile: " + fname, - code); - } else { - FILE* file = _fdopen(fd, "rb"); - if (file == nullptr) { - auto code = errno; - _close(fd); - s = IOError("Failed to fdopen NewSequentialFile: " + fname, code); - } else { - result->reset(new WinSequentialFile(fname, file, options)); - } - } + result->reset(new WinSequentialFile(fname, hFile, options)); } return s; } @@ -1811,22 +1810,7 @@ class WinEnv : public Env { // Set creation, last access and last write time to the same value SetFileTime(hFile, &ft, &ft, &ft); } - - int fd = _open_osfhandle(reinterpret_cast(hFile), 0); - if (fd == -1) { - auto code = errno; - CloseHandle(hFile); - s = IOError("Failed to _open_osfhandle: " + fname, code); - } else { - FILE* file = _fdopen(fd, "w"); - if (file == nullptr) { - auto code = errno; - _close(fd); - s = IOError("Failed to fdopen: " + fname, code); - } else { - result->reset(new WinLogger(&WinEnv::gettid, this, file)); - } - } + result->reset(new WinLogger(&WinEnv::gettid, this, hFile)); } return s; } diff --git a/port/win/port_win.h b/port/win/port_win.h index 11ef74f0d..89dc9d9cc 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -92,6 +92,7 @@ typedef SSIZE_T ssize_t; namespace rocksdb { #define PREFETCH(addr, rw, locality) +std::string GetWindowsErrSz(DWORD err); namespace port { diff --git a/port/win/win_logger.cc b/port/win/win_logger.cc index a4cf2de65..e91930dff 100644 --- a/port/win/win_logger.cc +++ b/port/win/win_logger.cc @@ -18,13 +18,16 @@ #include #include "rocksdb/env.h" + +#include + #include "port/win/win_logger.h" #include "port/sys_time.h" #include "util/iostats_context_imp.h" namespace rocksdb { -WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, FILE* file, +WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, const InfoLogLevel log_level) : Logger(log_level), gettid_(gettid), @@ -35,20 +38,23 @@ WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, FILE* file, file_(file) {} void WinLogger::DebugWriter(const char* str, int len) { - size_t sz = fwrite(str, 1, len, file_); - if (sz == 0) { - perror("fwrite .. [BAD]"); + DWORD bytesWritten = 0; + BOOL ret = WriteFile(file_, str, len, &bytesWritten, NULL); + if (ret == FALSE) { + std::string errSz = GetWindowsErrSz(GetLastError()); + fprintf(stderr, errSz.c_str()); } } WinLogger::~WinLogger() { close(); } -void WinLogger::close() { fclose(file_); } +void WinLogger::close() { CloseHandle(file_); } void WinLogger::Flush() { if (flush_pending_) { flush_pending_ = false; - fflush(file_); + // With Windows API writes go to OS buffers directly so no fflush needed unlike + // with C runtime API. We don't flush all the way to disk for perf reasons. } last_flush_micros_ = env_->NowMicros(); @@ -118,14 +124,16 @@ void WinLogger::Logv(const char* format, va_list ap) { assert(p <= limit); const size_t write_size = p - base; - size_t sz = fwrite(base, 1, write_size, file_); - if (sz == 0) { - perror("fwrite .. [BAD]"); + DWORD bytesWritten = 0; + BOOL ret = WriteFile(file_, base, write_size, &bytesWritten, NULL); + if (ret == FALSE) { + std::string errSz = GetWindowsErrSz(GetLastError()); + fprintf(stderr, errSz.c_str()); } flush_pending_ = true; - assert(sz == write_size); - if (sz > 0) { + assert(bytesWritten == write_size); + if (bytesWritten > 0) { log_size_ += write_size; } @@ -133,7 +141,8 @@ void WinLogger::Logv(const char* format, va_list ap) { static_cast(now_tv.tv_sec) * 1000000 + now_tv.tv_usec; if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { flush_pending_ = false; - fflush(file_); + // With Windows API writes go to OS buffers directly so no fflush needed unlike + // with C runtime API. We don't flush all the way to disk for perf reasons. last_flush_micros_ = now_micros; } break; diff --git a/port/win/win_logger.h b/port/win/win_logger.h index 3f330d488..67e45907f 100644 --- a/port/win/win_logger.h +++ b/port/win/win_logger.h @@ -24,7 +24,7 @@ const int kDebugLogChunkSize = 128 * 1024; class WinLogger : public rocksdb::Logger { public: - WinLogger(uint64_t (*gettid)(), Env* env, FILE* file, + WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file, const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL); virtual ~WinLogger(); @@ -44,7 +44,7 @@ class WinLogger : public rocksdb::Logger { void DebugWriter(const char* str, int len); private: - FILE* file_; + HANDLE file_; uint64_t (*gettid_)(); // Return the thread id for the current thread std::atomic_size_t log_size_; std::atomic_uint_fast64_t last_flush_micros_;