diff --git a/src.mk b/src.mk
index 3b2cbf71a..87dd06a83 100644
--- a/src.mk
+++ b/src.mk
@@ -136,6 +136,7 @@ LIB_SOURCES = \
   utilities/persistent_cache/volatile_tier_impl.cc \
   utilities/persistent_cache/block_cache_tier_file.cc \
   utilities/persistent_cache/block_cache_tier_metadata.cc \
+  utilities/persistent_cache/block_cache_tier.cc \
   utilities/redis/redis_lists.cc \
   utilities/simulator_cache/sim_cache.cc \
   utilities/spatialdb/spatial_db.cc \
diff --git a/util/io_posix.cc b/util/io_posix.cc
index 9696769b5..913026a4d 100644
--- a/util/io_posix.cc
+++ b/util/io_posix.cc
@@ -145,8 +145,7 @@ Status ReadUnaligned(int fd, Slice* data, const uint64_t offset,
 
 Status DirectIORead(int fd, Slice* result, size_t off, size_t n,
                     char* scratch) {
-  if (IsSectorAligned(off) && IsSectorAligned(n) &&
-      IsPageAligned(result->data())) {
+  if (IsSectorAligned(off) && IsSectorAligned(n) && IsPageAligned(scratch)) {
     return ReadAligned(fd, result, off, n, scratch);
   }
   return ReadUnaligned(fd, result, off, n, scratch);
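// The io_posix.cc hunk above fixes which pointer is alignment-checked before
// taking the direct-I/O read path: the bytes land in `scratch`, so that is
// the buffer whose page alignment matters; `result->data()` is not populated
// until after the read completes. A minimal sketch of the predicates this
// relies on -- the constants and the `Sketch` names are illustrative
// assumptions, not part of this patch:

#include <cstddef>
#include <cstdint>

static const size_t kSectorSizeSketch = 512;      // assumed device sector size
static const size_t kPageSizeSketch = 4 * 1024;   // assumed VM page size

inline bool IsSectorAlignedSketch(const size_t v) {
  return v % kSectorSizeSketch == 0;
}

inline bool IsPageAlignedSketch(const void* p) {
  // a buffer is page-aligned iff its address is a multiple of the page size
  return reinterpret_cast<uintptr_t>(p) % kPageSizeSketch == 0;
}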
diff --git a/utilities/persistent_cache/block_cache_tier.cc b/utilities/persistent_cache/block_cache_tier.cc
new file mode 100644
index 000000000..271758f51
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier.cc
@@ -0,0 +1,358 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#ifndef ROCKSDB_LITE
+
+#include "utilities/persistent_cache/block_cache_tier.h"
+
+#include <regex>
+#include <utility>
+#include <vector>
+
+#include "util/stop_watch.h"
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+
+namespace rocksdb {
+
+//
+// BlockCacheImpl
+//
+Status BlockCacheTier::Open() {
+  Status status;
+
+  WriteLock _(&lock_);
+
+  assert(!size_);
+
+  // Check the validity of the options
+  status = opt_.ValidateSettings();
+  assert(status.ok());
+  if (!status.ok()) {
+    Error(opt_.log, "Invalid block cache options");
+    return status;
+  }
+
+  // Create base directory or cleanup existing directory
+  status = opt_.env->CreateDirIfMissing(opt_.path);
+  if (!status.ok()) {
+    Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+          status.ToString().c_str());
+    return status;
+  }
+
+  // Create the cache directory under the base path
+  status = opt_.env->CreateDir(GetCachePath());
+  if (!status.ok()) {
+    // directory already exists, clean it up
+    status = CleanupCacheFolder(GetCachePath());
+    assert(status.ok());
+    if (!status.ok()) {
+      Error(opt_.log, "Error creating directory %s. %s", opt_.path.c_str(),
+            status.ToString().c_str());
+      return status;
+    }
+  }
+
+  assert(!cache_file_);
+  NewCacheFile();
+  assert(cache_file_);
+
+  if (opt_.pipeline_writes_) {
+    assert(!insert_th_.joinable());
+    insert_th_ = std::thread(&BlockCacheTier::InsertMain, this);
+  }
+
+  return Status::OK();
+}
+
+Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
+  std::vector<std::string> files;
+  Status status = opt_.env->GetChildren(folder, &files);
+  if (!status.ok()) {
+    Error(opt_.log, "Error getting files for %s. %s", folder.c_str(),
+          status.ToString().c_str());
+    return status;
+  }
+
+  // clean up files matching the pattern [0-9]+.rc
+  for (auto file : files) {
+    try {
+      const std::regex cache_file_regex("[0-9]+\\.rc$");
+      if (std::regex_match(file, cache_file_regex)) {
+        // cache file
+        Info(opt_.log, "Removing file %s.", file.c_str());
+        status = opt_.env->DeleteFile(folder + "/" + file);
+        if (!status.ok()) {
+          Error(opt_.log, "Error deleting file %s. %s", file.c_str(),
+                status.ToString().c_str());
+          return Status::IOError("Error deleting file " + file);
+        }
+      } else {
+        Info(opt_.log, "Skipping file %s.", file.c_str());
+      }
+    } catch (const std::regex_error& e) {
+      // Since the std library is evolving, certain older compiler versions
+      // can throw here. It is safer to exit cleanly.
+      return Status::IOError(e.what());
+    }
+  }
+  return Status::OK();
+}
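// Note the character-class fix in CleanupCacheFolder() above: the draft
// pattern "(0-9)+\\.rc$" treats "0-9" as a literal group, so no real cache
// file would ever match and stale files would silently survive cleanup;
// "[0-9]+\\.rc$" matches names such as "000123.rc". A small self-contained
// check of the corrected pattern:

#include <cassert>
#include <regex>
#include <string>

static void TestCacheFileRegexSketch() {
  const std::regex cache_file_regex("[0-9]+\\.rc$");
  assert(std::regex_match(std::string("000123.rc"), cache_file_regex));
  assert(!std::regex_match(std::string("LOG"), cache_file_regex));
  // the buggy group "(0-9)+" would only have matched a literal "0-9" prefix
  assert(!std::regex_match(std::string("0-9.rc"), cache_file_regex));
}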
+Status BlockCacheTier::Close() {
+  // stop the insert thread
+  if (opt_.pipeline_writes_ && insert_th_.joinable()) {
+    InsertOp op(/*quit=*/true);
+    insert_ops_.Push(std::move(op));
+    insert_th_.join();
+  }
+
+  // stop the writer before clearing the metadata
+  writer_.Stop();
+
+  // clear all metadata
+  WriteLock _(&lock_);
+  metadata_.Clear();
+  return Status::OK();
+}
+
+std::string BlockCacheTier::PrintStats() {
+  std::ostringstream os;
+  os << "persistentcache.blockcachetier.bytes_pipelined: "
+     << stats_.bytes_pipelined_.ToString() << std::endl
+     << "persistentcache.blockcachetier.bytes_written: "
+     << stats_.bytes_written_.ToString() << std::endl
+     << "persistentcache.blockcachetier.bytes_read: "
+     << stats_.bytes_read_.ToString() << std::endl
+     << "persistentcache.blockcachetier.insert_dropped: "
+     << stats_.insert_dropped_ << std::endl
+     << "persistentcache.blockcachetier.cache_hits: " << stats_.cache_hits_
+     << std::endl
+     << "persistentcache.blockcachetier.cache_misses: " << stats_.cache_misses_
+     << std::endl
+     << "persistentcache.blockcachetier.cache_errors: " << stats_.cache_errors_
+     << std::endl
+     << "persistentcache.blockcachetier.cache_hits_pct: "
+     << stats_.CacheHitPct() << std::endl
+     << "persistentcache.blockcachetier.cache_misses_pct: "
+     << stats_.CacheMissPct() << std::endl
+     << "persistentcache.blockcachetier.read_hit_latency: "
+     << stats_.read_hit_latency_.ToString() << std::endl
+     << "persistentcache.blockcachetier.read_miss_latency: "
+     << stats_.read_miss_latency_.ToString() << std::endl
+     << "persistentcache.blockcachetier.write_latency: "
+     << stats_.write_latency_.ToString() << std::endl
+     << PersistentCacheTier::PrintStats();
+  return os.str();
+}
+
+Status BlockCacheTier::Insert(const Slice& key, const char* data,
+                              const size_t size) {
+  // update stats
+  stats_.bytes_pipelined_.Add(size);
+
+  if (opt_.pipeline_writes_) {
+    // offload the write to the write thread
+    insert_ops_.Push(InsertOp(key.ToString(), std::string(data, size)));
+    return Status::OK();
+  }
+
+  assert(!opt_.pipeline_writes_);
+  return InsertImpl(key, Slice(data, size));
+}
+
+void BlockCacheTier::InsertMain() {
+  while (true) {
+    InsertOp op(insert_ops_.Pop());
+
+    if (op.signal_) {
+      // this is the signal to exit
+      break;
+    }
+
+    size_t retry = 0;
+    Status s;
+    while ((s = InsertImpl(Slice(op.key_), Slice(op.data_))).IsTryAgain()) {
+      if (retry > kMaxRetry) {
+        break;
+      }
+
+      // This can happen when all write buffers are full; wait until some
+      // buffers are freed. We do not wait inside InsertImpl() itself because
+      // the same code path serves both pipelined and non-pipelined modes.
+      buffer_allocator_.WaitUntilUsable();
+      retry++;
+    }
+
+    if (!s.ok()) {
+      stats_.insert_dropped_++;
+    }
+  }
+}
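// InsertMain() drains insert_ops_, a BoundedQueue declared in
// persistent_cache_util.h (not part of this diff). The behavior the loop
// above depends on is: Push() accounts each op's Size() against a byte budget
// and drops ops beyond it, while Pop() blocks until an op arrives. A minimal
// sketch under those assumptions (BoundedQueueSketch is an illustrative
// stand-in, not the real class):

#include <condition_variable>
#include <cstdint>
#include <list>
#include <mutex>

template <class T>
class BoundedQueueSketch {
 public:
  explicit BoundedQueueSketch(const size_t max_size) : max_size_(max_size) {}

  void Push(T&& t) {
    std::lock_guard<std::mutex> _(lock_);
    if (max_size_ != SIZE_MAX && size_ + t.Size() >= max_size_) {
      return;  // backlog limit reached; drop the op (see the config comment)
    }
    size_ += t.Size();
    q_.push_back(std::move(t));
    cond_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(lock_);
    cond_.wait(lock, [this] { return !q_.empty(); });
    T t = std::move(q_.front());
    q_.pop_front();
    size_ -= t.Size();
    return t;
  }

 private:
  std::mutex lock_;               // protects q_ and size_
  std::condition_variable cond_;  // signaled on every successful Push()
  std::list<T> q_;                // FIFO of pending ops
  size_t size_ = 0;               // cumulative Size() of queued ops
  const size_t max_size_;         // byte budget for the backlog
};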
+Status BlockCacheTier::InsertImpl(const Slice& key, const Slice& data) {
+  // pre-condition
+  assert(key.size());
+  assert(data.size());
+  assert(cache_file_);
+
+  StopWatchNano timer(opt_.env);
+
+  WriteLock _(&lock_);
+
+  LBA lba;
+  if (metadata_.Lookup(key, &lba)) {
+    // the key already exists; this is a duplicate insert
+    return Status::OK();
+  }
+
+  while (!cache_file_->Append(key, data, &lba)) {
+    if (!cache_file_->Eof()) {
+      Debug(opt_.log, "Error inserting to cache file %d",
+            cache_file_->cacheid());
+      stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+      return Status::TryAgain();
+    }
+
+    assert(cache_file_->Eof());
+    NewCacheFile();
+  }
+
+  // Insert into lookup index
+  BlockInfo* info = metadata_.Insert(key, lba);
+  assert(info);
+  if (!info) {
+    return Status::IOError("Unexpected error inserting to index");
+  }
+
+  // insert to cache file reverse mapping
+  cache_file_->Add(info);
+
+  // update stats
+  stats_.bytes_written_.Add(data.size());
+  stats_.write_latency_.Add(timer.ElapsedNanos() / 1000);
+  return Status::OK();
+}
+
+Status BlockCacheTier::Lookup(const Slice& key, std::unique_ptr<char[]>* val,
+                              size_t* size) {
+  StopWatchNano timer(opt_.env);
+
+  LBA lba;
+  bool status;
+  status = metadata_.Lookup(key, &lba);
+  if (!status) {
+    stats_.cache_misses_++;
+    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+    return Status::NotFound("blockcache: key not found");
+  }
+
+  BlockCacheFile* const file = metadata_.Lookup(lba.cache_id_);
+  if (!file) {
+    // this can happen because the block index and cache file index are
+    // different, and the cache file might be removed between the two lookups
+    stats_.cache_misses_++;
+    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+    return Status::NotFound("blockcache: cache file not found");
+  }
+
+  assert(file->refs_);
+
+  std::unique_ptr<char[]> scratch(new char[lba.size_]);
+  Slice blk_key;
+  Slice blk_val;
+
+  status = file->Read(lba, &blk_key, &blk_val, scratch.get());
+  --file->refs_;
+  assert(status);
+  if (!status) {
+    stats_.cache_misses_++;
+    stats_.cache_errors_++;
+    stats_.read_miss_latency_.Add(timer.ElapsedNanos() / 1000);
+    return Status::NotFound("blockcache: error reading data");
+  }
+
+  assert(blk_key == key);
+
+  val->reset(new char[blk_val.size()]);
+  memcpy(val->get(), blk_val.data(), blk_val.size());
+  *size = blk_val.size();
+
+  stats_.bytes_read_.Add(*size);
+  stats_.cache_hits_++;
+  stats_.read_hit_latency_.Add(timer.ElapsedNanos() / 1000);
+
+  return Status::OK();
+}
+
+bool BlockCacheTier::Erase(const Slice& key) {
+  WriteLock _(&lock_);
+  BlockInfo* info = metadata_.Remove(key);
+  assert(info);
+  delete info;
+  return true;
+}
+
+void BlockCacheTier::NewCacheFile() {
+  lock_.AssertHeld();
+
+  Info(opt_.log, "Creating cache file %d", writer_cache_id_);
+
+  writer_cache_id_++;
+
+  cache_file_ = new WriteableCacheFile(opt_.env, &buffer_allocator_, &writer_,
+                                       GetCachePath(), writer_cache_id_,
+                                       opt_.cache_file_size, opt_.log);
+  bool status;
+  status =
+      cache_file_->Create(opt_.enable_direct_writes, opt_.enable_direct_reads);
+  assert(status);
+
+  // insert to cache files tree
+  status = metadata_.Insert(cache_file_);
+  (void)status;
+  assert(status);
+}
+
+bool BlockCacheTier::Reserve(const size_t size) {
+  WriteLock _(&lock_);
+  assert(size_ <= opt_.cache_size);
+
+  if (size + size_ <= opt_.cache_size) {
+    // there is enough space to write
+    size_ += size;
+    return true;
+  }
+
+  assert(size + size_ >= opt_.cache_size);
+  // there is not enough space to fit the requested data
+  // we can clear some space by evicting cold data
+
+  const double retain_fac = (100 - kEvictPct) / static_cast<double>(100);
+  while (size + size_ > opt_.cache_size * retain_fac) {
+    std::unique_ptr<BlockCacheFile> f(metadata_.Evict());
+    if (!f) {
+      // nothing is evictable
+      return false;
+    }
+    assert(!f->refs_);
+    size_t file_size;
+    if (!f->Delete(&file_size).ok()) {
+      // unable to delete file
+      return false;
+    }
+
+    assert(file_size <= size_);
+    size_ -= file_size;
+  }
+
+  size_ += size;
+  assert(size_ <= opt_.cache_size * 0.9);
+  return true;
+}
+
+}  // namespace rocksdb
+
+#endif  // ifndef ROCKSDB_LITE
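// Reserve() above evicts down to a retention watermark instead of freeing
// just enough room for one request: with kEvictPct = 10, retain_fac = 0.9,
// so a reservation that does not fit keeps evicting whole cache files until
// size + size_ <= 0.9 * cache_size. A worked example with assumed numbers:
//
//   cache_size = 100 GiB, size_ = 99 GiB, incoming size = 2 GiB
//   target: 2 GiB + size_ <= 90 GiB  =>  evict until size_ <= 88 GiB
//
// i.e. roughly 11 GiB of cold files are dropped in one pass, so the next few
// reservations do not immediately trigger eviction again.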
diff --git a/utilities/persistent_cache/block_cache_tier.h b/utilities/persistent_cache/block_cache_tier.h
new file mode 100644
index 000000000..ad7f9132f
--- /dev/null
+++ b/utilities/persistent_cache/block_cache_tier.h
@@ -0,0 +1,145 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#include <atomic>
+#include <list>
+#include <memory>
+#include <set>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <thread>
+
+#include "rocksdb/cache.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/persistent_cache.h"
+
+#include "utilities/persistent_cache/block_cache_tier_file.h"
+#include "utilities/persistent_cache/block_cache_tier_metadata.h"
+#include "utilities/persistent_cache/persistent_cache_util.h"
+
+#include "db/skiplist.h"
+#include "port/port_posix.h"
+#include "util/arena.h"
+#include "util/coding.h"
+#include "util/crc32c.h"
+#include "util/histogram.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+//
+// Block cache tier implementation
+//
+class BlockCacheTier : public PersistentCacheTier {
+ public:
+  explicit BlockCacheTier(const PersistentCacheConfig& opt)
+      : opt_(opt),
+        insert_ops_(opt_.max_write_pipeline_backlog_size),
+        buffer_allocator_(opt.write_buffer_size, opt.write_buffer_count()),
+        writer_(this, opt_.writer_qdepth, opt_.writer_dispatch_size) {
+    Info(opt_.log, "Initializing allocator. size=%d B count=%d",
+         opt_.write_buffer_size, opt_.write_buffer_count());
+  }
+
+  virtual ~BlockCacheTier() {
+    // By contract, the user should have called Close() before destroying the
+    // object
+    assert(!insert_th_.joinable());
+  }
+
+  Status Insert(const Slice& key, const char* data,
+                const size_t size) override;
+  Status Lookup(const Slice& key, std::unique_ptr<char[]>* data,
+                size_t* size) override;
+  Status Open() override;
+  Status Close() override;
+  bool Erase(const Slice& key) override;
+  bool Reserve(const size_t size) override;
+
+  bool IsCompressed() override { return opt_.is_compressed; }
+
+  std::string PrintStats() override;
+
+  void TEST_Flush() override {
+    while (insert_ops_.Size()) {
+      /* sleep override */ sleep(1);
+    }
+  }
+
+ private:
+  // Percentage of cache to be evicted when the cache is full
+  static const size_t kEvictPct = 10;
+  // Max attempts to insert key, value to cache in pipelined mode
+  static const size_t kMaxRetry = 3;
+
+  // Pipelined operation
+  struct InsertOp {
+    explicit InsertOp(const bool signal) : signal_(signal) {}
+    explicit InsertOp(std::string&& key, const std::string& data)
+        : key_(std::move(key)), data_(data) {}
+    ~InsertOp() {}
+
+    InsertOp() = delete;
+    InsertOp(InsertOp&& rhs) = default;
+    InsertOp& operator=(InsertOp&& rhs) = default;
+
+    // used for estimating size by bounded queue
+    size_t Size() { return data_.size() + key_.size(); }
+
+    std::string key_;
+    std::string data_;
+    const bool signal_ = false;  // signal to request processing thread to exit
+  };
+
+  // entry point for insert thread
+  void InsertMain();
+  // insert implementation
+  Status InsertImpl(const Slice& key, const Slice& data);
+  // Create a new cache file
+  void NewCacheFile();
+  // Get cache directory path
+  std::string GetCachePath() const { return opt_.path + "/cache"; }
+  // Cleanup folder
+  Status CleanupCacheFolder(const std::string& folder);
+
+  // Statistics
+  struct Stats {
+    HistogramImpl bytes_pipelined_;
+    HistogramImpl bytes_written_;
+    HistogramImpl bytes_read_;
+    HistogramImpl read_hit_latency_;
+    HistogramImpl read_miss_latency_;
+    HistogramImpl write_latency_;
+    uint64_t cache_hits_ = 0;
+    uint64_t cache_misses_ = 0;
+    uint64_t cache_errors_ = 0;
+    uint64_t insert_dropped_ = 0;
+
+    double CacheHitPct() const {
+      const auto lookups = cache_hits_ + cache_misses_;
+      return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
+    }
+
+    double CacheMissPct() const {
+      const auto lookups = cache_hits_ + cache_misses_;
+      return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
+    }
+  };
+
+  port::RWMutex lock_;                          // Synchronization
+  const PersistentCacheConfig opt_;             // BlockCache options
+  BoundedQueue<InsertOp> insert_ops_;           // Ops waiting for insert
+  std::thread insert_th_;                       // Insert thread
+  uint32_t writer_cache_id_ = 0;                // Current cache file identifier
+  WriteableCacheFile* cache_file_ = nullptr;    // Current cache file reference
+  CacheWriteBufferAllocator buffer_allocator_;  // Buffer provider
+  ThreadedWriter writer_;                       // Writer threads
+  BlockCacheTierMetadata metadata_;             // Cache meta data manager
+  std::atomic<uint64_t> size_{0};               // Size of the cache
+  Stats stats_;                                 // Statistics
+};
+
+}  // namespace rocksdb
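// A minimal lifecycle sketch for the class above (error handling elided; the
// path and sizes are illustrative assumptions, not values from this patch):

#include <cassert>
#include <memory>
#include <string>

#include "utilities/persistent_cache/block_cache_tier.h"

void BlockCacheTierUsageSketch(rocksdb::Env* env,
                               const std::shared_ptr<rocksdb::Logger>& log) {
  rocksdb::PersistentCacheConfig cfg(env, "/tmp/block_cache",
                                     /*_cache_size=*/1ULL * 1024 * 1024 * 1024,
                                     log);
  rocksdb::BlockCacheTier cache(cfg);
  rocksdb::Status s = cache.Open();
  assert(s.ok());

  const rocksdb::Slice key("block-key");
  const std::string block(4 * 1024, 'x');
  s = cache.Insert(key, block.data(), block.size());
  assert(s.ok());

  std::unique_ptr<char[]> data;
  size_t size = 0;
  s = cache.Lookup(key, &data, &size);
  // with pipeline_writes_ enabled the insert may still be in flight, so a
  // NotFound here is possible rather than a bug
  assert(s.ok() || s.IsNotFound());

  s = cache.Close();  // joins the insert thread and stops the writer
  assert(s.ok());
}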
diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc
index ef7725258..7e9c1e632 100644
--- a/utilities/persistent_cache/block_cache_tier_file.cc
+++ b/utilities/persistent_cache/block_cache_tier_file.cc
@@ -189,12 +189,12 @@ bool CacheRecord::Deserialize(const Slice& data) {
 // RandomAccessFile
 //
 
-bool RandomAccessCacheFile::Open() {
+bool RandomAccessCacheFile::Open(const bool enable_direct_reads) {
   WriteLock _(&rwlock_);
-  return OpenImpl();
+  return OpenImpl(enable_direct_reads);
 }
 
-bool RandomAccessCacheFile::OpenImpl() {
+bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
   rwlock_.AssertHeld();
 
   Debug(log_, "Opening cache file %s", Path().c_str());
@@ -265,9 +265,12 @@ WriteableCacheFile::~WriteableCacheFile() {
   ClearBuffers();
 }
 
-bool WriteableCacheFile::Create() {
+bool WriteableCacheFile::Create(const bool enable_direct_writes,
+                                const bool enable_direct_reads) {
   WriteLock _(&rwlock_);
 
+  enable_direct_reads_ = enable_direct_reads;
+
   Debug(log_, "Creating new cache %s (max size is %d B)", Path().c_str(),
         max_size_);
 
@@ -388,7 +391,7 @@ void WriteableCacheFile::DispatchBuffer() {
     // pad it with zero for direct IO
     buf->FillTrailingZeros();
 
-    assert(buf->Used() % FILE_ALIGNMENT_SIZE == 0);
+    assert(buf->Used() % kFileAlignmentSize == 0);
 
     writer_->Write(file_.get(), buf, file_off,
                    std::bind(&WriteableCacheFile::BufferWriteDone, this));
@@ -417,7 +420,7 @@ void WriteableCacheFile::CloseAndOpenForReading() {
   // Our env abstraction do not allow reading from a file opened for appending
   // We need close the file and re-open it for reading
   Close();
-  RandomAccessCacheFile::OpenImpl();
+  RandomAccessCacheFile::OpenImpl(enable_direct_reads_);
 }
 
 bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block,
@@ -523,7 +526,9 @@ void ThreadedWriter::Stop() {
   // wait for all threads to exit
   for (auto& th : threads_) {
     th.join();
+    assert(!th.joinable());
   }
+  threads_.clear();
 }
 
 void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf,
diff --git a/utilities/persistent_cache/block_cache_tier_file.h b/utilities/persistent_cache/block_cache_tier_file.h
index da8a4b5cd..155e2886f 100644
--- a/utilities/persistent_cache/block_cache_tier_file.h
+++ b/utilities/persistent_cache/block_cache_tier_file.h
@@ -114,7 +114,9 @@ class BlockCacheFile : public LRUElement<BlockCacheFile> {
   }
 
   // get file path
-  std::string Path() const { return dir_ + "/" + std::to_string(cache_id_); }
+  std::string Path() const {
+    return dir_ + "/" + std::to_string(cache_id_) + ".rc";
+  }
   // get cache ID
   uint32_t cacheid() const { return cache_id_; }
   // Add block information to file data
@@ -150,7 +152,7 @@ class RandomAccessCacheFile : public BlockCacheFile {
   virtual ~RandomAccessCacheFile() {}
 
   // open file for reading
-  bool Open();
+  bool Open(const bool enable_direct_reads);
   // read data from the disk
   bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override;
 
@@ -158,7 +160,7 @@ class RandomAccessCacheFile : public BlockCacheFile {
   std::unique_ptr<RandomAccessFile> file_;
 
  protected:
-  bool OpenImpl();
+  bool OpenImpl(const bool enable_direct_reads);
   bool ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch);
 
   std::shared_ptr<Logger> log_;  // log file
@@ -183,7 +185,7 @@ class WriteableCacheFile : public RandomAccessCacheFile {
   virtual ~WriteableCacheFile();
 
   // create file on disk
-  bool Create();
+  bool Create(const bool enable_direct_writes, const bool enable_direct_reads);
 
   // read data from logical file
   bool Read(const LBA& lba, Slice* key, Slice* block, char* scratch) override {
@@ -205,7 +207,7 @@ class WriteableCacheFile : public RandomAccessCacheFile {
  private:
   friend class ThreadedWriter;
 
-  static const size_t FILE_ALIGNMENT_SIZE = 4 * 1024;  // align file size
+  static const size_t kFileAlignmentSize = 4 * 1024;  // align file size
 
   bool ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch);
   bool ReadBuffer(const LBA& lba, char* data);
@@ -240,6 +242,8 @@ class WriteableCacheFile : public RandomAccessCacheFile {
   size_t buf_woff_ = 0;     // off into bufs_ to write
   size_t buf_doff_ = 0;     // off into bufs_ to dispatch
   size_t pending_ios_ = 0;  // Number of ios to disk in-progress
+  bool enable_direct_reads_ = false;  // Should we enable direct reads
+                                      // when reading from disk
 };
 
 //
@@ -267,7 +271,7 @@ class ThreadedWriter : public Writer {
   explicit ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth,
                           const size_t io_size);
-  virtual ~ThreadedWriter() {}
+  virtual ~ThreadedWriter() { assert(threads_.empty()); }
 
   void Stop() override;
   void Write(WritableFile* const file, CacheWriteBuffer* buf,
diff --git a/utilities/persistent_cache/block_cache_tier_file_buffer.h b/utilities/persistent_cache/block_cache_tier_file_buffer.h
index 4ab7fcc9b..f4878c0ac 100644
--- a/utilities/persistent_cache/block_cache_tier_file_buffer.h
+++ b/utilities/persistent_cache/block_cache_tier_file_buffer.h
@@ -61,9 +61,9 @@ class CacheWriteBuffer {
 //
 class CacheWriteBufferAllocator {
  public:
-  explicit CacheWriteBufferAllocator(const uint32_t buffer_size,
-                                     const uint32_t buffer_count)
-      : buffer_size_(buffer_size) {
+  explicit CacheWriteBufferAllocator(const size_t buffer_size,
+                                     const size_t buffer_count)
+      : cond_empty_(&lock_), buffer_size_(buffer_size) {
     MutexLock _(&lock_);
     buffer_size_ = buffer_size;
     for (uint32_t i = 0; i < buffer_count; i++) {
@@ -71,6 +71,7 @@ class CacheWriteBufferAllocator {
       assert(buf);
       if (buf) {
         bufs_.push_back(buf);
+        cond_empty_.Signal();
       }
     }
   }
@@ -93,7 +94,6 @@ class CacheWriteBufferAllocator {
     assert(!bufs_.empty());
     CacheWriteBuffer* const buf = bufs_.front();
     bufs_.pop_front();
-
     return buf;
   }
 
@@ -102,6 +102,15 @@ class CacheWriteBufferAllocator {
     MutexLock _(&lock_);
     buf->Reset();
     bufs_.push_back(buf);
+    cond_empty_.Signal();
+  }
+
+  void WaitUntilUsable() {
+    // We are asked to wait till we have buffers available
+    MutexLock _(&lock_);
+    while (bufs_.empty()) {
+      cond_empty_.Wait();
+    }
   }
 
   size_t Capacity() const { return bufs_.size() * buffer_size_; }
@@ -110,6 +119,7 @@ class CacheWriteBufferAllocator {
 
  private:
   port::Mutex lock_;          // Sync lock
+  port::CondVar cond_empty_;  // Condition var for empty buffers
   size_t buffer_size_;        // Size of each buffer
   std::list<CacheWriteBuffer*> bufs_;  // Buffer stash
 };
diff --git a/utilities/persistent_cache/block_cache_tier_metadata.cc b/utilities/persistent_cache/block_cache_tier_metadata.cc
index 8f94378bc..bf3839e4f 100644
--- a/utilities/persistent_cache/block_cache_tier_metadata.cc
+++ b/utilities/persistent_cache/block_cache_tier_metadata.cc
@@ -36,8 +36,12 @@ void BlockCacheTierMetadata::Clear() {
   block_index_.Clear([](BlockInfo* arg) { delete arg; });
 }
 
-bool BlockCacheTierMetadata::Insert(BlockInfo* binfo) {
-  return block_index_.Insert(binfo);
+BlockInfo* BlockCacheTierMetadata::Insert(const Slice& key, const LBA& lba) {
+  std::unique_ptr<BlockInfo> binfo(new BlockInfo(key, lba));
+  if (!block_index_.Insert(binfo.get())) {
+    return nullptr;
+  }
+  return binfo.release();
 }
 
 bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
@@ -59,10 +63,8 @@ bool BlockCacheTierMetadata::Lookup(const Slice& key, LBA* lba) {
 BlockInfo* BlockCacheTierMetadata::Remove(const Slice& key) {
   BlockInfo lookup_key(key);
   BlockInfo* binfo = nullptr;
-  bool status __attribute__((__unused__)) =
-      block_index_.Erase(&lookup_key, &binfo);
-  (void)status;
-  assert(status);
+  bool ok __attribute__((__unused__)) = block_index_.Erase(&lookup_key, &binfo);
+  assert(ok);
   return binfo;
 }
diff --git a/utilities/persistent_cache/block_cache_tier_metadata.h b/utilities/persistent_cache/block_cache_tier_metadata.h
index 846c3fc3c..bab07831c 100644
--- a/utilities/persistent_cache/block_cache_tier_metadata.h
+++ b/utilities/persistent_cache/block_cache_tier_metadata.h
@@ -60,7 +60,7 @@ class BlockCacheTierMetadata {
   BlockCacheFile* Lookup(const uint32_t cache_id);
 
   // Insert block information to block index
-  bool Insert(BlockInfo* binfo);
+  BlockInfo* Insert(const Slice& key, const LBA& lba);
   // Lookup block information from block index
   bool Lookup(const Slice& key, LBA* lba);
diff --git a/utilities/persistent_cache/persistent_cache_test.cc b/utilities/persistent_cache/persistent_cache_test.cc
index b2dbe3bb8..28c1242e4 100644
--- a/utilities/persistent_cache/persistent_cache_test.cc
+++ b/utilities/persistent_cache/persistent_cache_test.cc
@@ -14,36 +14,356 @@
 #include <functional>
 #include <memory>
 
+#include "utilities/persistent_cache/block_cache_tier.h"
+
 namespace rocksdb {
 
-#if !(defined(__clang__) && defined(OS_LINUX))
+static const double kStressFactor = .125;
+
+static void OnOpenForRead(void* arg) {
+  int* val = static_cast<int*>(arg);
+  *val &= ~O_DIRECT;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "NewRandomAccessFile:O_DIRECT",
+      std::bind(OnOpenForRead, std::placeholders::_1));
+}
+
+static void OnOpenForWrite(void* arg) {
+  int* val = static_cast<int*>(arg);
+  *val &= ~O_DIRECT;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "NewWritableFile:O_DIRECT",
+      std::bind(OnOpenForWrite, std::placeholders::_1));
+}
+
+//
+// Simple logger that prints message on stdout
+//
+class ConsoleLogger : public Logger {
+ public:
+  using Logger::Logv;
+  ConsoleLogger() : Logger(InfoLogLevel::ERROR_LEVEL) {}
+
+  void Logv(const char* format, va_list ap) override {
+    MutexLock _(&lock_);
+    vprintf(format, ap);
+    printf("\n");
+  }
+
+  port::Mutex lock_;
+};
+
+// construct a tiered RAM+Block cache
+std::unique_ptr<PersistentTieredCache> NewTieredCache(
+    const size_t mem_size, const PersistentCacheConfig& opt) {
+  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
+  // create primary tier
+  assert(mem_size);
+  auto pcache = std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(
+      /*is_compressed*/ true, mem_size));
+  tcache->AddTier(pcache);
+  // create secondary tier
+  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
+  tcache->AddTier(scache);
+
+  Status s = tcache->Open();
+  assert(s.ok());
+  return tcache;
+}
+
+// create block cache
+std::unique_ptr<PersistentCacheTier> NewBlockCache(
+    Env* env, const std::string& path,
+    const uint64_t max_size = std::numeric_limits<uint64_t>::max(),
+    const bool enable_direct_writes = false) {
+  const uint32_t max_file_size = 12 * 1024 * 1024 * kStressFactor;
+  auto log = std::make_shared<ConsoleLogger>();
+  PersistentCacheConfig opt(env, path, max_size, log);
+  opt.cache_file_size = max_file_size;
+  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+  opt.enable_direct_writes = enable_direct_writes;
+  std::unique_ptr<PersistentCacheTier> scache(new BlockCacheTier(opt));
+  Status s = scache->Open();
+  assert(s.ok());
+  return scache;
+}
+
+// create a new cache tier
+std::unique_ptr<PersistentCacheTier> NewTieredCache(
+    Env* env, const std::string& path, const uint64_t max_volatile_cache_size,
+    const uint64_t max_block_cache_size =
+        std::numeric_limits<uint64_t>::max()) {
+  const uint32_t max_file_size = 12 * 1024 * 1024 * kStressFactor;
+  auto log = std::make_shared<ConsoleLogger>();
+  auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log);
+  opt.cache_file_size = max_file_size;
+  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+  // create tier out of the two caches
+  auto cache = NewTieredCache(max_volatile_cache_size, opt);
+  return cache;
+}
+
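// The helpers above build cache tiers directly; a DB consumes them through
// BlockBasedTableOptions::persistent_cache, which is what the RunTest() cases
// below exercise one configuration at a time. A condensed sketch (path and
// size are illustrative assumptions):

void UseTieredCacheWithDBSketch() {
  std::shared_ptr<rocksdb::PersistentCacheTier> pcache =
      NewTieredCache(rocksdb::Env::Default(), "/tmp/cache_test",
                     /*max_volatile_cache_size=*/64 * 1024 * 1024);
  rocksdb::BlockBasedTableOptions table_options;
  table_options.persistent_cache = pcache;
  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  // options can now be passed to DB::Open(...)
}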
+PersistentCacheTierTest::PersistentCacheTierTest()
+    : path_(test::TmpDir(Env::Default()) + "/cache_test") {
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
+                                                 OnOpenForRead);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewWritableFile:O_DIRECT",
+                                                 OnOpenForWrite);
+}
+
 // Volatile cache tests
 TEST_F(PersistentCacheTierTest, VolatileCacheInsert) {
   for (auto nthreads : {1, 5}) {
-    for (auto max_keys : {10 * 1024, 1 * 1024 * 1024}) {
+    for (auto max_keys :
+         {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
       cache_ = std::make_shared<VolatileCacheTier>();
       RunInsertTest(nthreads, max_keys);
     }
   }
 }
-#endif  // !(defined(__clang__) && defined(OS_LINUX))
 
 TEST_F(PersistentCacheTierTest, VolatileCacheInsertWithEviction) {
   for (auto nthreads : {1, 5}) {
-    for (auto max_keys : {1 * 1024 * 1024}) {
-      cache_ = std::make_shared<VolatileCacheTier>(/*compressed=*/true,
-                                                   /*size=*/1 * 1024 * 1024);
+    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+      cache_ = std::make_shared<VolatileCacheTier>(
+          /*compressed=*/true, /*size=*/1 * 1024 * 1024 * kStressFactor);
       RunInsertTestWithEviction(nthreads, max_keys);
     }
   }
 }
 
-#if !(defined(__clang__) && defined(OS_LINUX))
+// Block cache tests
+TEST_F(PersistentCacheTierTest, BlockCacheInsert) {
+  for (auto direct_writes : {true, false}) {
+    for (auto nthreads : {1, 5}) {
+      for (auto max_keys :
+           {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
+        cache_ = NewBlockCache(Env::Default(), path_,
+                               /*size=*/std::numeric_limits<uint64_t>::max(),
+                               direct_writes);
+        RunInsertTest(nthreads, max_keys);
+      }
+    }
+  }
+}
+
+TEST_F(PersistentCacheTierTest, BlockCacheInsertWithEviction) {
+  for (auto nthreads : {1, 5}) {
+    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+      cache_ = NewBlockCache(Env::Default(), path_,
+                             /*max_size=*/200 * 1024 * 1024 * kStressFactor);
+      RunInsertTestWithEviction(nthreads, max_keys);
+    }
+  }
+}
+
+// Tiered cache tests
+TEST_F(PersistentCacheTierTest, TieredCacheInsert) {
+  for (auto nthreads : {1, 5}) {
+    for (auto max_keys :
+         {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
+      cache_ = NewTieredCache(Env::Default(), path_,
+                              /*memory_size=*/1 * 1024 * 1024 * kStressFactor);
+      RunInsertTest(nthreads, max_keys);
+    }
+  }
+}
+
+TEST_F(PersistentCacheTierTest, TieredCacheInsertWithEviction) {
+  for (auto nthreads : {1, 5}) {
+    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
+      cache_ = NewTieredCache(
+          Env::Default(), path_,
+          /*memory_size=*/1 * 1024 * 1024 * kStressFactor,
+          /*block_cache_size*/ 200 * 1024 * 1024 * kStressFactor);
+      RunInsertTestWithEviction(nthreads, max_keys);
+    }
+  }
+}
+
+std::shared_ptr<PersistentCacheTier> MakeVolatileCache(
+    const std::string& /*dbname*/) {
+  return std::make_shared<VolatileCacheTier>();
+}
+
+std::shared_ptr<PersistentCacheTier> MakeBlockCache(const std::string& dbname) {
+  return NewBlockCache(Env::Default(), dbname);
+}
+
+std::shared_ptr<PersistentCacheTier> MakeTieredCache(
+    const std::string& dbname) {
+  const auto memory_size = 1 * 1024 * 1024 * kStressFactor;
+  return NewTieredCache(Env::Default(), dbname, memory_size);
+}
+
+static void UniqueIdCallback(void* arg) {
+  int* result = reinterpret_cast<int*>(arg);
+  if (*result == -1) {
+    *result = 0;
+  }
+
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+}
+
+PersistentCacheDBTest::PersistentCacheDBTest() : DBTestBase("/cache_test") {
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
+                                                 OnOpenForRead);
+}
+
+// test template
+void PersistentCacheDBTest::RunTest(
+    const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
+        new_pcache) {
+  if (!Snappy_Supported()) {
+    return;
+  }
+
+  // number of insertion iterations
+  int num_iter = 100 * 1024 * kStressFactor;
+
+  for (int iter = 0; iter < 5; iter++) {
+    Options options;
+    options.write_buffer_size =
+        64 * 1024 * kStressFactor;  // small write buffer
+    options.statistics = rocksdb::CreateDBStatistics();
+    options = CurrentOptions(options);
+
+    // setup page cache
+    std::shared_ptr<PersistentCacheTier> pcache;
+    BlockBasedTableOptions table_options;
+    table_options.cache_index_and_filter_blocks = true;
+
+    const uint64_t uint64_max = std::numeric_limits<uint64_t>::max();
+
+    switch (iter) {
+      case 0:
+        // page cache, block cache, no-compressed cache
+        pcache = new_pcache(/*is_compressed=*/true);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = NewLRUCache(uint64_max);
+        table_options.block_cache_compressed = nullptr;
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 1:
+        // page cache, block cache, compressed cache
+        pcache = new_pcache(/*is_compressed=*/true);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = NewLRUCache(uint64_max);
+        table_options.block_cache_compressed = NewLRUCache(uint64_max);
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 2:
+        // page cache, block cache, compressed cache + KNoCompression
+        // both block cache and compressed cache, but DB is not compressed
+        // also, make block cache sizes bigger, to trigger block cache hits
+        pcache = new_pcache(/*is_compressed=*/true);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = NewLRUCache(uint64_max);
+        table_options.block_cache_compressed = NewLRUCache(uint64_max);
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        options.compression = kNoCompression;
+        break;
+      case 3:
+        // page cache, no block cache, no compressed cache
+        pcache = new_pcache(/*is_compressed=*/false);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = nullptr;
+        table_options.block_cache_compressed = nullptr;
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      case 4:
+        // page cache, no block cache, no compressed cache
+        // Page cache caches compressed blocks
+        pcache = new_pcache(/*is_compressed=*/true);
+        table_options.persistent_cache = pcache;
+        table_options.block_cache = nullptr;
+        table_options.block_cache_compressed = nullptr;
+        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+        break;
+      default:
+        ASSERT_TRUE(false);
+    }
+
+    std::vector<std::string> values;
+    // insert data
+    Insert(options, table_options, num_iter, &values);
+    // flush all data in cache to device
+    pcache->TEST_Flush();
+    // verify data
+    Verify(num_iter, values);
+
+    auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
+    auto compressed_block_hit =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
+    auto compressed_block_miss =
+        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
+    auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
+    auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
+
+    // check that we triggered the appropriate code paths in the cache
+    switch (iter) {
+      case 0:
+        // page cache, block cache, no-compressed cache
+        ASSERT_GT(page_miss, 0);
+        ASSERT_GT(page_hit, 0);
+        ASSERT_GT(block_miss, 0);
+        ASSERT_EQ(compressed_block_miss, 0);
+        ASSERT_EQ(compressed_block_hit, 0);
+        break;
+      case 1:
+        // page cache, block cache, compressed cache
+        ASSERT_GT(page_miss, 0);
+        ASSERT_GT(block_miss, 0);
+        ASSERT_GT(compressed_block_miss, 0);
+        break;
+      case 2:
+        // page cache, block cache, compressed cache + KNoCompression
+        ASSERT_GT(page_miss, 0);
+        ASSERT_GT(page_hit, 0);
+        ASSERT_GT(block_miss, 0);
+        ASSERT_GT(compressed_block_miss, 0);
+        // remember kNoCompression
+        ASSERT_EQ(compressed_block_hit, 0);
+        break;
+      case 3:
+      case 4:
+        // page cache, no block cache, no compressed cache
+        ASSERT_GT(page_miss, 0);
+        ASSERT_GT(page_hit, 0);
+        ASSERT_EQ(compressed_block_hit, 0);
+        ASSERT_EQ(compressed_block_miss, 0);
+        break;
+      default:
+        ASSERT_TRUE(false);
+    }
+
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+
+    pcache->Close();
+  }
+}
+
 // test table with volatile page cache
 TEST_F(PersistentCacheDBTest, VolatileCacheTest) {
-  RunTest(std::bind(&PersistentCacheDBTest::MakeVolatileCache, this));
+  RunTest(std::bind(&MakeVolatileCache, dbname_));
+}
+
+// test table with block page cache
+TEST_F(PersistentCacheDBTest, BlockCacheTest) {
+  RunTest(std::bind(&MakeBlockCache, dbname_));
+}
+
+// test table with tiered page cache
+TEST_F(PersistentCacheDBTest, TieredCacheTest) {
+  RunTest(std::bind(&MakeTieredCache, dbname_));
 }
-#endif  // !(defined(__clang__) && defined(OS_LINUX))
 
 }  // namespace rocksdb
diff --git a/utilities/persistent_cache/persistent_cache_test.h b/utilities/persistent_cache/persistent_cache_test.h
index cf97796d6..b6999d9c7 100644
--- a/utilities/persistent_cache/persistent_cache_test.h
+++ b/utilities/persistent_cache/persistent_cache_test.h
@@ -32,9 +32,7 @@ namespace rocksdb {
 //
 class PersistentCacheTierTest : public testing::Test {
  public:
-  explicit PersistentCacheTierTest()
-      : path_(test::TmpDir(Env::Default()) + "/cache_test") {}
-
+  PersistentCacheTierTest();
   virtual ~PersistentCacheTierTest() {
     if (cache_) {
       Status s = cache_->Close();
@@ -46,7 +44,7 @@ class PersistentCacheTierTest : public testing::Test {
   // Flush cache
   void Flush() {
     if (cache_) {
-      cache_->Flush();
+      cache_->TEST_Flush();
     }
   }
 
@@ -208,27 +206,7 @@ class PersistentCacheTierTest : public testing::Test {
 //
 class PersistentCacheDBTest : public DBTestBase {
  public:
-  PersistentCacheDBTest() : DBTestBase("/cache_test") {
-    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-    rocksdb::SyncPoint::GetInstance()->SetCallBack(
-        "GetUniqueIdFromFile:FS_IOC_GETVERSION",
-        PersistentCacheDBTest::UniqueIdCallback);
-  }
-
-  static void UniqueIdCallback(void* arg) {
-    int* result = reinterpret_cast<int*>(arg);
-    if (*result == -1) {
-      *result = 0;
-    }
-
-    rocksdb::SyncPoint::GetInstance()->ClearTrace();
-    rocksdb::SyncPoint::GetInstance()->SetCallBack(
-        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
-  }
-
-  std::shared_ptr<PersistentCacheTier> MakeVolatileCache() {
-    return std::make_shared<VolatileCacheTier>();
-  }
+  PersistentCacheDBTest();
 
   static uint64_t TestGetTickerCount(const Options& options,
                                      Tickers ticker_type) {
@@ -281,135 +259,7 @@ class PersistentCacheDBTest : public DBTestBase {
 
   // test template
   void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
-                   new_pcache) {
-    if (!Snappy_Supported()) {
-      return;
-    }
-
-    // number of insertion interations
-    int num_iter = 100 * 1024;
-
-    for (int iter = 0; iter < 5; iter++) {
-      Options options;
-      options.write_buffer_size = 64 * 1024;  // small write buffer
-      options.statistics = rocksdb::CreateDBStatistics();
-      options = CurrentOptions(options);
-
-      // setup page cache
-      std::shared_ptr<PersistentCacheTier> pcache;
-      BlockBasedTableOptions table_options;
-      table_options.cache_index_and_filter_blocks = true;
-
-      const uint64_t uint64_max = std::numeric_limits<uint64_t>::max();
-
-      switch (iter) {
-        case 0:
-          // page cache, block cache, no-compressed cache
-          pcache = new_pcache(/*is_compressed=*/true);
-          table_options.persistent_cache = pcache;
-          table_options.block_cache = NewLRUCache(uint64_max);
-          table_options.block_cache_compressed = nullptr;
-          options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-          break;
-        case 1:
-          // page cache, block cache, compressed cache
-          pcache = new_pcache(/*is_compressed=*/true);
-          table_options.persistent_cache = pcache;
-          table_options.block_cache = NewLRUCache(uint64_max);
-          table_options.block_cache_compressed = NewLRUCache(uint64_max);
-          options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-          break;
-        case 2:
-          // page cache, block cache, compressed cache + KNoCompression
-          // both block cache and compressed cache, but DB is not compressed
-          // also, make block cache sizes bigger, to trigger block cache hits
-          pcache = new_pcache(/*is_compressed=*/true);
-          table_options.persistent_cache = pcache;
-          table_options.block_cache = NewLRUCache(uint64_max);
-          table_options.block_cache_compressed = NewLRUCache(uint64_max);
-          options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-          options.compression = kNoCompression;
-          break;
-        case 3:
-          // page cache, no block cache, no compressed cache
-          pcache = new_pcache(/*is_compressed=*/false);
-          table_options.persistent_cache = pcache;
-          table_options.block_cache = nullptr;
-          table_options.block_cache_compressed = nullptr;
-          options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-          break;
-        case 4:
-          // page cache, no block cache, no compressed cache
-          // Page cache caches compressed blocks
-          pcache = new_pcache(/*is_compressed=*/true);
-          table_options.persistent_cache = pcache;
-          table_options.block_cache = nullptr;
-          table_options.block_cache_compressed = nullptr;
-          options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-          break;
-        default:
-          ASSERT_TRUE(false);
-      }
-
-      std::vector<std::string> values;
-      // insert data
-      Insert(options, table_options, num_iter, &values);
-      // flush all data in cache to device
-      pcache->Flush();
-      // verify data
-      Verify(num_iter, values);
-
-      auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
-      auto compressed_block_hit =
-          TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
-      auto compressed_block_miss =
-          TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
-      auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
-      auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
-
-      // check that we triggered the appropriate code paths in the cache
-      switch (iter) {
-        case 0:
-          // page cache, block cache, no-compressed cache
-          ASSERT_GT(page_miss, 0);
-          ASSERT_GT(page_hit, 0);
-          ASSERT_GT(block_miss, 0);
-          ASSERT_EQ(compressed_block_miss, 0);
-          ASSERT_EQ(compressed_block_hit, 0);
-          break;
-        case 1:
-          // page cache, block cache, compressed cache
-          ASSERT_GT(page_miss, 0);
-          ASSERT_GT(block_miss, 0);
-          ASSERT_GT(compressed_block_miss, 0);
-          break;
-        case 2:
-          // page cache, block cache, compressed cache + KNoCompression
-          ASSERT_GT(page_miss, 0);
-          ASSERT_GT(page_hit, 0);
-          ASSERT_GT(block_miss, 0);
-          ASSERT_GT(compressed_block_miss, 0);
-          // remember kNoCompression
-          ASSERT_EQ(compressed_block_hit, 0);
-          break;
-        case 3:
-        case 4:
-          // page cache, no block cache, no compressed cache
-          ASSERT_GT(page_miss, 0);
-          ASSERT_GT(page_hit, 0);
-          ASSERT_EQ(compressed_block_hit, 0);
-          ASSERT_EQ(compressed_block_miss, 0);
-          break;
-        default:
-          ASSERT_TRUE(false);
-      }
-
-      options.create_if_missing = true;
-      DestroyAndReopen(options);
-
-      pcache->Close();
-    }
-  }
+                   new_pcache);
 };
 
 }  // namespace rocksdb
diff --git a/utilities/persistent_cache/persistent_cache_tier.cc b/utilities/persistent_cache/persistent_cache_tier.cc
index 217bcbea7..889d9e9fc 100644
--- a/utilities/persistent_cache/persistent_cache_tier.cc
+++ b/utilities/persistent_cache/persistent_cache_tier.cc
@@ -28,12 +28,6 @@ Status PersistentCacheTier::Close() {
   return Status::OK();
 }
 
-void PersistentCacheTier::Flush() {
-  if (next_tier_) {
-    next_tier_->Flush();
-  }
-}
-
 bool PersistentCacheTier::Reserve(const size_t size) {
   // default implementation is a pass through
   return true;
@@ -52,6 +46,13 @@ std::string PersistentCacheTier::PrintStats() {
   return std::string();
 }
 
+std::vector<std::map<std::string, double>> PersistentCacheTier::Stats() {
+  if (next_tier_) {
+    return next_tier_->Stats();
+  }
+  return std::vector<std::map<std::string, double>>{};
+}
+
 //
 // PersistentTieredCache implementation
 //
@@ -71,14 +72,14 @@ Status PersistentTieredCache::Close() {
   return status;
 }
 
-void PersistentTieredCache::Flush() {
+bool PersistentTieredCache::Erase(const Slice& key) {
   assert(!tiers_.empty());
-  tiers_.front()->Flush();
+  return tiers_.front()->Erase(key);
 }
 
-bool PersistentTieredCache::Erase(const Slice& key) {
+std::vector<std::map<std::string, double>> PersistentTieredCache::Stats() {
   assert(!tiers_.empty());
-  return tiers_.front()->Erase(key);
+  return tiers_.front()->Stats();
 }
 
 std::string PersistentTieredCache::PrintStats() {
@@ -106,6 +107,11 @@ void PersistentTieredCache::AddTier(const Tier& tier) {
   tiers_.push_back(tier);
 }
 
+bool PersistentTieredCache::IsCompressed() {
+  assert(tiers_.size());
+  return tiers_.front()->IsCompressed();
+}
+
 }  // namespace rocksdb
 
 #endif
diff --git a/utilities/persistent_cache/persistent_cache_tier.h b/utilities/persistent_cache/persistent_cache_tier.h
index 4259de21e..590e5d57f 100644
--- a/utilities/persistent_cache/persistent_cache_tier.h
+++ b/utilities/persistent_cache/persistent_cache_tier.h
@@ -54,6 +54,173 @@
 // null
 
 namespace rocksdb {
 
+// Persistent Cache Config
+//
+// This struct captures all the options that are used to configure the
+// persistent cache. Some of the terms used in naming the options are
+//
+// dispatch size:
+// This is the size in which IO is dispatched to the device
+//
+// write buffer size:
+// This is the size of an individual write buffer. Write buffers are
+// grouped to form a buffered file.
+//
+// cache size:
+// This is the logical maximum for the cache size
+//
+// qdepth:
+// This is the max number of IOs that can be issued to the device in parallel
+//
+// pipelining:
+// The writer code path follows a pipelined architecture, which means the
+// operations are handed off from one stage to another
+//
+// pipelining backlog size:
+// With the pipelined architecture, there can always be a backlog of ops in
+// the pipeline queues. This is the maximum backlog size, after which ops are
+// dropped from the queue
+struct PersistentCacheConfig {
+  explicit PersistentCacheConfig(
+      Env* const _env, const std::string& _path, const uint64_t _cache_size,
+      const std::shared_ptr<Logger>& _log,
+      const uint32_t _write_buffer_size = 1 * 1024 * 1024 /*1MB*/) {
+    env = _env;
+    path = _path;
+    log = _log;
+    cache_size = _cache_size;
+    writer_dispatch_size = write_buffer_size = _write_buffer_size;
+  }
+
+  //
+  // Validate the settings. Our intention is to catch erroneous settings ahead
+  // of time instead of violating invariants or deadlocking at runtime.
+  //
+  Status ValidateSettings() const {
+    // (1) check pre-conditions for variables
+    if (!env || path.empty()) {
+      return Status::InvalidArgument("empty or null args");
+    }
+
+    // (2) assert size related invariants
+    // - cache size cannot be less than cache file size
+    // - individual write buffer size cannot be greater than cache file size
+    // - total write buffer size cannot be less than 2X cache file size
+    if (cache_size < cache_file_size || write_buffer_size >= cache_file_size ||
+        write_buffer_size * write_buffer_count() < 2 * cache_file_size) {
+      return Status::InvalidArgument("invalid cache size");
+    }
+
+    // (3) check writer settings
+    // - Queue depth cannot be 0
+    // - writer_dispatch_size cannot be greater than write_buffer_size
+    // - dispatch size and buffer size need to be aligned
+    if (!writer_qdepth || writer_dispatch_size > write_buffer_size ||
+        write_buffer_size % writer_dispatch_size) {
+      return Status::InvalidArgument("invalid writer settings");
+    }
+
+    return Status::OK();
+  }
+
+  //
+  // Env abstraction to use for system-level operations
+  //
+  Env* env;
+
+  //
+  // Path for the block cache where blocks are persisted
+  //
+  std::string path;
+
+  //
+  // Log handle for logging messages
+  //
+  std::shared_ptr<Logger> log;
+
+  //
+  // Enable direct IO for reading
+  //
+  bool enable_direct_reads = true;
+
+  //
+  // Enable direct IO for writing
+  //
+  bool enable_direct_writes = false;
+
+  //
+  // Logical cache size
+  //
+  uint64_t cache_size = std::numeric_limits<uint64_t>::max();
+
+  // cache-file-size
+  //
+  // The cache consists of multiple small files. This parameter defines the
+  // size of an individual cache file
+  //
+  // default: 100MB
+  uint32_t cache_file_size = 100ULL * 1024 * 1024;
+
+  // writer-qdepth
+  //
+  // The writers can issue IO to the device in parallel. This parameter
+  // controls the max number of IOs that can be issued in parallel to the
+  // block device
+  //
+  // default: 1
+  uint32_t writer_qdepth = 1;
+
+  // pipeline-writes
+  //
+  // Writes optionally follow a pipelined architecture. This helps
+  // avoid regression in the eviction code path of the primary tier. This
+  // parameter defines if pipelining is enabled or disabled
+  //
+  // default: true
+  bool pipeline_writes_ = true;
+
+  // max-write-pipeline-backlog-size
+  //
+  // Max pipeline buffer size. This is the maximum backlog we can accumulate
+  // while waiting for writes. After the limit, new ops will be dropped.
+  //
+  // Default: 1GiB
+  uint64_t max_write_pipeline_backlog_size = 1ULL * 1024 * 1024 * 1024;
+
+  // write-buffer-size
+  //
+  // This is the size in which buffer slabs are allocated.
+  //
+  // Default: 1M
+  uint32_t write_buffer_size = 1ULL * 1024 * 1024;
+
+  // write-buffer-count
+  //
+  // This is the total number of buffer slabs. This is calculated as a factor
+  // of the file size in order to avoid deadlock.
+  size_t write_buffer_count() const {
+    assert(write_buffer_size);
+    return (writer_qdepth + 1.2) * cache_file_size / write_buffer_size;
+  }
+
+  // writer-dispatch-size
+  //
+  // The writer thread will dispatch the IO at the specified IO size
+  //
+  // default: 1M
+  uint64_t writer_dispatch_size = 1ULL * 1024 * 1024;
+
+  // is_compressed
+  //
+  // This option determines if the cache will run in compressed mode or
+  // uncompressed mode
+  bool is_compressed = true;
+
+  PersistentCacheConfig MakePersistentCacheConfig(
+      const std::string& path, const uint64_t size,
+      const std::shared_ptr<Logger>& log);
+};
+
 // Persistent Cache Tier
 //
 // This a logical abstraction that defines a tier of the persistent cache. Tiers
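// A worked check of how the invariants in ValidateSettings() interact with
// the defaults above (env, log and the path are placeholders):
// write_buffer_size = 1 MiB, cache_file_size = 100 MiB and writer_qdepth = 1
// give
//
//   write_buffer_count() = (1 + 1.2) * 100 MiB / 1 MiB = 220 buffers
//   220 buffers * 1 MiB = 220 MiB >= 2 * cache_file_size = 200 MiB
//
// so the default configuration validates:
//
//   rocksdb::PersistentCacheConfig cfg(env, "/tmp/pcache",
//                                      /*_cache_size=*/10ULL << 30, log);
//   assert(cfg.ValidateSettings().ok());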
@@ -73,9 +240,6 @@ class PersistentCacheTier : public PersistentCache {
   // Close the persistent cache tier
   virtual Status Close();
 
-  // Flush the pending writes
-  virtual void Flush();
-
   // Reserve space up to 'size' bytes
   virtual bool Reserve(const size_t size);
 
@@ -86,7 +250,7 @@ class PersistentCacheTier : public PersistentCache {
   virtual std::string PrintStats();
 
   // Expose stats
-  virtual std::vector<std::map<std::string, double>> Stats() = 0;
+  virtual std::vector<std::map<std::string, double>> Stats();
 
   // Insert to page cache
   virtual Status Insert(const Slice& page_key, const char* data,
@@ -96,6 +260,9 @@ class PersistentCacheTier : public PersistentCache {
   virtual Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                         size_t* size) = 0;
 
+  // Does it store compressed data?
+  virtual bool IsCompressed() = 0;
+
   // Return a reference to next tier
   virtual Tier& next_tier() { return next_tier_; }
 
@@ -105,6 +272,12 @@ class PersistentCacheTier : public PersistentCache {
     next_tier_ = tier;
   }
 
+  virtual void TEST_Flush() {
+    if (next_tier_) {
+      next_tier_->TEST_Flush();
+    }
+  }
+
  private:
   Tier next_tier_;  // next tier
 };
@@ -120,13 +293,14 @@ class PersistentTieredCache : public PersistentCacheTier {
 
   Status Open() override;
   Status Close() override;
-  void Flush() override;
   bool Erase(const Slice& key) override;
   std::string PrintStats() override;
+  std::vector<std::map<std::string, double>> Stats() override;
   Status Insert(const Slice& page_key, const char* data,
                 const size_t size) override;
   Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                 size_t* size) override;
+  bool IsCompressed() override;
 
   void AddTier(const Tier& tier);
 
@@ -140,6 +314,12 @@ class PersistentTieredCache : public PersistentCacheTier {
     (*it)->set_next_tier(tier);
   }
 
+  void TEST_Flush() override {
+    assert(!tiers_.empty());
+    tiers_.front()->TEST_Flush();
+    PersistentCacheTier::TEST_Flush();
+  }
+
  protected:
   std::list<Tier> tiers_;  // list of tiers top-down
 };