diff --git a/src.mk b/src.mk
index aad71d3d8..c98afdcc6 100644
--- a/src.mk
+++ b/src.mk
@@ -134,6 +134,8 @@ LIB_SOURCES = \
   utilities/options/options_util.cc \
   utilities/persistent_cache/persistent_cache_tier.cc \
   utilities/persistent_cache/volatile_tier_impl.cc \
+  utilities/persistent_cache/block_cache_tier_file.cc \
+  utilities/persistent_cache/block_cache_tier_metadata.cc \
   utilities/redis/redis_lists.cc \
   utilities/simulator_cache/sim_cache.cc \
   utilities/spatialdb/spatial_db.cc \
diff --git a/util/mutexlock.h b/util/mutexlock.h
index a2d14aedf..94d7b596c 100644
--- a/util/mutexlock.h
+++ b/util/mutexlock.h
@@ -59,6 +59,20 @@ class ReadLock {
   void operator=(const ReadLock&);
 };
 
+//
+// Automatically unlock a locked mutex when the object is destroyed
+//
+class ReadUnlock {
+ public:
+  explicit ReadUnlock(port::RWMutex *mu) : mu_(mu) { mu->AssertHeld(); }
+  ~ReadUnlock() { mu_->ReadUnlock(); }
+
+ private:
+  port::RWMutex *const mu_;
+  // No copying allowed
+  ReadUnlock(const ReadUnlock &) = delete;
+  ReadUnlock &operator=(const ReadUnlock &) = delete;
+};
 
 //
 // Acquire a WriteLock on the specified RWMutex.
diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc
new file mode 100644
index 000000000..970da8f18
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier_file.cc
@@ -0,0 +1,575 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "util/crc32c.h"
+
+namespace rocksdb {
+
+//
+// File creation factories
+//
+Status NewWritableCacheFile(Env* const env, const std::string& filepath,
+                            std::unique_ptr<WritableFile>* file,
+                            const bool use_direct_writes = false) {
+  EnvOptions opt;
+  opt.use_direct_writes = use_direct_writes;
+  Status s = env->NewWritableFile(filepath, file, opt);
+  return s;
+}
+
+Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
+                                std::unique_ptr<RandomAccessFile>* file,
+                                const bool use_direct_reads = true) {
+  EnvOptions opt;
+  opt.use_direct_reads = use_direct_reads;
+  Status s = env->NewRandomAccessFile(filepath, file, opt);
+  return s;
+}
+
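+// Usage sketch for the factories above (illustrative; error handling elided):
+//
+//   std::unique_ptr<WritableFile> file;
+//   Status s = NewWritableCacheFile(env, "/cache_dir/1", &file);
+//   // direct writes are off by default; direct reads are on by default
+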
+//
+// BlockCacheFile
+//
+Status BlockCacheFile::Delete(size_t* size) {
+  Status status = env_->GetFileSize(Path(), size);
+  if (!status.ok()) {
+    return status;
+  }
+  return env_->DeleteFile(Path());
+}
+
+//
+// CacheRecord
+//
+// Cache record represents the record on disk
+//
+// +--------+--------+----------+------------+----------------+--------------+
+// | magic  |  crc   | key size | value size |    key data    |  value data  |
+// +--------+--------+----------+------------+----------------+--------------+
+// <-- 4 --><-- 4 --><--- 4 ---><---- 4 ----><-- key size ---><--- v-size -->
+//
+struct CacheRecordHeader {
+  CacheRecordHeader() {}
+  CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
+                    const uint32_t val_size)
+      : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}
+
+  uint32_t magic_;
+  uint32_t crc_;
+  uint32_t key_size_;
+  uint32_t val_size_;
+};
+
+struct CacheRecord {
+  CacheRecord() {}
+  CacheRecord(const Slice& key, const Slice& val)
+      : hdr_(MAGIC, static_cast<uint32_t>(key.size()),
+             static_cast<uint32_t>(val.size())),
+        key_(key),
+        val_(val) {
+    hdr_.crc_ = ComputeCRC();
+  }
+
+  uint32_t ComputeCRC() const;
+  bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff);
+  bool Deserialize(const Slice& buf);
+
+  static uint32_t CalcSize(const Slice& key, const Slice& val) {
+    return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() +
+                                 val.size());
+  }
+
+  static const uint32_t MAGIC = 0xfefa;
+
+  bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
+              const char* data, const size_t size);
+
+  CacheRecordHeader hdr_;
+  Slice key_;
+  Slice val_;
+};
+
+static_assert(sizeof(CacheRecordHeader) == 16,
+              "CacheRecordHeader is not aligned");
+
+uint32_t CacheRecord::ComputeCRC() const {
+  uint32_t crc = 0;
+  CacheRecordHeader tmp = hdr_;
+  tmp.crc_ = 0;
+  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp));
+  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()),
+                       key_.size());
+  crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()),
+                       val_.size());
+  return crc;
+}
+
+bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs,
+                            size_t* woff) {
+  assert(bufs->size());
+  return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_),
+                sizeof(hdr_)) &&
+         Append(bufs, woff, reinterpret_cast<const char*>(key_.data()),
+                key_.size()) &&
+         Append(bufs, woff, reinterpret_cast<const char*>(val_.data()),
+                val_.size());
+}
+
+bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff,
+                         const char* data, const size_t data_size) {
+  assert(*woff < bufs->size());
+
+  const char* p = data;
+  size_t size = data_size;
+
+  while (size && *woff < bufs->size()) {
+    CacheWriteBuffer* buf = (*bufs)[*woff];
+    const size_t free = buf->Free();
+    if (size <= free) {
+      buf->Append(p, size);
+      size = 0;
+    } else {
+      buf->Append(p, free);
+      p += free;
+      size -= free;
+      assert(!buf->Free());
+      assert(buf->Used() == buf->Capacity());
+    }
+
+    if (!buf->Free()) {
+      *woff += 1;
+    }
+  }
+
+  assert(!size);
+
+  return !size;
+}
+
+bool CacheRecord::Deserialize(const Slice& data) {
+  assert(data.size() >= sizeof(CacheRecordHeader));
+  if (data.size() < sizeof(CacheRecordHeader)) {
+    return false;
+  }
+
+  memcpy(&hdr_, data.data(), sizeof(hdr_));
+
+  assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size());
+  if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) {
+    return false;
+  }
+
+  key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_);
+  val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_);
+
+  if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) {
+    fprintf(stderr, "** magic %d ** \n", hdr_.magic_);
+    fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_);
+    fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_);
+    fprintf(stderr, "** key %s ** \n", key_.ToString().c_str());
+    fprintf(stderr, "** val %s ** \n", val_.ToString().c_str());
+    for (size_t i = 0; i < hdr_.val_size_; ++i) {
+      fprintf(stderr, "%d.", (uint8_t)val_.data()[i]);
+    }
+    fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC());
+  }
+
+  assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_);
+  return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_;
+}
+
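+// Illustrative round trip for the record format above (sketch only):
+//
+//   CacheRecord rec(Slice("key"), Slice("val"));
+//   // Serialize() lays out hdr_ | key | val across the write buffers;
+//   // Deserialize() checks the magic and CRC before exposing key_/val_.
+//   assert(CacheRecord::CalcSize(Slice("key"), Slice("val")) == 16 + 3 + 3);
+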
%s", Path().c_str(), + status.ToString().c_str()); + return false; + } + + return true; +} + +bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + ReadLock _(&rwlock_); + + assert(lba.cache_id_ == cache_id_); + assert(file_); + + Slice result; + Status s = file_->Read(lba.off_, lba.size_, &result, scratch); + if (!s.ok()) { + Error(log_, "Error reading from file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(result.data() == scratch); + + return ParseRec(lba, key, val, scratch); +} + +bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + Slice data(scratch, lba.size_); + + CacheRecord rec; + if (!rec.Deserialize(data)) { + assert(!"Error deserializing data"); + Error(log_, "Error de-serializing record from file %s off %d", + Path().c_str(), lba.off_); + return false; + } + + *key = Slice(rec.key_); + *val = Slice(rec.val_); + + return true; +} + +// +// WriteableCacheFile +// + +WriteableCacheFile::~WriteableCacheFile() { + WriteLock _(&rwlock_); + if (!eof_) { + // This file never flushed. We give priority to shutdown since this is a + // cache + // TODO(krad): Figure a way to flush the pending data + assert(file_); + + assert(refs_ == 1); + --refs_; + } + ClearBuffers(); +} + +bool WriteableCacheFile::Create() { + WriteLock _(&rwlock_); + + Debug(log_, "Creating new cache %s (max size is %d B)", Path().c_str(), + max_size_); + + Status s = env_->FileExists(Path()); + if (s.ok()) { + Warn(log_, "File %s already exists. %s", Path().c_str(), + s.ToString().c_str()); + } + + s = NewWritableCacheFile(env_, Path(), &file_); + if (!s.ok()) { + Warn(log_, "Unable to create file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(!refs_); + ++refs_; + + return true; +} + +bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) { + WriteLock _(&rwlock_); + + if (eof_) { + // We can't append since the file is full + return false; + } + + // estimate the space required to store the (key, val) + uint32_t rec_size = CacheRecord::CalcSize(key, val); + + if (!ExpandBuffer(rec_size)) { + // unable to expand the buffer + Debug(log_, "Error expanding buffers. 
size=%d", rec_size); + return false; + } + + lba->cache_id_ = cache_id_; + lba->off_ = disk_woff_; + lba->size_ = rec_size; + + CacheRecord rec(key, val); + if (!rec.Serialize(&bufs_, &buf_woff_)) { + // unexpected error: unable to serialize the data + assert(!"Error serializing record"); + return false; + } + + disk_woff_ += rec_size; + eof_ = disk_woff_ >= max_size_; + + // dispatch buffer for flush + DispatchBuffer(); + + return true; +} + +bool WriteableCacheFile::ExpandBuffer(const size_t size) { + rwlock_.AssertHeld(); + assert(!eof_); + + // determine if there is enough space + size_t free = 0; // compute the free space left in buffer + for (size_t i = buf_woff_; i < bufs_.size(); ++i) { + free += bufs_[i]->Free(); + if (size <= free) { + // we have enough space in the buffer + return true; + } + } + + // expand the buffer until there is enough space to write `size` bytes + assert(free < size); + while (free < size) { + CacheWriteBuffer* const buf = alloc_->Allocate(); + if (!buf) { + Debug(log_, "Unable to allocate buffers"); + return false; + } + + size_ += buf->Free(); + free += buf->Free(); + bufs_.push_back(buf); + } + + assert(free >= size); + return true; +} + +void WriteableCacheFile::DispatchBuffer() { + rwlock_.AssertHeld(); + + assert(bufs_.size()); + assert(buf_doff_ <= buf_woff_); + assert(buf_woff_ <= bufs_.size()); + + if (pending_ios_) { + return; + } + + if (!eof_ && buf_doff_ == buf_woff_) { + // dispatch buffer is pointing to write buffer and we haven't hit eof + return; + } + + assert(eof_ || buf_doff_ < buf_woff_); + assert(buf_doff_ < bufs_.size()); + assert(file_); + + auto* buf = bufs_[buf_doff_]; + const uint64_t file_off = buf_doff_ * alloc_->BufferSize(); + + assert(!buf->Free() || + (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size())); + // we have reached end of file, and there is space in the last buffer + // pad it with zero for direct IO + buf->FillTrailingZeros(); + + assert(buf->Used() % FILE_ALIGNMENT_SIZE == 0); + + writer_->Write(file_.get(), buf, file_off, + std::bind(&WriteableCacheFile::BufferWriteDone, this)); + pending_ios_++; + buf_doff_++; +} + +void WriteableCacheFile::BufferWriteDone() { + WriteLock _(&rwlock_); + + assert(bufs_.size()); + + pending_ios_--; + + if (buf_doff_ < bufs_.size()) { + DispatchBuffer(); + } + + if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) { + // end-of-file reached, move to read mode + CloseAndOpenForReading(); + } +} + +void WriteableCacheFile::CloseAndOpenForReading() { + // Our env abstraction do not allow reading from a file opened for appending + // We need close the file and re-open it for reading + Close(); + RandomAccessCacheFile::OpenImpl(); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block, + char* scratch) { + rwlock_.AssertHeld(); + + if (!ReadBuffer(lba, scratch)) { + Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_, + lba.off_); + return false; + } + + return ParseRec(lba, key, block, scratch); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) { + rwlock_.AssertHeld(); + + assert(lba.off_ < disk_woff_); + + // we read from the buffers like reading from a flat file. 
+bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) {
+  rwlock_.AssertHeld();
+
+  assert(lba.off_ < disk_woff_);
+
+  // we read from the buffers as if reading from a flat file; the list of
+  // buffers is treated as a contiguous stream of data
+
+  char* tmp = data;
+  size_t pending_nbytes = lba.size_;
+  // start buffer
+  size_t start_idx = lba.off_ / alloc_->BufferSize();
+  // offset into the start buffer
+  size_t start_off = lba.off_ % alloc_->BufferSize();
+
+  assert(start_idx <= buf_woff_);
+
+  for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) {
+    assert(i <= buf_woff_);
+    auto* buf = bufs_[i];
+    assert(i == buf_woff_ || !buf->Free());
+    // bytes to read from this buffer
+    size_t nbytes = pending_nbytes > (buf->Used() - start_off)
+                        ? (buf->Used() - start_off)
+                        : pending_nbytes;
+    memcpy(tmp, buf->Data() + start_off, nbytes);
+
+    // left over to be read
+    pending_nbytes -= nbytes;
+    start_off = 0;
+    tmp += nbytes;
+  }
+
+  assert(!pending_nbytes);
+  if (pending_nbytes) {
+    return false;
+  }
+
+  assert(tmp == data + lba.size_);
+  return true;
+}
+
+void WriteableCacheFile::Close() {
+  rwlock_.AssertHeld();
+
+  assert(size_ >= max_size_);
+  assert(disk_woff_ >= max_size_);
+  assert(buf_doff_ == bufs_.size());
+  assert(bufs_.size() - buf_woff_ <= 1);
+  assert(!pending_ios_);
+
+  Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_,
+       disk_woff_);
+
+  ClearBuffers();
+  file_.reset();
+
+  assert(refs_);
+  --refs_;
+}
+
+void WriteableCacheFile::ClearBuffers() {
+  for (size_t i = 0; i < bufs_.size(); ++i) {
+    alloc_->Deallocate(bufs_[i]);
+  }
+
+  bufs_.clear();
+}
+
+//
+// ThreadedWriter implementation
+//
+ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache,
+                               const size_t qdepth, const size_t io_size)
+    : Writer(cache), io_size_(io_size) {
+  for (size_t i = 0; i < qdepth; ++i) {
+    std::thread th(&ThreadedWriter::ThreadMain, this);
+    threads_.push_back(std::move(th));
+  }
+}
+
+void ThreadedWriter::Stop() {
+  // notify all threads to exit
+  for (size_t i = 0; i < threads_.size(); ++i) {
+    q_.Push(IO(/*signal=*/true));
+  }
+
+  // wait for all threads to exit
+  for (auto& th : threads_) {
+    th.join();
+  }
+}
+
+void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
+                           const uint64_t file_off,
+                           const std::function<void()> callback) {
+  q_.Push(IO(file, buf, file_off, callback));
+}
+
+void ThreadedWriter::ThreadMain() {
+  while (true) {
+    // Fetch the IO to process
+    IO io(q_.Pop());
+    if (io.signal_) {
+      // this is the signal to exit the thread loop
+      break;
+    }
+
+    // Reserve space for writing the buffer
+    while (!cache_->Reserve(io.buf_->Used())) {
+      // We can fail to reserve space if every file in the system
+      // is currently being accessed
+      /* sleep override */ sleep(1);
+    }
+
+    DispatchIO(io);
+
+    io.callback_();
+  }
+}
+
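+// DispatchIO below writes the buffer to the file in io_size_ sized chunks;
+// e.g. a 1MB buffer with io_size_ == 256KB issues 4 sequential Append calls
+// (illustrative numbers).
+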
+void ThreadedWriter::DispatchIO(const IO& io) {
+  size_t written = 0;
+  while (written < io.buf_->Used()) {
+    Slice data(io.buf_->Data() + written, io_size_);
+    Status s = io.file_->Append(data);
+    assert(s.ok());
+    if (!s.ok()) {
+      // That is a definite IO error to the device. There is not much we can
+      // do but ignore the failure. This can lead to corruption of data on
+      // disk, but the cache will skip the record while reading
+      fprintf(stderr, "Error writing data to file. %s\n",
+              s.ToString().c_str());
+    }
+    written += io_size_;
+  }
+}
+
+}  // namespace rocksdb
+
+#endif
diff --git a/utilities/persistent_cache/block_cache_tier_file.h b/utilities/persistent_cache/block_cache_tier_file.h
new file mode 100644
index 000000000..8dbd60a1e
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier_file.h
@@ -0,0 +1,288 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <list>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "rocksdb/comparator.h"
+#include "rocksdb/env.h"
+
+#include "utilities/persistent_cache/block_cache_tier_file_buffer.h"
+#include "utilities/persistent_cache/lrulist.h"
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+#include "utilities/persistent_cache/persistent_cache_util.h"
+
+#include "port/port_posix.h"
+#include "util/crc32c.h"
+#include "util/mutexlock.h"
+
+// The IO code path of the persistent cache uses a pipelined architecture
+//
+// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel
+//
+// This enables the system to scale to the GB/s of throughput which is
+// expected with modern devices like NVM.
+//
+// The file level operations are encapsulated in the following abstractions
+//
+// BlockCacheFile
+//       ^
+//       |
+//       |
+// RandomAccessCacheFile (for reading)
+//       ^
+//       |
+//       |
+// WriteableCacheFile (for writing)
+//
+// Write IO code path:
+//
+namespace rocksdb {
+
+class WriteableCacheFile;
+struct BlockInfo;
+
+// Represents a logical record on device
+//
+// (L)ogical (B)lock (A)ddress = { cache-file-id, offset, size }
+struct LogicalBlockAddress {
+  LogicalBlockAddress() {}
+  explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off,
+                               const uint16_t size)
+      : cache_id_(cache_id), off_(off), size_(size) {}
+
+  uint32_t cache_id_ = 0;
+  uint32_t off_ = 0;
+  uint32_t size_ = 0;
+};
+
+typedef LogicalBlockAddress LBA;
+
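+// Example (illustrative values): LBA{cache_id_: 7, off_: 4096, size_: 512}
+// names a 512 byte record at offset 4096 of cache file "<dir>/7".
+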
+// class Writer
+//
+// Writer is the abstraction used for writing data to file. The component can
+// be multithreaded. It is the last step of the write pipeline.
+class Writer {
+ public:
+  explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {}
+  virtual ~Writer() {}
+
+  // write buffer to file at the given offset
+  virtual void Write(WritableFile* const file, CacheWriteBuffer* buf,
+                     const uint64_t file_off,
+                     const std::function<void()> callback) = 0;
+  // stop the writer
+  virtual void Stop() = 0;
+
+  PersistentCacheTier* const cache_;
+};
+
+// class BlockCacheFile
+//
+// Generic interface to support building files specialized for read/writing
+class BlockCacheFile : public LRUElement<BlockCacheFile> {
+ public:
+  explicit BlockCacheFile(const uint32_t cache_id)
+      : LRUElement<BlockCacheFile>(), cache_id_(cache_id) {}
+
+  explicit BlockCacheFile(Env* const env, const std::string& dir,
+                          const uint32_t cache_id)
+      : LRUElement<BlockCacheFile>(),
+        env_(env),
+        dir_(dir),
+        cache_id_(cache_id) {}
+
+  virtual ~BlockCacheFile() {}
+
+  // append key/value to file and return LBA locator to user
+  virtual bool Append(const Slice& key, const Slice& val, LBA* const lba) {
+    assert(!"not implemented");
+    return false;
+  }
+
+  // read from the record locator (LBA) and return key, value and status
+  virtual bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) {
+    assert(!"not implemented");
+    return false;
+  }
+
+  // get file path
+  std::string Path() const { return dir_ + "/" + std::to_string(cache_id_); }
+  // get cache ID
+  uint32_t cacheid() const { return cache_id_; }
+  // Add block information to file data
+  // Block information is the list of index references for this file
+  virtual void Add(BlockInfo* binfo) {
+    WriteLock _(&rwlock_);
+    block_infos_.push_back(binfo);
+  }
+  // get block information
+  std::list<BlockInfo*>& block_infos() { return block_infos_; }
+  // delete file and return the size of the file
+  virtual Status Delete(size_t* size);
+
+ protected:
+  port::RWMutex rwlock_;               // synchronization mutex
+  Env* const env_ = nullptr;           // Env for IO
+  const std::string dir_;              // Directory name
+  const uint32_t cache_id_;            // Cache id for the file
+  std::list<BlockInfo*> block_infos_;  // List of index entries mapping to the
+                                       // file content
+};
+
+// class RandomAccessCacheFile
+//
+// Thread safe implementation for reading random data from file
+class RandomAccessCacheFile : public BlockCacheFile {
+ public:
+  explicit RandomAccessCacheFile(Env* const env, const std::string& dir,
+                                 const uint32_t cache_id,
+                                 const std::shared_ptr<Logger>& log)
+      : BlockCacheFile(env, dir, cache_id), log_(log) {}
+
+  virtual ~RandomAccessCacheFile() {}
+
+  // open file for reading
+  bool Open();
+  // read data from the disk
+  bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
+
+ private:
+  std::unique_ptr<RandomAccessFile> file_;
+
+ protected:
+  bool OpenImpl();
+  bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
+
+  std::shared_ptr<Logger> log_;  // log file
+};
+
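+// Read-path sketch (illustrative):
+//
+//   RandomAccessCacheFile file(env, dir, cache_id, log);
+//   if (file.Open()) {
+//     Slice key, block;
+//     file.Read(lba, &key, &block, scratch);
+//   }
+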
When file size reaches a certain size, a new file +// will be created provided there is free space +class WriteableCacheFile : public RandomAccessCacheFile { + public: + explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc, + Writer* writer, const std::string& dir, + const uint32_t cache_id, const uint32_t max_size, + const std::shared_ptr& log) + : RandomAccessCacheFile(env, dir, cache_id, log), + alloc_(alloc), + writer_(writer), + max_size_(max_size) {} + + virtual ~WriteableCacheFile(); + + // create file on disk + bool Create(); + + // read data from logical file + bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override { + ReadLock _(&rwlock_); + const bool closed = eof_ && bufs_.empty(); + if (closed) { + // the file is closed, read from disk + return RandomAccessCacheFile::Read(lba, key, block, scratch); + } + // file is still being written, read from buffers + return ReadBuffer(lba, key, block, scratch); + } + + // append data to end of file + bool Append(const Slice&, const Slice&, LBA*) override; + // End-of-file + bool Eof() const { return eof_; } + + private: + friend class ThreadedWriter; + + static const size_t FILE_ALIGNMENT_SIZE = 4 * 1024; // align file size + + bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch); + bool ReadBuffer(const LBA& lba, char* data); + bool ExpandBuffer(const size_t size); + void DispatchBuffer(); + void BufferWriteDone(); + void CloseAndOpenForReading(); + void ClearBuffers(); + void Close(); + + // File layout in memory + // + // +------+------+------+------+------+------+ + // | b0 | b1 | b2 | b3 | b4 | b5 | + // +------+------+------+------+------+------+ + // ^ ^ + // | | + // buf_doff_ buf_woff_ + // (next buffer to (next buffer to fill) + // flush to disk) + // + // The buffers are flushed to disk serially for a given file + + CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider + Writer* const writer_ = nullptr; // File writer thread + std::unique_ptr file_; // RocksDB Env file abstraction + std::vector bufs_; // Written buffers + uint32_t size_ = 0; // Size of the file + const uint32_t max_size_; // Max size of the file + bool eof_ = false; // End of file + uint32_t disk_woff_ = 0; // Offset to write on disk + size_t buf_woff_ = 0; // off into bufs_ to write + size_t buf_doff_ = 0; // off into bufs_ to dispatch + size_t pending_ios_ = 0; // Number of ios to disk in-progress +}; + +// +// Abstraction to do writing to device. It is part of pipelined architecture. 
+//
+// Abstraction to do writing to device. It is part of the pipelined
+// architecture.
+//
+class ThreadedWriter : public Writer {
+ public:
+  // Representation of IO to device
+  struct IO {
+    explicit IO(const bool signal) : signal_(signal) {}
+    explicit IO(WritableFile* const file, CacheWriteBuffer* const buf,
+                const uint64_t file_off, const std::function<void()> callback)
+        : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {}
+
+    IO(const IO&) = default;
+    IO& operator=(const IO&) = default;
+    size_t Size() const { return sizeof(IO); }
+
+    WritableFile* file_ = nullptr;           // File to write to
+    CacheWriteBuffer* const buf_ = nullptr;  // buffer to write
+    uint64_t file_off_ = 0;                  // file offset
+    bool signal_ = false;                    // signal to exit thread loop
+    std::function<void()> callback_;         // Callback on completion
+  };
+
+  explicit ThreadedWriter(PersistentCacheTier* const cache,
+                          const size_t qdepth, const size_t io_size);
+  virtual ~ThreadedWriter() {}
+
+  void Stop() override;
+  void Write(WritableFile* const file, CacheWriteBuffer* buf,
+             const uint64_t file_off,
+             const std::function<void()> callback) override;
+
+ private:
+  void ThreadMain();
+  void DispatchIO(const IO& io);
+
+  const size_t io_size_ = 0;
+  BoundedQueue<IO> q_;
+  std::vector<std::thread> threads_;
+};
+
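+// Threading sketch (illustrative): the constructor spawns `qdepth` worker
+// threads running ThreadMain(); Write() only enqueues an IO, and Stop()
+// pushes one signal IO per thread before joining them.
+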
+}  // namespace rocksdb
+
+#endif
diff --git a/utilities/persistent_cache/block_cache_tier_file_buffer.h b/utilities/persistent_cache/block_cache_tier_file_buffer.h
new file mode 100644
index 000000000..4ab7fcc9b
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier_file_buffer.h
@@ -0,0 +1,117 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#include <cstring>
+#include <list>
+#include <memory>
+
+#include "include/rocksdb/comparator.h"
+#include "util/arena.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+//
+// CacheWriteBuffer
+//
+// Buffer abstraction that can be manipulated via append
+// (not thread safe)
+//
+class CacheWriteBuffer {
+ public:
+  explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) {
+    buf_.reset(new char[size_]);
+    assert(!pos_);
+    assert(size_);
+  }
+
+  virtual ~CacheWriteBuffer() {}
+
+  void Append(const char* buf, const size_t size) {
+    assert(pos_ + size <= size_);
+    memcpy(buf_.get() + pos_, buf, size);
+    pos_ += size;
+    assert(pos_ <= size_);
+  }
+
+  void FillTrailingZeros() {
+    assert(pos_ <= size_);
+    memset(buf_.get() + pos_, '0', size_ - pos_);
+    pos_ = size_;
+  }
+
+  void Reset() { pos_ = 0; }
+  size_t Free() const { return size_ - pos_; }
+  size_t Capacity() const { return size_; }
+  size_t Used() const { return pos_; }
+  char* Data() const { return buf_.get(); }
+
+ private:
+  std::unique_ptr<char[]> buf_;
+  const size_t size_;
+  size_t pos_;
+};
+
+//
+// CacheWriteBufferAllocator
+//
+// Buffer pool abstraction (not thread safe)
+//
+class CacheWriteBufferAllocator {
+ public:
+  explicit CacheWriteBufferAllocator(const uint32_t buffer_size,
+                                     const uint32_t buffer_count)
+      : buffer_size_(buffer_size) {
+    MutexLock _(&lock_);
+    buffer_size_ = buffer_size;
+    for (uint32_t i = 0; i < buffer_count; i++) {
+      auto* buf = new CacheWriteBuffer(buffer_size_);
+      assert(buf);
+      if (buf) {
+        bufs_.push_back(buf);
+      }
+    }
+  }
+
+  virtual ~CacheWriteBufferAllocator() {
+    MutexLock _(&lock_);
+    assert(bufs_.size() * buffer_size_ == Capacity());
+    for (auto* buf : bufs_) {
+      delete buf;
+    }
+    bufs_.clear();
+  }
+
+  CacheWriteBuffer* Allocate() {
+    MutexLock _(&lock_);
+    if (bufs_.empty()) {
+      return nullptr;
+    }
+
+    assert(!bufs_.empty());
+    CacheWriteBuffer* const buf = bufs_.front();
+    bufs_.pop_front();
+
+    return buf;
+  }
+
+  void Deallocate(CacheWriteBuffer* const buf) {
+    assert(buf);
+    MutexLock _(&lock_);
+    buf->Reset();
+    bufs_.push_back(buf);
+  }
+
+  size_t Capacity() const { return bufs_.size() * buffer_size_; }
+  size_t Free() const { return bufs_.size() * buffer_size_; }
+  size_t BufferSize() const { return buffer_size_; }
+
+ private:
+  port::Mutex lock_;                   // Sync lock
+  size_t buffer_size_;                 // Size of each buffer
+  std::list<CacheWriteBuffer*> bufs_;  // Buffer stash
+};
+
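+// Usage sketch (illustrative): a pool of 16 x 64KB buffers recycled across
+// files:
+//
+//   CacheWriteBufferAllocator alloc(64 * 1024, 16);
+//   CacheWriteBuffer* buf = alloc.Allocate();  // nullptr when exhausted
+//   if (buf) {
+//     buf->Append(data, n);
+//     alloc.Deallocate(buf);  // Reset() and return to the pool
+//   }
+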
+}  // namespace rocksdb
diff --git a/utilities/persistent_cache/block_cache_tier_metadata.cc b/utilities/persistent_cache/block_cache_tier_metadata.cc
new file mode 100644
index 000000000..8f94378bc
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier_metadata.cc
@@ -0,0 +1,83 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier_metadata.h"
+
+#include <functional>
+
+namespace rocksdb {
+
+bool BlockCacheTierMetadata::Insert(BlockCacheFile* file) {
+  return cache_file_index_.Insert(file);
+}
+
+BlockCacheFile* BlockCacheTierMetadata::Lookup(const uint32_t cache_id) {
+  BlockCacheFile* ret = nullptr;
+  BlockCacheFile lookup_key(cache_id);
+  bool ok = cache_file_index_.Find(&lookup_key, &ret);
+  if (ok) {
+    assert(ret->refs_);
+    return ret;
+  }
+  return nullptr;
+}
+
+BlockCacheFile* BlockCacheTierMetadata::Evict() {
+  using std::placeholders::_1;
+  auto fn = std::bind(&BlockCacheTierMetadata::RemoveAllKeys, this, _1);
+  return cache_file_index_.Evict(fn);
+}
+
+void BlockCacheTierMetadata::Clear() {
+  cache_file_index_.Clear([](BlockCacheFile* arg) { delete arg; });
+  block_index_.Clear([](BlockInfo* arg) { delete arg; });
+}
+
+bool BlockCacheTierMetadata::Insert(BlockInfo* binfo) {
+  return block_index_.Insert(binfo);
+}
+
+bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
+  BlockInfo lookup_key(key);
+  BlockInfo* block;
+  port::RWMutex* rlock = nullptr;
+  if (!block_index_.Find(&lookup_key, &block, &rlock)) {
+    return false;
+  }
+
+  // the hash table returns the entry with its bucket lock held;
+  // release it on scope exit
+  ReadUnlock _(rlock);
+  assert(block->key_ == key.ToString());
+  if (lba) {
+    *lba = block->lba_;
+  }
+  return true;
+}
+
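+// Lookup flow sketch (illustrative): a user key resolves to an LBA, whose
+// cache_id_ then resolves to the owning file:
+//
+//   LBA lba;
+//   if (metadata.Lookup(key, &lba)) {
+//     BlockCacheFile* f = metadata.Lookup(lba.cache_id_);
+//     // f->Read(lba, ...) fetches the record
+//   }
+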
+BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) {
+  BlockInfo lookup_key(key);
+  BlockInfo* binfo = nullptr;
+  bool status __attribute__((__unused__)) =
+      block_index_.Erase(&lookup_key, &binfo);
+  (void)status;
+  assert(status);
+  return binfo;
+}
+
+void BlockCacheTierMetadata::RemoveAllKeys(BlockCacheFile* f) {
+  for (BlockInfo* binfo : f->block_infos()) {
+    BlockInfo* tmp = nullptr;
+    bool status = block_index_.Erase(binfo, &tmp);
+    (void)status;
+    assert(status);
+    assert(tmp == binfo);
+    delete binfo;
+  }
+  f->block_infos().clear();
+}
+
+}  // namespace rocksdb
+
+#endif
diff --git a/utilities/persistent_cache/block_cache_tier_metadata.h b/utilities/persistent_cache/block_cache_tier_metadata.h
new file mode 100644
index 000000000..846c3fc3c
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier_metadata.h
@@ -0,0 +1,124 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <string>
+#include <unordered_map>
+
+#include "rocksdb/slice.h"
+
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+#include "utilities/persistent_cache/hash_table.h"
+#include "utilities/persistent_cache/hash_table_evictable.h"
+#include "utilities/persistent_cache/lrulist.h"
+
+namespace rocksdb {
+
+//
+// Block Cache Tier Metadata
+//
+// The BlockCacheTierMetadata holds all the metadata associated with the block
+// cache. It fundamentally contains two indexes and an LRU.
+//
+// Block Cache Index
+//
+// This is a forward index that maps a given key to an LBA (Logical Block
+// Address). The LBA is a disk pointer that points to a record on the cache.
+//
+// LBA = { cache-id, offset, size }
+//
+// Cache File Index
+//
+// This is a forward index that maps a given cache-id to a cache file object.
+// Typically you would look up using the LBA and use the object to read or
+// write.
+struct BlockInfo {
+  explicit BlockInfo(const Slice& key, const LBA& lba = LBA())
+      : key_(key.ToString()), lba_(lba) {}
+
+  std::string key_;
+  LBA lba_;
+};
+
+class BlockCacheTierMetadata {
+ public:
+  explicit BlockCacheTierMetadata(const uint32_t blocks_capacity = 1024 * 1024,
+                                  const uint32_t cachefile_capacity = 10 * 1024)
+      : cache_file_index_(cachefile_capacity), block_index_(blocks_capacity) {}
+
+  virtual ~BlockCacheTierMetadata() {}
+
+  // Insert a given cache file
+  bool Insert(BlockCacheFile* file);
+
+  // Lookup a cache file based on cache_id
+  BlockCacheFile* Lookup(const uint32_t cache_id);
+
+  // Insert block information into the block index
+  bool Insert(BlockInfo* binfo);
+
+  // Lookup block information from the block index
+  bool Lookup(const Slice& key, LBA* lba);
+
+  // Remove a given key from the block index
+  BlockInfo* Remove(const Slice& key);
+
+  // Find and evict a cache file using the LRU policy
+  BlockCacheFile* Evict();
+
+  // Clear the metadata contents
+  virtual void Clear();
+
+ protected:
+  // Remove all block information belonging to a given file
+  virtual void RemoveAllKeys(BlockCacheFile* file);
+
+ private:
+  // Cache file index definition
+  //
+  // cache-id => BlockCacheFile
+  struct BlockCacheFileHash {
+    uint64_t operator()(const BlockCacheFile* rec) {
+      return std::hash<uint32_t>()(rec->cacheid());
+    }
+  };
+
+  struct BlockCacheFileEqual {
+    uint64_t operator()(const BlockCacheFile* lhs, const BlockCacheFile* rhs) {
+      return lhs->cacheid() == rhs->cacheid();
+    }
+  };
+
+  typedef EvictableHashTable<BlockCacheFile, BlockCacheFileHash,
+                             BlockCacheFileEqual>
+      CacheFileIndexType;
+
+  // Block Lookup Index
+  //
+  // key => LBA
+  struct Hash {
+    size_t operator()(BlockInfo* node) const {
+      return std::hash<std::string>()(node->key_);
+    }
+  };
+
+  struct Equal {
+    size_t operator()(BlockInfo* lhs, BlockInfo* rhs) const {
+      return lhs->key_ == rhs->key_;
+    }
+  };
+
+  typedef HashTable<BlockInfo*, Hash, Equal> BlockIndexType;
+
+  CacheFileIndexType cache_file_index_;
+  BlockIndexType block_index_;
+};
+
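+// Flow sketch (illustrative): on a write, a BlockInfo{key, lba} is inserted
+// into block_index_ and attached to its file via BlockCacheFile::Add(); when
+// Evict() picks a file from the LRU, RemoveAllKeys() drops all of that
+// file's entries from block_index_ before the file is reclaimed.
+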
+}  // namespace rocksdb
+
+#endif
diff --git a/utilities/persistent_cache/persistent_cache_util.h b/utilities/persistent_cache/persistent_cache_util.h
new file mode 100644
index 000000000..bc3807b06
--- /dev/null
+++ b/utilities/persistent_cache/persistent_cache_util.h
@@ -0,0 +1,67 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#include <limits>
+#include <list>
+
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+//
+// Simple synchronized queue implementation, with the option of
+// bounding the queue
+//
+// On overflow, the elements will be discarded
+//
+template <class T>
+class BoundedQueue {
+ public:
+  explicit BoundedQueue(
+      const size_t max_size = std::numeric_limits<size_t>::max())
+      : cond_empty_(&lock_), max_size_(max_size) {}
+
+  virtual ~BoundedQueue() {}
+
+  void Push(T&& t) {
+    MutexLock _(&lock_);
+    if (max_size_ != std::numeric_limits<size_t>::max() &&
+        size_ + t.Size() >= max_size_) {
+      // overflow: discard the element
+      return;
+    }
+
+    size_ += t.Size();
+    q_.push_back(std::move(t));
+    cond_empty_.SignalAll();
+  }
+
+  T Pop() {
+    MutexLock _(&lock_);
+    while (q_.empty()) {
+      cond_empty_.Wait();
+    }
+
+    T t = std::move(q_.front());
+    size_ -= t.Size();
+    q_.pop_front();
+    return t;
+  }
+
+  size_t Size() const {
+    MutexLock _(&lock_);
+    return size_;
+  }
+
+ private:
+  mutable port::Mutex lock_;  // sync lock
+  port::CondVar cond_empty_;  // signaled when an element is pushed
+  std::list<T> q_;            // the queue
+  size_t size_ = 0;           // cumulative size of queued elements
+  const size_t max_size_;     // maximum cumulative size
+};
+
+}  // namespace rocksdb
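+
+// Usage sketch (illustrative; T must expose Size(), as ThreadedWriter::IO
+// does):
+//
+//   BoundedQueue<ThreadedWriter::IO> q(/*max_size=*/64 * 1024 * 1024);
+//   q.Push(ThreadedWriter::IO(/*signal=*/true));  // drops element on overflow
+//   ThreadedWriter::IO io = q.Pop();              // blocks while empty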