Handle blob files when options.best_efforts_recovery is true (#8180)

Summary:
If `options.best_efforts_recovery == true`, RocksDB currently tolerates missing table files and recovers to the latest version without missing table files (not considering WAL). It is necessary to handle blob files as well to make the feature more complete.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8180

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D27840556

Pulled By: riversand963

fbshipit-source-id: 041685d0dc2e7779ac4f0374c07a8a327704aa5e
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent c377c2ba15
commit b0e20194ea
  1. 43
      db/blob/db_blob_basic_test.cc
  2. 28
      db/version_builder.cc
  3. 1
      db/version_builder.h
  4. 122
      db/version_edit_handler.cc
  5. 15
      db/version_edit_handler.h
  6. 60
      db/version_set.cc
  7. 4
      db/version_set.h

@ -267,6 +267,49 @@ TEST_F(DBBlobBasicTest, GenerateIOTracing) {
}
#endif // !ROCKSDB_LITE
TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
Options options = GetDefaultOptions();
options.enable_blob_files = true;
options.min_blob_size = 0;
options.create_if_missing = true;
Reopen(options);
ASSERT_OK(dbfull()->DisableFileDeletions());
constexpr int kNumTableFiles = 2;
for (int i = 0; i < kNumTableFiles; ++i) {
for (char ch = 'a'; ch != 'c'; ++ch) {
std::string key(1, ch);
ASSERT_OK(Put(key, "value" + std::to_string(i)));
}
ASSERT_OK(Flush());
}
Close();
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(dbname_, &files));
std::string blob_file_path;
uint64_t max_blob_file_num = kInvalidBlobFileNumber;
for (const auto& fname : files) {
uint64_t file_num = 0;
FileType type;
if (ParseFileName(fname, &file_num, /*info_log_name_prefix=*/"", &type) &&
type == kBlobFile) {
if (file_num > max_blob_file_num) {
max_blob_file_num = file_num;
blob_file_path = dbname_ + "/" + fname;
}
}
}
ASSERT_OK(env_->DeleteFile(blob_file_path));
options.best_efforts_recovery = true;
Reopen(options);
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), "a", &value));
ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
}
class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
public testing::WithParamInterface<std::string> {
protected:

@ -517,6 +517,28 @@ class VersionBuilder::Rep {
return meta->oldest_blob_file_number;
}
uint64_t GetMinOldestBlobFileNumber() const {
uint64_t min_oldest_blob_file_num = std::numeric_limits<uint64_t>::max();
for (int level = 0; level < num_levels_; ++level) {
const auto& base_files = base_vstorage_->LevelFiles(level);
for (const auto* fmeta : base_files) {
assert(fmeta);
min_oldest_blob_file_num =
std::min(min_oldest_blob_file_num, fmeta->oldest_blob_file_number);
}
const auto& added_files = levels_[level].added_files;
for (const auto& elem : added_files) {
assert(elem.second);
min_oldest_blob_file_num = std::min(
min_oldest_blob_file_num, elem.second->oldest_blob_file_number);
}
}
if (min_oldest_blob_file_num == std::numeric_limits<uint64_t>::max()) {
min_oldest_blob_file_num = kInvalidBlobFileNumber;
}
return min_oldest_blob_file_num;
}
Status ApplyFileDeletion(int level, uint64_t file_number) {
assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
@ -834,7 +856,7 @@ class VersionBuilder::Rep {
}
}
// Save the current state in *v.
// Save the current state in *vstorage.
Status SaveTo(VersionStorageInfo* vstorage) {
Status s = CheckConsistency(base_vstorage_);
if (!s.ok()) {
@ -1052,6 +1074,10 @@ Status VersionBuilder::LoadTableHandlers(
is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
}
uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const {
return rep_->GetMinOldestBlobFileNumber();
}
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(

@ -44,6 +44,7 @@ class VersionBuilder {
bool is_initial_load,
const SliceTransform* prefix_extractor,
size_t max_file_size_for_l0_meta_pin);
uint64_t GetMinOldestBlobFileNumber() const;
private:
class Rep;

@ -11,6 +11,8 @@
#include <cinttypes>
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "monitoring/persistent_stats_history.h"
namespace ROCKSDB_NAMESPACE {
@ -129,14 +131,14 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
VersionEditHandler::VersionEditHandler(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer, bool skip_load_table_files)
bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer,
bool skip_load_table_files)
: VersionEditHandlerBase(),
read_only_(read_only),
column_families_(std::move(column_families)),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
no_error_if_files_missing_(no_error_if_files_missing),
io_tracer_(io_tracer),
skip_load_table_files_(skip_load_table_files),
initialized_(false) {
@ -301,6 +303,14 @@ bool VersionEditHandler::HasMissingFiles() const {
break;
}
}
if (!ret) {
for (const auto& elem : cf_to_missing_blob_files_high_) {
if (elem.second != kInvalidBlobFileNumber) {
ret = true;
break;
}
}
}
return ret;
}
@ -437,6 +447,8 @@ ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
if (track_missing_files_) {
cf_to_missing_files_.emplace(edit.column_family_,
std::unordered_set<uint64_t>());
cf_to_missing_blob_files_high_.emplace(edit.column_family_,
kInvalidBlobFileNumber);
}
return cfd;
}
@ -450,6 +462,12 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_);
assert(missing_files_iter != cf_to_missing_files_.end());
cf_to_missing_files_.erase(missing_files_iter);
auto missing_blob_files_high_iter =
cf_to_missing_blob_files_high_.find(edit.column_family_);
assert(missing_blob_files_high_iter !=
cf_to_missing_blob_files_high_.end());
cf_to_missing_blob_files_high_.erase(missing_blob_files_high_iter);
}
ColumnFamilyData* ret =
version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_);
@ -505,8 +523,7 @@ Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
prefetch_index_and_filter_in_cache, is_initial_load,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
if ((s.IsPathNotFound() || s.IsCorruption()) &&
no_error_if_table_files_missing_) {
if ((s.IsPathNotFound() || s.IsCorruption()) && no_error_if_files_missing_) {
s = Status::OK();
}
if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
@ -576,7 +593,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
/*no_error_if_table_files_missing=*/true, io_tracer) {}
/*no_error_if_files_missing=*/true, io_tracer) {}
VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
for (const auto& elem : versions_) {
@ -626,7 +643,29 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
assert(missing_files_iter != cf_to_missing_files_.end());
std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
const bool prev_has_missing_files = !missing_files.empty();
auto missing_blob_files_high_iter =
cf_to_missing_blob_files_high_.find(cfd->GetID());
assert(missing_blob_files_high_iter != cf_to_missing_blob_files_high_.end());
const uint64_t prev_missing_blob_file_high =
missing_blob_files_high_iter->second;
VersionBuilder* builder = nullptr;
if (prev_missing_blob_file_high != kInvalidBlobFileNumber) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
builder = builder_iter->second->version_builder();
assert(builder != nullptr);
}
// At this point, we have not yet applied the new version edits read from the
// MANIFEST. We check whether we have any missing table and blob files.
const bool prev_has_missing_files =
!missing_files.empty() ||
(prev_missing_blob_file_high != kInvalidBlobFileNumber &&
prev_missing_blob_file_high >= builder->GetMinOldestBlobFileNumber());
for (const auto& file : edit.GetDeletedFiles()) {
uint64_t file_num = file.second;
auto fiter = missing_files.find(file_num);
@ -634,6 +673,8 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
missing_files.erase(fiter);
}
}
assert(!cfd->ioptions()->cf_paths.empty());
Status s;
for (const auto& elem : edit.GetNewFiles()) {
const FileMetaData& meta = elem.second;
@ -649,17 +690,60 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
break;
}
}
uint64_t missing_blob_file_num = prev_missing_blob_file_high;
for (const auto& elem : edit.GetBlobFileAdditions()) {
uint64_t file_num = elem.GetBlobFileNumber();
s = VerifyBlobFile(cfd, file_num, elem);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_blob_file_num = std::max(missing_blob_file_num, file_num);
s = Status::OK();
} else if (!s.ok()) {
break;
}
}
bool has_missing_blob_files = false;
if (missing_blob_file_num != kInvalidBlobFileNumber &&
missing_blob_file_num >= prev_missing_blob_file_high) {
missing_blob_files_high_iter->second = missing_blob_file_num;
has_missing_blob_files = true;
} else if (missing_blob_file_num < prev_missing_blob_file_high) {
assert(false);
}
// We still have not applied the new version edit, but have tried to add new
// table and blob files after verifying their presence and consistency.
// Therefore, we know whether we will see new missing table and blob files
// later after actually applying the version edit. We perform the check here
// and record the result.
const bool has_missing_files =
!missing_files.empty() || has_missing_blob_files;
bool missing_info = !version_edit_params_.has_log_number_ ||
!version_edit_params_.has_next_file_number_ ||
!version_edit_params_.has_last_sequence_;
// Create version before apply edit
// Create version before apply edit. The version will represent the state
// before applying the version edit.
// A new version will created if:
// 1) no error has occurred so far, and
// 2) log_number_, next_file_number_ and last_sequence_ are known, and
// 3) any of the following:
// a) no missing file before, but will have missing file(s) after applying
// this version edit.
// b) no missing file after applying the version edit, and the caller
// explicitly request that a new version be created.
if (s.ok() && !missing_info &&
((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
((has_missing_files && !prev_has_missing_files) ||
(!has_missing_files && force_create_version))) {
if (!builder) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
builder = builder_iter->second->version_builder();
assert(builder);
}
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(), io_tracer_,
version_set_->current_version_number_++);
@ -687,6 +771,22 @@ Status VersionEditHandlerPointInTime::VerifyFile(const std::string& fpath,
return version_set_->VerifyFileMetadata(fpath, fmeta);
}
Status VersionEditHandlerPointInTime::VerifyBlobFile(
ColumnFamilyData* cfd, uint64_t blob_file_num,
const BlobFileAddition& blob_addition) {
BlobFileCache* blob_file_cache = cfd->blob_file_cache();
assert(blob_file_cache);
CacheHandleGuard<BlobFileReader> blob_file_reader;
Status s =
blob_file_cache->GetBlobFileReader(blob_file_num, &blob_file_reader);
if (!s.ok()) {
return s;
}
// TODO: verify checksum
(void)blob_addition;
return s;
}
Status ManifestTailer::Initialize() {
if (Mode::kRecovery == mode_) {
return VersionEditHandler::Initialize();

@ -97,7 +97,7 @@ using VersionBuilderUPtr = std::unique_ptr<BaseReferencedVersionBuilder>;
// 1. Create an object of VersionEditHandler or its subclasses.
// VersionEditHandler handler(read_only, column_families, version_set,
// track_missing_files,
// no_error_if_table_files_missing);
// no_error_if_files_missing);
// 2. Status s = handler.Iterate(reader, &db_id);
// 3. Check s and handle possible errors.
//
@ -109,10 +109,10 @@ class VersionEditHandler : public VersionEditHandlerBase {
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
bool no_error_if_files_missing,
const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
track_missing_files, no_error_if_table_files_missing,
track_missing_files, no_error_if_files_missing,
io_tracer, /*skip_load_table_files=*/false) {}
~VersionEditHandler() override {}
@ -133,7 +133,7 @@ class VersionEditHandler : public VersionEditHandlerBase {
explicit VersionEditHandler(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
bool no_error_if_files_missing,
const std::shared_ptr<IOTracer>& io_tracer, bool skip_load_table_files);
Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd) override;
@ -183,7 +183,8 @@ class VersionEditHandler : public VersionEditHandlerBase {
const bool track_missing_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>>
cf_to_missing_files_;
bool no_error_if_table_files_missing_;
std::unordered_map<uint32_t, uint64_t> cf_to_missing_blob_files_high_;
bool no_error_if_files_missing_;
std::shared_ptr<IOTracer> io_tracer_;
bool skip_load_table_files_;
bool initialized_;
@ -213,6 +214,8 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
bool force_create_version) override;
virtual Status VerifyFile(const std::string& fpath,
const FileMetaData& fmeta);
virtual Status VerifyBlobFile(ColumnFamilyData* cfd, uint64_t blob_file_num,
const BlobFileAddition& blob_addition);
std::unordered_map<uint32_t, Version*> versions_;
};
@ -267,7 +270,7 @@ class DumpManifestHandler : public VersionEditHandler {
: VersionEditHandler(
/*read_only=*/true, column_families, version_set,
/*track_missing_files=*/false,
/*no_error_if_table_files_missing=*/false, io_tracer,
/*no_error_if_files_missing=*/false, io_tracer,
/*skip_load_table_files=*/true),
verbose_(verbose),
hex_(hex),

@ -4490,60 +4490,6 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
return builder ? builder->Apply(edit) : Status::OK();
}
Status VersionSet::ExtractInfoFromVersionEdit(
ColumnFamilyData* cfd, const VersionEdit& from_edit,
VersionEditParams* version_edit_params) {
if (cfd != nullptr) {
if (from_edit.has_db_id_) {
version_edit_params->SetDBId(from_edit.db_id_);
}
if (from_edit.has_log_number_) {
if (cfd->GetLogNumber() > from_edit.log_number_) {
ROCKS_LOG_WARN(
db_options_->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(from_edit.log_number_);
version_edit_params->SetLogNumber(from_edit.log_number_);
}
}
if (from_edit.has_comparator_ &&
from_edit.comparator_ != cfd->user_comparator()->Name()) {
return Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + from_edit.comparator_);
}
if (from_edit.HasFullHistoryTsLow()) {
const std::string& new_ts = from_edit.GetFullHistoryTsLow();
cfd->SetFullHistoryTsLow(new_ts);
}
}
if (from_edit.has_prev_log_number_) {
version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
}
if (from_edit.has_next_file_number_) {
version_edit_params->SetNextFile(from_edit.next_file_number_);
}
if (from_edit.has_max_column_family_) {
version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
}
if (from_edit.has_min_log_number_to_keep_) {
version_edit_params->min_log_number_to_keep_ =
std::max(version_edit_params->min_log_number_to_keep_,
from_edit.min_log_number_to_keep_);
}
if (from_edit.has_last_sequence_) {
version_edit_params->SetLastSequence(from_edit.last_sequence_);
}
return Status::OK();
}
Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
FileSystem* fs,
std::string* manifest_path,
@ -4610,10 +4556,10 @@ Status VersionSet::Recover(
reporter.status = &log_read_status;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
VersionEditHandler handler(
read_only, column_families, const_cast<VersionSet*>(this),
VersionEditHandler handler(read_only, column_families,
const_cast<VersionSet*>(this),
/*track_missing_files=*/false,
/*no_error_if_table_files_missing=*/false, io_tracer_);
/*no_error_if_files_missing=*/false, io_tracer_);
handler.Iterate(reader, &log_read_status);
s = handler.status();
if (s.ok()) {

@ -1331,10 +1331,6 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const VersionEdit* edit);
Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& from_edit,
VersionEditParams* version_edit_params);
Status VerifyFileMetadata(const std::string& fpath,
const FileMetaData& meta) const;

Loading…
Cancel
Save