diff --git a/CMakeLists.txt b/CMakeLists.txt index b60a030b5..d2ea6efb3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -243,6 +243,7 @@ set(SOURCES utilities/flashcache/flashcache.cc utilities/geodb/geodb_impl.cc utilities/leveldb_options/leveldb_options.cc + utilities/memory/memory_util.cc utilities/merge_operators/string_append/stringappend.cc utilities/merge_operators/string_append/stringappend2.cc utilities/merge_operators/put.cc @@ -376,6 +377,7 @@ set(TESTS utilities/document/document_db_test.cc utilities/document/json_document_test.cc utilities/geodb/geodb_test.cc + utilities/memory/memory_test.cc utilities/merge_operators/string_append/stringappend_test.cc utilities/redis/redis_lists_test.cc utilities/spatialdb/spatial_db_test.cc diff --git a/HISTORY.md b/HISTORY.md index b2e0cb297..2fa097d3d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Remove DefaultCompactionFilterFactory. * Introduce CreateLoggerFromOptions(), this function create a Logger for provided DBOptions. * Add GetAggregatedIntProperty(), which returns the sum of the GetIntProperty of all the column families. +* Add MemoryUtil in rocksdb/utilities/memory.h. It currently offers a way to get the memory usage by type from a list rocksdb instances. ## 4.1.0 (10/8/2015) ### New Features diff --git a/Makefile b/Makefile index 647cc76a2..bbb59891f 100644 --- a/Makefile +++ b/Makefile @@ -272,6 +272,7 @@ TESTS = \ mock_env_test \ memtable_list_test \ merge_helper_test \ + memory_test \ merge_test \ merger_test \ redis_test \ @@ -882,6 +883,9 @@ write_controller_test: db/write_controller_test.o $(LIBOBJECTS) $(TESTHARNESS) merge_helper_test: db/merge_helper_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +memory_test: utilities/memory/memory_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_impl.h b/db/db_impl.h index 72b146717..3c538dac1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -308,6 +308,12 @@ class DBImpl : public DB { uint64_t TEST_LogfileNumber(); + // Returns column family name to ImmutableCFOptions map. + Status TEST_GetAllImmutableCFOptions( + std::unordered_map* iopts_map); + + Cache* TEST_table_cache() { return table_cache_.get(); } + #endif // NDEBUG // Returns the list of live files in 'live' and the list diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 73dca560d..ef8a12d10 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -135,5 +135,24 @@ uint64_t DBImpl::TEST_LogfileNumber() { return logfile_number_; } +Status DBImpl::TEST_GetAllImmutableCFOptions( + std::unordered_map* iopts_map) { + std::vector cf_names; + std::vector iopts; + { + InstrumentedMutexLock l(&mutex_); + for (auto cfd : *versions_->GetColumnFamilySet()) { + cf_names.push_back(cfd->GetName()); + iopts.push_back(cfd->ioptions()); + } + } + iopts_map->clear(); + for (size_t i = 0; i < cf_names.size(); ++i) { + iopts_map->insert({cf_names[i], iopts[i]}); + } + + return Status::OK(); +} + } // namespace rocksdb #endif // NDEBUG diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 38fea64bc..6c933400c 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -15,15 +15,16 @@ #include #include #include -#include "rocksdb/metadata.h" -#include "rocksdb/version.h" +#include "rocksdb/immutable_options.h" #include "rocksdb/iterator.h" -#include "rocksdb/options.h" -#include "rocksdb/types.h" -#include "rocksdb/transaction_log.h" #include "rocksdb/listener.h" +#include "rocksdb/metadata.h" +#include "rocksdb/options.h" #include "rocksdb/snapshot.h" #include "rocksdb/thread_status.h" +#include "rocksdb/transaction_log.h" +#include "rocksdb/types.h" +#include "rocksdb/version.h" #ifdef _WIN32 // Windows API macro interference diff --git a/include/rocksdb/utilities/memory_util.h b/include/rocksdb/utilities/memory_util.h new file mode 100644 index 000000000..127fa6f85 --- /dev/null +++ b/include/rocksdb/utilities/memory_util.h @@ -0,0 +1,50 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#pragma once + +#include +#include +#include +#include + +#include "rocksdb/cache.h" +#include "rocksdb/db.h" + +namespace rocksdb { + +// Returns the current memory usage of the specified DB instances. +class MemoryUtil { + public: + enum UsageType : int { + // Memory usage of all the mem-tables. + kMemTableTotal = 0, + // Memory usage of those un-flushed mem-tables. + kMemTableUnFlushed = 1, + // Memory usage of all the table readers. + kTableReadersTotal = 2, + // Memory usage by Cache. + kCacheTotal = 3, + kNumUsageTypes = 4 + }; + + // Returns the approximate memory usage of different types in the input + // list of DBs and Cache set. For instance, in the output vector + // usage_by_type, usage_by_type[kMemTableTotal] will store the memory + // usage of all the mem-tables from all the input rocksdb instances. + // + // Note that for memory usage inside Cache class, we will + // only report the usage of the input "cache_set" without + // including those Cache usage inside the input list "dbs" + // of DBs. + static Status GetApproximateMemoryUsageByType( + const std::vector& dbs, + const std::unordered_set cache_set, + std::map* usage_by_type); +}; +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src.mk b/src.mk index 9b87d6ede..46083c750 100644 --- a/src.mk +++ b/src.mk @@ -113,6 +113,7 @@ LIB_SOURCES = \ utilities/flashcache/flashcache.cc \ utilities/geodb/geodb_impl.cc \ utilities/leveldb_options/leveldb_options.cc \ + utilities/memory/memory_util.cc \ utilities/merge_operators/put.cc \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/string_append/stringappend.cc \ @@ -248,6 +249,7 @@ TEST_BENCH_SOURCES = \ utilities/document/document_db_test.cc \ utilities/document/json_document_test.cc \ utilities/geodb/geodb_test.cc \ + utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ utilities/redis/redis_lists_test.cc \ utilities/spatialdb/spatial_db_test.cc \ diff --git a/utilities/memory/memory_test.cc b/utilities/memory/memory_test.cc new file mode 100644 index 000000000..3b24476ee --- /dev/null +++ b/utilities/memory/memory_test.cc @@ -0,0 +1,271 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "db/db_impl.h" +#include "rocksdb/cache.h" +#include "rocksdb/table.h" +#include "rocksdb/utilities/memory_util.h" +#include "rocksdb/utilities/stackable_db.h" +#include "table/block_based_table_factory.h" +#include "util/string_util.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class MemoryTest : public testing::Test { + public: + MemoryTest() : kDbDir(test::TmpDir() + "/memory_test"), rnd_(301) { + assert(Env::Default()->CreateDirIfMissing(kDbDir).ok()); + } + + std::string GetDBName(int id) { return kDbDir + "db_" + ToString(id); } + + std::string RandomString(int len) { + std::string r; + test::RandomString(&rnd_, len, &r); + return r; + } + + void UpdateUsagesHistory(const std::vector& dbs) { + std::map usage_by_type; + ASSERT_OK(GetApproximateMemoryUsageByType(dbs, &usage_by_type)); + for (int i = 0; i < MemoryUtil::kNumUsageTypes; ++i) { + usage_history_[i].push_back( + usage_by_type[static_cast(i)]); + } + } + + void GetCachePointersFromTableFactory( + const TableFactory* factory, + std::unordered_set* cache_set) { + const BlockBasedTableFactory* bbtf = + dynamic_cast(factory); + if (bbtf != nullptr) { + const auto bbt_opts = bbtf->GetTableOptions(); + cache_set->insert(bbt_opts.block_cache.get()); + cache_set->insert(bbt_opts.block_cache_compressed.get()); + } + } + + void GetCachePointers(const std::vector& dbs, + std::unordered_set* cache_set) { + cache_set->clear(); + + for (auto* db : dbs) { + // Cache from DBImpl + StackableDB* sdb = dynamic_cast(db); + DBImpl* db_impl = dynamic_cast(sdb ? sdb->GetBaseDB() : db); + if (db_impl != nullptr) { + cache_set->insert(db_impl->TEST_table_cache()); + } + + // Cache from DBOptions + cache_set->insert(db->GetDBOptions().row_cache.get()); + + // Cache from table factories + std::unordered_map iopts_map; + if (db_impl != nullptr) { + ASSERT_OK(db_impl->TEST_GetAllImmutableCFOptions(&iopts_map)); + } + for (auto pair : iopts_map) { + GetCachePointersFromTableFactory(pair.second->table_factory, cache_set); + } + } + } + + Status GetApproximateMemoryUsageByType( + const std::vector& dbs, + std::map* usage_by_type) { + std::unordered_set cache_set; + GetCachePointers(dbs, &cache_set); + + return MemoryUtil::GetApproximateMemoryUsageByType(dbs, cache_set, + usage_by_type); + } + + const std::string kDbDir; + Random rnd_; + std::vector usage_history_[MemoryUtil::kNumUsageTypes]; +}; + +TEST_F(MemoryTest, SharedBlockCacheTotal) { + std::vector dbs; + std::vector usage_by_type; + const int kNumDBs = 10; + const int kKeySize = 100; + const int kValueSize = 500; + Options opt; + opt.create_if_missing = true; + opt.write_buffer_size = kKeySize + kValueSize; + opt.max_write_buffer_number = 10; + opt.min_write_buffer_number_to_merge = 10; + BlockBasedTableOptions bbt_opts; + bbt_opts.block_cache = NewLRUCache(4096 * 1000 * 10); + for (int i = 0; i < kNumDBs; ++i) { + DestroyDB(GetDBName(i), opt); + DB* db = nullptr; + ASSERT_OK(DB::Open(opt, GetDBName(i), &db)); + dbs.push_back(db); + } + + std::vector keys_by_db[kNumDBs]; + + // Fill one memtable per Put to make memtable use more memory. + for (int p = 0; p < opt.min_write_buffer_number_to_merge / 2; ++p) { + for (int i = 0; i < kNumDBs; ++i) { + for (int j = 0; j < 100; ++j) { + keys_by_db[i].emplace_back(RandomString(kKeySize)); + dbs[i]->Put(WriteOptions(), keys_by_db[i].back(), + RandomString(kValueSize)); + } + dbs[i]->Flush(FlushOptions()); + } + } + for (int i = 0; i < kNumDBs; ++i) { + for (auto& key : keys_by_db[i]) { + std::string value; + dbs[i]->Get(ReadOptions(), key, &value); + } + UpdateUsagesHistory(dbs); + } + for (size_t i = 1; i < usage_history_[MemoryUtil::kMemTableTotal].size(); + ++i) { + // Expect EQ as we didn't flush more memtables. + ASSERT_EQ(usage_history_[MemoryUtil::kTableReadersTotal][i], + usage_history_[MemoryUtil::kTableReadersTotal][i - 1]); + } + for (int i = 0; i < kNumDBs; ++i) { + delete dbs[i]; + } +} + +TEST_F(MemoryTest, MemTableAndTableReadersTotal) { + std::vector dbs; + std::vector usage_by_type; + std::vector> vec_handles; + const int kNumDBs = 10; + const int kKeySize = 100; + const int kValueSize = 500; + Options opt; + opt.create_if_missing = true; + opt.create_missing_column_families = true; + opt.write_buffer_size = kKeySize + kValueSize; + opt.max_write_buffer_number = 10; + opt.min_write_buffer_number_to_merge = 10; + + std::vector cf_descs = { + {kDefaultColumnFamilyName, ColumnFamilyOptions(opt)}, + {"one", ColumnFamilyOptions(opt)}, + {"two", ColumnFamilyOptions(opt)}, + }; + + for (int i = 0; i < kNumDBs; ++i) { + DestroyDB(GetDBName(i), opt); + std::vector handles; + dbs.emplace_back(); + vec_handles.emplace_back(); + ASSERT_OK(DB::Open(DBOptions(opt), GetDBName(i), cf_descs, + &vec_handles.back(), &dbs.back())); + } + + // Fill one memtable per Put to make memtable use more memory. + for (int p = 0; p < opt.min_write_buffer_number_to_merge / 2; ++p) { + for (int i = 0; i < kNumDBs; ++i) { + for (auto* handle : vec_handles[i]) { + dbs[i]->Put(WriteOptions(), handle, RandomString(kKeySize), + RandomString(kValueSize)); + UpdateUsagesHistory(dbs); + } + } + } + // Expect the usage history is monotonically increasing + for (size_t i = 1; i < usage_history_[MemoryUtil::kMemTableTotal].size(); + ++i) { + ASSERT_GT(usage_history_[MemoryUtil::kMemTableTotal][i], + usage_history_[MemoryUtil::kMemTableTotal][i - 1]); + ASSERT_GT(usage_history_[MemoryUtil::kMemTableUnFlushed][i], + usage_history_[MemoryUtil::kMemTableUnFlushed][i - 1]); + ASSERT_EQ(usage_history_[MemoryUtil::kTableReadersTotal][i], + usage_history_[MemoryUtil::kTableReadersTotal][i - 1]); + } + + size_t usage_check_point = usage_history_[MemoryUtil::kMemTableTotal].size(); + std::vector iters; + + // Create an iterator and flush all memtables for each db + for (int i = 0; i < kNumDBs; ++i) { + iters.push_back(dbs[i]->NewIterator(ReadOptions())); + dbs[i]->Flush(FlushOptions()); + + for (int j = 0; j < 100; ++j) { + std::string value; + dbs[i]->Get(ReadOptions(), RandomString(kKeySize), &value); + } + + UpdateUsagesHistory(dbs); + } + for (size_t i = usage_check_point; + i < usage_history_[MemoryUtil::kMemTableTotal].size(); ++i) { + // Since memtables are pinned by iterators, we don't expect the + // memory usage of all the memtables decreases as they are pinned + // by iterators. + ASSERT_GE(usage_history_[MemoryUtil::kMemTableTotal][i], + usage_history_[MemoryUtil::kMemTableTotal][i - 1]); + // Expect the usage history from the "usage_decay_point" is + // monotonically decreasing. + ASSERT_LT(usage_history_[MemoryUtil::kMemTableUnFlushed][i], + usage_history_[MemoryUtil::kMemTableUnFlushed][i - 1]); + // Expect the usage history of the table readers increases + // as we flush tables. + ASSERT_GT(usage_history_[MemoryUtil::kTableReadersTotal][i], + usage_history_[MemoryUtil::kTableReadersTotal][i - 1]); + ASSERT_GT(usage_history_[MemoryUtil::kCacheTotal][i], + usage_history_[MemoryUtil::kCacheTotal][i - 1]); + } + usage_check_point = usage_history_[MemoryUtil::kMemTableTotal].size(); + for (int i = 0; i < kNumDBs; ++i) { + delete iters[i]; + UpdateUsagesHistory(dbs); + } + for (size_t i = usage_check_point; + i < usage_history_[MemoryUtil::kMemTableTotal].size(); ++i) { + // Expect the usage of all memtables decreasing as we delete iterators. + ASSERT_LT(usage_history_[MemoryUtil::kMemTableTotal][i], + usage_history_[MemoryUtil::kMemTableTotal][i - 1]); + // Since the memory usage of un-flushed memtables is only affected + // by Put and flush, we expect EQ here as we only delete iterators. + ASSERT_EQ(usage_history_[MemoryUtil::kMemTableUnFlushed][i], + usage_history_[MemoryUtil::kMemTableUnFlushed][i - 1]); + // Expect EQ as we didn't flush more memtables. + ASSERT_EQ(usage_history_[MemoryUtil::kTableReadersTotal][i], + usage_history_[MemoryUtil::kTableReadersTotal][i - 1]); + } + + for (int i = 0; i < kNumDBs; ++i) { + for (auto* handle : vec_handles[i]) { + delete handle; + } + delete dbs[i]; + } +} +} // namespace rocksdb + +int main(int argc, char** argv) { +#if !(defined NDEBUG) || !defined(OS_WIN) + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +#else + return 0; +#endif +} +#else +int main(int argc, char** argv) { + printf("Skipped in RocksDBLite as utilities are not supported."); + return 0; +} +#endif // !ROCKSDB_LITE diff --git a/utilities/memory/memory_util.cc b/utilities/memory/memory_util.cc new file mode 100644 index 000000000..f5580174a --- /dev/null +++ b/utilities/memory/memory_util.cc @@ -0,0 +1,52 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/memory_util.h" + +#include "db/db_impl.h" + +namespace rocksdb { + +Status MemoryUtil::GetApproximateMemoryUsageByType( + const std::vector& dbs, + const std::unordered_set cache_set, + std::map* usage_by_type) { + usage_by_type->clear(); + + // MemTable + for (auto* db : dbs) { + uint64_t usage = 0; + if (db->GetAggregatedIntProperty(DB::Properties::kSizeAllMemTables, + &usage)) { + (*usage_by_type)[MemoryUtil::kMemTableTotal] += usage; + } + if (db->GetAggregatedIntProperty(DB::Properties::kCurSizeAllMemTables, + &usage)) { + (*usage_by_type)[MemoryUtil::kMemTableUnFlushed] += usage; + } + } + + // Table Readers + for (auto* db : dbs) { + uint64_t usage = 0; + if (db->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, + &usage)) { + (*usage_by_type)[MemoryUtil::kTableReadersTotal] += usage; + } + } + + // Cache + for (const auto* cache : cache_set) { + if (cache != nullptr) { + (*usage_by_type)[MemoryUtil::kCacheTotal] += cache->GetUsage(); + } + } + + return Status::OK(); +} +} // namespace rocksdb +#endif // !ROCKSDB_LITE