Support user-defined table stats collector

Summary:
1. Added a new option that support user-defined table stats collection.
2. Added a deleted key stats collector in `utilities`

Test Plan:
Added a unit test for newly added code.
Also ran make check to make sure other tests are not broken.

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13491
main
Kai Liu 11 years ago
parent 7e91b86f4d
commit 994575c134
  1. 4
      Makefile
  2. 30
      db/db_impl.cc
  3. 55
      db/table_stats_collector.cc
  4. 58
      db/table_stats_collector.h
  5. 259
      db/table_stats_collector_test.cc
  6. 12
      include/rocksdb/options.h
  7. 24
      include/rocksdb/table_stats.h
  8. 42
      table/table_builder.cc
  9. 7
      util/options.cc

@ -36,6 +36,7 @@ VALGRIND_VER := $(join $(VALGRIND_VER),valgrind)
VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
TESTS = \
table_stats_collector_test \
arena_test \
auto_roll_logger_test \
block_test \
@ -199,6 +200,9 @@ signal_test: util/signal_test.o $(LIBOBJECTS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
table_stats_collector_test: db/table_stats_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/table_stats_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -13,15 +13,15 @@
#include <climits>
#include <cstdio>
#include <set>
#include <string>
#include <stdint.h>
#include <stdexcept>
#include <vector>
#include <stdint.h>
#include <string>
#include <unordered_set>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/db_iter.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
@ -30,9 +30,11 @@
#include "db/merge_helper.h"
#include "db/prefix_filter_iterator.h"
#include "db/table_cache.h"
#include "db/table_stats_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/transaction_log_impl.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -40,7 +42,6 @@
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/table.h"
@ -190,6 +191,23 @@ Options SanitizeOptions(const std::string& dbname,
// Use dbname as default
result.wal_dir = dbname;
}
// -- Sanitize the table stats collector
// All user defined stats collectors will be wrapped by
// UserKeyTableStatsCollector since for them they only have the knowledge of
// the user keys; internal keys are invisible to them.
auto& collectors = result.table_stats_collectors;
for (size_t i = 0; i < result.table_stats_collectors.size(); ++i) {
assert(collectors[i]);
collectors[i] =
std::make_shared<UserKeyTableStatsCollector>(collectors[i]);
}
// Add collector to collect internal key statistics
collectors.push_back(
std::make_shared<InternalKeyStatsCollector>()
);
return result;
}

@ -0,0 +1,55 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/table_stats_collector.h"
#include "db/dbformat.h"
#include "util/coding.h"
namespace rocksdb {
Status InternalKeyStatsCollector::Add(const Slice& key, const Slice& value) {
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
return Status::InvalidArgument("Invalid internal key");
}
if (ikey.type == ValueType::kTypeDeletion) {
++deleted_keys_;
}
return Status::OK();
}
Status InternalKeyStatsCollector::Finish(
TableStats::UserCollectedStats* stats) {
assert(stats);
assert(stats->find(InternalKeyTableStatsNames::kDeletedKeys) == stats->end());
std::string val;
PutVarint64(&val, deleted_keys_);
stats->insert(std::make_pair(InternalKeyTableStatsNames::kDeletedKeys, val));
return Status::OK();
}
Status UserKeyTableStatsCollector::Add(const Slice& key, const Slice& value) {
ParsedInternalKey ikey;
if (!ParseInternalKey(key, &ikey)) {
return Status::InvalidArgument("Invalid internal key");
}
return collector_->Add(ikey.user_key, value);
}
Status UserKeyTableStatsCollector::Finish(
TableStats::UserCollectedStats* stats) {
return collector_->Finish(stats);
}
const std::string InternalKeyTableStatsNames::kDeletedKeys
= "rocksdb.deleted.keys";
} // namespace rocksdb

