From f8c218930765d0e7622aa0cec0caa5bb2ff6e336 Mon Sep 17 00:00:00 2001 From: Praveen Rao Date: Fri, 18 Mar 2016 12:32:15 -0700 Subject: [PATCH 1/3] Publish log numbers for column family to wal_filter, and provide log number in the record callback --- db/db_impl.cc | 21 ++++- db/db_test.cc | 166 +++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 2 +- include/rocksdb/wal_filter.h | 32 ++++++- 4 files changed, 217 insertions(+), 4 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 77dc9457d..fa3713f44 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1087,6 +1087,23 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, stream.EndArray(); } +#ifndef ROCKSDB_LITE + if (db_options_.wal_filter != nullptr) { + std::map cf_name_id_map; + std::map cf_lognumber_map; + for (auto cfd : *versions_->GetColumnFamilySet()) { + cf_name_id_map.insert( + std::make_pair(cfd->GetName(), cfd->GetID())); + cf_lognumber_map.insert( + std::make_pair(cfd->GetID(), cfd->GetLogNumber())); + } + + db_options_.wal_filter->ColumnFamilyLogNumberMap( + cf_lognumber_map, + cf_name_id_map); + } +#endif + bool continue_replay_log = true; for (auto log_number : log_numbers) { // The previous incarnation may not have written any MANIFEST @@ -1166,8 +1183,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool batch_changed = false; WalFilter::WalProcessingOption wal_processing_option = - db_options_.wal_filter->LogRecord(batch, &new_batch, - &batch_changed); + db_options_.wal_filter->LogRecord(log_number, fname, batch, + &new_batch, &batch_changed); switch (wal_processing_option) { case WalFilter::WalProcessingOption::kContinueProcessing: diff --git a/db/db_test.cc b/db/db_test.cc index 34b94cfdf..35bea7345 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11055,6 +11055,172 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) { ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); } +TEST_F(DBTest, WalFilterTestWithColumnFamilies) { + class TestWalFilterWithColumnFamilies : public WalFilter { + private: + // column_family_id -> log_number map (provided to WALFilter) + std::map cf_log_number_map_; + // column_family_name -> column_family_id map (provided to WALFilter) + std::map cf_name_id_map_; + // column_family_name -> keys_found_in_wal map + // We store keys that are applicable to the column_family + // during recovery (i.e. aren't already flushed to SST file(s)) + // for verification against the keys we expect. + std::map> cf_wal_keys_; + public: + virtual void ColumnFamilyLogNumberMap( + const std::map& cf_lognumber_map, + const std::map& cf_name_id_map) override { + cf_log_number_map_ = cf_lognumber_map; + cf_name_id_map_ = cf_name_id_map; + } + + virtual WalProcessingOption LogRecord(unsigned long long log_number, + const std::string& log_file_name, + const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) override { + class LogRecordBatchHandler : public WriteBatch::Handler { + private: + std::map> & cf_wal_keys_; + const std::map & cf_log_number_map_; + unsigned long long log_number_; + public: + LogRecordBatchHandler(unsigned long long log_number, + const std::map & cf_log_number_map, + std::map> & cf_wal_keys) : + cf_log_number_map_(cf_log_number_map), + cf_wal_keys_(cf_wal_keys), + log_number_(log_number){} + + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& /*value*/) override { + auto it = cf_log_number_map_.find(column_family_id); + assert(it != cf_log_number_map_.end()); + unsigned long long log_number_for_cf = it->second; + // If the current record is applicable for column_family_id + // (i.e. isn't flushed to SST file(s) for column_family_id) + // add it to the cf_wal_keys_ map for verification. + if (log_number_ >= log_number_for_cf) { + cf_wal_keys_[column_family_id].push_back(std::string(key.data(), key.size())); + } + return Status::OK(); + } + } handler(log_number, cf_log_number_map_, cf_wal_keys_); + + batch.Iterate(&handler); + + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "WalFilterTestWithColumnFamilies"; + } + + const std::map> & GetColumnFamilyKeys() { + return cf_wal_keys_; + } + + const std::map & GetColumnFamilyNameIdMap() { + return cf_name_id_map_; + } + }; + + std::vector> batch_keys_pre_flush(3); + + batch_keys_pre_flush[0].push_back("key1"); + batch_keys_pre_flush[0].push_back("key2"); + batch_keys_pre_flush[1].push_back("key3"); + batch_keys_pre_flush[1].push_back("key4"); + batch_keys_pre_flush[2].push_back("key5"); + batch_keys_pre_flush[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + //Flush default column-family + db_->Flush(FlushOptions(), handles_[0]); + + // Do some more writes + std::vector> batch_keys_post_flush(3); + + batch_keys_post_flush[0].push_back("key7"); + batch_keys_post_flush[0].push_back("key8"); + batch_keys_post_flush[1].push_back("key9"); + batch_keys_post_flush[1].push_back("key10"); + batch_keys_post_flush[2].push_back("key11"); + batch_keys_post_flush[2].push_back("key12"); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // On Recovery we should only find the second batch applicable to default CF + // But both batches applicable to pikachu CF + + // Create a test filter that would add extra keys + TestWalFilterWithColumnFamilies test_wal_filter_column_families; + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_column_families; + Status status = + TryReopenWithColumnFamilies({ "default", "pikachu" }, options); + ASSERT_TRUE(status.ok()); + + // verify that handles_[0] only has post_flush keys + // while handles_[1] has pre and post flush keys + auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys(); + auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap(); + size_t index = 0; + auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]]; + //default column-family, only post_flush keys are expected + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); + + index = 0; + keys_cf = cf_wal_keys[name_id_map["pikachu"]]; + //pikachu column-family, all keys are expected + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_pre_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); +} #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e7064b3cb..5a18027e1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1237,7 +1237,7 @@ struct DBOptions { // records, ignoring a particular record or skipping replay. // The filter is invoked at startup and is invoked from a single-thread // currently. - const WalFilter* wal_filter; + WalFilter* wal_filter; #endif // ROCKSDB_LITE // If true, then DB::Open / CreateColumnFamily / DropColumnFamily diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h index 226d6971c..c164e4d47 100644 --- a/include/rocksdb/wal_filter.h +++ b/include/rocksdb/wal_filter.h @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #pragma once +#include +#include namespace rocksdb { @@ -30,6 +32,20 @@ class WalFilter { virtual ~WalFilter() {} + // Provide ColumnFamily->LogNumber map to filter + // so that filter can determine whether given record + // is applicable to given column family. + // We also pass in name->id map as this is known at + // recovery time and write batch callbacks happen + // in terms of column family id. + // + // @params cf_lognumber_map column_family_id to lognumber map + // @params cf_name_id_map column_family_name to column_family_id map + + virtual void ColumnFamilyLogNumberMap( + const std::map& cf_lognumber_map, + const std::map& cf_name_id_map) {} + // LogRecord is invoked for each log record encountered for all the logs // during replay on logs on recovery. This method can be used to: // * inspect the record (using the batch parameter) @@ -41,6 +57,10 @@ class WalFilter { // (by returning kStop replay) - please note that this implies // discarding the logs from current record onwards. // + // @params log_number log_number of the current log. + // Filter might use this to determine if the log record + // is applicable to a certain column family. + // @params log_file_name log file name - only for informational purposes // @params batch batch encountered in the log during recovery // @params new_batch new_batch to populate if filter wants to change // the batch (for example to filter some records out, @@ -54,9 +74,19 @@ class WalFilter { // @returns Processing option for the current record. // Please see WalProcessingOption enum above for // details. + virtual WalProcessingOption LogRecord(unsigned long long log_number, + const std::string& log_file_name, + const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) { + return LogRecord(batch, new_batch, batch_changed); + } + virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, - bool* batch_changed) const = 0; + bool* batch_changed) const { + return WalProcessingOption::kContinueProcessing; + } // Returns a name that identifies this WAL filter. // The name will be printed to LOG file on start up for diagnosis. From 7d371863e59afffbc5b18690fc42f195fa32f609 Mon Sep 17 00:00:00 2001 From: Praveen Rao Date: Fri, 18 Mar 2016 14:43:22 -0700 Subject: [PATCH 2/3] travis build fixes --- db/db_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index bdb081f93..2a4839b94 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10648,16 +10648,16 @@ TEST_F(DBTest, WalFilterTestWithColumnFamilies) { bool* batch_changed) override { class LogRecordBatchHandler : public WriteBatch::Handler { private: - std::map> & cf_wal_keys_; const std::map & cf_log_number_map_; + std::map> & cf_wal_keys_; unsigned long long log_number_; public: - LogRecordBatchHandler(unsigned long long log_number, + LogRecordBatchHandler(unsigned long long current_log_number, const std::map & cf_log_number_map, std::map> & cf_wal_keys) : cf_log_number_map_(cf_log_number_map), cf_wal_keys_(cf_wal_keys), - log_number_(log_number){} + log_number_(current_log_number){} virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& /*value*/) override { From 2dcbb3b4f386f7d2aacf468b878223249439fe9f Mon Sep 17 00:00:00 2001 From: Praveen Rao Date: Tue, 22 Mar 2016 12:07:15 -0700 Subject: [PATCH 3/3] Addressed review comments --- db/db_test.cc | 564 ---------------------------------- db/db_test2.cc | 575 +++++++++++++++++++++++++++++++++++ include/rocksdb/wal_filter.h | 18 +- 3 files changed, 586 insertions(+), 571 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 2a4839b94..35c427b69 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10225,570 +10225,6 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { ASSERT_EQ(true, done.load()); } -#ifndef ROCKSDB_LITE -namespace { -void ValidateKeyExistence(DB* db, const std::vector& keys_must_exist, - const std::vector& keys_must_not_exist) { - // Ensure that expected keys exist - std::vector values; - if (keys_must_exist.size() > 0) { - std::vector status_list = - db->MultiGet(ReadOptions(), keys_must_exist, &values); - for (size_t i = 0; i < keys_must_exist.size(); i++) { - ASSERT_OK(status_list[i]); - } - } - - // Ensure that given keys don't exist - if (keys_must_not_exist.size() > 0) { - std::vector status_list = - db->MultiGet(ReadOptions(), keys_must_not_exist, &values); - for (size_t i = 0; i < keys_must_not_exist.size(); i++) { - ASSERT_TRUE(status_list[i].IsNotFound()); - } - } -} - -} // namespace - -TEST_F(DBTest, WalFilterTest) { - class TestWalFilter : public WalFilter { - private: - // Processing option that is requested to be applied at the given index - WalFilter::WalProcessingOption wal_processing_option_; - // Index at which to apply wal_processing_option_ - // At other indexes default wal_processing_option::kContinueProcessing is - // returned. - size_t apply_option_at_record_index_; - // Current record index, incremented with each record encountered. - size_t current_record_index_; - - public: - TestWalFilter(WalFilter::WalProcessingOption wal_processing_option, - size_t apply_option_for_record_index) - : wal_processing_option_(wal_processing_option), - apply_option_at_record_index_(apply_option_for_record_index), - current_record_index_(0) {} - - virtual WalProcessingOption LogRecord(const WriteBatch& batch, - WriteBatch* new_batch, - bool* batch_changed) const override { - WalFilter::WalProcessingOption option_to_return; - - if (current_record_index_ == apply_option_at_record_index_) { - option_to_return = wal_processing_option_; - } else { - option_to_return = WalProcessingOption::kContinueProcessing; - } - - // Filter is passed as a const object for RocksDB to not modify the - // object, however we modify it for our own purpose here and hence - // cast the constness away. - (const_cast(this)->current_record_index_)++; - - return option_to_return; - } - - virtual const char* Name() const override { return "TestWalFilter"; } - }; - - // Create 3 batches with two keys each - std::vector> batch_keys(3); - - batch_keys[0].push_back("key1"); - batch_keys[0].push_back("key2"); - batch_keys[1].push_back("key3"); - batch_keys[1].push_back("key4"); - batch_keys[2].push_back("key5"); - batch_keys[2].push_back("key6"); - - // Test with all WAL processing options - for (int option = 0; - option < static_cast( - WalFilter::WalProcessingOption::kWalProcessingOptionMax); - option++) { - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - - // Write given keys in given batches - for (size_t i = 0; i < batch_keys.size(); i++) { - WriteBatch batch; - for (size_t j = 0; j < batch_keys[i].size(); j++) { - batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); - } - dbfull()->Write(WriteOptions(), &batch); - } - - WalFilter::WalProcessingOption wal_processing_option = - static_cast(option); - - // Create a test filter that would apply wal_processing_option at the first - // record - size_t apply_option_for_record_index = 1; - TestWalFilter test_wal_filter(wal_processing_option, - apply_option_for_record_index); - - // Reopen database with option to use WAL filter - options = OptionsForLogIterTest(); - options.wal_filter = &test_wal_filter; - Status status = - TryReopenWithColumnFamilies({"default", "pikachu"}, options); - if (wal_processing_option == - WalFilter::WalProcessingOption::kCorruptedRecord) { - assert(!status.ok()); - // In case of corruption we can turn off paranoid_checks to reopen - // databse - options.paranoid_checks = false; - ReopenWithColumnFamilies({"default", "pikachu"}, options); - } else { - assert(status.ok()); - } - - // Compute which keys we expect to be found - // and which we expect not to be found after recovery. - std::vector keys_must_exist; - std::vector keys_must_not_exist; - switch (wal_processing_option) { - case WalFilter::WalProcessingOption::kCorruptedRecord: - case WalFilter::WalProcessingOption::kContinueProcessing: { - fprintf(stderr, "Testing with complete WAL processing\n"); - // we expect all records to be processed - for (size_t i = 0; i < batch_keys.size(); i++) { - for (size_t j = 0; j < batch_keys[i].size(); j++) { - keys_must_exist.push_back(Slice(batch_keys[i][j])); - } - } - break; - } - case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: { - fprintf(stderr, - "Testing with ignoring record %" ROCKSDB_PRIszt " only\n", - apply_option_for_record_index); - // We expect the record with apply_option_for_record_index to be not - // found. - for (size_t i = 0; i < batch_keys.size(); i++) { - for (size_t j = 0; j < batch_keys[i].size(); j++) { - if (i == apply_option_for_record_index) { - keys_must_not_exist.push_back(Slice(batch_keys[i][j])); - } else { - keys_must_exist.push_back(Slice(batch_keys[i][j])); - } - } - } - break; - } - case WalFilter::WalProcessingOption::kStopReplay: { - fprintf(stderr, - "Testing with stopping replay from record %" ROCKSDB_PRIszt - "\n", - apply_option_for_record_index); - // We expect records beyond apply_option_for_record_index to be not - // found. - for (size_t i = 0; i < batch_keys.size(); i++) { - for (size_t j = 0; j < batch_keys[i].size(); j++) { - if (i >= apply_option_for_record_index) { - keys_must_not_exist.push_back(Slice(batch_keys[i][j])); - } else { - keys_must_exist.push_back(Slice(batch_keys[i][j])); - } - } - } - break; - } - default: - assert(false); // unhandled case - } - - bool checked_after_reopen = false; - - while (true) { - // Ensure that expected keys exists - // and not expected keys don't exist after recovery - ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); - - if (checked_after_reopen) { - break; - } - - // reopen database again to make sure previous log(s) are not used - //(even if they were skipped) - // reopn database with option to use WAL filter - options = OptionsForLogIterTest(); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - checked_after_reopen = true; - } - } -} - -TEST_F(DBTest, WalFilterTestWithChangeBatch) { - class ChangeBatchHandler : public WriteBatch::Handler { - private: - // Batch to insert keys in - WriteBatch* new_write_batch_; - // Number of keys to add in the new batch - size_t num_keys_to_add_in_new_batch_; - // Number of keys added to new batch - size_t num_keys_added_; - - public: - ChangeBatchHandler(WriteBatch* new_write_batch, - size_t num_keys_to_add_in_new_batch) - : new_write_batch_(new_write_batch), - num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), - num_keys_added_(0) {} - virtual void Put(const Slice& key, const Slice& value) override { - if (num_keys_added_ < num_keys_to_add_in_new_batch_) { - new_write_batch_->Put(key, value); - ++num_keys_added_; - } - } - }; - - class TestWalFilterWithChangeBatch : public WalFilter { - private: - // Index at which to start changing records - size_t change_records_from_index_; - // Number of keys to add in the new batch - size_t num_keys_to_add_in_new_batch_; - // Current record index, incremented with each record encountered. - size_t current_record_index_; - - public: - TestWalFilterWithChangeBatch(size_t change_records_from_index, - size_t num_keys_to_add_in_new_batch) - : change_records_from_index_(change_records_from_index), - num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), - current_record_index_(0) {} - - virtual WalProcessingOption LogRecord(const WriteBatch& batch, - WriteBatch* new_batch, - bool* batch_changed) const override { - if (current_record_index_ >= change_records_from_index_) { - ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_); - batch.Iterate(&handler); - *batch_changed = true; - } - - // Filter is passed as a const object for RocksDB to not modify the - // object, however we modify it for our own purpose here and hence - // cast the constness away. - (const_cast(this) - ->current_record_index_)++; - - return WalProcessingOption::kContinueProcessing; - } - - virtual const char* Name() const override { - return "TestWalFilterWithChangeBatch"; - } - }; - - std::vector> batch_keys(3); - - batch_keys[0].push_back("key1"); - batch_keys[0].push_back("key2"); - batch_keys[1].push_back("key3"); - batch_keys[1].push_back("key4"); - batch_keys[2].push_back("key5"); - batch_keys[2].push_back("key6"); - - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - - // Write given keys in given batches - for (size_t i = 0; i < batch_keys.size(); i++) { - WriteBatch batch; - for (size_t j = 0; j < batch_keys[i].size(); j++) { - batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); - } - dbfull()->Write(WriteOptions(), &batch); - } - - // Create a test filter that would apply wal_processing_option at the first - // record - size_t change_records_from_index = 1; - size_t num_keys_to_add_in_new_batch = 1; - TestWalFilterWithChangeBatch test_wal_filter_with_change_batch( - change_records_from_index, num_keys_to_add_in_new_batch); - - // Reopen database with option to use WAL filter - options = OptionsForLogIterTest(); - options.wal_filter = &test_wal_filter_with_change_batch; - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - // Ensure that all keys exist before change_records_from_index_ - // And after that index only single key exists - // as our filter adds only single key for each batch - std::vector keys_must_exist; - std::vector keys_must_not_exist; - - for (size_t i = 0; i < batch_keys.size(); i++) { - for (size_t j = 0; j < batch_keys[i].size(); j++) { - if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) { - keys_must_not_exist.push_back(Slice(batch_keys[i][j])); - } else { - keys_must_exist.push_back(Slice(batch_keys[i][j])); - } - } - } - - bool checked_after_reopen = false; - - while (true) { - // Ensure that expected keys exists - // and not expected keys don't exist after recovery - ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); - - if (checked_after_reopen) { - break; - } - - // reopen database again to make sure previous log(s) are not used - //(even if they were skipped) - // reopn database with option to use WAL filter - options = OptionsForLogIterTest(); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - checked_after_reopen = true; - } -} - -TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) { - class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter { - public: - virtual WalProcessingOption LogRecord(const WriteBatch& batch, - WriteBatch* new_batch, - bool* batch_changed) const override { - *new_batch = batch; - new_batch->Put("key_extra", "value_extra"); - *batch_changed = true; - return WalProcessingOption::kContinueProcessing; - } - - virtual const char* Name() const override { - return "WalFilterTestWithChangeBatchExtraKeys"; - } - }; - - std::vector> batch_keys(3); - - batch_keys[0].push_back("key1"); - batch_keys[0].push_back("key2"); - batch_keys[1].push_back("key3"); - batch_keys[1].push_back("key4"); - batch_keys[2].push_back("key5"); - batch_keys[2].push_back("key6"); - - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({"pikachu"}, options); - - // Write given keys in given batches - for (size_t i = 0; i < batch_keys.size(); i++) { - WriteBatch batch; - for (size_t j = 0; j < batch_keys[i].size(); j++) { - batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); - } - dbfull()->Write(WriteOptions(), &batch); - } - - // Create a test filter that would add extra keys - TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys; - - // Reopen database with option to use WAL filter - options = OptionsForLogIterTest(); - options.wal_filter = &test_wal_filter_extra_keys; - Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options); - ASSERT_TRUE(status.IsNotSupported()); - - // Reopen without filter, now reopen should succeed - previous - // attempt to open must not have altered the db. - options = OptionsForLogIterTest(); - ReopenWithColumnFamilies({"default", "pikachu"}, options); - - std::vector keys_must_exist; - std::vector keys_must_not_exist; // empty vector - - for (size_t i = 0; i < batch_keys.size(); i++) { - for (size_t j = 0; j < batch_keys[i].size(); j++) { - keys_must_exist.push_back(Slice(batch_keys[i][j])); - } - } - - ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); -} - -TEST_F(DBTest, WalFilterTestWithColumnFamilies) { - class TestWalFilterWithColumnFamilies : public WalFilter { - private: - // column_family_id -> log_number map (provided to WALFilter) - std::map cf_log_number_map_; - // column_family_name -> column_family_id map (provided to WALFilter) - std::map cf_name_id_map_; - // column_family_name -> keys_found_in_wal map - // We store keys that are applicable to the column_family - // during recovery (i.e. aren't already flushed to SST file(s)) - // for verification against the keys we expect. - std::map> cf_wal_keys_; - public: - virtual void ColumnFamilyLogNumberMap( - const std::map& cf_lognumber_map, - const std::map& cf_name_id_map) override { - cf_log_number_map_ = cf_lognumber_map; - cf_name_id_map_ = cf_name_id_map; - } - - virtual WalProcessingOption LogRecord(unsigned long long log_number, - const std::string& log_file_name, - const WriteBatch& batch, - WriteBatch* new_batch, - bool* batch_changed) override { - class LogRecordBatchHandler : public WriteBatch::Handler { - private: - const std::map & cf_log_number_map_; - std::map> & cf_wal_keys_; - unsigned long long log_number_; - public: - LogRecordBatchHandler(unsigned long long current_log_number, - const std::map & cf_log_number_map, - std::map> & cf_wal_keys) : - cf_log_number_map_(cf_log_number_map), - cf_wal_keys_(cf_wal_keys), - log_number_(current_log_number){} - - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& /*value*/) override { - auto it = cf_log_number_map_.find(column_family_id); - assert(it != cf_log_number_map_.end()); - unsigned long long log_number_for_cf = it->second; - // If the current record is applicable for column_family_id - // (i.e. isn't flushed to SST file(s) for column_family_id) - // add it to the cf_wal_keys_ map for verification. - if (log_number_ >= log_number_for_cf) { - cf_wal_keys_[column_family_id].push_back(std::string(key.data(), key.size())); - } - return Status::OK(); - } - } handler(log_number, cf_log_number_map_, cf_wal_keys_); - - batch.Iterate(&handler); - - return WalProcessingOption::kContinueProcessing; - } - - virtual const char* Name() const override { - return "WalFilterTestWithColumnFamilies"; - } - - const std::map> & GetColumnFamilyKeys() { - return cf_wal_keys_; - } - - const std::map & GetColumnFamilyNameIdMap() { - return cf_name_id_map_; - } - }; - - std::vector> batch_keys_pre_flush(3); - - batch_keys_pre_flush[0].push_back("key1"); - batch_keys_pre_flush[0].push_back("key2"); - batch_keys_pre_flush[1].push_back("key3"); - batch_keys_pre_flush[1].push_back("key4"); - batch_keys_pre_flush[2].push_back("key5"); - batch_keys_pre_flush[2].push_back("key6"); - - Options options = OptionsForLogIterTest(); - DestroyAndReopen(options); - CreateAndReopenWithCF({ "pikachu" }, options); - - // Write given keys in given batches - for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { - WriteBatch batch; - for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { - batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024)); - batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024)); - } - dbfull()->Write(WriteOptions(), &batch); - } - - //Flush default column-family - db_->Flush(FlushOptions(), handles_[0]); - - // Do some more writes - std::vector> batch_keys_post_flush(3); - - batch_keys_post_flush[0].push_back("key7"); - batch_keys_post_flush[0].push_back("key8"); - batch_keys_post_flush[1].push_back("key9"); - batch_keys_post_flush[1].push_back("key10"); - batch_keys_post_flush[2].push_back("key11"); - batch_keys_post_flush[2].push_back("key12"); - - // Write given keys in given batches - for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { - WriteBatch batch; - for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { - batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024)); - batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024)); - } - dbfull()->Write(WriteOptions(), &batch); - } - - // On Recovery we should only find the second batch applicable to default CF - // But both batches applicable to pikachu CF - - // Create a test filter that would add extra keys - TestWalFilterWithColumnFamilies test_wal_filter_column_families; - - // Reopen database with option to use WAL filter - options = OptionsForLogIterTest(); - options.wal_filter = &test_wal_filter_column_families; - Status status = - TryReopenWithColumnFamilies({ "default", "pikachu" }, options); - ASSERT_TRUE(status.ok()); - - // verify that handles_[0] only has post_flush keys - // while handles_[1] has pre and post flush keys - auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys(); - auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap(); - size_t index = 0; - auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]]; - //default column-family, only post_flush keys are expected - for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { - for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { - Slice key_from_the_log(keys_cf[index++]); - Slice batch_key(batch_keys_post_flush[i][j]); - ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); - } - } - ASSERT_TRUE(index == keys_cf.size()); - - index = 0; - keys_cf = cf_wal_keys[name_id_map["pikachu"]]; - //pikachu column-family, all keys are expected - for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { - for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { - Slice key_from_the_log(keys_cf[index++]); - Slice batch_key(batch_keys_pre_flush[i][j]); - ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); - } - } - - for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { - for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { - Slice key_from_the_log(keys_cf[index++]); - Slice batch_key(batch_keys_post_flush[i][j]); - ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); - } - } - ASSERT_TRUE(index == keys_cf.size()); -} -#endif // ROCKSDB_LITE - class SliceTransformLimitedDomain : public SliceTransform { const char* Name() const override { return "SliceTransformLimitedDomain"; } diff --git a/db/db_test2.cc b/db/db_test2.cc index 3d9820b65..e074f6004 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -9,6 +9,7 @@ #include #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "rocksdb/wal_filter.h" namespace rocksdb { @@ -77,6 +78,580 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) { std::string value; value = Get(1, "a"); } + +#ifndef ROCKSDB_LITE +namespace { + void ValidateKeyExistence(DB* db, const std::vector& keys_must_exist, + const std::vector& keys_must_not_exist) { + // Ensure that expected keys exist + std::vector values; + if (keys_must_exist.size() > 0) { + std::vector status_list = + db->MultiGet(ReadOptions(), keys_must_exist, &values); + for (size_t i = 0; i < keys_must_exist.size(); i++) { + ASSERT_OK(status_list[i]); + } + } + + // Ensure that given keys don't exist + if (keys_must_not_exist.size() > 0) { + std::vector status_list = + db->MultiGet(ReadOptions(), keys_must_not_exist, &values); + for (size_t i = 0; i < keys_must_not_exist.size(); i++) { + ASSERT_TRUE(status_list[i].IsNotFound()); + } + } + } + +} // namespace + +TEST_F(DBTest2, WalFilterTest) { + class TestWalFilter : public WalFilter { + private: + // Processing option that is requested to be applied at the given index + WalFilter::WalProcessingOption wal_processing_option_; + // Index at which to apply wal_processing_option_ + // At other indexes default wal_processing_option::kContinueProcessing is + // returned. + size_t apply_option_at_record_index_; + // Current record index, incremented with each record encountered. + size_t current_record_index_; + + public: + TestWalFilter(WalFilter::WalProcessingOption wal_processing_option, + size_t apply_option_for_record_index) + : wal_processing_option_(wal_processing_option), + apply_option_at_record_index_(apply_option_for_record_index), + current_record_index_(0) {} + + virtual WalProcessingOption LogRecord(const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) const override { + WalFilter::WalProcessingOption option_to_return; + + if (current_record_index_ == apply_option_at_record_index_) { + option_to_return = wal_processing_option_; + } + else { + option_to_return = WalProcessingOption::kContinueProcessing; + } + + // Filter is passed as a const object for RocksDB to not modify the + // object, however we modify it for our own purpose here and hence + // cast the constness away. + (const_cast(this)->current_record_index_)++; + + return option_to_return; + } + + virtual const char* Name() const override { return "TestWalFilter"; } + }; + + // Create 3 batches with two keys each + std::vector> batch_keys(3); + + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); + + // Test with all WAL processing options + for (int option = 0; + option < static_cast( + WalFilter::WalProcessingOption::kWalProcessingOptionMax); + option++) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + WalFilter::WalProcessingOption wal_processing_option = + static_cast(option); + + // Create a test filter that would apply wal_processing_option at the first + // record + size_t apply_option_for_record_index = 1; + TestWalFilter test_wal_filter(wal_processing_option, + apply_option_for_record_index); + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter; + Status status = + TryReopenWithColumnFamilies({ "default", "pikachu" }, options); + if (wal_processing_option == + WalFilter::WalProcessingOption::kCorruptedRecord) { + assert(!status.ok()); + // In case of corruption we can turn off paranoid_checks to reopen + // databse + options.paranoid_checks = false; + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + } + else { + assert(status.ok()); + } + + // Compute which keys we expect to be found + // and which we expect not to be found after recovery. + std::vector keys_must_exist; + std::vector keys_must_not_exist; + switch (wal_processing_option) { + case WalFilter::WalProcessingOption::kCorruptedRecord: + case WalFilter::WalProcessingOption::kContinueProcessing: { + fprintf(stderr, "Testing with complete WAL processing\n"); + // we expect all records to be processed + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + break; + } + case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: { + fprintf(stderr, + "Testing with ignoring record %" ROCKSDB_PRIszt " only\n", + apply_option_for_record_index); + // We expect the record with apply_option_for_record_index to be not + // found. + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i == apply_option_for_record_index) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); + } + else { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + } + break; + } + case WalFilter::WalProcessingOption::kStopReplay: { + fprintf(stderr, + "Testing with stopping replay from record %" ROCKSDB_PRIszt + "\n", + apply_option_for_record_index); + // We expect records beyond apply_option_for_record_index to be not + // found. + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i >= apply_option_for_record_index) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); + } + else { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + } + break; + } + default: + assert(false); // unhandled case + } + + bool checked_after_reopen = false; + + while (true) { + // Ensure that expected keys exists + // and not expected keys don't exist after recovery + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); + + if (checked_after_reopen) { + break; + } + + // reopen database again to make sure previous log(s) are not used + //(even if they were skipped) + // reopn database with option to use WAL filter + options = OptionsForLogIterTest(); + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + checked_after_reopen = true; + } + } +} + +TEST_F(DBTest2, WalFilterTestWithChangeBatch) { + class ChangeBatchHandler : public WriteBatch::Handler { + private: + // Batch to insert keys in + WriteBatch* new_write_batch_; + // Number of keys to add in the new batch + size_t num_keys_to_add_in_new_batch_; + // Number of keys added to new batch + size_t num_keys_added_; + + public: + ChangeBatchHandler(WriteBatch* new_write_batch, + size_t num_keys_to_add_in_new_batch) + : new_write_batch_(new_write_batch), + num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), + num_keys_added_(0) {} + virtual void Put(const Slice& key, const Slice& value) override { + if (num_keys_added_ < num_keys_to_add_in_new_batch_) { + new_write_batch_->Put(key, value); + ++num_keys_added_; + } + } + }; + + class TestWalFilterWithChangeBatch : public WalFilter { + private: + // Index at which to start changing records + size_t change_records_from_index_; + // Number of keys to add in the new batch + size_t num_keys_to_add_in_new_batch_; + // Current record index, incremented with each record encountered. + size_t current_record_index_; + + public: + TestWalFilterWithChangeBatch(size_t change_records_from_index, + size_t num_keys_to_add_in_new_batch) + : change_records_from_index_(change_records_from_index), + num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch), + current_record_index_(0) {} + + virtual WalProcessingOption LogRecord(const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) const override { + if (current_record_index_ >= change_records_from_index_) { + ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_); + batch.Iterate(&handler); + *batch_changed = true; + } + + // Filter is passed as a const object for RocksDB to not modify the + // object, however we modify it for our own purpose here and hence + // cast the constness away. + (const_cast(this) + ->current_record_index_)++; + + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "TestWalFilterWithChangeBatch"; + } + }; + + std::vector> batch_keys(3); + + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // Create a test filter that would apply wal_processing_option at the first + // record + size_t change_records_from_index = 1; + size_t num_keys_to_add_in_new_batch = 1; + TestWalFilterWithChangeBatch test_wal_filter_with_change_batch( + change_records_from_index, num_keys_to_add_in_new_batch); + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_with_change_batch; + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + // Ensure that all keys exist before change_records_from_index_ + // And after that index only single key exists + // as our filter adds only single key for each batch + std::vector keys_must_exist; + std::vector keys_must_not_exist; + + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + if (i >= change_records_from_index && j >= + num_keys_to_add_in_new_batch) { + keys_must_not_exist.push_back(Slice(batch_keys[i][j])); + } + else { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + } + + bool checked_after_reopen = false; + + while (true) { + // Ensure that expected keys exists + // and not expected keys don't exist after recovery + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); + + if (checked_after_reopen) { + break; + } + + // reopen database again to make sure previous log(s) are not used + //(even if they were skipped) + // reopn database with option to use WAL filter + options = OptionsForLogIterTest(); + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + checked_after_reopen = true; + } +} + +TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) { + class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter { + public: + virtual WalProcessingOption LogRecord(const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) const override { + *new_batch = batch; + new_batch->Put("key_extra", "value_extra"); + *batch_changed = true; + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "WalFilterTestWithChangeBatchExtraKeys"; + } + }; + + std::vector> batch_keys(3); + + batch_keys[0].push_back("key1"); + batch_keys[0].push_back("key2"); + batch_keys[1].push_back("key3"); + batch_keys[1].push_back("key4"); + batch_keys[2].push_back("key5"); + batch_keys[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys[i].size(); j++) { + batch.Put(handles_[0], batch_keys[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // Create a test filter that would add extra keys + TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys; + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_extra_keys; + Status status = TryReopenWithColumnFamilies({ "default", "pikachu" }, + options); + ASSERT_TRUE(status.IsNotSupported()); + + // Reopen without filter, now reopen should succeed - previous + // attempt to open must not have altered the db. + options = OptionsForLogIterTest(); + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + std::vector keys_must_exist; + std::vector keys_must_not_exist; // empty vector + + for (size_t i = 0; i < batch_keys.size(); i++) { + for (size_t j = 0; j < batch_keys[i].size(); j++) { + keys_must_exist.push_back(Slice(batch_keys[i][j])); + } + } + + ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); +} + +TEST_F(DBTest2, WalFilterTestWithColumnFamilies) { + class TestWalFilterWithColumnFamilies : public WalFilter { + private: + // column_family_id -> log_number map (provided to WALFilter) + std::map cf_log_number_map_; + // column_family_name -> column_family_id map (provided to WALFilter) + std::map cf_name_id_map_; + // column_family_name -> keys_found_in_wal map + // We store keys that are applicable to the column_family + // during recovery (i.e. aren't already flushed to SST file(s)) + // for verification against the keys we expect. + std::map> cf_wal_keys_; + public: + virtual void ColumnFamilyLogNumberMap( + const std::map& cf_lognumber_map, + const std::map& cf_name_id_map) override { + cf_log_number_map_ = cf_lognumber_map; + cf_name_id_map_ = cf_name_id_map; + } + + virtual WalProcessingOption LogRecord(unsigned long long log_number, + const std::string& log_file_name, + const WriteBatch& batch, + WriteBatch* new_batch, + bool* batch_changed) override { + class LogRecordBatchHandler : public WriteBatch::Handler { + private: + const std::map & cf_log_number_map_; + std::map> & cf_wal_keys_; + unsigned long long log_number_; + public: + LogRecordBatchHandler(unsigned long long current_log_number, + const std::map & cf_log_number_map, + std::map> & cf_wal_keys) : + cf_log_number_map_(cf_log_number_map), + cf_wal_keys_(cf_wal_keys), + log_number_(current_log_number){} + + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& /*value*/) override { + auto it = cf_log_number_map_.find(column_family_id); + assert(it != cf_log_number_map_.end()); + unsigned long long log_number_for_cf = it->second; + // If the current record is applicable for column_family_id + // (i.e. isn't flushed to SST file(s) for column_family_id) + // add it to the cf_wal_keys_ map for verification. + if (log_number_ >= log_number_for_cf) { + cf_wal_keys_[column_family_id].push_back(std::string(key.data(), + key.size())); + } + return Status::OK(); + } + } handler(log_number, cf_log_number_map_, cf_wal_keys_); + + batch.Iterate(&handler); + + return WalProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "WalFilterTestWithColumnFamilies"; + } + + const std::map> & + GetColumnFamilyKeys() { + return cf_wal_keys_; + } + + const std::map & GetColumnFamilyNameIdMap() { + return cf_name_id_map_; + } + }; + + std::vector> batch_keys_pre_flush(3); + + batch_keys_pre_flush[0].push_back("key1"); + batch_keys_pre_flush[0].push_back("key2"); + batch_keys_pre_flush[1].push_back("key3"); + batch_keys_pre_flush[1].push_back("key4"); + batch_keys_pre_flush[2].push_back("key5"); + batch_keys_pre_flush[2].push_back("key6"); + + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + //Flush default column-family + db_->Flush(FlushOptions(), handles_[0]); + + // Do some more writes + std::vector> batch_keys_post_flush(3); + + batch_keys_post_flush[0].push_back("key7"); + batch_keys_post_flush[0].push_back("key8"); + batch_keys_post_flush[1].push_back("key9"); + batch_keys_post_flush[1].push_back("key10"); + batch_keys_post_flush[2].push_back("key11"); + batch_keys_post_flush[2].push_back("key12"); + + // Write given keys in given batches + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024)); + batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + // On Recovery we should only find the second batch applicable to default CF + // But both batches applicable to pikachu CF + + // Create a test filter that would add extra keys + TestWalFilterWithColumnFamilies test_wal_filter_column_families; + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &test_wal_filter_column_families; + Status status = + TryReopenWithColumnFamilies({ "default", "pikachu" }, options); + ASSERT_TRUE(status.ok()); + + // verify that handles_[0] only has post_flush keys + // while handles_[1] has pre and post flush keys + auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys(); + auto name_id_map = + test_wal_filter_column_families.GetColumnFamilyNameIdMap(); + size_t index = 0; + auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]]; + //default column-family, only post_flush keys are expected + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); + + index = 0; + keys_cf = cf_wal_keys[name_id_map["pikachu"]]; + //pikachu column-family, all keys are expected + for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_pre_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + + for (size_t i = 0; i < batch_keys_post_flush.size(); i++) { + for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) { + Slice key_from_the_log(keys_cf[index++]); + Slice batch_key(batch_keys_post_flush[i][j]); + ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0); + } + } + ASSERT_TRUE(index == keys_cf.size()); +} +#endif // ROCKSDB_LITE } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h index 8b032bb9d..867d2118b 100644 --- a/include/rocksdb/wal_filter.h +++ b/include/rocksdb/wal_filter.h @@ -33,11 +33,12 @@ class WalFilter { virtual ~WalFilter() {} // Provide ColumnFamily->LogNumber map to filter - // so that filter can determine whether given record - // is applicable to given column family. - // We also pass in name->id map as this is known at - // recovery time and write batch callbacks happen - // in terms of column family id. + // so that filter can determine whether a log number applies to a given + // column family (i.e. that log hasn't been flushed to SST already for the + // column family). + // We also pass in name->id map as only name is known during + // recovery (as handles are opened post-recovery). + // while write batch callbacks happen in terms of column family id. // // @params cf_lognumber_map column_family_id to lognumber map // @params cf_name_id_map column_family_name to column_family_id map @@ -58,8 +59,8 @@ class WalFilter { // discarding the logs from current record onwards. // // @params log_number log_number of the current log. - // Filter might use this to determine if the log record - // is applicable to a certain column family. + // Filter might use this to determine if the log + // record is applicable to a certain column family. // @params log_file_name log file name - only for informational purposes // @params batch batch encountered in the log during recovery // @params new_batch new_batch to populate if filter wants to change @@ -82,6 +83,9 @@ class WalFilter { return LogRecord(batch, new_batch, batch_changed); } + // Please see the comments for LogRecord above. This function is for + // compatibility only and contains a subset of parameters. + // New code should use the function above. virtual WalProcessingOption LogRecord(const WriteBatch& batch, WriteBatch* new_batch, bool* batch_changed) const {