Preload some files even if options.max_open_files (#3340)

Summary:
Choose to preload some files if options.max_open_files != -1. This can slightly narrow the performance gap between options.max_open_files == -1 and a large number. To avoid a significant regression in DB reopen speed when options.max_open_files != -1, the number of files preloaded at DB open time is limited to 16.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/3340

Differential Revision: D6686945

Pulled By: siying

fbshipit-source-id: 8ec11bbdb46e3d0cdee7b6ad5897a09c5a07869f
main
Siying Dong 6 years ago committed by Facebook Github Bot
parent 46e3209e0d
commit f0dda35d7d
  1. 1
      HISTORY.md
  2. 23
      db/db_compaction_test.cc
  3. 2
      db/db_impl_open.cc
  4. 11
      db/db_properties_test.cc
  5. 20
      db/db_test2.cc
  6. 1
      db/db_test_util.cc
  7. 32
      db/db_wal_test.cc
  8. 2
      db/table_cache.h
  9. 42
      db/version_builder.cc
  10. 1
      db/version_builder.h
  11. 13
      db/version_set.cc

@ -3,6 +3,7 @@
### New Features ### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly). * Enabled checkpoint on readonly db (DBImplReadOnly).
* Make DB ignore dropped column families while committing results of atomic flush. * Make DB ignore dropped column families while committing results of atomic flush.
* RocksDB may choose to preload some files even if options.max_open_files != -1. This may make DB open take slightly longer.
### Public API Change ### Public API Change
* Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate. * Transaction::GetForUpdate is extended with a do_validate parameter with default value of true. If false it skips validating the snapshot before doing the read. Similarly ::Merge, ::Put, ::Delete, and ::SingleDelete are extended with assume_tracked with default value of false. If true it indicates that call is assumed to be after a ::GetForUpdate.

@ -414,6 +414,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
// Reopen the DB with stats-update disabled // Reopen the DB with stats-update disabled
options.skip_stats_update_on_db_open = true; options.skip_stats_update_on_db_open = true;
options.max_open_files = 20;
env_->random_file_open_counter_.store(0); env_->random_file_open_counter_.store(0);
Reopen(options); Reopen(options);
@ -439,7 +440,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;
options.new_table_reader_for_compaction_inputs = true; options.new_table_reader_for_compaction_inputs = true;
options.max_open_files = 100; options.max_open_files = 20;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
DestroyAndReopen(options); DestroyAndReopen(options);
Random rnd(301); Random rnd(301);
@ -468,15 +469,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
// preloading iterator issues one table cache lookup and create // preloading iterator issues one table cache lookup and create
// a new table reader. // a new table reader, if not preloaded.
ASSERT_EQ(num_table_cache_lookup, 1); int old_num_table_cache_lookup = num_table_cache_lookup;
ASSERT_GE(num_table_cache_lookup, 1);
ASSERT_EQ(num_new_table_reader, 1); ASSERT_EQ(num_new_table_reader, 1);
num_table_cache_lookup = 0; num_table_cache_lookup = 0;
num_new_table_reader = 0; num_new_table_reader = 0;
ASSERT_EQ(Key(k), Get(Key(k))); ASSERT_EQ(Key(k), Get(Key(k)));
// lookup iterator from table cache and no need to create a new one. // lookup iterator from table cache and no need to create a new one.
ASSERT_EQ(num_table_cache_lookup, 1); ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
ASSERT_EQ(num_new_table_reader, 0); ASSERT_EQ(num_new_table_reader, 0);
} }
} }
@ -489,7 +491,10 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
// a new table reader. One file is created for flush and one for compaction. // a new table reader. One file is created for flush and one for compaction.
// Compaction inputs make no table cache look-up for data/range deletion // Compaction inputs make no table cache look-up for data/range deletion
// iterators // iterators
ASSERT_EQ(num_table_cache_lookup, 2); // May preload table cache too.
ASSERT_GE(num_table_cache_lookup, 2);
int old_num_table_cache_lookup2 = num_table_cache_lookup;
// Create new iterator for: // Create new iterator for:
// (1) 1 for verifying flush results // (1) 1 for verifying flush results
// (2) 3 for compaction input files // (2) 3 for compaction input files
@ -499,7 +504,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
num_table_cache_lookup = 0; num_table_cache_lookup = 0;
num_new_table_reader = 0; num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1))); ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup, 1); ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
ASSERT_EQ(num_new_table_reader, 0); ASSERT_EQ(num_new_table_reader, 0);
num_table_cache_lookup = 0; num_table_cache_lookup = 0;
@ -511,14 +516,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
db_->CompactRange(cro, nullptr, nullptr); db_->CompactRange(cro, nullptr, nullptr);
// Only verifying compaction outputs issues one table cache lookup // Only verifying compaction outputs issues one table cache lookup
// for both data block and range deletion block). // for both data block and range deletion block).
ASSERT_EQ(num_table_cache_lookup, 1); // May preload table cache too.
ASSERT_GE(num_table_cache_lookup, 1);
old_num_table_cache_lookup2 = num_table_cache_lookup;
// One for compaction input, one for verifying compaction results. // One for compaction input, one for verifying compaction results.
ASSERT_EQ(num_new_table_reader, 2); ASSERT_EQ(num_new_table_reader, 2);
num_table_cache_lookup = 0; num_table_cache_lookup = 0;
num_new_table_reader = 0; num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1))); ASSERT_EQ(Key(1), Get(Key(1)));
ASSERT_EQ(num_table_cache_lookup, 1); ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2);
ASSERT_EQ(num_new_table_reader, 0); ASSERT_EQ(num_new_table_reader, 0);
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

