ColumnFamilyOptions API [CF + RepairDB part 3/3]

Summary:
Overload RepairDB to take a vector of ColumnFamilyDescriptor, which tells
us each CF's name and options. A second overload also takes a
ColumnFamilyOptions to use for column families encountered during the repair
that were not specified in that vector.

One potentially confusing thing is that we store options in the constructor and
don't invoke AddColumnFamily() until discovering the CF in ScanTable. This is
because we don't know the CF ID until we find a table belonging to that CF.

Depends on D59781.

Test Plan:
  $ ./repair_test

Reviewers: yhchiang, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D59853
main
Andrew Kryczka 9 years ago
parent 56ac686292
commit 3b7ed677de
  1. 173
      db/repair.cc
  2. 60
      db/repair_test.cc
  3. 17
      include/rocksdb/db.h

@ -74,15 +74,16 @@
#include "db/memtable.h" #include "db/memtable.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/writebuffer.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h" #include "rocksdb/immutable_options.h"
#include "rocksdb/options.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/string_util.h"
namespace rocksdb { namespace rocksdb {
@ -90,35 +91,60 @@ namespace {
class Repairer { class Repairer {
public: public:
Repairer(const std::string& dbname, const Options& options) Repairer(const std::string& dbname, const DBOptions& db_options,
const std::vector<ColumnFamilyDescriptor>& column_families,
const ColumnFamilyOptions& default_cf_opts,
const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
: dbname_(dbname), : dbname_(dbname),
env_(options.env), env_(db_options.env),
icmp_(options.comparator),
cf_options_(options),
options_(SanitizeOptions(dbname, &icmp_, options)),
ioptions_(options_),
env_options_(), env_options_(),
db_options_(SanitizeOptions(dbname_, db_options)),
icmp_(default_cf_opts.comparator),
default_cf_opts_(default_cf_opts),
default_cf_iopts_(
ImmutableCFOptions(Options(db_options_, default_cf_opts))),
unknown_cf_opts_(unknown_cf_opts),
create_unknown_cfs_(create_unknown_cfs),
raw_table_cache_( raw_table_cache_(
// TableCache can be small since we expect each table to be opened // TableCache can be small since we expect each table to be opened
// once. // once.
NewLRUCache(10, options_.table_cache_numshardbits)), NewLRUCache(10, db_options_.table_cache_numshardbits)),
table_cache_( table_cache_(new TableCache(default_cf_iopts_, env_options_,
new TableCache(ioptions_, env_options_, raw_table_cache_.get())), raw_table_cache_.get())),
wb_(options_.db_write_buffer_size), wb_(db_options_.db_write_buffer_size),
wc_(options_.delayed_write_rate), wc_(db_options_.delayed_write_rate),
vset_(dbname_, &options_, env_options_, raw_table_cache_.get(), &wb_, vset_(dbname_, &db_options_, env_options_, raw_table_cache_.get(), &wb_,
&wc_), &wc_),
next_file_number_(1) { next_file_number_(1) {
GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_); for (const auto& cfd : column_families) {
cf_name_to_opts_[cfd.name] = cfd.options;
}
}
const ColumnFamilyOptions* GetColumnFamilyOptions(
const std::string& cf_name) {
if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
if (create_unknown_cfs_) {
return &unknown_cf_opts_;
}
return nullptr;
}
return &cf_name_to_opts_[cf_name];
} }
// Adds a column family to the VersionSet with cf_options_ and updates // Adds a column family to the VersionSet with cf_options_ and updates
// manifest. // manifest.
Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) { Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
MutableCFOptions mut_cf_opts(options_, ioptions_); const auto* cf_opts = GetColumnFamilyOptions(cf_name);
if (cf_opts == nullptr) {
return Status::Corruption("Encountered unknown column family with name=" +
cf_name + ", id=" + ToString(cf_id));
}
Options opts(db_options_, *cf_opts);
MutableCFOptions mut_cf_opts(opts, ImmutableCFOptions(opts));
VersionEdit edit; VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name()); edit.SetComparatorName(opts.comparator->Name());
edit.SetLogNumber(0); edit.SetLogNumber(0);
edit.SetColumnFamily(cf_id); edit.SetColumnFamily(cf_id);
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
@ -126,9 +152,9 @@ class Repairer {
edit.AddColumnFamily(cf_name); edit.AddColumnFamily(cf_name);
mutex_.Lock(); mutex_.Lock();
Status status = vset_.LogAndApply( Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_,
cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */, nullptr /* db_directory */,
false /* new_descriptor_log */, &cf_options_); false /* new_descriptor_log */, cf_opts);
mutex_.Unlock(); mutex_.Unlock();
return status; return status;
} }
@ -145,13 +171,14 @@ class Repairer {
ArchiveFile(dbname_ + "/" + manifests_[i]); ArchiveFile(dbname_ + "/" + manifests_[i]);
} }
// Just create a DBImpl temporarily so we can reuse NewDB() // Just create a DBImpl temporarily so we can reuse NewDB()
DBImpl* db_impl = new DBImpl(options_, dbname_); DBImpl* db_impl = new DBImpl(db_options_, dbname_);
status = db_impl->NewDB(); status = db_impl->NewDB();
delete db_impl; delete db_impl;
} }
if (status.ok()) { if (status.ok()) {
// Recover using the fresh manifest created by NewDB() // Recover using the fresh manifest created by NewDB()
status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false); status =
vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
} }
if (status.ok()) { if (status.ok()) {
// Need to scan existing SST files first so the column families are // Need to scan existing SST files first so the column families are
@ -171,7 +198,7 @@ class Repairer {
for (size_t i = 0; i < tables_.size(); i++) { for (size_t i = 0; i < tables_.size(); i++) {
bytes += tables_[i].meta.fd.GetFileSize(); bytes += tables_[i].meta.fd.GetFileSize();
} }
Log(InfoLogLevel::WARN_LEVEL, options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"**** Repaired rocksdb %s; " "**** Repaired rocksdb %s; "
"recovered %" ROCKSDB_PRIszt " files; %" PRIu64 "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
"bytes. " "bytes. "
@ -193,18 +220,19 @@ class Repairer {
std::string const dbname_; std::string const dbname_;
Env* const env_; Env* const env_;
const InternalKeyComparator icmp_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories_;
const ColumnFamilyOptions cf_options_; // unsanitized
const Options options_; // sanitized
const ImmutableCFOptions ioptions_; // sanitized
const EnvOptions env_options_; const EnvOptions env_options_;
const DBOptions db_options_;
const InternalKeyComparator icmp_;
const ColumnFamilyOptions default_cf_opts_;
const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference
const ColumnFamilyOptions unknown_cf_opts_;
const bool create_unknown_cfs_;
std::shared_ptr<Cache> raw_table_cache_; std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_; TableCache* table_cache_;
WriteBuffer wb_; WriteBuffer wb_;
WriteController wc_; WriteController wc_;
VersionSet vset_; VersionSet vset_;
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
InstrumentedMutex mutex_; InstrumentedMutex mutex_;
std::vector<std::string> manifests_; std::vector<std::string> manifests_;
@ -216,9 +244,9 @@ class Repairer {
Status FindFiles() { Status FindFiles() {
std::vector<std::string> filenames; std::vector<std::string> filenames;
bool found_file = false; bool found_file = false;
for (size_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
Status status = Status status =
env_->GetChildren(options_.db_paths[path_id].path, &filenames); env_->GetChildren(db_options_.db_paths[path_id].path, &filenames);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
@ -261,7 +289,7 @@ class Repairer {
std::string logname = LogFileName(dbname_, logs_[i]); std::string logname = LogFileName(dbname_, logs_[i]);
Status status = ConvertLogToTable(logs_[i]); Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) { if (!status.ok()) {
Log(InfoLogLevel::WARN_LEVEL, options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i], "Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i],
status.ToString().c_str()); status.ToString().c_str());
} }
@ -295,13 +323,13 @@ class Repairer {
// Create the log reader. // Create the log reader.
LogReporter reporter; LogReporter reporter;
reporter.env = env_; reporter.env = env_;
reporter.info_log = options_.info_log; reporter.info_log = db_options_.info_log;
reporter.lognum = log; reporter.lognum = log;
// We intentionally make log::Reader do checksumming so that // We intentionally make log::Reader do checksumming so that
// corruptions cause entire commits to be skipped instead of // corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence // propagating bad information (like overly large sequence
// numbers). // numbers).
log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter, log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, 0 /*initial_offset*/, log); true /*enable checksum*/, 0 /*initial_offset*/, log);
// Initialize per-column family memtables // Initialize per-column family memtables
@ -327,9 +355,8 @@ class Repairer {
if (status.ok()) { if (status.ok()) {
counter += WriteBatchInternal::Count(&batch); counter += WriteBatchInternal::Count(&batch);
} else { } else {
Log(InfoLogLevel::WARN_LEVEL, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
options_.info_log, "Log #%" PRIu64 ": ignoring %s", log, "Log #%" PRIu64 ": ignoring %s", log, status.ToString().c_str());
status.ToString().c_str());
status = Status::OK(); // Keep going with rest of file status = Status::OK(); // Keep going with rest of file
} }
} }
@ -356,7 +383,7 @@ class Repairer {
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression,
CompressionOptions(), false, nullptr /* internal_stats */, CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery); TableFileCreationReason::kRecovery);
Log(InfoLogLevel::INFO_LEVEL, options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log,
counter, meta.fd.GetNumber(), status.ToString().c_str()); counter, meta.fd.GetNumber(), status.ToString().c_str());
if (status.ok()) { if (status.ok()) {
@ -378,13 +405,12 @@ class Repairer {
Status status = ScanTable(&t); Status status = ScanTable(&t);
if (!status.ok()) { if (!status.ok()) {
std::string fname = TableFileName( std::string fname = TableFileName(
options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
char file_num_buf[kFormatFileNumberBufSize]; char file_num_buf[kFormatFileNumberBufSize];
FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
file_num_buf, sizeof(file_num_buf)); file_num_buf, sizeof(file_num_buf));
Log(InfoLogLevel::WARN_LEVEL, options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"Table #%s: ignoring %s", file_num_buf, "Table #%s: ignoring %s", file_num_buf, status.ToString().c_str());
status.ToString().c_str());
ArchiveFile(fname); ArchiveFile(fname);
} else { } else {
tables_.push_back(t); tables_.push_back(t);
@ -393,8 +419,8 @@ class Repairer {
} }
Status ScanTable(TableInfo* t) { Status ScanTable(TableInfo* t) {
std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), std::string fname = TableFileName(
t->meta.fd.GetPathId()); db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
int counter = 0; int counter = 0;
uint64_t file_size; uint64_t file_size;
Status status = env_->GetFileSize(fname, &file_size); Status status = env_->GetFileSize(fname, &file_size);
@ -409,7 +435,7 @@ class Repairer {
t->column_family_id = static_cast<uint32_t>(props->column_family_id); t->column_family_id = static_cast<uint32_t>(props->column_family_id);
if (t->column_family_id == if (t->column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) { TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
Log(InfoLogLevel::WARN_LEVEL, options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"Table #%" PRIu64 "Table #%" PRIu64
": column family unknown (probably due to legacy format); " ": column family unknown (probably due to legacy format); "
"adding to default column family id 0.", "adding to default column family id 0.",
@ -427,7 +453,7 @@ class Repairer {
if (status.ok()) { if (status.ok()) {
cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id); cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
if (cfd->GetName() != props->column_family_name) { if (cfd->GetName() != props->column_family_name) {
Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Table #%" PRIu64 "Table #%" PRIu64
": inconsistent column family name '%s'; expected '%s' for column " ": inconsistent column family name '%s'; expected '%s' for column "
"family id %" PRIu32 ".", "family id %" PRIu32 ".",
@ -446,7 +472,7 @@ class Repairer {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key(); Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) { if (!ParseInternalKey(key, &parsed)) {
Log(InfoLogLevel::ERROR_LEVEL, options_.info_log, Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(), "Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(),
EscapeString(key).c_str()); EscapeString(key).c_str());
continue; continue;
@ -470,7 +496,7 @@ class Repairer {
} }
delete iter; delete iter;
Log(InfoLogLevel::INFO_LEVEL, options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter, "Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter,
status.ToString().c_str()); status.ToString().c_str());
} }
@ -532,15 +558,60 @@ class Repairer {
new_file.append("/"); new_file.append("/");
new_file.append((slash == nullptr) ? fname.c_str() : slash + 1); new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
Status s = env_->RenameFile(fname, new_file); Status s = env_->RenameFile(fname, new_file);
Log(InfoLogLevel::INFO_LEVEL, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Archiving %s: %s\n",
options_.info_log, "Archiving %s: %s\n",
fname.c_str(), s.ToString().c_str()); fname.c_str(), s.ToString().c_str());
} }
}; };
// Finds the descriptor named kDefaultColumnFamilyName in column_families and
// copies its options into *res.
//
// Returns Status::OK() when such a descriptor exists. Otherwise returns
// Status::InvalidArgument and leaves *res unmodified. res must not be null.
Status GetDefaultCFOptions(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    ColumnFamilyOptions* res) {
  assert(res != nullptr);
  for (const auto& descriptor : column_families) {
    if (descriptor.name == kDefaultColumnFamilyName) {
      // The first matching descriptor wins; later duplicates are ignored.
      *res = descriptor.options;
      return Status::OK();
    }
  }
  return Status::InvalidArgument(
      "column_families", "Must contain entry for default column family");
}
} // anonymous namespace } // anonymous namespace
// Repairs the database at dbname using the options given per column family in
// column_families. The Repairer is constructed with create_unknown_cfs set to
// false, so no options are supplied for column families absent from
// column_families.
//
// Fails early with the status from GetDefaultCFOptions() when column_families
// has no entry for the default column family.
Status RepairDB(const std::string& dbname, const DBOptions& db_options,
                const std::vector<ColumnFamilyDescriptor>& column_families) {
  ColumnFamilyOptions default_cf_opts;
  Status lookup_status = GetDefaultCFOptions(column_families, &default_cf_opts);
  if (!lookup_status.ok()) {
    return lookup_status;
  }

  Repairer repairer(dbname, db_options, column_families, default_cf_opts,
                    ColumnFamilyOptions() /* unknown_cf_opts */,
                    false /* create_unknown_cfs */);
  return repairer.Run();
}
// Repairs the database at dbname using the options given per column family in
// column_families. The Repairer is constructed with create_unknown_cfs set to
// true, so unknown_cf_opts is used for any column family that is not listed in
// column_families.
//
// Fails early with the status from GetDefaultCFOptions() when column_families
// has no entry for the default column family.
Status RepairDB(const std::string& dbname, const DBOptions& db_options,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                const ColumnFamilyOptions& unknown_cf_opts) {
  ColumnFamilyOptions default_cf_opts;
  Status lookup_status = GetDefaultCFOptions(column_families, &default_cf_opts);
  if (!lookup_status.ok()) {
    return lookup_status;
  }

  Repairer repairer(dbname, db_options, column_families, default_cf_opts,
                    unknown_cf_opts, true /* create_unknown_cfs */);
  return repairer.Run();
}
Status RepairDB(const std::string& dbname, const Options& options) { Status RepairDB(const std::string& dbname, const Options& options) {
Repairer repairer(dbname, options); DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */,
cf_options /* unknown_cf_opts */,
true /* create_unknown_cfs */);
return repairer.Run(); return repairer.Run();
} }

@ -9,6 +9,7 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "util/file_util.h" #include "util/file_util.h"
@ -208,6 +209,65 @@ TEST_F(RepairTest, RepairMultipleColumnFamilies) {
} }
} }
TEST_F(RepairTest, RepairColumnFamilyOptions) {
  // Verify repair logic uses correct ColumnFamilyOptions when repairing a
  // database with different options for column families.
  //
  // Setup: two column families, "default" (bytewise comparator) and "reverse"
  // (reverse bytewise comparator), each holding kEntriesPerCf keys.
  const int kNumCfs = 2;
  const int kEntriesPerCf = 2;

  Options opts(CurrentOptions()), rev_opts(CurrentOptions());
  opts.comparator = BytewiseComparator();
  rev_opts.comparator = ReverseBytewiseComparator();

  DestroyAndReopen(opts);
  CreateColumnFamilies({"reverse"}, rev_opts);
  ReopenWithColumnFamilies({"default", "reverse"},
                           std::vector<Options>{opts, rev_opts});
  for (int i = 0; i < kNumCfs; ++i) {
    for (int j = 0; j < kEntriesPerCf; ++j) {
      Put(i, "key" + ToString(j), "val" + ToString(j));
      if (i == kNumCfs - 1 && j == kEntriesPerCf - 1) {
        // Leave one unflushed so we can verify RepairDB's flush logic
        continue;
      }
      Flush(i);
    }
  }
  Close();

  // RepairDB() records the comparator in the manifest, and DB::Open would fail
  // if a different comparator were used.
  ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}, {"reverse", rev_opts}},
                     opts /* unknown_cf_opts */));
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"},
                                        std::vector<Options>{opts, rev_opts}));
  for (int i = 0; i < kNumCfs; ++i) {
    for (int j = 0; j < kEntriesPerCf; ++j) {
      ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j));
    }
  }

  // Examine table properties to verify RepairDB() used the right options when
  // converting WAL->SST
  TablePropertiesCollection fname_to_props;
  // Check the returned Status so a failed lookup is reported directly instead
  // of showing up only as a size mismatch below.
  ASSERT_OK(db_->GetPropertiesOfAllTables(handles_[1], &fname_to_props));
  ASSERT_EQ(fname_to_props.size(), 2U);
  for (const auto& fname_and_props : fname_to_props) {
    ASSERT_EQ(InternalKeyComparator(rev_opts.comparator).Name(),
              fname_and_props.second->comparator_name);
  }

  // Also check comparator when it's provided via "unknown" CF options
  ASSERT_OK(RepairDB(dbname_, opts, {{"default", opts}},
                     rev_opts /* unknown_cf_opts */));
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "reverse"},
                                        std::vector<Options>{opts, rev_opts}));
  for (int i = 0; i < kNumCfs; ++i) {
    for (int j = 0; j < kEntriesPerCf; ++j) {
      ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j));
    }
  }
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -859,7 +859,24 @@ Status DestroyDB(const std::string& name, const Options& options);
// resurrect as much of the contents of the database as possible. // resurrect as much of the contents of the database as possible.
// Some data may be lost, so be careful when calling this function // Some data may be lost, so be careful when calling this function
// on a database that contains important information. // on a database that contains important information.
//
// With this API, we will warn and skip data associated with column families not
// specified in column_families.
//
// @param dbname Name of the database to repair
// @param db_options Database-wide options to use for the repair
// @param column_families Descriptors for known column families. Must contain
//                        an entry for the default column family; otherwise
//                        Status::InvalidArgument is returned.
Status RepairDB(const std::string& dbname, const DBOptions& db_options,
const std::vector<ColumnFamilyDescriptor>& column_families);
// Variant of RepairDB that also repairs column families which are not listed
// in column_families, using unknown_cf_opts for them.
//
// @param column_families Descriptors for known column families. Must contain
//                        an entry for the default column family; otherwise
//                        Status::InvalidArgument is returned.
// @param unknown_cf_opts Options for column families encountered during the
//                        repair that were not specified in column_families.
Status RepairDB(const std::string& dbname, const DBOptions& db_options,
const std::vector<ColumnFamilyDescriptor>& column_families,
const ColumnFamilyOptions& unknown_cf_opts);
// @param options These options will be used for the database and for ALL column
// families encountered during the repair
Status RepairDB(const std::string& dbname, const Options& options); Status RepairDB(const std::string& dbname, const Options& options);
#endif #endif
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save