@ -0,0 +1,58 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// This file defines a collection of statistics collectors.
#pragma once
#include "rocksdb/table_stats.h"
#include <memory>
#include <string>
#include <vector>
namespace rocksdb {
struct InternalKeyTableStatsNames {
static const std::string kDeletedKeys;
};
// Collecting the statistics for internal keys. Visible only by internal
// rocksdb modules.
class InternalKeyStatsCollector : public TableStatsCollector {
public:
virtual Status Add(const Slice& key, const Slice& value);
virtual Status Finish(TableStats::UserCollectedStats* stats);
virtual const char* Name() const { return "InternalKeyStatsCollector"; }
private:
uint64_t deleted_keys_ = 0;
};
// When rocksdb creates a new table, it will encode all "user keys" into
// "internal keys", which contains meta information of a given entry.
//
// This class extracts user key from the encoded internal key when Add() is
// invoked.
class UserKeyTableStatsCollector : public TableStatsCollector {
public:
explicit UserKeyTableStatsCollector(TableStatsCollector* collector):
UserKeyTableStatsCollector(
std::shared_ptr<TableStatsCollector>(collector)
) {
}
explicit UserKeyTableStatsCollector(
std::shared_ptr<TableStatsCollector> collector) : collector_(collector) {
}
virtual ~UserKeyTableStatsCollector() { }
virtual Status Add(const Slice& key, const Slice& value);
virtual Status Finish(TableStats::UserCollectedStats* stats);
virtual const char* Name() const { return collector_->Name(); }
protected:
std::shared_ptr<TableStatsCollector> collector_;
};
} // namespace rocksdb

