From d9cfaa2b16682c18b7d1164fab4485fb4ae9f86b Mon Sep 17 00:00:00 2001 From: krad Date: Fri, 22 Apr 2016 16:18:33 -0700 Subject: [PATCH] Persistent Read Cache (6) Persistent cache tier implentation - File layout Summary: Persistent cache tier is the tier abstraction that can work for any block device based device mounted on a file system. The design/implementation can handle any generic block device. Any generic block support is achieved by generalizing the access patten as {io-size, q-depth, direct-io/buffered}. We have specifically tested and adapted the IO path for NVM and SSD. Persistent cache tier consists of there parts : 1) File layout Provides the implementation for handling IO path for reading and writing data (key/value pair). 2) Meta-data Provides the implementation for handling the index for persistent read cache. 3) Implementation It binds (1) and (2) and flushed out the PersistentCacheTier interface This patch provides implementation for (1)(2). Follow up patch will provide (3) and tests. Test Plan: Compile and run check Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D57117 --- src.mk | 2 + util/mutexlock.h | 14 + .../persistent_cache/block_cache_tier_file.cc | 575 ++++++++++++++++++ .../persistent_cache/block_cache_tier_file.h | 288 +++++++++ .../block_cache_tier_file_buffer.h | 117 ++++ .../block_cache_tier_metadata.cc | 83 +++ .../block_cache_tier_metadata.h | 124 ++++ .../persistent_cache/persistent_cache_util.h | 67 ++ 8 files changed, 1270 insertions(+) create mode 100644 utilities/persistent_cache/block_cache_tier_file.cc create mode 100644 utilities/persistent_cache/block_cache_tier_file.h create mode 100644 utilities/persistent_cache/block_cache_tier_file_buffer.h create mode 100644 utilities/persistent_cache/block_cache_tier_metadata.cc create mode 100644 utilities/persistent_cache/block_cache_tier_metadata.h create mode 100644 utilities/persistent_cache/persistent_cache_util.h diff --git a/src.mk b/src.mk index aad71d3d8..c98afdcc6 100644 --- a/src.mk +++ b/src.mk @@ -134,6 +134,8 @@ LIB_SOURCES = \ utilities/options/options_util.cc \ utilities/persistent_cache/persistent_cache_tier.cc \ utilities/persistent_cache/volatile_tier_impl.cc \ + utilities/persistent_cache/block_cache_tier_file.cc \ + utilities/persistent_cache/block_cache_tier_metadata.cc \ utilities/redis/redis_lists.cc \ utilities/simulator_cache/sim_cache.cc \ utilities/spatialdb/spatial_db.cc \ diff --git a/util/mutexlock.h b/util/mutexlock.h index a2d14aedf..94d7b596c 100644 --- a/util/mutexlock.h +++ b/util/mutexlock.h @@ -59,6 +59,20 @@ class ReadLock { void operator=(const ReadLock&); }; +// +// Automatically unlock a locked mutex when the object is destroyed +// +class ReadUnlock { + public: + explicit ReadUnlock(port::RWMutex *mu) : mu_(mu) { mu->AssertHeld(); } + ~ReadUnlock() { mu_->ReadUnlock(); } + + private: + port::RWMutex *const mu_; + // No copying allowed + ReadUnlock(const ReadUnlock &) = delete; + ReadUnlock &operator=(const ReadUnlock &) = delete; +}; // // Acquire a WriteLock on the specified RWMutex. diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc new file mode 100644 index 000000000..970da8f18 --- /dev/null +++ b/utilities/persistent_cache/block_cache_tier_file.cc @@ -0,0 +1,575 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/block_cache_tier_file.h" + +#include +#include +#include + +#include "util/crc32c.h" + +namespace rocksdb { + +// +// File creation factories +// +Status NewWritableCacheFile(Env* const env, const std::string& filepath, + std::unique_ptr* file, + const bool use_direct_writes = false) { + EnvOptions opt; + opt.use_direct_writes = use_direct_writes; + Status s = env->NewWritableFile(filepath, file, opt); + return s; +} + +Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath, + std::unique_ptr* file, + const bool use_direct_reads = true) { + EnvOptions opt; + opt.use_direct_reads = use_direct_reads; + Status s = env->NewRandomAccessFile(filepath, file, opt); + return s; +} + +// +// BlockCacheFile +// +Status BlockCacheFile::Delete(size_t* size) { + Status status = env_->GetFileSize(Path(), size); + if (!status.ok()) { + return status; + } + return env_->DeleteFile(Path()); +} + +// +// CacheRecord +// +// Cache record represents the record on disk +// +// +--------+---------+----------+------------+---------------+-------------+ +// | magic | crc | key size | value size | key data | value data | +// +--------+---------+----------+------------+---------------+-------------+ +// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size --> +// +struct CacheRecordHeader { + CacheRecordHeader() {} + CacheRecordHeader(const uint32_t magic, const uint32_t key_size, + const uint32_t val_size) + : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {} + + uint32_t magic_; + uint32_t crc_; + uint32_t key_size_; + uint32_t val_size_; +}; + +struct CacheRecord { + CacheRecord() {} + CacheRecord(const Slice& key, const Slice& val) + : hdr_(MAGIC, static_cast(key.size()), + static_cast(val.size())), + key_(key), + val_(val) { + hdr_.crc_ = ComputeCRC(); + } + + uint32_t ComputeCRC() const; + bool Serialize(std::vector* bufs, size_t* woff); + bool Deserialize(const Slice& buf); + + static uint32_t CalcSize(const Slice& key, const Slice& val) { + return static_cast(sizeof(CacheRecordHeader) + key.size() + + val.size()); + } + + static const uint32_t MAGIC = 0xfefa; + + bool Append(std::vector* bufs, size_t* woff, + const char* data, const size_t size); + + CacheRecordHeader hdr_; + Slice key_; + Slice val_; +}; + +static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned"); + +uint32_t CacheRecord::ComputeCRC() const { + uint32_t crc = 0; + CacheRecordHeader tmp = hdr_; + tmp.crc_ = 0; + crc = crc32c::Extend(crc, reinterpret_cast(&tmp), sizeof(tmp)); + crc = crc32c::Extend(crc, reinterpret_cast(key_.data()), + key_.size()); + crc = crc32c::Extend(crc, reinterpret_cast(val_.data()), + val_.size()); + return crc; +} + +bool CacheRecord::Serialize(std::vector* bufs, + size_t* woff) { + assert(bufs->size()); + return Append(bufs, woff, reinterpret_cast(&hdr_), + sizeof(hdr_)) && + Append(bufs, woff, reinterpret_cast(key_.data()), + key_.size()) && + Append(bufs, woff, reinterpret_cast(val_.data()), + val_.size()); +} + +bool CacheRecord::Append(std::vector* bufs, size_t* woff, + const char* data, const size_t data_size) { + assert(*woff < bufs->size()); + + const char* p = data; + size_t size = data_size; + + while (size && *woff < bufs->size()) { + CacheWriteBuffer* buf = (*bufs)[*woff]; + const size_t free = buf->Free(); + if (size <= free) { + buf->Append(p, size); + size = 0; + } else { + buf->Append(p, free); + p += free; + size -= free; + assert(!buf->Free()); + assert(buf->Used() == buf->Capacity()); + } + + if (!buf->Free()) { + *woff += 1; + } + } + + assert(!size); + + return !size; +} + +bool CacheRecord::Deserialize(const Slice& data) { + assert(data.size() >= sizeof(CacheRecordHeader)); + if (data.size() < sizeof(CacheRecordHeader)) { + return false; + } + + memcpy(&hdr_, data.data(), sizeof(hdr_)); + + assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size()); + if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) { + return false; + } + + key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_); + val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_); + + if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) { + fprintf(stderr, "** magic %d ** \n", hdr_.magic_); + fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_); + fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_); + fprintf(stderr, "** key %s ** \n", key_.ToString().c_str()); + fprintf(stderr, "** val %s ** \n", val_.ToString().c_str()); + for (size_t i = 0; i < hdr_.val_size_; ++i) { + fprintf(stderr, "%d.", (uint8_t)val_.data()[i]); + } + fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC()); + } + + assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_); + return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_; +} + +// +// RandomAccessFile +// + +bool RandomAccessCacheFile::Open() { + WriteLock _(&rwlock_); + return OpenImpl(); +} + +bool RandomAccessCacheFile::OpenImpl() { + rwlock_.AssertHeld(); + + Debug(log_, "Opening cache file %s", Path().c_str()); + + Status status = NewRandomAccessCacheFile(env_, Path(), &file_); + if (!status.ok()) { + Error(log_, "Error opening random access file %s. %s", Path().c_str(), + status.ToString().c_str()); + return false; + } + + return true; +} + +bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + ReadLock _(&rwlock_); + + assert(lba.cache_id_ == cache_id_); + assert(file_); + + Slice result; + Status s = file_->Read(lba.off_, lba.size_, &result, scratch); + if (!s.ok()) { + Error(log_, "Error reading from file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(result.data() == scratch); + + return ParseRec(lba, key, val, scratch); +} + +bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val, + char* scratch) { + Slice data(scratch, lba.size_); + + CacheRecord rec; + if (!rec.Deserialize(data)) { + assert(!"Error deserializing data"); + Error(log_, "Error de-serializing record from file %s off %d", + Path().c_str(), lba.off_); + return false; + } + + *key = Slice(rec.key_); + *val = Slice(rec.val_); + + return true; +} + +// +// WriteableCacheFile +// + +WriteableCacheFile::~WriteableCacheFile() { + WriteLock _(&rwlock_); + if (!eof_) { + // This file never flushed. We give priority to shutdown since this is a + // cache + // TODO(krad): Figure a way to flush the pending data + assert(file_); + + assert(refs_ == 1); + --refs_; + } + ClearBuffers(); +} + +bool WriteableCacheFile::Create() { + WriteLock _(&rwlock_); + + Debug(log_, "Creating new cache %s (max size is %d B)", Path().c_str(), + max_size_); + + Status s = env_->FileExists(Path()); + if (s.ok()) { + Warn(log_, "File %s already exists. %s", Path().c_str(), + s.ToString().c_str()); + } + + s = NewWritableCacheFile(env_, Path(), &file_); + if (!s.ok()) { + Warn(log_, "Unable to create file %s. %s", Path().c_str(), + s.ToString().c_str()); + return false; + } + + assert(!refs_); + ++refs_; + + return true; +} + +bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) { + WriteLock _(&rwlock_); + + if (eof_) { + // We can't append since the file is full + return false; + } + + // estimate the space required to store the (key, val) + uint32_t rec_size = CacheRecord::CalcSize(key, val); + + if (!ExpandBuffer(rec_size)) { + // unable to expand the buffer + Debug(log_, "Error expanding buffers. size=%d", rec_size); + return false; + } + + lba->cache_id_ = cache_id_; + lba->off_ = disk_woff_; + lba->size_ = rec_size; + + CacheRecord rec(key, val); + if (!rec.Serialize(&bufs_, &buf_woff_)) { + // unexpected error: unable to serialize the data + assert(!"Error serializing record"); + return false; + } + + disk_woff_ += rec_size; + eof_ = disk_woff_ >= max_size_; + + // dispatch buffer for flush + DispatchBuffer(); + + return true; +} + +bool WriteableCacheFile::ExpandBuffer(const size_t size) { + rwlock_.AssertHeld(); + assert(!eof_); + + // determine if there is enough space + size_t free = 0; // compute the free space left in buffer + for (size_t i = buf_woff_; i < bufs_.size(); ++i) { + free += bufs_[i]->Free(); + if (size <= free) { + // we have enough space in the buffer + return true; + } + } + + // expand the buffer until there is enough space to write `size` bytes + assert(free < size); + while (free < size) { + CacheWriteBuffer* const buf = alloc_->Allocate(); + if (!buf) { + Debug(log_, "Unable to allocate buffers"); + return false; + } + + size_ += buf->Free(); + free += buf->Free(); + bufs_.push_back(buf); + } + + assert(free >= size); + return true; +} + +void WriteableCacheFile::DispatchBuffer() { + rwlock_.AssertHeld(); + + assert(bufs_.size()); + assert(buf_doff_ <= buf_woff_); + assert(buf_woff_ <= bufs_.size()); + + if (pending_ios_) { + return; + } + + if (!eof_ && buf_doff_ == buf_woff_) { + // dispatch buffer is pointing to write buffer and we haven't hit eof + return; + } + + assert(eof_ || buf_doff_ < buf_woff_); + assert(buf_doff_ < bufs_.size()); + assert(file_); + + auto* buf = bufs_[buf_doff_]; + const uint64_t file_off = buf_doff_ * alloc_->BufferSize(); + + assert(!buf->Free() || + (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size())); + // we have reached end of file, and there is space in the last buffer + // pad it with zero for direct IO + buf->FillTrailingZeros(); + + assert(buf->Used() % FILE_ALIGNMENT_SIZE == 0); + + writer_->Write(file_.get(), buf, file_off, + std::bind(&WriteableCacheFile::BufferWriteDone, this)); + pending_ios_++; + buf_doff_++; +} + +void WriteableCacheFile::BufferWriteDone() { + WriteLock _(&rwlock_); + + assert(bufs_.size()); + + pending_ios_--; + + if (buf_doff_ < bufs_.size()) { + DispatchBuffer(); + } + + if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) { + // end-of-file reached, move to read mode + CloseAndOpenForReading(); + } +} + +void WriteableCacheFile::CloseAndOpenForReading() { + // Our env abstraction do not allow reading from a file opened for appending + // We need close the file and re-open it for reading + Close(); + RandomAccessCacheFile::OpenImpl(); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block, + char* scratch) { + rwlock_.AssertHeld(); + + if (!ReadBuffer(lba, scratch)) { + Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_, + lba.off_); + return false; + } + + return ParseRec(lba, key, block, scratch); +} + +bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) { + rwlock_.AssertHeld(); + + assert(lba.off_ < disk_woff_); + + // we read from the buffers like reading from a flat file. The list of buffers + // are treated as contiguous stream of data + + char* tmp = data; + size_t pending_nbytes = lba.size_; + // start buffer + size_t start_idx = lba.off_ / alloc_->BufferSize(); + // offset into the start buffer + size_t start_off = lba.off_ % alloc_->BufferSize(); + + assert(start_idx <= buf_woff_); + + for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) { + assert(i <= buf_woff_); + auto* buf = bufs_[i]; + assert(i == buf_woff_ || !buf->Free()); + // bytes to write to the buffer + size_t nbytes = pending_nbytes > (buf->Used() - start_off) + ? (buf->Used() - start_off) + : pending_nbytes; + memcpy(tmp, buf->Data() + start_off, nbytes); + + // left over to be written + pending_nbytes -= nbytes; + start_off = 0; + tmp += nbytes; + } + + assert(!pending_nbytes); + if (pending_nbytes) { + return false; + } + + assert(tmp == data + lba.size_); + return true; +} + +void WriteableCacheFile::Close() { + rwlock_.AssertHeld(); + + assert(size_ >= max_size_); + assert(disk_woff_ >= max_size_); + assert(buf_doff_ == bufs_.size()); + assert(bufs_.size() - buf_woff_ <= 1); + assert(!pending_ios_); + + Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_, + disk_woff_); + + ClearBuffers(); + file_.reset(); + + assert(refs_); + --refs_; +} + +void WriteableCacheFile::ClearBuffers() { + for (size_t i = 0; i < bufs_.size(); ++i) { + alloc_->Deallocate(bufs_[i]); + } + + bufs_.clear(); +} + +// +// ThreadedFileWriter implementation +// +ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache, + const size_t qdepth, const size_t io_size) + : Writer(cache), io_size_(io_size) { + for (size_t i = 0; i < qdepth; ++i) { + std::thread th(&ThreadedWriter::ThreadMain, this); + threads_.push_back(std::move(th)); + } +} + +void ThreadedWriter::Stop() { + // notify all threads to exit + for (size_t i = 0; i < threads_.size(); ++i) { + q_.Push(IO(/*signal=*/true)); + } + + // wait for all threads to exit + for (auto& th : threads_) { + th.join(); + } +} + +void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function callback) { + q_.Push(IO(file, buf, file_off, callback)); +} + +void ThreadedWriter::ThreadMain() { + while (true) { + // Fetch the IO to process + IO io(q_.Pop()); + if (io.signal_) { + // that's secret signal to exit + break; + } + + // Reserve space for writing the buffer + while (!cache_->Reserve(io.buf_->Used())) { + // We can fail to reserve space if every file in the system + // is being currently accessed + /* sleep override */ sleep(1); + } + + DispatchIO(io); + + io.callback_(); + } +} + +void ThreadedWriter::DispatchIO(const IO& io) { + size_t written = 0; + while (written < io.buf_->Used()) { + Slice data(io.buf_->Data() + written, io_size_); + Status s = io.file_->Append(data); + assert(s.ok()); + if (!s.ok()) { + // That is definite IO error to device. There is not much we can + // do but ignore the failure. This can lead to corruption of data on + // disk, but the cache will skip while reading + fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str()); + } + written += io_size_; + } +} + +} // namespace rocksdb + +#endif diff --git a/utilities/persistent_cache/block_cache_tier_file.h b/utilities/persistent_cache/block_cache_tier_file.h new file mode 100644 index 000000000..8dbd60a1e --- /dev/null +++ b/utilities/persistent_cache/block_cache_tier_file.h @@ -0,0 +1,288 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include + +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" + +#include "utilities/persistent_cache/block_cache_tier_file_buffer.h" +#include "utilities/persistent_cache/lrulist.h" +#include "utilities/persistent_cache/persistent_cache_tier.h" +#include "utilities/persistent_cache/persistent_cache_util.h" + +#include "port/port_posix.h" +#include "util/crc32c.h" +#include "util/mutexlock.h" + +// The io code path of persistent cache uses pipelined architecture +// +// client -> In Queue <-- BlockCacheTier --> Out Queue <-- Writer <--> Kernel +// +// This would enable the system to scale for GB/s of throughput which is +// expected with modern devies like NVM. +// +// The file level operations are encapsulated in the following abstractions +// +// BlockCacheFile +// ^ +// | +// | +// RandomAccessCacheFile (For reading) +// ^ +// | +// | +// WriteableCacheFile (For writing) +// +// Write IO code path : +// +namespace rocksdb { + +class WriteableCacheFile; +struct BlockInfo; + +// Represents a logical record on device +// +// (L)ogical (B)lock (Address = { cache-file-id, offset, size } +struct LogicalBlockAddress { + LogicalBlockAddress() {} + explicit LogicalBlockAddress(const uint32_t cache_id, const uint32_t off, + const uint16_t size) + : cache_id_(cache_id), off_(off), size_(size) {} + + uint32_t cache_id_ = 0; + uint32_t off_ = 0; + uint32_t size_ = 0; +}; + +typedef LogicalBlockAddress LBA; + +// class Writer +// +// Writer is the abstraction used for writing data to file. The component can be +// multithreaded. It is the last step of write pipeline +class Writer { + public: + explicit Writer(PersistentCacheTier* const cache) : cache_(cache) {} + virtual ~Writer() {} + + // write buffer to file at the given offset + virtual void Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function callback) = 0; + // stop the writer + virtual void Stop() = 0; + + PersistentCacheTier* const cache_; +}; + +// class BlockCacheFile +// +// Generic interface to support building file specialized for read/writing +class BlockCacheFile : public LRUElement { + public: + explicit BlockCacheFile(const uint32_t cache_id) + : LRUElement(), cache_id_(cache_id) {} + + explicit BlockCacheFile(Env* const env, const std::string& dir, + const uint32_t cache_id) + : LRUElement(), + env_(env), + dir_(dir), + cache_id_(cache_id) {} + + virtual ~BlockCacheFile() {} + + // append key/value to file and return LBA locator to user + virtual bool Append(const Slice& key, const Slice& val, LBA* const lba) { + assert(!"not implemented"); + return false; + } + + // read from the record locator (LBA) and return key, value and status + virtual bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) { + assert(!"not implemented"); + return false; + } + + // get file path + std::string Path() const { return dir_ + "/" + std::to_string(cache_id_); } + // get cache ID + uint32_t cacheid() const { return cache_id_; } + // Add block information to file data + // Block information is the list of index reference for this file + virtual void Add(BlockInfo* binfo) { + WriteLock _(&rwlock_); + block_infos_.push_back(binfo); + } + // get block information + std::list& block_infos() { return block_infos_; } + // delete file and return the size of the file + virtual Status Delete(size_t* size); + + protected: + port::RWMutex rwlock_; // synchronization mutex + Env* const env_ = nullptr; // Env for IO + const std::string dir_; // Directory name + const uint32_t cache_id_; // Cache id for the file + std::list block_infos_; // List of index entries mapping to the + // file content +}; + +// class RandomAccessFile +// +// Thread safe implementation for reading random data from file +class RandomAccessCacheFile : public BlockCacheFile { + public: + explicit RandomAccessCacheFile(Env* const env, const std::string& dir, + const uint32_t cache_id, + const shared_ptr& log) + : BlockCacheFile(env, dir, cache_id), log_(log) {} + + virtual ~RandomAccessCacheFile() {} + + // open file for reading + bool Open(); + // read data from the disk + bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override; + + private: + std::unique_ptr file_; + + protected: + bool OpenImpl(); + bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch); + + std::shared_ptr log_; // log file +}; + +// class WriteableCacheFile +// +// All writes to the files are cached in buffers. The buffers are flushed to +// disk as they get filled up. When file size reaches a certain size, a new file +// will be created provided there is free space +class WriteableCacheFile : public RandomAccessCacheFile { + public: + explicit WriteableCacheFile(Env* const env, CacheWriteBufferAllocator* alloc, + Writer* writer, const std::string& dir, + const uint32_t cache_id, const uint32_t max_size, + const std::shared_ptr& log) + : RandomAccessCacheFile(env, dir, cache_id, log), + alloc_(alloc), + writer_(writer), + max_size_(max_size) {} + + virtual ~WriteableCacheFile(); + + // create file on disk + bool Create(); + + // read data from logical file + bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override { + ReadLock _(&rwlock_); + const bool closed = eof_ && bufs_.empty(); + if (closed) { + // the file is closed, read from disk + return RandomAccessCacheFile::Read(lba, key, block, scratch); + } + // file is still being written, read from buffers + return ReadBuffer(lba, key, block, scratch); + } + + // append data to end of file + bool Append(const Slice&, const Slice&, LBA*) override; + // End-of-file + bool Eof() const { return eof_; } + + private: + friend class ThreadedWriter; + + static const size_t FILE_ALIGNMENT_SIZE = 4 * 1024; // align file size + + bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch); + bool ReadBuffer(const LBA& lba, char* data); + bool ExpandBuffer(const size_t size); + void DispatchBuffer(); + void BufferWriteDone(); + void CloseAndOpenForReading(); + void ClearBuffers(); + void Close(); + + // File layout in memory + // + // +------+------+------+------+------+------+ + // | b0 | b1 | b2 | b3 | b4 | b5 | + // +------+------+------+------+------+------+ + // ^ ^ + // | | + // buf_doff_ buf_woff_ + // (next buffer to (next buffer to fill) + // flush to disk) + // + // The buffers are flushed to disk serially for a given file + + CacheWriteBufferAllocator* const alloc_ = nullptr; // Buffer provider + Writer* const writer_ = nullptr; // File writer thread + std::unique_ptr file_; // RocksDB Env file abstraction + std::vector bufs_; // Written buffers + uint32_t size_ = 0; // Size of the file + const uint32_t max_size_; // Max size of the file + bool eof_ = false; // End of file + uint32_t disk_woff_ = 0; // Offset to write on disk + size_t buf_woff_ = 0; // off into bufs_ to write + size_t buf_doff_ = 0; // off into bufs_ to dispatch + size_t pending_ios_ = 0; // Number of ios to disk in-progress +}; + +// +// Abstraction to do writing to device. It is part of pipelined architecture. +// +class ThreadedWriter : public Writer { + public: + // Representation of IO to device + struct IO { + explicit IO(const bool signal) : signal_(signal) {} + explicit IO(WritableFile* const file, CacheWriteBuffer* const buf, + const uint64_t file_off, const std::function callback) + : file_(file), buf_(buf), file_off_(file_off), callback_(callback) {} + + IO(const IO&) = default; + IO& operator=(const IO&) = default; + size_t Size() const { return sizeof(IO); } + + WritableFile* file_ = nullptr; // File to write to + CacheWriteBuffer* const buf_ = nullptr; // buffer to write + uint64_t file_off_ = 0; // file offset + bool signal_ = false; // signal to exit thread loop + std::function callback_; // Callback on completion + }; + + explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, + const size_t io_size); + virtual ~ThreadedWriter() {} + + void Stop() override; + void Write(WritableFile* const file, CacheWriteBuffer* buf, + const uint64_t file_off, + const std::function callback) override; + + private: + void ThreadMain(); + void DispatchIO(const IO& io); + + const size_t io_size_ = 0; + BoundedQueue q_; + std::vector threads_; +}; + +} // namespace rocksdb + +#endif diff --git a/utilities/persistent_cache/block_cache_tier_file_buffer.h b/utilities/persistent_cache/block_cache_tier_file_buffer.h new file mode 100644 index 000000000..4ab7fcc9b --- /dev/null +++ b/utilities/persistent_cache/block_cache_tier_file_buffer.h @@ -0,0 +1,117 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include +#include +#include + +#include "include/rocksdb/comparator.h" +#include "util/arena.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +// +// CacheWriteBuffer +// +// Buffer abstraction that can be manipulated via append +// (not thread safe) +class CacheWriteBuffer { + public: + explicit CacheWriteBuffer(const size_t size) : size_(size), pos_(0) { + buf_.reset(new char[size_]); + assert(!pos_); + assert(size_); + } + + virtual ~CacheWriteBuffer() {} + + void Append(const char* buf, const size_t size) { + assert(pos_ + size <= size_); + memcpy(buf_.get() + pos_, buf, size); + pos_ += size; + assert(pos_ <= size_); + } + + void FillTrailingZeros() { + assert(pos_ <= size_); + memset(buf_.get() + pos_, '0', size_ - pos_); + pos_ = size_; + } + + void Reset() { pos_ = 0; } + size_t Free() const { return size_ - pos_; } + size_t Capacity() const { return size_; } + size_t Used() const { return pos_; } + char* Data() const { return buf_.get(); } + + private: + std::unique_ptr buf_; + const size_t size_; + size_t pos_; +}; + +// +// CacheWriteBufferAllocator +// +// Buffer pool abstraction(not thread safe) +// +class CacheWriteBufferAllocator { + public: + explicit CacheWriteBufferAllocator(const uint32_t buffer_size, + const uint32_t buffer_count) + : buffer_size_(buffer_size) { + MutexLock _(&lock_); + buffer_size_ = buffer_size; + for (uint32_t i = 0; i < buffer_count; i++) { + auto* buf = new CacheWriteBuffer(buffer_size_); + assert(buf); + if (buf) { + bufs_.push_back(buf); + } + } + } + + virtual ~CacheWriteBufferAllocator() { + MutexLock _(&lock_); + assert(bufs_.size() * buffer_size_ == Capacity()); + for (auto* buf : bufs_) { + delete buf; + } + bufs_.clear(); + } + + CacheWriteBuffer* Allocate() { + MutexLock _(&lock_); + if (bufs_.empty()) { + return nullptr; + } + + assert(!bufs_.empty()); + CacheWriteBuffer* const buf = bufs_.front(); + bufs_.pop_front(); + + return buf; + } + + void Deallocate(CacheWriteBuffer* const buf) { + assert(buf); + MutexLock _(&lock_); + buf->Reset(); + bufs_.push_back(buf); + } + + size_t Capacity() const { return bufs_.size() * buffer_size_; } + size_t Free() const { return bufs_.size() * buffer_size_; } + size_t BufferSize() const { return buffer_size_; } + + private: + port::Mutex lock_; // Sync lock + size_t buffer_size_; // Size of each buffer + std::list bufs_; // Buffer stash +}; + +} // namespace rocksdb diff --git a/utilities/persistent_cache/block_cache_tier_metadata.cc b/utilities/persistent_cache/block_cache_tier_metadata.cc new file mode 100644 index 000000000..8f94378bc --- /dev/null +++ b/utilities/persistent_cache/block_cache_tier_metadata.cc @@ -0,0 +1,83 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#ifndef ROCKSDB_LITE + +#include "utilities/persistent_cache/block_cache_tier_metadata.h" + +#include + +namespace rocksdb { + +bool BlockCacheTierMetadata::Insert(BlockCacheFile* file) { + return cache_file_index_.Insert(file); +} + +BlockCacheFile* BlockCacheTierMetadata::Lookup(const uint32_t cache_id) { + BlockCacheFile* ret = nullptr; + BlockCacheFile lookup_key(cache_id); + bool ok = cache_file_index_.Find(&lookup_key, &ret); + if (ok) { + assert(ret->refs_); + return ret; + } + return nullptr; +} + +BlockCacheFile* BlockCacheTierMetadata::Evict() { + using std::placeholders::_1; + auto fn = std::bind(&BlockCacheTierMetadata::RemoveAllKeys, this, _1); + return cache_file_index_.Evict(fn); +} + +void BlockCacheTierMetadata::Clear() { + cache_file_index_.Clear([](BlockCacheFile* arg){ delete arg; }); + block_index_.Clear([](BlockInfo* arg){ delete arg; }); +} + +bool BlockCacheTierMetadata::Insert(BlockInfo* binfo) { + return block_index_.Insert(binfo); +} + +bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) { + BlockInfo lookup_key(key); + BlockInfo* block; + port::RWMutex* rlock = nullptr; + if (!block_index_.Find(&lookup_key, &block, &rlock)) { + return false; + } + + ReadUnlock _(rlock); + assert(block->key_ == key.ToString()); + if (lba) { + *lba = block->lba_; + } + return true; +} + +BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) { + BlockInfo lookup_key(key); + BlockInfo* binfo = nullptr; + bool status __attribute__((__unused__)) = + block_index_.Erase(&lookup_key, &binfo); + (void)status; + assert(status); + return binfo; +} + +void BlockCacheTierMetadata::RemoveAllKeys(BlockCacheFile* f) { + for (BlockInfo* binfo : f->block_infos()) { + BlockInfo* tmp = nullptr; + bool status = block_index_.Erase(binfo, &tmp); + (void)status; + assert(status); + assert(tmp == binfo); + delete binfo; + } + f->block_infos().clear(); +} + +} // namespace rocksdb + +#endif diff --git a/utilities/persistent_cache/block_cache_tier_metadata.h b/utilities/persistent_cache/block_cache_tier_metadata.h new file mode 100644 index 000000000..846c3fc3c --- /dev/null +++ b/utilities/persistent_cache/block_cache_tier_metadata.h @@ -0,0 +1,124 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/slice.h" + +#include "utilities/persistent_cache/block_cache_tier_file.h" +#include "utilities/persistent_cache/hash_table.h" +#include "utilities/persistent_cache/hash_table_evictable.h" +#include "utilities/persistent_cache/lrulist.h" + +namespace rocksdb { + +// +// Block Cache Tier Metadata +// +// The BlockCacheTierMetadata holds all the metadata associated with block +// cache. It +// fundamentally contains 2 indexes and an LRU. +// +// Block Cache Index +// +// This is a forward index that maps a given key to a LBA (Logical Block +// Address). LBA is a disk pointer that points to a record on the cache. +// +// LBA = { cache-id, offset, size } +// +// Cache File Index +// +// This is a forward index that maps a given cache-id to a cache file object. +// Typically you would lookup using LBA and use the object to read or write +struct BlockInfo { + explicit BlockInfo(const Slice& key, const LBA& lba = LBA()) + : key_(key.ToString()), lba_(lba) {} + + std::string key_; + LBA lba_; +}; + +class BlockCacheTierMetadata { + public: + explicit BlockCacheTierMetadata(const uint32_t blocks_capacity = 1024 * 1024, + const uint32_t cachefile_capacity = 10 * 1024) + : cache_file_index_(cachefile_capacity), block_index_(blocks_capacity) {} + + virtual ~BlockCacheTierMetadata() {} + + // Insert a given cache file + bool Insert(BlockCacheFile* file); + + // Lookup cache file based on cache_id + BlockCacheFile* Lookup(const uint32_t cache_id); + + // Insert block information to block index + bool Insert(BlockInfo* binfo); + + // Lookup block information from block index + bool Lookup(const Slice& key, LBA* lba); + + // Remove a given from the block index + BlockInfo* Remove(const Slice& key); + + // Find and evict a cache file using LRU policy + BlockCacheFile* Evict(); + + // Clear the metadata contents + virtual void Clear(); + + protected: + // Remove all block information from a given file + virtual void RemoveAllKeys(BlockCacheFile* file); + + private: + // Cache file index definition + // + // cache-id => BlockCacheFile + struct BlockCacheFileHash { + uint64_t operator()(const BlockCacheFile* rec) { + return std::hash()(rec->cacheid()); + } + }; + + struct BlockCacheFileEqual { + uint64_t operator()(const BlockCacheFile* lhs, const BlockCacheFile* rhs) { + return lhs->cacheid() == rhs->cacheid(); + } + }; + + typedef EvictableHashTable + CacheFileIndexType; + + // Block Lookup Index + // + // key => LBA + struct Hash { + size_t operator()(BlockInfo* node) const { + return std::hash()(node->key_); + } + }; + + struct Equal { + size_t operator()(BlockInfo* lhs, BlockInfo* rhs) const { + return lhs->key_ == rhs->key_; + } + }; + + typedef HashTable BlockIndexType; + + CacheFileIndexType cache_file_index_; + BlockIndexType block_index_; +}; + +} // namespace rocksdb + +#endif diff --git a/utilities/persistent_cache/persistent_cache_util.h b/utilities/persistent_cache/persistent_cache_util.h new file mode 100644 index 000000000..bc3807b06 --- /dev/null +++ b/utilities/persistent_cache/persistent_cache_util.h @@ -0,0 +1,67 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include +#include + +#include "util/mutexlock.h" + +namespace rocksdb { + +// +// Simple synchronized queue implementation with the option of +// bounding the queue +// +// On overflow, the elements will be discarded +// +template +class BoundedQueue { + public: + explicit BoundedQueue( + const size_t max_size = std::numeric_limits::max()) + : cond_empty_(&lock_), max_size_(max_size) {} + + virtual ~BoundedQueue() {} + + void Push(T&& t) { + MutexLock _(&lock_); + if (max_size_ != std::numeric_limits::max() && + size_ + t.Size() >= max_size_) { + // overflow + return; + } + + size_ += t.Size(); + q_.push_back(std::move(t)); + cond_empty_.SignalAll(); + } + + T Pop() { + MutexLock _(&lock_); + while (q_.empty()) { + cond_empty_.Wait(); + } + + T t = std::move(q_.front()); + size_ -= t.Size(); + q_.pop_front(); + return std::move(t); + } + + size_t Size() const { + MutexLock _(&lock_); + return size_; + } + + private: + mutable port::Mutex lock_; + port::CondVar cond_empty_; + std::list q_; + size_t size_ = 0; + const size_t max_size_; +}; + +} // namespace rocksdb