@ -41,6 +41,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
max_max_open_files = 0x400000; max_max_open_files = 0x400000;
} }
ClipToRange(&result.max_open_files, 20, max_max_open_files); ClipToRange(&result.max_open_files, 20, max_max_open_files);
TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
&result.max_open_files);
} }
if (result.info_log == nullptr) { if (result.info_log == nullptr) {

@ -393,7 +393,15 @@ TEST_F(DBPropertiesTest, ReadLatencyHistogramByLevel) {
options.target_file_size_base = 98 << 10; options.target_file_size_base = 98 << 10;
options.max_write_buffer_number = 2; options.max_write_buffer_number = 2;
options.statistics = rocksdb::CreateDBStatistics(); options.statistics = rocksdb::CreateDBStatistics();
options.max_open_files = 100; options.max_open_files = 11; // Make sure no preloading of table readers
// RocksDB sanitizes max open files to at least 20. Modify it back.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = 11;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.no_block_cache = true; table_options.no_block_cache = true;
@ -441,6 +449,7 @@ TEST_F(DBPropertiesTest, ReadLatencyHistogramByLevel) {
// Reopen and issue iterating. See the latency tracked // Reopen and issue iterating. See the latency tracked
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.cf-file-histogram", &prop)); ASSERT_TRUE(dbfull()->GetProperty("rocksdb.cf-file-histogram", &prop));
ASSERT_EQ(std::string::npos, prop.find("** Level 0 read latency histogram")); ASSERT_EQ(std::string::npos, prop.find("** Level 0 read latency histogram"));
ASSERT_EQ(std::string::npos, prop.find("** Level 1 read latency histogram")); ASSERT_EQ(std::string::npos, prop.find("** Level 1 read latency histogram"));

@ -342,6 +342,7 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
ASSERT_GE(cache->GetUsage(), 1024 * 1024); ASSERT_GE(cache->GetUsage(), 1024 * 1024);
Close(); Close();
options.write_buffer_manager.reset(); options.write_buffer_manager.reset();
last_options_.write_buffer_manager.reset();
ASSERT_LT(cache->GetUsage(), 1024 * 1024); ASSERT_LT(cache->GetUsage(), 1024 * 1024);
} }
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@ -1476,11 +1477,26 @@ TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);
if (!infinite_max_files_) {
// Now we have two files. We narrow the max open files to allow 3 entries
// so that preloading SST files won't happen.
options.max_open_files = 13;
// RocksDB sanitizes max open files to at least 20. Modify it back.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = 13;
});
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Reopen database. If max_open_files is set as -1, table readers will be // Reopen database. If max_open_files is set as -1, table readers will be
// preloaded. This will trigger a BlockBasedTable::Open() and prefetch // preloaded. This will trigger a BlockBasedTable::Open() and prefetch
// L0 index and filter. Level 1's prefetching is disabled in DB::Open() // L0 index and filter. Level 1's prefetching is disabled in DB::Open()
TryReopenWithColumnFamilies({"default", "pikachu"}, options); TryReopenWithColumnFamilies({"default", "pikachu"}, options);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
if (infinite_max_files_) { if (infinite_max_files_) {
// After reopen, cache miss are increased by one because we read (and only // After reopen, cache miss are increased by one because we read (and only
// read) filter and index on L0 // read) filter and index on L0
@ -1650,9 +1666,11 @@ class MockPersistentCache : public PersistentCache {
// Make sure that in CPU time perf context counters, Env::NowCPUNanos() // Make sure that in CPU time perf context counters, Env::NowCPUNanos()
// is used, rather than Env::CPUNanos(); // is used, rather than Env::CPUNanos();
TEST_F(DBTest2, TestPerfContextCpuTime) { TEST_F(DBTest2, TestPerfContextCpuTime) {
// force resizing table cache so table handle is not preloaded so that
// we can measure find_table_nanos during Get().
dbfull()->TEST_table_cache()->SetCapacity(0);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
env_->now_cpu_count_.store(0); env_->now_cpu_count_.store(0);
// CPU timing is not enabled with kEnableTimeExceptForMutex // CPU timing is not enabled with kEnableTimeExceptForMutex

@ -604,6 +604,7 @@ Status DBTestBase::TryReopenWithColumnFamilies(
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i])); column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
} }
DBOptions db_opts = DBOptions(options[0]); DBOptions db_opts = DBOptions(options[0]);
last_options_ = options[0];
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
} }

