Merge pull request #765 from PraveenSinghRao/wal_filter

Adding wal filter to inspect and filter wal records on recovery
main
Siying Dong 9 years ago
commit beb69d4511
  1. 70
      db/db_impl.cc
  2. 400
      db/db_test.cc
  3. 10
      include/rocksdb/options.h
  4. 65
      include/rocksdb/wal_filter.h
  5. 18
      util/options.cc

@ -65,6 +65,7 @@
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/version.h"
#include "rocksdb/wal_filter.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
@ -1151,6 +1152,75 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
WriteBatchInternal::SetContents(&batch, record);
#ifndef ROCKSDB_LITE
if (db_options_.wal_filter != nullptr) {
WriteBatch new_batch;
bool batch_changed = false;
WalFilter::WalProcessingOption wal_processing_option =
db_options_.wal_filter->LogRecord(batch, &new_batch, &batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:
//do nothing, proceeed normally
break;
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
//skip current record
continue;
case WalFilter::WalProcessingOption::kStopReplay:
//skip current record and stop replay
continue_replay_log = false;
continue;
case WalFilter::WalProcessingOption::kCorruptedRecord: {
status = Status::Corruption("Corruption reported by Wal Filter ",
db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
reporter.Corruption(record.size(), status);
continue;
}
break;
}
default: {
assert(false); //unhandled case
status = Status::NotSupported("Unknown WalProcessingOption returned"
" by Wal Filter ", db_options_.wal_filter->Name());
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
}
else {
// Ignore the error with current record processing.
continue;
}
}
}
if (batch_changed) {
// Make sure that the count in the new batch is
// within the orignal count.
int new_count = WriteBatchInternal::Count(&new_batch);
int original_count = WriteBatchInternal::Count(&batch);
if (new_count > original_count) {
Log(InfoLogLevel::FATAL_LEVEL, db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
log_number, db_options_.wal_recovery_mode,
db_options_.wal_filter->Name(), new_count, original_count);
status = Status::NotSupported("More than original # of records "
"returned by Wal Filter ", db_options_.wal_filter->Name());
return status;
}
// Set the same sequence number in the new_batch
// as the original batch.
WriteBatchInternal::SetSequence(&new_batch,
WriteBatchInternal::Sequence(&batch));
batch = new_batch;
}
}
#endif //ROCKSDB_LITE
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
// insert. We don't want to fail the whole write batch in that case --

