Refactor with VersionEditHandler (#6581)

Summary:
Added a few classes in the same class hierarchy to remove code duplication and
refactor the logic of reading and processing MANIFEST files.

New classes are as follows.
```
class VersionEditHandlerBase;
class ListColumnFamiliesHandler : VersionEditHandlerBase;
class FileChecksumRetriever : VersionEditHandlerBase;
class DumpManifestHandler : VersionEditHandler;
```
Classes that already existed before this PR are as follows.
```
class VersionEditHandler : VersionEditHandlerBase;
```

With these classes, refactored functions: `VersionSet::Recover()`,
`VersionSet::ListColumnFamilies()`, `VersionSet::DumpManifest()`,
`GetFileChecksumFromManifest()`.

Test Plan (devserver):
```
make check
COMPILE_WITH_ASAN=1 make check
```
These refactored code, especially recovery-related logic, will be tested intensively by
all existing unit tests and stress tests. For example, run
```
make crash_test
```
Verified 3 successful runs on devserver.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6581

Reviewed By: ajkr

Differential Revision: D20616217

Pulled By: riversand963

fbshipit-source-id: 048c7743aa4be2623ccd0cc3e61c0027e604e78b
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent c57f914482
commit 8b6b6aeb1a
  1. 5
      db/corruption_test.cc
  2. 3
      db/version_edit.h
  3. 138
      db/version_edit_handler.cc
  4. 154
      db/version_edit_handler.h
  5. 377
      db/version_set.cc
  6. 1
      db/version_set.h
  7. 4
      db/version_set_test.cc
  8. 32
      util/file_checksum_helper.cc

@ -32,6 +32,7 @@
#include "table/mock_table.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/random.h"
#include "util/string_util.h"
@ -552,7 +553,7 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
std::vector<LiveFileMetaData> metadata;
dbi->GetLiveFilesMetaData(&metadata);
ASSERT_GT(metadata.size(), size_t(0));
ASSERT_GT(metadata.size(), 0);
std::string filename = dbname_ + metadata[0].name;
delete db_;
@ -568,7 +569,7 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
} else { // delete the file
ASSERT_OK(env_.DeleteFile(filename));
Status x = TryReopen(&options);
ASSERT_TRUE(x.IsPathNotFound());
ASSERT_TRUE(x.IsCorruption());
}
ASSERT_OK(DestroyDB(dbname_, options_));

