Remove usage of C runtime API that has file handle limitation

main
Praveen Rao 9 years ago
parent 1fb2abae2d
commit 7e327980a3
  1. 100
      port/win/env_win.cc
  2. 1
      port/win/port_win.h
  3. 33
      port/win/win_logger.cc
  4. 4
      port/win/win_logger.h

@ -703,55 +703,69 @@ class AlignedBuffer {
class WinSequentialFile : public SequentialFile { class WinSequentialFile : public SequentialFile {
private: private:
const std::string filename_; const std::string filename_;
FILE* file_; HANDLE file_;
int fd_;
// There is no equivalent of advising away buffered pages as in posix.
// To implement this flag we would need to do unbuffered reads which
// will need to be aligned (not sure there is a guarantee that the buffer
// passed in is aligned).
// Hence we currently ignore this flag. It is used only in a few cases
// which should not be perf critical.
// If perf evaluation finds this to be a problem, we can look into
// implementing this.
bool use_os_buffer_; bool use_os_buffer_;
public: public:
WinSequentialFile(const std::string& fname, FILE* f, WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options) const EnvOptions& options)
: filename_(fname), : filename_(fname),
file_(f), file_(f),
fd_(fileno(f)),
use_os_buffer_(options.use_os_buffer) {} use_os_buffer_(options.use_os_buffer) {}
virtual ~WinSequentialFile() { virtual ~WinSequentialFile() {
assert(file_ != nullptr); assert(file_ != INVALID_HANDLE_VALUE);
fclose(file_); CloseHandle(file_);
} }
virtual Status Read(size_t n, Slice* result, char* scratch) override { virtual Status Read(size_t n, Slice* result, char* scratch) override {
Status s; Status s;
size_t r = 0; size_t r = 0;
// read() and fread() as well as write/fwrite do not guarantee // Windows ReadFile API accepts a DWORD.
// to fullfil the entire request in one call thus the loop. // While it is possible to read in a loop if n is > UINT_MAX
do { // it is a highly unlikely case.
r = fread(scratch, 1, n, file_); if (n > UINT_MAX) {
} while (r == 0 && ferror(file_)); return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER);
}
DWORD bytesToRead = static_cast<DWORD>(n); //cast is safe due to the check above
DWORD bytesRead = 0;
BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL);
if (ret == TRUE) {
r = bytesRead;
} else {
return IOErrorFromWindowsError(filename_, GetLastError());
}
IOSTATS_ADD(bytes_read, r); IOSTATS_ADD(bytes_read, r);
*result = Slice(scratch, r); *result = Slice(scratch, r);
if (r < n) {
if (feof(file_)) {
// We leave status as ok if we hit the end of the file
// We also clear the error so that the reads can continue
// if a new data is written to the file
clearerr(file_);
} else {
// A partial read with an error: return a non-ok status
s = Status::IOError(filename_, strerror(errno));
}
}
return s; return s;
} }
virtual Status Skip(uint64_t n) override { virtual Status Skip(uint64_t n) override {
if (fseek(file_, n, SEEK_CUR)) { // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit
return IOError(filename_, errno); // integer. As such it is a highly unlikley case to have n so large.
if (n > _I64_MAX) {
return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER);
}
LARGE_INTEGER li;
li.QuadPart = static_cast<int64_t>(n); //cast is safe due to the check above
BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT);
if (ret == FALSE) {
return IOErrorFromWindowsError(filename_, GetLastError());
} }
return Status::OK(); return Status::OK();
} }
@ -1314,7 +1328,7 @@ class WinEnv : public Env {
// Corruption test needs to rename and delete files of these kind // Corruption test needs to rename and delete files of these kind
// while they are still open with another handle. For that reason we // while they are still open with another handle. For that reason we
// allow share_write and delete(allows rename). // allow share_write and delete(allows rename).
HANDLE hFile = 0; HANDLE hFile = INVALID_HANDLE_VALUE;
{ {
IOSTATS_TIMER_GUARD(open_nanos); IOSTATS_TIMER_GUARD(open_nanos);
hFile = CreateFileA( hFile = CreateFileA(
@ -1329,22 +1343,7 @@ class WinEnv : public Env {
s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname,
lastError); lastError);
} else { } else {
int fd = _open_osfhandle(reinterpret_cast<intptr_t>(hFile), 0); result->reset(new WinSequentialFile(fname, hFile, options));
if (fd == -1) {
auto code = errno;
CloseHandle(hFile);
s = IOError("Failed to _open_osfhandle for NewSequentialFile: " + fname,
code);
} else {
FILE* file = _fdopen(fd, "rb");
if (file == nullptr) {
auto code = errno;
_close(fd);
s = IOError("Failed to fdopen NewSequentialFile: " + fname, code);
} else {
result->reset(new WinSequentialFile(fname, file, options));
}
}
} }
return s; return s;
} }
@ -1811,22 +1810,7 @@ class WinEnv : public Env {
// Set creation, last access and last write time to the same value // Set creation, last access and last write time to the same value
SetFileTime(hFile, &ft, &ft, &ft); SetFileTime(hFile, &ft, &ft, &ft);
} }
result->reset(new WinLogger(&WinEnv::gettid, this, hFile));
int fd = _open_osfhandle(reinterpret_cast<intptr_t>(hFile), 0);
if (fd == -1) {
auto code = errno;
CloseHandle(hFile);
s = IOError("Failed to _open_osfhandle: " + fname, code);
} else {
FILE* file = _fdopen(fd, "w");
if (file == nullptr) {
auto code = errno;
_close(fd);
s = IOError("Failed to fdopen: " + fname, code);
} else {
result->reset(new WinLogger(&WinEnv::gettid, this, file));
}
}
} }
return s; return s;
} }

