You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
rocksdb/utilities/env_mirror.cc

276 lines
8.3 KiB

// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2015, Red Hat, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/env_mirror.h"
namespace ROCKSDB_NAMESPACE {
// An implementation of Env that mirrors all work over two backend
// Env's. This is useful for debugging purposes.
class SequentialFileMirror : public SequentialFile {
public:
std::unique_ptr<SequentialFile> a_, b_;
std::string fname;
explicit SequentialFileMirror(std::string f) : fname(f) {}
Status Read(size_t n, Slice* result, char* scratch) override {
Slice aslice;
Status as = a_->Read(n, &aslice, scratch);
if (as == Status::OK()) {
char* bscratch = new char[n];
Slice bslice;
#ifndef NDEBUG
size_t off = 0;
#endif
size_t left = aslice.size();
while (left) {
Status bs = b_->Read(left, &bslice, bscratch);
#ifndef NDEBUG
assert(as == bs);
assert(memcmp(bscratch, scratch + off, bslice.size()) == 0);
off += bslice.size();
#endif
left -= bslice.size();
}
delete[] bscratch;
*result = aslice;
} else {
Status bs = b_->Read(n, result, scratch);
assert(as == bs);
}
return as;
}
Status Skip(uint64_t n) override {
Status as = a_->Skip(n);
Status bs = b_->Skip(n);
assert(as == bs);
return as;
}
Status InvalidateCache(size_t offset, size_t length) override {
Status as = a_->InvalidateCache(offset, length);
Status bs = b_->InvalidateCache(offset, length);
assert(as == bs);
return as;
};
};
class RandomAccessFileMirror : public RandomAccessFile {
public:
std::unique_ptr<RandomAccessFile> a_, b_;
std::string fname;
explicit RandomAccessFileMirror(std::string f) : fname(f) {}
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override {
Status as = a_->Read(offset, n, result, scratch);
if (as == Status::OK()) {
char* bscratch = new char[n];
Slice bslice;
size_t off = 0;
size_t left = result->size();
while (left) {
Status bs = b_->Read(offset + off, left, &bslice, bscratch);
assert(as == bs);
assert(memcmp(bscratch, scratch + off, bslice.size()) == 0);
off += bslice.size();
left -= bslice.size();
}
delete[] bscratch;
} else {
Status bs = b_->Read(offset, n, result, scratch);
assert(as == bs);
}
return as;
}
size_t GetUniqueId(char* id, size_t max_size) const override {
// NOTE: not verified
return a_->GetUniqueId(id, max_size);
}
};
class WritableFileMirror : public WritableFile {
public:
std::unique_ptr<WritableFile> a_, b_;
std::string fname;
Optionally wait on bytes_per_sync to smooth I/O (#5183) Summary: The existing implementation does not guarantee bytes reach disk every `bytes_per_sync` when writing SST files, or every `wal_bytes_per_sync` when writing WALs. This can cause confusing behavior for users who enable this feature to avoid large syncs during flush and compaction, but then end up hitting them anyways. My understanding of the existing behavior is we used `sync_file_range` with `SYNC_FILE_RANGE_WRITE` to submit ranges for async writeback, such that we could continue processing the next range of bytes while that I/O is happening. I believe we can preserve that benefit while also limiting how far the processing can get ahead of the I/O, which prevents huge syncs from happening when the file finishes. Consider this `sync_file_range` usage: `sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes), SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE)`. Expanding the range to start at 0 and adding the `SYNC_FILE_RANGE_WAIT_BEFORE` flag causes any pending writeback (like from a previous call to `sync_file_range`) to finish before it proceeds to submit the latest `nbytes` for writeback. The latest `nbytes` are still written back asynchronously, unless processing exceeds I/O speed, in which case the following `sync_file_range` will need to wait on it. There is a second change in this PR to use `fdatasync` when `sync_file_range` is unavailable (determined statically) or has some known problem with the underlying filesystem (determined dynamically). The above two changes only apply when the user enables a new option, `strict_bytes_per_sync`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5183 Differential Revision: D14953553 Pulled By: siying fbshipit-source-id: 445c3862e019fb7b470f9c7f314fc231b62706e9
5 years ago
explicit WritableFileMirror(std::string f, const EnvOptions& options)
: WritableFile(options), fname(f) {}
Status Append(const Slice& data) override {
Status as = a_->Append(data);
Status bs = b_->Append(data);
assert(as == bs);
return as;
}
Status Append(const Slice& data,
const DataVerificationInfo& /* verification_info */) override {
return Append(data);
}
Status PositionedAppend(const Slice& data, uint64_t offset) override {
Status as = a_->PositionedAppend(data, offset);
Status bs = b_->PositionedAppend(data, offset);
assert(as == bs);
return as;
}
Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& /* verification_info */) override {
return PositionedAppend(data, offset);
}
Status Truncate(uint64_t size) override {
Status as = a_->Truncate(size);
Status bs = b_->Truncate(size);
assert(as == bs);
return as;
}
Status Close() override {
Status as = a_->Close();
Status bs = b_->Close();
assert(as == bs);
return as;
}
Status Flush() override {
Status as = a_->Flush();
Status bs = b_->Flush();
assert(as == bs);
return as;
}
Status Sync() override {
Status as = a_->Sync();
Status bs = b_->Sync();
assert(as == bs);
return as;
}
Status Fsync() override {
Status as = a_->Fsync();
Status bs = b_->Fsync();
assert(as == bs);
return as;
}
bool IsSyncThreadSafe() const override {
bool as = a_->IsSyncThreadSafe();
assert(as == b_->IsSyncThreadSafe());
return as;
}
void SetIOPriority(Env::IOPriority pri) override {
a_->SetIOPriority(pri);
b_->SetIOPriority(pri);
}
Env::IOPriority GetIOPriority() override {
// NOTE: we don't verify this one
return a_->GetIOPriority();
}
uint64_t GetFileSize() override {
uint64_t as = a_->GetFileSize();
assert(as == b_->GetFileSize());
return as;
}
void GetPreallocationStatus(size_t* block_size,
size_t* last_allocated_block) override {
// NOTE: we don't verify this one
return a_->GetPreallocationStatus(block_size, last_allocated_block);
}
size_t GetUniqueId(char* id, size_t max_size) const override {
// NOTE: we don't verify this one
return a_->GetUniqueId(id, max_size);
}
Status InvalidateCache(size_t offset, size_t length) override {
Status as = a_->InvalidateCache(offset, length);
Status bs = b_->InvalidateCache(offset, length);
assert(as == bs);
return as;
}
protected:
Status Allocate(uint64_t offset, uint64_t length) override {
Status as = a_->Allocate(offset, length);
Status bs = b_->Allocate(offset, length);
assert(as == bs);
return as;
}
Status RangeSync(uint64_t offset, uint64_t nbytes) override {
Status as = a_->RangeSync(offset, nbytes);
Status bs = b_->RangeSync(offset, nbytes);
assert(as == bs);
return as;
}
};
Status EnvMirror::NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) {
return a_->NewSequentialFile(f, r, options);
}
SequentialFileMirror* mf = new SequentialFileMirror(f);
Status as = a_->NewSequentialFile(f, &mf->a_, options);
Status bs = b_->NewSequentialFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) {
return a_->NewRandomAccessFile(f, r, options);
}
RandomAccessFileMirror* mf = new RandomAccessFileMirror(f);
Status as = a_->NewRandomAccessFile(f, &mf->a_, options);
Status bs = b_->NewRandomAccessFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::NewWritableFile(const std::string& f,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options) {
if (f.find("/proc/") == 0) return a_->NewWritableFile(f, r, options);
Optionally wait on bytes_per_sync to smooth I/O (#5183) Summary: The existing implementation does not guarantee bytes reach disk every `bytes_per_sync` when writing SST files, or every `wal_bytes_per_sync` when writing WALs. This can cause confusing behavior for users who enable this feature to avoid large syncs during flush and compaction, but then end up hitting them anyways. My understanding of the existing behavior is we used `sync_file_range` with `SYNC_FILE_RANGE_WRITE` to submit ranges for async writeback, such that we could continue processing the next range of bytes while that I/O is happening. I believe we can preserve that benefit while also limiting how far the processing can get ahead of the I/O, which prevents huge syncs from happening when the file finishes. Consider this `sync_file_range` usage: `sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes), SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE)`. Expanding the range to start at 0 and adding the `SYNC_FILE_RANGE_WAIT_BEFORE` flag causes any pending writeback (like from a previous call to `sync_file_range`) to finish before it proceeds to submit the latest `nbytes` for writeback. The latest `nbytes` are still written back asynchronously, unless processing exceeds I/O speed, in which case the following `sync_file_range` will need to wait on it. There is a second change in this PR to use `fdatasync` when `sync_file_range` is unavailable (determined statically) or has some known problem with the underlying filesystem (determined dynamically). The above two changes only apply when the user enables a new option, `strict_bytes_per_sync`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5183 Differential Revision: D14953553 Pulled By: siying fbshipit-source-id: 445c3862e019fb7b470f9c7f314fc231b62706e9
5 years ago
WritableFileMirror* mf = new WritableFileMirror(f, options);
Status as = a_->NewWritableFile(f, &mf->a_, options);
Status bs = b_->NewWritableFile(f, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
Status EnvMirror::ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* r,
const EnvOptions& options) {
if (fname.find("/proc/") == 0)
return a_->ReuseWritableFile(fname, old_fname, r, options);
Optionally wait on bytes_per_sync to smooth I/O (#5183) Summary: The existing implementation does not guarantee bytes reach disk every `bytes_per_sync` when writing SST files, or every `wal_bytes_per_sync` when writing WALs. This can cause confusing behavior for users who enable this feature to avoid large syncs during flush and compaction, but then end up hitting them anyways. My understanding of the existing behavior is we used `sync_file_range` with `SYNC_FILE_RANGE_WRITE` to submit ranges for async writeback, such that we could continue processing the next range of bytes while that I/O is happening. I believe we can preserve that benefit while also limiting how far the processing can get ahead of the I/O, which prevents huge syncs from happening when the file finishes. Consider this `sync_file_range` usage: `sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes), SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE)`. Expanding the range to start at 0 and adding the `SYNC_FILE_RANGE_WAIT_BEFORE` flag causes any pending writeback (like from a previous call to `sync_file_range`) to finish before it proceeds to submit the latest `nbytes` for writeback. The latest `nbytes` are still written back asynchronously, unless processing exceeds I/O speed, in which case the following `sync_file_range` will need to wait on it. There is a second change in this PR to use `fdatasync` when `sync_file_range` is unavailable (determined statically) or has some known problem with the underlying filesystem (determined dynamically). The above two changes only apply when the user enables a new option, `strict_bytes_per_sync`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5183 Differential Revision: D14953553 Pulled By: siying fbshipit-source-id: 445c3862e019fb7b470f9c7f314fc231b62706e9
5 years ago
WritableFileMirror* mf = new WritableFileMirror(fname, options);
Status as = a_->ReuseWritableFile(fname, old_fname, &mf->a_, options);
Status bs = b_->ReuseWritableFile(fname, old_fname, &mf->b_, options);
assert(as == bs);
if (as.ok())
r->reset(mf);
else
delete mf;
return as;
}
} // namespace ROCKSDB_NAMESPACE
#endif