From f8c218930765d0e7622aa0cec0caa5bb2ff6e336 Mon Sep 17 00:00:00 2001 From: Praveen Rao Date: Fri, 18 Mar 2016 12:32:15 -0700 Subject: [PATCH] Publish log numbers for column family to wal_filter, and provide log number in the record callback --- db/db_impl.cc | 21 ++++- db/db_test.cc | 166 +++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 2 +- include/rocksdb/wal_filter.h | 32 ++++++- 4 files changed, 217 insertions(+), 4 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 77dc9457d..fa3713f44 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1087,6 +1087,23 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, stream.EndArray(); } +#ifndef ROCKSDB_LITE + if (db_options_.wal_filter != nullptr) { + std::map cf_name_id_map; + std::map cf_lognumber_map; + for (auto cfd : *versions_->GetColumnFamilySet()) { + cf_name_id_map.insert( + std::make_pair(cfd->GetName(), cfd->GetID())); + cf_lognumber_map.insert( + std::make_pair(cfd->GetID(), cfd->GetLogNumber())); + } + + db_options_.wal_filter->ColumnFamilyLogNumberMap( + cf_lognumber_map, + cf_name_id_map); + } +#endif + bool continue_replay_log = true; for (auto log_number : log_numbers) { // The previous incarnation may not have written any MANIFEST @@ -1166,8 +1183,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool batch_changed = false; WalFilter::WalProcessingOption wal_processing_option = - db_options_.wal_filter->LogRecord(batch, &new_batch, - &batch_changed); + db_options_.wal_filter->LogRecord(log_number, fname, batch, + &new_batch, &batch_changed); switch (wal_processing_option) { case WalFilter::WalProcessingOption::kContinueProcessing: diff --git a/db/db_test.cc b/db/db_test.cc index 34b94cfdf..35bea7345 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11055,6 +11055,172 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) { ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); } +TEST_F(DBTest, WalFilterTestWithColumnFamilies) { + class TestWalFilterWithColumnFamilies : public WalFilter { + private: + // column_family_id -> log_number map (provided to WALFilter) + std::map cf_log_number_map_; + // column_family_name -> column_family_id map (provided to WALFilter) + std::map cf_name_id_map_; + // column_family_name -> keys_found_in_wal map + // We store keys that are applicable to the column_family + // during recovery (i.e. aren't already flushed to SST file(s)) + // for verification against the keys we expect. + std::map> cf_wal_keys_; + public: + virtual void ColumnFamilyLogNumberMap( + const std::map& cf_lognumber_map, + const std::map& cf_name_id_map) override { + cf_log_number_map_ = cf_lognumber_map; + cf_name_id_map_ = cf_name_id_map; + } + + virtual WalProcessingOption LogRecord(unsigned long long log_number, + const std::string& log_file_name, + const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) override { + class LogRecordBatchHandler : public WriteBatch::Handler { + private: + std::map> & cf_wal_keys_; + const std::map & cf_log_number_map_; + unsigned long long log_number_; + public: + LogRecordBatchHandler(unsigned long long log_number, + const std::map & cf_log_number_map, + std::map> & cf_wal_keys) : + cf_log_number_map_(cf_log_number_map), + cf_wal_keys_(cf_wal_keys), + log_number_(log_number){} + + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& /*value*/) override { + auto it = cf_log_number_map_.find(column_family_id); + assert(it != cf_log_number_map_.end()); + unsigned long long log_number_for_cf = it->second; + // If the current record is applicable for column_family_id + // (i.e. isn't flushed to SST file(s) for column_family_id) + // add it to the cf_wal_keys_ map for verification. + if (log_number_ >= log_number_for_cf) { + cf_wal_keys_[column_family_id].push_back(std::string(key.data(), key.size())); + } + return Status::OK(); + } + } handler(log_number, cf_log_number_map_, cf_wal_keys_); + + batch.Iterate(&handler); + + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "WalFilterTestWithColumnFamilies"; + } + + const std::map> & GetColumnFamilyKeys() { + return cf_wal_keys_; + } + + const std::map & GetColumnFamilyNameIdMap() { + return cf_name_id_map_; + } + }; + + std::vector> batch_keys_pre_flush(3); + + batch_keys_pre_flush[0].push_back("key1"); + batch_keys_pre_flush[0].push_back("key2"); + batch_keys_pre_flush[1].push_back("key3"); + batch_keys_pre_flush[1].push_back("key4"); + batch_keys_pre_flush[2].push_back("key5"); + batch_keys_pre_flush[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + //Flush default column-family + db_->Flush(FlushOptions(), handles_[0]); + + // Do some more writes + std::vector> batch_keys_post_flush(3); + + batch_keys_post_flush[0].push_back("key7"); + batch_keys_post_flush[0].push_back("key8"); + batch_keys_post_flush[1].push_back("key9"); + batch_keys_post_flush[1].push_back("key10"); + batch_keys_post_flush[2].push_back("key11"); + batch_keys_post_flush[2].push_back("key12"); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // On Recovery we should only find the second batch applicable to default CF + // But both batches applicable to pikachu CF + + // Create a test filter that would add extra keys + TestWalFilterWithColumnFamilies test_wal_filter_column_families; + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_column_families; + Status status = + TryReopenWithColumnFamilies({ "default", "pikachu" }, options); + ASSERT_TRUE(status.ok()); + + // verify that handles_[0] only has post_flush keys + // while handles_[1] has pre and post flush keys + auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys(); + auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap(); + size_t index = 0; + auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]]; + //default column-family, only post_flush keys are expected + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); + + index = 0; + keys_cf = cf_wal_keys[name_id_map["pikachu"]]; + //pikachu column-family, all keys are expected + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_pre_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); +} #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e7064b3cb..5a18027e1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1237,7 +1237,7 @@ struct DBOptions { // records, ignoring a particular record or skipping replay. // The filter is invoked at startup and is invoked from a single-thread // currently. - const WalFilter* wal_filter; + WalFilter* wal_filter; #endif // ROCKSDB_LITE // If true, then DB::Open / CreateColumnFamily / DropColumnFamily diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h index 226d6971c..c164e4d47 100644 --- a/include/rocksdb/wal_filter.h +++ b/include/rocksdb/wal_filter.h @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #pragma once +#include +#include namespace rocksdb { @@ -30,6 +32,20 @@ class WalFilter { virtual ~WalFilter() {} + // Provide ColumnFamily->LogNumber map to filter + // so that filter can determine whether given record + // is applicable to given column family. + // We also pass in name->id map as this is known at + // recovery time and write batch callbacks happen + // in terms of column family id. + // + // @params cf_lognumber_map column_family_id to lognumber map + // @params cf_name_id_map column_family_name to column_family_id map + + virtual void ColumnFamilyLogNumberMap( + const std::map& cf_lognumber_map, + const std::map& cf_name_id_map) {} + // LogRecord is invoked for each log record encountered for all the logs // during replay on logs on recovery. This method can be used to: // * inspect the record (using the batch parameter) @@ -41,6 +57,10 @@ class WalFilter { // (by returning kStop replay) - please note that this implies // discarding the logs from current record onwards. // + // @params log_number log_number of the current log. + // Filter might use this to determine if the log record + // is applicable to a certain column family. + // @params log_file_name log file name - only for informational purposes // @params batch batch encountered in the log during recovery // @params new_batch new_batch to populate if filter wants to change // the batch (for example to filter some records out, @@ -54,9 +74,19 @@ class WalFilter { // @returns Processing option for the current record. // Please see WalProcessingOption enum above for // details. + virtual WalProcessingOption LogRecord(unsigned long long log_number, + const std::string& log_file_name, + const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) { + return LogRecord(batch, new_batch, batch_changed); + } + virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, - bool* batch_changed) const = 0; + bool* batch_changed) const { + return WalProcessingOption::kContinueProcessing; + } // Returns a name that identifies this WAL filter. // The name will be printed to LOG file on start up for diagnosis.