Add DB property "rocksdb.estimate-table-readers-mem"

Summary:
Add a DB Property "rocksdb.estimate-table-readers-mem" to return estimated memory usage by all loaded table readers, other than allocated from block cache.

Refactor the property codes to allow getting property from a version, with DB mutex not acquired.

Test Plan: Add several checks of this new property in existing codes for various cases.

Reviewers: yhchiang, ljin

Reviewed By: ljin

Subscribers: xjin, igor, leveldb

Differential Revision: https://reviews.facebook.net/D20733
main
sdong 11 years ago
parent 606a126703
commit 1242bfcad7
  1. 130
      db/db_impl.cc
  2. 14
      db/db_impl.h
  3. 33
      db/db_test.cc
  4. 46
      db/internal_stats.cc
  5. 15
      db/internal_stats.h
  6. 9
      db/plain_table_db_test.cc
  7. 2
      db/simple_table_db_test.cc
  8. 23
      db/table_cache.cc
  9. 7
      db/table_cache.h
  10. 12
      db/version_set.cc
  11. 2
      db/version_set.h
  12. 11
      table/block.cc
  13. 3
      table/block.h
  14. 26
      table/block_based_table_reader.cc
  15. 2
      table/block_based_table_reader.h
  16. 3
      table/cuckoo_table_reader.cc
  17. 3
      table/cuckoo_table_reader.h
  18. 3
      table/filter_block.cc
  19. 1
      table/filter_block.h
  20. 4
      table/plain_table_reader.h
  21. 3
      table/table_reader.h

@ -514,17 +514,21 @@ void DBImpl::MaybeDumpStats() {
// period in rare cases.
last_stats_dump_time_microsec_ = now_micros;
DBPropertyType cf_property_type = GetPropertyType("rocksdb.cfstats");
DBPropertyType db_property_type = GetPropertyType("rocksdb.dbstats");
bool tmp1 = false;
bool tmp2 = false;
DBPropertyType cf_property_type =
GetPropertyType("rocksdb.cfstats", &tmp1, &tmp2);
DBPropertyType db_property_type =
GetPropertyType("rocksdb.dbstats", &tmp1, &tmp2);
std::string stats;
{
MutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->internal_stats()->GetProperty(
cf_property_type, "rocksdb.cfstats", &stats);
cfd->internal_stats()->GetStringProperty(cf_property_type,
"rocksdb.cfstats", &stats);
}
default_cf_internal_stats_->GetProperty(
db_property_type, "rocksdb.dbstats", &stats);
default_cf_internal_stats_->GetStringProperty(db_property_type,
"rocksdb.dbstats", &stats);
}
Log(options_.info_log, "------- DUMPING STATS -------");
Log(options_.info_log, "%s", stats.c_str());
@ -3321,15 +3325,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
}
// Acquire SuperVersion
SuperVersion* sv = nullptr;
// TODO(ljin): consider using GetReferencedSuperVersion() directly
if (LIKELY(options_.allow_thread_local)) {
sv = cfd->GetThreadLocalSuperVersion(&mutex_);
} else {
mutex_.Lock();
sv = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
}
SuperVersion* sv = GetAndRefSuperVersion(cfd);
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
@ -3356,22 +3352,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
PERF_TIMER_START(get_post_process_time);
bool unref_sv = true;
if (LIKELY(options_.allow_thread_local)) {
unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv);
}
if (unref_sv) {
// Release SuperVersion
if (sv->Unref()) {
mutex_.Lock();
sv->Cleanup();
mutex_.Unlock();
delete sv;
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
}
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}
ReturnAndCleanupSuperVersion(cfd, sv);
RecordTick(stats_, NUMBER_KEYS_READ);
RecordTick(stats_, BYTES_READ, value->size());
@ -4372,21 +4353,92 @@ const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
const Slice& property, std::string* value) {
bool is_int_property;
bool need_out_of_mutex;
DBPropertyType property_type =
GetPropertyType(property, &is_int_property, &need_out_of_mutex);
value->clear();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
DBPropertyType property_type = GetPropertyType(property);
MutexLock l(&mutex_);
return cfd->internal_stats()->GetProperty(property_type, property, value);
if (is_int_property) {
uint64_t int_value;
bool ret_value = GetIntPropertyInternal(column_family, property_type,
need_out_of_mutex, &int_value);
if (ret_value) {
*value = std::to_string(int_value);
}
return ret_value;
} else {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
MutexLock l(&mutex_);
return cfd->internal_stats()->GetStringProperty(property_type, property,
value);
}
}
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) {
bool is_int_property;
bool need_out_of_mutex;
DBPropertyType property_type =
GetPropertyType(property, &is_int_property, &need_out_of_mutex);
if (!is_int_property) {
return false;
}
return GetIntPropertyInternal(column_family, property_type, need_out_of_mutex,
value);
}
bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family,
DBPropertyType property_type,
bool need_out_of_mutex, uint64_t* value) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
DBPropertyType property_type = GetPropertyType(property);
MutexLock l(&mutex_);
return cfd->internal_stats()->GetIntProperty(property_type, property, value);
if (!need_out_of_mutex) {
MutexLock l(&mutex_);
return cfd->internal_stats()->GetIntProperty(property_type, value);
} else {
SuperVersion* sv = GetAndRefSuperVersion(cfd);
bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
property_type, sv->current, value);
ReturnAndCleanupSuperVersion(cfd, sv);
return ret;
}
}
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly
if (LIKELY(options_.allow_thread_local)) {
return cfd->GetThreadLocalSuperVersion(&mutex_);
} else {
MutexLock l(&mutex_);
return cfd->GetSuperVersion()->Ref();
}
}
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
SuperVersion* sv) {
bool unref_sv = true;
if (LIKELY(options_.allow_thread_local)) {
unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv);
}
if (unref_sv) {
// Release SuperVersion
if (sv->Unref()) {
{
MutexLock l(&mutex_);
sv->Cleanup();
}
delete sv;
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
}
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}
}
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,

