From aa0ef6602d9f71d1a66d4ae0a3e68a59c2d9b12a Mon Sep 17 00:00:00 2001
From: Siying Dong
Date: Mon, 6 Jan 2014 20:29:17 -0800
Subject: [PATCH] [Performance Branch] If options.max_open_files set to be -1,
 cache table readers in FileMetadata for Get() and NewIterator()

Summary:
In some use cases, table readers for all live files should always be cached.
In that case, we can avoid the table cache look-up in Get() and NewIterator().

We define options.max_open_files = -1 as the mode in which table readers for
live files are always kept open. In that mode, table readers are cached in
FileMetaData (with a reference held in the table cache), so table_cache.Get()
and table_cache.NewIterator() can bypass the LRU cache look-up and reduce
latency.

Test Plan: add a test case in db_test

Reviewers: haobo, kailiu

Reviewed By: haobo

CC: dhruba, igor, leveldb

Differential Revision: https://reviews.facebook.net/D15039
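
Illustration (editor's addition, not part of the patch): a minimal sketch of
the fast path described above. It uses simplified stand-in types for
Cache::Handle, TableReader and FileMetaData, and the class/method names
TableCacheSketch and GetReader are hypothetical; they only mirror the flow of
the real TableCache::Get()/FindTable() code changed below.

  #include <cstdint>

  struct CacheHandle {};   // stand-in for rocksdb::Cache::Handle
  struct TableReader {};   // stand-in; the real one has Get()/NewIterator()

  struct FileMetaData {
    uint64_t number = 0;
    uint64_t file_size = 0;
    // Pre-loaded for every live file when max_open_files == -1.
    CacheHandle* table_reader_handle = nullptr;
  };

  class TableCacheSketch {
   public:
    TableReader* GetReader(const FileMetaData& meta) {
      CacheHandle* handle = meta.table_reader_handle;
      if (handle == nullptr) {
        // Slow path: LRU look-up (FindTable in the real code), which may
        // open the file and insert the reader into the cache.
        handle = FindTable(meta.number, meta.file_size);
      }
      // Fast path: the handle cached in FileMetaData is used directly and is
      // not released here; the Version that owns the file holds the reference.
      return GetTableReaderFromHandle(handle);
    }

   private:
    // Placeholder stubs standing in for the real TableCache internals.
    CacheHandle* FindTable(uint64_t /*number*/, uint64_t /*file_size*/) {
      return nullptr;
    }
    TableReader* GetTableReaderFromHandle(CacheHandle* /*handle*/) {
      return nullptr;
    }
  };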
---
 db/builder.cc             |  3 +--
 db/db_impl.cc             | 21 ++++++++++-----
 db/db_test.cc             |  4 +++
 db/repair.cc              |  3 ++-
 db/table_cache.cc         | 54 ++++++++++++++++++++++---------------
 db/table_cache.h          | 22 +++++++++------
 db/version_edit.h         | 11 ++++++--
 db/version_set.cc         | 56 ++++++++++++++++++++++++++++-----------
 db/version_set.h          |  2 +-
 include/rocksdb/options.h |  6 +++--
 10 files changed, 124 insertions(+), 58 deletions(-)

diff --git a/db/builder.cc b/db/builder.cc
index ad1334a15..53d2f8985 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -204,8 +204,7 @@ Status BuildTable(const std::string& dbname,
     // Verify that the table is usable
     Iterator* it = table_cache->NewIterator(ReadOptions(),
                                             soptions,
-                                            meta->number,
-                                            meta->file_size);
+                                            *meta);
     s = it->status();
     delete it;
   }
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8bcdbf4ae..297ac0ade 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -126,7 +126,10 @@ Options SanitizeOptions(const std::string& dbname,
   Options result = src;
   result.comparator = icmp;
   result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
-  ClipToRange(&result.max_open_files, 20, 1000000);
+  // result.max_open_files == -1 means "infinite" open files.
+  if (result.max_open_files != -1) {
+    ClipToRange(&result.max_open_files, 20, 1000000);
+  }
   ClipToRange(&result.write_buffer_size, ((size_t)64)<<10,
               ((size_t)64)<<30);
   ClipToRange(&result.block_size, 1<<10, 4<<20);
@@ -278,7 +281,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
   }
 
   // Reserve ten files or so for other uses and give the rest to TableCache.
-  const int table_cache_size = options_.max_open_files - 10;
+  // Give a large number for the "infinite" open files setting.
+  const int table_cache_size =
+      (options_.max_open_files == -1) ?
+          4194304 : options_.max_open_files - 10;
   table_cache_.reset(new TableCache(dbname_, &options_, storage_options_,
                                     table_cache_size));
   versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
@@ -335,6 +341,9 @@ DBImpl::~DBImpl() {
   for (MemTable* m: to_delete) {
     delete m;
   }
+  // versions need to be destroyed before table_cache since it can hold
+  // references to table_cache.
+  versions_.reset();
   LogFlush(options_.info_log);
 }
 
@@ -2095,10 +2104,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
   if (s.ok() && current_entries > 0) {
     // Verify that the table is usable
+    FileMetaData meta(output_number, current_bytes);
     Iterator* iter = table_cache_->NewIterator(ReadOptions(),
                                                storage_options_,
-                                               output_number,
-                                               current_bytes);
+                                               meta);
     s = iter->status();
     delete iter;
     if (s.ok()) {
@@ -3701,7 +3710,7 @@ Status DBImpl::DeleteFile(std::string name) {
   }
 
   int level;
-  FileMetaData metadata;
+  FileMetaData* metadata;
   int maxlevel = NumberLevels();
   VersionEdit edit(maxlevel);
   DeletionState deletion_state(true);
@@ -3716,7 +3725,7 @@ Status DBImpl::DeleteFile(std::string name) {
   assert((level > 0) && (level < maxlevel));
 
   // If the file is being compacted no need to delete.
-  if (metadata.being_compacted) {
+  if (metadata->being_compacted) {
     Log(options_.info_log,
         "DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
     return Status::OK();
diff --git a/db/db_test.cc b/db/db_test.cc
index 8e2bc9f27..838492f1a 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -265,6 +265,7 @@ class DBTest {
     kHashSkipList,
     kUniversalCompaction,
     kCompressedBlockCache,
+    kInfiniteMaxOpenFiles,
     kEnd
   };
   int option_config_;
@@ -415,6 +416,9 @@ class DBTest {
       case kCompressedBlockCache:
         options.block_cache_compressed = NewLRUCache(8*1024*1024);
         break;
+      case kInfiniteMaxOpenFiles:
+        options.max_open_files = -1;
+        break;
       default:
         break;
     }
diff --git a/db/repair.cc b/db/repair.cc
index fc9ba282d..802c04fc4 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -265,8 +265,9 @@ class Repairer {
     int counter = 0;
     Status status = env_->GetFileSize(fname, &t->meta.file_size);
     if (status.ok()) {
+      FileMetaData dummy_meta(t->meta.number, t->meta.file_size);
       Iterator* iter = table_cache_->NewIterator(
-          ReadOptions(), storage_options_, t->meta.number, t->meta.file_size);
+          ReadOptions(), storage_options_, dummy_meta);
       bool empty = true;
       ParsedInternalKey parsed;
       t->min_sequence = 0;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 593352dde..527a10cba 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -10,6 +10,7 @@
 #include "db/table_cache.h"
 
 #include "db/filename.h"
+#include "db/version_edit.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/table.h"
@@ -50,6 +51,14 @@ TableCache::TableCache(const std::string& dbname,
 TableCache::~TableCache() {
 }
 
+TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
+  return reinterpret_cast<TableReader*>(cache_->Value(handle));
+}
+
+void TableCache::ReleaseHandle(Cache::Handle* handle) {
+  cache_->Release(handle);
+}
+
 Status TableCache::FindTable(const EnvOptions& toptions,
                              uint64_t file_number, uint64_t file_size,
                              Cache::Handle** handle, bool* table_io,
@@ -94,25 +103,27 @@ Status TableCache::FindTable(const EnvOptions& toptions,
 
 Iterator* TableCache::NewIterator(const ReadOptions& options,
                                   const EnvOptions& toptions,
-                                  uint64_t file_number,
-                                  uint64_t file_size,
+                                  const FileMetaData& file_meta,
                                   TableReader** table_reader_ptr,
                                   bool for_compaction) {
   if (table_reader_ptr != nullptr) {
     *table_reader_ptr = nullptr;
   }
-
-  Cache::Handle* handle = nullptr;
-  Status s = FindTable(toptions, file_number, file_size, &handle,
-                       nullptr, options.read_tier == kBlockCacheTier);
+  Cache::Handle* handle = file_meta.table_reader_handle;
+  Status s;
+  if (!handle) {
+    s = FindTable(toptions, file_meta.number, file_meta.file_size, &handle,
+                  nullptr, options.read_tier == kBlockCacheTier);
+  }
   if (!s.ok()) {
     return NewErrorIterator(s);
   }
-  TableReader* table_reader =
-      reinterpret_cast<TableReader*>(cache_->Value(handle));
+  TableReader* table_reader = GetTableReaderFromHandle(handle);
   Iterator* result = table_reader->NewIterator(options);
-  result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
+  if (!file_meta.table_reader_handle) {
+    result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
+  }
   if (table_reader_ptr != nullptr) {
     *table_reader_ptr = table_reader;
   }
@@ -125,22 +136,24 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
 }
 
 Status TableCache::Get(const ReadOptions& options,
-                       uint64_t file_number,
-                       uint64_t file_size,
+                       const FileMetaData& file_meta,
                        const Slice& k,
                        void* arg,
                        bool (*saver)(void*, const Slice&, const Slice&, bool),
                        bool* table_io,
                        void (*mark_key_may_exist)(void*)) {
-  Cache::Handle* handle = nullptr;
-  Status s = FindTable(storage_options_, file_number, file_size,
-                       &handle, table_io,
-                       options.read_tier == kBlockCacheTier);
+  Cache::Handle* handle = file_meta.table_reader_handle;
+  Status s;
+  if (!handle) {
+    s = FindTable(storage_options_, file_meta.number, file_meta.file_size,
+                  &handle, table_io, options.read_tier == kBlockCacheTier);
+  }
   if (s.ok()) {
-    TableReader* t =
-        reinterpret_cast<TableReader*>(cache_->Value(handle));
+    TableReader* t = GetTableReaderFromHandle(handle);
     s = t->Get(options, k, arg, saver, mark_key_may_exist);
-    cache_->Release(handle);
+    if (!file_meta.table_reader_handle) {
+      ReleaseHandle(handle);
+    }
   } else if (options.read_tier && s.IsIncomplete()) {
     // Couldnt find Table in cache but treat as kFound if no_io set
     (*mark_key_may_exist)(arg);
@@ -159,10 +172,9 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
                         file_size, &handle, table_io);
   bool may_match = true;
   if (s.ok()) {
-    TableReader* t =
-        reinterpret_cast<TableReader*>(cache_->Value(handle));
+    TableReader* t = GetTableReaderFromHandle(handle);
     may_match = t->PrefixMayMatch(internal_prefix);
-    cache_->Release(handle);
+    ReleaseHandle(handle);
   }
   return may_match;
 }
diff --git a/db/table_cache.h b/db/table_cache.h
index f65326bad..ba50ae4d5 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -21,6 +21,7 @@ namespace rocksdb {
 
 class Env;
+struct FileMetaData;
 
 class TableCache {
  public:
@@ -37,8 +38,7 @@ class TableCache {
   // returned iterator is live.
   Iterator* NewIterator(const ReadOptions& options,
                         const EnvOptions& toptions,
-                        uint64_t file_number,
-                        uint64_t file_size,
+                        const FileMetaData& file_meta,
                         TableReader** table_reader_ptr = nullptr,
                         bool for_compaction = false);
 
@@ -46,8 +46,7 @@ class TableCache {
   // call (*handle_result)(arg, found_key, found_value) repeatedly until
   // it returns false.
   Status Get(const ReadOptions& options,
-             uint64_t file_number,
-             uint64_t file_size,
+             const FileMetaData& file_meta,
              const Slice& k,
              void* arg,
              bool (*handle_result)(void*, const Slice&, const Slice&, bool),
@@ -63,16 +62,23 @@ class TableCache {
   // Evict any entry for the specified file number
   void Evict(uint64_t file_number);
 
+  // Find table reader
+  Status FindTable(const EnvOptions& toptions, uint64_t file_number,
+                   uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
+                   const bool no_io = false);
+
+  // Get TableReader from a cache handle.
+  TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
+
+  // Release the handle from a cache
+  void ReleaseHandle(Cache::Handle* handle);
+
  private:
   Env* const env_;
   const std::string dbname_;
   const Options* options_;
   const EnvOptions& storage_options_;
   std::shared_ptr<Cache> cache_;
-
-  Status FindTable(const EnvOptions& toptions, uint64_t file_number,
-                   uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
-                   const bool no_io = false);
 };
 
 }  // namespace rocksdb
diff --git a/db/version_edit.h b/db/version_edit.h
index d6fac1c3c..7751ad92e 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -11,6 +11,7 @@
 #include <set>
 #include <utility>
 #include <vector>
+#include "rocksdb/cache.h"
 #include "db/dbformat.h"
 
 namespace rocksdb {
@@ -28,8 +29,14 @@ struct FileMetaData {
   SequenceNumber smallest_seqno;  // The smallest seqno in this file
   SequenceNumber largest_seqno;   // The largest seqno in this file
 
-  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),
-                   being_compacted(false) { }
+  // Needs to be disposed when refs becomes 0.
+  Cache::Handle* table_reader_handle;
+
+  FileMetaData(uint64_t number, uint64_t file_size) :
+      refs(0), allowed_seeks(1 << 30), number(number), file_size(file_size),
+      being_compacted(false), table_reader_handle(nullptr) {
+  }
+  FileMetaData() : FileMetaData(0, 0) { }
 };
 
 class VersionEdit {
diff --git a/db/version_set.cc b/db/version_set.cc
index e2421ef92..eb58e2780 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -51,6 +51,10 @@ Version::~Version() {
       assert(f->refs > 0);
       f->refs--;
       if (f->refs <= 0) {
+        if (f->table_reader_handle) {
+          vset_->table_cache_->ReleaseHandle(f->table_reader_handle);
+          f->table_reader_handle = nullptr;
+        }
         vset_->obsolete_files_.push_back(f);
       }
     }
@@ -202,10 +206,11 @@ static Iterator* GetFileIterator(void* arg,
       options_copy = options;
       options_copy.prefix = nullptr;
     }
+    FileMetaData meta(DecodeFixed64(file_value.data()),
+                      DecodeFixed64(file_value.data() + 8));
     return cache->NewIterator(options.prefix ?
                               options_copy : options,
                               soptions,
-                              DecodeFixed64(file_value.data()),
-                              DecodeFixed64(file_value.data() + 8),
+                              meta,
                               nullptr /* don't need reference to table*/,
                               for_compaction);
   }
@@ -257,9 +262,8 @@ void Version::AddIterators(const ReadOptions& options,
                            std::vector<Iterator*>* iters) {
   // Merge all level zero files together since they may overlap
   for (const FileMetaData* file : files_[0]) {
-    iters->push_back(
-        vset_->table_cache_->NewIterator(
-            options, soptions, file->number, file->file_size));
+    iters->push_back(vset_->table_cache_->NewIterator(options, soptions,
+                                                      *file));
   }
 
   // For levels > 0, we can use a concatenating iterator that sequentially
@@ -513,9 +517,8 @@ void Version::Get(const ReadOptions& options,
       prev_file = f;
 #endif
       bool tableIO = false;
-      *status = vset_->table_cache_->Get(options, f->number, f->file_size,
-                                         ikey, &saver, SaveValue, &tableIO,
-                                         MarkKeyMayExist);
+      *status = vset_->table_cache_->Get(options, *f, ikey, &saver, SaveValue,
+                                         &tableIO, MarkKeyMayExist);
       // TODO: examine the behavior for corrupted key
       if (!status->ok()) {
         return;
@@ -954,6 +957,11 @@ class VersionSet::Builder {
         FileMetaData* f = to_unref[i];
         f->refs--;
         if (f->refs <= 0) {
+          if (f->table_reader_handle) {
+            vset_->table_cache_->ReleaseHandle(
+                f->table_reader_handle);
+            f->table_reader_handle = nullptr;
+          }
           delete f;
         }
       }
@@ -1113,6 +1121,20 @@ class VersionSet::Builder {
     CheckConsistency(v);
   }
 
+  void LoadTableHandlers() {
+    for (int level = 0; level < vset_->NumberLevels(); level++) {
+      for (auto& file_meta : *(levels_[level].added_files)) {
+        assert (!file_meta->table_reader_handle);
+        bool table_io;
+        vset_->table_cache_->FindTable(vset_->storage_options_,
+                                       file_meta->number,
+                                       file_meta->file_size,
+                                       &file_meta->table_reader_handle,
+                                       &table_io, false);
+      }
+    }
+  }
+
   void MaybeAddFile(Version* v, int level, FileMetaData* f) {
     if (levels_[level].deleted_files.count(f->number) > 0) {
       // File is deleted: do nothing
@@ -1258,7 +1280,7 @@ Status VersionSet::LogAndApply(
     edit->SetNextFile(next_file_number_);
   }
 
-  // Unlock during expensive MANIFEST log write. New writes cannot get here
+  // Unlock during expensive operations. New writes cannot get here
   // because &w is ensuring that all new writes get queued.
   {
     // calculate the amount of data being compacted at every level
@@ -1267,6 +1289,12 @@ Status VersionSet::LogAndApply(
 
     mu->Unlock();
 
+    if (options_->max_open_files == -1) {
+      // unlimited table cache. Pre-load table handle now.
+      // Need to do it out of the mutex.
+      builder.LoadTableHandlers();
+    }
+
     // This is fine because everything inside of this block is serialized --
     // only one thread can be here at the same time
    if (!new_manifest_filename.empty()) {
@@ -1966,8 +1994,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
         // approximate offset of "ikey" within the table.
         TableReader* table_reader_ptr;
         Iterator* iter = table_cache_->NewIterator(
-            ReadOptions(), storage_options_, files[i]->number,
-            files[i]->file_size, &table_reader_ptr);
+            ReadOptions(), storage_options_, *(files[i]), &table_reader_ptr);
         if (table_reader_ptr != nullptr) {
           result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode());
         }
@@ -2092,8 +2119,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
       for (size_t i = 0; i < files.size(); i++) {
         list[num++] = table_cache_->NewIterator(
             options, storage_options_compactions_,
-            files[i]->number, files[i]->file_size, nullptr,
-            true /* for compaction */);
+            *(files[i]), nullptr, true /* for compaction */);
       }
     } else {
       // Create concatenating iterator for the files from this level
@@ -2876,12 +2902,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
 Status VersionSet::GetMetadataForFile(
     uint64_t number,
     int *filelevel,
-    FileMetaData *meta) {
+    FileMetaData **meta) {
   for (int level = 0; level < NumberLevels(); level++) {
     const std::vector<FileMetaData*>& files = current_->files_[level];
     for (size_t i = 0; i < files.size(); i++) {
       if (files[i]->number == number) {
-        *meta = *files[i];
+        *meta = files[i];
         *filelevel = level;
         return Status::OK();
       }
diff --git a/db/version_set.h b/db/version_set.h
index 3f8f95585..d5dc2cb13 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -431,7 +431,7 @@ class VersionSet {
   double MaxBytesForLevel(int level);
 
   Status GetMetadataForFile(
-      uint64_t number, int *filelevel, FileMetaData *metadata);
+      uint64_t number, int *filelevel, FileMetaData **metadata);
 
   void GetLiveFilesMetaData(
     std::vector<LiveFileMetaData> *metadata);
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 5041ea593..660d36838 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -182,8 +182,10 @@ struct Options {
   int min_write_buffer_number_to_merge;
 
   // Number of open files that can be used by the DB. You may need to
-  // increase this if your database has a large working set (budget
-  // one open file per 2MB of working set).
+  // increase this if your database has a large working set. Value -1 means
+  // files opened are always kept open. You can estimate number of files based
+  // on target_file_size_base and target_file_size_multiplier for level-based
+  // compaction. For universal-style compaction, you can usually set it to -1.
   //
   // Default: 1000
   int max_open_files;
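
Illustration (editor's addition, not part of the patch): a small usage sketch
of the new max_open_files = -1 mode and of the file-count estimate suggested
by the updated options.h comment. The data size and file size below are
made-up example numbers, and the estimate assumes most live files are close
to target_file_size_base.

  #include <cstdint>
  #include <iostream>

  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;

    // Keep every table reader open and skip the table-cache LRU look-up,
    // as this patch enables (typical for universal-style compaction).
    options.max_open_files = -1;

    // For level-based compaction, a budget can instead be estimated from the
    // expected data size and target_file_size_base (example numbers only).
    options.target_file_size_base = 64 * 1024 * 1024;                // 64 MB
    const uint64_t expected_data_bytes = 256ull * 1024 * 1024 * 1024;  // 256 GB
    const uint64_t approx_live_files =
        expected_data_bytes /
        static_cast<uint64_t>(options.target_file_size_base);
    std::cout << "rough live file count: " << approx_live_files << std::endl;
    return 0;
  }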