@ -92,6 +92,7 @@ typedef SSIZE_T ssize_t;
namespace rocksdb { namespace rocksdb {
#define PREFETCH(addr, rw, locality) #define PREFETCH(addr, rw, locality)
std::string GetWindowsErrSz(DWORD err);
namespace port { namespace port {

@ -18,13 +18,16 @@
#include <atomic> #include <atomic>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include <Windows.h>
#include "port/win/win_logger.h" #include "port/win/win_logger.h"
#include "port/sys_time.h" #include "port/sys_time.h"
#include "util/iostats_context_imp.h" #include "util/iostats_context_imp.h"
namespace rocksdb { namespace rocksdb {
WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, FILE* file, WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
const InfoLogLevel log_level) const InfoLogLevel log_level)
: Logger(log_level), : Logger(log_level),
gettid_(gettid), gettid_(gettid),
@ -35,20 +38,23 @@ WinLogger::WinLogger(uint64_t (*gettid)(), Env* env, FILE* file,
file_(file) {} file_(file) {}
void WinLogger::DebugWriter(const char* str, int len) { void WinLogger::DebugWriter(const char* str, int len) {
size_t sz = fwrite(str, 1, len, file_); DWORD bytesWritten = 0;
if (sz == 0) { BOOL ret = WriteFile(file_, str, len, &bytesWritten, NULL);
perror("fwrite .. [BAD]"); if (ret == FALSE) {
std::string errSz = GetWindowsErrSz(GetLastError());
fprintf(stderr, errSz.c_str());
} }
} }
WinLogger::~WinLogger() { close(); } WinLogger::~WinLogger() { close(); }
void WinLogger::close() { fclose(file_); } void WinLogger::close() { CloseHandle(file_); }
void WinLogger::Flush() { void WinLogger::Flush() {
if (flush_pending_) { if (flush_pending_) {
flush_pending_ = false; flush_pending_ = false;
fflush(file_); // With Windows API writes go to OS buffers directly so no fflush needed unlike
// with C runtime API. We don't flush all the way to disk for perf reasons.
} }
last_flush_micros_ = env_->NowMicros(); last_flush_micros_ = env_->NowMicros();
@ -118,14 +124,16 @@ void WinLogger::Logv(const char* format, va_list ap) {
assert(p <= limit); assert(p <= limit);
const size_t write_size = p - base; const size_t write_size = p - base;
size_t sz = fwrite(base, 1, write_size, file_); DWORD bytesWritten = 0;
if (sz == 0) { BOOL ret = WriteFile(file_, base, write_size, &bytesWritten, NULL);
perror("fwrite .. [BAD]"); if (ret == FALSE) {
std::string errSz = GetWindowsErrSz(GetLastError());
fprintf(stderr, errSz.c_str());
} }
flush_pending_ = true; flush_pending_ = true;
assert(sz == write_size); assert(bytesWritten == write_size);
if (sz > 0) { if (bytesWritten > 0) {
log_size_ += write_size; log_size_ += write_size;
} }
@ -133,7 +141,8 @@ void WinLogger::Logv(const char* format, va_list ap) {
static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec; static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
flush_pending_ = false; flush_pending_ = false;
fflush(file_); // With Windows API writes go to OS buffers directly so no fflush needed unlike
// with C runtime API. We don't flush all the way to disk for perf reasons.
last_flush_micros_ = now_micros; last_flush_micros_ = now_micros;
} }
break; break;

@ -24,7 +24,7 @@ const int kDebugLogChunkSize = 128 * 1024;
class WinLogger : public rocksdb::Logger { class WinLogger : public rocksdb::Logger {
public: public:
WinLogger(uint64_t (*gettid)(), Env* env, FILE* file, WinLogger(uint64_t (*gettid)(), Env* env, HANDLE file,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL); const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL);
virtual ~WinLogger(); virtual ~WinLogger();
@ -44,7 +44,7 @@ class WinLogger : public rocksdb::Logger {
void DebugWriter(const char* str, int len); void DebugWriter(const char* str, int len);
private: private:
FILE* file_; HANDLE file_;
uint64_t (*gettid_)(); // Return the thread id for the current thread uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_; std::atomic_size_t log_size_;
std::atomic_uint_fast64_t last_flush_micros_; std::atomic_uint_fast64_t last_flush_micros_;

Loading…
Cancel
Save