@ -47,6 +47,7 @@
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/wal_filter.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
@ -9888,6 +9889,405 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
ASSERT_EQ(true, done.load());
}
#ifndef ROCKSDB_LITE
namespace {
void ValidateKeyExistence(DB* db,
const std::vector<Slice>& keys_must_exist,
const std::vector<Slice>& keys_must_not_exist) {
// Ensure that expected keys exist
std::vector<std::string> values;
if (keys_must_exist.size() > 0) {
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
keys_must_exist,
&values);
for (size_t i = 0; i < keys_must_exist.size(); i++) {
ASSERT_OK(status_list[i]);
}
}
// Ensure that given keys don't exist
if (keys_must_not_exist.size() > 0) {
std::vector<Status> status_list = db->MultiGet(ReadOptions(),
keys_must_not_exist,
&values);
for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
ASSERT_TRUE(status_list[i].IsNotFound());
}
}
}
} //namespace
TEST_F(DBTest, WalFilterTest) {
class TestWalFilter : public WalFilter {
private:
// Processing option that is requested to be applied at the given index
WalFilter::WalProcessingOption wal_processing_option_;
// Index at which to apply wal_processing_option_
// At other indexes default wal_processing_option::kContinueProcessing is
// returned.
size_t apply_option_at_record_index_;
// Current record index, incremented with each record encountered.
size_t current_record_index_;
public:
TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
size_t apply_option_for_record_index) :
wal_processing_option_(wal_processing_option),
apply_option_at_record_index_(apply_option_for_record_index),
current_record_index_(0) { }
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
WriteBatch* new_batch, bool* batch_changed) const override {
WalFilter::WalProcessingOption option_to_return;
if (current_record_index_ == apply_option_at_record_index_) {
option_to_return = wal_processing_option_;
}
else {
option_to_return = WalProcessingOption::kContinueProcessing;
}
// Filter is passed as a const object for RocksDB to not modify the
// object, however we modify it for our own purpose here and hence
// cast the constness away.
(const_cast<TestWalFilter*>(this)->current_record_index_)++;
return option_to_return;
}
virtual const char* Name() const override {
return "TestWalFilter";
}
};
// Create 3 batches with two keys each
std::vector<std::vector<std::string>> batch_keys(3);
batch_keys[0].push_back("key1");
batch_keys[0].push_back("key2");
batch_keys[1].push_back("key3");
batch_keys[1].push_back("key4");
batch_keys[2].push_back("key5");
batch_keys[2].push_back("key6");
// Test with all WAL processing options
for (int option = 0;
option < static_cast<int>(WalFilter::WalProcessingOption::kWalProcessingOptionMax);
option++) {
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
WalFilter::WalProcessingOption wal_processing_option =
static_cast<WalFilter::WalProcessingOption>(option);
// Create a test filter that would apply wal_processing_option at the first
// record
size_t apply_option_for_record_index = 1;
TestWalFilter test_wal_filter(wal_processing_option,
apply_option_for_record_index);
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter;
Status status = TryReopenWithColumnFamilies({ "default", "pikachu" },
options);
if (wal_processing_option ==
WalFilter::WalProcessingOption::kCorruptedRecord) {
assert(!status.ok());
// In case of corruption we can turn off paranoid_checks to reopen
// databse
options.paranoid_checks = false;
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
} else {
assert(status.ok());
}
// Compute which keys we expect to be found
// and which we expect not to be found after recovery.
std::vector<Slice> keys_must_exist;
std::vector<Slice> keys_must_not_exist;
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kCorruptedRecord:
case WalFilter::WalProcessingOption::kContinueProcessing: {
fprintf(stderr, "Testing with complete WAL processing\n");
//we expect all records to be processed
for (size_t i = 0; i < batch_keys.size(); i++) {
for (size_t j = 0; j < batch_keys[i].size(); j++) {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
break;
}
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
fprintf(stderr, "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
apply_option_for_record_index);
// We expect the record with apply_option_for_record_index to be not
// found.
for (size_t i = 0; i < batch_keys.size(); i++) {
for (size_t j = 0; j < batch_keys[i].size(); j++) {
if (i == apply_option_for_record_index) {
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
}
else {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
}
break;
}
case WalFilter::WalProcessingOption::kStopReplay: {
fprintf(stderr, "Testing with stopping replay from record %" ROCKSDB_PRIszt "\n",
apply_option_for_record_index);
// We expect records beyond apply_option_for_record_index to be not
// found.
for (size_t i = 0; i < batch_keys.size(); i++) {
for (size_t j = 0; j < batch_keys[i].size(); j++) {
if (i >= apply_option_for_record_index) {
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
}
else {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
}
break;
}
default:
assert(false); //unhandled case
}
bool checked_after_reopen = false;
while (true) {
// Ensure that expected keys exists
// and not expected keys don't exist after recovery
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
if (checked_after_reopen) {
break;
}
//reopen database again to make sure previous log(s) are not used
//(even if they were skipped)
//reopn database with option to use WAL filter
options = OptionsForLogIterTest();
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
checked_after_reopen = true;
}
}
}
TEST_F(DBTest, WalFilterTestWithChangeBatch) {
class ChangeBatchHandler : public WriteBatch::Handler {
private:
// Batch to insert keys in
WriteBatch* new_write_batch_;
// Number of keys to add in the new batch
size_t num_keys_to_add_in_new_batch_;
// Number of keys added to new batch
size_t num_keys_added_;
public:
ChangeBatchHandler(WriteBatch* new_write_batch,
size_t num_keys_to_add_in_new_batch) :
new_write_batch_(new_write_batch),
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
num_keys_added_(0){ }
virtual void Put(const Slice& key, const Slice& value) override {
if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
new_write_batch_->Put(key, value);
++num_keys_added_;
}
}
};
class TestWalFilterWithChangeBatch : public WalFilter {
private:
// Index at which to start changing records
size_t change_records_from_index_;
// Number of keys to add in the new batch
size_t num_keys_to_add_in_new_batch_;
// Current record index, incremented with each record encountered.
size_t current_record_index_;
public:
TestWalFilterWithChangeBatch(
size_t change_records_from_index,
size_t num_keys_to_add_in_new_batch) :
change_records_from_index_(change_records_from_index),
num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
current_record_index_(0) { }
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
WriteBatch* new_batch, bool* batch_changed) const override {
if (current_record_index_ >= change_records_from_index_) {
ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
batch.Iterate(&handler);
*batch_changed = true;
}
// Filter is passed as a const object for RocksDB to not modify the
// object, however we modify it for our own purpose here and hence
// cast the constness away.
(const_cast<TestWalFilterWithChangeBatch*>(this)->current_record_index_)++;
return WalProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "TestWalFilterWithChangeBatch";
}
};
std::vector<std::vector<std::string>> batch_keys(3);
batch_keys[0].push_back("key1");
batch_keys[0].push_back("key2");
batch_keys[1].push_back("key3");
batch_keys[1].push_back("key4");
batch_keys[2].push_back("key5");
batch_keys[2].push_back("key6");
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
// Create a test filter that would apply wal_processing_option at the first
// record
size_t change_records_from_index = 1;
size_t num_keys_to_add_in_new_batch = 1;
TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
change_records_from_index, num_keys_to_add_in_new_batch);
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_with_change_batch;
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
// Ensure that all keys exist before change_records_from_index_
// And after that index only single key exists
// as our filter adds only single key for each batch
std::vector<Slice> keys_must_exist;
std::vector<Slice> keys_must_not_exist;
for (size_t i = 0; i < batch_keys.size(); i++) {
for (size_t j = 0; j < batch_keys[i].size(); j++) {
if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
}
else {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
}
bool checked_after_reopen = false;
while (true) {
// Ensure that expected keys exists
// and not expected keys don't exist after recovery
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
if (checked_after_reopen) {
break;
}
//reopen database again to make sure previous log(s) are not used
//(even if they were skipped)
//reopn database with option to use WAL filter
options = OptionsForLogIterTest();
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
checked_after_reopen = true;
}
}
TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
public:
virtual WalProcessingOption LogRecord(const WriteBatch & batch,
WriteBatch* new_batch, bool* batch_changed) const override {
*new_batch = batch;
new_batch->Put("key_extra", "value_extra");
*batch_changed = true;
return WalProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "WalFilterTestWithChangeBatchExtraKeys";
}
};
std::vector<std::vector<std::string>> batch_keys(3);
batch_keys[0].push_back("key1");
batch_keys[0].push_back("key2");
batch_keys[1].push_back("key3");
batch_keys[1].push_back("key4");
batch_keys[2].push_back("key5");
batch_keys[2].push_back("key6");
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys[i].size(); j++) {
batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
// Create a test filter that would add extra keys
TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_extra_keys;
Status status =
TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
ASSERT_TRUE(status.IsNotSupported());
// Reopen without filter, now reopen should succeed - previous
// attempt to open must not have altered the db.
options = OptionsForLogIterTest();
ReopenWithColumnFamilies({ "default", "pikachu" }, options);
std::vector<Slice> keys_must_exist;
std::vector<Slice> keys_must_not_exist; //empty vector
for (size_t i = 0; i < batch_keys.size(); i++) {
for (size_t j = 0; j < batch_keys[i].size(); j++) {
keys_must_exist.push_back(Slice(batch_keys[i][j]));
}
}
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
class BloomStatsTestWithParam
: public DBTest,

@ -46,6 +46,7 @@ class Slice;
class SliceTransform;
class Statistics;
class InternalKeyComparator;
class WalFilter;
// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
@ -1140,6 +1141,15 @@ struct DBOptions {
// Default: nullptr (disabled)
// Not supported in ROCKSDB_LITE mode!
std::shared_ptr<Cache> row_cache;
#ifndef ROCKSDB_LITE
// A filter object supplied to be invoked while processing write-ahead-logs
// (WALs) during recovery. The filter provides a way to inspect log
// records, ignoring a particular record or skipping replay.
// The filter is invoked at startup and is invoked from a single-thread
// currently.
const WalFilter* wal_filter;
#endif //ROCKSDB_LITE
};
// Options to control the behavior of a database (passed to DB::Open)

@ -0,0 +1,65 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
namespace rocksdb {
class WriteBatch;
// WALFilter allows an application to inspect write-ahead-log (WAL)
// records or modify their processing on recovery.
// Please see the details below.
class WalFilter {
public:
enum class WalProcessingOption {
// Continue processing as usual
kContinueProcessing = 0,
// Ignore the current record but continue processing of log(s)
kIgnoreCurrentRecord = 1,
// Stop replay of logs and discard logs
// Logs won't be replayed on subsequent recovery
kStopReplay = 2,
// Corrupted record detected by filter
kCorruptedRecord = 3,
// Marker for enum count
kWalProcessingOptionMax = 4
};
virtual ~WalFilter() { };
// LogRecord is invoked for each log record encountered for all the logs
// during replay on logs on recovery. This method can be used to:
// * inspect the record (using the batch parameter)
// * ignoring current record
// (by returning WalProcessingOption::kIgnoreCurrentRecord)
// * reporting corrupted record
// (by returning WalProcessingOption::kCorruptedRecord)
// * stop log replay
// (by returning kStop replay) - please note that this implies
// discarding the logs from current record onwards.
//
// @params batch batch encountered in the log during recovery
// @params new_batch new_batch to populate if filter wants to change
// the batch (for example to filter some records out,
// or alter some records).
// Please note that the new batch MUST NOT contain
// more records than original, else recovery would
// be failed.
// @params batch_changed Whether batch was changed by the filter.
// It must be set to true if new_batch was populated,
// else new_batch has no effect.
// @returns Processing option for the current record.
// Please see WalProcessingOption enum above for
// details.
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
WriteBatch* new_batch, bool* batch_changed) const = 0;
// Returns a name that identifies this WAL filter.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
};
} // namespace rocksdb

