diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 5f2f1ae3d..66ce4668d 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -60,7 +60,7 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { ASSERT_FALSE(flags[sequence].test_and_set()); } }; - std::vector threads; + std::vector threads; for (size_t i = 0; i < kThreads; i++) { threads.emplace_back(writer, i); } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 98e4b816a..49c981433 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -173,8 +173,7 @@ class Env { virtual Status ReopenWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& options) { - Status s; - return s; + return Status::NotSupported(); } // Reuse an existing file by renaming it and opening it as writable. diff --git a/port/win/env_win.cc b/port/win/env_win.cc index 0b51d72f0..63484d369 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -238,9 +238,11 @@ Status WinEnvIO::NewRandomAccessFile(const std::string& fname, return s; } -Status WinEnvIO::NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) { +Status WinEnvIO::OpenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options, + bool reopen) { + const size_t c_BufferCapacity = 64 * 1024; EnvOptions local_options(options); @@ -264,12 +266,19 @@ Status WinEnvIO::NewWritableFile(const std::string& fname, if (local_options.use_mmap_writes) { desired_access |= GENERIC_READ; - } else { + } + else { // Adding this solely for tests to pass (fault_injection_test, // wal_manager_test). shared_mode |= (FILE_SHARE_WRITE | FILE_SHARE_DELETE); } + // This will always truncate the file + DWORD creation_disposition = CREATE_ALWAYS; + if (reopen) { + creation_disposition = OPEN_ALWAYS; + } + HANDLE hFile = 0; { IOSTATS_TIMER_GUARD(open_nanos); @@ -278,7 +287,7 @@ Status WinEnvIO::NewWritableFile(const std::string& fname, desired_access, // Access desired shared_mode, NULL, // Security attributes - CREATE_ALWAYS, // Posix env says O_CREAT | O_RDWR | O_TRUNC + creation_disposition, // Posix env says (reopen) ? (O_CREATE | O_APPEND) : O_CREAT | O_TRUNC fileFlags, // Flags NULL); // Template File } @@ -289,6 +298,18 @@ Status WinEnvIO::NewWritableFile(const std::string& fname, "Failed to create a NewWriteableFile: " + fname, lastError); } + // We will start writing at the end, appending + if (reopen) { + LARGE_INTEGER zero_move; + zero_move.QuadPart = 0; + BOOL ret = SetFilePointerEx(hFile, zero_move, NULL, FILE_END); + if (!ret) { + auto lastError = GetLastError(); + return IOErrorFromWindowsError( + "Failed to create a ReopenWritableFile move to the end: " + fname, lastError); + } + } + if (options.use_mmap_writes) { // We usually do not use mmmapping on SSD and thus we pass memory // page_size @@ -304,7 +325,7 @@ Status WinEnvIO::NewWritableFile(const std::string& fname, } Status WinEnvIO::NewRandomRWFile(const std::string & fname, - unique_ptr* result, const EnvOptions & options) { + std::unique_ptr* result, const EnvOptions & options) { Status s; @@ -933,7 +954,12 @@ Status WinEnv::NewRandomAccessFile(const std::string& fname, Status WinEnv::NewWritableFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) { - return winenv_io_.NewWritableFile(fname, result, options); + return winenv_io_.OpenWritableFile(fname, result, options, false); +} + +Status WinEnv::ReopenWritableFile(const std::string& fname, + std::unique_ptr* result, const EnvOptions& options) { + return winenv_io_.OpenWritableFile(fname, result, options, true); } Status WinEnv::NewRandomRWFile(const std::string & fname, diff --git a/port/win/env_win.h b/port/win/env_win.h index e2a93d5ec..0b42b5e6d 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -95,14 +95,16 @@ public: std::unique_ptr* result, const EnvOptions& options); + // Helper for NewWritable and ReopenWritableFile + virtual Status OpenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options, + bool reopen); + virtual Status NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options); - virtual Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); - // The returned file will only be accessed by one thread at a time. virtual Status NewRandomRWFile(const std::string& fname, unique_ptr* result, @@ -204,6 +206,17 @@ public: std::unique_ptr* result, const EnvOptions& options) override; + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + // The returned file will only be accessed by one thread at a time. Status NewRandomRWFile(const std::string& fname, unique_ptr* result, diff --git a/port/win/io_win.cc b/port/win/io_win.cc index bf5d18327..621d01f30 100644 --- a/port/win/io_win.cc +++ b/port/win/io_win.cc @@ -641,6 +641,7 @@ Status WinSequentialFile::InvalidateCache(size_t offset, size_t length) { ////////////////////////////////////////////////////////////////////////////////////////////////// /// WinRandomAccessBase +inline SSIZE_T WinRandomAccessImpl::PositionedReadInternal(char* src, size_t numBytes, uint64_t offset) const { @@ -733,13 +734,31 @@ Status WinWritableImpl::PreallocateInternal(uint64_t spaceToReserve) { return fallocate(file_data_->GetName(), file_data_->GetFileHandle(), spaceToReserve); } +inline WinWritableImpl::WinWritableImpl(WinFileData* file_data, size_t alignment) : file_data_(file_data), alignment_(alignment), - filesize_(0), + next_write_offset_(0), reservedsize_(0) { + + // Query current position in case ReopenWritableFile is called + // This position is only important for buffered writes + // for unbuffered writes we explicitely specify the position. + LARGE_INTEGER zero_move; + zero_move.QuadPart = 0; // Do not move + LARGE_INTEGER pos; + pos.QuadPart = 0; + BOOL ret = SetFilePointerEx(file_data_->GetFileHandle(), zero_move, &pos, + FILE_CURRENT); + // Querying no supped to fail + if (ret) { + next_write_offset_ = pos.QuadPart; + } else { + assert(false); + } } +inline Status WinWritableImpl::AppendImpl(const Slice& data) { Status s; @@ -754,12 +773,12 @@ Status WinWritableImpl::AppendImpl(const Slice& data) { // With no offset specified we are appending // to the end of the file - assert(IsSectorAligned(filesize_)); + assert(IsSectorAligned(next_write_offset_)); assert(IsSectorAligned(data.size())); assert(IsAligned(GetAlignement(), data.data())); SSIZE_T ret = pwrite(file_data_->GetFileHandle(), data.data(), - data.size(), filesize_); + data.size(), next_write_offset_); if (ret < 0) { auto lastError = GetLastError(); @@ -787,12 +806,13 @@ Status WinWritableImpl::AppendImpl(const Slice& data) { if(s.ok()) { assert(written == data.size()); - filesize_ += data.size(); + next_write_offset_ += data.size(); } return s; } +inline Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) { if(file_data_->use_direct_io()) { @@ -816,8 +836,8 @@ Status WinWritableImpl::PositionedAppendImpl(const Slice& data, uint64_t offset) // For sequential write this would be simple // size extension by data.size() uint64_t write_end = offset + data.size(); - if (write_end >= filesize_) { - filesize_ = write_end; + if (write_end >= next_write_offset_) { + next_write_offset_ = write_end; } } return s; @@ -830,11 +850,12 @@ Status WinWritableImpl::TruncateImpl(uint64_t size) { Status s = ftruncate(file_data_->GetName(), file_data_->GetFileHandle(), size); if (s.ok()) { - filesize_ = size; + next_write_offset_ = size; } return s; } +inline Status WinWritableImpl::CloseImpl() { Status s; @@ -857,6 +878,7 @@ Status WinWritableImpl::CloseImpl() { return s; } +inline Status WinWritableImpl::SyncImpl() { Status s; // Calls flush buffers @@ -869,6 +891,7 @@ Status WinWritableImpl::SyncImpl() { } +inline Status WinWritableImpl::AllocateImpl(uint64_t offset, uint64_t len) { Status status; TEST_KILL_RANDOM("WinWritableFile::Allocate", rocksdb_kill_odds); @@ -943,7 +966,7 @@ Status WinWritableFile::Sync() { Status WinWritableFile::Fsync() { return SyncImpl(); } uint64_t WinWritableFile::GetFileSize() { - return GetFileSizeImpl(); + return GetFileNextWriteOffset(); } Status WinWritableFile::Allocate(uint64_t offset, uint64_t len) { diff --git a/port/win/io_win.h b/port/win/io_win.h index 2b6a5fdff..e050593ef 100644 --- a/port/win/io_win.h +++ b/port/win/io_win.h @@ -301,7 +301,7 @@ class WinWritableImpl { protected: WinFileData* file_data_; const uint64_t alignment_; - uint64_t filesize_; // How much data is actually written disk + uint64_t next_write_offset_; // Needed because Windows does not support O_APPEND uint64_t reservedsize_; // how far we have reserved space virtual Status PreallocateInternal(uint64_t spaceToReserve); @@ -324,14 +324,14 @@ class WinWritableImpl { Status SyncImpl(); - uint64_t GetFileSizeImpl() { + uint64_t GetFileNextWriteOffset() { // Double accounting now here with WritableFileWriter // and this size will be wrong when unbuffered access is used // but tests implement their own writable files and do not use // WritableFileWrapper // so we need to squeeze a square peg through // a round hole here. - return filesize_; + return next_write_offset_; } Status AllocateImpl(uint64_t offset, uint64_t len); diff --git a/util/timer_queue.h b/util/timer_queue.h index 72b44dc2d..cc0611921 100644 --- a/util/timer_queue.h +++ b/util/timer_queue.h @@ -21,6 +21,9 @@ // work, even for commercial purposes, all without asking permission. #pragma once + +#include "port/port.h" + #include #include #include @@ -213,5 +216,5 @@ class TimerQueue { public: std::vector& getContainer() { return this->c; } } m_items; - std::thread m_th; + rocksdb::port::Thread m_th; }; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index ca2f2288b..3ce68e8e0 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -230,11 +230,11 @@ TEST_F(BlobDBTest, Compression) { TEST_F(BlobDBTest, DISABLED_MultipleWriters) { Open(); - std::vector workers; + std::vector workers; for (size_t ii = 0; ii < 10; ii++) - workers.push_back(std::thread(&BlobDBTest::InsertBlobs, this)); + workers.push_back(port::Thread(&BlobDBTest::InsertBlobs, this)); - for (std::thread &t : workers) { + for (auto& t : workers) { if (t.joinable()) { t.join(); }