From 994575c134ef07a1093f21e2c8ccd9b486e92c59 Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Wed, 16 Oct 2013 11:50:50 -0700 Subject: [PATCH] Support user-defined table stats collector Summary: 1. Added a new option that support user-defined table stats collection. 2. Added a deleted key stats collector in `utilities` Test Plan: Added a unit test for newly added code. Also ran make check to make sure other tests are not broken. Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13491 --- Makefile | 4 + db/db_impl.cc | 30 +++- db/table_stats_collector.cc | 55 +++++++ db/table_stats_collector.h | 58 +++++++ db/table_stats_collector_test.cc | 259 +++++++++++++++++++++++++++++++ include/rocksdb/options.h | 12 +- include/rocksdb/table_stats.h | 24 ++- table/table_builder.cc | 42 +++++ util/options.cc | 7 + 9 files changed, 481 insertions(+), 10 deletions(-) create mode 100644 db/table_stats_collector.cc create mode 100644 db/table_stats_collector.h create mode 100644 db/table_stats_collector_test.cc diff --git a/Makefile b/Makefile index bc72d6ae9..31720522f 100644 --- a/Makefile +++ b/Makefile @@ -36,6 +36,7 @@ VALGRIND_VER := $(join $(VALGRIND_VER),valgrind) VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TESTS = \ + table_stats_collector_test \ arena_test \ auto_roll_logger_test \ block_test \ @@ -199,6 +200,9 @@ signal_test: util/signal_test.o $(LIBOBJECTS) arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +table_stats_collector_test: db/table_stats_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/table_stats_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/db_impl.cc b/db/db_impl.cc index b8fec9e69..fbbf650d8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -13,15 +13,15 @@ #include #include #include -#include -#include #include -#include +#include +#include #include +#include #include "db/builder.h" -#include "db/db_iter.h" #include "db/dbformat.h" +#include "db/db_iter.h" #include "db/filename.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -30,9 +30,11 @@ #include "db/merge_helper.h" #include "db/prefix_filter_iterator.h" #include "db/table_cache.h" +#include "db/table_stats_collector.h" +#include "db/transaction_log_impl.h" #include "db/version_set.h" #include "db/write_batch_internal.h" -#include "db/transaction_log_impl.h" +#include "port/port.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -40,7 +42,6 @@ #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table_builder.h" -#include "port/port.h" #include "table/block.h" #include "table/merger.h" #include "table/table.h" @@ -190,6 +191,23 @@ Options SanitizeOptions(const std::string& dbname, // Use dbname as default result.wal_dir = dbname; } + + // -- Sanitize the table stats collector + // All user defined stats collectors will be wrapped by + // UserKeyTableStatsCollector since for them they only have the knowledge of + // the user keys; internal keys are invisible to them. + auto& collectors = result.table_stats_collectors; + for (size_t i = 0; i < result.table_stats_collectors.size(); ++i) { + assert(collectors[i]); + collectors[i] = + std::make_shared(collectors[i]); + } + + // Add collector to collect internal key statistics + collectors.push_back( + std::make_shared() + ); + return result; } diff --git a/db/table_stats_collector.cc b/db/table_stats_collector.cc new file mode 100644 index 000000000..91e89ce4c --- /dev/null +++ b/db/table_stats_collector.cc @@ -0,0 +1,55 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/table_stats_collector.h" + +#include "db/dbformat.h" +#include "util/coding.h" + +namespace rocksdb { + +Status InternalKeyStatsCollector::Add(const Slice& key, const Slice& value) { + ParsedInternalKey ikey; + if (!ParseInternalKey(key, &ikey)) { + return Status::InvalidArgument("Invalid internal key"); + } + + if (ikey.type == ValueType::kTypeDeletion) { + ++deleted_keys_; + } + + return Status::OK(); +} + +Status InternalKeyStatsCollector::Finish( + TableStats::UserCollectedStats* stats) { + assert(stats); + assert(stats->find(InternalKeyTableStatsNames::kDeletedKeys) == stats->end()); + std::string val; + + PutVarint64(&val, deleted_keys_); + stats->insert(std::make_pair(InternalKeyTableStatsNames::kDeletedKeys, val)); + + return Status::OK(); +} + +Status UserKeyTableStatsCollector::Add(const Slice& key, const Slice& value) { + ParsedInternalKey ikey; + if (!ParseInternalKey(key, &ikey)) { + return Status::InvalidArgument("Invalid internal key"); + } + + return collector_->Add(ikey.user_key, value); +} + +Status UserKeyTableStatsCollector::Finish( + TableStats::UserCollectedStats* stats) { + return collector_->Finish(stats); +} + +const std::string InternalKeyTableStatsNames::kDeletedKeys + = "rocksdb.deleted.keys"; + +} // namespace rocksdb diff --git a/db/table_stats_collector.h b/db/table_stats_collector.h new file mode 100644 index 000000000..e4e3685f7 --- /dev/null +++ b/db/table_stats_collector.h @@ -0,0 +1,58 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file defines a collection of statistics collectors. +#pragma once + +#include "rocksdb/table_stats.h" + +#include +#include +#include + +namespace rocksdb { + +struct InternalKeyTableStatsNames { + static const std::string kDeletedKeys; +}; + +// Collecting the statistics for internal keys. Visible only by internal +// rocksdb modules. +class InternalKeyStatsCollector : public TableStatsCollector { + public: + virtual Status Add(const Slice& key, const Slice& value); + virtual Status Finish(TableStats::UserCollectedStats* stats); + virtual const char* Name() const { return "InternalKeyStatsCollector"; } + + private: + uint64_t deleted_keys_ = 0; +}; + +// When rocksdb creates a new table, it will encode all "user keys" into +// "internal keys", which contains meta information of a given entry. +// +// This class extracts user key from the encoded internal key when Add() is +// invoked. +class UserKeyTableStatsCollector : public TableStatsCollector { + public: + explicit UserKeyTableStatsCollector(TableStatsCollector* collector): + UserKeyTableStatsCollector( + std::shared_ptr(collector) + ) { + } + + explicit UserKeyTableStatsCollector( + std::shared_ptr collector) : collector_(collector) { + } + virtual ~UserKeyTableStatsCollector() { } + virtual Status Add(const Slice& key, const Slice& value); + virtual Status Finish(TableStats::UserCollectedStats* stats); + virtual const char* Name() const { return collector_->Name(); } + + protected: + std::shared_ptr collector_; +}; + +} // namespace rocksdb diff --git a/db/table_stats_collector_test.cc b/db/table_stats_collector_test.cc new file mode 100644 index 000000000..586b65d40 --- /dev/null +++ b/db/table_stats_collector_test.cc @@ -0,0 +1,259 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include + +#include "db/dbformat.h" +#include "db/db_impl.h" +#include "db/table_stats_collector.h" +#include "rocksdb/table_builder.h" +#include "rocksdb/table_stats.h" +#include "table/table.h" +#include "util/coding.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class TableStatsTest { + private: + unique_ptr table_; +}; + +// TODO(kailiu) the following classes should be moved to some more general +// places, so that other tests can also make use of them. +// `FakeWritableFile` and `FakeRandomeAccessFile` bypass the real file system +// and therefore enable us to quickly setup the tests. +class FakeWritableFile : public WritableFile { + public: + ~FakeWritableFile() { } + + const std::string& contents() const { return contents_; } + + virtual Status Close() { return Status::OK(); } + virtual Status Flush() { return Status::OK(); } + virtual Status Sync() { return Status::OK(); } + + virtual Status Append(const Slice& data) { + contents_.append(data.data(), data.size()); + return Status::OK(); + } + + private: + std::string contents_; +}; + + +class FakeRandomeAccessFile : public RandomAccessFile { + public: + explicit FakeRandomeAccessFile(const Slice& contents) + : contents_(contents.data(), contents.size()) { + } + + virtual ~FakeRandomeAccessFile() { } + + uint64_t Size() const { return contents_.size(); } + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + if (offset > contents_.size()) { + return Status::InvalidArgument("invalid Read offset"); + } + if (offset + n > contents_.size()) { + n = contents_.size() - offset; + } + memcpy(scratch, &contents_[offset], n); + *result = Slice(scratch, n); + return Status::OK(); + } + + private: + std::string contents_; +}; + + +class DumbLogger : public Logger { + public: + virtual void Logv(const char* format, va_list ap) { } + virtual size_t GetLogFileSize() const { return 0; } +}; + +// Utilities test functions +void MakeBuilder( + const Options& options, + std::unique_ptr* writable, + std::unique_ptr* builder) { + writable->reset(new FakeWritableFile); + builder->reset( + new TableBuilder(options, writable->get()) + ); +} + +void OpenTable( + const Options& options, + const std::string& contents, + std::unique_ptr
* table) { + std::unique_ptr file(new FakeRandomeAccessFile(contents)); + auto s = Table::Open( + options, + EnvOptions(), + std::move(file), + contents.size(), + table + ); + ASSERT_OK(s); +} + +// Collects keys that starts with "A" in a table. +class RegularKeysStartWithA: public TableStatsCollector { + public: + const char* Name() const { return "RegularKeysStartWithA"; } + + Status Finish(TableStats::UserCollectedStats* stats) { + std::string encoded; + PutVarint32(&encoded, count_); + *stats = TableStats::UserCollectedStats { + { "TableStatsTest", "Rocksdb" }, + { "Count", encoded } + }; + return Status::OK(); + } + + Status Add(const Slice& user_key, const Slice& value) { + // simply asssume all user keys are not empty. + if (user_key.data()[0] == 'A') { + ++count_; + } + return Status::OK(); + } + + private: + uint32_t count_ = 0; +}; + +TEST(TableStatsTest, CustomizedTableStatsCollector) { + Options options; + + // make sure the entries will be inserted with order. + std::map kvs = { + {"About", "val5"}, // starts with 'A' + {"Abstract", "val2"}, // starts with 'A' + {"Around", "val7"}, // starts with 'A' + {"Beyond", "val3"}, + {"Builder", "val1"}, + {"Cancel", "val4"}, + {"Find", "val6"}, + }; + + // Test stats collectors with internal keys or regular keys + for (bool encode_as_internal : { true, false }) { + // -- Step 1: build table + auto collector = new RegularKeysStartWithA(); + if (encode_as_internal) { + options.table_stats_collectors = { + std::make_shared(collector) + }; + } else { + options.table_stats_collectors.resize(1); + options.table_stats_collectors[0].reset(collector); + } + std::unique_ptr builder; + std::unique_ptr writable; + MakeBuilder(options, &writable, &builder); + + for (const auto& kv : kvs) { + if (encode_as_internal) { + InternalKey ikey(kv.first, 0, ValueType::kTypeValue); + builder->Add(ikey.Encode(), kv.second); + } else { + builder->Add(kv.first, kv.second); + } + } + ASSERT_OK(builder->Finish()); + + // -- Step 2: Open table + std::unique_ptr
table; + OpenTable(options, writable->contents(), &table); + const auto& stats = table->GetTableStats().user_collected_stats; + + ASSERT_EQ("Rocksdb", stats.at("TableStatsTest")); + + uint32_t starts_with_A = 0; + Slice key(stats.at("Count")); + ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); + ASSERT_EQ(3u, starts_with_A); + } +} + +TEST(TableStatsTest, InternalKeyStatsCollector) { + InternalKey keys[] = { + InternalKey("A", 0, ValueType::kTypeValue), + InternalKey("B", 0, ValueType::kTypeValue), + InternalKey("C", 0, ValueType::kTypeValue), + InternalKey("W", 0, ValueType::kTypeDeletion), + InternalKey("X", 0, ValueType::kTypeDeletion), + InternalKey("Y", 0, ValueType::kTypeDeletion), + InternalKey("Z", 0, ValueType::kTypeDeletion), + }; + + for (bool sanitized : { false, true }) { + std::unique_ptr builder; + std::unique_ptr writable; + Options options; + if (sanitized) { + options.table_stats_collectors = { + std::make_shared() + }; + // with sanitization, even regular stats collector will be able to + // handle internal keys. + auto comparator = options.comparator; + // HACK: Set options.info_log to avoid writing log in + // SanitizeOptions(). + options.info_log = std::make_shared(); + options = SanitizeOptions( + "db", // just a place holder + nullptr, // with skip internal key comparator + nullptr, // don't care filter policy + options + ); + options.comparator = comparator; + } else { + options.table_stats_collectors = { + std::make_shared() + }; + } + + MakeBuilder(options, &writable, &builder); + for (const auto& k : keys) { + builder->Add(k.Encode(), "val"); + } + + ASSERT_OK(builder->Finish()); + + std::unique_ptr
table; + OpenTable(options, writable->contents(), &table); + const auto& stats = table->GetTableStats().user_collected_stats; + + uint64_t deleted = 0; + Slice key(stats.at(InternalKeyTableStatsNames::kDeletedKeys)); + ASSERT_TRUE(GetVarint64(&key, &deleted)); + ASSERT_EQ(4u, deleted); + + if (sanitized) { + uint32_t starts_with_A = 0; + Slice key(stats.at("Count")); + ASSERT_TRUE(GetVarint32(&key, &starts_with_A)); + ASSERT_EQ(1u, starts_with_A); + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + return rocksdb::test::RunAllTests(); +} diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9c9fee4fa..79abacb78 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -10,11 +10,13 @@ #include #include #include + +#include "rocksdb/memtablerep.h" #include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" +#include "rocksdb/table_stats.h" #include "rocksdb/universal_compaction.h" -#include "rocksdb/memtablerep.h" -#include "rocksdb/slice_transform.h" namespace rocksdb { @@ -586,6 +588,12 @@ struct Options { // to data file. // Default: true bool purge_log_after_memtable_flush; + + // This option allows user to to collect their own interested statistics of + // the tables. + // Default: emtpy vector -- no user-defined statistics collection will be + // performed. + std::vector> table_stats_collectors; }; // diff --git a/include/rocksdb/table_stats.h b/include/rocksdb/table_stats.h index 811be107e..4c3437aea 100644 --- a/include/rocksdb/table_stats.h +++ b/include/rocksdb/table_stats.h @@ -6,14 +6,14 @@ #include #include +#include "rocksdb/status.h" + namespace rocksdb { // TableStats contains a bunch of read-only stats of its associated // table. struct TableStats { public: - // TODO(kailiu) we do not support user collected stats yet. - // // Other than basic table stats, each table may also have the user // collected stats. // The value of the user-collected stats are encoded as raw bytes -- @@ -43,4 +43,24 @@ struct TableStats { UserCollectedStats user_collected_stats; }; +// `TableStatsCollector` provides the mechanism for users to collect their own +// interested stats. This class is essentially a collection of callback +// functions that will be invoked during table building. +class TableStatsCollector { + public: + virtual ~TableStatsCollector() { } + // Add() will be called when a new key/value pair is inserted into the table. + // @params key the original key that is inserted into the table. + // @params value the original value that is inserted into the table. + virtual Status Add(const Slice& key, const Slice& value) = 0; + + // Finish() will be called when a table has already been built and is ready + // for writing the stats block. + // @params stats User will add their collected statistics to `stats`. + virtual Status Finish(TableStats::UserCollectedStats* stats) = 0; + + // The name of the stats collector can be used for debugging purpose. + virtual const char* Name() const = 0; +}; + } // namespace rocksdb diff --git a/table/table_builder.cc b/table/table_builder.cc index 8569b9402..a9dd21ff6 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -57,6 +57,18 @@ static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { return compressed_size < raw_size - (raw_size / 8u); } +// Were we encounter any error occurs during user-defined statistics collection, +// we'll write the warning message to info log. +void LogStatsCollectionError( + Logger* info_log, const std::string& method, const std::string& name) { + assert(method == "Add" || method == "Finish"); + + std::string msg = + "[Warning] encountered error when calling TableStatsCollector::" + + method + "() with collector name: " + name; + Log(info_log, msg.c_str()); +} + } // anonymous namespace struct TableBuilder::Rep { @@ -179,6 +191,17 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { r->num_entries++; r->raw_key_size += key.size(); r->raw_value_size += value.size(); + + for (auto collector : r->options.table_stats_collectors) { + Status s = collector->Add(key, value); + if (!s.ok()) { + LogStatsCollectionError( + r->options.info_log.get(), + "Add", /* method */ + collector->Name() + ); + } + } } void TableBuilder::Flush() { @@ -383,6 +406,25 @@ Status TableBuilder::Finish() { )); } + for (auto collector : r->options.table_stats_collectors) { + TableStats::UserCollectedStats user_collected_stats; + Status s = + collector->Finish(&user_collected_stats); + + if (!s.ok()) { + LogStatsCollectionError( + r->options.info_log.get(), + "Finish", /* method */ + collector->Name() + ); + } else { + stats.insert( + user_collected_stats.begin(), + user_collected_stats.end() + ); + } + } + for (const auto& stat : stats) { stats_block.Add(stat.first, stat.second); } diff --git a/util/options.cc b/util/options.cc index 1e0e03700..795b08d77 100644 --- a/util/options.cc +++ b/util/options.cc @@ -264,6 +264,13 @@ Options::Dump(Logger* log) const compaction_options_universal.max_size_amplification_percent); Log(log," Options.purge_log_after_memtable_flush: %d", purge_log_after_memtable_flush); + std::string collector_names; + for (auto collector : table_stats_collectors) { + collector_names.append(collector->Name()); + collector_names.append("; "); + } + Log(log," Options.table_stats_collectors: %s", + collector_names.c_str()); } // Options::Dump //