@ -611,6 +611,16 @@ class DBImpl : public DB {
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.
inline SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
// Un-reference the super version and return it to thread local cache if
// needed. If it is the last reference of the super version. Clean it up
// after un-referencing it.
inline void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
SuperVersion* sv);
#ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
@ -623,6 +633,10 @@ class DBImpl : public DB {
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, std::string* value,
bool* value_found = nullptr);
bool GetIntPropertyInternal(ColumnFamilyHandle* column_family,
DBPropertyType property_type,
bool need_out_of_mutex, uint64_t* value);
};
// Sanitize db options. The caller should delete result.info_log if

@ -1186,6 +1186,10 @@ TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
ASSERT_EQ(2, /* only index/filter were added */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));
uint64_t int_num;
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_EQ(int_num, 0U);
// Make sure filter block is in cache.
std::string value;
@ -2489,6 +2493,10 @@ TEST(DBTest, GetProperty) {
uint64_t int_num;
SetPerfLevel(kEnableTime);
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_EQ(int_num, 0U);
ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0");
@ -2525,6 +2533,10 @@ TEST(DBTest, GetProperty) {
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.estimate-num-keys", &int_num));
ASSERT_EQ(int_num, 4U);
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_EQ(int_num, 0U);
sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();
dbfull()->TEST_WaitForFlushMemTable();
@ -2538,8 +2550,29 @@ TEST(DBTest, GetProperty) {
ASSERT_EQ(num, "1");
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.estimate-num-keys", &num));
ASSERT_EQ(num, "4");
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_GT(int_num, 0U);
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
dbfull()->TEST_WaitForFlushMemTable();
options.max_open_files = 10;
Reopen(&options);
// After reopening, no table reader is loaded, so no memory for table readers
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_EQ(int_num, 0U);
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.estimate-num-keys", &int_num));
ASSERT_GT(int_num, 0U);
// After reading a key, at least one table reader is loaded.
Get("k5");
ASSERT_TRUE(
dbfull()->GetIntProperty("rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_GT(int_num, 0U);
}
TEST(DBTest, FLUSH) {

@ -86,12 +86,17 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
}
DBPropertyType GetPropertyType(const Slice& property) {
DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
bool* need_out_of_mutex) {
assert(is_int_property != nullptr);
assert(need_out_of_mutex != nullptr);
Slice in = property;
Slice prefix("rocksdb.");
*need_out_of_mutex = false;
if (!in.starts_with(prefix)) return kUnknown;
in.remove_prefix(prefix.size());
*is_int_property = false;
if (in.starts_with("num-files-at-level")) {
return kNumFilesAtLevel;
} else if (in == "levelstats") {
@ -104,7 +109,10 @@ DBPropertyType GetPropertyType(const Slice& property) {
return kDBStats;
} else if (in == "sstables") {
return kSsTables;
} else if (in == "num-immutable-mem-table") {
}
*is_int_property = true;
if (in == "num-immutable-mem-table") {
return kNumImmutableMemTable;
} else if (in == "mem-table-flush-pending") {
return kMemtableFlushPending;
@ -120,21 +128,32 @@ DBPropertyType GetPropertyType(const Slice& property) {
return kNumEntriesInImmutableMemtable;
} else if (in == "estimate-num-keys") {
return kEstimatedNumKeys;
} else if (in == "estimate-table-readers-mem") {
*need_out_of_mutex = true;
return kEstimatedUsageByTableReaders;
}
return kUnknown;
}
bool InternalStats::GetProperty(DBPropertyType property_type,
const Slice& property, std::string* value) {
if (property_type > kStartIntTypes) {
uint64_t int_value;
bool ret_value = GetIntProperty(property_type, property, &int_value);
if (ret_value) {
*value = std::to_string(int_value);
}
return ret_value;
bool InternalStats::GetIntPropertyOutOfMutex(DBPropertyType property_type,
Version* version,
uint64_t* value) const {
assert(value != nullptr);
if (property_type != kEstimatedUsageByTableReaders) {
return false;
}
if (version == nullptr) {
*value = 0;
} else {
*value = version->GetMemoryUsageByTableReaders();
}
return true;
}
bool InternalStats::GetStringProperty(DBPropertyType property_type,
const Slice& property,
std::string* value) {
assert(value != nullptr);
Version* current = cfd_->current();
Slice in = property;
@ -169,10 +188,10 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
return true;
}
case kStats: {
if (!GetProperty(kCFStats, "rocksdb.cfstats", value)) {
if (!GetStringProperty(kCFStats, "rocksdb.cfstats", value)) {
return false;
}
if (!GetProperty(kDBStats, "rocksdb.dbstats", value)) {
if (!GetStringProperty(kDBStats, "rocksdb.dbstats", value)) {
return false;
}
return true;
@ -194,7 +213,6 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
}
bool InternalStats::GetIntProperty(DBPropertyType property_type,
const Slice& property,
uint64_t* value) const {
Version* current = cfd_->current();

@ -41,9 +41,12 @@ enum DBPropertyType : uint32_t {
kNumEntriesInImmutableMemtable, // Return sum of number of entries in all
// the immutable mem tables.
kEstimatedNumKeys, // Estimated total number of keys in the database.
kEstimatedUsageByTableReaders, // Estimated memory by table readers.
};
extern DBPropertyType GetPropertyType(const Slice& property);
extern DBPropertyType GetPropertyType(const Slice& property,
bool* is_int_property,
bool* need_out_of_mutex);
class InternalStats {
public:
@ -191,11 +194,13 @@ class InternalStats {
uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
bool GetProperty(DBPropertyType property_type, const Slice& property,
std::string* value);
bool GetStringProperty(DBPropertyType property_type, const Slice& property,
std::string* value);
bool GetIntProperty(DBPropertyType property_type, const Slice& property,
uint64_t* value) const;
bool GetIntProperty(DBPropertyType property_type, uint64_t* value) const;
bool GetIntPropertyOutOfMutex(DBPropertyType property_type, Version* version,
uint64_t* value) const;
private:
void DumpDBStats(std::string* value);

@ -347,11 +347,20 @@ TEST(PlainTableDBTest, Flush) {
NewPlainTableFactory(plain_table_options));
}
DestroyAndReopen(&options);
uint64_t int_num;
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_EQ(int_num, 0U);
ASSERT_OK(Put("1000000000000foo", "v1"));
ASSERT_OK(Put("0000000000000bar", "v2"));
ASSERT_OK(Put("1000000000000foo", "v3"));
dbfull()->TEST_FlushMemTable();
ASSERT_TRUE(dbfull()->GetIntProperty(
"rocksdb.estimate-table-readers-mem", &int_num));
ASSERT_GT(int_num, 0U);
TablePropertiesCollection ptc;
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
ASSERT_EQ(1U, ptc.size());

@ -92,6 +92,8 @@ public:
uint64_t ApproximateOffsetOf(const Slice& key) override;
virtual size_t ApproximateMemoryUsage() const override { return 0; }
void SetupForCompaction() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;

@ -185,6 +185,29 @@ Status TableCache::GetTableProperties(
return s;
}
size_t TableCache::GetMemoryUsageByTableReader(
const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd) {
Status s;
auto table_reader = fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
return table_reader->ApproximateMemoryUsage();
}
Cache::Handle* table_handle = nullptr;
s = FindTable(toptions, internal_comparator, fd, &table_handle, true);
if (!s.ok()) {
return 0;
}
assert(table_handle);
auto table = GetTableReaderFromHandle(table_handle);
auto ret = table->ApproximateMemoryUsage();
ReleaseHandle(table_handle);
return ret;
}
void TableCache::Evict(Cache* cache, uint64_t file_number) {
cache->Erase(GetSliceForFileNumber(&file_number));
}

@ -80,6 +80,13 @@ class TableCache {
std::shared_ptr<const TableProperties>* properties,
bool no_io = false);
// Return total memory usage of the table reader of the file.
// 0 of table reader of the file is not loaded.
size_t GetMemoryUsageByTableReader(
const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd);
// Release the handle from a cache
void ReleaseHandle(Cache::Handle* handle);

@ -593,6 +593,18 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
return Status::OK();
}
size_t Version::GetMemoryUsageByTableReaders() {
size_t total_usage = 0;
for (auto& file_level : file_levels_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
vset_->storage_options_, cfd_->internal_comparator(),
file_level.files[i].fd);
}
}
return total_usage;
}
uint64_t Version::GetEstimatedActiveKeys() {
// Estimation will be not accurate when:
// (1) there is merge keys

@ -234,6 +234,8 @@ class Version {
uint64_t GetEstimatedActiveKeys();
size_t GetMemoryUsageByTableReaders();
// used to sort files by size
struct Fsize {
int index;

@ -359,4 +359,15 @@ void Block::SetBlockPrefixIndex(BlockPrefixIndex* prefix_index) {
prefix_index_.reset(prefix_index);
}
size_t Block::ApproximateMemoryUsage() const {
size_t usage = size();
if (hash_index_) {
usage += hash_index_->ApproximateMemoryUsage();
}
if (prefix_index_) {
usage += prefix_index_->ApproximateMemoryUsage();
}
return usage;
}
} // namespace rocksdb

@ -50,6 +50,9 @@ class Block {
void SetBlockHashIndex(BlockHashIndex* hash_index);
void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);
// Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const;
private:
const char* data_;
size_t size_;

@ -142,6 +142,10 @@ class BlockBasedTable::IndexReader {
// The size of the index.
virtual size_t size() const = 0;
// Report an approximation of how much memory has been used other than memory
// that was allocated in block cache.
virtual size_t ApproximateMemoryUsage() const = 0;
protected:
const Comparator* comparator_;
};
@ -176,6 +180,11 @@ class BinarySearchIndexReader : public IndexReader {
virtual size_t size() const override { return index_block_->size(); }
virtual size_t ApproximateMemoryUsage() const override {
assert(index_block_);
return index_block_->ApproximateMemoryUsage();
}
private:
BinarySearchIndexReader(const Comparator* comparator, Block* index_block)
: IndexReader(comparator), index_block_(index_block) {
@ -292,6 +301,12 @@ class HashIndexReader : public IndexReader {
virtual size_t size() const override { return index_block_->size(); }
virtual size_t ApproximateMemoryUsage() const override {
assert(index_block_);
return index_block_->ApproximateMemoryUsage() +
prefixes_contents_.data.size();
}
private:
HashIndexReader(const Comparator* comparator, Block* index_block)
: IndexReader(comparator),
@ -544,6 +559,17 @@ std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
return rep_->table_properties;
}
size_t BlockBasedTable::ApproximateMemoryUsage() const {
size_t usage = 0;
if (rep_->filter) {
usage += rep_->filter->ApproximateMemoryUsage();
}
if (rep_->index_reader) {
usage += rep_->index_reader->ApproximateMemoryUsage();
}
return usage;
}
// Load the meta-block from the file. On success, return the loaded meta block
// and its iterator.
Status BlockBasedTable::ReadMetaBlock(

@ -96,6 +96,8 @@ class BlockBasedTable : public TableReader {
std::shared_ptr<const TableProperties> GetTableProperties() const override;
size_t ApproximateMemoryUsage() const override;
~BlockBasedTable();
bool TEST_filter_block_preloaded() const;

@ -268,5 +268,8 @@ Iterator* CuckooTableReader::NewIterator(const ReadOptions&, Arena* arena) {
}
return iter;
}
size_t CuckooTableReader::ApproximateMemoryUsage() const { return 0; }
} // namespace rocksdb
#endif

@ -47,6 +47,9 @@ class CuckooTableReader: public TableReader {
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
// Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const override;
// Following methods are not implemented for Cuckoo Table Reader
uint64_t ApproximateOffsetOf(const Slice& key) override { return 0; }
void SetupForCompaction() override {}

@ -184,4 +184,7 @@ bool FilterBlockReader::MayMatch(uint64_t block_offset, const Slice& entry) {
return true; // Errors are treated as potential matches
}
size_t FilterBlockReader::ApproximateMemoryUsage() const {
return num_ * 4 + 5 + (offset_ - data_);
}
}

@ -74,6 +74,7 @@ class FilterBlockReader {
bool delete_contents_after_use = false);
bool KeyMayMatch(uint64_t block_offset, const Slice& key);
bool PrefixMayMatch(uint64_t block_offset, const Slice& prefix);
size_t ApproximateMemoryUsage() const;
private:
const FilterPolicy* policy_;

@ -78,6 +78,10 @@ class PlainTableReader: public TableReader {
return table_properties_;
}
virtual size_t ApproximateMemoryUsage() const override {
return arena_.MemoryAllocatedBytes();
}
PlainTableReader(const Options& options, unique_ptr<RandomAccessFile>&& file,
const EnvOptions& storage_options,
const InternalKeyComparator& internal_comparator,

@ -52,6 +52,9 @@ class TableReader {
// Prepare work that can be done before the real Get()
virtual void Prepare(const Slice& target) {}
// Report an approximation of how much memory has been used.
virtual size_t ApproximateMemoryUsage() const = 0;
// Calls (*result_handler)(handle_context, ...) repeatedly, starting with
// the entry found after a call to Seek(key), until result_handler returns
// false, where k is the actual internal key for a row found and v as the

Loading…
Cancel
Save