@ -29,6 +29,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/wal_filter.h"
#include "table/block_based_table_factory.h"
#include "util/compression.h"
#include "util/statistics.h"
@ -256,7 +257,11 @@ DBOptions::DBOptions()
enable_thread_tracking(false),
delayed_write_rate(1024U * 1024U),
skip_stats_update_on_db_open(false),
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords) {
wal_recovery_mode(WALRecoveryMode::kTolerateCorruptedTailRecords)
#ifndef ROCKSDB_LITE
, wal_filter(nullptr)
#endif // ROCKSDB_LITE
{
}
DBOptions::DBOptions(const Options& options)
@ -313,7 +318,12 @@ DBOptions::DBOptions(const Options& options)
delayed_write_rate(options.delayed_write_rate),
skip_stats_update_on_db_open(options.skip_stats_update_on_db_open),
wal_recovery_mode(options.wal_recovery_mode),
row_cache(options.row_cache) {}
row_cache(options.row_cache)
#ifndef ROCKSDB_LITE
, wal_filter(options.wal_filter)
#endif // ROCKSDB_LITE
{
}
static const char* const access_hints[] = {
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
@ -413,6 +423,10 @@ void DBOptions::Dump(Logger* log) const {
} else {
Header(log, " Options.row_cache: None");
}
#ifndef ROCKSDB_LITE
Header(log, " Options.wal_filter: %s",
wal_filter ? wal_filter->Name() : "None");
#endif // ROCKDB_LITE
} // DBOptions::Dump
void ColumnFamilyOptions::Dump(Logger* log) const {

Loading…
Cancel
Save