@ -533,8 +533,11 @@ class VersionEdit {
private:
friend class ReactiveVersionSet;
friend class VersionEditHandlerBase;
friend class ListColumnFamiliesHandler;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
friend class DumpManifestHandler;
friend class VersionSet;
friend class Version;
friend class AtomicGroupReadBuffer;

@ -9,28 +9,14 @@
#include "db/version_edit_handler.h"
#include <cinttypes>
#include "monitoring/persistent_stats_history.h"
namespace ROCKSDB_NAMESPACE {
VersionEditHandler::VersionEditHandler(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer)
: read_only_(read_only),
column_families_(column_families),
status_(),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
initialized_(false),
io_tracer_(io_tracer) {
assert(version_set_ != nullptr);
}
void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id) {
void VersionEditHandlerBase::Iterate(log::Reader& reader,
Status* log_read_status) {
Slice record;
std::string scratch;
assert(log_read_status);
@ -38,19 +24,14 @@ void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
size_t recovered_edits = 0;
Status s = Initialize();
while (s.ok() && reader.ReadRecord(&record, &scratch) &&
log_read_status->ok()) {
while (reader.LastRecordEnd() < max_manifest_read_size_ && s.ok() &&
reader.ReadRecord(&record, &scratch) && log_read_status->ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
if (edit.has_db_id_) {
version_set_->db_id_ = edit.GetDbId();
if (db_id != nullptr) {
*db_id = version_set_->db_id_;
}
}
s = read_buffer_.AddEdit(&edit);
if (!s.ok()) {
break;
@ -81,11 +62,67 @@ void VersionEditHandler::Iterate(log::Reader& reader, Status* log_read_status,
s = *log_read_status;
}
read_buffer_.Clear();
CheckIterationResult(reader, &s);
if (!s.ok()) {
status_ = s;
}
TEST_SYNC_POINT_CALLBACK("VersionEditHandlerBase::Iterate:Finish",
&recovered_edits);
}
Status ListColumnFamiliesHandler::ApplyVersionEdit(
VersionEdit& edit, ColumnFamilyData** /*unused*/) {
Status s;
if (edit.is_column_family_add_) {
if (column_family_names_.find(edit.column_family_) !=
column_family_names_.end()) {
s = Status::Corruption("Manifest adding the same column family twice");
} else {
column_family_names_.insert(
{edit.column_family_, edit.column_family_name_});
}
} else if (edit.is_column_family_drop_) {
if (column_family_names_.find(edit.column_family_) ==
column_family_names_.end()) {
s = Status::Corruption("Manifest - dropping non-existing column family");
} else {
column_family_names_.erase(edit.column_family_);
}
}
return s;
}
Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** /*unused*/) {
for (const auto& deleted_file : edit.GetDeletedFiles()) {
file_checksum_list_.RemoveOneFileChecksum(deleted_file.second);
}
for (const auto& new_file : edit.GetNewFiles()) {
file_checksum_list_.InsertOneFileChecksum(
new_file.second.fd.GetNumber(), new_file.second.file_checksum,
new_file.second.file_checksum_func_name);
}
return Status::OK();
}
VersionEditHandler::VersionEditHandler(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer, bool skip_load_table_files)
: VersionEditHandlerBase(),
read_only_(read_only),
column_families_(column_families),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
io_tracer_(io_tracer),
skip_load_table_files_(skip_load_table_files),
initialized_(false) {
assert(version_set_ != nullptr);
}
Status VersionEditHandler::Initialize() {
@ -274,7 +311,7 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
Status* s) {
assert(s != nullptr);
if (!s->ok()) {
read_buffer_.Clear();
// Do nothing here.
} else if (!version_edit_params_.has_log_number_ ||
!version_edit_params_.has_next_file_number_ ||
!version_edit_params_.has_last_sequence_) {
@ -292,6 +329,8 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
msg.append(" entry in MANIFEST");
*s = Status::Corruption(msg);
}
// There were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode
if (s->ok() && !read_only_ && !column_families_not_found_.empty()) {
std::string msg;
for (const auto& cf : column_families_not_found_) {
@ -330,6 +369,10 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
*s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
/*is_initial_load=*/true);
if (!s->ok()) {
// If s is IOError::PathNotFound, then we mark the db as corrupted.
if (s->IsPathNotFound()) {
*s = Status::Corruption("Corruption: " + s->ToString());
}
break;
}
}
@ -426,6 +469,9 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load) {
if (skip_load_table_files_) {
return Status::OK();
}
assert(cfd != nullptr);
assert(!cfd->IsDropped());
auto builder_iter = builders_.find(cfd->GetID());
@ -452,10 +498,11 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& edit) {
Status s;
if (edit.has_db_id_) {
version_set_->db_id_ = edit.GetDbId();
version_edit_params_.SetDBId(edit.db_id_);
}
if (cfd != nullptr) {
if (edit.has_db_id_) {
version_edit_params_.SetDBId(edit.db_id_);
}
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
ROCKS_LOG_WARN(
@ -505,8 +552,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
/*no_error_if_table_files_missing=*/true, io_tracer),
io_tracer_(io_tracer) {}
/*no_error_if_table_files_missing=*/true, io_tracer) {}
VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
for (const auto& elem : versions_) {
@ -612,4 +658,32 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
return s;
}
void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
Status* s) {
VersionEditHandler::CheckIterationResult(reader, s);
if (!s->ok()) {
fprintf(stdout, "%s\n", s->ToString().c_str());
return;
}
for (auto* cfd : *(version_set_->column_family_set_)) {
fprintf(stdout,
"--------------- Column family \"%s\" (ID %" PRIu32
") --------------\n",
cfd->GetName().c_str(), cfd->GetID());
fprintf(stdout, "log number: %" PRIu64 "\n", cfd->GetLogNumber());
fprintf(stdout, "comparator: %s\n", cfd->user_comparator()->Name());
assert(cfd->current());
fprintf(stdout, "%s \n", cfd->current()->DebugString(hex_).c_str());
}
fprintf(stdout,
"next_file_number %" PRIu64 " last_sequence %" PRIu64
" prev_log_number %" PRIu64 " max_column_family %" PRIu32
" min_log_number_to_keep "
"%" PRIu64 "\n",
version_set_->current_next_file_number(),
version_set_->LastSequence(), version_set_->prev_log_number(),
version_set_->column_family_set_->GetMaxColumnFamily(),
version_set_->min_log_number_to_keep_2pc());
}
} // namespace ROCKSDB_NAMESPACE

@ -15,7 +15,73 @@
namespace ROCKSDB_NAMESPACE {
typedef std::unique_ptr<BaseReferencedVersionBuilder> VersionBuilderUPtr;
class VersionEditHandlerBase {
public:
explicit VersionEditHandlerBase()
: max_manifest_read_size_(std::numeric_limits<uint64_t>::max()) {}
virtual ~VersionEditHandlerBase() {}
void Iterate(log::Reader& reader, Status* log_read_status);
const Status& status() const { return status_; }
protected:
explicit VersionEditHandlerBase(uint64_t max_read_size)
: max_manifest_read_size_(max_read_size) {}
virtual Status Initialize() { return Status::OK(); }
virtual Status ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** cfd) = 0;
virtual void CheckIterationResult(const log::Reader& /*reader*/,
Status* /*s*/) {}
Status status_;
private:
AtomicGroupReadBuffer read_buffer_;
const uint64_t max_manifest_read_size_;
};
class ListColumnFamiliesHandler : public VersionEditHandlerBase {
public:
ListColumnFamiliesHandler() : VersionEditHandlerBase() {}
~ListColumnFamiliesHandler() override {}
const std::map<uint32_t, std::string> GetColumnFamilyNames() const {
return column_family_names_;
}
protected:
Status ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** /*unused*/) override;
private:
// default column family is always implicitly there
std::map<uint32_t, std::string> column_family_names_{
{0, kDefaultColumnFamilyName}};
};
class FileChecksumRetriever : public VersionEditHandlerBase {
public:
FileChecksumRetriever(uint64_t max_read_size,
FileChecksumList& file_checksum_list)
: VersionEditHandlerBase(max_read_size),
file_checksum_list_(file_checksum_list) {}
~FileChecksumRetriever() override {}
protected:
Status ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** /*unused*/) override;
private:
FileChecksumList& file_checksum_list_;
};
using VersionBuilderUPtr = std::unique_ptr<BaseReferencedVersionBuilder>;
// A class used for scanning MANIFEST file.
// VersionEditHandler reads a MANIFEST file, parses the version edits, and
@ -24,31 +90,48 @@ typedef std::unique_ptr<BaseReferencedVersionBuilder> VersionBuilderUPtr;
// To use this class and its subclasses,
// 1. Create an object of VersionEditHandler or its subclasses.
// VersionEditHandler handler(read_only, column_families, version_set,
// track_missing_files, ignore_missing_files);
// track_missing_files,
// no_error_if_table_files_missing);
// 2. Status s = handler.Iterate(reader, &db_id);
// 3. Check s and handle possible errors.
//
// Not thread-safe, external synchronization is necessary if an object of
// VersionEditHandler is shared by multiple threads.
class VersionEditHandler {
class VersionEditHandler : public VersionEditHandlerBase {
public:
explicit VersionEditHandler(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool ignore_missing_files, const std::shared_ptr<IOTracer>& io_tracer);
virtual ~VersionEditHandler() {}
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
track_missing_files, no_error_if_table_files_missing,
io_tracer, /*skip_load_table_files=*/false) {}
void Iterate(log::Reader& reader, Status* log_read_status,
std::string* db_id);
~VersionEditHandler() override {}
const Status& status() const { return status_; }
const VersionEditParams& GetVersionEditParams() const {
return version_edit_params_;
}
bool HasMissingFiles() const;
void GetDbId(std::string* db_id) const {
if (db_id && version_edit_params_.has_db_id_) {
*db_id = version_edit_params_.db_id_;
}
}
protected:
Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd);
explicit VersionEditHandler(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer, bool skip_load_table_files);
Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override;
Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd);
@ -60,12 +143,12 @@ class VersionEditHandler {
Status OnWalDeletion(VersionEdit& edit);
Status Initialize();
Status Initialize() override;
void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found,
bool* cf_in_builders) const;
virtual void CheckIterationResult(const log::Reader& reader, Status* s);
void CheckIterationResult(const log::Reader& reader, Status* s) override;
ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options,
const VersionEdit& edit);
@ -82,24 +165,26 @@ class VersionEditHandler {
const bool read_only_;
const std::vector<ColumnFamilyDescriptor>& column_families_;
Status status_;
VersionSet* version_set_;
AtomicGroupReadBuffer read_buffer_;
std::unordered_map<uint32_t, VersionBuilderUPtr> builders_;
std::unordered_map<std::string, ColumnFamilyOptions> name_to_options_;
// Keeps track of column families in manifest that were not found in
// column families parameters. if those column families are not dropped
// by subsequent manifest records, Recover() will return failure status.
std::unordered_map<uint32_t, std::string> column_families_not_found_;
VersionEditParams version_edit_params_;
const bool track_missing_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>>
cf_to_missing_files_;
bool no_error_if_table_files_missing_;
std::shared_ptr<IOTracer> io_tracer_;
bool skip_load_table_files_;
private:
Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& edit);
bool initialized_;
std::shared_ptr<IOTracer> io_tracer_;
};
// A class similar to its base class, i.e. VersionEditHandler.
@ -124,7 +209,44 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
private:
std::unordered_map<uint32_t, Version*> versions_;
std::shared_ptr<IOTracer> io_tracer_;
};
class DumpManifestHandler : public VersionEditHandler {
public:
DumpManifestHandler(
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer,
bool verbose, bool hex, bool json)
: VersionEditHandler(
/*read_only=*/true, column_families, version_set,
/*track_missing_files=*/false,
/*no_error_if_table_files_missing=*/false, io_tracer,
/*skip_load_table_files=*/true),
verbose_(verbose),
hex_(hex),
json_(json),
count_(0) {}
~DumpManifestHandler() override {}
Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override {
// Write out each individual edit
if (verbose_ && !json_) {
fprintf(stdout, "%s\n", edit.DebugString(hex_).c_str());
} else if (json_) {
fprintf(stdout, "%s\n", edit.DebugJSON(count_, hex_).c_str());
}
++count_;
return VersionEditHandler::ApplyVersionEdit(edit, cfd);
}
void CheckIterationResult(const log::Reader& reader, Status* s) override;
private:
const bool verbose_;
const bool hex_;
const bool json_;
int count_;
};
} // namespace ROCKSDB_NAMESPACE

