diff --git a/db/db_bench.cc b/db/db_bench.cc index 476b0c856..20dc4235b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -178,6 +178,7 @@ static int FLAGS_stats_interval = 0; extern bool useOsBuffer; extern bool useFsReadAhead; extern bool useMmapRead; +extern bool useMmapWrite; namespace leveldb { @@ -1217,6 +1218,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--mmap_read=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { useMmapRead = n; + } else if (sscanf(argv[i], "--mmap_write=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + useMmapWrite = n; } else if (sscanf(argv[i], "--readahead=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { useFsReadAhead = n; diff --git a/util/env_posix.cc b/util/env_posix.cc index ddb9611fd..6b5feb284 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -28,7 +28,8 @@ bool useOsBuffer = 1; // cache data in OS buffers bool useFsReadAhead = 1; // allow filesystem to do readaheads -bool useMmapRead = 0; +bool useMmapRead = 0; // do not use mmaps for reading files +bool useMmapWrite = 1; // use mmaps for appending to files namespace leveldb { @@ -331,6 +332,131 @@ class PosixMmapFile : public WritableFile { } }; +// Use posix write to write data to a file. +class PosixWritableFile : public WritableFile { + private: + const std::string filename_; + int fd_; + size_t cursize_; // current size of cached data in buf_ + size_t capacity_; // max size of buf_ + char* buf_; // a buffer to cache writes + uint64_t filesize_; + bool pending_sync_; + bool pending_fsync_; + + public: + PosixWritableFile(const std::string& fname, int fd, size_t capacity) : + filename_(fname), + fd_(fd), + cursize_(0), + capacity_(capacity), + buf_(new char[capacity]), + filesize_(0), + pending_sync_(false), + pending_fsync_(false) { + } + + ~PosixWritableFile() { + if (fd_ >= 0) { + PosixWritableFile::Close(); + } + delete buf_; + buf_ = 0; + } + + virtual Status Append(const Slice& data) { + char* src = (char *)data.data(); + size_t left = data.size(); + Status s; + pending_sync_ = true; + pending_fsync_ = true; + + // if there is no space in the cache, then flush + if (cursize_ + left > capacity_) { + s = Flush(); + if (!s.ok()) { + return s; + } + // Increase the buffer size, but capped at 1MB + if (capacity_ < (1<<20)) { + delete buf_; + capacity_ *= 2; + buf_ = new char[capacity_]; + } + assert(cursize_ == 0); + } + + // if the write fits into the cache, then write to cache + // otherwise do a write() syscall to write to OS buffers. + if (cursize_ + left <= capacity_) { + memcpy(buf_+cursize_, src, left); + cursize_ += left; + } else { + while (left != 0) { + size_t done = write(fd_, src, left); + if (done < 0) { + return IOError(filename_, errno); + } + left -= done; + src += done; + } + } + filesize_ += data.size(); + return Status::OK(); + } + + virtual Status Close() { + Status s; + s = Flush(); // flush cache to OS + if (!s.ok()) { + } + if (close(fd_) < 0) { + if (s.ok()) { + s = IOError(filename_, errno); + } + } + fd_ = -1; + return s; + } + + // write out the cached data to the OS cache + virtual Status Flush() { + size_t left = cursize_; + char* src = buf_; + while (left != 0) { + size_t done = write(fd_, src, left); + if (done < 0) { + return IOError(filename_, errno); + } + left -= done; + src += done; + } + cursize_ = 0; + return Status::OK(); + } + + virtual Status Sync() { + if (pending_sync_ && fdatasync(fd_) < 0) { + return IOError(filename_, errno); + } + pending_sync_ = false; + return Status::OK(); + } + + virtual Status Fsync() { + if (pending_fsync_ && fsync(fd_) < 0) { + return IOError(filename_, errno); + } + pending_fsync_ = false; + pending_sync_ = false; + return Status::OK(); + } + + virtual uint64_t GetFileSize() { + return filesize_; + } +}; + static int LockOrUnlock(const std::string& fname, int fd, bool lock) { mutex_lockedFiles.Lock(); if (lock) { @@ -431,7 +557,11 @@ class PosixEnv : public Env { *result = NULL; s = IOError(fname, errno); } else { - *result = new PosixMmapFile(fname, fd, page_size_); + if (useMmapWrite) { + *result = new PosixMmapFile(fname, fd, page_size_); + } else { + *result = new PosixWritableFile(fname, fd, 65536); + } } return s; }