diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fede71c0..00756e8e3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -727,6 +727,7 @@ set(TESTS utilities/persistent_cache/persistent_cache_test.cc utilities/redis/redis_lists_test.cc utilities/spatialdb/spatial_db_test.cc + utilities/simulator_cache/sim_cache_test.cc utilities/table_properties_collectors/compact_on_deletion_collector_test.cc utilities/transactions/optimistic_transaction_test.cc utilities/transactions/transaction_test.cc diff --git a/include/rocksdb/utilities/sim_cache.h b/include/rocksdb/utilities/sim_cache.h index 60c73ec5d..f29fd5e8f 100644 --- a/include/rocksdb/utilities/sim_cache.h +++ b/include/rocksdb/utilities/sim_cache.h @@ -9,6 +9,7 @@ #include #include #include "rocksdb/cache.h" +#include "rocksdb/env.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" @@ -67,6 +68,19 @@ class SimCache : public Cache { // String representation of the statistics of the simcache virtual std::string ToString() const = 0; + // Start storing logs of the cache activity (Add/Lookup) into + // a file located at activity_log_file, max_logging_size option can be used to + // stop logging to the file automatically after reaching a specific size in + // bytes, a values of 0 disable this feature + virtual Status StartActivityLogging(const std::string& activity_log_file, + Env* env, uint64_t max_logging_size = 0) = 0; + + // Stop cache activity logging if any + virtual void StopActivityLogging() = 0; + + // Status of cache logging happening in background + virtual Status GetActivityLoggingStatus() = 0; + private: SimCache(const SimCache&); SimCache& operator=(const SimCache&); diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index 335ac9896..e3d801657 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -7,10 +7,144 @@ #include #include "monitoring/statistics.h" #include "port/port.h" +#include "rocksdb/env.h" +#include "util/file_reader_writer.h" +#include "util/mutexlock.h" +#include "util/string_util.h" namespace rocksdb { namespace { + +class CacheActivityLogger { + public: + CacheActivityLogger() + : activity_logging_enabled_(false), max_logging_size_(0) {} + + ~CacheActivityLogger() { + MutexLock l(&mutex_); + + StopLoggingInternal(); + } + + Status StartLogging(const std::string& activity_log_file, Env* env, + uint64_t max_logging_size = 0) { + assert(activity_log_file != ""); + assert(env != nullptr); + + Status status; + EnvOptions env_opts; + std::unique_ptr log_file; + + MutexLock l(&mutex_); + + // Stop existing logging if any + StopLoggingInternal(); + + // Open log file + status = env->NewWritableFile(activity_log_file, &log_file, env_opts); + if (!status.ok()) { + return status; + } + file_writer_.reset(new WritableFileWriter(std::move(log_file), env_opts)); + + max_logging_size_ = max_logging_size; + activity_logging_enabled_.store(true); + + return status; + } + + void StopLogging() { + MutexLock l(&mutex_); + + StopLoggingInternal(); + } + + void ReportLookup(const Slice& key) { + if (activity_logging_enabled_.load() == false) { + return; + } + + std::string log_line = "LOOKUP - " + key.ToString(true) + "\n"; + + // line format: "LOOKUP - " + MutexLock l(&mutex_); + Status s = file_writer_->Append(log_line); + if (!s.ok() && bg_status_.ok()) { + bg_status_ = s; + } + if (MaxLoggingSizeReached() || !bg_status_.ok()) { + // Stop logging if we have reached the max file size or + // encountered an error + StopLoggingInternal(); + } + } + + void ReportAdd(const Slice& key, size_t size) { + if (activity_logging_enabled_.load() == false) { + return; + } + + std::string log_line = "ADD - "; + log_line += key.ToString(true); + log_line += " - "; + AppendNumberTo(&log_line, size); + log_line += "\n"; + + // line format: "ADD - - " + MutexLock l(&mutex_); + Status s = file_writer_->Append(log_line); + if (!s.ok() && bg_status_.ok()) { + bg_status_ = s; + } + + if (MaxLoggingSizeReached() || !bg_status_.ok()) { + // Stop logging if we have reached the max file size or + // encountered an error + StopLoggingInternal(); + } + } + + Status& bg_status() { + MutexLock l(&mutex_); + return bg_status_; + } + + private: + bool MaxLoggingSizeReached() { + mutex_.AssertHeld(); + + return (max_logging_size_ > 0 && + file_writer_->GetFileSize() >= max_logging_size_); + } + + void StopLoggingInternal() { + mutex_.AssertHeld(); + + if (!activity_logging_enabled_) { + return; + } + + activity_logging_enabled_.store(false); + Status s = file_writer_->Close(); + if (!s.ok() && bg_status_.ok()) { + bg_status_ = s; + } + } + + // Mutex to sync writes to file_writer, and all following + // class data members + port::Mutex mutex_; + // Indicates if logging is currently enabled + // atomic to allow reads without mutex + std::atomic activity_logging_enabled_; + // When reached, we will stop logging and close the file + // Value of 0 means unlimited + uint64_t max_logging_size_; + std::unique_ptr file_writer_; + Status bg_status_; +}; + // SimCacheImpl definition class SimCacheImpl : public SimCache { public: @@ -48,6 +182,9 @@ class SimCacheImpl : public SimCache { } else { key_only_cache_->Release(h); } + + cache_activity_logger_.ReportAdd(key, charge); + return cache_->Insert(key, value, charge, deleter, handle, priority); } @@ -61,6 +198,9 @@ class SimCacheImpl : public SimCache { inc_miss_counter(); RecordTick(stats, SIM_BLOCK_CACHE_MISS); } + + cache_activity_logger_.ReportLookup(key); + return cache_->Lookup(key, stats); } @@ -158,12 +298,29 @@ class SimCacheImpl : public SimCache { return ret; } + virtual Status StartActivityLogging(const std::string& activity_log_file, + Env* env, + uint64_t max_logging_size = 0) override { + return cache_activity_logger_.StartLogging(activity_log_file, env, + max_logging_size); + } + + virtual void StopActivityLogging() override { + cache_activity_logger_.StopLogging(); + } + + virtual Status GetActivityLoggingStatus() override { + return cache_activity_logger_.bg_status(); + } + private: std::shared_ptr cache_; std::shared_ptr key_only_cache_; std::atomic miss_times_; std::atomic hit_times_; Statistics* stats_; + CacheActivityLogger cache_activity_logger_; + void inc_miss_counter() { miss_times_.fetch_add(1, std::memory_order_relaxed); } diff --git a/utilities/simulator_cache/sim_cache_test.cc b/utilities/simulator_cache/sim_cache_test.cc index 01b328c78..4c175c947 100644 --- a/utilities/simulator_cache/sim_cache_test.cc +++ b/utilities/simulator_cache/sim_cache_test.cc @@ -138,6 +138,77 @@ TEST_F(SimCacheTest, SimCache) { ASSERT_EQ(6, simCache->get_hit_counter()); } +TEST_F(SimCacheTest, SimCacheLogging) { + auto table_options = GetTableOptions(); + auto options = GetOptions(table_options); + options.disable_auto_compactions = true; + std::shared_ptr sim_cache = + NewSimCache(NewLRUCache(1024 * 1024), 20000, 0); + table_options.block_cache = sim_cache; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + Reopen(options); + + int num_block_entries = 20; + for (int i = 0; i < num_block_entries; i++) { + Put(Key(i), "val"); + Flush(); + } + + std::string log_file = test::TmpDir(env_) + "/cache_log.txt"; + ASSERT_OK(sim_cache->StartActivityLogging(log_file, env_)); + for (int i = 0; i < num_block_entries; i++) { + ASSERT_EQ(Get(Key(i)), "val"); + } + for (int i = 0; i < num_block_entries; i++) { + ASSERT_EQ(Get(Key(i)), "val"); + } + sim_cache->StopActivityLogging(); + ASSERT_OK(sim_cache->GetActivityLoggingStatus()); + + std::string file_contents = ""; + ReadFileToString(env_, log_file, &file_contents); + + int lookup_num = 0; + int add_num = 0; + std::string::size_type pos; + + // count number of lookups + pos = 0; + while ((pos = file_contents.find("LOOKUP -", pos)) != std::string::npos) { + ++lookup_num; + pos += 1; + } + + // count number of additions + pos = 0; + while ((pos = file_contents.find("ADD -", pos)) != std::string::npos) { + ++add_num; + pos += 1; + } + + // We asked for every block twice + ASSERT_EQ(lookup_num, num_block_entries * 2); + + // We added every block only once, since the cache can hold all blocks + ASSERT_EQ(add_num, num_block_entries); + + // Log things again but stop logging automatically after reaching 512 bytes + int max_size = 512; + ASSERT_OK(sim_cache->StartActivityLogging(log_file, env_, max_size)); + for (int it = 0; it < 10; it++) { + for (int i = 0; i < num_block_entries; i++) { + ASSERT_EQ(Get(Key(i)), "val"); + } + } + ASSERT_OK(sim_cache->GetActivityLoggingStatus()); + + uint64_t fsize = 0; + ASSERT_OK(env_->GetFileSize(log_file, &fsize)); + // error margin of 100 bytes + ASSERT_LT(fsize, max_size + 100); + ASSERT_GT(fsize, max_size - 100); +} + } // namespace rocksdb int main(int argc, char** argv) {