Add GetAggregatedIntProperty(): returns the aggregated value from all CFs

Summary:
This patch adds GetAggregatedIntProperty() that returns the aggregated
value from all CFs

Test Plan: Added a test in db_test

Reviewers: igor, sdong, anthony, IslamAbdelRahman, rven

Reviewed By: rven

Subscribers: rven, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D49497
main
Yueh-Hsuan Chiang 9 years ago
parent 93a9667223
commit 3ecbab0040
  1. 1
      HISTORY.md
  2. 66
      db/db_impl.cc
  3. 8
      db/db_impl.h
  4. 58
      db/db_test.cc
  5. 5
      include/rocksdb/db.h
  6. 6
      include/rocksdb/utilities/stackable_db.h

@ -7,6 +7,7 @@
* TablePropertiesCollectorFactory::CreateTablePropertiesCollector() now takes an option Context, containing the information of column family ID for the file being written.
* Remove DefaultCompactionFilterFactory.
* Introduce CreateLoggerFromOptions(), this function create a Logger for provided DBOptions.
* Add GetAggregatedIntProperty(), which returns the sum of the GetIntProperty of all the column families.
## 4.1.0 (10/8/2015)
### New Features

@ -4286,17 +4286,16 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
GetPropertyType(property, &is_int_property, &need_out_of_mutex);
value->clear();
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
if (is_int_property) {
uint64_t int_value;
bool ret_value = GetIntPropertyInternal(column_family, property_type,
need_out_of_mutex, &int_value);
bool ret_value = GetIntPropertyInternal(
cfd, property_type, need_out_of_mutex, false, &int_value);
if (ret_value) {
*value = ToString(int_value);
}
return ret_value;
} else {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
InstrumentedMutexLock l(&mutex_);
return cfd->internal_stats()->GetStringProperty(property_type, property,
value);
@ -4312,31 +4311,70 @@ bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
if (!is_int_property) {
return false;
}
return GetIntPropertyInternal(column_family, property_type, need_out_of_mutex,
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
return GetIntPropertyInternal(cfd, property_type, need_out_of_mutex, false,
value);
}
bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family,
bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
DBPropertyType property_type,
bool need_out_of_mutex, uint64_t* value) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
bool need_out_of_mutex, bool is_locked,
uint64_t* value) {
if (!need_out_of_mutex) {
InstrumentedMutexLock l(&mutex_);
return cfd->internal_stats()->GetIntProperty(property_type, value, this);
if (is_locked) {
mutex_.AssertHeld();
return cfd->internal_stats()->GetIntProperty(property_type, value, this);
} else {
InstrumentedMutexLock l(&mutex_);
return cfd->internal_stats()->GetIntProperty(property_type, value, this);
}
} else {
SuperVersion* sv = GetAndRefSuperVersion(cfd);
SuperVersion* sv = nullptr;
if (!is_locked) {
sv = GetAndRefSuperVersion(cfd);
} else {
sv = cfd->GetSuperVersion();
}
bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
property_type, sv->current, value);
ReturnAndCleanupSuperVersion(cfd, sv);
if (!is_locked) {
ReturnAndCleanupSuperVersion(cfd, sv);
}
return ret;
}
}
bool DBImpl::GetAggregatedIntProperty(const Slice& property,
uint64_t* aggregated_value) {
bool need_out_of_mutex;
bool is_int_property;
DBPropertyType property_type =
GetPropertyType(property, &is_int_property, &need_out_of_mutex);
if (!is_int_property) {
return false;
}
uint64_t sum = 0;
{
// Needs mutex to protect the list of column families.
InstrumentedMutexLock l(&mutex_);
uint64_t value;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (GetIntPropertyInternal(cfd, property_type, need_out_of_mutex, true,
&value)) {
sum += value;
} else {
return false;
}
}
}
*aggregated_value = sum;
return true;
}
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly
return cfd->GetThreadLocalSuperVersion(&mutex_);

@ -122,6 +122,9 @@ class DBImpl : public DB {
using DB::GetIntProperty;
virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) override;
using DB::GetAggregatedIntProperty;
virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* aggregated_value) override;
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes,
@ -824,9 +827,10 @@ class DBImpl : public DB {
const Slice& key, std::string* value,
bool* value_found = nullptr);
bool GetIntPropertyInternal(ColumnFamilyHandle* column_family,
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
DBPropertyType property_type,
bool need_out_of_mutex, uint64_t* value);
bool need_out_of_mutex, bool is_locked,
uint64_t* value);
};
// Sanitize db options. The caller should delete result.info_log if

@ -200,6 +200,59 @@ TEST_F(DBTest, WriteEmptyBatch) {
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, GetAggregatedIntPropertyTest) {
const int kKeySize = 100;
const int kValueSize = 500;
const int kKeyNum = 100;
Options options;
options.env = env_;
options.create_if_missing = true;
options.write_buffer_size = (kKeySize + kValueSize) * kKeyNum / 10;
// Make them never flush
options.min_write_buffer_number_to_merge = 1000;
options.max_write_buffer_number = 1000;
options = CurrentOptions(options);
CreateAndReopenWithCF({"one", "two", "three", "four"}, options);
Random rnd(301);
for (auto* handle : handles_) {
for (int i = 0; i < kKeyNum; ++i) {
db_->Put(WriteOptions(), handle, RandomString(&rnd, kKeySize),
RandomString(&rnd, kValueSize));
}
}
uint64_t manual_sum = 0;
uint64_t api_sum = 0;
uint64_t value = 0;
for (auto* handle : handles_) {
ASSERT_TRUE(
db_->GetIntProperty(handle, DB::Properties::kSizeAllMemTables, &value));
manual_sum += value;
}
ASSERT_TRUE(db_->GetAggregatedIntProperty(DB::Properties::kSizeAllMemTables,
&api_sum));
ASSERT_GT(manual_sum, 0);
ASSERT_EQ(manual_sum, api_sum);
ASSERT_FALSE(db_->GetAggregatedIntProperty(DB::Properties::kDBStats, &value));
uint64_t before_flush_trm;
uint64_t after_flush_trm;
for (auto* handle : handles_) {
ASSERT_TRUE(db_->GetAggregatedIntProperty(
DB::Properties::kEstimateTableReadersMem, &before_flush_trm));
// Issue flush and expect larger memory usage of table readers.
db_->Flush(FlushOptions(), handle);
ASSERT_TRUE(db_->GetAggregatedIntProperty(
DB::Properties::kEstimateTableReadersMem, &after_flush_trm));
ASSERT_GT(after_flush_trm, before_flush_trm);
}
}
TEST_F(DBTest, ReadOnlyDB) {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("bar", "v2"));
@ -5717,6 +5770,11 @@ class ModelDB: public DB {
const Slice& property, uint64_t* value) override {
return false;
}
using DB::GetAggregatedIntProperty;
virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* value) override {
return false;
}
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* range, int n, uint64_t* sizes,

@ -428,6 +428,11 @@ class DB {
return GetIntProperty(DefaultColumnFamily(), property, value);
}
// Same as GetIntProperty(), but this one returns the aggregated int
// property from all column families.
virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate
// file system space used by keys in "[range[i].start .. range[i].limit)".
//

@ -144,6 +144,12 @@ class StackableDB : public DB {
return db_->GetIntProperty(column_family, property, value);
}
using DB::GetAggregatedIntProperty;
virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* value) override {
return db_->GetAggregatedIntProperty(property, value);
}
using DB::GetApproximateSizes;
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
const Range* r, int n, uint64_t* sizes,

Loading…
Cancel
Save