@ -4713,15 +4713,6 @@ Status VersionSet::ReadAndRecover(
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id) {
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
for (const auto& cf : column_families) {
cf_name_to_options.emplace(cf.name, cf.options);
}
// keeps track of column families in manifest that were not found in
// column families parameters. if those column families are not dropped
// by subsequent manifest records, Recover() will return failure status
std::unordered_map<int, std::string> column_families_not_found;
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string manifest_path;
Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
@ -4746,139 +4737,30 @@ Status VersionSet::Recover(
new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size, io_tracer_));
}
VersionBuilderMap builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
if (default_cf_iter == cf_name_to_options.end()) {
return Status::InvalidArgument("Default column family not specified");
}
VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
uint64_t current_manifest_file_size = 0;
VersionEditParams version_edit_params;
uint64_t log_number = 0;
{
VersionSet::LogReporter reporter;
Status log_read_status;
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders, &log_read_status,
&version_edit_params, db_id);
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
}
if (s.ok()) {
if (!version_edit_params.has_next_file_number_) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
} else if (!version_edit_params.has_log_number_) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
} else if (!version_edit_params.has_last_sequence_) {
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!version_edit_params.has_prev_log_number_) {
version_edit_params.SetPrevLogNumber(0);
}
column_family_set_->UpdateMaxColumnFamily(
version_edit_params.max_column_family_);
// When reading DB generated using old release, min_log_number_to_keep=0.
// All log files will be scanned for potential prepare entries.
MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
MarkFileNumberUsed(version_edit_params.prev_log_number_);
MarkFileNumberUsed(version_edit_params.log_number_);
}
// there were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode
if (read_only == false && !column_families_not_found.empty()) {
std::string list_of_not_found;
for (const auto& cf : column_families_not_found) {
list_of_not_found += ", " + cf.second;
}
list_of_not_found = list_of_not_found.substr(2);
s = Status::InvalidArgument(
"You have to open all column families. Column families not opened: " +
list_of_not_found);
}
if (s.ok()) {
for (auto cfd : *column_family_set_) {
assert(builders.count(cfd->GetID()) > 0);
auto* builder = builders[cfd->GetID()]->version_builder();
if (!builder->CheckConsistencyForNumLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
VersionEditHandler handler(
read_only, column_families, const_cast<VersionSet*>(this),
/*track_missing_files=*/false,
/*no_error_if_table_files_missing=*/false, io_tracer_);
handler.Iterate(reader, &log_read_status);
s = handler.status();
if (s.ok()) {
log_number = handler.GetVersionEditParams().log_number_;
current_manifest_file_size = reader.GetReadOffset();
assert(current_manifest_file_size != 0);
handler.GetDbId(db_id);
}
}
if (s.ok()) {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
if (read_only) {
cfd->table_cache()->SetTablesAreImmortal();
}
assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder();
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
if (!s.ok()) {
if (db_options_->paranoid_checks) {
return s;
}
s = Status::OK();
}
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(v->storage_info());
if (!s.ok()) {
delete v;
return s;
}
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
}
manifest_file_size_ = current_manifest_file_size;
next_file_number_.store(version_edit_params.next_file_number_ + 1);
last_allocated_sequence_ = version_edit_params.last_sequence_;
last_published_sequence_ = version_edit_params.last_sequence_;
last_sequence_ = version_edit_params.last_sequence_;
prev_log_number_ = version_edit_params.prev_log_number_;
ROCKS_LOG_INFO(
db_options_->info_log,
"Recovered from manifest file:%s succeeded,"
@ -4887,9 +4769,8 @@ Status VersionSet::Recover(
",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
",min_log_number_to_keep is %" PRIu64 "\n",
manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
last_sequence_.load(), version_edit_params.log_number_,
prev_log_number_, column_family_set_->GetMaxColumnFamily(),
min_log_number_to_keep_2pc());
last_sequence_.load(), log_number, prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
@ -5037,7 +4918,9 @@ Status VersionSet::TryRecoverFromOneManifest(
VersionEditHandlerPointInTime handler_pit(
read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
handler_pit.Iterate(reader, &s, db_id);
handler_pit.Iterate(reader, &s);
handler_pit.GetDbId(db_id);
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
@ -5071,48 +4954,23 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
nullptr /*IOTracer*/));
}
std::map<uint32_t, std::string> column_family_names;
// default column family is always implicitly there
column_family_names.insert({0, kDefaultColumnFamilyName});
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
if (edit.is_column_family_add_) {
if (column_family_names.find(edit.column_family_) !=
column_family_names.end()) {
s = Status::Corruption("Manifest adding the same column family twice");
break;
}
column_family_names.insert(
{edit.column_family_, edit.column_family_name_});
} else if (edit.is_column_family_drop_) {
if (column_family_names.find(edit.column_family_) ==
column_family_names.end()) {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
column_family_names.erase(edit.column_family_);
}
}
ListColumnFamiliesHandler handler;
handler.Iterate(reader, &s);
assert(column_families);
column_families->clear();
if (s.ok()) {
for (const auto& iter : column_family_names) {
if (handler.status().ok()) {
for (const auto& iter : handler.GetColumnFamilyNames()) {
column_families->push_back(iter.second);
}
}
return s;
return handler.status();
}
#ifndef ROCKSDB_LITE
@ -5271,194 +5129,19 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
}
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t previous_log_number = 0;
int count = 0;
std::unordered_map<uint32_t, std::string> comparators;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
// add default column family
VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
std::vector<ColumnFamilyDescriptor> column_families(
1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
json);
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
// Write out each individual edit
if (verbose && !json) {
printf("%s\n", edit.DebugString(hex).c_str());
} else if (json) {
printf("%s\n", edit.DebugJSON(count, hex).c_str());
}
count++;
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
if (edit.has_comparator_) {
comparators.insert({edit.column_family_, edit.comparator_});
}
ColumnFamilyData* cfd = nullptr;
if (edit.is_column_family_add_) {
if (cf_in_builders) {
s = Status::Corruption(
"Manifest adding the same column family twice");
break;
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
cfd->set_initialized();
builders.insert(std::make_pair(
edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(cfd))));
} else if (edit.is_column_family_drop_) {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
auto builder_iter = builders.find(edit.column_family_);
builders.erase(builder_iter);
comparators.erase(edit.column_family_);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
cfd->UnrefAndTryDelete();
cfd = nullptr;
} else {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest record referencing unknown column family");
break;
}
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
s = builder->second->version_builder()->Apply(&edit);
if (!s.ok()) {
break;
}
}
if (cfd != nullptr && edit.has_log_number_) {
cfd->SetLogNumber(edit.log_number_);
}
if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
}
if (edit.has_min_log_number_to_keep_) {
MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
}
}
}
file_reader.reset();
if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
printf("no meta-nextfile entry in descriptor");
} else if (!have_last_sequence) {
printf("no last-sequence-number entry in descriptor");
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!have_prev_log_number) {
previous_log_number = 0;
}
handler.Iterate(reader, &s);
}
if (s.ok()) {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder();
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(), io_tracer_,
current_version_number_++);
s = builder->SaveTo(v->storage_info());
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
printf("--------------- Column family \"%s\" (ID %" PRIu32
") --------------\n",
cfd->GetName().c_str(), cfd->GetID());
printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
auto comparator = comparators.find(cfd->GetID());
if (comparator != comparators.end()) {
printf("comparator: %s\n", comparator->second.c_str());
} else {
printf("comparator: <NO COMPARATOR>\n");
}
printf("%s \n", v->DebugString(hex).c_str());
delete v;
}
next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
" prev_log_number %" PRIu64 " max_column_family %" PRIu32
" min_log_number_to_keep "
"%" PRIu64 "\n",
next_file_number_.load(), last_sequence, previous_log_number,
column_family_set_->GetMaxColumnFamily(),
min_log_number_to_keep_2pc());
}
return s;
return handler.status();
}
#endif // ROCKSDB_LITE