@ -283,7 +283,31 @@ TEST_F(DBWALTest, RecoverWithTableHandle) {
ASSERT_OK(Put(1, "bar", "v4")); ASSERT_OK(Put(1, "bar", "v4"));
ASSERT_OK(Flush(1)); ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, "big", std::string(100, 'a'))); ASSERT_OK(Put(1, "big", std::string(100, 'a')));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
options = CurrentOptions();
const int kSmallMaxOpenFiles = 13;
if (option_config_ == kDBLogDir) {
// Use this option to check not preloading files
// Set the max open files to be small enough so no preload will
// happen.
options.max_open_files = kSmallMaxOpenFiles;
// RocksDB sanitizes max open files to at least 20. Modify it back.
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
int* max_open_files = static_cast<int*>(arg);
*max_open_files = kSmallMaxOpenFiles;
});
} else if (option_config_ == kWalDirAndMmapReads) {
// Use this option to check always loading all files.
options.max_open_files = 100;
} else {
options.max_open_files = -1;
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ReopenWithColumnFamilies({"default", "pikachu"}, options);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
std::vector<std::vector<FileMetaData>> files; std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &files); dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
@ -294,10 +318,10 @@ TEST_F(DBWALTest, RecoverWithTableHandle) {
ASSERT_EQ(total_files, 3); ASSERT_EQ(total_files, 3);
for (const auto& level : files) { for (const auto& level : files) {
for (const auto& file : level) { for (const auto& file : level) {
if (kInfiniteMaxOpenFiles == option_config_) { if (options.max_open_files == kSmallMaxOpenFiles) {
ASSERT_TRUE(file.table_reader_handle != nullptr);
} else {
ASSERT_TRUE(file.table_reader_handle == nullptr); ASSERT_TRUE(file.table_reader_handle == nullptr);
} else {
ASSERT_TRUE(file.table_reader_handle != nullptr);
} }
} }
} }

@ -122,6 +122,8 @@ class TableCache {
// Release the handle from a cache // Release the handle from a cache
void ReleaseHandle(Cache::Handle* handle); void ReleaseHandle(Cache::Handle* handle);
Cache* get_cache() const { return cache_; }
// Capacity of the backing Cache that indicates infinite TableCache capacity. // Capacity of the backing Cache that indicates infinite TableCache capacity.
// For example when max_open_files is -1 we set the backing Cache to this. // For example when max_open_files is -1 we set the backing Cache to this.
static const int kInfiniteCapacity = 0x400000; static const int kInfiniteCapacity = 0x400000;

@ -366,8 +366,40 @@ class VersionBuilder::Rep {
void LoadTableHandlers(InternalStats* internal_stats, int max_threads, void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor) { const SliceTransform* prefix_extractor) {
assert(table_cache_ != nullptr); assert(table_cache_ != nullptr);
size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
size_t max_load = port::kMaxSizet;
if (!always_load) {
// If it is initial loading and not set to always loading all the
// files, we only load up to kInitialLoadLimit files, to limit the
// time reopening the DB.
const size_t kInitialLoadLimit = 16;
size_t load_limit;
// If the table cache is not 1/4 full, we pin the table handle to
// file metadata to avoid the cache read costs when reading the file.
// The downside of pinning those files is that LRU won't be followed
// for those files. This doesn't matter much because if number of files
// of the DB exceeds table cache capacity, eventually no table reader
// will be pinned and LRU will be followed.
if (is_initial_load) {
load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
} else {
load_limit = table_cache_capacity / 4;
}
size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
if (table_cache_usage >= load_limit) {
return;
} else {
max_load = load_limit - table_cache_usage;
}
}
// <file metadata, level> // <file metadata, level>
std::vector<std::pair<FileMetaData*, int>> files_meta; std::vector<std::pair<FileMetaData*, int>> files_meta;
for (int level = 0; level < num_levels_; level++) { for (int level = 0; level < num_levels_; level++) {
@ -375,6 +407,12 @@ class VersionBuilder::Rep {
auto* file_meta = file_meta_pair.second; auto* file_meta = file_meta_pair.second;
assert(!file_meta->table_reader_handle); assert(!file_meta->table_reader_handle);
files_meta.emplace_back(file_meta, level); files_meta.emplace_back(file_meta, level);
if (files_meta.size() >= max_load) {
break;
}
}
if (files_meta.size() >= max_load) {
break;
} }
} }
@ -452,9 +490,11 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
int max_threads, int max_threads,
bool prefetch_index_and_filter_in_cache, bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor) { const SliceTransform* prefix_extractor) {
rep_->LoadTableHandlers(internal_stats, max_threads, rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache, prefix_extractor); prefetch_index_and_filter_in_cache, is_initial_load,
prefix_extractor);
} }
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,

