Implement WinRandomRW file and improve code reuse (#1388)

main
Dmitri Smirnov 8 years ago committed by GitHub
parent a249a0b75b
commit b9311aa65c
  1. 35
      include/rocksdb/env.h
  2. 49
      port/win/env_win.cc
  3. 10
      port/win/env_win.h
  4. 392
      port/win/io_win.cc
  5. 273
      port/win/io_win.h

@ -504,15 +504,16 @@ class WritableFile {
virtual ~WritableFile(); virtual ~WritableFile();
// Indicates if the class makes use of unbuffered I/O // Indicates if the class makes use of unbuffered I/O
// If false you must pass aligned buffer to Write()
virtual bool UseOSBuffer() const { virtual bool UseOSBuffer() const {
return true; return true;
} }
const size_t c_DefaultPageSize = 4 * 1024; const size_t c_DefaultPageSize = 4 * 1024;
// This is needed when you want to allocate // Use the returned alignment value to allocate
// AlignedBuffer for use with file I/O classes // aligned buffer for Write() when UseOSBuffer()
// Used for unbuffered file I/O when UseOSBuffer() returns false // returns false
virtual size_t GetRequiredBufferAlignment() const { virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize; return c_DefaultPageSize;
} }
@ -664,7 +665,34 @@ class RandomRWFile {
RandomRWFile() {} RandomRWFile() {}
virtual ~RandomRWFile() {} virtual ~RandomRWFile() {}
// Reports whether this file goes through the OS buffer (buffered I/O).
// The default implementation returns true. When an implementation
// returns false (unbuffered I/O), callers must pass an aligned buffer
// to Write().
virtual bool UseOSBuffer() const {
return true;
}
// Default alignment (4 KiB) reported by GetRequiredBufferAlignment().
const size_t c_DefaultPageSize = 4 * 1024;
// Returns the alignment callers should use when allocating the
// aligned buffer passed to Write() while UseOSBuffer() returns false.
// The default implementation returns c_DefaultPageSize.
virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize;
}
// Used by the file_reader_writer to decide whether the ReadAhead wrapper
// should simply forward the call without enacting its own read-ahead
// buffering or locking. An implementation that returns true is expected
// to take care of reading ahead itself. The default returns false.
virtual bool ShouldForwardRawRequest() const {
return false;
}
// Hook for implementations that perform read-ahead in the
// platform-dependent layer, i.e. those whose ShouldForwardRawRequest()
// returns true. The default implementation does nothing.
virtual void EnableReadAhead() {}
// Write bytes in `data` at offset `offset`, Returns Status::OK() on success. // Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
// Pass aligned buffer when UseOSBuffer() returns false.
virtual Status Write(uint64_t offset, const Slice& data) = 0; virtual Status Write(uint64_t offset, const Slice& data) = 0;
// Read up to `n` bytes starting from offset `offset` and store them in // Read up to `n` bytes starting from offset `offset` and store them in
@ -681,7 +709,6 @@ class RandomRWFile {
virtual Status Close() = 0; virtual Status Close() = 0;
private:
// No copying allowed // No copying allowed
RandomRWFile(const RandomRWFile&) = delete; RandomRWFile(const RandomRWFile&) = delete;
RandomRWFile& operator=(const RandomRWFile&) = delete; RandomRWFile& operator=(const RandomRWFile&) = delete;

@ -293,6 +293,50 @@ Status WinEnvIO::NewWritableFile(const std::string& fname,
return s; return s;
} }
// Opens (creating it if necessary) the file `fname` for random
// read/write access and stores a WinRandomRWFile wrapping the handle
// in `*result`.
// Returns an IO error built from the Windows error code when the file
// cannot be created/opened, otherwise an OK status.
Status WinEnvIO::NewRandomRWFile(const std::string & fname,
                                 unique_ptr<RandomRWFile>* result,
                                 const EnvOptions & options) {
  // The handle is opened for both reading and writing.
  const DWORD access_mask = GENERIC_READ | GENERIC_WRITE;

  // Shared access is necessary for corruption test to pass;
  // almost all tests would work with a possible exception of
  // fault_injection.
  const DWORD share_mask =
      FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;

  // Create if necessary or open existing.
  const DWORD disposition = OPEN_ALWAYS;

  // Random access is requested to disable read-ahead as the system
  // reads too much data. Unbuffered I/O is added on top of that when
  // the options ask for it.
  const DWORD flags = options.use_os_buffer
      ? FILE_FLAG_RANDOM_ACCESS
      : (FILE_FLAG_RANDOM_ACCESS | FILE_FLAG_NO_BUFFERING);

  HANDLE handle = 0;
  {
    // The timer guard covers only the open call itself.
    IOSTATS_TIMER_GUARD(open_nanos);
    handle = CreateFileA(fname.c_str(),
                         access_mask,
                         share_mask,
                         NULL,  // Security attributes
                         disposition,
                         flags,
                         NULL);
  }

  if (handle == INVALID_HANDLE_VALUE) {
    // Capture the error code right away, before any other call is made.
    const auto error_code = GetLastError();
    return IOErrorFromWindowsError(
        "NewRandomRWFile failed to Create/Open: " + fname, error_code);
  }

  // Keep the handle in a guard while the file object is constructed;
  // once `*result` holds the new file the guard is released
  // (presumably so the handle is not closed by the guard as well).
  UniqueCloseHandlePtr handle_guard(handle, CloseHandleFunc);
  result->reset(new WinRandomRWFile(fname, handle, page_size_, options));
  handle_guard.release();

  return Status::OK();
}
Status WinEnvIO::NewDirectory(const std::string& name, Status WinEnvIO::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) { std::unique_ptr<Directory>* result) {
Status s; Status s;
@ -868,6 +912,11 @@ Status WinEnv::NewWritableFile(const std::string& fname,
return winenv_io_.NewWritableFile(fname, result, options); return winenv_io_.NewWritableFile(fname, result, options);
} }
// Creates a random read/write file by forwarding the request,
// unchanged, to the WinEnvIO member.
Status WinEnv::NewRandomRWFile(const std::string& fname,
                               unique_ptr<RandomRWFile>* result,
                               const EnvOptions& options) {
  Status status = winenv_io_.NewRandomRWFile(fname, result, options);
  return status;
}
Status WinEnv::NewDirectory(const std::string& name, Status WinEnv::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) { std::unique_ptr<Directory>* result) {
return winenv_io_.NewDirectory(name, result); return winenv_io_.NewDirectory(name, result);

@ -92,6 +92,11 @@ public:
std::unique_ptr<WritableFile>* result, std::unique_ptr<WritableFile>* result,
const EnvOptions& options); const EnvOptions& options);
// The returned file will only be accessed by one thread at a time.
virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& options);
virtual Status NewDirectory(const std::string& name, virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result); std::unique_ptr<Directory>* result);
@ -188,6 +193,11 @@ public:
std::unique_ptr<WritableFile>* result, std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override; const EnvOptions& options) override;
// The returned file will only be accessed by one thread at a time.
Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override;
Status NewDirectory(const std::string& name, Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override; std::unique_ptr<Directory>* result) override;

