Publish log numbers for column family to wal_filter, and provide log number in the record callback

main
Praveen Rao 8 years ago
parent d9bca1e14c
commit f8c2189307
  1. 21
      db/db_impl.cc
  2. 166
      db/db_test.cc
  3. 2
      include/rocksdb/options.h
  4. 32
      include/rocksdb/wal_filter.h

@ -1087,6 +1087,23 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
stream.EndArray();
}
#ifndef ROCKSDB_LITE
if (db_options_.wal_filter != nullptr) {
std::map<std::string, uint32_t> cf_name_id_map;
std::map<uint32_t, uint64_t> cf_lognumber_map;
for (auto cfd : *versions_->GetColumnFamilySet()) {
cf_name_id_map.insert(
std::make_pair(cfd->GetName(), cfd->GetID()));
cf_lognumber_map.insert(
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
}
db_options_.wal_filter->ColumnFamilyLogNumberMap(
cf_lognumber_map,
cf_name_id_map);
}
#endif
bool continue_replay_log = true;
for (auto log_number : log_numbers) {
// The previous incarnation may not have written any MANIFEST
@ -1166,8 +1183,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool batch_changed = false;
WalFilter::WalProcessingOption wal_processing_option =
db_options_.wal_filter->LogRecord(batch, &new_batch,
&batch_changed);
db_options_.wal_filter->LogRecord(log_number, fname, batch,
&new_batch, &batch_changed);
switch (wal_processing_option) {
case WalFilter::WalProcessingOption::kContinueProcessing:

@ -11055,6 +11055,172 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}
TEST_F(DBTest, WalFilterTestWithColumnFamilies) {
class TestWalFilterWithColumnFamilies : public WalFilter {
private:
// column_family_id -> log_number map (provided to WALFilter)
std::map<uint32_t, uint64_t> cf_log_number_map_;
// column_family_name -> column_family_id map (provided to WALFilter)
std::map<std::string, uint32_t> cf_name_id_map_;
// column_family_name -> keys_found_in_wal map
// We store keys that are applicable to the column_family
// during recovery (i.e. aren't already flushed to SST file(s))
// for verification against the keys we expect.
std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;
public:
virtual void ColumnFamilyLogNumberMap(
const std::map<uint32_t, uint64_t>& cf_lognumber_map,
const std::map<std::string, uint32_t>& cf_name_id_map) override {
cf_log_number_map_ = cf_lognumber_map;
cf_name_id_map_ = cf_name_id_map;
}
virtual WalProcessingOption LogRecord(unsigned long long log_number,
const std::string& log_file_name,
const WriteBatch& batch,
WriteBatch* new_batch,
bool* batch_changed) override {
class LogRecordBatchHandler : public WriteBatch::Handler {
private:
std::map<uint32_t, std::vector<std::string>> & cf_wal_keys_;
const std::map<uint32_t, uint64_t> & cf_log_number_map_;
unsigned long long log_number_;
public:
LogRecordBatchHandler(unsigned long long log_number,
const std::map<uint32_t, uint64_t> & cf_log_number_map,
std::map<uint32_t, std::vector<std::string>> & cf_wal_keys) :
cf_log_number_map_(cf_log_number_map),
cf_wal_keys_(cf_wal_keys),
log_number_(log_number){}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& /*value*/) override {
auto it = cf_log_number_map_.find(column_family_id);
assert(it != cf_log_number_map_.end());
unsigned long long log_number_for_cf = it->second;
// If the current record is applicable for column_family_id
// (i.e. isn't flushed to SST file(s) for column_family_id)
// add it to the cf_wal_keys_ map for verification.
if (log_number_ >= log_number_for_cf) {
cf_wal_keys_[column_family_id].push_back(std::string(key.data(), key.size()));
}
return Status::OK();
}
} handler(log_number, cf_log_number_map_, cf_wal_keys_);
batch.Iterate(&handler);
return WalProcessingOption::kContinueProcessing;
}
virtual const char* Name() const override {
return "WalFilterTestWithColumnFamilies";
}
const std::map<uint32_t, std::vector<std::string>> & GetColumnFamilyKeys() {
return cf_wal_keys_;
}
const std::map<std::string, uint32_t> & GetColumnFamilyNameIdMap() {
return cf_name_id_map_;
}
};
std::vector<std::vector<std::string>> batch_keys_pre_flush(3);
batch_keys_pre_flush[0].push_back("key1");
batch_keys_pre_flush[0].push_back("key2");
batch_keys_pre_flush[1].push_back("key3");
batch_keys_pre_flush[1].push_back("key4");
batch_keys_pre_flush[2].push_back("key5");
batch_keys_pre_flush[2].push_back("key6");
Options options = OptionsForLogIterTest();
DestroyAndReopen(options);
CreateAndReopenWithCF({ "pikachu" }, options);
// Write given keys in given batches
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
//Flush default column-family
db_->Flush(FlushOptions(), handles_[0]);
// Do some more writes
std::vector<std::vector<std::string>> batch_keys_post_flush(3);
batch_keys_post_flush[0].push_back("key7");
batch_keys_post_flush[0].push_back("key8");
batch_keys_post_flush[1].push_back("key9");
batch_keys_post_flush[1].push_back("key10");
batch_keys_post_flush[2].push_back("key11");
batch_keys_post_flush[2].push_back("key12");
// Write given keys in given batches
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
WriteBatch batch;
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
}
dbfull()->Write(WriteOptions(), &batch);
}
// On Recovery we should only find the second batch applicable to default CF
// But both batches applicable to pikachu CF
// Create a test filter that would add extra keys
TestWalFilterWithColumnFamilies test_wal_filter_column_families;
// Reopen database with option to use WAL filter
options = OptionsForLogIterTest();
options.wal_filter = &test_wal_filter_column_families;
Status status =
TryReopenWithColumnFamilies({ "default", "pikachu" }, options);
ASSERT_TRUE(status.ok());
// verify that handles_[0] only has post_flush keys
// while handles_[1] has pre and post flush keys
auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
size_t index = 0;
auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
//default column-family, only post_flush keys are expected
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
ASSERT_TRUE(index == keys_cf.size());
index = 0;
keys_cf = cf_wal_keys[name_id_map["pikachu"]];
//pikachu column-family, all keys are expected
for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_pre_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
Slice key_from_the_log(keys_cf[index++]);
Slice batch_key(batch_keys_post_flush[i][j]);
ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
}
}
ASSERT_TRUE(index == keys_cf.size());
}
#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE

