[Performance Branch] If options.max_open_files is set to -1, cache table readers in FileMetaData for Get() and NewIterator()

Summary:
In some use cases, table readers for all live files should always be cached. In that case, there is an opportunity to avoid the table cache look-up during Get() and NewIterator().

We define options.max_open_files = -1 to be the mode in which table readers for live files are always kept. In that mode, table readers are cached in FileMetaData (with a reference count held in the table cache), so that when executing table_cache.Get() and table_cache.NewIterator(), the LRU cache lookup can be bypassed to reduce latency.

Test Plan: add a test case in db_test

Reviewers: haobo, kailiu

Reviewed By: haobo

CC: dhruba, igor, leveldb

Differential Revision: https://reviews.facebook.net/D15039
main
Siying Dong 11 years ago
parent 5b5ab0c1a8
commit aa0ef6602d
  1. 3
      db/builder.cc
  2. 21
      db/db_impl.cc
  3. 4
      db/db_test.cc
  4. 3
      db/repair.cc
  5. 54
      db/table_cache.cc
  6. 22
      db/table_cache.h
  7. 11
      db/version_edit.h
  8. 56
      db/version_set.cc
  9. 2
      db/version_set.h
  10. 6
      include/rocksdb/options.h

@ -204,8 +204,7 @@ Status BuildTable(const std::string& dbname,
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(),
soptions,
meta->number,
meta->file_size);
*meta);
s = it->status();
delete it;
}