@ -155,10 +155,12 @@ size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) {
return static_cast<size_t>(rid - id); return static_cast<size_t>(rid - id);
} }
////////////////////////////////////////////////////////////////////////////////////////////////////
// WinMmapReadableFile
WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap,
const void* mapped_region, size_t length) const void* mapped_region, size_t length)
: fileName_(fileName), : WinFileData(fileName, hFile, false),
hFile_(hFile),
hMap_(hMap), hMap_(hMap),
mapped_region_(mapped_region), mapped_region_(mapped_region),
length_(length) {} length_(length) {}
@ -169,9 +171,6 @@ WinMmapReadableFile::~WinMmapReadableFile() {
ret = ::CloseHandle(hMap_); ret = ::CloseHandle(hMap_);
assert(ret); assert(ret);
ret = ::CloseHandle(hFile_);
assert(ret);
} }
Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result, Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
@ -180,7 +179,7 @@ Status WinMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
if (offset > length_) { if (offset > length_) {
*result = Slice(); *result = Slice();
return IOError(fileName_, EINVAL); return IOError(filename_, EINVAL);
} else if (offset + n > length_) { } else if (offset + n > length_) {
n = length_ - offset; n = length_ - offset;
} }
@ -197,6 +196,10 @@ size_t WinMmapReadableFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size); return GetUniqueIdFromFile(hFile_, id, max_size);
} }
///////////////////////////////////////////////////////////////////////////////
/// WinMmapFile
// Can only truncate or reserve to a sector size aligned if // Can only truncate or reserve to a sector size aligned if
// used on files that are opened with Unbuffered I/O // used on files that are opened with Unbuffered I/O
Status WinMmapFile::TruncateFile(uint64_t toSize) { Status WinMmapFile::TruncateFile(uint64_t toSize) {
@ -302,8 +305,7 @@ Status WinMmapFile::PreallocateInternal(uint64_t spaceToReserve) {
WinMmapFile::WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, WinMmapFile::WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size,
size_t allocation_granularity, const EnvOptions& options) size_t allocation_granularity, const EnvOptions& options)
: filename_(fname), : WinFileData(fname, hFile, false),
hFile_(hFile),
hMap_(NULL), hMap_(NULL),
page_size_(page_size), page_size_(page_size),
allocation_granularity_(allocation_granularity), allocation_granularity_(allocation_granularity),
@ -515,16 +517,16 @@ size_t WinMmapFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size); return GetUniqueIdFromFile(hFile_, id, max_size);
} }
//////////////////////////////////////////////////////////////////////////////////
// WinSequentialFile
WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f, WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options) const EnvOptions& options)
: filename_(fname), : WinFileData(fname, f, options.use_os_buffer)
file_(f),
use_os_buffer_(options.use_os_buffer)
{} {}
WinSequentialFile::~WinSequentialFile() { WinSequentialFile::~WinSequentialFile() {
assert(file_ != INVALID_HANDLE_VALUE); assert(hFile_ != INVALID_HANDLE_VALUE);
CloseHandle(file_);
} }
Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) { Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) {
@ -540,7 +542,7 @@ Status WinSequentialFile::Read(size_t n, Slice* result, char* scratch) {
DWORD bytesToRead = static_cast<DWORD>(n); //cast is safe due to the check above DWORD bytesToRead = static_cast<DWORD>(n); //cast is safe due to the check above
DWORD bytesRead = 0; DWORD bytesRead = 0;
BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL); BOOL ret = ReadFile(hFile_, scratch, bytesToRead, &bytesRead, NULL);
if (ret == TRUE) { if (ret == TRUE) {
r = bytesRead; r = bytesRead;
} else { } else {
@ -561,7 +563,7 @@ Status WinSequentialFile::Skip(uint64_t n) {
LARGE_INTEGER li; LARGE_INTEGER li;
li.QuadPart = static_cast<int64_t>(n); //cast is safe due to the check above li.QuadPart = static_cast<int64_t>(n); //cast is safe due to the check above
BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT); BOOL ret = SetFilePointerEx(hFile_, li, NULL, FILE_CURRENT);
if (ret == FALSE) { if (ret == FALSE) {
return IOErrorFromWindowsError(filename_, GetLastError()); return IOErrorFromWindowsError(filename_, GetLastError());
} }
@ -572,14 +574,31 @@ Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) {
return Status::OK(); return Status::OK();
} }
SSIZE_T WinRandomAccessFile::ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, //////////////////////////////////////////////////////////////////////////////////////////////////
/// WinRandomAccessBase
// Helper
// Computes the aligned read window that covers the byte range
// [offset, offset + bytes_requested).
//   alignment           - alignment (page size) used for the window
//   offset              - user supplied starting offset
//   bytes_requested     - number of bytes the user asked for
//   actual_bytes_toread - [out] size of the window: from the start of the
//                         first page through the end of the last page
//   first_page_start    - [out] `offset` truncated by
//                         TruncateToPageBoundary()
// A zero-length request is treated as touching the single byte at
// `offset`, so the result is exactly one page. Without this,
// `offset + bytes_requested - 1` wraps around for offset == 0 (unsigned
// arithmetic) and yields a zero-sized window for any other page-aligned
// offset.
void CalculateReadParameters(size_t alignment, uint64_t offset,
                             size_t bytes_requested,
                             size_t& actual_bytes_toread,
                             uint64_t& first_page_start) {
  first_page_start = TruncateToPageBoundary(alignment, offset);

  // Offset of the last byte covered by the request.
  const uint64_t last_byte_offset =
      (bytes_requested > 0) ? (offset + bytes_requested - 1) : offset;

  const uint64_t last_page_start =
      TruncateToPageBoundary(alignment, last_byte_offset);

  actual_bytes_toread = (last_page_start - first_page_start) + alignment;
}
SSIZE_T WinRandomAccessImpl::ReadIntoBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left, size_t bytes_to_read, size_t& left,
AlignedBuffer& buffer, char* dest) const { AlignedBuffer& buffer, char* dest) const {
assert(buffer.CurrentSize() == 0); assert(buffer.CurrentSize() == 0);
assert(buffer.Capacity() >= bytes_to_read); assert(buffer.Capacity() >= bytes_to_read);
SSIZE_T read = SSIZE_T read =
PositionedReadInternal(buffer.Destination(), bytes_to_read, first_page_start); PositionedReadInternal(buffer.Destination(), bytes_to_read,
first_page_start);
if (read > 0) { if (read > 0) {
buffer.Size(read); buffer.Size(read);
@ -597,7 +616,8 @@ SSIZE_T WinRandomAccessFile::ReadIntoBuffer(uint64_t user_offset, uint64_t first
return read; return read;
} }
SSIZE_T WinRandomAccessFile::ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, SSIZE_T WinRandomAccessImpl::ReadIntoOneShotBuffer(uint64_t user_offset,
uint64_t first_page_start,
size_t bytes_to_read, size_t& left, size_t bytes_to_read, size_t& left,
char* dest) const { char* dest) const {
AlignedBuffer bigBuffer; AlignedBuffer bigBuffer;
@ -608,7 +628,7 @@ SSIZE_T WinRandomAccessFile::ReadIntoOneShotBuffer(uint64_t user_offset, uint64_
bigBuffer, dest); bigBuffer, dest);
} }
SSIZE_T WinRandomAccessFile::ReadIntoInstanceBuffer(uint64_t user_offset, SSIZE_T WinRandomAccessImpl::ReadIntoInstanceBuffer(uint64_t user_offset,
uint64_t first_page_start, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, size_t bytes_to_read, size_t& left,
char* dest) const { char* dest) const {
@ -622,52 +642,35 @@ SSIZE_T WinRandomAccessFile::ReadIntoInstanceBuffer(uint64_t user_offset,
return read; return read;
} }
void WinRandomAccessFile::CalculateReadParameters(uint64_t offset, size_t bytes_requested, SSIZE_T WinRandomAccessImpl::PositionedReadInternal(char* src,
size_t& actual_bytes_toread, size_t numBytes,
uint64_t& first_page_start) const {
const size_t alignment = buffer_.Alignment();
first_page_start = TruncateToPageBoundary(alignment, offset);
const uint64_t last_page_start =
TruncateToPageBoundary(alignment, offset + bytes_requested - 1);
actual_bytes_toread = (last_page_start - first_page_start) + alignment;
}
SSIZE_T WinRandomAccessFile::PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset) const { uint64_t offset) const {
return pread(hFile_, src, numBytes, offset); return pread(file_base_->GetFileHandle(), src, numBytes, offset);
} }
WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, inline
const EnvOptions& options) WinRandomAccessImpl::WinRandomAccessImpl(WinFileData* file_base,
: filename_(fname), size_t alignment,
hFile_(hFile), const EnvOptions& options) :
use_os_buffer_(options.use_os_buffer), file_base_(file_base),
read_ahead_(false), read_ahead_(false),
compaction_readahead_size_(options.compaction_readahead_size), compaction_readahead_size_(options.compaction_readahead_size),
random_access_max_buffer_size_(options.random_access_max_buffer_size), random_access_max_buffer_size_(options.random_access_max_buffer_size),
buffer_(), buffer_(),
buffered_start_(0) { buffered_start_(0) {
assert(!options.use_mmap_reads); assert(!options.use_mmap_reads);
// Unbuffered access, use internal buffer for reads // Unbuffered access, use internal buffer for reads
if (!use_os_buffer_) { if (!file_base_->UseOSBuffer()) {
// Do not allocate the buffer either until the first request or // Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer // until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment); buffer_.Alignment(alignment);
} }
} }
WinRandomAccessFile::~WinRandomAccessFile() { inline
if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { Status WinRandomAccessImpl::ReadImpl(uint64_t offset, size_t n, Slice* result,
::CloseHandle(hFile_);
}
}
void WinRandomAccessFile::EnableReadAhead() { this->Hint(SEQUENTIAL); }
Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const { char* scratch) const {
Status s; Status s;
@ -683,14 +686,15 @@ Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
// When in unbuffered mode we need to do the following changes: // When in unbuffered mode we need to do the following changes:
// - use our own aligned buffer // - use our own aligned buffer
// - always read at the offset of that is a multiple of alignment // - always read at the offset of that is a multiple of alignment
if (!use_os_buffer_) { if (!file_base_->UseOSBuffer()) {
uint64_t first_page_start = 0; uint64_t first_page_start = 0;
size_t actual_bytes_toread = 0; size_t actual_bytes_toread = 0;
size_t bytes_requested = left; size_t bytes_requested = left;
if (!read_ahead_ && random_access_max_buffer_size_ == 0) { if (!read_ahead_ && random_access_max_buffer_size_ == 0) {
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, CalculateReadParameters(buffer_.Alignment(), offset, bytes_requested,
actual_bytes_toread,
first_page_start); first_page_start);
assert(actual_bytes_toread > 0); assert(actual_bytes_toread > 0);
@ -723,7 +727,8 @@ Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
bytes_requested = compaction_readahead_size_; bytes_requested = compaction_readahead_size_;
} }
CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, CalculateReadParameters(buffer_.Alignment(), offset, bytes_requested,
actual_bytes_toread,
first_page_start); first_page_start);
assert(actual_bytes_toread > 0); assert(actual_bytes_toread > 0);
@ -757,20 +762,25 @@ Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
} }
} }
*result = Slice(scratch, (r < 0) ? 0 : n - left);
if (r < 0) { if (r < 0) {
s = IOErrorFromLastWindowsError(filename_); auto lastError = GetLastError();
// Posix impl wants to treat reads from beyond
// of the file as OK.
if(lastError != ERROR_HANDLE_EOF) {
s = IOErrorFromWindowsError(file_base_->GetName(), lastError);
}
} }
*result = Slice(scratch, (r < 0) ? 0 : n - left);
return s; return s;
} }
bool WinRandomAccessFile::ShouldForwardRawRequest() const { inline
return true; void WinRandomAccessImpl::HintImpl(RandomAccessFile::AccessPattern pattern) {
}
void WinRandomAccessFile::Hint(AccessPattern pattern) { if (pattern == RandomAccessFile::SEQUENTIAL &&
if (pattern == SEQUENTIAL && !use_os_buffer_ && !file_base_->UseOSBuffer() &&
compaction_readahead_size_ > 0) { compaction_readahead_size_ > 0) {
std::lock_guard<std::mutex> lg(buffer_mut_); std::lock_guard<std::mutex> lg(buffer_mut_);
if (!read_ahead_) { if (!read_ahead_) {
@ -785,60 +795,76 @@ void WinRandomAccessFile::Hint(AccessPattern pattern) {
} }
} }
Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) { ///////////////////////////////////////////////////////////////////////////////////////////////////
return Status::OK(); /// WinRandomAccessFile
WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options) :
WinFileData(fname, hFile, options.use_os_buffer),
WinRandomAccessImpl(this, alignment, options) {
} }
size_t WinRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { WinRandomAccessFile::~WinRandomAccessFile() {
return GetUniqueIdFromFile(hFile_, id, max_size);
} }
Status WinWritableFile::PreallocateInternal(uint64_t spaceToReserve) { Status WinRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
return fallocate(filename_, hFile_, spaceToReserve); char* scratch) const {
return ReadImpl(offset, n, result, scratch);
} }
WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, void WinRandomAccessFile::EnableReadAhead() {
size_t capacity, const EnvOptions& options) HintImpl(SEQUENTIAL);
: filename_(fname),
hFile_(hFile),
use_os_buffer_(options.use_os_buffer),
alignment_(alignment),
filesize_(0),
reservedsize_(0) {
assert(!options.use_mmap_writes);
} }
WinWritableFile::~WinWritableFile() { bool WinRandomAccessFile::ShouldForwardRawRequest() const {
if (NULL != hFile_ && INVALID_HANDLE_VALUE != hFile_) { return true;
WinWritableFile::Close();
}
} }
// Indicates if the class makes use of unbuffered I/O void WinRandomAccessFile::Hint(AccessPattern pattern) {
bool WinWritableFile::UseOSBuffer() const { HintImpl(pattern);
return use_os_buffer_;
} }
size_t WinWritableFile::GetRequiredBufferAlignment() const { Status WinRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
return alignment_; return Status::OK();
} }
Status WinWritableFile::Append(const Slice& data) { size_t WinRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(GetFileHandle(), id, max_size);
}
/////////////////////////////////////////////////////////////////////////////
// WinWritableImpl
//
// Reserves `spaceToReserve` bytes for the underlying file by calling
// fallocate() with the file's name and handle; returns its Status.
inline
Status WinWritableImpl::PreallocateInternal(uint64_t spaceToReserve) {
  const std::string& name = file_data_->GetName();
  HANDLE handle = file_data_->GetFileHandle();
  return fallocate(name, handle, spaceToReserve);
}
// Binds the writable implementation to the WinFileData it operates on
// and records the buffer alignment. The tracked file size and the
// reserved size both start at zero.
WinWritableImpl::WinWritableImpl(WinFileData* file_data, size_t alignment)
: file_data_(file_data),
alignment_(alignment),
filesize_(0),
reservedsize_(0) {
}
Status WinWritableImpl::AppendImpl(const Slice& data) {
// Used for buffered access ONLY // Used for buffered access ONLY
assert(use_os_buffer_); assert(file_data_->UseOSBuffer());
assert(data.size() < std::numeric_limits<DWORD>::max()); assert(data.size() < std::numeric_limits<DWORD>::max());
Status s; Status s;
DWORD bytesWritten = 0; DWORD bytesWritten = 0;
if (!WriteFile(hFile_, data.data(), if (!WriteFile(file_data_->GetFileHandle(), data.data(),
static_cast<DWORD>(data.size()), &bytesWritten, NULL)) { static_cast<DWORD>(data.size()), &bytesWritten, NULL)) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError( s = IOErrorFromWindowsError(
"Failed to WriteFile: " + filename_, "Failed to WriteFile: " + file_data_->GetName(),
lastError); lastError);
} else { }
else {
assert(size_t(bytesWritten) == data.size()); assert(size_t(bytesWritten) == data.size());
filesize_ += data.size(); filesize_ += data.size();
} }
@ -846,86 +872,77 @@ Status WinWritableFile::Append(const Slice& data) {
return s; return s;
} }
Status WinWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) {
Status s; Status s;
SSIZE_T ret = pwrite(hFile_, data.data(), data.size(), offset); SSIZE_T ret = pwrite(file_data_->GetFileHandle(), data.data(), data.size(), offset);
// Error break // Error break
if (ret < 0) { if (ret < 0) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError( s = IOErrorFromWindowsError(
"Failed to pwrite for: " + filename_, lastError); "Failed to pwrite for: " + file_data_->GetName(), lastError);
} else { }
// With positional write it is not clear at all else {
// if this actually extends the filesize
assert(size_t(ret) == data.size()); assert(size_t(ret) == data.size());
filesize_ += data.size(); // For sequential write this would be simple
// size extension by data.size()
uint64_t write_end = offset + data.size();
if (write_end >= filesize_) {
filesize_ = write_end;
}
} }
return s; return s;
} }
// Need to implement this so the file is truncated correctly // Need to implement this so the file is truncated correctly
// when buffered and unbuffered mode // when buffered and unbuffered mode
Status WinWritableFile::Truncate(uint64_t size) { inline
Status s = ftruncate(filename_, hFile_, size); Status WinWritableImpl::TruncateImpl(uint64_t size) {
Status s = ftruncate(file_data_->GetName(), file_data_->GetFileHandle(),
size);
if (s.ok()) { if (s.ok()) {
filesize_ = size; filesize_ = size;
} }
return s; return s;
} }
Status WinWritableFile::Close() { Status WinWritableImpl::CloseImpl() {
Status s; Status s;
assert(INVALID_HANDLE_VALUE != hFile_); auto hFile = file_data_->GetFileHandle();
assert(INVALID_HANDLE_VALUE != hFile);
if (fsync(hFile_) < 0) { if (fsync(hFile) < 0) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, s = IOErrorFromWindowsError("fsync failed at Close() for: " +
file_data_->GetName(),
lastError); lastError);
} }
if (FALSE == ::CloseHandle(hFile_)) { if(!file_data_->CloseFile()) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, s = IOErrorFromWindowsError("CloseHandle failed for: " + file_data_->GetName(),
lastError); lastError);
} }
hFile_ = INVALID_HANDLE_VALUE;
return s; return s;
} }
// write out the cached data to the OS cache Status WinWritableImpl::SyncImpl() {
// This is now taken care of the WritableFileWriter
Status WinWritableFile::Flush() {
return Status::OK();
}
Status WinWritableFile::Sync() {
Status s; Status s;
// Calls flush buffers // Calls flush buffers
if (fsync(hFile_) < 0) { if (fsync(file_data_->GetFileHandle()) < 0) {
auto lastError = GetLastError(); auto lastError = GetLastError();
s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, s = IOErrorFromWindowsError("fsync failed at Sync() for: " +
file_data_->GetName(),
lastError); lastError);
} }
return s; return s;
} }
Status WinWritableFile::Fsync() { return Sync(); }
uint64_t WinWritableFile::GetFileSize() {
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
// but tests implement their own writable files and do not use WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
}
Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) { Status WinWritableImpl::AllocateImpl(uint64_t offset, uint64_t len) {
Status status; Status status;
TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds);
@ -946,18 +963,135 @@ Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) {
return status; return status;
} }
////////////////////////////////////////////////////////////////////////////////
/// WinWritableFile
// Constructs a writable file over an already opened handle.
// The file name, handle and use_os_buffer option go to WinFileData;
// the alignment goes to WinWritableImpl. The capacity argument is
// unused. Must not be combined with mmap writes (asserted).
WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t /* capacity */, const EnvOptions& options)
: WinFileData(fname, hFile, options.use_os_buffer),
WinWritableImpl(this, alignment) {
assert(!options.use_mmap_writes);
}
// Intentionally empty: nothing is released in this destructor body.
WinWritableFile::~WinWritableFile() {
}
// Indicates if the class makes use of unbuffered I/O.
// Returns the flag stored in WinFileData.
bool WinWritableFile::UseOSBuffer() const {
return WinFileData::UseOSBuffer();
}
// Returns the alignment reported by GetAlignement().
size_t WinWritableFile::GetRequiredBufferAlignment() const {
return GetAlignement();
}
// Appends `data`; forwards to AppendImpl().
Status WinWritableFile::Append(const Slice& data) {
return AppendImpl(data);
}
// Writes `data` at `offset`; forwards to PositionedAppendImpl().
Status WinWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
return PositionedAppendImpl(data, offset);
}
// Need to implement this so the file is truncated correctly
// in both buffered and unbuffered mode. Forwards to TruncateImpl().
Status WinWritableFile::Truncate(uint64_t size) {
return TruncateImpl(size);
}
// Closes the file; forwards to CloseImpl().
Status WinWritableFile::Close() {
return CloseImpl();
}
// Write out the cached data to the OS cache.
// This is now taken care of by the WritableFileWriter, so this is a
// no-op that always returns OK.
Status WinWritableFile::Flush() {
return Status::OK();
}
// Forwards to SyncImpl().
Status WinWritableFile::Sync() {
return SyncImpl();
}
// Same as Sync(): also forwards to SyncImpl().
Status WinWritableFile::Fsync() {
return SyncImpl();
}
// Returns the value reported by GetFileSizeImpl().
uint64_t WinWritableFile::GetFileSize() {
return GetFileSizeImpl();
}
// Forwards the (offset, len) allocation request to AllocateImpl().
Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) {
return AllocateImpl(offset, len);
}
size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const { size_t WinWritableFile::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(hFile_, id, max_size); return GetUniqueIdFromFile(GetFileHandle(), id, max_size);
}
/////////////////////////////////////////////////////////////////////////
/// WinRandomRWFile
// Constructs a random read/write file over an already opened handle.
// The file name, handle and use_os_buffer option go to WinFileData;
// the same WinFileData (this) and alignment are handed to both the
// random-access (read) and the writable implementation bases.
WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment,
const EnvOptions& options) :
WinFileData(fname, hFile, options.use_os_buffer),
WinRandomAccessImpl(this, alignment, options),
WinWritableImpl(this, alignment) {
}
// Returns the use_os_buffer flag stored in WinFileData.
bool WinRandomRWFile::UseOSBuffer() const {
return WinFileData::UseOSBuffer();
}
// Returns the alignment reported by GetAlignement().
size_t WinRandomRWFile::GetRequiredBufferAlignment() const {
return GetAlignement();
}
// Always true for this class.
bool WinRandomRWFile::ShouldForwardRawRequest() const {
return true;
}
// Enables read-ahead by passing the SEQUENTIAL access pattern to
// HintImpl().
void WinRandomRWFile::EnableReadAhead() {
HintImpl(RandomAccessFile::SEQUENTIAL);
}
// Writes `data` at `offset`; forwards to PositionedAppendImpl()
// (note the swapped argument order).
Status WinRandomRWFile::Write(uint64_t offset, const Slice & data) {
return PositionedAppendImpl(data, offset);
}
// Reads up to `n` bytes starting at `offset`; forwards to ReadImpl().
Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice * result,
char * scratch) const {
return ReadImpl(offset, n, result, scratch);
}
// No-op that always returns OK.
Status WinRandomRWFile::Flush() {
return Status::OK();
}
// Forwards to SyncImpl().
Status WinRandomRWFile::Sync() {
return SyncImpl();
}
// Closes the file; forwards to CloseImpl().
Status WinRandomRWFile::Close() {
return CloseImpl();
}
//////////////////////////////////////////////////////////////////////////
/// WinDirectory
Status WinDirectory::Fsync() { return Status::OK(); } Status WinDirectory::Fsync() { return Status::OK(); }
//////////////////////////////////////////////////////////////////////////
/// WinFileLock
WinFileLock::~WinFileLock() { WinFileLock::~WinFileLock() {
BOOL ret = ::CloseHandle(hFile_); BOOL ret = ::CloseHandle(hFile_);
assert(ret); assert(ret);
} }
} }
} }

