Break large file writes into 1GB chunks (#5213)

Summary:
This is a workaround for the issue described in #5169.
It has been tested on a database with very large values, but not dedicated test has been added to the code base.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5213

Differential Revision: D15243116

Pulled By: siying

fbshipit-source-id: e0c226a6cd71a60924dcd7ce7af74abcb4054484
main
Raphael Bost 6 years ago committed by Facebook Github Bot
parent f0e8216197
commit 468ca61105
  1. 109
      env/io_posix.cc

109
env/io_posix.cc vendored

@ -58,6 +58,57 @@ int Fadvise(int fd, off_t offset, size_t len, int advice) {
namespace { namespace {
// On MacOS (and probably *BSD), the posix write and pwrite calls do not support
// buffers larger than 2^31-1 bytes. These two wrappers fix this issue by
// cutting the buffer in 1GB chunks. We use this chunk size to be sure to keep
// the writes aligned.
bool PosixWrite(int fd, const char* buf, size_t nbyte) {
const size_t kLimit1Gb = 1UL << 30;
const char* src = buf;
size_t left = nbyte;
while (left != 0) {
size_t bytes_to_write = std::min(left, kLimit1Gb);
ssize_t done = write(fd, src, bytes_to_write);
if (done < 0) {
if (errno == EINTR) {
continue;
}
return false;
}
left -= done;
src += done;
}
return true;
}
bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
const size_t kLimit1Gb = 1UL << 30;
const char* src = buf;
size_t left = nbyte;
while (left != 0) {
size_t bytes_to_write = std::min(left, kLimit1Gb);
ssize_t done = pwrite(fd, src, bytes_to_write, offset);
if (done < 0) {
if (errno == EINTR) {
continue;
}
return false;
}
left -= done;
offset += done;
src += done;
}
return true;
}
size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) { size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
#ifdef OS_LINUX #ifdef OS_LINUX
struct stat buf; struct stat buf;
@ -180,7 +231,7 @@ bool IsSectorAligned(const void* ptr, size_t sector_size) {
return uintptr_t(ptr) % sector_size == 0; return uintptr_t(ptr) % sector_size == 0;
} }
} } // namespace
#endif #endif
/* /*
@ -752,8 +803,8 @@ Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) {
TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds); TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds);
int alloc_status = 0; int alloc_status = 0;
if (allow_fallocate_) { if (allow_fallocate_) {
alloc_status = fallocate( alloc_status =
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
static_cast<off_t>(offset), static_cast<off_t>(len)); static_cast<off_t>(offset), static_cast<off_t>(len));
} }
if (alloc_status == 0) { if (alloc_status == 0) {
@ -801,19 +852,13 @@ Status PosixWritableFile::Append(const Slice& data) {
assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment())); assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
} }
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t nbytes = data.size();
while (left != 0) {
ssize_t done = write(fd_, src, left); if (!PosixWrite(fd_, src, nbytes)) {
if (done < 0) {
if (errno == EINTR) {
continue;
}
return IOError("While appending to file", filename_, errno); return IOError("While appending to file", filename_, errno);
} }
left -= done;
src += done; filesize_ += nbytes;
}
filesize_ += data.size();
return Status::OK(); return Status::OK();
} }
@ -825,21 +870,12 @@ Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
} }
assert(offset <= std::numeric_limits<off_t>::max()); assert(offset <= std::numeric_limits<off_t>::max());
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t nbytes = data.size();
while (left != 0) { if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
ssize_t done = pwrite(fd_, src, left, static_cast<off_t>(offset));
if (done < 0) {
if (errno == EINTR) {
continue;
}
return IOError("While pwrite to file at offset " + ToString(offset), return IOError("While pwrite to file at offset " + ToString(offset),
filename_, errno); filename_, errno);
} }
left -= done; filesize_ = offset + nbytes;
offset += done;
src += done;
}
filesize_ = offset;
return Status::OK(); return Status::OK();
} }
@ -974,8 +1010,8 @@ Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
IOSTATS_TIMER_GUARD(allocate_nanos); IOSTATS_TIMER_GUARD(allocate_nanos);
int alloc_status = 0; int alloc_status = 0;
if (allow_fallocate_) { if (allow_fallocate_) {
alloc_status = fallocate( alloc_status =
fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, fallocate(fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0,
static_cast<off_t>(offset), static_cast<off_t>(len)); static_cast<off_t>(offset), static_cast<off_t>(len));
} }
if (alloc_status == 0) { if (alloc_status == 0) {
@ -1037,26 +1073,13 @@ PosixRandomRWFile::~PosixRandomRWFile() {
Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) { Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) {
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t nbytes = data.size();
while (left != 0) { if (!PosixPositionedWrite(fd_, src, nbytes, static_cast<off_t>(offset))) {
ssize_t done = pwrite(fd_, src, left, offset);
if (done < 0) {
// error while writing to file
if (errno == EINTR) {
// write was interrupted, try again.
continue;
}
return IOError( return IOError(
"While write random read/write file at offset " + ToString(offset), "While write random read/write file at offset " + ToString(offset),
filename_, errno); filename_, errno);
} }
// Wrote `done` bytes
left -= done;
offset += done;
src += done;
}
return Status::OK(); return Status::OK();
} }

Loading…
Cancel
Save