@ -126,7 +126,10 @@ Options SanitizeOptions(const std::string& dbname,
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
ClipToRange(&result.max_open_files, 20, 1000000);
// result.max_open_files == -1 means an "infinite" number of open files.
if (result.max_open_files != -1) {
ClipToRange(&result.max_open_files, 20, 1000000);
}
ClipToRange(&result.write_buffer_size, ((size_t)64)<<10,
((size_t)64)<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
@ -278,7 +281,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
}
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - 10;
// Give a large number for setting of "infinite" open files.
const int table_cache_size =
(options_.max_open_files == -1) ?
4194304 : options_.max_open_files - 10;
table_cache_.reset(new TableCache(dbname_, &options_,
storage_options_, table_cache_size));
versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
@ -335,6 +341,9 @@ DBImpl::~DBImpl() {
for (MemTable* m: to_delete) {
delete m;
}
// versions_ needs to be destroyed before table_cache_ since it can hold
// references to table_cache_.
versions_.reset();
LogFlush(options_.info_log);
}
@ -2095,10 +2104,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
FileMetaData meta(output_number, current_bytes);
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
storage_options_,
output_number,
current_bytes);
meta);
s = iter->status();
delete iter;
if (s.ok()) {
@ -3701,7 +3710,7 @@ Status DBImpl::DeleteFile(std::string name) {
}
int level;
FileMetaData metadata;
FileMetaData* metadata;
int maxlevel = NumberLevels();
VersionEdit edit(maxlevel);
DeletionState deletion_state(true);
@ -3716,7 +3725,7 @@ Status DBImpl::DeleteFile(std::string name) {
assert((level > 0) && (level < maxlevel));
// If the file is being compacted no need to delete.
if (metadata.being_compacted) {
if (metadata->being_compacted) {
Log(options_.info_log,
"DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
return Status::OK();

@ -265,6 +265,7 @@ class DBTest {
kHashSkipList,
kUniversalCompaction,
kCompressedBlockCache,
kInfiniteMaxOpenFiles,
kEnd
};
int option_config_;
@ -415,6 +416,9 @@ class DBTest {
case kCompressedBlockCache:
options.block_cache_compressed = NewLRUCache(8*1024*1024);
break;
case kInfiniteMaxOpenFiles:
options.max_open_files = -1;
break;
default:
break;
}

@ -265,8 +265,9 @@ class Repairer {
int counter = 0;
Status status = env_->GetFileSize(fname, &t->meta.file_size);
if (status.ok()) {
FileMetaData dummy_meta(t->meta.number, t->meta.file_size);
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), storage_options_, t->meta.number, t->meta.file_size);
ReadOptions(), storage_options_, dummy_meta);
bool empty = true;
ParsedInternalKey parsed;
t->min_sequence = 0;

@ -10,6 +10,7 @@
#include "db/table_cache.h"
#include "db/filename.h"
#include "db/version_edit.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
@ -50,6 +51,14 @@ TableCache::TableCache(const std::string& dbname,
TableCache::~TableCache() {
}
// Returns the TableReader stored as the value of the given cache handle.
// The handle itself is not released by this call.
TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
  auto cached_value = cache_->Value(handle);
  return reinterpret_cast<TableReader*>(cached_value);
}
// Hands the given handle back to the underlying cache via Cache::Release().
void TableCache::ReleaseHandle(Cache::Handle* handle) {
  Cache* const underlying_cache = cache_.get();
  underlying_cache->Release(handle);
}
Status TableCache::FindTable(const EnvOptions& toptions,
uint64_t file_number, uint64_t file_size,
Cache::Handle** handle, bool* table_io,
@ -94,25 +103,27 @@ Status TableCache::FindTable(const EnvOptions& toptions,
Iterator* TableCache::NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
uint64_t file_number,
uint64_t file_size,
const FileMetaData& file_meta,
TableReader** table_reader_ptr,
bool for_compaction) {
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
Cache::Handle* handle = nullptr;
Status s = FindTable(toptions, file_number, file_size, &handle,
nullptr, options.read_tier == kBlockCacheTier);
Cache::Handle* handle = file_meta.table_reader_handle;
Status s;
if (!handle) {
s = FindTable(toptions, file_meta.number, file_meta.file_size, &handle,
nullptr, options.read_tier == kBlockCacheTier);
}
if (!s.ok()) {
return NewErrorIterator(s);
}
TableReader* table_reader =
reinterpret_cast<TableReader*>(cache_->Value(handle));
TableReader* table_reader = GetTableReaderFromHandle(handle);
Iterator* result = table_reader->NewIterator(options);
result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
if (!file_meta.table_reader_handle) {
result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
}
@ -125,22 +136,24 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
}
Status TableCache::Get(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
const FileMetaData& file_meta,
const Slice& k,
void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
bool* table_io,
void (*mark_key_may_exist)(void*)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, file_number, file_size,
&handle, table_io,
options.read_tier == kBlockCacheTier);
Cache::Handle* handle = file_meta.table_reader_handle;
Status s;
if (!handle) {
s = FindTable(storage_options_, file_meta.number, file_meta.file_size,
&handle, table_io, options.read_tier == kBlockCacheTier);
}
if (s.ok()) {
TableReader* t =
reinterpret_cast<TableReader*>(cache_->Value(handle));
TableReader* t = GetTableReaderFromHandle(handle);
s = t->Get(options, k, arg, saver, mark_key_may_exist);
cache_->Release(handle);
if (!file_meta.table_reader_handle) {
ReleaseHandle(handle);
}
} else if (options.read_tier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
(*mark_key_may_exist)(arg);
@ -159,10 +172,9 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
file_size, &handle, table_io);
bool may_match = true;
if (s.ok()) {
TableReader* t =
reinterpret_cast<TableReader*>(cache_->Value(handle));
TableReader* t = GetTableReaderFromHandle(handle);
may_match = t->PrefixMayMatch(internal_prefix);
cache_->Release(handle);
ReleaseHandle(handle);
}
return may_match;
}

@ -21,6 +21,7 @@
namespace rocksdb {
class Env;
struct FileMetaData;
class TableCache {
public:
@ -37,8 +38,7 @@ class TableCache {
// returned iterator is live.
Iterator* NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
uint64_t file_number,
uint64_t file_size,
const FileMetaData& file_meta,
TableReader** table_reader_ptr = nullptr,
bool for_compaction = false);
@ -46,8 +46,7 @@ class TableCache {
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
Status Get(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
const FileMetaData& file_meta,
const Slice& k,
void* arg,
bool (*handle_result)(void*, const Slice&, const Slice&, bool),
@ -63,16 +62,23 @@ class TableCache {
// Evict any entry for the specified file number
void Evict(uint64_t file_number);
// Find table reader
Status FindTable(const EnvOptions& toptions, uint64_t file_number,
uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
const bool no_io = false);
// Get TableReader from a cache handle.
TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
// Release the handle from a cache
void ReleaseHandle(Cache::Handle* handle);
private:
Env* const env_;
const std::string dbname_;
const Options* options_;
const EnvOptions& storage_options_;
std::shared_ptr<Cache> cache_;
Status FindTable(const EnvOptions& toptions, uint64_t file_number,
uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
const bool no_io = false);
};
} // namespace rocksdb

@ -11,6 +11,7 @@
#include <set>
#include <utility>
#include <vector>
#include "rocksdb/cache.h"
#include "db/dbformat.h"
namespace rocksdb {
@ -28,8 +29,14 @@ struct FileMetaData {
SequenceNumber smallest_seqno;// The smallest seqno in this file
SequenceNumber largest_seqno; // The largest seqno in this file
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),
being_compacted(false) { }
// Needs to be disposed when refs becomes 0.
Cache::Handle* table_reader_handle;
FileMetaData(uint64_t number, uint64_t file_size) :
refs(0), allowed_seeks(1 << 30), number(number), file_size(file_size),
being_compacted(false), table_reader_handle(nullptr) {
}
FileMetaData() : FileMetaData(0, 0) { }
};
class VersionEdit {

@ -51,6 +51,10 @@ Version::~Version() {
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
if (f->table_reader_handle) {
vset_->table_cache_->ReleaseHandle(f->table_reader_handle);
f->table_reader_handle = nullptr;
}
vset_->obsolete_files_.push_back(f);
}
}
@ -202,10 +206,11 @@ static Iterator* GetFileIterator(void* arg,
options_copy = options;
options_copy.prefix = nullptr;
}
FileMetaData meta(DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8));
return cache->NewIterator(options.prefix ? options_copy : options,
soptions,
DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8),
meta,
nullptr /* don't need reference to table*/,
for_compaction);
}
@ -257,9 +262,8 @@ void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (const FileMetaData* file : files_[0]) {
iters->push_back(
vset_->table_cache_->NewIterator(
options, soptions, file->number, file->file_size));
iters->push_back(vset_->table_cache_->NewIterator(options, soptions,
*file));
}
// For levels > 0, we can use a concatenating iterator that sequentially
@ -513,9 +517,8 @@ void Version::Get(const ReadOptions& options,
prev_file = f;
#endif
bool tableIO = false;
*status = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO,
MarkKeyMayExist);
*status = vset_->table_cache_->Get(options, *f, ikey, &saver, SaveValue,
&tableIO, MarkKeyMayExist);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -954,6 +957,11 @@ class VersionSet::Builder {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
if (f->table_reader_handle) {
vset_->table_cache_->ReleaseHandle(
f->table_reader_handle);
f->table_reader_handle = nullptr;
}
delete f;
}
}
@ -1113,6 +1121,20 @@ class VersionSet::Builder {
CheckConsistency(v);
}
void LoadTableHandlers() {
for (int level = 0; level < vset_->NumberLevels(); level++) {
for (auto& file_meta : *(levels_[level].added_files)) {
assert (!file_meta->table_reader_handle);
bool table_io;
vset_->table_cache_->FindTable(vset_->storage_options_,
file_meta->number,
file_meta->file_size,
&file_meta->table_reader_handle,
&table_io, false);
}
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
@ -1258,7 +1280,7 @@ Status VersionSet::LogAndApply(
edit->SetNextFile(next_file_number_);
}
// Unlock during expensive MANIFEST log write. New writes cannot get here
// Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued.
{
// calculate the amount of data being compacted at every level
@ -1267,6 +1289,12 @@ Status VersionSet::LogAndApply(
mu->Unlock();
if (options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder.LoadTableHandlers();
}
// This is fine because everything inside of this block is serialized --
// only one thread can be here at the same time
if (!new_manifest_filename.empty()) {
@ -1966,8 +1994,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
// approximate offset of "ikey" within the table.
TableReader* table_reader_ptr;
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), storage_options_, files[i]->number,
files[i]->file_size, &table_reader_ptr);
ReadOptions(), storage_options_, *(files[i]), &table_reader_ptr);
if (table_reader_ptr != nullptr) {
result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode());
}
@ -2092,8 +2119,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
for (size_t i = 0; i < files.size(); i++) {
list[num++] = table_cache_->NewIterator(
options, storage_options_compactions_,
files[i]->number, files[i]->file_size, nullptr,
true /* for compaction */);
*(files[i]), nullptr, true /* for compaction */);
}
} else {
// Create concatenating iterator for the files from this level
@ -2876,12 +2902,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
Status VersionSet::GetMetadataForFile(
uint64_t number,
int *filelevel,
FileMetaData *meta) {
FileMetaData **meta) {
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
if (files[i]->number == number) {
*meta = *files[i];
*meta = files[i];
*filelevel = level;
return Status::OK();
}

@ -431,7 +431,7 @@ class VersionSet {
double MaxBytesForLevel(int level);
Status GetMetadataForFile(
uint64_t number, int *filelevel, FileMetaData *metadata);
uint64_t number, int *filelevel, FileMetaData **metadata);
void GetLiveFilesMetaData(
std::vector<LiveFileMetaData> *metadata);

@ -182,8 +182,10 @@ struct Options {
int min_write_buffer_number_to_merge;
// Number of open files that can be used by the DB. You may need to
// increase this if your database has a large working set (budget
// one open file per 2MB of working set).
// increase this if your database has a large working set. Value -1 means
// files opened are always kept open. You can estimate number of files based
// on target_file_size_base and target_file_size_multiplier for level-based
// compaction. For universal-style compaction, you can usually set it to -1.
//
// Default: 1000
int max_open_files;

Loading…
Cancel
Save