// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include #include #include #include #include #include #include #include #include #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "port/port.h" #include "port/dirent.h" #include "port/win/win_logger.h" #include "util/random.h" #include "util/coding.h" #include "util/iostats_context_imp.h" #include "util/rate_limiter.h" #include "util/sync_point.h" #include "util/aligned_buffer.h" #include "util/threadpool.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" #include // For UUID generation #include namespace rocksdb { std::string GetWindowsErrSz(DWORD err) { LPSTR lpMsgBuf; FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, err, 0, // Default language reinterpret_cast(&lpMsgBuf), 0, NULL); std::string Err = lpMsgBuf; LocalFree(lpMsgBuf); return Err; } ThreadStatusUpdater* CreateThreadStatusUpdater() { return new ThreadStatusUpdater(); } namespace { const size_t c_OneMB = (1 << 20); inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { return Status::IOError(context, GetWindowsErrSz(err)); } inline Status IOErrorFromLastWindowsError(const std::string& context) { return IOErrorFromWindowsError(context, GetLastError()); } inline Status IOError(const std::string& context, int err_number) { return Status::IOError(context, strerror(err_number)); } // TODO(sdong): temp logging. Need to help debugging. Remove it when // the feature is proved to be stable. inline void PrintThreadInfo(size_t thread_id, size_t terminatingId) { fprintf(stdout, "Bg thread %Iu terminates %Iu\n", thread_id, terminatingId); } // returns the ID of the current process inline int current_process_id() { return _getpid(); } // RAII helpers for HANDLEs const auto CloseHandleFunc = [](HANDLE h) { ::CloseHandle(h); }; typedef std::unique_ptr UniqueCloseHandlePtr; // We preserve the original name of this interface to denote the original idea // behind it. // All reads happen by a specified offset and pwrite interface does not change // the position of the file pointer. Judging from the man page and errno it does // execute // lseek atomically to return the position of the file back where it was. // WriteFile() does not // have this capability. Therefore, for both pread and pwrite the pointer is // advanced to the next position // which is fine for writes because they are (should be) sequential. // Because all the reads/writes happen by the specified offset, the caller in // theory should not // rely on the current file offset. SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, uint64_t offset) { assert(numBytes <= std::numeric_limits::max()); OVERLAPPED overlapped = {0}; ULARGE_INTEGER offsetUnion; offsetUnion.QuadPart = offset; overlapped.Offset = offsetUnion.LowPart; overlapped.OffsetHigh = offsetUnion.HighPart; SSIZE_T result = 0; unsigned long bytesWritten = 0; if (FALSE == WriteFile(hFile, src, static_cast(numBytes), &bytesWritten, &overlapped)) { result = -1; } else { result = bytesWritten; } return result; } // See comments for pwrite above SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset) { assert(numBytes <= std::numeric_limits::max()); OVERLAPPED overlapped = {0}; ULARGE_INTEGER offsetUnion; offsetUnion.QuadPart = offset; overlapped.Offset = offsetUnion.LowPart; overlapped.OffsetHigh = offsetUnion.HighPart; SSIZE_T result = 0; unsigned long bytesRead = 0; if (FALSE == ReadFile(hFile, src, static_cast(numBytes), &bytesRead, &overlapped)) { return -1; } else { result = bytesRead; } return result; } // Note the below two do not set errno because they are used only here in this // file // on a Windows handle and, therefore, not necessary. Translating GetLastError() // to errno // is a sad business inline int fsync(HANDLE hFile) { if (!FlushFileBuffers(hFile)) { return -1; } return 0; } // SetFileInformationByHandle() is capable of fast pre-allocates. // However, this does not change the file end position unless the file is // truncated and the pre-allocated space is not considered filled with zeros. inline Status fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size) { Status status; FILE_ALLOCATION_INFO alloc_info; alloc_info.AllocationSize.QuadPart = to_size; if (!SetFileInformationByHandle(hFile, FileAllocationInfo, &alloc_info, sizeof(FILE_ALLOCATION_INFO))) { auto lastError = GetLastError(); status = IOErrorFromWindowsError( "Failed to pre-allocate space: " + filename, lastError); } return status; } inline Status ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize) { Status status; FILE_END_OF_FILE_INFO end_of_file; end_of_file.EndOfFile.QuadPart = toSize; if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file, sizeof(FILE_END_OF_FILE_INFO))) { auto lastError = GetLastError(); status = IOErrorFromWindowsError("Failed to Set end of file: " + filename, lastError); } return status; } size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size) { if (max_size < kMaxVarint64Length * 3) { return 0; } BY_HANDLE_FILE_INFORMATION FileInfo; BOOL result = GetFileInformationByHandle(hFile, &FileInfo); TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); if (!result) { return 0; } char* rid = id; rid = EncodeVarint64(rid, uint64_t(FileInfo.dwVolumeSerialNumber)); rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexHigh)); rid = EncodeVarint64(rid, uint64_t(FileInfo.nFileIndexLow)); assert(rid >= id); return static_cast(rid - id); } // mmap() based random-access class WinMmapReadableFile : public RandomAccessFile { const std::string fileName_; HANDLE hFile_; HANDLE hMap_; const void* mapped_region_; const size_t length_; public: // mapped_region_[0,length-1] contains the mmapped contents of the file. WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap, const void* mapped_region, size_t length) : fileName_(fileName), hFile_(hFile), hMap_(hMap), mapped_region_(mapped_region), length_(length) {} ~WinMmapReadableFile() { BOOL ret = ::UnmapViewOfFile(mapped_region_); assert(ret); ret = ::CloseHandle(hMap_); assert(ret); ret = ::CloseHandle(hFile_); assert(ret); } virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { Status s; if (offset > length_) { *result = Slice(); return IOError(fileName_, EINVAL); } else if (offset + n > length_) { n = length_ - offset; } *result = Slice(reinterpret_cast(mapped_region_) + offset, n); return s; } virtual Status InvalidateCache(size_t offset, size_t length) override { return Status::OK(); } virtual size_t GetUniqueId(char* id, size_t max_size) const override { return GetUniqueIdFromFile(hFile_, id, max_size); } }; // We preallocate up to an extra megabyte and use memcpy to append new // data to the file. This is safe since we either properly close the // file before reading from it, or for log files, the reading code // knows enough to skip zero suffixes. class WinMmapFile : public WritableFile { private: const std::string filename_; HANDLE hFile_; HANDLE hMap_; const size_t page_size_; // We flush the mapping view in page_size // increments. We may decide if this is a memory // page size or SSD page size const size_t allocation_granularity_; // View must start at such a granularity size_t reserved_size_; // Preallocated size size_t mapping_size_; // The max size of the mapping object // we want to guess the final file size to minimize the remapping size_t view_size_; // How much memory to map into a view at a time char* mapped_begin_; // Must begin at the file offset that is aligned with // allocation_granularity_ char* mapped_end_; char* dst_; // Where to write next (in range [mapped_begin_,mapped_end_]) char* last_sync_; // Where have we synced up to uint64_t file_offset_; // Offset of mapped_begin_ in file // Do we have unsynced writes? bool pending_sync_; // Can only truncate or reserve to a sector size aligned if // used on files that are opened with Unbuffered I/O Status TruncateFile(uint64_t toSize) { return ftruncate(filename_, hFile_, toSize); } Status UnmapCurrentRegion() { Status status; if (mapped_begin_ != nullptr) { if (!::UnmapViewOfFile(mapped_begin_)) { status = IOErrorFromWindowsError( "Failed to unmap file view: " + filename_, GetLastError()); } // Move on to the next portion of the file file_offset_ += view_size_; // UnmapView automatically sends data to disk but not the metadata // which is good and provides some equivalent of fdatasync() on Linux // therefore, we donot need separate flag for metadata mapped_begin_ = nullptr; mapped_end_ = nullptr; dst_ = nullptr; last_sync_ = nullptr; pending_sync_ = false; } return status; } Status MapNewRegion() { Status status; assert(mapped_begin_ == nullptr); size_t minDiskSize = file_offset_ + view_size_; if (minDiskSize > reserved_size_) { status = Allocate(file_offset_, view_size_); if (!status.ok()) { return status; } } // Need to remap if (hMap_ == NULL || reserved_size_ > mapping_size_) { if (hMap_ != NULL) { // Unmap the previous one BOOL ret = ::CloseHandle(hMap_); assert(ret); hMap_ = NULL; } ULARGE_INTEGER mappingSize; mappingSize.QuadPart = reserved_size_; hMap_ = CreateFileMappingA( hFile_, NULL, // Security attributes PAGE_READWRITE, // There is not a write only mode for mapping mappingSize.HighPart, // Enable mapping the whole file but the actual // amount mapped is determined by MapViewOfFile mappingSize.LowPart, NULL); // Mapping name if (NULL == hMap_) { return IOErrorFromWindowsError( "WindowsMmapFile failed to create file mapping for: " + filename_, GetLastError()); } mapping_size_ = reserved_size_; } ULARGE_INTEGER offset; offset.QuadPart = file_offset_; // View must begin at the granularity aligned offset mapped_begin_ = reinterpret_cast( MapViewOfFileEx(hMap_, FILE_MAP_WRITE, offset.HighPart, offset.LowPart, view_size_, NULL)); if (!mapped_begin_) { status = IOErrorFromWindowsError( "WindowsMmapFile failed to map file view: " + filename_, GetLastError()); } else { mapped_end_ = mapped_begin_ + view_size_; dst_ = mapped_begin_; last_sync_ = mapped_begin_; pending_sync_ = false; } return status; } public: WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size, size_t allocation_granularity, const EnvOptions& options) : filename_(fname), hFile_(hFile), hMap_(NULL), page_size_(page_size), allocation_granularity_(allocation_granularity), reserved_size_(0), mapping_size_(0), view_size_(0), mapped_begin_(nullptr), mapped_end_(nullptr), dst_(nullptr), last_sync_(nullptr), file_offset_(0), pending_sync_(false) { // Allocation granularity must be obtained from GetSystemInfo() and must be // a power of two. assert(allocation_granularity > 0); assert((allocation_granularity & (allocation_granularity - 1)) == 0); assert(page_size > 0); assert((page_size & (page_size - 1)) == 0); // Only for memory mapped writes assert(options.use_mmap_writes); // View size must be both the multiple of allocation_granularity AND the // page size and the granularity is usually a multiple of a page size. const size_t viewSize = 32 * 1024; // 32Kb similar to the Windows File Cache in buffered mode view_size_ = Roundup(viewSize, allocation_granularity_); } ~WinMmapFile() { if (hFile_) { this->Close(); } } virtual Status Append(const Slice& data) override { const char* src = data.data(); size_t left = data.size(); while (left > 0) { assert(mapped_begin_ <= dst_); size_t avail = mapped_end_ - dst_; if (avail == 0) { Status s = UnmapCurrentRegion(); if (s.ok()) { s = MapNewRegion(); } if (!s.ok()) { return s; } } else { size_t n = std::min(left, avail); memcpy(dst_, src, n); dst_ += n; src += n; left -= n; pending_sync_ = true; } } // Now make sure that the last partial page is padded with zeros if needed size_t bytesToPad = Roundup(size_t(dst_), page_size_) - size_t(dst_); if (bytesToPad > 0) { memset(dst_, 0, bytesToPad); } return Status::OK(); } // Means Close() will properly take care of truncate // and it does not need any additional information virtual Status Truncate(uint64_t size) override { return Status::OK(); } virtual Status Close() override { Status s; assert(NULL != hFile_); // We truncate to the precise size so no // uninitialized data at the end. SetEndOfFile // which we use does not write zeros and it is good. uint64_t targetSize = GetFileSize(); if (mapped_begin_ != nullptr) { // Sync before unmapping to make sure everything // is on disk and there is not a lazy writing // so we are deterministic with the tests Sync(); s = UnmapCurrentRegion(); } if (NULL != hMap_) { BOOL ret = ::CloseHandle(hMap_); if (!ret && s.ok()) { auto lastError = GetLastError(); s = IOErrorFromWindowsError( "Failed to Close mapping for file: " + filename_, lastError); } hMap_ = NULL; } if (hFile_ != NULL) { TruncateFile(targetSize); BOOL ret = ::CloseHandle(hFile_); hFile_ = NULL; if (!ret && s.ok()) { auto lastError = GetLastError(); s = IOErrorFromWindowsError( "Failed to close file map handle: " + filename_, lastError); } } return s; } virtual Status Flush() override { return Status::OK(); } // Flush only data virtual Status Sync() override { Status s; // Some writes occurred since last sync if (dst_ > last_sync_) { assert(mapped_begin_); assert(dst_); assert(dst_ > mapped_begin_); assert(dst_ < mapped_end_); size_t page_begin = TruncateToPageBoundary(page_size_, last_sync_ - mapped_begin_); size_t page_end = TruncateToPageBoundary(page_size_, dst_ - mapped_begin_ - 1); // Flush only the amount of that is a multiple of pages if (!::FlushViewOfFile(mapped_begin_ + page_begin, (page_end - page_begin) + page_size_)) { s = IOErrorFromWindowsError("Failed to FlushViewOfFile: " + filename_, GetLastError()); } else { last_sync_ = dst_; } } return s; } /** * Flush data as well as metadata to stable storage. */ virtual Status Fsync() override { Status s = Sync(); // Flush metadata if (s.ok() && pending_sync_) { if (!::FlushFileBuffers(hFile_)) { s = IOErrorFromWindowsError("Failed to FlushFileBuffers: " + filename_, GetLastError()); } pending_sync_ = false; } return s; } /** * Get the size of valid data in the file. This will not match the * size that is returned from the filesystem because we use mmap * to extend file by map_size every time. */ virtual uint64_t GetFileSize() override { size_t used = dst_ - mapped_begin_; return file_offset_ + used; } virtual Status InvalidateCache(size_t offset, size_t length) override { return Status::OK(); } virtual Status Allocate(uint64_t offset, uint64_t len) override { Status status; TEST_KILL_RANDOM("WinMmapFile::Allocate", rocksdb_kill_odds); // Make sure that we reserve an aligned amount of space // since the reservation block size is driven outside so we want // to check if we are ok with reservation here size_t spaceToReserve = Roundup(offset + len, view_size_); // Nothing to do if (spaceToReserve <= reserved_size_) { return status; } IOSTATS_TIMER_GUARD(allocate_nanos); status = fallocate(filename_, hFile_, spaceToReserve); if (status.ok()) { reserved_size_ = spaceToReserve; } return status; } virtual size_t GetUniqueId(char* id, size_t max_size) const override { return GetUniqueIdFromFile(hFile_, id, max_size); } }; class WinSequentialFile : public SequentialFile { private: const std::string filename_; HANDLE file_; // There is no equivalent of advising away buffered pages as in posix. // To implement this flag we would need to do unbuffered reads which // will need to be aligned (not sure there is a guarantee that the buffer // passed in is aligned). // Hence we currently ignore this flag. It is used only in a few cases // which should not be perf critical. // If perf evaluation finds this to be a problem, we can look into // implementing this. bool use_os_buffer_; public: WinSequentialFile(const std::string& fname, HANDLE f, const EnvOptions& options) : filename_(fname), file_(f), use_os_buffer_(options.use_os_buffer) {} virtual ~WinSequentialFile() { assert(file_ != INVALID_HANDLE_VALUE); CloseHandle(file_); } virtual Status Read(size_t n, Slice* result, char* scratch) override { Status s; size_t r = 0; // Windows ReadFile API accepts a DWORD. // While it is possible to read in a loop if n is > UINT_MAX // it is a highly unlikely case. if (n > UINT_MAX) { return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); } DWORD bytesToRead = static_cast(n); //cast is safe due to the check above DWORD bytesRead = 0; BOOL ret = ReadFile(file_, scratch, bytesToRead, &bytesRead, NULL); if (ret == TRUE) { r = bytesRead; } else { return IOErrorFromWindowsError(filename_, GetLastError()); } *result = Slice(scratch, r); return s; } virtual Status Skip(uint64_t n) override { // Can't handle more than signed max as SetFilePointerEx accepts a signed 64-bit // integer. As such it is a highly unlikley case to have n so large. if (n > _I64_MAX) { return IOErrorFromWindowsError(filename_, ERROR_INVALID_PARAMETER); } LARGE_INTEGER li; li.QuadPart = static_cast(n); //cast is safe due to the check above BOOL ret = SetFilePointerEx(file_, li, NULL, FILE_CURRENT); if (ret == FALSE) { return IOErrorFromWindowsError(filename_, GetLastError()); } return Status::OK(); } virtual Status InvalidateCache(size_t offset, size_t length) override { return Status::OK(); } }; // pread() based random-access class WinRandomAccessFile : public RandomAccessFile { const std::string filename_; HANDLE hFile_; const bool use_os_buffer_; bool read_ahead_; const size_t compaction_readahead_size_; const size_t random_access_max_buffer_size_; mutable std::mutex buffer_mut_; mutable AlignedBuffer buffer_; mutable uint64_t buffered_start_; // file offset set that is currently buffered /* * The function reads a requested amount of bytes into the specified aligned * buffer Upon success the function sets the length of the buffer to the * amount of bytes actually read even though it might be less than actually * requested. It then copies the amount of bytes requested by the user (left) * to the user supplied buffer (dest) and reduces left by the amount of bytes * copied to the user buffer * * @user_offset [in] - offset on disk where the read was requested by the user * @first_page_start [in] - actual page aligned disk offset that we want to * read from * @bytes_to_read [in] - total amount of bytes that will be read from disk * which is generally greater or equal to the amount * that the user has requested due to the * either alignment requirements or read_ahead in * effect. * @left [in/out] total amount of bytes that needs to be copied to the user * buffer. It is reduced by the amount of bytes that actually * copied * @buffer - buffer to use * @dest - user supplied buffer */ SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start, size_t bytes_to_read, size_t& left, AlignedBuffer& buffer, char* dest) const { assert(buffer.CurrentSize() == 0); assert(buffer.Capacity() >= bytes_to_read); SSIZE_T read = pread(hFile_, buffer.Destination(), bytes_to_read, first_page_start); if (read > 0) { buffer.Size(read); // Let's figure out how much we read from the users standpoint if ((first_page_start + buffer.CurrentSize()) > user_offset) { assert(first_page_start <= user_offset); size_t buffer_offset = user_offset - first_page_start; read = buffer.Read(dest, buffer_offset, left); } else { read = 0; } left -= read; } return read; } SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start, size_t bytes_to_read, size_t& left, char* dest) const { AlignedBuffer bigBuffer; bigBuffer.Alignment(buffer_.Alignment()); bigBuffer.AllocateNewBuffer(bytes_to_read); return ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left, bigBuffer, dest); } SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset, uint64_t first_page_start, size_t bytes_to_read, size_t& left, char* dest) const { SSIZE_T read = ReadIntoBuffer(user_offset, first_page_start, bytes_to_read, left, buffer_, dest); if (read > 0) { buffered_start_ = first_page_start; } return read; } void CalculateReadParameters(uint64_t offset, size_t bytes_requested, size_t& actual_bytes_toread, uint64_t& first_page_start) const { const size_t alignment = buffer_.Alignment(); first_page_start = TruncateToPageBoundary(alignment, offset); const uint64_t last_page_start = TruncateToPageBoundary(alignment, offset + bytes_requested - 1); actual_bytes_toread = (last_page_start - first_page_start) + alignment; } public: WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment, const EnvOptions& options) : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), read_ahead_(false), compaction_readahead_size_(options.compaction_readahead_size), random_access_max_buffer_size_(options.random_access_max_buffer_size), buffer_(), buffered_start_(0) { assert(!options.use_mmap_reads); // Unbuffered access, use internal buffer for reads if (!use_os_buffer_) { // Do not allocate the buffer either until the first request or // until there is a call to allocate a read-ahead buffer buffer_.Alignment(alignment); } } virtual ~WinRandomAccessFile() { if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) { ::CloseHandle(hFile_); } } virtual void EnableReadAhead() override { this->Hint(SEQUENTIAL); } virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const override { Status s; SSIZE_T r = -1; size_t left = n; char* dest = scratch; if (n == 0) { *result = Slice(scratch, 0); return s; } // When in unbuffered mode we need to do the following changes: // - use our own aligned buffer // - always read at the offset of that is a multiple of alignment if (!use_os_buffer_) { uint64_t first_page_start = 0; size_t actual_bytes_toread = 0; size_t bytes_requested = left; if (!read_ahead_ && random_access_max_buffer_size_ == 0) { CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, first_page_start); assert(actual_bytes_toread > 0); r = ReadIntoOneShotBuffer(offset, first_page_start, actual_bytes_toread, left, dest); } else { std::unique_lock lock(buffer_mut_); // Let's see if at least some of the requested data is already // in the buffer if (offset >= buffered_start_ && offset < (buffered_start_ + buffer_.CurrentSize())) { size_t buffer_offset = offset - buffered_start_; r = buffer_.Read(dest, buffer_offset, left); assert(r >= 0); left -= size_t(r); offset += r; dest += r; } // Still some left or none was buffered if (left > 0) { // Figure out the start/end offset for reading and amount to read bytes_requested = left; if (read_ahead_ && bytes_requested < compaction_readahead_size_) { bytes_requested = compaction_readahead_size_; } CalculateReadParameters(offset, bytes_requested, actual_bytes_toread, first_page_start); assert(actual_bytes_toread > 0); if (buffer_.Capacity() < actual_bytes_toread) { // If we are in read-ahead mode or the requested size // exceeds max buffer size then use one-shot // big buffer otherwise reallocate main buffer if (read_ahead_ || (actual_bytes_toread > random_access_max_buffer_size_)) { // Unlock the mutex since we are not using instance buffer lock.unlock(); r = ReadIntoOneShotBuffer(offset, first_page_start, actual_bytes_toread, left, dest); } else { buffer_.AllocateNewBuffer(actual_bytes_toread); r = ReadIntoInstanceBuffer(offset, first_page_start, actual_bytes_toread, left, dest); } } else { buffer_.Clear(); r = ReadIntoInstanceBuffer(offset, first_page_start, actual_bytes_toread, left, dest); } } } } else { r = pread(hFile_, scratch, left, offset); if (r > 0) { left -= r; } } *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { s = IOErrorFromLastWindowsError(filename_); } return s; } virtual bool ShouldForwardRawRequest() const override { return true; } virtual void Hint(AccessPattern pattern) override { if (pattern == SEQUENTIAL && !use_os_buffer_ && compaction_readahead_size_ > 0) { std::lock_guard lg(buffer_mut_); if (!read_ahead_) { read_ahead_ = true; // This would allocate read-ahead size + 2 alignments // - one for memory alignment which added implicitly by AlignedBuffer // - We add one more alignment because we will read one alignment more // from disk buffer_.AllocateNewBuffer(compaction_readahead_size_ + buffer_.Alignment()); } } } virtual Status InvalidateCache(size_t offset, size_t length) override { return Status::OK(); } virtual size_t GetUniqueId(char* id, size_t max_size) const override { return GetUniqueIdFromFile(hFile_, id, max_size); } }; // This is a sequential write class. It has been mimicked (as others) after // the original Posix class. We add support for unbuffered I/O on windows as // well // we utilize the original buffer as an alignment buffer to write directly to // file with no buffering. // No buffering requires that the provided buffer is aligned to the physical // sector size (SSD page size) and // that all SetFilePointer() operations to occur with such an alignment. // We thus always write in sector/page size increments to the drive and leave // the tail for the next write OR for Close() at which point we pad with zeros. // No padding is required for // buffered access. class WinWritableFile : public WritableFile { private: const std::string filename_; HANDLE hFile_; const bool use_os_buffer_; // Used to indicate unbuffered access, the file const uint64_t alignment_; // must be opened as unbuffered if false uint64_t filesize_; // How much data is actually written disk uint64_t reservedsize_; // how far we have reserved space public: WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment, size_t capacity, const EnvOptions& options) : filename_(fname), hFile_(hFile), use_os_buffer_(options.use_os_buffer), alignment_(alignment), filesize_(0), reservedsize_(0) { assert(!options.use_mmap_writes); } ~WinWritableFile() { if (NULL != hFile_ && INVALID_HANDLE_VALUE != hFile_) { WinWritableFile::Close(); } } // Indicates if the class makes use of unbuffered I/O virtual bool UseOSBuffer() const override { return use_os_buffer_; } virtual size_t GetRequiredBufferAlignment() const override { return alignment_; } virtual Status Append(const Slice& data) override { // Used for buffered access ONLY assert(use_os_buffer_); assert(data.size() < std::numeric_limits::max()); Status s; DWORD bytesWritten = 0; if (!WriteFile(hFile_, data.data(), static_cast(data.size()), &bytesWritten, NULL)) { auto lastError = GetLastError(); s = IOErrorFromWindowsError( "Failed to WriteFile: " + filename_, lastError); } else { assert(size_t(bytesWritten) == data.size()); filesize_ += data.size(); } return s; } virtual Status PositionedAppend(const Slice& data, uint64_t offset) override { Status s; SSIZE_T ret = pwrite(hFile_, data.data(), data.size(), offset); // Error break if (ret < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError( "Failed to pwrite for: " + filename_, lastError); } else { // With positional write it is not clear at all // if this actually extends the filesize assert(size_t(ret) == data.size()); filesize_ += data.size(); } return s; } // Need to implement this so the file is truncated correctly // when buffered and unbuffered mode virtual Status Truncate(uint64_t size) override { Status s = ftruncate(filename_, hFile_, size); if (s.ok()) { filesize_ = size; } return s; } virtual Status Close() override { Status s; assert(INVALID_HANDLE_VALUE != hFile_); if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Close() for: " + filename_, lastError); } if (FALSE == ::CloseHandle(hFile_)) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("CloseHandle failed for: " + filename_, lastError); } hFile_ = INVALID_HANDLE_VALUE; return s; } // write out the cached data to the OS cache // This is now taken care of the WritableFileWriter virtual Status Flush() override { return Status::OK(); } virtual Status Sync() override { Status s; // Calls flush buffers if (fsync(hFile_) < 0) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("fsync failed at Sync() for: " + filename_, lastError); } return s; } virtual Status Fsync() override { return Sync(); } virtual uint64_t GetFileSize() override { // Double accounting now here with WritableFileWriter // and this size will be wrong when unbuffered access is used // but tests implement their own writable files and do not use WritableFileWrapper // so we need to squeeze a square peg through // a round hole here. return filesize_; } virtual Status Allocate(uint64_t offset, uint64_t len) override { Status status; TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); // Make sure that we reserve an aligned amount of space // since the reservation block size is driven outside so we want // to check if we are ok with reservation here size_t spaceToReserve = Roundup(offset + len, alignment_); // Nothing to do if (spaceToReserve <= reservedsize_) { return status; } IOSTATS_TIMER_GUARD(allocate_nanos); status = fallocate(filename_, hFile_, spaceToReserve); if (status.ok()) { reservedsize_ = spaceToReserve; } return status; } virtual size_t GetUniqueId(char* id, size_t max_size) const override { return GetUniqueIdFromFile(hFile_, id, max_size); } }; class WinDirectory : public Directory { public: WinDirectory() {} virtual Status Fsync() override { return Status::OK(); } }; class WinFileLock : public FileLock { public: explicit WinFileLock(HANDLE hFile) : hFile_(hFile) { assert(hFile != NULL); assert(hFile != INVALID_HANDLE_VALUE); } ~WinFileLock() { BOOL ret = ::CloseHandle(hFile_); assert(ret); } private: HANDLE hFile_; }; namespace { void WinthreadCall(const char* label, std::error_code result) { if (0 != result.value()) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result.value())); abort(); } } } typedef VOID(WINAPI * FnGetSystemTimePreciseAsFileTime)(LPFILETIME); class WinEnv : public Env { public: WinEnv(); virtual ~WinEnv() { for (auto& th : threads_to_join_) { th.join(); } threads_to_join_.clear(); for (auto& thpool : thread_pools_) { thpool.JoinAllThreads(); } // All threads must be joined before the deletion of // thread_status_updater_. delete thread_status_updater_; } virtual Status DeleteFile(const std::string& fname) override { Status result; if (_unlink(fname.c_str())) { result = IOError("Failed to delete: " + fname, errno); } return result; } Status GetCurrentTime(int64_t* unix_time) override { time_t time = std::time(nullptr); if (time == (time_t)(-1)) { return Status::NotSupported("Failed to get time"); } *unix_time = time; return Status::OK(); } virtual Status NewSequentialFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override { Status s; result->reset(); // Corruption test needs to rename and delete files of these kind // while they are still open with another handle. For that reason we // allow share_write and delete(allows rename). HANDLE hFile = INVALID_HANDLE_VALUE; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA( fname.c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, // Original fopen mode is "rb" FILE_ATTRIBUTE_NORMAL, NULL); } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("Failed to open NewSequentialFile" + fname, lastError); } else { result->reset(new WinSequentialFile(fname, hFile, options)); } return s; } virtual Status NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override { result->reset(); Status s; // Open the file for read-only random access // Random access is to disable read-ahead as the system reads too much data DWORD fileFlags = FILE_ATTRIBUTE_READONLY; if (!options.use_os_buffer && !options.use_mmap_reads) { fileFlags |= FILE_FLAG_NO_BUFFERING; } else { fileFlags |= FILE_FLAG_RANDOM_ACCESS; } /// Shared access is necessary for corruption test to pass // almost all tests would work with a possible exception of fault_injection HANDLE hFile = 0; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA(fname.c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, fileFlags, NULL); } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); return IOErrorFromWindowsError( "NewRandomAccessFile failed to Create/Open: " + fname, lastError); } UniqueCloseHandlePtr fileGuard(hFile, CloseHandleFunc); // CAUTION! This will map the entire file into the process address space if (options.use_mmap_reads && sizeof(void*) >= 8) { // Use mmap when virtual address-space is plentiful. uint64_t fileSize; s = GetFileSize(fname, &fileSize); if (s.ok()) { // Will not map empty files if (fileSize == 0) { return IOError( "NewRandomAccessFile failed to map empty file: " + fname, EINVAL); } HANDLE hMap = CreateFileMappingA(hFile, NULL, PAGE_READONLY, 0, // Whole file at its present length 0, NULL); // Mapping name if (!hMap) { auto lastError = GetLastError(); return IOErrorFromWindowsError( "Failed to create file mapping for NewRandomAccessFile: " + fname, lastError); } UniqueCloseHandlePtr mapGuard(hMap, CloseHandleFunc); const void* mapped_region = MapViewOfFileEx(hMap, FILE_MAP_READ, 0, // High DWORD of access start 0, // Low DWORD fileSize, NULL); // Let the OS choose the mapping if (!mapped_region) { auto lastError = GetLastError(); return IOErrorFromWindowsError( "Failed to MapViewOfFile for NewRandomAccessFile: " + fname, lastError); } result->reset(new WinMmapReadableFile(fname, hFile, hMap, mapped_region, fileSize)); mapGuard.release(); fileGuard.release(); } } else { result->reset(new WinRandomAccessFile(fname, hFile, page_size_, options)); fileGuard.release(); } return s; } virtual Status NewWritableFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override { const size_t c_BufferCapacity = 64 * 1024; EnvOptions local_options(options); result->reset(); Status s; DWORD fileFlags = FILE_ATTRIBUTE_NORMAL; if (!local_options.use_os_buffer && !local_options.use_mmap_writes) { fileFlags = FILE_FLAG_NO_BUFFERING; } // Desired access. We are want to write only here but if we want to memory // map // the file then there is no write only mode so we have to create it // Read/Write // However, MapViewOfFile specifies only Write only DWORD desired_access = GENERIC_WRITE; DWORD shared_mode = FILE_SHARE_READ; if (local_options.use_mmap_writes) { desired_access |= GENERIC_READ; } else { // Adding this solely for tests to pass (fault_injection_test, // wal_manager_test). shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); } HANDLE hFile = 0; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA( fname.c_str(), desired_access, // Access desired shared_mode, NULL, // Security attributes CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC fileFlags, // Flags NULL); // Template File } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); return IOErrorFromWindowsError( "Failed to create a NewWriteableFile: " + fname, lastError); } if (options.use_mmap_writes) { // We usually do not use mmmapping on SSD and thus we pass memory // page_size result->reset(new WinMmapFile(fname, hFile, page_size_, allocation_granularity_, local_options)); } else { // Here we want the buffer allocation to be aligned by the SSD page size // and to be a multiple of it result->reset(new WinWritableFile(fname, hFile, page_size_, c_BufferCapacity, local_options)); } return s; } virtual Status NewDirectory(const std::string& name, std::unique_ptr* result) override { Status s; // Must be nullptr on failure result->reset(); // Must fail if directory does not exist if (!DirExists(name)) { s = IOError("Directory does not exist: " + name, EEXIST); } else { IOSTATS_TIMER_GUARD(open_nanos); result->reset(new WinDirectory); } return s; } virtual Status FileExists(const std::string& fname) override { // F_OK == 0 const int F_OK_ = 0; return _access(fname.c_str(), F_OK_) == 0 ? Status::OK() : Status::NotFound(); } virtual Status GetChildren(const std::string& dir, std::vector* result) override { std::vector output; Status status; auto CloseDir = [](DIR* p) { closedir(p); }; std::unique_ptr dirp(opendir(dir.c_str()), CloseDir); if (!dirp) { status = IOError(dir, errno); } else { if (result->capacity() > 0) { output.reserve(result->capacity()); } struct dirent* ent = readdir(dirp.get()); while (ent) { output.push_back(ent->d_name); ent = readdir(dirp.get()); } } output.swap(*result); return status; } virtual Status CreateDir(const std::string& name) override { Status result; if (_mkdir(name.c_str()) != 0) { auto code = errno; result = IOError("Failed to create dir: " + name, code); } return result; } virtual Status CreateDirIfMissing(const std::string& name) override { Status result; if (DirExists(name)) { return result; } if (_mkdir(name.c_str()) != 0) { if (errno == EEXIST) { result = Status::IOError("`" + name + "' exists but is not a directory"); } else { auto code = errno; result = IOError("Failed to create dir: " + name, code); } } return result; } virtual Status DeleteDir(const std::string& name) override { Status result; if (_rmdir(name.c_str()) != 0) { auto code = errno; result = IOError("Failed to remove dir: " + name, code); } return result; } virtual Status GetFileSize(const std::string& fname, uint64_t* size) override { Status s; WIN32_FILE_ATTRIBUTE_DATA attrs; if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { ULARGE_INTEGER file_size; file_size.HighPart = attrs.nFileSizeHigh; file_size.LowPart = attrs.nFileSizeLow; *size = file_size.QuadPart; } else { auto lastError = GetLastError(); s = IOErrorFromWindowsError("Can not get size for: " + fname, lastError); } return s; } static inline uint64_t FileTimeToUnixTime(const FILETIME& ftTime) { const uint64_t c_FileTimePerSecond = 10000000U; // UNIX epoch starts on 1970-01-01T00:00:00Z // Windows FILETIME starts on 1601-01-01T00:00:00Z // Therefore, we need to subtract the below number of seconds from // the seconds that we obtain from FILETIME with an obvious loss of // precision const uint64_t c_SecondBeforeUnixEpoch = 11644473600U; ULARGE_INTEGER li; li.HighPart = ftTime.dwHighDateTime; li.LowPart = ftTime.dwLowDateTime; uint64_t result = (li.QuadPart / c_FileTimePerSecond) - c_SecondBeforeUnixEpoch; return result; } virtual Status GetFileModificationTime(const std::string& fname, uint64_t* file_mtime) override { Status s; WIN32_FILE_ATTRIBUTE_DATA attrs; if (GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) { *file_mtime = FileTimeToUnixTime(attrs.ftLastWriteTime); } else { auto lastError = GetLastError(); s = IOErrorFromWindowsError( "Can not get file modification time for: " + fname, lastError); *file_mtime = 0; } return s; } virtual Status RenameFile(const std::string& src, const std::string& target) override { Status result; // rename() is not capable of replacing the existing file as on Linux // so use OS API directly if (!MoveFileExA(src.c_str(), target.c_str(), MOVEFILE_REPLACE_EXISTING)) { DWORD lastError = GetLastError(); std::string text("Failed to rename: "); text.append(src).append(" to: ").append(target); result = IOErrorFromWindowsError(text, lastError); } return result; } virtual Status LinkFile(const std::string& src, const std::string& target) override { Status result; if (!CreateHardLinkA(target.c_str(), src.c_str(), NULL)) { DWORD lastError = GetLastError(); std::string text("Failed to link: "); text.append(src).append(" to: ").append(target); result = IOErrorFromWindowsError(text, lastError); } return result; } virtual Status LockFile(const std::string& lockFname, FileLock** lock) override { assert(lock != nullptr); *lock = NULL; Status result; // No-sharing, this is a LOCK file const DWORD ExclusiveAccessON = 0; // Obtain exclusive access to the LOCK file // Previously, instead of NORMAL attr we set DELETE on close and that worked // well except with fault_injection test that insists on deleting it. HANDLE hFile = 0; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA(lockFname.c_str(), (GENERIC_READ | GENERIC_WRITE), ExclusiveAccessON, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL); } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); result = IOErrorFromWindowsError( "Failed to create lock file: " + lockFname, lastError); } else { *lock = new WinFileLock(hFile); } return result; } virtual Status UnlockFile(FileLock* lock) override { Status result; assert(lock != nullptr); delete lock; return result; } virtual void Schedule(void (*function)(void*), void* arg, Priority pri = LOW, void* tag = nullptr, void (*unschedFunction)(void* arg) = 0) override; virtual int UnSchedule(void* arg, Priority pri) override; virtual void StartThread(void (*function)(void* arg), void* arg) override; virtual void WaitForJoin() override; virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; virtual Status GetTestDirectory(std::string* result) override { std::string output; const char* env = getenv("TEST_TMPDIR"); if (env && env[0] != '\0') { output = env; CreateDir(output); } else { env = getenv("TMP"); if (env && env[0] != '\0') { output = env; } else { output = "c:\\tmp"; } CreateDir(output); } output.append("\\testrocksdb-"); output.append(std::to_string(_getpid())); CreateDir(output); output.swap(*result); return Status::OK(); } virtual Status GetThreadList( std::vector* thread_list) override { assert(thread_status_updater_); return thread_status_updater_->GetThreadList(thread_list); } static uint64_t gettid() { uint64_t thread_id = GetCurrentThreadId(); return thread_id; } virtual uint64_t GetThreadID() const override { return gettid(); } virtual Status NewLogger(const std::string& fname, std::shared_ptr* result) override { Status s; result->reset(); HANDLE hFile = 0; { IOSTATS_TIMER_GUARD(open_nanos); hFile = CreateFileA( fname.c_str(), GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_DELETE, // In RocksDb log files are // renamed and deleted before // they are closed. This enables // doing so. NULL, CREATE_ALWAYS, // Original fopen mode is "w" FILE_ATTRIBUTE_NORMAL, NULL); } if (INVALID_HANDLE_VALUE == hFile) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("Failed to open LogFile" + fname, lastError); } else { { // With log files we want to set the true creation time as of now // because the system // for some reason caches the attributes of the previous file that just // been renamed from // this name so auto_roll_logger_test fails FILETIME ft; GetSystemTimeAsFileTime(&ft); // Set creation, last access and last write time to the same value SetFileTime(hFile, &ft, &ft, &ft); } result->reset(new WinLogger(&WinEnv::gettid, this, hFile)); } return s; } virtual uint64_t NowMicros() override { if (GetSystemTimePreciseAsFileTime_ != NULL) { // all std::chrono clocks on windows proved to return // values that may repeat that is not good enough for some uses. const int64_t c_UnixEpochStartTicks = 116444736000000000i64; const int64_t c_FtToMicroSec = 10; // This interface needs to return system time and not // just any microseconds because it is often used as an argument // to TimedWait() on condition variable FILETIME ftSystemTime; GetSystemTimePreciseAsFileTime_(&ftSystemTime); LARGE_INTEGER li; li.LowPart = ftSystemTime.dwLowDateTime; li.HighPart = ftSystemTime.dwHighDateTime; // Subtract unix epoch start li.QuadPart -= c_UnixEpochStartTicks; // Convert to microsecs li.QuadPart /= c_FtToMicroSec; return li.QuadPart; } using namespace std::chrono; return duration_cast(system_clock::now().time_since_epoch()).count(); } virtual uint64_t NowNanos() override { // all std::chrono clocks on windows have the same resolution that is only // good enough for microseconds but not nanoseconds // On Windows 8 and Windows 2012 Server // GetSystemTimePreciseAsFileTime(¤t_time) can be used LARGE_INTEGER li; QueryPerformanceCounter(&li); // Convert to nanoseconds first to avoid loss of precision // and divide by frequency li.QuadPart *= std::nano::den; li.QuadPart /= perf_counter_frequency_; return li.QuadPart; } virtual void SleepForMicroseconds(int micros) override { std::this_thread::sleep_for(std::chrono::microseconds(micros)); } virtual Status GetHostName(char* name, uint64_t len) override { Status s; DWORD nSize = static_cast( std::min(len, std::numeric_limits::max())); if (!::GetComputerNameA(name, &nSize)) { auto lastError = GetLastError(); s = IOErrorFromWindowsError("GetHostName", lastError); } else { name[nSize] = 0; } return s; } virtual Status GetCurrTime(int64_t* unix_time) { Status s; time_t ret = time(nullptr); if (ret == (time_t)-1) { *unix_time = 0; s = IOError("GetCurrTime", errno); } else { *unix_time = (int64_t)ret; } return s; } virtual Status GetAbsolutePath(const std::string& db_path, std::string* output_path) override { // Check if we already have an absolute path // that starts with non dot and has a semicolon in it if ((!db_path.empty() && (db_path[0] == '/' || db_path[0] == '\\')) || (db_path.size() > 2 && db_path[0] != '.' && ((db_path[1] == ':' && db_path[2] == '\\') || (db_path[1] == ':' && db_path[2] == '/')))) { *output_path = db_path; return Status::OK(); } std::string result; result.resize(_MAX_PATH); char* ret = _getcwd(&result[0], _MAX_PATH); if (ret == nullptr) { return Status::IOError("Failed to get current working directory", strerror(errno)); } result.resize(strlen(result.data())); result.swap(*output_path); return Status::OK(); } // Allow increasing the number of worker threads. virtual void SetBackgroundThreads(int num, Priority pri) override { assert(pri >= Priority::LOW && pri <= Priority::HIGH); thread_pools_[pri].SetBackgroundThreads(num); } virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { assert(pri >= Priority::LOW && pri <= Priority::HIGH); thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); } virtual std::string TimeToString(uint64_t secondsSince1970) override { std::string result; const time_t seconds = secondsSince1970; const int maxsize = 64; struct tm t; errno_t ret = localtime_s(&t, &seconds); if (ret) { result = std::to_string(seconds); } else { result.resize(maxsize); char* p = &result[0]; int len = snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec); assert(len > 0); result.resize(len); } return result; } EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, const DBOptions& db_options) const override { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; optimized.bytes_per_sync = db_options.wal_bytes_per_sync; optimized.use_os_buffer = true; // This is because we flush only whole pages on unbuffered io and // the last records are not guaranteed to be flushed. // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit // test and make this false optimized.fallocate_with_keep_size = true; return optimized; } EnvOptions OptimizeForManifestWrite( const EnvOptions& env_options) const override { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; optimized.use_os_buffer = true; optimized.fallocate_with_keep_size = true; return optimized; } private: // Returns true iff the named directory exists and is a directory. virtual bool DirExists(const std::string& dname) { WIN32_FILE_ATTRIBUTE_DATA attrs; if (GetFileAttributesExA(dname.c_str(), GetFileExInfoStandard, &attrs)) { return 0 != (attrs.dwFileAttributes & FILE_ATTRIBUTE_DIRECTORY); } return false; } bool SupportsFastAllocate(const std::string& /* path */) { return false; } bool checkedDiskForMmap_; bool forceMmapOff; // do we override Env options? size_t page_size_; size_t allocation_granularity_; uint64_t perf_counter_frequency_; std::vector thread_pools_; mutable std::mutex mu_; std::vector threads_to_join_; FnGetSystemTimePreciseAsFileTime GetSystemTimePreciseAsFileTime_; }; WinEnv::WinEnv() : checkedDiskForMmap_(false), forceMmapOff(false), page_size_(4 * 1012), allocation_granularity_(page_size_), perf_counter_frequency_(0), thread_pools_(Priority::TOTAL), GetSystemTimePreciseAsFileTime_(NULL) { HMODULE module = GetModuleHandle("kernel32.dll"); if (module != NULL) { GetSystemTimePreciseAsFileTime_ = (FnGetSystemTimePreciseAsFileTime)GetProcAddress( module, "GetSystemTimePreciseAsFileTime"); } SYSTEM_INFO sinfo; GetSystemInfo(&sinfo); page_size_ = sinfo.dwPageSize; allocation_granularity_ = sinfo.dwAllocationGranularity; { LARGE_INTEGER qpf; BOOL ret = QueryPerformanceFrequency(&qpf); assert(ret == TRUE); perf_counter_frequency_ = qpf.QuadPart; } for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { thread_pools_[pool_id].SetThreadPriority( static_cast(pool_id)); // This allows later initializing the thread-local-env of each thread. thread_pools_[pool_id].SetHostEnv(this); } // Protected member of the base class thread_status_updater_ = CreateThreadStatusUpdater(); } void WinEnv::Schedule(void (*function)(void*), void* arg, Priority pri, void* tag, void (*unschedFunction)(void* arg)) { assert(pri >= Priority::LOW && pri <= Priority::HIGH); thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); } int WinEnv::UnSchedule(void* arg, Priority pri) { return thread_pools_[pri].UnSchedule(arg); } unsigned int WinEnv::GetThreadPoolQueueLen(Priority pri) const { assert(pri >= Priority::LOW && pri <= Priority::HIGH); return thread_pools_[pri].GetQueueLen(); } namespace { struct StartThreadState { void (*user_function)(void*); void* arg; }; } static void* StartThreadWrapper(void* arg) { std::unique_ptr state( reinterpret_cast(arg)); state->user_function(state->arg); return nullptr; } void WinEnv::StartThread(void (*function)(void* arg), void* arg) { StartThreadState* state = new StartThreadState; state->user_function = function; state->arg = arg; try { std::thread th(&StartThreadWrapper, state); std::lock_guard lg(mu_); threads_to_join_.push_back(std::move(th)); } catch (const std::system_error& ex) { WinthreadCall("start thread", ex.code()); } } void WinEnv::WaitForJoin() { for (auto& th : threads_to_join_) { th.join(); } threads_to_join_.clear(); } } // namespace std::string Env::GenerateUniqueId() { std::string result; UUID uuid; UuidCreateSequential(&uuid); RPC_CSTR rpc_str; auto status = UuidToStringA(&uuid, &rpc_str); assert(status == RPC_S_OK); result = reinterpret_cast(rpc_str); status = RpcStringFreeA(&rpc_str); assert(status == RPC_S_OK); return result; } // We choose to create this on the heap and using std::once for the following // reasons // 1) Currently available MS compiler does not implement atomic C++11 // initialization of // function local statics // 2) We choose not to destroy the env because joining the threads from the // system loader // which destroys the statics (same as from DLLMain) creates a system loader // dead-lock. // in this manner any remaining threads are terminated OK. namespace { std::once_flag winenv_once_flag; Env* envptr; }; Env* Env::Default() { std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); return envptr; } } // namespace rocksdb