@ -0,0 +1,259 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <map>
#include <memory>
#include <string>
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/table_stats_collector.h"
#include "rocksdb/table_builder.h"
#include "rocksdb/table_stats.h"
#include "table/table.h"
#include "util/coding.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
class TableStatsTest {
private:
unique_ptr<Table> table_;
};
// TODO(kailiu) the following classes should be moved to some more general
// places, so that other tests can also make use of them.
// `FakeWritableFile` and `FakeRandomeAccessFile` bypass the real file system
// and therefore enable us to quickly setup the tests.
class FakeWritableFile : public WritableFile {
public:
~FakeWritableFile() { }
const std::string& contents() const { return contents_; }
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& data) {
contents_.append(data.data(), data.size());
return Status::OK();
}
private:
std::string contents_;
};
class FakeRandomeAccessFile : public RandomAccessFile {
public:
explicit FakeRandomeAccessFile(const Slice& contents)
: contents_(contents.data(), contents.size()) {
}
virtual ~FakeRandomeAccessFile() { }
uint64_t Size() const { return contents_.size(); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
if (offset > contents_.size()) {
return Status::InvalidArgument("invalid Read offset");
}
if (offset + n > contents_.size()) {
n = contents_.size() - offset;
}
memcpy(scratch, &contents_[offset], n);
*result = Slice(scratch, n);
return Status::OK();
}
private:
std::string contents_;
};
class DumbLogger : public Logger {
public:
virtual void Logv(const char* format, va_list ap) { }
virtual size_t GetLogFileSize() const { return 0; }
};
// Utilities test functions
void MakeBuilder(
const Options& options,
std::unique_ptr<FakeWritableFile>* writable,
std::unique_ptr<TableBuilder>* builder) {
writable->reset(new FakeWritableFile);
builder->reset(
new TableBuilder(options, writable->get())
);
}
void OpenTable(
const Options& options,
const std::string& contents,
std::unique_ptr<Table>* table) {
std::unique_ptr<RandomAccessFile> file(new FakeRandomeAccessFile(contents));
auto s = Table::Open(
options,
EnvOptions(),
std::move(file),
contents.size(),
table
);
ASSERT_OK(s);
}
// Collects keys that starts with "A" in a table.
class RegularKeysStartWithA: public TableStatsCollector {
public:
const char* Name() const { return "RegularKeysStartWithA"; }
Status Finish(TableStats::UserCollectedStats* stats) {
std::string encoded;
PutVarint32(&encoded, count_);
*stats = TableStats::UserCollectedStats {
{ "TableStatsTest", "Rocksdb" },
{ "Count", encoded }
};
return Status::OK();
}
Status Add(const Slice& user_key, const Slice& value) {
// simply asssume all user keys are not empty.
if (user_key.data()[0] == 'A') {
++count_;
}
return Status::OK();
}
private:
uint32_t count_ = 0;
};
TEST(TableStatsTest, CustomizedTableStatsCollector) {
Options options;
// make sure the entries will be inserted with order.
std::map<std::string, std::string> kvs = {
{"About", "val5"}, // starts with 'A'
{"Abstract", "val2"}, // starts with 'A'
{"Around", "val7"}, // starts with 'A'
{"Beyond", "val3"},
{"Builder", "val1"},
{"Cancel", "val4"},
{"Find", "val6"},
};
// Test stats collectors with internal keys or regular keys
for (bool encode_as_internal : { true, false }) {
// -- Step 1: build table
auto collector = new RegularKeysStartWithA();
if (encode_as_internal) {
options.table_stats_collectors = {
std::make_shared<UserKeyTableStatsCollector>(collector)
};
} else {
options.table_stats_collectors.resize(1);
options.table_stats_collectors[0].reset(collector);
}
std::unique_ptr<TableBuilder> builder;
std::unique_ptr<FakeWritableFile> writable;
MakeBuilder(options, &writable, &builder);
for (const auto& kv : kvs) {
if (encode_as_internal) {
InternalKey ikey(kv.first, 0, ValueType::kTypeValue);
builder->Add(ikey.Encode(), kv.second);
} else {
builder->Add(kv.first, kv.second);
}
}
ASSERT_OK(builder->Finish());
// -- Step 2: Open table
std::unique_ptr<Table> table;
OpenTable(options, writable->contents(), &table);
const auto& stats = table->GetTableStats().user_collected_stats;
ASSERT_EQ("Rocksdb", stats.at("TableStatsTest"));
uint32_t starts_with_A = 0;
Slice key(stats.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(3u, starts_with_A);
}
}
TEST(TableStatsTest, InternalKeyStatsCollector) {
InternalKey keys[] = {
InternalKey("A", 0, ValueType::kTypeValue),
InternalKey("B", 0, ValueType::kTypeValue),
InternalKey("C", 0, ValueType::kTypeValue),
InternalKey("W", 0, ValueType::kTypeDeletion),
InternalKey("X", 0, ValueType::kTypeDeletion),
InternalKey("Y", 0, ValueType::kTypeDeletion),
InternalKey("Z", 0, ValueType::kTypeDeletion),
};
for (bool sanitized : { false, true }) {
std::unique_ptr<TableBuilder> builder;
std::unique_ptr<FakeWritableFile> writable;
Options options;
if (sanitized) {
options.table_stats_collectors = {
std::make_shared<RegularKeysStartWithA>()
};
// with sanitization, even regular stats collector will be able to
// handle internal keys.
auto comparator = options.comparator;
// HACK: Set options.info_log to avoid writing log in
// SanitizeOptions().
options.info_log = std::make_shared<DumbLogger>();
options = SanitizeOptions(
"db", // just a place holder
nullptr, // with skip internal key comparator
nullptr, // don't care filter policy
options
);
options.comparator = comparator;
} else {
options.table_stats_collectors = {
std::make_shared<InternalKeyStatsCollector>()
};
}
MakeBuilder(options, &writable, &builder);
for (const auto& k : keys) {
builder->Add(k.Encode(), "val");
}
ASSERT_OK(builder->Finish());
std::unique_ptr<Table> table;
OpenTable(options, writable->contents(), &table);
const auto& stats = table->GetTableStats().user_collected_stats;
uint64_t deleted = 0;
Slice key(stats.at(InternalKeyTableStatsNames::kDeletedKeys));
ASSERT_TRUE(GetVarint64(&key, &deleted));
ASSERT_EQ(4u, deleted);
if (sanitized) {
uint32_t starts_with_A = 0;
Slice key(stats.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(1u, starts_with_A);
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}

@ -10,11 +10,13 @@
#include <memory>
#include <vector>
#include <stdint.h>
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table_stats.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {
@ -586,6 +588,12 @@ struct Options {
// to data file.
// Default: true
bool purge_log_after_memtable_flush;
// This option allows user to to collect their own interested statistics of
// the tables.
// Default: emtpy vector -- no user-defined statistics collection will be
// performed.
std::vector<std::shared_ptr<TableStatsCollector>> table_stats_collectors;
};
//

@ -6,14 +6,14 @@
#include <string>
#include <unordered_map>
#include "rocksdb/status.h"
namespace rocksdb {
// TableStats contains a bunch of read-only stats of its associated
// table.
struct TableStats {
public:
// TODO(kailiu) we do not support user collected stats yet.
//
// Other than basic table stats, each table may also have the user
// collected stats.
// The value of the user-collected stats are encoded as raw bytes --
@ -43,4 +43,24 @@ struct TableStats {
UserCollectedStats user_collected_stats;
};
// `TableStatsCollector` provides the mechanism for users to collect their own
// interested stats. This class is essentially a collection of callback
// functions that will be invoked during table building.
class TableStatsCollector {
public:
virtual ~TableStatsCollector() { }
// Add() will be called when a new key/value pair is inserted into the table.
// @params key the original key that is inserted into the table.
// @params value the original value that is inserted into the table.
virtual Status Add(const Slice& key, const Slice& value) = 0;
// Finish() will be called when a table has already been built and is ready
// for writing the stats block.
// @params stats User will add their collected statistics to `stats`.
virtual Status Finish(TableStats::UserCollectedStats* stats) = 0;
// The name of the stats collector can be used for debugging purpose.
virtual const char* Name() const = 0;
};
} // namespace rocksdb

@ -57,6 +57,18 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u);
}
// Were we encounter any error occurs during user-defined statistics collection,
// we'll write the warning message to info log.
void LogStatsCollectionError(
Logger* info_log, const std::string& method, const std::string& name) {
assert(method == "Add" || method == "Finish");
std::string msg =
"[Warning] encountered error when calling TableStatsCollector::" +
method + "() with collector name: " + name;
Log(info_log, msg.c_str());
}
} // anonymous namespace
struct TableBuilder::Rep {
@ -179,6 +191,17 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
r->num_entries++;
r->raw_key_size += key.size();
r->raw_value_size += value.size();
for (auto collector : r->options.table_stats_collectors) {
Status s = collector->Add(key, value);
if (!s.ok()) {
LogStatsCollectionError(
r->options.info_log.get(),
"Add", /* method */
collector->Name()
);
}
}
}
void TableBuilder::Flush() {
@ -383,6 +406,25 @@ Status TableBuilder::Finish() {
));
}
for (auto collector : r->options.table_stats_collectors) {
TableStats::UserCollectedStats user_collected_stats;
Status s =
collector->Finish(&user_collected_stats);
if (!s.ok()) {
LogStatsCollectionError(
r->options.info_log.get(),
"Finish", /* method */
collector->Name()
);
} else {
stats.insert(
user_collected_stats.begin(),
user_collected_stats.end()
);
}
}
for (const auto& stat : stats) {
stats_block.Add(stat.first, stat.second);
}

@ -264,6 +264,13 @@ Options::Dump(Logger* log) const
compaction_options_universal.max_size_amplification_percent);
Log(log," Options.purge_log_after_memtable_flush: %d",
purge_log_after_memtable_flush);
std::string collector_names;
for (auto collector : table_stats_collectors) {
collector_names.append(collector->Name());
collector_names.append("; ");
}
Log(log," Options.table_stats_collectors: %s",
collector_names.c_str());
} // Options::Dump
//

Loading…
Cancel
Save