[CF] Rethink table cache

Summary:
Adapting table cache to column families is interesting. We want table cache to be global LRU, so if some column families are use not as often as others, we want them to be evicted from cache. However, current TableCache object also constructs tables on its own. If table is not found in the cache, TableCache automatically creates new table. We want each column family to be able to specify different table factory.

To solve the problem, we still have a single LRU, but we provide the LRUCache object to TableCache on construction. We have one TableCache per column family, but the underyling cache is shared by all TableCache objects.

This allows us to have a global LRU, but still be able to support different table factories for different column families. Also, in the future it will also be able to support different directories for different column families.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15915
main
Igor Canadi 11 years ago
parent 7b9f134959
commit c24d8c4e90
  1. 29
      db/column_family.cc
  2. 23
      db/column_family.h
  3. 25
      db/db_impl.cc
  4. 5
      db/db_impl.h
  5. 13
      db/repair.cc
  6. 39
      db/table_cache.cc
  7. 12
      db/table_cache.h
  8. 45
      db/version_set.cc
  9. 4
      db/version_set.h
  10. 14
      util/ldb_cmd.cc

@ -128,10 +128,12 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
refs.store(1, std::memory_order_relaxed);
}
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions,
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const Options* db_options)
const Options* db_options,
const EnvOptions& storage_options)
: id_(id),
name_(name),
dummy_versions_(dummy_versions),
@ -148,9 +150,12 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false) {
// if db_options is nullptr, then this is a dummy column family.
if (db_options != nullptr) {
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
db_options->statistics.get()));
table_cache_.reset(new TableCache(dbname, db_options, &options_,
storage_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(new UniversalCompactionPicker(
&options_, &internal_comparator_, db_options->info_log.get()));
@ -230,11 +235,18 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
return nullptr;
}
ColumnFamilySet::ColumnFamilySet(const Options* db_options)
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const Options* db_options,
const EnvOptions& storage_options,
Cache* table_cache)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, ColumnFamilyOptions(),
nullptr)),
db_options_(db_options) {
dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
ColumnFamilyOptions(), nullptr,
storage_options_)),
db_name_(dbname),
db_options_(db_options),
storage_options_(storage_options),
table_cache_(table_cache) {
// initialize linked list
dummy_cfd_->prev_.store(dummy_cfd_);
dummy_cfd_->next_.store(dummy_cfd_);
@ -290,7 +302,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
column_families_.insert({name, id});
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, options, db_options_);
new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
options, db_options_, storage_options_);
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
// add to linked list

