diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 2d33da27c..fa2d175d1 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -40,6 +40,7 @@ class RandomAccessFile; class SequentialFile; class Slice; class WritableFile; +class RandomRWFile; class Directory; struct DBOptions; class RateLimiter; @@ -168,6 +169,17 @@ class Env { unique_ptr* result, const EnvOptions& options); + // Open `fname` for random read and write, if file dont exist the file + // will be created. On success, stores a pointer to the new file in + // *result and returns OK. On failure returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + virtual Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) { + return Status::NotSupported("RandomRWFile is not implemented in this Env"); + } + // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory // and create a new Directory object. @@ -646,6 +658,35 @@ class WritableFile { Env::IOPriority io_priority_; }; +// A file abstraction for random reading and writing. +class RandomRWFile { + public: + RandomRWFile() {} + virtual ~RandomRWFile() {} + + // Write bytes in `data` at offset `offset`, Returns Status::OK() on success. + virtual Status Write(uint64_t offset, const Slice& data) = 0; + + // Read up to `n` bytes starting from offset `offset` and store them in + // result, provided `scratch` size should be at least `n`. + // Returns Status::OK() on success. + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const = 0; + + virtual Status Flush() = 0; + + virtual Status Sync() = 0; + + virtual Status Fsync() { return Sync(); } + + virtual Status Close() = 0; + + private: + // No copying allowed + RandomRWFile(const RandomRWFile&) = delete; + RandomRWFile& operator=(const RandomRWFile&) = delete; +}; + // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class Directory { @@ -801,6 +842,11 @@ class EnvWrapper : public Env { const EnvOptions& options) override { return target_->ReuseWritableFile(fname, old_fname, r, options); } + Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) override { + return target_->NewRandomRWFile(fname, result, options); + } virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { return target_->NewDirectory(name, result); diff --git a/util/env_chroot.cc b/util/env_chroot.cc index 37939821e..3d3e258c5 100644 --- a/util/env_chroot.cc +++ b/util/env_chroot.cc @@ -81,6 +81,17 @@ class ChrootEnv : public EnvWrapper { options); } + virtual Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewRandomRWFile(status_and_enc_path.second, result, + options); + } + virtual Status NewDirectory(const std::string& dir, unique_ptr* result) override { auto status_and_enc_path = EncodePathWithNewBasename(dir); diff --git a/util/env_posix.cc b/util/env_posix.cc index 692c2481f..056ccc855 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -345,6 +345,27 @@ class PosixEnv : public Env { return s; } + virtual Status NewRandomRWFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) override { + int fd = -1; + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + return IOError(fname, errno); + } + } + + SetFD_CLOEXEC(fd, &options); + result->reset(new PosixRandomRWFile(fname, fd, options)); + return Status::OK(); + } + virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { result->reset(); diff --git a/util/env_test.cc b/util/env_test.cc index 56464848b..9fe884f6a 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -1186,6 +1186,158 @@ TEST_P(EnvPosixTestWithParam, WritableFileWrapper) { EXPECT_EQ(14, step); } +TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) { + const std::string path = test::TmpDir(env_) + "/random_rw_file"; + + env_->DeleteFile(path); + + std::unique_ptr file; + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + + char buf[10000]; + Slice read_res; + + ASSERT_OK(file->Write(0, "ABCD")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABCD"); + + ASSERT_OK(file->Write(2, "XXXX")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXX"); + + ASSERT_OK(file->Write(10, "ZZZ")); + ASSERT_OK(file->Read(10, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZZZ"); + + ASSERT_OK(file->Write(11, "Y")); + ASSERT_OK(file->Read(10, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZYZ"); + + ASSERT_OK(file->Write(200, "FFFFF")); + ASSERT_OK(file->Read(200, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFF"); + + ASSERT_OK(file->Write(205, "XXXX")); + ASSERT_OK(file->Read(200, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFFXXXX"); + + ASSERT_OK(file->Write(5, "QQQQ")); + ASSERT_OK(file->Read(0, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ"); + + ASSERT_OK(file->Read(2, 4, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "XXXQ"); + + // Close file and reopen it + file->Close(); + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + + ASSERT_OK(file->Read(0, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ"); + + ASSERT_OK(file->Read(10, 3, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZYZ"); + + ASSERT_OK(file->Read(200, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFFXXXX"); + + ASSERT_OK(file->Write(4, "TTTTTTTTTTTTTTTT")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXTTTTTT"); + + // Clean up + env_->DeleteFile(path); +} + +class RandomRWFileWithMirrorString { + public: + explicit RandomRWFileWithMirrorString(RandomRWFile* _file) : file_(_file) {} + + void Write(size_t offset, const std::string& data) { + // Write to mirror string + StringWrite(offset, data); + + // Write to file + Status s = file_->Write(offset, data); + ASSERT_OK(s) << s.ToString(); + } + + void Read(size_t offset = 0, size_t n = 1000000) { + Slice str_res(nullptr, 0); + if (offset < file_mirror_.size()) { + size_t str_res_sz = std::min(file_mirror_.size() - offset, n); + str_res = Slice(file_mirror_.data() + offset, str_res_sz); + StopSliceAtNull(&str_res); + } + + Slice file_res; + Status s = file_->Read(offset, n, &file_res, buf_); + ASSERT_OK(s) << s.ToString(); + StopSliceAtNull(&file_res); + + ASSERT_EQ(str_res.ToString(), file_res.ToString()) << offset << " " << n; + } + + void SetFile(RandomRWFile* _file) { file_ = _file; } + + private: + void StringWrite(size_t offset, const std::string& src) { + if (offset + src.size() > file_mirror_.size()) { + file_mirror_.resize(offset + src.size(), '\0'); + } + + char* pos = const_cast(file_mirror_.data() + offset); + memcpy(pos, src.data(), src.size()); + } + + void StopSliceAtNull(Slice* slc) { + for (size_t i = 0; i < slc->size(); i++) { + if ((*slc)[i] == '\0') { + *slc = Slice(slc->data(), i); + break; + } + } + } + + char buf_[10000]; + RandomRWFile* file_; + std::string file_mirror_; +}; + +TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) { + const std::string path = test::TmpDir(env_) + "/random_rw_file_rand"; + env_->DeleteFile(path); + + unique_ptr file; + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + RandomRWFileWithMirrorString file_with_mirror(file.get()); + + Random rnd(301); + std::string buf; + for (int i = 0; i < 10000; i++) { + // Genrate random data + test::RandomString(&rnd, 10, &buf); + + // Pick random offset for write + size_t write_off = rnd.Next() % 1000; + file_with_mirror.Write(write_off, buf); + + // Pick random offset for read + size_t read_off = rnd.Next() % 1000; + size_t read_sz = rnd.Next() % 20; + file_with_mirror.Read(read_off, read_sz); + + if (i % 500 == 0) { + // Reopen the file every 500 iters + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + file_with_mirror.SetFile(file.get()); + } + } + + // clean up + env_->DeleteFile(path); +} + INSTANTIATE_TEST_CASE_P(DefaultEnv, EnvPosixTestWithParam, ::testing::Values(Env::Default())); #if !defined(ROCKSDB_LITE) && !defined(OS_WIN) diff --git a/util/io_posix.cc b/util/io_posix.cc index 913026a4d..7892397a0 100644 --- a/util/io_posix.cc +++ b/util/io_posix.cc @@ -823,6 +823,95 @@ Status PosixDirectIOWritableFile::PositionedAppend(const Slice& data, return PosixWritableFile::PositionedAppend(data, offset); } +/* + * PosixRandomRWFile + */ + +PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& options) + : filename_(fname), fd_(fd) {} + +PosixRandomRWFile::~PosixRandomRWFile() { + if (fd_ >= 0) { + Close(); + } +} + +Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + while (left != 0) { + ssize_t done = pwrite(fd_, src, left, offset); + if (done < 0) { + // error while writing to file + if (errno == EINTR) { + // write was interrupted, try again. + continue; + } + return IOError(filename_, errno); + } + + // Wrote `done` bytes + left -= done; + offset += done; + src += done; + } + + return Status::OK(); +} + +Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + size_t left = n; + char* ptr = scratch; + while (left > 0) { + ssize_t done = pread(fd_, ptr, left, offset); + if (done < 0) { + // error while reading from file + if (errno == EINTR) { + // read was interrupted, try again. + continue; + } + return IOError(filename_, errno); + } else if (done == 0) { + // Nothing more to read + break; + } + + // Read `done` bytes + ptr += done; + offset += done; + left -= done; + } + + *result = Slice(scratch, n - left); + return Status::OK(); +} + +Status PosixRandomRWFile::Flush() { return Status::OK(); } + +Status PosixRandomRWFile::Sync() { + if (fdatasync(fd_) < 0) { + return IOError(filename_, errno); + } + return Status::OK(); +} + +Status PosixRandomRWFile::Fsync() { + if (fsync(fd_) < 0) { + return IOError(filename_, errno); + } + return Status::OK(); +} + +Status PosixRandomRWFile::Close() { + if (close(fd_) < 0) { + return IOError(filename_, errno); + } + fd_ = -1; + return Status::OK(); +} + /* * PosixDirectory */ diff --git a/util/io_posix.h b/util/io_posix.h index ab9a8f796..bc7f0c01c 100644 --- a/util/io_posix.h +++ b/util/io_posix.h @@ -7,9 +7,10 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include -#include +#include #include "rocksdb/env.h" // For non linux platform, the following macros are used only as place @@ -218,6 +219,27 @@ class PosixMmapFile : public WritableFile { #endif }; +class PosixRandomRWFile : public RandomRWFile { + public: + explicit PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& options); + virtual ~PosixRandomRWFile(); + + virtual Status Write(uint64_t offset, const Slice& data) override; + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status Flush() override; + virtual Status Sync() override; + virtual Status Fsync() override; + virtual Status Close() override; + + private: + const std::string filename_; + int fd_; +}; + class PosixDirectory : public Directory { public: explicit PosixDirectory(int fd) : fd_(fd) {}