diff --git a/db/db_impl.cc b/db/db_impl.cc index 0a39387c8..f9497ef41 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -63,6 +63,7 @@ #include "rocksdb/status.h" #include "rocksdb/table.h" #include "rocksdb/version.h" +#include "rocksdb/wal_filter.h" #include "table/block.h" #include "table/block_based_table_factory.h" #include "table/merger.h" @@ -1153,6 +1154,26 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers, } WriteBatchInternal::SetContents(&batch, record); + if (db_options_.wal_filter != nullptr) { + WALFilter::WALProcessingOption walProcessingOption = + db_options_.wal_filter->LogRecord(batch); + + switch (walProcessingOption) { + case WALFilter::WALProcessingOption::kContinueProcessing: + //do nothing, proceed normally + break; + case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: + //skip current record + continue; + case WALFilter::WALProcessingOption::kStopReplay: + //skip current record and stop replay + continue_replay_log = false; + continue; + default: + assert(false); //unhandled case + } + } + + // If column family was not found, it might mean that the WAL write + // batch references to the column family that was dropped after the + // insert. 
We don't want to fail the whole write batch in that case -- diff --git a/db/db_test.cc b/db/db_test.cc index 017c85701..e089f67bb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -48,6 +48,7 @@ #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "rocksdb/thread_status.h" +#include "rocksdb/wal_filter.h" #include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/optimistic_transaction_db.h" @@ -9669,6 +9670,182 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { ASSERT_EQ(true, done.load()); } +TEST_F(DBTest, WalFilterTest) { + class TestWALFilter : public WALFilter { + private: + // Processing option that is requested to be applied at the given index + WALFilter::WALProcessingOption m_walProcessingOption; + // Index at which to apply m_walProcessingOption + // At other indexes default WALProcessingOption::kContinueProcessing is + // returned. + size_t m_applyOptionAtRecordIndex; + // Current record index, incremented with each record encountered. + size_t m_currentRecordIndex; + public: + TestWALFilter(WALFilter::WALProcessingOption walProcessingOption, + size_t applyOptionForRecordIndex) : + m_walProcessingOption(walProcessingOption), + m_applyOptionAtRecordIndex(applyOptionForRecordIndex), + m_currentRecordIndex(0) { } + + virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override { + WALFilter::WALProcessingOption optionToReturn; + + if (m_currentRecordIndex == m_applyOptionAtRecordIndex) { + optionToReturn = m_walProcessingOption; + } + else { + optionToReturn = WALProcessingOption::kContinueProcessing; + } + + // Filter is passed as a const object for RocksDB to not modify the + // object, however we modify it for our own purpose here and hence + // cast the constness away. 
+ (const_cast<TestWALFilter*>(this)->m_currentRecordIndex)++; + + return optionToReturn; + } + + virtual const char* Name() const override { + return "TestWALFilter"; + } + }; + + // Create 3 batches with two keys each + std::vector<std::vector<std::string>> batchKeys(3); + + batchKeys[0].push_back("key1"); + batchKeys[0].push_back("key2"); + batchKeys[1].push_back("key3"); + batchKeys[1].push_back("key4"); + batchKeys[2].push_back("key5"); + batchKeys[2].push_back("key6"); + + // Test with all WAL processing options + for (char option = 0; + option < static_cast<char>(WALFilter::WALProcessingOption::kWALProcessingOptionMax); + option++) { + Options options = OptionsForLogIterTest(); + DestroyAndReopen(options); + CreateAndReopenWithCF({ "pikachu" }, options); + { + // Write given keys in given batches + for (size_t i = 0; i < batchKeys.size(); i++) { + WriteBatch batch; + for (size_t j = 0; j < batchKeys[i].size(); j++) { + batch.Put(handles_[0], batchKeys[i][j], DummyString(1024)); + } + dbfull()->Write(WriteOptions(), &batch); + } + + WALFilter::WALProcessingOption walProcessingOption = + static_cast<WALFilter::WALProcessingOption>(option); + + // Create a test filter that would apply walProcessingOption at the first + // record + size_t applyOptionForRecordIndex = 1; + TestWALFilter testWalFilter(walProcessingOption, + applyOptionForRecordIndex); + + // Reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + options.wal_filter = &testWalFilter; + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + // Compute which keys we expect to be found + // and which we expect not to be found after recovery. + std::vector<Slice> keysMustExist; + std::vector<Slice> keysMustNotExist; + switch (walProcessingOption) { + case WALFilter::WALProcessingOption::kContinueProcessing: { + fprintf(stderr, "Testing with complete WAL processing," + " i.e. 
the default case\n"); + //we expect all records to be processed + for (size_t i = 0; i < batchKeys.size(); i++) { + for (size_t j = 0; j < batchKeys[i].size(); j++) { + keysMustExist.push_back(Slice(batchKeys[i][j])); + } + } + break; + } + case WALFilter::WALProcessingOption::kIgnoreCurrentRecord: { + fprintf(stderr, "Testing with ignoring record %zu only\n", + applyOptionForRecordIndex); + // We expect the record with applyOptionForRecordIndex to be not + // found. + for (size_t i = 0; i < batchKeys.size(); i++) { + for (size_t j = 0; j < batchKeys[i].size(); j++) { + if (i == applyOptionForRecordIndex) { + keysMustNotExist.push_back(Slice(batchKeys[i][j])); + } + else { + keysMustExist.push_back(Slice(batchKeys[i][j])); + } + } + } + break; + } + case WALFilter::WALProcessingOption::kStopReplay: { + fprintf(stderr, "Testing with stopping replay from record %zu\n", + applyOptionForRecordIndex); + // We expect records beyond applyOptionForRecordIndex to be not + // found. + for (size_t i = 0; i < batchKeys.size(); i++) { + for (size_t j = 0; j < batchKeys[i].size(); j++) { + if (i >= applyOptionForRecordIndex) { + keysMustNotExist.push_back(Slice(batchKeys[i][j])); + } + else { + keysMustExist.push_back(Slice(batchKeys[i][j])); + } + } + } + break; + } + default: + assert(false); //unhandled case + } + + bool checkedAfterReopen = false; + + while (true) + { + // Ensure that expected keys exist after recovery + std::vector<std::string> values; + if (keysMustExist.size() > 0) { + std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(), + keysMustExist, + &values); + for (size_t i = 0; i < keysMustExist.size(); i++) { + ASSERT_OK(status_list[i]); + } + } + + // Ensure that discarded keys don't exist after recovery + if (keysMustNotExist.size() > 0) { + std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(), + keysMustNotExist, + &values); + for (size_t i = 0; i < keysMustNotExist.size(); i++) { + ASSERT_TRUE(status_list[i].IsNotFound()); + } + } + + if (checkedAfterReopen) { 
+ break; + } + + //reopen database again to make sure previous log(s) are not used + //(even if they were skipped) + //reopen database with option to use WAL filter + options = OptionsForLogIterTest(); + ReopenWithColumnFamilies({ "default", "pikachu" }, options); + + checkedAfterReopen = true; + } + } + } +} } // namespace rocksdb #endif diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b89d5d247..7a43f1f61 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -47,6 +47,7 @@ class Slice; class SliceTransform; class Statistics; class InternalKeyComparator; +class WALFilter; // DB contents are stored in a set of blocks, each of which holds a // sequence of key,value pairs. Each block may be compressed before @@ -1132,6 +1133,13 @@ struct DBOptions { // Default: nullptr (disabled) // Not supported in ROCKSDB_LITE mode! std::shared_ptr<Cache> row_cache; + + // A filter object supplied to be invoked while processing write-ahead-logs + // (WALs) during recovery. The filter provides a way to inspect log + // records, ignoring a particular record or skipping replay. + // The filter is invoked at startup and is invoked from a single thread + // currently. + const WALFilter * wal_filter; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/wal_filter.h b/include/rocksdb/wal_filter.h new file mode 100644 index 000000000..f3e88682d --- /dev/null +++ b/include/rocksdb/wal_filter.h @@ -0,0 +1,63 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2013 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. + +#ifndef STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_ +#define STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_ + +namespace rocksdb { + +class WriteBatch; + +// WALFilter allows an application to inspect write-ahead-log (WAL) +// records or modify their processing on recovery. +// Please see the details below. +class WALFilter { +public: + enum class WALProcessingOption { + // Continue processing as usual + kContinueProcessing = 0, + // Ignore the current record but continue processing of log(s) + kIgnoreCurrentRecord = 1, + // Stop replay of logs and discard logs + // Logs won't be replayed on subsequent recovery + kStopReplay = 2, + // Marker for enum count + kWALProcessingOptionMax = 3 + }; + + virtual ~WALFilter() { }; + + // LogRecord is invoked for each log record encountered for all the logs + // during replay of logs on recovery. This method can be used to: + // * inspect the record (using the batch parameter) + // * ignore the current record + // (by returning WALProcessingOption::kIgnoreCurrentRecord) + // * stop log replay + // (by returning kStopReplay) - please note that this implies + // discarding the logs from current record onwards. + virtual WALProcessingOption LogRecord(const WriteBatch & batch) const = 0; + + // Returns a name that identifies this WAL filter. + // The name will be printed to LOG file on start up for diagnosis. 
+ virtual const char* Name() const = 0; +}; + +// Default implementation of WALFilter that does not alter WAL processing +class DefaultWALFilter : public WALFilter { + virtual WALProcessingOption LogRecord(const WriteBatch & batch) const override { + return WALProcessingOption::kContinueProcessing; + } + + virtual const char* Name() const override { + return "DefaultWALFilter"; + } +}; + +} // namespace rocksdb + +#endif // STORAGE_ROCKSDB_INCLUDE_WAL_FILTER_H_ diff --git a/util/options.cc b/util/options.cc index 8d8a1e23f..5eea10bd5 100644 --- a/util/options.cc +++ b/util/options.cc @@ -29,6 +29,7 @@ #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" +#include "rocksdb/wal_filter.h" #include "table/block_based_table_factory.h" #include "util/compression.h" #include "util/statistics.h" @@ -254,7 +255,8 @@ DBOptions::DBOptions() enable_thread_tracking(false), delayed_write_rate(1024U * 1024U), skip_stats_update_on_db_open(false), - wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) { + wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords), + wal_filter(nullptr) { } DBOptions::DBOptions(const Options& options) @@ -309,7 +311,8 @@ DBOptions::DBOptions(const Options& options) delayed_write_rate(options.delayed_write_rate), skip_stats_update_on_db_open(options.skip_stats_update_on_db_open), wal_recovery_mode(options.wal_recovery_mode), - row_cache(options.row_cache) {} + row_cache(options.row_cache), + wal_filter(options.wal_filter){} static const char* const access_hints[] = { "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" @@ -406,6 +409,8 @@ void DBOptions::Dump(Logger* log) const { } else { Header(log, " Options.row_cache: None"); } + Header(log, " Options.wal_filter: %s", + wal_filter ? wal_filter->Name() : "None"); } // DBOptions::Dump void ColumnFamilyOptions::Dump(Logger* log) const {