Env class that can randomly read and write

Summary: I have implemented basic simple use case that I need for External Value Store I'm working on. There is a potential for making this prettier by refactoring/combining WritableFile and RandomAccessFile, avoiding some copypasta. However, I decided to implement just the basic functionality, so I can continue working on the other diff.

Test Plan: Added a unittest

Reviewers: dhruba, haobo, kailiu

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13365
main
Igor Canadi 11 years ago
parent 7ac3c796f6
commit d0beadd456
  1. 65
      include/rocksdb/env.h
  2. 113
      util/env_posix.cc
  3. 19
      util/env_test.cc

@ -28,6 +28,7 @@ class RandomAccessFile;
class SequentialFile; class SequentialFile;
class Slice; class Slice;
class WritableFile; class WritableFile;
class RandomRWFile;
class Options; class Options;
using std::unique_ptr; using std::unique_ptr;
@ -109,6 +110,15 @@ class Env {
unique_ptr<WritableFile>* result, unique_ptr<WritableFile>* result,
const EnvOptions& options) = 0; const EnvOptions& options) = 0;
// Create an object that both reads and writes to a file on
// specified offsets (random access). If file already exists,
// does not overwrite it. On success, stores a pointer to the
// new file in *result and returns OK. On failure stores nullptr
// in *result and returns non-OK.
virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) = 0;
// Returns true iff the named file exists. // Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0; virtual bool FileExists(const std::string& fname) = 0;
@ -329,7 +339,7 @@ class WritableFile {
/* /*
* Sync data and/or metadata as well. * Sync data and/or metadata as well.
* By default, sync only metadata. * By default, sync only data.
* Override this method for environments where we need to sync * Override this method for environments where we need to sync
* metadata as well. * metadata as well.
*/ */
@ -418,6 +428,55 @@ class WritableFile {
void operator=(const WritableFile&); void operator=(const WritableFile&);
}; };
// A file abstraction for random reading and writing.
class RandomRWFile {
public:
RandomRWFile() {}
virtual ~RandomRWFile() {}
// Write data from Slice data to file starting from offset
// Returns IOError on failure, but does not guarantee
// atomicity of a write. Returns OK status on success.
//
// Safe for concurrent use.
virtual Status Write(uint64_t offset, const Slice& data) = 0;
// Read up to "n" bytes from the file starting at "offset".
// "scratch[0..n-1]" may be written by this routine. Sets "*result"
// to the data that was read (including if fewer than "n" bytes were
// successfully read). May set "*result" to point at data in
// "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
// "*result" is used. If an error was encountered, returns a non-OK
// status.
//
// Safe for concurrent use by multiple threads.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0;
virtual Status Close() = 0; // closes the file
virtual Status Sync() = 0; // sync data
/*
* Sync data and/or metadata as well.
* By default, sync only data.
* Override this method for environments where we need to sync
* metadata as well.
*/
virtual Status Fsync() {
return Sync();
}
/*
* Pre-allocate space for a file.
*/
virtual Status Allocate(off_t offset, off_t len) {
return Status::OK();
}
private:
// No copying allowed
RandomRWFile(const RandomRWFile&);
void operator=(const RandomRWFile&);
};
// An interface for writing log messages. // An interface for writing log messages.
class Logger { class Logger {
public: public:
@ -497,6 +556,10 @@ class EnvWrapper : public Env {
const EnvOptions& options) { const EnvOptions& options) {
return target_->NewWritableFile(f, r, options); return target_->NewWritableFile(f, r, options);
} }
Status NewRandomRWFile(const std::string& f, unique_ptr<RandomRWFile>* r,
const EnvOptions& options) {
return target_->NewRandomRWFile(f, r, options);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); } bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) { Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
return target_->GetChildren(dir, r); return target_->GetChildren(dir, r);

@ -565,7 +565,7 @@ class PosixWritableFile : public WritableFile {
} }
virtual Status Append(const Slice& data) { virtual Status Append(const Slice& data) {
char* src = (char *)data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();
Status s; Status s;
pending_sync_ = true; pending_sync_ = true;
@ -709,6 +709,98 @@ class PosixWritableFile : public WritableFile {
#endif #endif
}; };
class PosixRandomRWFile : public RandomRWFile {
private:
const std::string filename_;
int fd_;
bool pending_sync_;
bool pending_fsync_;
public:
PosixRandomRWFile(const std::string& fname, int fd,
const EnvOptions& options) :
filename_(fname),
fd_(fd),
pending_sync_(false),
pending_fsync_(false) {
assert(!options.use_mmap_writes && !options.use_mmap_reads);
}
~PosixRandomRWFile() {
if (fd_ >= 0) {
Close();
}
}
virtual Status Write(uint64_t offset, const Slice& data) {
const char* src = data.data();
size_t left = data.size();
Status s;
pending_sync_ = true;
pending_fsync_ = true;
while (left != 0) {
ssize_t done = pwrite(fd_, src, left, offset);
if (done < 0) {
return IOError(filename_, errno);
}
left -= done;
src += done;
offset += done;
}
return Status::OK();
}
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) {
s = IOError(filename_, errno);
}
return s;
}
virtual Status Close() {
Status s = Status::OK();
if (fd_ >= 0 && close(fd_) < 0) {
s = IOError(filename_, errno);
}
fd_ = -1;
return s;
}
virtual Status Sync() {
if (pending_sync_ && fdatasync(fd_) < 0) {
return IOError(filename_, errno);
}
pending_sync_ = false;
return Status::OK();
}
virtual Status Fsync() {
if (pending_fsync_ && fsync(fd_) < 0) {
return IOError(filename_, errno);
}
pending_fsync_ = false;
pending_sync_ = false;
return Status::OK();
}
#ifdef OS_LINUX
virtual Status Allocate(off_t offset, off_t len) {
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
return Status::OK();
} else {
return IOError(filename_, errno);
}
}
#endif
};
static int LockOrUnlock(const std::string& fname, int fd, bool lock) { static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock(); mutex_lockedFiles.Lock();
if (lock) { if (lock) {
@ -855,6 +947,25 @@ class PosixEnv : public Env {
return s; return s;
} }
virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) {
result->reset();
Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644);
if (fd < 0) {
s = IOError(fname, errno);
} else {
SetFD_CLOEXEC(fd, &options);
// no support for mmap yet
if (options.use_mmap_writes || options.use_mmap_reads) {
return Status::NotSupported("No support for mmap read/write yet");
}
result->reset(new PosixRandomRWFile(fname, fd, options));
}
return s;
}
virtual bool FileExists(const std::string& fname) { virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0; return access(fname.c_str(), F_OK) == 0;
} }

@ -357,6 +357,25 @@ TEST(EnvPosixTest, InvalidateCache) {
ASSERT_OK(env_->DeleteFile(fname)); ASSERT_OK(env_->DeleteFile(fname));
} }
TEST(EnvPosixTest, PosixRandomRWFileTest) {
EnvOptions soptions;
soptions.use_mmap_writes = soptions.use_mmap_reads = false;
std::string fname = test::TmpDir() + "/" + "testfile";
unique_ptr<RandomRWFile> file;
ASSERT_OK(env_->NewRandomRWFile(fname, &file, soptions));
ASSERT_OK(file.get()->Allocate(0, 10*1024*1024));
ASSERT_OK(file.get()->Write(100, Slice("Hello world")));
ASSERT_OK(file.get()->Write(105, Slice("Hello world")));
ASSERT_OK(file.get()->Sync());
ASSERT_OK(file.get()->Fsync());
char scratch[100];
Slice result;
ASSERT_OK(file.get()->Read(100, 16, &result, scratch));
ASSERT_EQ(result.compare("HelloHello world"), 0);
ASSERT_OK(file.get()->Close());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save