diff --git a/CMakeLists.txt b/CMakeLists.txt index 50b29096c..b6fa5ee34 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -851,6 +851,8 @@ set(SOURCES utilities/blob_db/blob_db_impl_filesnapshot.cc utilities/blob_db/blob_dump_tool.cc utilities/blob_db/blob_file.cc + utilities/cache_dump_load.cc + utilities/cache_dump_load_impl.cc utilities/cassandra/cassandra_compaction_filter.cc utilities/cassandra/format.cc utilities/cassandra/merge_operator.cc diff --git a/HISTORY.md b/HISTORY.md index 337167e84..efdadab65 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Print information about blob files when using "ldb list_live_files_metadata" * Provided support for SingleDelete with user defined timestamp. * Add remote compaction read/write bytes statistics: `REMOTE_COMPACT_READ_BYTES`, `REMOTE_COMPACT_WRITE_BYTES`. +* Introduce an experimental feature to dump out the blocks from block cache and insert them to the secondary cache to reduce the cache warmup time (e.g., used while migrating DB instance). More information are in `class CacheDumper` and `CacheDumpedLoader` at `rocksdb/utilities/cache_dump_load.h` Note that, this feature is subject to the potential change in the future, it is still experimental. ### Public API change * Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. diff --git a/TARGETS b/TARGETS index add585dc8..75837ce88 100644 --- a/TARGETS +++ b/TARGETS @@ -370,6 +370,8 @@ cpp_library( "utilities/blob_db/blob_db_impl_filesnapshot.cc", "utilities/blob_db/blob_dump_tool.cc", "utilities/blob_db/blob_file.cc", + "utilities/cache_dump_load.cc", + "utilities/cache_dump_load_impl.cc", "utilities/cassandra/cassandra_compaction_filter.cc", "utilities/cassandra/format.cc", "utilities/cassandra/merge_operator.cc", @@ -693,6 +695,8 @@ cpp_library( "utilities/blob_db/blob_db_impl_filesnapshot.cc", "utilities/blob_db/blob_dump_tool.cc", "utilities/blob_db/blob_file.cc", + "utilities/cache_dump_load.cc", + "utilities/cache_dump_load_impl.cc", "utilities/cassandra/cassandra_compaction_filter.cc", "utilities/cassandra/format.cc", "utilities/cassandra/merge_operator.cc", diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index e840273bd..b4eede205 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -15,9 +15,11 @@ #include "rocksdb/cache.h" #include "rocksdb/io_status.h" #include "rocksdb/sst_file_manager.h" +#include "rocksdb/utilities/cache_dump_load.h" #include "test_util/testharness.h" #include "util/coding.h" #include "util/random.h" +#include "utilities/cache_dump_load_impl.h" #include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { @@ -1182,6 +1184,367 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheMultiGet) { Destroy(options); } +class LRUCacheWithStat : public LRUCache { + public: + LRUCacheWithStat( + size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit, + double _high_pri_pool_ratio, + std::shared_ptr _memory_allocator = nullptr, + bool _use_adaptive_mutex = kDefaultToAdaptiveMutex, + CacheMetadataChargePolicy _metadata_charge_policy = + kDontChargeCacheMetadata, + const std::shared_ptr& _secondary_cache = nullptr) + : LRUCache(_capacity, _num_shard_bits, _strict_capacity_limit, + _high_pri_pool_ratio, _memory_allocator, _use_adaptive_mutex, + _metadata_charge_policy, _secondary_cache) { + insert_count_ = 0; + lookup_count_ = 0; + } + ~LRUCacheWithStat() {} + + Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter, + Handle** handle, Priority priority) override { + insert_count_++; + return LRUCache::Insert(key, value, charge, deleter, handle, priority); + } + Status Insert(const Slice& key, void* value, const CacheItemHelper* helper, + size_t chargge, Handle** handle = nullptr, + Priority priority = Priority::LOW) override { + insert_count_++; + return LRUCache::Insert(key, value, helper, chargge, handle, priority); + } + Handle* Lookup(const Slice& key, Statistics* stats) override { + lookup_count_++; + return LRUCache::Lookup(key, stats); + } + Handle* Lookup(const Slice& key, const CacheItemHelper* helper, + const CreateCallback& create_cb, Priority priority, bool wait, + Statistics* stats = nullptr) override { + lookup_count_++; + return LRUCache::Lookup(key, helper, create_cb, priority, wait, stats); + } + + uint32_t GetInsertCount() { return insert_count_; } + uint32_t GetLookupcount() { return lookup_count_; } + void ResetCount() { + insert_count_ = 0; + lookup_count_ = 0; + } + + private: + uint32_t insert_count_; + uint32_t lookup_count_; +}; + +#ifndef ROCKSDB_LITE + +TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) { + LRUCacheOptions cache_opts(1024 * 1024, 0, false, 0.5, nullptr, + kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + LRUCacheWithStat* tmp_cache = new LRUCacheWithStat( + cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.secondary_cache); + std::shared_ptr cache(tmp_cache); + BlockBasedTableOptions table_options; + table_options.block_cache = cache; + table_options.block_size = 4 * 1024; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.env = fault_env_.get(); + DestroyAndReopen(options); + fault_fs_->SetFailGetUniqueId(true); + + Random rnd(301); + const int N = 256; + std::vector value; + char buf[1000]; + memset(buf, 'a', 1000); + value.resize(N); + for (int i = 0; i < N; i++) { + // std::string p_v = rnd.RandomString(1000); + std::string p_v(buf, 1000); + value[i] = p_v; + ASSERT_OK(Put(Key(i), p_v)); + } + ASSERT_OK(Flush()); + Compact("a", "z"); + + // do th eread for all the key value pairs, so all the blocks should be in + // cache + uint32_t start_insert = tmp_cache->GetInsertCount(); + uint32_t start_lookup = tmp_cache->GetLookupcount(); + std::string v; + for (int i = 0; i < N; i++) { + v = Get(Key(i)); + ASSERT_EQ(v, value[i]); + } + uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert; + uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup; + ASSERT_EQ(63, + static_cast(dump_insert)); // the insert in the block cache + ASSERT_EQ(256, + static_cast(dump_lookup)); // the lookup in the block cache + // We have enough blocks in the block cache + + CacheDumpOptions cd_options; + cd_options.clock = fault_env_->GetSystemClock().get(); + std::string dump_path = db_->GetName() + "/cache_dump"; + std::unique_ptr dump_writer; + Status s = NewToFileCacheDumpWriter(fault_fs_, FileOptions(), dump_path, + &dump_writer); + ASSERT_OK(s); + std::unique_ptr cache_dumper; + s = NewDefaultCacheDumper(cd_options, cache, std::move(dump_writer), + &cache_dumper); + ASSERT_OK(s); + std::vector db_list; + db_list.push_back(db_); + s = cache_dumper->SetDumpFilter(db_list); + ASSERT_OK(s); + s = cache_dumper->DumpCacheEntriesToWriter(); + ASSERT_OK(s); + cache_dumper.reset(); + + // we have a new cache it is empty, then, before we do the Get, we do the + // dumpload + std::shared_ptr secondary_cache = + std::make_shared(2048 * 1024); + cache_opts.secondary_cache = secondary_cache; + tmp_cache = new LRUCacheWithStat( + cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.secondary_cache); + std::shared_ptr cache_new(tmp_cache); + table_options.block_cache = cache_new; + table_options.block_size = 4 * 1024; + options.create_if_missing = true; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.env = fault_env_.get(); + + // start to load the data to new block cache + start_insert = secondary_cache->num_inserts(); + start_lookup = secondary_cache->num_lookups(); + std::unique_ptr dump_reader; + s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path, + &dump_reader); + ASSERT_OK(s); + std::unique_ptr cache_loader; + s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache, + std::move(dump_reader), &cache_loader); + ASSERT_OK(s); + s = cache_loader->RestoreCacheEntriesToSecondaryCache(); + ASSERT_OK(s); + uint32_t load_insert = secondary_cache->num_inserts() - start_insert; + uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup; + // check the number we inserted + ASSERT_EQ(64, static_cast(load_insert)); + ASSERT_EQ(0, static_cast(load_lookup)); + ASSERT_OK(s); + + Reopen(options); + + // After load, we do the Get again + start_insert = secondary_cache->num_inserts(); + start_lookup = secondary_cache->num_lookups(); + uint32_t cache_insert = tmp_cache->GetInsertCount(); + uint32_t cache_lookup = tmp_cache->GetLookupcount(); + for (int i = 0; i < N; i++) { + v = Get(Key(i)); + ASSERT_EQ(v, value[i]); + } + uint32_t final_insert = secondary_cache->num_inserts() - start_insert; + uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup; + // no insert to secondary cache + ASSERT_EQ(0, static_cast(final_insert)); + // lookup the secondary to get all blocks + ASSERT_EQ(64, static_cast(final_lookup)); + uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert; + uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup; + // Check the new block cache insert and lookup, should be no insert since all + // blocks are from the secondary cache. + ASSERT_EQ(0, static_cast(block_insert)); + ASSERT_EQ(256, static_cast(block_lookup)); + + fault_fs_->SetFailGetUniqueId(false); + Destroy(options); +} + +TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) { + LRUCacheOptions cache_opts(1024 * 1024, 0, false, 0.5, nullptr, + kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); + LRUCacheWithStat* tmp_cache = new LRUCacheWithStat( + cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.secondary_cache); + std::shared_ptr cache(tmp_cache); + BlockBasedTableOptions table_options; + table_options.block_cache = cache; + table_options.block_size = 4 * 1024; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.env = fault_env_.get(); + std::string dbname1 = test::PerThreadDBPath("db_1"); + ASSERT_OK(DestroyDB(dbname1, options)); + DB* db1 = nullptr; + ASSERT_OK(DB::Open(options, dbname1, &db1)); + std::string dbname2 = test::PerThreadDBPath("db_2"); + ASSERT_OK(DestroyDB(dbname2, options)); + DB* db2 = nullptr; + ASSERT_OK(DB::Open(options, dbname2, &db2)); + fault_fs_->SetFailGetUniqueId(true); + + // write the KVs to db1 + Random rnd(301); + const int N = 256; + std::vector value1; + WriteOptions wo; + char buf[1000]; + memset(buf, 'a', 1000); + value1.resize(N); + for (int i = 0; i < N; i++) { + std::string p_v(buf, 1000); + value1[i] = p_v; + ASSERT_OK(db1->Put(wo, Key(i), p_v)); + } + ASSERT_OK(db1->Flush(FlushOptions())); + Slice bg("a"); + Slice ed("b"); + ASSERT_OK(db1->CompactRange(CompactRangeOptions(), &bg, &ed)); + + // Write the KVs to DB2 + std::vector value2; + memset(buf, 'b', 1000); + value2.resize(N); + for (int i = 0; i < N; i++) { + std::string p_v(buf, 1000); + value2[i] = p_v; + ASSERT_OK(db2->Put(wo, Key(i), p_v)); + } + ASSERT_OK(db2->Flush(FlushOptions())); + ASSERT_OK(db2->CompactRange(CompactRangeOptions(), &bg, &ed)); + + // do th eread for all the key value pairs, so all the blocks should be in + // cache + uint32_t start_insert = tmp_cache->GetInsertCount(); + uint32_t start_lookup = tmp_cache->GetLookupcount(); + ReadOptions ro; + std::string v; + for (int i = 0; i < N; i++) { + ASSERT_OK(db1->Get(ro, Key(i), &v)); + ASSERT_EQ(v, value1[i]); + } + for (int i = 0; i < N; i++) { + ASSERT_OK(db2->Get(ro, Key(i), &v)); + ASSERT_EQ(v, value2[i]); + } + uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert; + uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup; + ASSERT_EQ(128, + static_cast(dump_insert)); // the insert in the block cache + ASSERT_EQ(512, + static_cast(dump_lookup)); // the lookup in the block cache + // We have enough blocks in the block cache + + CacheDumpOptions cd_options; + cd_options.clock = fault_env_->GetSystemClock().get(); + std::string dump_path = db1->GetName() + "/cache_dump"; + std::unique_ptr dump_writer; + Status s = NewToFileCacheDumpWriter(fault_fs_, FileOptions(), dump_path, + &dump_writer); + ASSERT_OK(s); + std::unique_ptr cache_dumper; + s = NewDefaultCacheDumper(cd_options, cache, std::move(dump_writer), + &cache_dumper); + ASSERT_OK(s); + std::vector db_list; + db_list.push_back(db1); + s = cache_dumper->SetDumpFilter(db_list); + ASSERT_OK(s); + s = cache_dumper->DumpCacheEntriesToWriter(); + ASSERT_OK(s); + cache_dumper.reset(); + + // we have a new cache it is empty, then, before we do the Get, we do the + // dumpload + std::shared_ptr secondary_cache = + std::make_shared(2048 * 1024); + cache_opts.secondary_cache = secondary_cache; + tmp_cache = new LRUCacheWithStat( + cache_opts.capacity, cache_opts.num_shard_bits, + cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio, + cache_opts.memory_allocator, cache_opts.use_adaptive_mutex, + cache_opts.metadata_charge_policy, cache_opts.secondary_cache); + std::shared_ptr cache_new(tmp_cache); + table_options.block_cache = cache_new; + table_options.block_size = 4 * 1024; + options.create_if_missing = true; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.env = fault_env_.get(); + + // Start the cache loading process + start_insert = secondary_cache->num_inserts(); + start_lookup = secondary_cache->num_lookups(); + std::unique_ptr dump_reader; + s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path, + &dump_reader); + ASSERT_OK(s); + std::unique_ptr cache_loader; + s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache, + std::move(dump_reader), &cache_loader); + ASSERT_OK(s); + s = cache_loader->RestoreCacheEntriesToSecondaryCache(); + ASSERT_OK(s); + uint32_t load_insert = secondary_cache->num_inserts() - start_insert; + uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup; + // check the number we inserted + ASSERT_EQ(64, static_cast(load_insert)); + ASSERT_EQ(0, static_cast(load_lookup)); + ASSERT_OK(s); + + ASSERT_OK(db1->Close()); + delete db1; + ASSERT_OK(DB::Open(options, dbname1, &db1)); + + // After load, we do the Get again. To validate the cache, we do not allow any + // I/O, so we set the file system to false. + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + fault_fs_->SetFilesystemActive(false, error_msg); + start_insert = secondary_cache->num_inserts(); + start_lookup = secondary_cache->num_lookups(); + uint32_t cache_insert = tmp_cache->GetInsertCount(); + uint32_t cache_lookup = tmp_cache->GetLookupcount(); + for (int i = 0; i < N; i++) { + ASSERT_OK(db1->Get(ro, Key(i), &v)); + ASSERT_EQ(v, value1[i]); + } + uint32_t final_insert = secondary_cache->num_inserts() - start_insert; + uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup; + // no insert to secondary cache + ASSERT_EQ(0, static_cast(final_insert)); + // lookup the secondary to get all blocks + ASSERT_EQ(64, static_cast(final_lookup)); + uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert; + uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup; + // Check the new block cache insert and lookup, should be no insert since all + // blocks are from the secondary cache. + ASSERT_EQ(0, static_cast(block_insert)); + ASSERT_EQ(256, static_cast(block_lookup)); + fault_fs_->SetFailGetUniqueId(false); + fault_fs_->SetFilesystemActive(true); + delete db1; + delete db2; + ASSERT_OK(DestroyDB(dbname1, options)); + ASSERT_OK(DestroyDB(dbname2, options)); +} + +#endif // ROCKSDB_LITE + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index b20f0ea49..b8abd9da9 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -23,17 +23,17 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -Status WritableFileWriter::Create(const std::shared_ptr& fs, - const std::string& fname, - const FileOptions& file_opts, - std::unique_ptr* writer, - IODebugContext* dbg) { +IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, + const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* writer, + IODebugContext* dbg) { std::unique_ptr file; - Status s = fs->NewWritableFile(fname, file_opts, &file, dbg); - if (s.ok()) { + IOStatus io_s = fs->NewWritableFile(fname, file_opts, &file, dbg); + if (io_s.ok()) { writer->reset(new WritableFileWriter(std::move(file), fname, file_opts)); } - return s; + return io_s; } IOStatus WritableFileWriter::Append(const Slice& data, diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 56741cd0f..84a659267 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -200,10 +200,10 @@ class WritableFileWriter { } } - static Status Create(const std::shared_ptr& fs, - const std::string& fname, const FileOptions& file_opts, - std::unique_ptr* writer, - IODebugContext* dbg); + static IOStatus Create(const std::shared_ptr& fs, + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* writer, + IODebugContext* dbg); WritableFileWriter(const WritableFileWriter&) = delete; WritableFileWriter& operator=(const WritableFileWriter&) = delete; diff --git a/include/rocksdb/utilities/cache_dump_load.h b/include/rocksdb/utilities/cache_dump_load.h new file mode 100644 index 000000000..fde03db7e --- /dev/null +++ b/include/rocksdb/utilities/cache_dump_load.h @@ -0,0 +1,142 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/cache.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/io_status.h" +#include "rocksdb/secondary_cache.h" +#include "rocksdb/table.h" + +namespace ROCKSDB_NAMESPACE { + +// The classes and functions in this header file is used for dumping out the +// blocks in a block cache, storing or transfering the blocks to another +// destination host, and load these blocks to the secondary cache at destination +// host. +// NOTE that: The classes, functions, and data structures are EXPERIMENTAL! They +// my be changed in the future when the development continues. + +// The major and minor version number of the data format to be stored/trandfered +// via CacheDumpWriter and read out via CacheDumpReader +static const int kCacheDumpMajorVersion = 0; +static const int kCacheDumpMinorVersion = 1; + +// NOTE that: this class is EXPERIMENTAL! May be changed in the future! +// This is an abstract class to write or transfer the data that is created by +// CacheDumper. We pack one block with its block type, dump time, block key in +// the block cache, block len, block crc32c checksum and block itself as a unit +// and it is stored via WritePacket. Before we call WritePacket, we must call +// WriteMetadata once, which stores the sequence number, block unit checksum, +// and block unit size. +// We provide file based CacheDumpWriter to store the metadata and its package +// sequentially in a file as the defualt implementation. Users can implement +// their own CacheDumpWriter to store/transfer the data. For example, user can +// create a subclass which transfer the metadata and package on the fly. +class CacheDumpWriter { + public: + virtual ~CacheDumpWriter() = default; + + // Called ONCE before the calls to WritePacket + virtual IOStatus WriteMetadata(const Slice& metadata) = 0; + virtual IOStatus WritePacket(const Slice& data) = 0; + virtual IOStatus Close() = 0; +}; + +// NOTE that: this class is EXPERIMENTAL! May be changed in the future! +// This is an abstract class to read or receive the data that is stored +// or transfered by CacheDumpWriter. Note that, ReadMetadata must be called +// once before we call a ReadPacket. +class CacheDumpReader { + public: + virtual ~CacheDumpReader() = default; + // Called ONCE before the calls to ReadPacket + virtual IOStatus ReadMetadata(std::string* metadata) = 0; + // Sets data to empty string on EOF + virtual IOStatus ReadPacket(std::string* data) = 0; + // (Close not needed) +}; + +// CacheDumpOptions is the option for CacheDumper and CacheDumpedLoader. Any +// dump or load process related control variables can be added here. +struct CacheDumpOptions { + SystemClock* clock; +}; + +// NOTE that: this class is EXPERIMENTAL! May be changed in the future! +// This the class to dump out the block in the block cache, store/transfer them +// via CacheDumpWriter. In order to dump out the blocks belonging to a certain +// DB or a list of DB (block cache can be shared by many DB), user needs to call +// SetDumpFilter to specify a list of DB to filter out the blocks that do not +// belong to those DB. +// A typical use case is: when we migrate a DB instance from host A to host B. +// We need to reopen the DB at host B after all the files are copied to host B. +// At this moment, the block cache at host B does not have any block from this +// migrated DB. Therefore, the read performance can be low due to cache warm up. +// By using CacheDumper before we shut down the DB at host A and using +// CacheDumpedLoader at host B before we reopen the DB, we can warmup the cache +// ahead. This function can be used in other use cases also. +class CacheDumper { + public: + virtual ~CacheDumper() = default; + // Only dump the blocks in the block cache that belong to the DBs in this list + virtual Status SetDumpFilter(std::vector db_list) { + (void)db_list; + return Status::NotSupported("SetDumpFilter is not supported"); + } + // The main function to dump out all the blocks that satisfy the filter + // condition from block cache to a certain CacheDumpWriter in one shot. This + // process may take some time. + virtual IOStatus DumpCacheEntriesToWriter() { + return IOStatus::NotSupported("DumpCacheEntriesToWriter is not supported"); + } +}; + +// NOTE that: this class is EXPERIMENTAL! May be changed in the future! +// This is the class to load the dumped blocks to the destination cache. For now +// we only load the blocks to the SecondaryCache. In the future, we may plan to +// support loading to the block cache. +class CacheDumpedLoader { + public: + virtual ~CacheDumpedLoader() = default; + virtual IOStatus RestoreCacheEntriesToSecondaryCache() { + return IOStatus::NotSupported( + "RestoreCacheEntriesToSecondaryCache is not supported"); + } +}; + +// Get the writer which stores all the metadata and data sequentially to a file +IOStatus NewToFileCacheDumpWriter(const std::shared_ptr& fs, + const FileOptions& file_opts, + const std::string& file_name, + std::unique_ptr* writer); + +// Get the reader which read out the metadata and data sequentially from a file +IOStatus NewFromFileCacheDumpReader(const std::shared_ptr& fs, + const FileOptions& file_opts, + const std::string& file_name, + std::unique_ptr* reader); + +// Get the default cache dumper +Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options, + const std::shared_ptr& cache, + std::unique_ptr&& writer, + std::unique_ptr* cache_dumper); + +// Get the default cache dump loader +Status NewDefaultCacheDumpedLoader( + const CacheDumpOptions& dump_options, + const BlockBasedTableOptions& toptions, + const std::shared_ptr& secondary_cache, + std::unique_ptr&& reader, + std::unique_ptr* cache_dump_loader); + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/src.mk b/src.mk index 12a7f05b9..28d53ce5a 100644 --- a/src.mk +++ b/src.mk @@ -234,6 +234,8 @@ LIB_SOURCES = \ utilities/blob_db/blob_db_impl.cc \ utilities/blob_db/blob_db_impl_filesnapshot.cc \ utilities/blob_db/blob_file.cc \ + utilities/cache_dump_load.cc \ + utilities/cache_dump_load_impl.cc \ utilities/cassandra/cassandra_compaction_filter.cc \ utilities/cassandra/format.cc \ utilities/cassandra/merge_operator.cc \ diff --git a/table/mock_table.cc b/table/mock_table.cc index cc3cff973..c40b6a270 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -266,8 +266,8 @@ TableBuilder* MockTableFactory::NewTableBuilder( Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, KVVector file_contents) { std::unique_ptr file_writer; - auto s = WritableFileWriter::Create(env->GetFileSystem(), fname, - FileOptions(), &file_writer, nullptr); + Status s = WritableFileWriter::Create(env->GetFileSystem(), fname, + FileOptions(), &file_writer, nullptr); if (!s.ok()) { return s; } diff --git a/utilities/cache_dump_load.cc b/utilities/cache_dump_load.cc new file mode 100644 index 000000000..9a7c76798 --- /dev/null +++ b/utilities/cache_dump_load.cc @@ -0,0 +1,69 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/cache_dump_load.h" + +#include "file/writable_file_writer.h" +#include "port/lang.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "table/format.h" +#include "util/crc32c.h" +#include "utilities/cache_dump_load_impl.h" + +namespace ROCKSDB_NAMESPACE { + +IOStatus NewToFileCacheDumpWriter(const std::shared_ptr& fs, + const FileOptions& file_opts, + const std::string& file_name, + std::unique_ptr* writer) { + std::unique_ptr file_writer; + IOStatus io_s = WritableFileWriter::Create(fs, file_name, file_opts, + &file_writer, nullptr); + if (!io_s.ok()) { + return io_s; + } + writer->reset(new ToFileCacheDumpWriter(std::move(file_writer))); + return io_s; +} + +IOStatus NewFromFileCacheDumpReader(const std::shared_ptr& fs, + const FileOptions& file_opts, + const std::string& file_name, + std::unique_ptr* reader) { + std::unique_ptr file_reader; + IOStatus io_s = RandomAccessFileReader::Create(fs, file_name, file_opts, + &file_reader, nullptr); + if (!io_s.ok()) { + return io_s; + } + reader->reset(new FromFileCacheDumpReader(std::move(file_reader))); + return io_s; +} + +Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options, + const std::shared_ptr& cache, + std::unique_ptr&& writer, + std::unique_ptr* cache_dumper) { + cache_dumper->reset( + new CacheDumperImpl(dump_options, cache, std::move(writer))); + return Status::OK(); +} + +Status NewDefaultCacheDumpedLoader( + const CacheDumpOptions& dump_options, + const BlockBasedTableOptions& toptions, + const std::shared_ptr& secondary_cache, + std::unique_ptr&& reader, + std::unique_ptr* cache_dump_loader) { + cache_dump_loader->reset(new CacheDumpedLoaderImpl( + dump_options, toptions, secondary_cache, std::move(reader))); + return Status::OK(); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/utilities/cache_dump_load_impl.cc b/utilities/cache_dump_load_impl.cc new file mode 100644 index 000000000..f8c6acc1c --- /dev/null +++ b/utilities/cache_dump_load_impl.cc @@ -0,0 +1,477 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/cache_dump_load_impl.h" + +#include "cache/cache_entry_roles.h" +#include "file/writable_file_writer.h" +#include "port/lang.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/utilities/ldb_cmd.h" +#include "table/format.h" +#include "util/crc32c.h" + +namespace ROCKSDB_NAMESPACE { + +// Set the dump filter with a list of DBs. Block cache may be shared by multipe +// DBs and we may only want to dump out the blocks belonging to certain DB(s). +// Therefore, a filter is need to decide if the key of the block satisfy the +// requirement. +Status CacheDumperImpl::SetDumpFilter(std::vector db_list) { + Status s = Status::OK(); + for (size_t i = 0; i < db_list.size(); i++) { + assert(i < db_list.size()); + TablePropertiesCollection ptc; + assert(db_list[i] != nullptr); + s = db_list[i]->GetPropertiesOfAllTables(&ptc); + if (!s.ok()) { + return s; + } + for (auto id = ptc.begin(); id != ptc.end(); id++) { + assert(id->second->db_session_id.size() == 20); + prefix_filter_.insert(id->second->db_session_id); + } + } + return s; +} + +// This is the main function to dump out the cache block entries to the writer. +// The writer may create a file or write to other systems. Currently, we will +// iterate the whole block cache, get the blocks, and write them to the writer +IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() { + // Prepare stage, check the parameters. + if (cache_ == nullptr) { + return IOStatus::InvalidArgument("Cache is null"); + } + if (writer_ == nullptr) { + return IOStatus::InvalidArgument("CacheDumpWriter is null"); + } + // Set the system clock + if (options_.clock == nullptr) { + return IOStatus::InvalidArgument("System clock is null"); + } + clock_ = options_.clock; + // We copy the Cache Deleter Role Map as its member. + role_map_ = CopyCacheDeleterRoleMap(); + // Set the sequence number + sequence_num_ = 0; + + // Dump stage, first, we write the hader + IOStatus io_s = WriteHeader(); + if (!io_s.ok()) { + return io_s; + } + + // Then, we iterate the block cache and dump out the blocks that are not + // filtered out. + cache_->ApplyToAllEntries(DumpOneBlockCallBack(), {}); + + // Finally, write the footer + io_s = WriteFooter(); + if (!io_s.ok()) { + return io_s; + } + io_s = writer_->Close(); + return io_s; +} + +// Check if we need to filter out the block based on its key +bool CacheDumperImpl::ShouldFilterOut(const Slice& key) { + // Since now we use db_session_id as the prefix, the prefix size is 20. If + // Anything changes in the future, we need to update it here. + bool filter_out = true; + size_t prefix_size = 20; + Slice key_prefix(key.data(), prefix_size); + std::string prefix = key_prefix.ToString(); + if (prefix_filter_.find(prefix) != prefix_filter_.end()) { + filter_out = false; + } + return filter_out; +} + +// This is the callback function which will be applied to +// Cache::ApplyToAllEntries. In this callback function, we will get the block +// type, decide if the block needs to be dumped based on the filter, and write +// the block through the provided writer. +std::function +CacheDumperImpl::DumpOneBlockCallBack() { + return [&](const Slice& key, void* value, size_t /*charge*/, + Cache::DeleterFn deleter) { + // Step 1: get the type of the block from role_map_ + auto e = role_map_.find(deleter); + CacheEntryRole role; + CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax; + if (e == role_map_.end()) { + role = CacheEntryRole::kMisc; + } else { + role = e->second; + } + bool filter_out = false; + + // Step 2: based on the key prefix, check if the block should be filter out. + if (ShouldFilterOut(key)) { + filter_out = true; + } + + // Step 3: based on the block type, get the block raw pointer and length. + const char* block_start = nullptr; + size_t block_len = 0; + switch (role) { + case CacheEntryRole::kDataBlock: + type = CacheDumpUnitType::kData; + block_start = (static_cast(value))->data(); + block_len = (static_cast(value))->size(); + break; + case CacheEntryRole::kDeprecatedFilterBlock: + type = CacheDumpUnitType::kDeprecatedFilterBlock; + block_start = (static_cast(value))->data.data(); + block_len = (static_cast(value))->data.size(); + break; + case CacheEntryRole::kFilterBlock: + type = CacheDumpUnitType::kFilter; + block_start = (static_cast(value)) + ->GetBlockContentsData() + .data(); + block_len = (static_cast(value)) + ->GetBlockContentsData() + .size(); + break; + case CacheEntryRole::kFilterMetaBlock: + type = CacheDumpUnitType::kFilterMetaBlock; + block_start = (static_cast(value))->data(); + block_len = (static_cast(value))->size(); + break; + case CacheEntryRole::kIndexBlock: + type = CacheDumpUnitType::kIndex; + block_start = (static_cast(value))->data(); + block_len = (static_cast(value))->size(); + break; + case CacheEntryRole::kMisc: + filter_out = true; + break; + case CacheEntryRole::kOtherBlock: + filter_out = true; + break; + case CacheEntryRole::kWriteBuffer: + filter_out = true; + break; + default: + filter_out = true; + } + + // Step 4: if the block should not be filter out, write the block to the + // CacheDumpWriter + if (!filter_out && block_start != nullptr) { + char* buffer = new char[block_len]; + memcpy(buffer, block_start, block_len); + WriteCacheBlock(type, key, (void*)buffer, block_len) + .PermitUncheckedError(); + delete[] buffer; + } + }; +} +// Write the raw block to the writer. It takes the timestamp of the block being +// copied from block cache, block type, key, block pointer, raw block size and +// the block checksum as the input. When writing the raw block, we first create +// the dump unit and encoude it to a string. Then, we calculate the checksum of +// the how dump unit string and store it in the dump unit metadata. +// First, we write the metadata first, which is a fixed size string. Then, we +// Append the dump unit string to the writer. +IOStatus CacheDumperImpl::WriteRawBlock(uint64_t timestamp, + CacheDumpUnitType type, + const Slice& key, void* value, + size_t len, uint32_t checksum) { + // First, serilize the block information in a string + DumpUnit dump_unit; + dump_unit.timestamp = timestamp; + dump_unit.key = key; + dump_unit.type = type; + dump_unit.value_len = len; + dump_unit.value = value; + dump_unit.value_checksum = checksum; + std::string encoded_data; + CacheDumperHelper::EncodeDumpUnit(dump_unit, &encoded_data); + + // Second, create the metadata, which contains a sequence number, the dump + // unit string checksum and the string size. The sequence number monotonically + // increases from 0. + DumpUnitMeta unit_meta; + unit_meta.sequence_num = sequence_num_; + sequence_num_++; + unit_meta.dump_unit_checksum = + crc32c::Value(encoded_data.c_str(), encoded_data.size()); + unit_meta.dump_unit_size = static_cast(encoded_data.size()); + std::string encoded_meta; + CacheDumperHelper::EncodeDumpUnitMeta(unit_meta, &encoded_meta); + + // We write the metadata first. + assert(writer_ != nullptr); + IOStatus io_s = writer_->WriteMetadata(Slice(encoded_meta)); + if (!io_s.ok()) { + return io_s; + } + // followed by the dump unit. + return writer_->WritePacket(Slice(encoded_data)); +} + +// Before we write any block, we write the header first to store the cache dump +// format version, rocksdb version, and brief intro. +IOStatus CacheDumperImpl::WriteHeader() { + std::string header_key = "header"; + std::ostringstream s; + s << kTraceMagic << "\t" + << "Cache dump format version: " << kCacheDumpMajorVersion << "." + << kCacheDumpMinorVersion << "\t" + << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t" + << "Format: dump_unit_metadata , dump_unit cache_value\n"; + std::string header_value(s.str()); + CacheDumpUnitType type = CacheDumpUnitType::kHeader; + uint64_t timestamp = clock_->NowMicros(); + uint32_t header_checksum = + crc32c::Value(header_value.c_str(), header_value.size()); + return WriteRawBlock(timestamp, type, Slice(header_key), + (void*)header_value.c_str(), header_value.size(), + header_checksum); +} + +// Write the block dumped from cache +IOStatus CacheDumperImpl::WriteCacheBlock(const CacheDumpUnitType type, + const Slice& key, void* value, + size_t len) { + uint64_t timestamp = clock_->NowMicros(); + uint32_t value_checksum = crc32c::Value((char*)value, len); + return WriteRawBlock(timestamp, type, key, value, len, value_checksum); +} + +// Write the footer after all the blocks are stored to indicate the ending. +IOStatus CacheDumperImpl::WriteFooter() { + std::string footer_key = "footer"; + std::ostringstream s; + std::string footer_value("cache dump completed"); + CacheDumpUnitType type = CacheDumpUnitType::kFooter; + uint64_t timestamp = clock_->NowMicros(); + uint32_t footer_checksum = + crc32c::Value(footer_value.c_str(), footer_value.size()); + return WriteRawBlock(timestamp, type, Slice(footer_key), + (void*)footer_value.c_str(), footer_value.size(), + footer_checksum); +} + +// This is the main function to restore the cache entries to secondary cache. +// First, we check if all the arguments are valid. Then, we read the block +// sequentially from the reader and insert them to the secondary cache. +IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() { + // TODO: remove this line when options are used in the loader + (void)options_; + // Step 1: we check if all the arguments are valid + if (secondary_cache_ == nullptr) { + return IOStatus::InvalidArgument("Secondary Cache is null"); + } + if (reader_ == nullptr) { + return IOStatus::InvalidArgument("CacheDumpReader is null"); + } + // we copy the Cache Deleter Role Map as its member. + role_map_ = CopyCacheDeleterRoleMap(); + + // Step 2: read the header + // TODO: we need to check the cache dump format version and RocksDB version + // after the header is read out. + IOStatus io_s; + DumpUnit dump_unit; + std::string data; + io_s = ReadHeader(&data, &dump_unit); + if (!io_s.ok()) { + return io_s; + } + + // Step 3: read out the rest of the blocks from the reader. The loop will stop + // either I/O status is not ok or we reach to the the end. + while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) { + dump_unit.reset(); + data.clear(); + // read the content and store in the dump_unit + io_s = ReadCacheBlock(&data, &dump_unit); + if (!io_s.ok()) { + break; + } + // create the raw_block_content based on the information in the dump_unit + BlockContents raw_block_contents( + Slice((char*)dump_unit.value, dump_unit.value_len)); + Cache::CacheItemHelper* helper = nullptr; + Statistics* statistics = nullptr; + Status s = Status::OK(); + // according to the block type, get the helper callback function and create + // the corresponding block + switch (dump_unit.type) { + case CacheDumpUnitType::kDeprecatedFilterBlock: { + helper = BlocklikeTraits::GetCacheItemHelper( + BlockType::kFilter); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(raw_block_contents), 0, statistics, false, + toptions_.filter_policy.get())); + // Insert the block to secondary cache. + // Note that, if we cannot get the correct helper callback, the block + // will not be inserted. + if (helper != nullptr) { + s = secondary_cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper); + } + break; + } + case CacheDumpUnitType::kFilter: { + helper = BlocklikeTraits::GetCacheItemHelper( + BlockType::kFilter); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = secondary_cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper); + } + break; + } + case CacheDumpUnitType::kData: { + helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kData); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = secondary_cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper); + } + break; + } + case CacheDumpUnitType::kIndex: { + helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kIndex); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(raw_block_contents), 0, statistics, false, + toptions_.filter_policy.get())); + if (helper != nullptr) { + s = secondary_cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper); + } + break; + } + case CacheDumpUnitType::kFilterMetaBlock: { + helper = BlocklikeTraits::GetCacheItemHelper(BlockType::kFilter); + std::unique_ptr block_holder; + block_holder.reset(BlocklikeTraits::Create( + std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit, + statistics, false, toptions_.filter_policy.get())); + if (helper != nullptr) { + s = secondary_cache_->Insert(dump_unit.key, + (void*)(block_holder.get()), helper); + } + break; + } + case CacheDumpUnitType::kFooter: + break; + default: + continue; + } + if (!s.ok()) { + io_s = status_to_io_status(std::move(s)); + } + } + if (dump_unit.type == CacheDumpUnitType::kFooter) { + return IOStatus::OK(); + } else { + return io_s; + } +} + +// Read and copy the dump unit metadata to std::string data, decode and create +// the unit metadata based on the string +IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data, + DumpUnitMeta* unit_meta) { + assert(reader_ != nullptr); + assert(data != nullptr); + assert(unit_meta != nullptr); + IOStatus io_s = reader_->ReadMetadata(data); + if (!io_s.ok()) { + return io_s; + } + return status_to_io_status( + CacheDumperHelper::DecodeDumpUnitMeta(*data, unit_meta)); +} + +// Read and copy the dump unit to std::string data, decode and create the unit +// based on the string +IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data, + DumpUnit* unit) { + assert(reader_ != nullptr); + assert(data != nullptr); + assert(unit != nullptr); + IOStatus io_s = reader_->ReadPacket(data); + if (!io_s.ok()) { + return io_s; + } + if (data->size() != len) { + return IOStatus::Corruption( + "The data being read out does not match the size stored in metadata!"); + } + Slice block; + return status_to_io_status(CacheDumperHelper::DecodeDumpUnit(*data, unit)); +} + +// Read the header +IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data, + DumpUnit* dump_unit) { + DumpUnitMeta header_meta; + header_meta.reset(); + std::string meta_string; + IOStatus io_s = ReadDumpUnitMeta(&meta_string, &header_meta); + if (!io_s.ok()) { + return io_s; + } + + io_s = ReadDumpUnit(header_meta.dump_unit_size, data, dump_unit); + if (!io_s.ok()) { + return io_s; + } + uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size()); + if (unit_checksum != header_meta.dump_unit_checksum) { + return IOStatus::Corruption("Read header unit corrupted!"); + } + return io_s; +} + +// Read the blocks after header is read out +IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data, + DumpUnit* dump_unit) { + // According to the write process, we read the dump_unit_metadata first + DumpUnitMeta unit_meta; + unit_meta.reset(); + std::string unit_string; + IOStatus io_s = ReadDumpUnitMeta(&unit_string, &unit_meta); + if (!io_s.ok()) { + return io_s; + } + + // Based on the information in the dump_unit_metadata, we read the dump_unit + // and verify if its content is correct. + io_s = ReadDumpUnit(unit_meta.dump_unit_size, data, dump_unit); + if (!io_s.ok()) { + return io_s; + } + uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size()); + if (unit_checksum != unit_meta.dump_unit_checksum) { + return IOStatus::Corruption( + "Checksum does not match! Read dumped unit corrupted!"); + } + return io_s; +} + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE diff --git a/utilities/cache_dump_load_impl.h b/utilities/cache_dump_load_impl.h new file mode 100644 index 000000000..f799863b2 --- /dev/null +++ b/utilities/cache_dump_load_impl.h @@ -0,0 +1,363 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include + +#include "file/random_access_file_reader.h" +#include "file/writable_file_writer.h" +#include "rocksdb/utilities/cache_dump_load.h" +#include "table/block_based/block.h" +#include "table/block_based/block_like_traits.h" +#include "table/block_based/block_type.h" +#include "table/block_based/cachable_entry.h" +#include "table/block_based/parsed_full_filter_block.h" +#include "table/block_based/reader_common.h" + +namespace ROCKSDB_NAMESPACE { + +// the read buffer size of for the default CacheDumpReader +const unsigned int kDumpReaderBufferSize = 1024; // 1KB +static const unsigned int kSizePrefixLen = 4; + +enum CacheDumpUnitType : unsigned char { + kHeader = 1, + kFooter = 2, + kData = 3, + kFilter = 4, + kProperties = 5, + kCompressionDictionary = 6, + kRangeDeletion = 7, + kHashIndexPrefixes = 8, + kHashIndexMetadata = 9, + kMetaIndex = 10, + kIndex = 11, + kDeprecatedFilterBlock = 12, + kFilterMetaBlock = 13, + kBlockTypeMax, +}; + +// The metadata of a dump unit. After it is serilized, its size is fixed 16 +// bytes. +struct DumpUnitMeta { + // sequence number is a monotonically increasing number to indicate the order + // of the blocks being written. Header is 0. + uint32_t sequence_num; + // The Crc32c checksum of its dump unit. + uint32_t dump_unit_checksum; + // The dump unit size after the dump unit is serilized to a string. + uint64_t dump_unit_size; + + void reset() { + sequence_num = 0; + dump_unit_checksum = 0; + dump_unit_size = 0; + } +}; + +// The data structure to hold a block and its information. +struct DumpUnit { + // The timestamp when the block is identified, copied, and dumped from block + // cache + uint64_t timestamp; + // The type of the block + CacheDumpUnitType type; + // The key of this block when the block is referenced by this Cache + Slice key; + // The block size + size_t value_len; + // The Crc32c checksum of the block + uint32_t value_checksum; + // Pointer to the block. Note that, in the dump process, it points to a memory + // buffer copied from cache block. The buffer is freed when we process the + // next block. In the load process, we use an std::string to store the + // serilized dump_unit read from the reader. So it points to the memory + // address of the begin of the block in this string. + void* value; + + void reset() { + timestamp = 0; + type = CacheDumpUnitType::kBlockTypeMax; + key.clear(); + value_len = 0; + value_checksum = 0; + value = nullptr; + } +}; + +// The default implementation of the Cache Dumper +class CacheDumperImpl : public CacheDumper { + public: + CacheDumperImpl(const CacheDumpOptions& dump_options, + const std::shared_ptr& cache, + std::unique_ptr&& writer) + : options_(dump_options), cache_(cache), writer_(std::move(writer)) {} + ~CacheDumperImpl() { writer_.reset(); } + Status SetDumpFilter(std::vector db_list) override; + IOStatus DumpCacheEntriesToWriter() override; + + private: + IOStatus WriteRawBlock(uint64_t timestamp, CacheDumpUnitType type, + const Slice& key, void* value, size_t len, + uint32_t checksum); + + IOStatus WriteHeader(); + + IOStatus WriteCacheBlock(const CacheDumpUnitType type, const Slice& key, + void* value, size_t len); + IOStatus WriteFooter(); + bool ShouldFilterOut(const Slice& key); + std::function + DumpOneBlockCallBack(); + + CacheDumpOptions options_; + std::shared_ptr cache_; + std::unique_ptr writer_; + std::unordered_map role_map_; + SystemClock* clock_; + uint32_t sequence_num_; + // The cache key prefix filter. Currently, we use db_session_id as the prefix, + // so using std::set to store the prefixes as filter is enough. Further + // improvement can be applied like BloomFilter or others to speedup the + // filtering. + std::set prefix_filter_; +}; + +// The default implementation of CacheDumpedLoader +class CacheDumpedLoaderImpl : public CacheDumpedLoader { + public: + CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options, + const BlockBasedTableOptions& toptions, + const std::shared_ptr& secondary_cache, + std::unique_ptr&& reader) + : options_(dump_options), + toptions_(toptions), + secondary_cache_(secondary_cache), + reader_(std::move(reader)) {} + ~CacheDumpedLoaderImpl() {} + IOStatus RestoreCacheEntriesToSecondaryCache() override; + + private: + IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta); + IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit); + IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit); + IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit); + + CacheDumpOptions options_; + const BlockBasedTableOptions& toptions_; + std::shared_ptr secondary_cache_; + std::unique_ptr reader_; + std::unordered_map role_map_; +}; + +// The default implementation of CacheDumpWriter. We write the blocks to a file +// sequentially. +class ToFileCacheDumpWriter : public CacheDumpWriter { + public: + explicit ToFileCacheDumpWriter( + std::unique_ptr&& file_writer) + : file_writer_(std::move(file_writer)) {} + + ~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); } + + // Write the serilized metadata to the file + virtual IOStatus WriteMetadata(const Slice& metadata) override { + assert(file_writer_ != nullptr); + std::string prefix; + PutFixed32(&prefix, static_cast(metadata.size())); + IOStatus io_s = file_writer_->Append(Slice(prefix)); + if (!io_s.ok()) { + return io_s; + } + io_s = file_writer_->Append(metadata); + return io_s; + } + + // Write the serilized data to the file + virtual IOStatus WritePacket(const Slice& data) override { + assert(file_writer_ != nullptr); + std::string prefix; + PutFixed32(&prefix, static_cast(data.size())); + IOStatus io_s = file_writer_->Append(Slice(prefix)); + if (!io_s.ok()) { + return io_s; + } + io_s = file_writer_->Append(data); + return io_s; + } + + // Reset the writer + virtual IOStatus Close() override { + file_writer_.reset(); + return IOStatus::OK(); + } + + private: + std::unique_ptr file_writer_; +}; + +// The default implementation of CacheDumpReader. It is implemented based on +// RandomAccessFileReader. Note that, we keep an internal variable to remember +// the current offset. +class FromFileCacheDumpReader : public CacheDumpReader { + public: + explicit FromFileCacheDumpReader( + std::unique_ptr&& reader) + : file_reader_(std::move(reader)), + offset_(0), + buffer_(new char[kDumpReaderBufferSize]) {} + + ~FromFileCacheDumpReader() { delete[] buffer_; } + + virtual IOStatus ReadMetadata(std::string* metadata) override { + uint32_t metadata_len = 0; + IOStatus io_s = ReadSizePrefix(&metadata_len); + if (!io_s.ok()) { + return io_s; + } + return Read(metadata_len, metadata); + } + + virtual IOStatus ReadPacket(std::string* data) override { + uint32_t data_len = 0; + IOStatus io_s = ReadSizePrefix(&data_len); + if (!io_s.ok()) { + return io_s; + } + return Read(data_len, data); + } + + private: + IOStatus ReadSizePrefix(uint32_t* len) { + std::string prefix; + IOStatus io_s = Read(kSizePrefixLen, &prefix); + if (!io_s.ok()) { + return io_s; + } + Slice encoded_slice(prefix); + if (!GetFixed32(&encoded_slice, len)) { + return IOStatus::Corruption("Decode size prefix string failed"); + } + return IOStatus::OK(); + } + + IOStatus Read(size_t len, std::string* data) { + assert(file_reader_ != nullptr); + IOStatus io_s; + + unsigned int bytes_to_read = static_cast(len); + unsigned int to_read = bytes_to_read > kDumpReaderBufferSize + ? kDumpReaderBufferSize + : bytes_to_read; + + while (to_read > 0) { + io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, + buffer_, nullptr); + if (!io_s.ok()) { + return io_s; + } + if (result_.size() < to_read) { + return IOStatus::Corruption("Corrupted cache dump file."); + } + data->append(result_.data(), result_.size()); + + offset_ += to_read; + bytes_to_read -= to_read; + to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize + : bytes_to_read; + } + return io_s; + } + std::unique_ptr file_reader_; + Slice result_; + size_t offset_; + char* buffer_; +}; + +// The cache dump and load helper class +class CacheDumperHelper { + public: + // serilize the dump_unit_meta to a string, it is fixed 16 bytes size. + static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) { + assert(data); + PutFixed32(data, static_cast(meta.sequence_num)); + PutFixed32(data, static_cast(meta.dump_unit_checksum)); + PutFixed64(data, meta.dump_unit_size); + } + + // Serilize the dump_unit to a string. + static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) { + assert(data); + PutFixed64(data, dump_unit.timestamp); + data->push_back(dump_unit.type); + PutLengthPrefixedSlice(data, dump_unit.key); + PutFixed32(data, static_cast(dump_unit.value_len)); + PutFixed32(data, dump_unit.value_checksum); + PutLengthPrefixedSlice(data, + Slice((char*)dump_unit.value, dump_unit.value_len)); + } + + // Deserilize the dump_unit_meta from a string + static Status DecodeDumpUnitMeta(const std::string& encoded_data, + DumpUnitMeta* unit_meta) { + assert(unit_meta != nullptr); + Slice encoded_slice = Slice(encoded_data); + if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) { + return Status::Incomplete("Decode dumped unit meta sequence_num failed"); + } + if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) { + return Status::Incomplete( + "Decode dumped unit meta dump_unit_checksum failed"); + } + if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) { + return Status::Incomplete( + "Decode dumped unit meta dump_unit_size failed"); + } + return Status::OK(); + } + + // Deserilize the dump_unit from a string. + static Status DecodeDumpUnit(const std::string& encoded_data, + DumpUnit* dump_unit) { + assert(dump_unit != nullptr); + Slice encoded_slice = Slice(encoded_data); + + // Decode timestamp + if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) { + return Status::Incomplete("Decode dumped unit string failed"); + } + // Decode the block type + dump_unit->type = static_cast(encoded_slice[0]); + encoded_slice.remove_prefix(1); + // Decode the key + if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) { + return Status::Incomplete("Decode dumped unit string failed"); + } + // Decode the value size + uint32_t value_len; + if (!GetFixed32(&encoded_slice, &value_len)) { + return Status::Incomplete("Decode dumped unit string failed"); + } + dump_unit->value_len = static_cast(value_len); + // Decode the value checksum + if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) { + return Status::Incomplete("Decode dumped unit string failed"); + } + // Decode the block content and copy to the memory space whose pointer + // will be managed by the cache finally. + Slice block; + if (!GetLengthPrefixedSlice(&encoded_slice, &block)) { + return Status::Incomplete("Decode dumped unit string failed"); + } + dump_unit->value = (void*)block.data(); + assert(block.size() == dump_unit->value_len); + return Status::OK(); + } +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE