Catchup with posix features

Summary:
Catch up with Posix features
  NewWritableRWFile must fail when file does not exists
  Implement Env::Truncate()
  Adjust Env options optimization functions
  Implement MemoryMappedBuffer on Windows.
Closes https://github.com/facebook/rocksdb/pull/3857

Differential Revision: D8053610

Pulled By: ajkr

fbshipit-source-id: ccd0d46c29648a9f6f496873bc1c9d6c5547487e
main
Dmitri Smirnov 7 years ago committed by Facebook Github Bot
parent c465509379
commit 3db8504cde
  1. 10
      env/env_test.cc
  2. 2
      env/io_posix.cc
  3. 21
      include/rocksdb/env.h
  4. 135
      port/win/env_win.cc
  5. 21
      port/win/env_win.h
  6. 21
      port/win/io_win.cc
  7. 12
      port/win/io_win.h
  8. 4
      tools/db_stress.cc

10
env/env_test.cc vendored

@ -232,10 +232,10 @@ TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {
ASSERT_OK(status); ASSERT_OK(status);
ASSERT_NE(nullptr, mmap_buffer.get()); ASSERT_NE(nullptr, mmap_buffer.get());
ASSERT_NE(nullptr, mmap_buffer->base); ASSERT_NE(nullptr, mmap_buffer->GetBase());
ASSERT_EQ(kFileBytes, mmap_buffer->length); ASSERT_EQ(kFileBytes, mmap_buffer->GetLen());
std::string actual_data(static_cast<char*>(mmap_buffer->base), std::string actual_data(reinterpret_cast<const char*>(mmap_buffer->GetBase()),
mmap_buffer->length); mmap_buffer->GetLen());
ASSERT_EQ(expected_data, actual_data); ASSERT_EQ(expected_data, actual_data);
} }
@ -1437,10 +1437,8 @@ TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) {
std::unique_ptr<RandomRWFile> file; std::unique_ptr<RandomRWFile> file;
#ifdef OS_LINUX
// Cannot open non-existing file. // Cannot open non-existing file.
ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions())); ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions()));
#endif
// Create the file using WriteableFile // Create the file using WriteableFile
{ {

2
env/io_posix.cc vendored

@ -1054,7 +1054,7 @@ Status PosixRandomRWFile::Close() {
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
// TODO should have error handling though not much we can do... // TODO should have error handling though not much we can do...
munmap(this->base, length); munmap(this->base_, length_);
} }
/* /*

@ -42,7 +42,7 @@ class SequentialFile;
class Slice; class Slice;
class WritableFile; class WritableFile;
class RandomRWFile; class RandomRWFile;
struct MemoryMappedFileBuffer; class MemoryMappedFileBuffer;
class Directory; class Directory;
struct DBOptions; struct DBOptions;
struct ImmutableDBOptions; struct ImmutableDBOptions;
@ -822,13 +822,24 @@ class RandomRWFile {
// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer. // MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer.
// Subclasses should release the mapping upon destruction. // Subclasses should release the mapping upon destruction.
struct MemoryMappedFileBuffer { class MemoryMappedFileBuffer {
public:
MemoryMappedFileBuffer(void* _base, size_t _length) MemoryMappedFileBuffer(void* _base, size_t _length)
: base(_base), length(_length) {} : base_(_base), length_(_length) {}
virtual ~MemoryMappedFileBuffer() = 0; virtual ~MemoryMappedFileBuffer() = 0;
void* const base; // We do not want to unmap this twice. We can make this class
const size_t length; // movable if desired, however, since
MemoryMappedFileBuffer(const MemoryMappedFileBuffer&) = delete;
MemoryMappedFileBuffer& operator=(const MemoryMappedFileBuffer&) = delete;
void* GetBase() const { return base_; }
size_t GetLen() const { return length_; }
protected:
void* base_;
const size_t length_;
}; };
// Directory object represents collection of files and implements // Directory object represents collection of files and implements

@ -112,6 +112,15 @@ Status WinEnvIO::DeleteFile(const std::string& fname) {
return result; return result;
} }
Status WinEnvIO::Truncate(const std::string& fname, size_t size) {
Status s;
int result = truncate(fname.c_str(), size);
if (result != 0) {
s = IOError("Failed to truncate: " + fname, errno);
}
return s;
}
Status WinEnvIO::GetCurrentTime(int64_t* unix_time) { Status WinEnvIO::GetCurrentTime(int64_t* unix_time) {
time_t time = std::time(nullptr); time_t time = std::time(nullptr);
if (time == (time_t)(-1)) { if (time == (time_t)(-1)) {
@ -344,7 +353,7 @@ Status WinEnvIO::NewRandomRWFile(const std::string & fname,
// Random access is to disable read-ahead as the system reads too much data // Random access is to disable read-ahead as the system reads too much data
DWORD desired_access = GENERIC_READ | GENERIC_WRITE; DWORD desired_access = GENERIC_READ | GENERIC_WRITE;
DWORD shared_mode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; DWORD shared_mode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
DWORD creation_disposition = OPEN_ALWAYS; // Create if necessary or open existing DWORD creation_disposition = OPEN_EXISTING; // Fail if file does not exist
DWORD file_flags = FILE_FLAG_RANDOM_ACCESS; DWORD file_flags = FILE_FLAG_RANDOM_ACCESS;
if (options.use_direct_reads && options.use_direct_writes) { if (options.use_direct_reads && options.use_direct_writes) {
@ -380,6 +389,84 @@ Status WinEnvIO::NewRandomRWFile(const std::string & fname,
return s; return s;
} }
Status WinEnvIO::NewMemoryMappedFileBuffer(const std::string & fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) {
Status s;
result->reset();
DWORD fileFlags = FILE_ATTRIBUTE_READONLY;
HANDLE hFile = INVALID_HANDLE_VALUE;
{
IOSTATS_TIMER_GUARD(open_nanos);
hFile = CreateFileA(
fname.c_str(), GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING, // Open only if it exists
fileFlags,
NULL);
}
if (INVALID_HANDLE_VALUE == hFile) {
auto lastError = GetLastError();
s = IOErrorFromWindowsError("Failed to open NewMemoryMappedFileBuffer: " + fname,
lastError);
return s;
}
UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc);
uint64_t fileSize = 0;
s = GetFileSize(fname, &fileSize);
if (!s.ok()) {
return s;
}
// Will not map empty files
if (fileSize == 0) {
return Status::NotSupported("NewMemoryMappedFileBuffer can not map zero length files: " + fname);
}
// size_t is 32-bit with 32-bit builds
if (fileSize > std::numeric_limits<size_t>::max()) {
return Status::NotSupported(
"The specified file size does not fit into 32-bit memory addressing: " + fname);
}
HANDLE hMap = CreateFileMappingA(hFile, NULL, PAGE_READWRITE,
0, // Whole file at its present length
0,
NULL); // Mapping name
if (!hMap) {
auto lastError = GetLastError();
return IOErrorFromWindowsError(
"Failed to create file mapping for NewMemoryMappedFileBuffer: " + fname,
lastError);
}
UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc);
void* base = MapViewOfFileEx(hMap, FILE_MAP_WRITE,
0, // High DWORD of access start
0, // Low DWORD
fileSize,
NULL); // Let the OS choose the mapping
if (!base) {
auto lastError = GetLastError();
return IOErrorFromWindowsError(
"Failed to MapViewOfFile for NewMemoryMappedFileBuffer: " + fname,
lastError);
}
result->reset(new WinMemoryMappedBuffer(hFile, hMap,
base, static_cast<size_t>(fileSize)));
mapGuard.release();
fileGuard.release();
return s;
}
Status WinEnvIO::NewDirectory(const std::string& name, Status WinEnvIO::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) { std::unique_ptr<Directory>* result) {
Status s; Status s;
@ -928,27 +1015,33 @@ std::string WinEnvIO::TimeToString(uint64_t secondsSince1970) {
EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options, EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const { const DBOptions& db_options) const {
EnvOptions optimized = env_options; EnvOptions optimized(env_options);
// These two the same as default optimizations
optimized.bytes_per_sync = db_options.wal_bytes_per_sync; optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
optimized.use_mmap_writes = false;
// This is because we flush only whole pages on unbuffered io and
// the last records are not guaranteed to be flushed.
optimized.use_direct_writes = false;
// TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
optimized.writable_file_max_buffer_size = optimized.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size; db_options.writable_file_max_buffer_size;
// This adversely affects %999 on windows
optimized.use_mmap_writes = false;
// Direct writes will produce a huge perf impact on
// Windows. Pre-allocate space for WAL.
optimized.use_direct_writes = false;
return optimized; return optimized;
} }
EnvOptions WinEnvIO::OptimizeForManifestWrite( EnvOptions WinEnvIO::OptimizeForManifestWrite(
const EnvOptions& env_options) const { const EnvOptions& env_options) const {
EnvOptions optimized = env_options; EnvOptions optimized(env_options);
optimized.use_mmap_writes = false; optimized.use_mmap_writes = false;
optimized.use_direct_writes = false; optimized.use_direct_reads = false;
optimized.fallocate_with_keep_size = true; return optimized;
}
EnvOptions WinEnvIO::OptimizeForManifestRead(
const EnvOptions& env_options) const {
EnvOptions optimized(env_options);
optimized.use_mmap_writes = false;
optimized.use_direct_reads = false;
return optimized; return optimized;
} }
@ -1145,6 +1238,10 @@ Status WinEnv::DeleteFile(const std::string& fname) {
return winenv_io_.DeleteFile(fname); return winenv_io_.DeleteFile(fname);
} }
Status WinEnv::Truncate(const std::string& fname, size_t size) {
return winenv_io_.Truncate(fname, size);
}
Status WinEnv::GetCurrentTime(int64_t* unix_time) { Status WinEnv::GetCurrentTime(int64_t* unix_time) {
return winenv_io_.GetCurrentTime(unix_time); return winenv_io_.GetCurrentTime(unix_time);
} }
@ -1173,10 +1270,15 @@ Status WinEnv::ReopenWritableFile(const std::string& fname,
} }
Status WinEnv::NewRandomRWFile(const std::string & fname, Status WinEnv::NewRandomRWFile(const std::string & fname,
unique_ptr<RandomRWFile>* result, const EnvOptions & options) { std::unique_ptr<RandomRWFile>* result, const EnvOptions & options) {
return winenv_io_.NewRandomRWFile(fname, result, options); return winenv_io_.NewRandomRWFile(fname, result, options);
} }
Status WinEnv::NewMemoryMappedFileBuffer(const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) {
return winenv_io_.NewMemoryMappedFileBuffer(fname, result);
}
Status WinEnv::NewDirectory(const std::string& name, Status WinEnv::NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) { std::unique_ptr<Directory>* result) {
return winenv_io_.NewDirectory(name, result); return winenv_io_.NewDirectory(name, result);
@ -1310,6 +1412,11 @@ void WinEnv::IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) {
return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri); return winenv_threads_.IncBackgroundThreadsIfNeeded(num, pri);
} }
EnvOptions WinEnv::OptimizeForManifestRead(
const EnvOptions& env_options) const {
return winenv_io_.OptimizeForManifestRead(env_options);
}
EnvOptions WinEnv::OptimizeForLogWrite(const EnvOptions& env_options, EnvOptions WinEnv::OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const { const DBOptions& db_options) const {
return winenv_io_.OptimizeForLogWrite(env_options, db_options); return winenv_io_.OptimizeForLogWrite(env_options, db_options);

@ -89,6 +89,8 @@ public:
virtual Status DeleteFile(const std::string& fname); virtual Status DeleteFile(const std::string& fname);
Status Truncate(const std::string& fname, size_t size);
virtual Status GetCurrentTime(int64_t* unix_time); virtual Status GetCurrentTime(int64_t* unix_time);
virtual Status NewSequentialFile(const std::string& fname, virtual Status NewSequentialFile(const std::string& fname,
@ -110,6 +112,10 @@ public:
unique_ptr<RandomRWFile>* result, unique_ptr<RandomRWFile>* result,
const EnvOptions& options); const EnvOptions& options);
virtual Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result);
virtual Status NewDirectory(const std::string& name, virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result); std::unique_ptr<Directory>* result);
@ -168,6 +174,9 @@ public:
virtual EnvOptions OptimizeForManifestWrite( virtual EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const; const EnvOptions& env_options) const;
virtual EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const;
size_t GetPageSize() const { return page_size_; } size_t GetPageSize() const { return page_size_; }
size_t GetAllocationGranularity() const { return allocation_granularity_; } size_t GetAllocationGranularity() const { return allocation_granularity_; }
@ -197,6 +206,8 @@ public:
Status DeleteFile(const std::string& fname) override; Status DeleteFile(const std::string& fname) override;
Status Truncate(const std::string& fname, size_t size) override;
Status GetCurrentTime(int64_t* unix_time) override; Status GetCurrentTime(int64_t* unix_time) override;
Status NewSequentialFile(const std::string& fname, Status NewSequentialFile(const std::string& fname,
@ -224,9 +235,13 @@ public:
// The returned file will only be accessed by one thread at a time. // The returned file will only be accessed by one thread at a time.
Status NewRandomRWFile(const std::string& fname, Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result, std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override; const EnvOptions& options) override;
Status NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override;
Status NewDirectory(const std::string& name, Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override; std::unique_ptr<Directory>* result) override;
@ -302,12 +317,16 @@ public:
void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override; void IncBackgroundThreadsIfNeeded(int num, Env::Priority pri) override;
EnvOptions OptimizeForManifestRead(
const EnvOptions& env_options) const override;
EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const override; const DBOptions& db_options) const override;
EnvOptions OptimizeForManifestWrite( EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const override; const EnvOptions& env_options) const override;
private: private:
WinEnvIO winenv_io_; WinEnvIO winenv_io_;

@ -1061,6 +1061,27 @@ Status WinRandomRWFile::Close() {
return CloseImpl(); return CloseImpl();
} }
//////////////////////////////////////////////////////////////////////////
/// WinMemoryMappedBufer
WinMemoryMappedBuffer::~WinMemoryMappedBuffer() {
BOOL ret = FALSE;
if (base_ != nullptr) {
ret = ::UnmapViewOfFile(base_);
assert(ret);
base_ = nullptr;
}
if (map_handle_ != NULL && map_handle_ != INVALID_HANDLE_VALUE) {
ret = ::CloseHandle(map_handle_);
assert(ret);
map_handle_ = NULL;
}
if (file_handle_ != NULL && file_handle_ != INVALID_HANDLE_VALUE) {
ret = ::CloseHandle(file_handle_);
assert(ret);
file_handle_ = NULL;
}
}
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
/// WinDirectory /// WinDirectory

@ -411,6 +411,18 @@ class WinRandomRWFile : private WinFileData,
virtual Status Close() override; virtual Status Close() override;
}; };
class WinMemoryMappedBuffer : public MemoryMappedFileBuffer {
private:
HANDLE file_handle_;
HANDLE map_handle_;
public:
WinMemoryMappedBuffer(HANDLE file_handle, HANDLE map_handle, void* base, size_t size) :
MemoryMappedFileBuffer(base, size),
file_handle_(file_handle),
map_handle_(map_handle) {}
~WinMemoryMappedBuffer() override;
};
class WinDirectory : public Directory { class WinDirectory : public Directory {
HANDLE handle_; HANDLE handle_;
public: public:

@ -879,9 +879,9 @@ class SharedState {
FLAGS_expected_values_path, &expected_mmap_buffer_); FLAGS_expected_values_path, &expected_mmap_buffer_);
} }
if (status.ok()) { if (status.ok()) {
assert(expected_mmap_buffer_->length == expected_values_size); assert(expected_mmap_buffer_->GetLen() == expected_values_size);
values_ = values_ =
static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->base); static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->GetBase());
assert(values_ != nullptr); assert(values_ != nullptr);
} else { } else {
fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",

Loading…
Cancel
Save