@ -35,6 +35,7 @@ class VersionBuilder {
void SaveTo(VersionStorageInfo* vstorage); void SaveTo(VersionStorageInfo* vstorage);
void LoadTableHandlers(InternalStats* internal_stats, int max_threads, void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor); const SliceTransform* prefix_extractor);
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);

@ -3002,9 +3002,7 @@ Status VersionSet::ProcessManifestWrites(
mu->Unlock(); mu->Unlock();
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() && if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
column_family_set_->get_table_cache()->GetCapacity() ==
TableCache::kInfiniteCapacity) {
for (int i = 0; i < static_cast<int>(versions.size()); ++i) { for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
assert(!builder_guards.empty() && assert(!builder_guards.empty() &&
builder_guards.size() == versions.size()); builder_guards.size() == versions.size());
@ -3013,7 +3011,10 @@ Status VersionSet::ProcessManifestWrites(
ColumnFamilyData* cfd = versions[i]->cfd_; ColumnFamilyData* cfd = versions[i]->cfd_;
builder_guards[i]->version_builder()->LoadTableHandlers( builder_guards[i]->version_builder()->LoadTableHandlers(
cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits, cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
true /* prefetch_index_and_filter_in_cache */, this->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
TableCache::
kInfiniteCapacity /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
mutable_cf_options_ptrs[i]->prefix_extractor.get()); mutable_cf_options_ptrs[i]->prefix_extractor.get());
} }
} }
@ -3671,15 +3672,13 @@ Status VersionSet::Recover(
assert(builders_iter != builders.end()); assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder(); auto* builder = builders_iter->second->version_builder();
if (GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
TableCache::kInfiniteCapacity) {
// unlimited table cache. Pre-load table handle now. // unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex. // Need to do it out of the mutex.
builder->LoadTableHandlers( builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads, cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */, false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
}
Version* v = new Version(cfd, this, env_options_, Version* v = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(), *cfd->GetLatestMutableCFOptions(),

Loading…
Cancel
Save