fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
226 lines
5.8 KiB
226 lines
5.8 KiB
10 years ago
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||
|
// This source code is licensed under the BSD-style license found in the
|
||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||
|
//
|
||
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||
|
|
||
|
#include "util/file_reader_writer.h"
|
||
|
|
||
|
#include <algorithm>
|
||
|
#include "port/port.h"
|
||
|
#include "util/iostats_context_imp.h"
|
||
|
#include "util/random.h"
|
||
|
#include "util/rate_limiter.h"
|
||
|
#include "util/sync_point.h"
|
||
|
|
||
|
namespace rocksdb {
|
||
|
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
|
||
|
Status s = file_->Read(n, result, scratch);
|
||
|
IOSTATS_ADD(bytes_read, result->size());
|
||
|
return s;
|
||
|
}
|
||
|
|
||
|
Status SequentialFileReader::Skip(uint64_t n) { return file_->Skip(n); }
|
||
|
|
||
|
Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
|
||
|
char* scratch) const {
|
||
|
IOSTATS_TIMER_GUARD(read_nanos);
|
||
|
Status s = file_->Read(offset, n, result, scratch);
|
||
|
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
|
||
|
return s;
|
||
|
}
|
||
|
|
||
|
Status WritableFileWriter::Append(const Slice& data) {
|
||
|
const char* src = data.data();
|
||
|
size_t left = data.size();
|
||
|
Status s;
|
||
|
pending_sync_ = true;
|
||
|
pending_fsync_ = true;
|
||
|
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
|
||
|
|
||
|
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
|
||
|
// if there is no space in the cache, then flush
|
||
|
if (cursize_ + left > capacity_) {
|
||
|
s = Flush();
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
// Increase the buffer size, but capped at 1MB
|
||
|
if (capacity_ < (1 << 20)) {
|
||
|
capacity_ *= 2;
|
||
|
buf_.reset(new char[capacity_]);
|
||
|
}
|
||
|
assert(cursize_ == 0);
|
||
|
}
|
||
|
|
||
|
// if the write fits into the cache, then write to cache
|
||
|
// otherwise do a write() syscall to write to OS buffers.
|
||
|
if (cursize_ + left <= capacity_) {
|
||
|
memcpy(buf_.get() + cursize_, src, left);
|
||
|
cursize_ += left;
|
||
|
} else {
|
||
|
while (left != 0) {
|
||
|
size_t size = RequestToken(left);
|
||
|
{
|
||
|
IOSTATS_TIMER_GUARD(write_nanos);
|
||
|
s = writable_file_->Append(Slice(src, size));
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
IOSTATS_ADD(bytes_written, size);
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||
|
|
||
|
left -= size;
|
||
|
src += size;
|
||
|
}
|
||
|
}
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||
|
filesize_ += data.size();
|
||
|
return Status::OK();
|
||
|
}
|
||
|
|
||
|
Status WritableFileWriter::Close() {
|
||
|
Status s;
|
||
|
s = Flush(); // flush cache to OS
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||
|
return writable_file_->Close();
|
||
|
}
|
||
|
|
||
|
// write out the cached data to the OS cache
|
||
|
Status WritableFileWriter::Flush() {
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
|
||
|
size_t left = cursize_;
|
||
|
char* src = buf_.get();
|
||
|
while (left != 0) {
|
||
|
size_t size = RequestToken(left);
|
||
|
{
|
||
|
IOSTATS_TIMER_GUARD(write_nanos);
|
||
|
Status s = writable_file_->Append(Slice(src, size));
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
IOSTATS_ADD(bytes_written, size);
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds * REDUCE_ODDS2);
|
||
|
left -= size;
|
||
|
src += size;
|
||
|
}
|
||
|
cursize_ = 0;
|
||
|
|
||
|
writable_file_->Flush();
|
||
|
|
||
|
// sync OS cache to disk for every bytes_per_sync_
|
||
|
// TODO: give log file and sst file different options (log
|
||
|
// files could be potentially cached in OS for their whole
|
||
|
// life time, thus we might not want to flush at all).
|
||
|
if (bytes_per_sync_ && filesize_ - last_sync_size_ >= bytes_per_sync_) {
|
||
|
writable_file_->RangeSync(last_sync_size_, filesize_ - last_sync_size_);
|
||
|
last_sync_size_ = filesize_;
|
||
|
}
|
||
|
|
||
|
return Status::OK();
|
||
|
}
|
||
|
|
||
|
Status WritableFileWriter::Sync(bool use_fsync) {
|
||
|
Status s = Flush();
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||
|
if (pending_sync_) {
|
||
|
if (use_fsync) {
|
||
|
s = writable_file_->Fsync();
|
||
|
} else {
|
||
|
s = writable_file_->Sync();
|
||
|
}
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
TEST_KILL_RANDOM(rocksdb_kill_odds);
|
||
|
pending_sync_ = false;
|
||
|
if (use_fsync) {
|
||
|
pending_fsync_ = false;
|
||
|
}
|
||
|
return Status::OK();
|
||
|
}
|
||
|
|
||
|
Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
|
||
|
IOSTATS_TIMER_GUARD(range_sync_nanos);
|
||
|
return writable_file_->RangeSync(offset, nbytes);
|
||
|
}
|
||
|
|
||
|
size_t WritableFileWriter::RequestToken(size_t bytes) {
|
||
|
Env::IOPriority io_priority;
|
||
|
if (rate_limiter_&&(io_priority = writable_file_->GetIOPriority()) <
|
||
|
Env::IO_TOTAL) {
|
||
|
bytes = std::min(bytes,
|
||
|
static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()));
|
||
|
rate_limiter_->Request(bytes, io_priority);
|
||
|
}
|
||
|
return bytes;
|
||
|
}
|
||
|
|
||
|
Status RandomRWFileAccessor::Write(uint64_t offset, const Slice& data) {
|
||
|
Status s;
|
||
|
pending_sync_ = true;
|
||
|
pending_fsync_ = true;
|
||
|
|
||
|
{
|
||
|
IOSTATS_TIMER_GUARD(write_nanos);
|
||
|
s = random_rw_file_->Write(offset, data);
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
IOSTATS_ADD(bytes_written, data.size());
|
||
|
|
||
|
return s;
|
||
|
}
|
||
|
|
||
|
Status RandomRWFileAccessor::Read(uint64_t offset, size_t n, Slice* result,
|
||
|
char* scratch) const {
|
||
|
Status s;
|
||
|
{
|
||
|
IOSTATS_TIMER_GUARD(read_nanos);
|
||
|
s = random_rw_file_->Read(offset, n, result, scratch);
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
|
||
|
return s;
|
||
|
}
|
||
|
|
||
|
Status RandomRWFileAccessor::Close() { return random_rw_file_->Close(); }
|
||
|
|
||
|
Status RandomRWFileAccessor::Sync(bool use_fsync) {
|
||
|
Status s;
|
||
|
if (pending_sync_) {
|
||
|
if (use_fsync) {
|
||
|
s = random_rw_file_->Fsync();
|
||
|
} else {
|
||
|
s = random_rw_file_->Sync();
|
||
|
}
|
||
|
if (!s.ok()) {
|
||
|
return s;
|
||
|
}
|
||
|
}
|
||
|
if (use_fsync) {
|
||
|
pending_fsync_ = false;
|
||
|
}
|
||
|
pending_sync_ = false;
|
||
|
|
||
|
return s;
|
||
|
}
|
||
|
} // namespace rocksdb
|