@ -1227,6 +1227,7 @@ class VersionSet {
friend class Version;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
friend class DumpManifestHandler;
friend class DBImpl;
friend class DBImplReadOnly;

@ -1750,6 +1750,10 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
EXPECT_TRUE(first_in_atomic_group_);
last_in_atomic_group_ = true;
});
SyncPoint::GetInstance()->SetCallBack(
"VersionEditHandlerBase::Iterate:Finish", [&](void* arg) {
num_recovered_edits_ = *reinterpret_cast<int*>(arg);
});
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ReadAndRecover:RecoveredEdits", [&](void* arg) {
num_recovered_edits_ = *reinterpret_cast<int*>(arg);

@ -13,6 +13,7 @@
#include "db/log_reader.h"
#include "db/version_edit.h"
#include "db/version_edit_handler.h"
#include "file/sequence_file_reader.h"
namespace ROCKSDB_NAMESPACE {
@ -95,7 +96,7 @@ Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path,
if (checksum_list == nullptr) {
return Status::InvalidArgument("checksum_list is nullptr");
}
assert(checksum_list);
checksum_list->reset();
Status s;
@ -123,32 +124,13 @@ Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path,
reporter.status_ptr = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.LastRecordEnd() < manifest_file_size &&
reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
// Remove the deleted files from the checksum_list
for (const auto& deleted_file : edit.GetDeletedFiles()) {
checksum_list->RemoveOneFileChecksum(deleted_file.second);
}
// Add the new files to the checksum_list
for (const auto& new_file : edit.GetNewFiles()) {
checksum_list->InsertOneFileChecksum(
new_file.second.fd.GetNumber(), new_file.second.file_checksum,
new_file.second.file_checksum_func_name);
}
}
assert(!s.ok() ||
FileChecksumRetriever retriever(manifest_file_size, *checksum_list);
retriever.Iterate(reader, &s);
assert(!retriever.status().ok() ||
manifest_file_size == std::numeric_limits<uint64_t>::max() ||
reader.LastRecordEnd() == manifest_file_size);
return s;
return retriever.status();
}
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save