@ -17,6 +17,7 @@
#include "rocksdb/env.h"
#include "db/memtablelist.h"
#include "db/write_batch_internal.h"
#include "db/table_cache.h"
namespace rocksdb {
@ -64,11 +65,6 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
// column family metadata. not thread-safe. should be protected by db_mutex
class ColumnFamilyData {
public:
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, const ColumnFamilyOptions& options,
const Options* db_options);
~ColumnFamilyData();
uint32_t GetID() const { return id_; }
const std::string& GetName() { return name_; }
@ -88,6 +84,8 @@ class ColumnFamilyData {
void SetCurrent(Version* current);
void CreateNewMemtable();
TableCache* table_cache() const { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction();
Compaction* CompactRange(int input_level, int output_level,
@ -122,6 +120,12 @@ class ColumnFamilyData {
private:
friend class ColumnFamilySet;
ColumnFamilyData(const std::string& dbname, uint32_t id,
const std::string& name, Version* dummy_versions,
Cache* table_cache, const ColumnFamilyOptions& options,
const Options* db_options,
const EnvOptions& storage_options);
~ColumnFamilyData();
ColumnFamilyData* next() { return next_.load(); }
@ -135,6 +139,8 @@ class ColumnFamilyData {
ColumnFamilyOptions options_;
std::unique_ptr<TableCache> table_cache_;
std::unique_ptr<InternalStats> internal_stats_;
MemTable* mem_;
@ -186,7 +192,8 @@ class ColumnFamilySet {
ColumnFamilyData* current_;
};
explicit ColumnFamilySet(const Options* db_options_);
ColumnFamilySet(const std::string& dbname, const Options* db_options_,
const EnvOptions& storage_options, Cache* table_cache);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@ -219,8 +226,12 @@ class ColumnFamilySet {
std::vector<ColumnFamilyData*> droppped_column_families_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
const std::string db_name_;
// TODO(icanadi) change to DBOptions
const Options* const db_options_;
const EnvOptions storage_options_;
Cache* table_cache_;
};
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

@ -39,6 +39,7 @@
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/column_family.h"
@ -209,6 +210,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
options_(SanitizeOptions(dbname, &internal_comparator_,
&internal_filter_policy_, options)),
internal_filter_policy_(options.filter_policy),
// Reserve ten files or so for other uses and give the rest to TableCache.
table_cache_(NewLRUCache(options_.max_open_files - 10,
options_.table_cache_numshardbits,
options_.table_cache_remove_scan_count_limit)),
db_lock_(nullptr),
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
@ -234,11 +239,6 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - 10;
table_cache_.reset(new TableCache(dbname_, &options_,
storage_options_, table_cache_size));
versions_.reset(
new VersionSet(dbname_, &options_, storage_options_, table_cache_.get()));
column_family_memtables_.reset(
@ -551,7 +551,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
if (!keep) {
if (type == kTableFile) {
// evict from cache
table_cache_->Evict(number);
TableCache::Evict(table_cache_.get(), number);
}
std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
"/" + state.all_files[i];
@ -1014,7 +1014,7 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta, cfd->user_comparator(),
cfd->table_cache(), iter, &meta, cfd->user_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(options_));
LogFlush(options_.info_log);
@ -1079,7 +1079,7 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
(unsigned long)meta.number);
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta, cfd->user_comparator(),
cfd->table_cache(), iter, &meta, cfd->user_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(options_));
LogFlush(options_.info_log);
@ -2031,7 +2031,7 @@ void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.
if (!status.ok()) {
table_cache_->Evict(out.number);
TableCache::Evict(table_cache_.get(), out.number);
}
}
delete compact;
@ -2148,10 +2148,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
storage_options_,
output_number,
current_bytes);
ColumnFamilyData* cfd = compact->compaction->column_family_data();
Iterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), storage_options_, output_number, current_bytes);
s = iter->status();
delete iter;
if (s.ok()) {

@ -251,7 +251,7 @@ class DBImpl : public DB {
const std::string dbname_;
unique_ptr<VersionSet> versions_;
const InternalKeyComparator internal_comparator_;
const Options options_; // options_.comparator == &internal_comparator_
const Options options_;
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version);
@ -370,11 +370,10 @@ class DBImpl : public DB {
const ReadOptions& options, ColumnFamilyData* cfd,
uint64_t* superversion_number);
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
// table_cache_ provides its own synchronization
unique_ptr<TableCache> table_cache_;
std::shared_ptr<Cache> table_cache_;
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;

@ -55,14 +55,21 @@ class Repairer {
icmp_(options.comparator),
ipolicy_(options.filter_policy),
options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
cf_options_(ColumnFamilyOptions(options_)),
raw_table_cache_(
// TableCache can be small since we expect each table to be opened
// once.
NewLRUCache(10, options_.table_cache_numshardbits,
options_.table_cache_remove_scan_count_limit)),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
table_cache_ = new TableCache(dbname_, &options_, &cf_options_,
storage_options_, raw_table_cache_.get());
edit_ = new VersionEdit();
}
~Repairer() {
delete table_cache_;
raw_table_cache_.reset();
delete edit_;
}
@ -102,6 +109,8 @@ class Repairer {
InternalKeyComparator const icmp_;
InternalFilterPolicy const ipolicy_;
Options const options_;
ColumnFamilyOptions const cf_options_;
std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_;
VersionEdit* edit_;

@ -34,18 +34,16 @@ static Slice GetSliceForFileNumber(uint64_t* file_number) {
sizeof(*file_number));
}
TableCache::TableCache(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int entries)
: env_(options->env),
// TODO(icanadi) Options -> DBOptions
TableCache::TableCache(const std::string& dbname, const Options* db_options,
const ColumnFamilyOptions* cf_options,
const EnvOptions& storage_options, Cache* const cache)
: env_(db_options->env),
dbname_(dbname),
options_(options),
db_options_(db_options),
cf_options_(cf_options),
storage_options_(storage_options),
cache_(
NewLRUCache(entries, options->table_cache_numshardbits,
options->table_cache_remove_scan_count_limit)) {
}
cache_(cache) {}
TableCache::~TableCache() {
}
@ -68,20 +66,21 @@ Status TableCache::FindTable(const EnvOptions& toptions,
unique_ptr<RandomAccessFile> file;
unique_ptr<TableReader> table_reader;
s = env_->NewRandomAccessFile(fname, &file, toptions);
RecordTick(options_->statistics.get(), NO_FILE_OPENS);
RecordTick(db_options_->statistics.get(), NO_FILE_OPENS);
if (s.ok()) {
if (options_->advise_random_on_open) {
if (db_options_->advise_random_on_open) {
file->Hint(RandomAccessFile::RANDOM);
}
StopWatch sw(env_, options_->statistics.get(), TABLE_OPEN_IO_MICROS);
s = options_->table_factory->GetTableReader(*options_, toptions,
std::move(file), file_size,
&table_reader);
StopWatch sw(env_, db_options_->statistics.get(), TABLE_OPEN_IO_MICROS);
// TODO(icanadi) terrible hack. fix this
Options options(DBOptions(*db_options_), *cf_options_);
s = cf_options_->table_factory->GetTableReader(
options, toptions, std::move(file), file_size, &table_reader);
}
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(options_->statistics.get(), NO_FILE_ERRORS);
RecordTick(db_options_->statistics.get(), NO_FILE_ERRORS);
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
@ -112,7 +111,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
TableReader* table_reader =
reinterpret_cast<TableReader*>(cache_->Value(handle));
Iterator* result = table_reader->NewIterator(options);
result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
}
@ -167,8 +166,8 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
return may_match;
}
void TableCache::Evict(uint64_t file_number) {
cache_->Erase(GetSliceForFileNumber(&file_number));
void TableCache::Evict(Cache* cache, uint64_t file_number) {
cache->Erase(GetSliceForFileNumber(&file_number));
}
} // namespace rocksdb

@ -24,8 +24,9 @@ class Env;
class TableCache {
public:
TableCache(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, int entries);
TableCache(const std::string& dbname, const Options* db_options,
const ColumnFamilyOptions* cf_options,
const EnvOptions& storage_options, Cache* cache);
~TableCache();
// Return an iterator for the specified file number (the corresponding
@ -61,14 +62,15 @@ class TableCache {
bool* table_io);
// Evict any entry for the specified file number
void Evict(uint64_t file_number);
static void Evict(Cache* cache, uint64_t file_number);
private:
Env* const env_;
const std::string dbname_;
const Options* options_;
const Options* db_options_;
const ColumnFamilyOptions* cf_options_;
const EnvOptions& storage_options_;
std::shared_ptr<Cache> cache_;
Cache* const cache_;
Status FindTable(const EnvOptions& toptions, uint64_t file_number,
uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,

@ -229,11 +229,10 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
// key() will always be the biggest value for this SST?
may_match = true;
} else {
may_match = vset_->table_cache_->PrefixMayMatch(
options,
DecodeFixed64(level_iter->value().data()),
DecodeFixed64(level_iter->value().data() + 8),
internal_prefix, nullptr);
may_match = cfd_->table_cache()->PrefixMayMatch(
options, DecodeFixed64(level_iter->value().data()),
DecodeFixed64(level_iter->value().data() + 8), internal_prefix,
nullptr);
}
return may_match;
}
@ -252,8 +251,8 @@ Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
return NewEmptyIterator();
}
}
return NewTwoLevelIterator(level_iter, &GetFileIterator,
vset_->table_cache_, options, soptions);
return NewTwoLevelIterator(level_iter, &GetFileIterator, cfd_->table_cache(),
options, soptions);
}
void Version::AddIterators(const ReadOptions& options,
@ -261,8 +260,7 @@ void Version::AddIterators(const ReadOptions& options,
std::vector<Iterator*>* iters) {
// Merge all level zero files together since they may overlap
for (const FileMetaData* file : files_[0]) {
iters->push_back(
vset_->table_cache_->NewIterator(
iters->push_back(cfd_->table_cache()->NewIterator(
options, soptions, file->number, file->file_size));
}
@ -526,8 +524,8 @@ void Version::Get(const ReadOptions& options,
prev_file = f;
#endif
bool tableIO = false;
*status = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO,
*status = cfd_->table_cache()->Get(options, f->number, f->file_size, ikey,
&saver, SaveValue, &tableIO,
MarkKeyMayExist);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
@ -1372,13 +1370,12 @@ class VersionSet::Builder {
};
VersionSet::VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options,
TableCache* table_cache)
: column_family_set_(new ColumnFamilySet(options)),
const EnvOptions& storage_options, Cache* table_cache)
: column_family_set_(new ColumnFamilySet(dbname, options, storage_options,
table_cache)),
env_(options->env),
dbname_(dbname),
options_(options),
table_cache_(table_cache),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
@ -1386,8 +1383,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
current_version_number_(0),
manifest_file_size_(0),
storage_options_(storage_options),
storage_options_compactions_(storage_options_) {
}
storage_options_compactions_(storage_options_) {}
VersionSet::~VersionSet() {
for (auto cfd : *column_family_set_) {
@ -1936,8 +1932,11 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
"Number of levels needs to be bigger than 1");
}
TableCache tc(dbname, options, storage_options, 10);
VersionSet versions(dbname, options, storage_options, &tc);
ColumnFamilyOptions cf_options(*options);
std::shared_ptr<Cache> tc(NewLRUCache(
options->max_open_files - 10, options->table_cache_numshardbits,
options->table_cache_remove_scan_count_limit));
VersionSet versions(dbname, options, storage_options, tc.get());
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -2229,7 +2228,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
// "ikey" falls in the range for this table. Add the
// approximate offset of "ikey" within the table.
TableReader* table_reader_ptr;
Iterator* iter = table_cache_->NewIterator(
Iterator* iter = v->cfd_->table_cache()->NewIterator(
ReadOptions(), storage_options_, files[i]->number,
files[i]->file_size, &table_reader_ptr);
if (table_reader_ptr != nullptr) {
@ -2285,7 +2284,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
if (!c->inputs(which)->empty()) {
if (c->level() + which == 0) {
for (const auto& file : *c->inputs(which)) {
list[num++] = table_cache_->NewIterator(
list[num++] = c->column_family_data()->table_cache()->NewIterator(
options, storage_options_compactions_, file->number,
file->file_size, nullptr, true /* for compaction */);
}
@ -2295,8 +2294,8 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
new Version::LevelFileNumIterator(
c->column_family_data()->internal_comparator(),
c->inputs(which)),
&GetFileIterator, table_cache_, options, storage_options_,
true /* for compaction */);
&GetFileIterator, c->column_family_data()->table_cache(), options,
storage_options_, true /* for compaction */);
}
}
}

@ -41,7 +41,6 @@ class Compaction;
class CompactionPicker;
class Iterator;
class MemTable;
class TableCache;
class Version;
class VersionSet;
class MergeContext;
@ -281,7 +280,7 @@ class Version {
class VersionSet {
public:
VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, TableCache* table_cache);
const EnvOptions& storage_options, Cache* table_cache);
~VersionSet();
// Apply *edit to the current version to form a new descriptor that
@ -424,7 +423,6 @@ class VersionSet {
Env* const env_;
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
std::atomic<uint64_t> last_sequence_;

@ -12,6 +12,7 @@
#include "db/write_batch_internal.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/column_family.h"
#include "rocksdb/cache.h"
#include "util/coding.h"
#include <ctime>
@ -535,9 +536,10 @@ void ManifestDumpCommand::DoCommand() {
EnvOptions sopt;
std::string file(manifestfile);
std::string dbname("dummy");
TableCache* tc = new TableCache(dbname, &options, sopt, 10);
VersionSet* versions = new VersionSet(dbname, &options, sopt, tc);
std::shared_ptr<Cache> tc(NewLRUCache(
options.max_open_files - 10, options.table_cache_numshardbits,
options.table_cache_remove_scan_count_limit));
VersionSet* versions = new VersionSet(dbname, &options, sopt, tc.get());
Status s = versions->DumpManifest(options, file, verbose_, is_key_hex_);
if (!s.ok()) {
printf("Error in processing file %s %s\n", manifestfile.c_str(),
@ -1011,9 +1013,11 @@ Options ReduceDBLevelsCommand::PrepareOptionsForOpenDB() {
Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
int* levels) {
EnvOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
std::shared_ptr<Cache> tc(
NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits,
opt.table_cache_remove_scan_count_limit));
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc);
VersionSet versions(db_path_, &opt, soptions, tc.get());
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(default_column_family_name,
ColumnFamilyOptions(opt));

Loading…
Cancel
Save