@ -68,10 +68,58 @@ Status ftruncate(const std::string& filename, HANDLE hFile,
size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size); size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size);
// mmap() based random-access class WinFileData {
class WinMmapReadableFile : public RandomAccessFile { protected:
const std::string fileName_;
const std::string filename_;
HANDLE hFile_; HANDLE hFile_;
// There is no equivalent of advising away buffered pages as in posix.
// To implement this flag we would need to do unbuffered reads which
// will need to be aligned (not sure there is a guarantee that the buffer
// passed in is aligned).
// Hence we currently ignore this flag. It is used only in a few cases
// which should not be perf critical.
// If perf evaluation finds this to be a problem, we can look into
// implementing this.
const bool use_os_buffer_;
public:
// We want this class to be usable both for inheritance (private
// or protected) and for containment, so the constructor and the
// destructor are public.
// The constructor stores the file name, the handle and the
// use_os_buffer flag as given.
WinFileData(const std::string& filename, HANDLE hFile, bool use_os_buffer) :
filename_(filename), hFile_(hFile), use_os_buffer_(use_os_buffer)
{}
// Closes the handle via CloseFile() if it has not been closed yet.
virtual ~WinFileData() {
this->CloseFile();
}
bool CloseFile() {
bool result = true;
if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) {
result = ::CloseHandle(hFile_);
assert(result);
hFile_ = NULL;
}
return result;
}
// Name of the file as passed to the constructor.
const std::string& GetName() const { return filename_; }
// Currently stored handle; NULL once CloseFile() has closed it.
HANDLE GetFileHandle() const { return hFile_; }
// The use_os_buffer flag as passed to the constructor.
bool UseOSBuffer() const { return use_os_buffer_; }
WinFileData(const WinFileData&) = delete;
WinFileData& operator=(const WinFileData&) = delete;
};
// mmap() based random-access
class WinMmapReadableFile : private WinFileData, public RandomAccessFile {
HANDLE hMap_; HANDLE hMap_;
const void* mapped_region_; const void* mapped_region_;
@ -84,6 +132,9 @@ public:
~WinMmapReadableFile(); ~WinMmapReadableFile();
WinMmapReadableFile(const WinMmapReadableFile&) = delete;
WinMmapReadableFile& operator=(const WinMmapReadableFile&) = delete;
virtual Status Read(uint64_t offset, size_t n, Slice* result, virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override; char* scratch) const override;
@ -96,10 +147,8 @@ public:
// data to the file. This is safe since we either properly close the // data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code // file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes. // knows enough to skip zero suffixes.
class WinMmapFile : public WritableFile { class WinMmapFile : private WinFileData, public WritableFile {
private: private:
const std::string filename_;
HANDLE hFile_;
HANDLE hMap_; HANDLE hMap_;
const size_t page_size_; // We flush the mapping view in page_size const size_t page_size_; // We flush the mapping view in page_size
@ -142,6 +191,9 @@ public:
~WinMmapFile(); ~WinMmapFile();
WinMmapFile(const WinMmapFile&) = delete;
WinMmapFile& operator=(const WinMmapFile&) = delete;
virtual Status Append(const Slice& data) override; virtual Status Append(const Slice& data) override;
// Means Close() will properly take care of truncate // Means Close() will properly take care of truncate
@ -174,27 +226,16 @@ public:
virtual size_t GetUniqueId(char* id, size_t max_size) const override; virtual size_t GetUniqueId(char* id, size_t max_size) const override;
}; };
class WinSequentialFile : public SequentialFile { class WinSequentialFile : private WinFileData, public SequentialFile {
private:
const std::string filename_;
HANDLE file_;
// There is no equivalent of advising away buffered pages as in posix.
// To implement this flag we would need to do unbuffered reads which
// will need to be aligned (not sure there is a guarantee that the buffer
// passed in is aligned).
// Hence we currently ignore this flag. It is used only in a few cases
// which should not be perf critical.
// If perf evaluation finds this to be a problem, we can look into
// implementing this.
bool use_os_buffer_;
public: public:
WinSequentialFile(const std::string& fname, HANDLE f, WinSequentialFile(const std::string& fname, HANDLE f,
const EnvOptions& options); const EnvOptions& options);
~WinSequentialFile(); ~WinSequentialFile();
WinSequentialFile(const WinSequentialFile&) = delete;
WinSequentialFile& operator=(const WinSequentialFile&) = delete;
virtual Status Read(size_t n, Slice* result, char* scratch) override; virtual Status Read(size_t n, Slice* result, char* scratch) override;
virtual Status Skip(uint64_t n) override; virtual Status Skip(uint64_t n) override;
@ -202,41 +243,45 @@ public:
virtual Status InvalidateCache(size_t offset, size_t length) override; virtual Status InvalidateCache(size_t offset, size_t length) override;
}; };
// pread() based random-access class WinRandomAccessImpl {
class WinRandomAccessFile : public RandomAccessFile { protected:
const std::string filename_;
HANDLE hFile_; WinFileData* file_base_;
const bool use_os_buffer_; bool read_ahead_;
bool read_ahead_;
const size_t compaction_readahead_size_; const size_t compaction_readahead_size_;
const size_t random_access_max_buffer_size_; const size_t random_access_max_buffer_size_;
mutable std::mutex buffer_mut_; mutable std::mutex buffer_mut_;
mutable AlignedBuffer buffer_; mutable AlignedBuffer buffer_;
mutable uint64_t mutable uint64_t
buffered_start_; // file offset set that is currently buffered buffered_start_; // file offset set that is currently buffered
/* // Override for behavior change when creating a custom env
* The function reads a requested amount of bytes into the specified aligned virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes,
* buffer Upon success the function sets the length of the buffer to the uint64_t offset) const;
* amount of bytes actually read even though it might be less than actually
* requested. It then copies the amount of bytes requested by the user (left) /*
* to the user supplied buffer (dest) and reduces left by the amount of bytes * The function reads a requested amount of bytes into the specified aligned
* copied to the user buffer * buffer Upon success the function sets the length of the buffer to the
* * amount of bytes actually read even though it might be less than actually
* @user_offset [in] - offset on disk where the read was requested by the user * requested. It then copies the amount of bytes requested by the user (left)
* @first_page_start [in] - actual page aligned disk offset that we want to * to the user supplied buffer (dest) and reduces left by the amount of bytes
* read from * copied to the user buffer
* @bytes_to_read [in] - total amount of bytes that will be read from disk *
* which is generally greater or equal to the amount * @user_offset [in] - offset on disk where the read was requested by the user
* that the user has requested due to the * @first_page_start [in] - actual page aligned disk offset that we want to
* either alignment requirements or read_ahead in * read from
* effect. * @bytes_to_read [in] - total amount of bytes that will be read from disk
* @left [in/out] total amount of bytes that needs to be copied to the user * which is generally greater or equal to the amount
* buffer. It is reduced by the amount of bytes that actually * that the user has requested due to the
* copied * either alignment requirements or read_ahead in
* @buffer - buffer to use * effect.
* @dest - user supplied buffer * @left [in/out] total amount of bytes that needs to be copied to the user
*/ * buffer. It is reduced by the amount of bytes that actually
* copied
* @buffer - buffer to use
* @dest - user supplied buffer
*/
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
size_t bytes_to_read, size_t& left, size_t bytes_to_read, size_t& left,
AlignedBuffer& buffer, char* dest) const; AlignedBuffer& buffer, char* dest) const;
@ -250,13 +295,27 @@ class WinRandomAccessFile : public RandomAccessFile {
size_t bytes_to_read, size_t& left, size_t bytes_to_read, size_t& left,
char* dest) const; char* dest) const;
void CalculateReadParameters(uint64_t offset, size_t bytes_requested, WinRandomAccessImpl(WinFileData* file_base, size_t alignment,
size_t& actual_bytes_toread, const EnvOptions& options);
uint64_t& first_page_start) const;
// Override for behavior change virtual ~WinRandomAccessImpl() {}
virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes,
uint64_t offset) const; public:
WinRandomAccessImpl(const WinRandomAccessImpl&) = delete;
WinRandomAccessImpl& operator=(const WinRandomAccessImpl&) = delete;
Status ReadImpl(uint64_t offset, size_t n, Slice* result,
char* scratch) const;
void HintImpl(RandomAccessFile::AccessPattern pattern);
};
// pread() based random-access
class WinRandomAccessFile : private WinFileData,
protected WinRandomAccessImpl, // Want to be able to override PositionedReadInternal
public RandomAccessFile {
public: public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
@ -291,18 +350,55 @@ public:
// the tail for the next write OR for Close() at which point we pad with zeros. // the tail for the next write OR for Close() at which point we pad with zeros.
// No padding is required for // No padding is required for
// buffered access. // buffered access.
class WinWritableFile : public WritableFile { class WinWritableImpl {
private: protected:
const std::string filename_;
HANDLE hFile_; WinFileData* file_data_;
const bool use_os_buffer_; // Used to indicate unbuffered access, the file
const uint64_t alignment_; const uint64_t alignment_;
// must be opened as unbuffered if false
uint64_t filesize_; // How much data is actually written disk uint64_t filesize_; // How much data is actually written disk
uint64_t reservedsize_; // how far we have reserved space uint64_t reservedsize_; // how far we have reserved space
virtual Status PreallocateInternal(uint64_t spaceToReserve); virtual Status PreallocateInternal(uint64_t spaceToReserve);
WinWritableImpl(WinFileData* file_data, size_t alignment);
// Empty destructor: nothing is released here.
~WinWritableImpl() {}
// Returns the alignment_ member.
uint64_t GetAlignement() const { return alignment_; }
Status AppendImpl(const Slice& data);
// Requires that the data is aligned as specified by GetRequiredBufferAlignment()
Status PositionedAppendImpl(const Slice& data, uint64_t offset);
Status TruncateImpl(uint64_t size);
Status CloseImpl();
Status SyncImpl();
// Returns filesize_, the amount of data this object has tracked as written.
// See the caveat below about its accuracy with unbuffered access.
uint64_t GetFileSizeImpl() {
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
// but tests implement their own writable files and do not use WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
}
Status AllocateImpl(uint64_t offset, uint64_t len);
public:
WinWritableImpl(const WinWritableImpl&) = delete;
WinWritableImpl& operator=(const WinWritableImpl&) = delete;
};
class WinWritableFile : private WinFileData,
protected WinWritableImpl,
public WritableFile {
public: public:
WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
size_t capacity, const EnvOptions& options); size_t capacity, const EnvOptions& options);
@ -310,12 +406,14 @@ public:
~WinWritableFile(); ~WinWritableFile();
// Indicates if the class makes use of unbuffered I/O // Indicates if the class makes use of unbuffered I/O
// Use PositionedAppend
virtual bool UseOSBuffer() const override; virtual bool UseOSBuffer() const override;
virtual size_t GetRequiredBufferAlignment() const override; virtual size_t GetRequiredBufferAlignment() const override;
virtual Status Append(const Slice& data) override; virtual Status Append(const Slice& data) override;
// Requires that the data is aligned as specified by GetRequiredBufferAlignment()
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override; virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
// Need to implement this so the file is truncated correctly // Need to implement this so the file is truncated correctly
@ -339,6 +437,57 @@ public:
virtual size_t GetUniqueId(char* id, size_t max_size) const override; virtual size_t GetUniqueId(char* id, size_t max_size) const override;
}; };
// Windows implementation of the RandomRWFile interface.
// Derives privately from WinFileData and, as protected bases, from
// WinRandomAccessImpl and WinWritableImpl. The member functions declared
// here are defined outside of this header, except for Fsync().
class WinRandomRWFile : private WinFileData,
                        protected WinRandomAccessImpl,
                        protected WinWritableImpl,
                        public RandomRWFile {
 public:
  WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment,
                  const EnvOptions& options);

  ~WinRandomRWFile() {}

  // Indicates if the class makes use of unbuffered I/O
  // If false you must pass aligned buffer to Write()
  virtual bool UseOSBuffer() const override;

  // Use the returned alignment value to allocate
  // aligned buffer for Write() when UseOSBuffer()
  // returns false
  virtual size_t GetRequiredBufferAlignment() const override;

  // Used by the file_reader_writer to decide if the ReadAhead wrapper
  // should simply forward the call and do not enact read_ahead buffering or
  // locking.
  // The implementation below takes care of reading ahead
  virtual bool ShouldForwardRawRequest() const override;

  // For cases when read-ahead is implemented in the platform dependent
  // layer. This is when ShouldForwardRawRequest() returns true.
  virtual void EnableReadAhead() override;

  // Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
  // Pass aligned buffer when UseOSBuffer() returns false.
  virtual Status Write(uint64_t offset, const Slice& data) override;

  // Read up to `n` bytes starting from offset `offset` and store them in
  // result, provided `scratch` size should be at least `n`.
  // Returns Status::OK() on success.
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const override;

  virtual Status Flush() override;

  virtual Status Sync() override;

  // Forwards to Sync(). Marked `override` like every other virtual in this
  // class so the compiler checks the signature against RandomRWFile.
  virtual Status Fsync() override { return Sync(); }

  virtual Status Close() override;
};
class WinDirectory : public Directory { class WinDirectory : public Directory {
public: public:
WinDirectory() {} WinDirectory() {}

Loading…
Cancel
Save