|
|
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under the BSD-style license found in the
|
|
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
|
|
|
|
#include "utilities/persistent_cache/block_cache_tier_file.h"
|
|
|
|
|
|
|
|
#ifndef OS_WIN
|
|
|
|
#include <unistd.h>
|
|
|
|
#endif
|
|
|
|
#include <functional>
|
|
|
|
#include <memory>
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
#include "util/crc32c.h"
|
|
|
|
#include "port/port.h"
|
|
|
|
|
|
|
|
namespace rocksdb {
|
|
|
|
|
|
|
|
//
|
|
|
|
// File creation factories
|
|
|
|
//
|
|
|
|
// Creates a writable file at `filepath` through `env` and stores the handle
// in `*file`. `use_direct_writes` is copied into the EnvOptions handed to the
// Env. Returns whatever Status Env::NewWritableFile reports.
Status NewWritableCacheFile(Env* const env, const std::string& filepath,
                            std::unique_ptr<WritableFile>* file,
                            const bool use_direct_writes = false) {
  EnvOptions file_options;
  file_options.use_direct_writes = use_direct_writes;
  return env->NewWritableFile(filepath, file, file_options);
}
|
|
|
|
|
|
|
|
// Opens `filepath` for random-access reads through `env` and stores the
// handle in `*file`. `use_direct_reads` (default: true) is copied into the
// EnvOptions handed to the Env. Returns the Status reported by
// Env::NewRandomAccessFile.
Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
                                std::unique_ptr<RandomAccessFile>* file,
                                const bool use_direct_reads = true) {
  EnvOptions file_options;
  file_options.use_direct_reads = use_direct_reads;
  return env->NewRandomAccessFile(filepath, file, file_options);
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// BlockCacheFile
|
|
|
|
//
|
|
|
|
// Removes the backing file. The file size is queried first and written to
// `*size`; if that query fails its Status is returned and the file is left
// in place. Otherwise the Status of the delete itself is returned.
Status BlockCacheFile::Delete(uint64_t* size) {
  Status size_status = env_->GetFileSize(Path(), size);
  if (size_status.ok()) {
    return env_->DeleteFile(Path());
  }
  return size_status;
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// CacheRecord
|
|
|
|
//
|
|
|
|
// Cache record represents the record on disk
|
|
|
|
//
|
|
|
|
// +--------+---------+----------+------------+---------------+-------------+
|
|
|
|
// | magic | crc | key size | value size | key data | value data |
|
|
|
|
// +--------+---------+----------+------------+---------------+-------------+
|
|
|
|
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
|
|
|
|
//
|
|
|
|
// Fixed-size header that precedes the key and value bytes of every record.
// It consists of four 32-bit fields: magic, crc, key size and value size.
struct CacheRecordHeader {
  // Zero-initialize every field so that a default-constructed header never
  // exposes indeterminate values (e.g. when copied or checksummed before
  // being filled in).
  CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}

  // Builds a header for a record with the given magic and payload sizes.
  // The crc field starts at zero and is expected to be filled in by the
  // owner once the checksum has been computed.
  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
                    const uint32_t val_size)
      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}

  uint32_t magic_;     // record marker
  uint32_t crc_;       // checksum of the record
  uint32_t key_size_;  // number of key bytes following the header
  uint32_t val_size_;  // number of value bytes following the key
};
|
|
|
|
|
|
|
|
struct CacheRecord {
|
|
|
|
CacheRecord() {}
|
|
|
|
CacheRecord(const Slice& key, const Slice& val)
|
|
|
|
: hdr_(MAGIC, static_cast<uint32_t>(key.size()),
|
|
|
|
static_cast<uint32_t>(val.size())),
|
|
|
|
key_(key),
|
|
|
|
val_(val) {
|
|
|
|
hdr_.crc_ = ComputeCRC();
|
|
|
|
}
|
|
|
|
|
|
|
|
uint32_t ComputeCRC() const;
|
|
|
|
bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
|
|
|
|
bool Deserialize(const Slice& buf);
|
|
|
|
|
|
|
|
static uint32_t CalcSize(const Slice& key, const Slice& val) {
|
|
|
|
return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
|
|
|
|
val.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
static const uint32_t MAGIC = 0xfefa;
|
|
|
|
|
|
|
|
bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
|
|
|
|
const char* data, const size_t size);
|
|
|
|
|
|
|
|
CacheRecordHeader hdr_;
|
|
|
|
Slice key_;
|
|
|
|
Slice val_;
|
|
|
|
};
|
|
|
|
|
|
|
|
// The record layout documents the header as four 4-byte fields; guard
// against the compiler inserting padding that would change its size.
static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned");
|
|
|
|
|
|
|
|
uint32_t CacheRecord::ComputeCRC() const {
|
|
|
|
uint32_t crc = 0;
|
|
|
|
CacheRecordHeader tmp = hdr_;
|
|
|
|
tmp.crc_ = 0;
|
|
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
|
|
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
|
|
|
|
key_.size());
|
|
|
|
crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
|
|
|
|
val_.size());
|
|
|
|
return crc;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Writes the record into `bufs` in on-disk order: header, key, value.
// `*woff` is the index of the buffer currently being written and is advanced
// by Append() as buffers fill up. Stops at the first piece that does not fit
// and returns false; returns true when all three pieces were written.
bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
                            size_t* woff) {
  assert(bufs->size());

  if (!Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
              sizeof(hdr_))) {
    return false;
  }
  if (!Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
              key_.size())) {
    return false;
  }
  return Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
                val_.size());
}
|
|
|
|
|
|
|
|
// Copies `data_size` bytes from `data` into the buffer list, starting with
// buffer `*woff` and spilling into the following buffers as each one fills.
// `*woff` is bumped past every buffer that ends up with no free space.
// Returns true when every byte was placed, false when the buffers ran out.
bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
                         const char* data, const size_t data_size) {
  assert(*woff < bufs->size());

  const char* cursor = data;
  size_t remaining = data_size;

  while (remaining != 0 && *woff < bufs->size()) {
    CacheWriteBuffer* const target = (*bufs)[*woff];
    const size_t room = target->Free();

    // Write as much as fits: either everything that is left or just enough
    // to fill the current buffer.
    const size_t chunk = remaining <= room ? remaining : room;
    target->Append(cursor, chunk);
    cursor += chunk;
    remaining -= chunk;

    // Data is still pending only if the current buffer was filled completely
    assert(remaining == 0 || !target->Free());
    assert(remaining == 0 || target->Used() == target->Capacity());

    if (!target->Free()) {
      // current buffer is full, continue with the next one
      *woff += 1;
    }
  }

  assert(!remaining);

  return remaining == 0;
}
|
|
|
|
|
|
|
|
bool CacheRecord::Deserialize(const Slice& data) {
|
|
|
|
assert(data.size() >= sizeof(CacheRecordHeader));
|
|
|
|
if (data.size() < sizeof(CacheRecordHeader)) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
memcpy(&hdr_, data.data(), sizeof(hdr_));
|
|
|
|
|
|
|
|
assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
|
|
|
|
if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
|
|
|
|
val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);
|
|
|
|
|
|
|
|
if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
|
|
|
|
fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
|
|
|
|
fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
|
|
|
|
fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
|
|
|
|
fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
|
|
|
|
fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
|
|
|
|
for (size_t i = 0; i < hdr_.val_size_; ++i) {
|
|
|
|
fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
|
|
|
|
}
|
|
|
|
fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
|
|
|
|
return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
|
|
|
|
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// RandomAccessCacheFile
|
|
|
|
//
|
|
|
|
|
|
|
|
bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
|
|
|
|
WriteLock _(&rwlock_);
|
|
|
|
return OpenImpl(enable_direct_reads);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
|
|
|
|
rwlock_.AssertHeld();
|
|
|
|
|
|
|
|
Debug(log_, "Opening cache file %s", Path().c_str());
|
|
|
|
|
|
|
|
Status status = NewRandomAccessCacheFile(env_, Path(), &file_);
|
|
|
|
if (!status.ok()) {
|
|
|
|
Error(log_, "Error opening random access file %s. %s", Path().c_str(),
|
|
|
|
status.ToString().c_str());
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
|
|
|
|
char* scratch) {
|
|
|
|
ReadLock _(&rwlock_);
|
|
|
|
|
|
|
|
assert(lba.cache_id_ == cache_id_);
|
|
|
|
|
|
|
|
if (!file_) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
Slice result;
|
|
|
|
Status s = file_->Read(lba.off_, lba.size_, &result, scratch);
|
|
|
|
if (!s.ok()) {
|
|
|
|
Error(log_, "Error reading from file %s. %s", Path().c_str(),
|
|
|
|
s.ToString().c_str());
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(result.data() == scratch);
|
|
|
|
|
|
|
|
return ParseRec(lba, key, val, scratch);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val,
|
|
|
|
char* scratch) {
|
|
|
|
Slice data(scratch, lba.size_);
|
|
|
|
|
|
|
|
CacheRecord rec;
|
|
|
|
if (!rec.Deserialize(data)) {
|
|
|
|
assert(!"Error deserializing data");
|
|
|
|
Error(log_, "Error de-serializing record from file %s off %d",
|
|
|
|
Path().c_str(), lba.off_);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
*key = Slice(rec.key_);
|
|
|
|
*val = Slice(rec.val_);
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// WriteableCacheFile
|
|
|
|
//
|
|
|
|
|
|
|
|
// Releases the write buffers. If the file was created but never reached
// end-of-file, the reference taken in Create() is dropped here, since
// Close() (which normally drops it) has not run.
WriteableCacheFile::~WriteableCacheFile() {
  WriteLock lock(&rwlock_);

  // This file never flushed. We give priority to shutdown since this is a
  // cache
  // TODO(krad): Figure a way to flush the pending data
  if (!eof_ && file_) {
    assert(refs_ == 1);
    --refs_;
  }

  assert(!refs_);
  ClearBuffers();
}
|
|
|
|
|
|
|
|
bool WriteableCacheFile::Create(const bool enable_direct_writes,
|
|
|
|
const bool enable_direct_reads) {
|
|
|
|
WriteLock _(&rwlock_);
|
|
|
|
|
|
|
|
enable_direct_reads_ = enable_direct_reads;
|
|
|
|
|
|
|
|
Debug(log_, "Creating new cache %s (max size is %d B)", Path().c_str(),
|
|
|
|
max_size_);
|
|
|
|
|
|
|
|
Status s = env_->FileExists(Path());
|
|
|
|
if (s.ok()) {
|
|
|
|
Warn(log_, "File %s already exists. %s", Path().c_str(),
|
|
|
|
s.ToString().c_str());
|
|
|
|
}
|
|
|
|
|
|
|
|
s = NewWritableCacheFile(env_, Path(), &file_);
|
|
|
|
if (!s.ok()) {
|
|
|
|
Warn(log_, "Unable to create file %s. %s", Path().c_str(),
|
|
|
|
s.ToString().c_str());
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(!refs_);
|
|
|
|
++refs_;
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) {
|
|
|
|
WriteLock _(&rwlock_);
|
|
|
|
|
|
|
|
if (eof_) {
|
|
|
|
// We can't append since the file is full
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// estimate the space required to store the (key, val)
|
|
|
|
uint32_t rec_size = CacheRecord::CalcSize(key, val);
|
|
|
|
|
|
|
|
if (!ExpandBuffer(rec_size)) {
|
|
|
|
// unable to expand the buffer
|
|
|
|
Debug(log_, "Error expanding buffers. size=%d", rec_size);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
lba->cache_id_ = cache_id_;
|
|
|
|
lba->off_ = disk_woff_;
|
|
|
|
lba->size_ = rec_size;
|
|
|
|
|
|
|
|
CacheRecord rec(key, val);
|
|
|
|
if (!rec.Serialize(&bufs_, &buf_woff_)) {
|
|
|
|
// unexpected error: unable to serialize the data
|
|
|
|
assert(!"Error serializing record");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
disk_woff_ += rec_size;
|
|
|
|
eof_ = disk_woff_ >= max_size_;
|
|
|
|
|
|
|
|
// dispatch buffer for flush
|
|
|
|
DispatchBuffer();
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool WriteableCacheFile::ExpandBuffer(const size_t size) {
|
|
|
|
rwlock_.AssertHeld();
|
|
|
|
assert(!eof_);
|
|
|
|
|
|
|
|
// determine if there is enough space
|
|
|
|
size_t free = 0; // compute the free space left in buffer
|
|
|
|
for (size_t i = buf_woff_; i < bufs_.size(); ++i) {
|
|
|
|
free += bufs_[i]->Free();
|
|
|
|
if (size <= free) {
|
|
|
|
// we have enough space in the buffer
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// expand the buffer until there is enough space to write `size` bytes
|
|
|
|
assert(free < size);
|
|
|
|
while (free < size) {
|
|
|
|
CacheWriteBuffer* const buf = alloc_->Allocate();
|
|
|
|
if (!buf) {
|
|
|
|
Debug(log_, "Unable to allocate buffers");
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_ += static_cast<uint32_t>(buf->Free());
|
|
|
|
free += buf->Free();
|
|
|
|
bufs_.push_back(buf);
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(free >= size);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteableCacheFile::DispatchBuffer() {
|
|
|
|
rwlock_.AssertHeld();
|
|
|
|
|
|
|
|
assert(bufs_.size());
|
|
|
|
assert(buf_doff_ <= buf_woff_);
|
|
|
|
assert(buf_woff_ <= bufs_.size());
|
|
|
|
|
|
|
|
if (pending_ios_) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!eof_ && buf_doff_ == buf_woff_) {
|
|
|
|
// dispatch buffer is pointing to write buffer and we haven't hit eof
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(eof_ || buf_doff_ < buf_woff_);
|
|
|
|
assert(buf_doff_ < bufs_.size());
|
|
|
|
assert(file_);
|
|
|
|
|
|
|
|
auto* buf = bufs_[buf_doff_];
|
|
|
|
const uint64_t file_off = buf_doff_ * alloc_->BufferSize();
|
|
|
|
|
|
|
|
assert(!buf->Free() ||
|
|
|
|
(eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size()));
|
|
|
|
// we have reached end of file, and there is space in the last buffer
|
|
|
|
// pad it with zero for direct IO
|
|
|
|
buf->FillTrailingZeros();
|
|
|
|
|
|
|
|
assert(buf->Used() % kFileAlignmentSize == 0);
|
|
|
|
|
|
|
|
writer_->Write(file_.get(), buf, file_off,
|
|
|
|
std::bind(&WriteableCacheFile::BufferWriteDone, this));
|
|
|
|
pending_ios_++;
|
|
|
|
buf_doff_++;
|
|
|
|
}
|
|
|
|
|
|
|
|
void WriteableCacheFile::BufferWriteDone() {
|
|
|
|
WriteLock _(&rwlock_);
|
|
|
|
|
|
|
|
assert(bufs_.size());
|
|
|
|
|
|
|
|
pending_ios_--;
|
|
|
|
|
|
|
|
if (buf_doff_ < bufs_.size()) {
|
|
|
|
DispatchBuffer();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) {
|
|
|
|
// end-of-file reached, move to read mode
|
|
|
|
CloseAndOpenForReading();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switches the file from write mode to read mode. The Env abstraction does
// not allow reading from a file that was opened for appending, so the file
// is closed and then re-opened for reading.
void WriteableCacheFile::CloseAndOpenForReading() {
  Close();
  // OpenImpl() logs the failure itself; the result is not acted upon here
  const bool reopened = RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
  (void)reopened;
}
|
|
|
|
|
|
|
|
bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
|
|
|
|
char* scratch) {
|
|
|
|
rwlock_.AssertHeld();
|
|
|
|
|
|
|
|
if (!ReadBuffer(lba, scratch)) {
|
|
|
|
Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_,
|
|
|
|
lba.off_);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return ParseRec(lba, key, block, scratch);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
|
|
|
|
rwlock_.AssertHeld();
|
|
|
|
|
|
|
|
assert(lba.off_ < disk_woff_);
|
|
|
|
|
|
|
|
// we read from the buffers like reading from a flat file. The list of buffers
|
|
|
|
// are treated as contiguous stream of data
|
|
|
|
|
|
|
|
char* tmp = data;
|
|
|
|
size_t pending_nbytes = lba.size_;
|
|
|
|
// start buffer
|
|
|
|
size_t start_idx = lba.off_ / alloc_->BufferSize();
|
|
|
|
// offset into the start buffer
|
|
|
|
size_t start_off = lba.off_ % alloc_->BufferSize();
|
|
|
|
|
|
|
|
assert(start_idx <= buf_woff_);
|
|
|
|
|
|
|
|
for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
|
|
|
|
assert(i <= buf_woff_);
|
|
|
|
auto* buf = bufs_[i];
|
|
|
|
assert(i == buf_woff_ || !buf->Free());
|
|
|
|
// bytes to write to the buffer
|
|
|
|
size_t nbytes = pending_nbytes > (buf->Used() - start_off)
|
|
|
|
? (buf->Used() - start_off)
|
|
|
|
: pending_nbytes;
|
|
|
|
memcpy(tmp, buf->Data() + start_off, nbytes);
|
|
|
|
|
|
|
|
// left over to be written
|
|
|
|
pending_nbytes -= nbytes;
|
|
|
|
start_off = 0;
|
|
|
|
tmp += nbytes;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(!pending_nbytes);
|
|
|
|
if (pending_nbytes) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(tmp == data + lba.size_);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Closes the file for writing: releases the write buffers, drops the
// writable file handle and gives up the reference taken in Create(). The
// asserts require that the file has reached its maximum size and that every
// buffer has been dispatched with no IO outstanding. The caller must hold
// rwlock_ (asserted).
void WriteableCacheFile::Close() {
  rwlock_.AssertHeld();

  // the file must be full ...
  assert(size_ >= max_size_);
  assert(disk_woff_ >= max_size_);
  // ... and completely handed off to the writer
  assert(buf_doff_ == bufs_.size());
  assert(bufs_.size() - buf_woff_ <= 1);
  assert(!pending_ios_);

  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
       disk_woff_);

  ClearBuffers();
  file_.reset();

  assert(refs_);
  --refs_;
}
|
|
|
|
|
|
|
|
void WriteableCacheFile::ClearBuffers() {
|
|
|
|
for (size_t i = 0; i < bufs_.size(); ++i) {
|
|
|
|
alloc_->Deallocate(bufs_[i]);
|
|
|
|
}
|
|
|
|
|
|
|
|
bufs_.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
//
|
|
|
|
// ThreadedWriter implementation
|
|
|
|
//
|
|
|
|
// Starts `qdepth` worker threads, each running ThreadMain(). `io_size` is
// the size of the chunks in which buffers are written to the file.
ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
                               const size_t qdepth, const size_t io_size)
    : Writer(cache), io_size_(io_size) {
  for (size_t started = 0; started < qdepth; ++started) {
    threads_.push_back(port::Thread(&ThreadedWriter::ThreadMain, this));
  }
}
|
|
|
|
|
|
|
|
void ThreadedWriter::Stop() {
|
|
|
|
// notify all threads to exit
|
|
|
|
for (size_t i = 0; i < threads_.size(); ++i) {
|
|
|
|
q_.Push(IO(/*signal=*/true));
|
|
|
|
}
|
|
|
|
|
|
|
|
// wait for all threads to exit
|
|
|
|
for (auto& th : threads_) {
|
|
|
|
th.join();
|
|
|
|
assert(!th.joinable());
|
|
|
|
}
|
|
|
|
threads_.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Queues `buf` to be written to `file` by one of the worker threads. The
// call only enqueues the request; `callback` is invoked by the worker once
// the request has been processed.
void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
                           const uint64_t file_off,
                           const std::function<void()> callback) {
  q_.Push(IO(file, buf, file_off, callback));
}
|
|
|
|
|
|
|
|
void ThreadedWriter::ThreadMain() {
|
|
|
|
while (true) {
|
|
|
|
// Fetch the IO to process
|
|
|
|
IO io(q_.Pop());
|
|
|
|
if (io.signal_) {
|
|
|
|
// that's secret signal to exit
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reserve space for writing the buffer
|
|
|
|
while (!cache_->Reserve(io.buf_->Used())) {
|
|
|
|
// We can fail to reserve space if every file in the system
|
|
|
|
// is being currently accessed
|
|
|
|
/* sleep override */
|
|
|
|
Env::Default()->SleepForMicroseconds(1000000);
|
|
|
|
}
|
|
|
|
|
|
|
|
DispatchIO(io);
|
|
|
|
|
|
|
|
io.callback_();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void ThreadedWriter::DispatchIO(const IO& io) {
|
|
|
|
size_t written = 0;
|
|
|
|
while (written < io.buf_->Used()) {
|
|
|
|
Slice data(io.buf_->Data() + written, io_size_);
|
|
|
|
Status s = io.file_->Append(data);
|
|
|
|
assert(s.ok());
|
|
|
|
if (!s.ok()) {
|
|
|
|
// That is definite IO error to device. There is not much we can
|
|
|
|
// do but ignore the failure. This can lead to corruption of data on
|
|
|
|
// disk, but the cache will skip while reading
|
|
|
|
fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str());
|
|
|
|
}
|
|
|
|
written += io_size_;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace rocksdb
|
|
|
|
|
|
|
|
#endif
|