@ -1237,7 +1237,7 @@ struct DBOptions {
// records, ignoring a particular record or skipping replay.
// The filter is invoked at startup and is invoked from a single-thread
// currently.
const WalFilter* wal_filter;
WalFilter* wal_filter;
#endif // ROCKSDB_LITE
// If true, then DB::Open / CreateColumnFamily / DropColumnFamily

@ -4,6 +4,8 @@
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include <map>
namespace rocksdb {
@ -30,6 +32,20 @@ class WalFilter {
virtual ~WalFilter() {}
// Provide ColumnFamily->LogNumber map to filter
// so that filter can determine whether given record
// is applicable to given column family.
// We also pass in name->id map as this is known at
// recovery time and write batch callbacks happen
// in terms of column family id.
//
// @params cf_lognumber_map column_family_id to lognumber map
// @params cf_name_id_map column_family_name to column_family_id map
virtual void ColumnFamilyLogNumberMap(
const std::map<uint32_t, uint64_t>& cf_lognumber_map,
const std::map<std::string, uint32_t>& cf_name_id_map) {}
// LogRecord is invoked for each log record encountered for all the logs
// during replay on logs on recovery. This method can be used to:
// * inspect the record (using the batch parameter)
@ -41,6 +57,10 @@ class WalFilter {
// (by returning kStop replay) - please note that this implies
// discarding the logs from current record onwards.
//
// @params log_number log_number of the current log.
// Filter might use this to determine if the log record
// is applicable to a certain column family.
// @params log_file_name log file name - only for informational purposes
// @params batch batch encountered in the log during recovery
// @params new_batch new_batch to populate if filter wants to change
// the batch (for example to filter some records out,
@ -54,9 +74,19 @@ class WalFilter {
// @returns Processing option for the current record.
// Please see WalProcessingOption enum above for
// details.
virtual WalProcessingOption LogRecord(unsigned long long log_number,
const std::string& log_file_name,
const WriteBatch& batch,
WriteBatch* new_batch,
bool* batch_changed) {
return LogRecord(batch, new_batch, batch_changed);
}
virtual WalProcessingOption LogRecord(const WriteBatch& batch,
WriteBatch* new_batch,
bool* batch_changed) const = 0;
bool* batch_changed) const {
return WalProcessingOption::kContinueProcessing;
}
// Returns a name that identifies this WAL filter.
// The name will be printed to LOG file on start up for diagnosis.

Loading…
Cancel
Save