Attempt to recover from db with missing table files (#6334)

Summary:
There are situations when RocksDB tries to recover, but the db is in an inconsistent state due to SST files referenced in the MANIFEST being missing. In this case, previous RocksDB will just fail the recovery and return a non-ok status.
This PR enables another possibility. During recovery, RocksDB checks possible MANIFEST files, and try to recover to the most recent state without missing table file. `VersionSet::Recover()` applies version edits incrementally and "materializes" a version only when this version does not reference any missing table file. After processing the entire MANIFEST, the version created last will be the latest version.
`DBImpl::Recover()` calls `VersionSet::Recover()`. Afterwards, WAL replay will *not* be performed.
To use this capability, set `options.best_efforts_recovery = true` when opening the db. Best-efforts recovery is currently incompatible with atomic flush.

Test plan (on devserver):
```
$make check
$COMPILE_WITH_ASAN=1 make all && make check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6334

Reviewed By: anand1976

Differential Revision: D19778960

Pulled By: riversand963

fbshipit-source-id: c27ea80f29bc952e7d3311ecf5ee9c54393b40a8
main
Yanqin Jin 5 years ago committed by Facebook GitHub Bot
parent 4fc216649d
commit fb09ef05dc
  1. 1
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 1
      TARGETS
  4. 8
      db/column_family.cc
  5. 8
      db/column_family.h
  6. 152
      db/db_basic_test.cc
  7. 11
      db/db_impl/db_impl.h
  8. 52
      db/db_impl/db_impl_files.cc
  9. 20
      db/db_impl/db_impl_open.cc
  10. 22
      db/version_builder.cc
  11. 17
      db/version_builder.h
  12. 2
      db/version_edit.h
  13. 576
      db/version_edit_handler.cc
  14. 123
      db/version_edit_handler.h
  15. 219
      db/version_set.cc
  16. 23
      db/version_set.h
  17. 778
      db/version_set_test.cc
  18. 12
      env/mock_env.cc
  19. 2
      env/mock_env.h
  20. 12
      file/filename.cc
  21. 8
      file/filename.h
  22. 10
      include/rocksdb/options.h
  23. 5
      options/db_options.cc
  24. 1
      options/db_options.h
  25. 4
      options/options_helper.cc
  26. 3
      options/options_settable_test.cc
  27. 1
      src.mk

@ -565,6 +565,7 @@ set(SOURCES
db/trim_history_scheduler.cc
db/version_builder.cc
db/version_edit.cc
db/version_edit_handler.cc
db/version_set.cc
db/wal_manager.cc
db/write_batch.cc

@ -13,6 +13,7 @@
### New Features
* Basic support for user timestamp in iterator. Seek/SeekToFirst/Next and lower/upper bounds are supported. Reverse iteration is not supported. Merge is not considered.
* When file lock failure when the lock is held by the current process, return acquiring time and thread ID in the error message.
* Added a new option, best_efforts_recovery (default: false), to allow database to open in a db dir with missing table files. During best efforts recovery, missing table files are ignored, and database recovers to the most recent state without missing table file. Cross-column-family consistency is not guaranteed even if WAL is enabled.
## 6.8.0 (02/24/2020)
### Java API Changes

@ -171,6 +171,7 @@ cpp_library(
"db/trim_history_scheduler.cc",
"db/version_builder.cc",
"db/version_edit.cc",
"db/version_edit_handler.cc",
"db/version_set.cc",
"db/wal_manager.cc",
"db/write_batch.cc",

@ -1397,8 +1397,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const FileOptions& file_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(
@ -1410,8 +1410,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
db_options_(db_options),
file_options_(file_options),
table_cache_(table_cache),
write_buffer_manager_(write_buffer_manager),
write_controller_(write_controller),
write_buffer_manager_(_write_buffer_manager),
write_controller_(_write_controller),
block_cache_tracer_(block_cache_tracer) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;

@ -647,8 +647,8 @@ class ColumnFamilySet {
ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const FileOptions& file_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller,
WriteBufferManager* _write_buffer_manager,
WriteController* _write_controller,
BlockCacheTracer* const block_cache_tracer);
~ColumnFamilySet();
@ -678,6 +678,10 @@ class ColumnFamilySet {
Cache* get_table_cache() { return table_cache_; }
WriteBufferManager* write_buffer_manager() { return write_buffer_manager_; }
WriteController* write_controller() { return write_controller_; }
private:
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor

@ -1734,6 +1734,158 @@ TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
keys.data(), values.data(), statuses.data(), true);
}
TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
Options options = CurrentOptions();
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions write_opts;
write_opts.disableWAL = true;
for (size_t cf = 0; cf != num_cfs; ++cf) {
for (size_t i = 0; i != 10000; ++i) {
std::string key_str = Key(static_cast<int>(i));
std::string value_str = std::to_string(cf) + "_" + std::to_string(i);
ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
if (0 == (i % 1000)) {
ASSERT_OK(Flush(static_cast<int>(cf)));
}
}
}
for (size_t cf = 0; cf != num_cfs; ++cf) {
ASSERT_OK(Flush(static_cast<int>(cf)));
}
Close();
options.best_efforts_recovery = true;
ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
options);
num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
for (size_t cf = 0; cf != num_cfs; ++cf) {
for (int i = 0; i != 10000; ++i) {
std::string key_str = Key(static_cast<int>(i));
std::string expected_value_str =
std::to_string(cf) + "_" + std::to_string(i);
ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
}
}
}
namespace {
class TableFileListener : public EventListener {
public:
void OnTableFileCreated(const TableFileCreationInfo& info) override {
InstrumentedMutexLock lock(&mutex_);
cf_to_paths_[info.cf_name].push_back(info.file_path);
}
std::vector<std::string>& GetFiles(const std::string& cf_name) {
InstrumentedMutexLock lock(&mutex_);
return cf_to_paths_[cf_name];
}
private:
InstrumentedMutex mutex_;
std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
};
} // namespace
TEST_F(DBBasicTest, RecoverWithMissingFiles) {
Options options = CurrentOptions();
DestroyAndReopen(options);
TableFileListener* listener = new TableFileListener();
// Disable auto compaction to simplify SST file name tracking.
options.disable_auto_compactions = true;
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
"eevee"};
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
for (size_t cf = 0; cf != num_cfs; ++cf) {
ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
ASSERT_OK(Flush(static_cast<int>(cf)));
ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
ASSERT_OK(Flush(static_cast<int>(cf)));
ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
ASSERT_OK(Flush(static_cast<int>(cf)));
}
// Delete files
for (size_t i = 0; i < all_cf_names.size(); ++i) {
std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
ASSERT_EQ(3, files.size());
for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
--j) {
ASSERT_OK(env_->DeleteFile(files[j]));
}
}
options.best_efforts_recovery = true;
ReopenWithColumnFamilies(all_cf_names, options);
// Verify data
ReadOptions read_opts;
read_opts.total_order_seek = true;
{
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
iter.reset(db_->NewIterator(read_opts, handles_[1]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
iter.reset(db_->NewIterator(read_opts, handles_[2]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a", iter->key());
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("b", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
}
}
TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
Options options = CurrentOptions();
DestroyAndReopen(options);
TableFileListener* listener = new TableFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"pikachu"}, options);
std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
size_t num_cfs = handles_.size();
ASSERT_EQ(2, num_cfs);
for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
ASSERT_OK(Put(cf, "a", "0_value"));
ASSERT_OK(Flush(cf));
ASSERT_OK(Put(cf, "b", "0_value"));
}
// Delete files
for (size_t i = 0; i < kAllCfNames.size(); ++i) {
std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
ASSERT_EQ(1, files.size());
for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
--j) {
ASSERT_OK(env_->DeleteFile(files[j]));
}
}
options.best_efforts_recovery = true;
ReopenWithColumnFamilies(kAllCfNames, options);
// Verify WAL is not applied
ReadOptions read_opts;
read_opts.total_order_seek = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
iter.reset(db_->NewIterator(read_opts, handles_[1]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
}
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {

@ -1124,6 +1124,17 @@ class DBImpl : public DB {
virtual bool OwnTablesAndLogs() const { return true; }
// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
// returns.
// Currently, this function should be called only in best-efforts recovery
// mode.
// After best-efforts recovery, there may be SST files in db/cf paths that are
// not referenced in the MANIFEST. We delete these SST files. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
Status CleanupFilesAfterRecovery();
private:
friend class DB;
friend class ErrorHandler;

@ -14,6 +14,7 @@
#include "db/event_helpers.h"
#include "db/memtable_list.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/sst_file_manager_impl.h"
#include "util/autovector.h"
@ -664,4 +665,55 @@ uint64_t PrecomputeMinLogNumberToKeep(
return min_log_number_to_keep;
}
Status DBImpl::CleanupFilesAfterRecovery() {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(dbname_);
for (const auto& db_path : immutable_db_options_.db_paths) {
paths.push_back(db_path.path);
}
for (const auto* cfd : *versions_->GetColumnFamilySet()) {
for (const auto& cf_path : cfd->ioptions()->cf_paths) {
paths.push_back(cf_path.path);
}
}
// Dedup paths
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
std::set<std::string> files_to_delete;
for (const auto& path : paths) {
std::vector<std::string> files;
env_->GetChildren(path, &files);
for (const auto& fname : files) {
uint64_t number = 0;
FileType type;
if (!ParseFileName(fname, &number, &type)) {
continue;
}
const std::string normalized_fpath = NormalizePath(path + fname);
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
files_to_delete.insert(normalized_fpath);
}
}
}
if (largest_file_number > next_file_number) {
versions_->next_file_number_.store(largest_file_number + 1);
}
mutex_.Unlock();
Status s;
for (const auto& fname : files_to_delete) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
return s;
}
} // namespace ROCKSDB_NAMESPACE

@ -252,6 +252,12 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
"atomic_flush is incompatible with enable_pipelined_write");
}
// TODO remove this restriction
if (db_options.atomic_flush && db_options.best_efforts_recovery) {
return Status::InvalidArgument(
"atomic_flush is currently incompatible with best-efforts recovery");
}
return Status::OK();
}
@ -419,7 +425,17 @@ Status DBImpl::Recover(
}
}
assert(db_id_.empty());
Status s = versions_->Recover(column_families, read_only, &db_id_);
Status s;
bool missing_table_file = false;
if (!immutable_db_options_.best_efforts_recovery) {
s = versions_->Recover(column_families, read_only, &db_id_);
} else {
s = versions_->TryRecover(column_families, read_only, &db_id_,
&missing_table_file);
if (s.ok()) {
s = CleanupFilesAfterRecovery();
}
}
if (!s.ok()) {
return s;
}
@ -499,7 +515,9 @@ Status DBImpl::Recover(
// attention to it in case we are recovering a database
// produced by an older version of rocksdb.
std::vector<std::string> filenames;
if (!immutable_db_options_.best_efforts_recovery) {
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
}
if (s.IsNotFound()) {
return Status::InvalidArgument("wal_dir not found",
immutable_db_options_.wal_dir);

@ -528,4 +528,26 @@ Status VersionBuilder::LoadTableHandlers(
is_initial_load, prefix_extractor);
}
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->table_cache(),
cfd->current()->storage_info(), cfd->ioptions()->info_log)),
version_(cfd->current()) {
version_->Ref();
}
BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
ColumnFamilyData* cfd, Version* v)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->table_cache(),
v->storage_info(), cfd->ioptions()->info_log)),
version_(v) {
assert(version_ != cfd->current());
}
BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
version_->Unref();
}
} // namespace ROCKSDB_NAMESPACE

@ -21,6 +21,8 @@ class VersionStorageInfo;
class VersionEdit;
struct FileMetaData;
class InternalStats;
class Version;
class ColumnFamilyData;
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
@ -44,5 +46,20 @@ class VersionBuilder {
std::unique_ptr<Rep> rep_;
};
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
// Both of the constructor and destructor need to be called inside DB Mutex.
class BaseReferencedVersionBuilder {
public:
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd);
BaseReferencedVersionBuilder(ColumnFamilyData* cfd, Version* v);
~BaseReferencedVersionBuilder();
VersionBuilder* version_builder() const { return version_builder_.get(); }
private:
std::unique_ptr<VersionBuilder> version_builder_;
Version* version_;
};
extern bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b);
} // namespace ROCKSDB_NAMESPACE

@ -438,6 +438,8 @@ class VersionEdit {
private:
friend class ReactiveVersionSet;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
friend class VersionSet;
friend class Version;
friend class AtomicGroupReadBuffer;

@ -0,0 +1,576 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_edit_handler.h"
#include "monitoring/persistent_stats_history.h"
namespace ROCKSDB_NAMESPACE {
VersionEditHandler::VersionEditHandler(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing)
: read_only_(read_only),
column_families_(column_families),
status_(),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
initialized_(false) {
assert(version_set_ != nullptr);
}
Status VersionEditHandler::Iterate(log::Reader& reader, std::string* db_id) {
Slice record;
std::string scratch;
size_t recovered_edits = 0;
Status s = Initialize();
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
if (edit.has_db_id_) {
version_set_->db_id_ = edit.GetDbId();
if (db_id != nullptr) {
*db_id = version_set_->db_id_;
}
}
s = read_buffer_.AddEdit(&edit);
if (!s.ok()) {
break;
}
ColumnFamilyData* cfd = nullptr;
if (edit.is_in_atomic_group_) {
if (read_buffer_.IsFull()) {
for (auto& e : read_buffer_.replay_buffer()) {
s = ApplyVersionEdit(e, &cfd);
if (!s.ok()) {
break;
}
++recovered_edits;
}
if (!s.ok()) {
break;
}
read_buffer_.Clear();
}
} else {
s = ApplyVersionEdit(edit, &cfd);
if (s.ok()) {
++recovered_edits;
}
}
}
CheckIterationResult(reader, &s);
if (!s.ok()) {
status_ = s;
}
return s;
}
Status VersionEditHandler::Initialize() {
Status s;
if (!initialized_) {
for (const auto& cf_desc : column_families_) {
name_to_options_.emplace(cf_desc.name, cf_desc.options);
}
auto default_cf_iter = name_to_options_.find(kDefaultColumnFamilyName);
if (default_cf_iter == name_to_options_.end()) {
s = Status::InvalidArgument("Default column family not specified");
}
if (s.ok()) {
VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* cfd =
CreateCfAndInit(default_cf_iter->second, default_cf_edit);
assert(cfd != nullptr);
#ifdef NDEBUG
(void)cfd;
#endif
initialized_ = true;
}
}
return s;
}
Status VersionEditHandler::ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** cfd) {
Status s;
if (edit.is_column_family_add_) {
s = OnColumnFamilyAdd(edit, cfd);
} else if (edit.is_column_family_drop_) {
s = OnColumnFamilyDrop(edit, cfd);
} else {
s = OnNonCfOperation(edit, cfd);
}
if (s.ok()) {
assert(cfd != nullptr);
s = ExtractInfoFromVersionEdit(*cfd, edit);
}
return s;
}
Status VersionEditHandler::OnColumnFamilyAdd(VersionEdit& edit,
ColumnFamilyData** cfd) {
bool cf_in_not_found = false;
bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
assert(cfd != nullptr);
*cfd = nullptr;
Status s;
if (cf_in_builders || cf_in_not_found) {
s = Status::Corruption("MANIFEST adding the same column family twice: " +
edit.column_family_name_);
}
if (s.ok()) {
auto cf_options = name_to_options_.find(edit.column_family_name_);
// implicitly add persistent_stats column family without requiring user
// to specify
ColumnFamilyData* tmp_cfd = nullptr;
bool is_persistent_stats_column_family =
edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
if (cf_options == name_to_options_.end() &&
!is_persistent_stats_column_family) {
column_families_not_found_.emplace(edit.column_family_,
edit.column_family_name_);
} else {
if (is_persistent_stats_column_family) {
ColumnFamilyOptions cfo;
OptimizeForPersistentStats(&cfo);
tmp_cfd = CreateCfAndInit(cfo, edit);
} else {
tmp_cfd = CreateCfAndInit(cf_options->second, edit);
}
*cfd = tmp_cfd;
}
}
return s;
}
Status VersionEditHandler::OnColumnFamilyDrop(VersionEdit& edit,
ColumnFamilyData** cfd) {
bool cf_in_not_found = false;
bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
assert(cfd != nullptr);
*cfd = nullptr;
ColumnFamilyData* tmp_cfd = nullptr;
Status s;
if (cf_in_builders) {
tmp_cfd = DestroyCfAndCleanup(edit);
} else if (cf_in_not_found) {
column_families_not_found_.erase(edit.column_family_);
} else {
s = Status::Corruption("MANIFEST - dropping non-existing column family");
}
*cfd = tmp_cfd;
return s;
}
Status VersionEditHandler::OnNonCfOperation(VersionEdit& edit,
ColumnFamilyData** cfd) {
bool cf_in_not_found = false;
bool cf_in_builders = false;
CheckColumnFamilyId(edit, &cf_in_not_found, &cf_in_builders);
assert(cfd != nullptr);
*cfd = nullptr;
Status s;
if (!cf_in_not_found) {
if (!cf_in_builders) {
s = Status::Corruption(
"MANIFEST record referencing unknown column family");
}
ColumnFamilyData* tmp_cfd = nullptr;
if (s.ok()) {
auto builder_iter = builders_.find(edit.column_family_);
assert(builder_iter != builders_.end());
tmp_cfd = version_set_->GetColumnFamilySet()->GetColumnFamily(
edit.column_family_);
assert(tmp_cfd != nullptr);
s = MaybeCreateVersion(edit, tmp_cfd, /*force_create_version=*/false);
if (s.ok()) {
s = builder_iter->second->version_builder()->Apply(&edit);
}
}
*cfd = tmp_cfd;
}
return s;
}
// TODO maybe cache the computation result
bool VersionEditHandler::HasMissingFiles() const {
bool ret = false;
for (const auto& elem : cf_to_missing_files_) {
const auto& missing_files = elem.second;
if (!missing_files.empty()) {
ret = true;
break;
}
}
return ret;
}
void VersionEditHandler::CheckColumnFamilyId(const VersionEdit& edit,
bool* cf_in_not_found,
bool* cf_in_builders) const {
assert(cf_in_not_found != nullptr);
assert(cf_in_builders != nullptr);
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
// we will delete the column family from
// column_families_not_found.
bool in_not_found = column_families_not_found_.find(edit.column_family_) !=
column_families_not_found_.end();
// in builders means that user supplied that column family
// option AND that we encountered column family add record
bool in_builders = builders_.find(edit.column_family_) != builders_.end();
// They cannot both be true
assert(!(in_not_found && in_builders));
*cf_in_not_found = in_not_found;
*cf_in_builders = in_builders;
}
void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
Status* s) {
assert(s != nullptr);
if (!s->ok()) {
read_buffer_.Clear();
} else if (!version_edit_params_.has_log_number_ ||
!version_edit_params_.has_next_file_number_ ||
!version_edit_params_.has_last_sequence_) {
std::string msg("no ");
if (!version_edit_params_.has_log_number_) {
msg.append("log_file_number, ");
}
if (!version_edit_params_.has_next_file_number_) {
msg.append("next_file_number, ");
}
if (!version_edit_params_.has_last_sequence_) {
msg.append("last_sequence, ");
}
msg = msg.substr(0, msg.size() - 2);
msg.append(" entry in MANIFEST");
*s = Status::Corruption(msg);
}
if (s->ok() && !read_only_ && !column_families_not_found_.empty()) {
std::string msg;
for (const auto& cf : column_families_not_found_) {
msg.append(", ");
msg.append(cf.second);
}
msg = msg.substr(2);
*s = Status::InvalidArgument("Column families not opened: " + msg);
}
if (s->ok()) {
version_set_->GetColumnFamilySet()->UpdateMaxColumnFamily(
version_edit_params_.max_column_family_);
version_set_->MarkMinLogNumberToKeep2PC(
version_edit_params_.min_log_number_to_keep_);
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
if (!builder->CheckConsistencyForNumLevels()) {
*s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
}
}
if (s->ok()) {
for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
if (cfd->IsDropped()) {
continue;
}
if (read_only_) {
cfd->table_cache()->SetTablesAreImmortal();
}
*s = LoadTables(cfd, /*prefetch_index_and_filter_in_cache=*/false,
/*is_initial_load=*/true);
if (!s->ok()) {
break;
}
}
}
if (s->ok()) {
for (auto* cfd : *(version_set_->column_family_set_)) {
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
VersionEdit edit;
*s = MaybeCreateVersion(edit, cfd, /*force_create_version=*/true);
if (!s->ok()) {
break;
}
}
}
if (s->ok()) {
version_set_->manifest_file_size_ = reader.GetReadOffset();
assert(version_set_->manifest_file_size_ > 0);
version_set_->next_file_number_.store(
version_edit_params_.next_file_number_ + 1);
version_set_->last_allocated_sequence_ =
version_edit_params_.last_sequence_;
version_set_->last_published_sequence_ =
version_edit_params_.last_sequence_;
version_set_->last_sequence_ = version_edit_params_.last_sequence_;
version_set_->prev_log_number_ = version_edit_params_.prev_log_number_;
}
}
ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
const ColumnFamilyOptions& cf_options, const VersionEdit& edit) {
ColumnFamilyData* cfd = version_set_->CreateColumnFamily(cf_options, &edit);
assert(cfd != nullptr);
cfd->set_initialized();
assert(builders_.find(edit.column_family_) == builders_.end());
builders_.emplace(edit.column_family_,
VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd)));
if (track_missing_files_) {
cf_to_missing_files_.emplace(edit.column_family_,
std::unordered_set<uint64_t>());
}
return cfd;
}
ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
const VersionEdit& edit) {
auto builder_iter = builders_.find(edit.column_family_);
assert(builder_iter != builders_.end());
builders_.erase(builder_iter);
if (track_missing_files_) {
auto missing_files_iter = cf_to_missing_files_.find(edit.column_family_);
assert(missing_files_iter != cf_to_missing_files_.end());
cf_to_missing_files_.erase(missing_files_iter);
}
ColumnFamilyData* ret =
version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_);
assert(ret != nullptr);
if (ret->UnrefAndTryDelete()) {
ret = nullptr;
} else {
assert(false);
}
return ret;
}
Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
ColumnFamilyData* cfd,
bool force_create_version) {
assert(cfd->initialized());
if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(v->storage_info());
// Install new version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
}
return Status::OK();
}
Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load) {
assert(cfd != nullptr);
assert(!cfd->IsDropped());
Status s;
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
assert(builder_iter->second != nullptr);
VersionBuilder* builder = builder_iter->second->version_builder();
assert(builder);
s = builder->LoadTableHandlers(
cfd->internal_stats(),
version_set_->db_options_->max_file_opening_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
if (s.IsPathNotFound() && no_error_if_table_files_missing_) {
s = Status::OK();
}
if (!s.ok() && !version_set_->db_options_->paranoid_checks) {
s = Status::OK();
}
return s;
}
Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& edit) {
Status s;
if (cfd != nullptr) {
if (edit.has_db_id_) {
version_edit_params_.SetDBId(edit.db_id_);
}
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
ROCKS_LOG_WARN(
version_set_->db_options()->info_log,
"MANIFEST corruption detected, but ignored - Log numbers in "
"records NOT monotonically increasing");
} else {
cfd->SetLogNumber(edit.log_number_);
version_edit_params_.SetLogNumber(edit.log_number_);
}
}
if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
}
}
if (s.ok()) {
if (edit.has_prev_log_number_) {
version_edit_params_.SetPrevLogNumber(edit.prev_log_number_);
}
if (edit.has_next_file_number_) {
version_edit_params_.SetNextFile(edit.next_file_number_);
}
if (edit.has_max_column_family_) {
version_edit_params_.SetMaxColumnFamily(edit.max_column_family_);
}
if (edit.has_min_log_number_to_keep_) {
version_edit_params_.min_log_number_to_keep_ =
std::max(version_edit_params_.min_log_number_to_keep_,
edit.min_log_number_to_keep_);
}
if (edit.has_last_sequence_) {
version_edit_params_.SetLastSequence(edit.last_sequence_);
}
if (!version_edit_params_.has_prev_log_number_) {
version_edit_params_.SetPrevLogNumber(0);
}
}
return s;
}
VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
/*no_error_if_table_files_missing=*/true) {}
VersionEditHandlerPointInTime::~VersionEditHandlerPointInTime() {
for (const auto& elem : versions_) {
delete elem.second;
}
versions_.clear();
}
void VersionEditHandlerPointInTime::CheckIterationResult(
const log::Reader& reader, Status* s) {
VersionEditHandler::CheckIterationResult(reader, s);
assert(s != nullptr);
if (s->ok()) {
for (auto* cfd : *(version_set_->column_family_set_)) {
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
assert(v_iter->second != nullptr);
version_set_->AppendVersion(cfd, v_iter->second);
versions_.erase(v_iter);
}
}
}
}
ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
const VersionEdit& edit) {
ColumnFamilyData* cfd = VersionEditHandler::DestroyCfAndCleanup(edit);
auto v_iter = versions_.find(edit.column_family_);
if (v_iter != versions_.end()) {
delete v_iter->second;
versions_.erase(v_iter);
}
return cfd;
}
Status VersionEditHandlerPointInTime::MaybeCreateVersion(
const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
assert(cfd != nullptr);
if (!force_create_version) {
assert(edit.column_family_ == cfd->GetID());
}
auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
assert(missing_files_iter != cf_to_missing_files_.end());
std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
const bool prev_has_missing_files = !missing_files.empty();
for (const auto& file : edit.GetDeletedFiles()) {
uint64_t file_num = file.second;
auto fiter = missing_files.find(file_num);
if (fiter != missing_files.end()) {
missing_files.erase(fiter);
}
}
Status s;
for (const auto& elem : edit.GetNewFiles()) {
const FileMetaData& meta = elem.second;
const FileDescriptor& fd = meta.fd;
uint64_t file_num = fd.GetNumber();
const std::string fpath =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
s = version_set_->VerifyFileMetadata(fpath, meta);
if (s.IsPathNotFound() || s.IsNotFound()) {
missing_files.insert(file_num);
s = Status::OK();
}
}
bool missing_info = !version_edit_params_.has_log_number_ ||
!version_edit_params_.has_next_file_number_ ||
!version_edit_params_.has_last_sequence_;
// Create version before apply edit
if (!missing_info && ((!missing_files.empty() && !prev_has_missing_files) ||
(missing_files.empty() && force_create_version))) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
}
return s;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,123 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/version_builder.h"
#include "db/version_edit.h"
#include "db/version_set.h"
namespace ROCKSDB_NAMESPACE {
typedef std::unique_ptr<BaseReferencedVersionBuilder> VersionBuilderUPtr;
// A class used for scanning MANIFEST file.
// VersionEditHandler reads a MANIFEST file, parses the version edits, and
// builds the version set's in-memory state, e.g. the version storage info for
// the versions of column families.
// To use this class and its subclasses,
// 1. Create an object of VersionEditHandler or its subclasses.
// VersionEditHandler handler(read_only, column_families, version_set,
// track_missing_files, ignore_missing_files);
// 2. Status s = handler.Iterate(reader, &db_id);
// 3. Check s and handle possible errors.
//
// Not thread-safe, external synchronization is necessary if an object of
// VersionEditHandler is shared by multiple threads.
class VersionEditHandler {
public:
explicit VersionEditHandler(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
bool ignore_missing_files);
virtual ~VersionEditHandler() {}
Status Iterate(log::Reader& reader, std::string* db_id);
const Status& status() const { return status_; }
bool HasMissingFiles() const;
protected:
Status ApplyVersionEdit(VersionEdit& edit, ColumnFamilyData** cfd);
Status OnColumnFamilyAdd(VersionEdit& edit, ColumnFamilyData** cfd);
Status OnColumnFamilyDrop(VersionEdit& edit, ColumnFamilyData** cfd);
Status OnNonCfOperation(VersionEdit& edit, ColumnFamilyData** cfd);
Status Initialize();
void CheckColumnFamilyId(const VersionEdit& edit, bool* cf_in_not_found,
bool* cf_in_builders) const;
virtual void CheckIterationResult(const log::Reader& reader, Status* s);
ColumnFamilyData* CreateCfAndInit(const ColumnFamilyOptions& cf_options,
const VersionEdit& edit);
virtual ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit);
virtual Status MaybeCreateVersion(const VersionEdit& edit,
ColumnFamilyData* cfd,
bool force_create_version);
Status LoadTables(ColumnFamilyData* cfd,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load);
const bool read_only_;
const std::vector<ColumnFamilyDescriptor>& column_families_;
Status status_;
VersionSet* version_set_;
AtomicGroupReadBuffer read_buffer_;
std::unordered_map<uint32_t, VersionBuilderUPtr> builders_;
std::unordered_map<std::string, ColumnFamilyOptions> name_to_options_;
std::unordered_map<uint32_t, std::string> column_families_not_found_;
VersionEditParams version_edit_params_;
const bool track_missing_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>>
cf_to_missing_files_;
bool no_error_if_table_files_missing_;
private:
Status ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
const VersionEdit& edit);
bool initialized_;
};
// A class similar to its base class, i.e. VersionEditHandler.
// VersionEditHandlerPointInTime restores the versions to the most recent point
// in time such that at this point, the version does not have missing files.
//
// Not thread-safe, external synchronization is necessary if an object of
// VersionEditHandlerPointInTime is shared by multiple threads.
class VersionEditHandlerPointInTime : public VersionEditHandler {
public:
VersionEditHandlerPointInTime(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set);
~VersionEditHandlerPointInTime() override;
protected:
void CheckIterationResult(const log::Reader& reader, Status* s) override;
ColumnFamilyData* DestroyCfAndCleanup(const VersionEdit& edit) override;
Status MaybeCreateVersion(const VersionEdit& edit, ColumnFamilyData* cfd,
bool force_create_version) override;
private:
std::unordered_map<uint32_t, Version*> versions_;
};
} // namespace ROCKSDB_NAMESPACE

@ -29,6 +29,7 @@
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
@ -1202,28 +1203,6 @@ void LevelIterator::InitFileIterator(size_t new_file_index) {
}
} // anonymous namespace
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
// Both of the constructor and destructor need to be called inside DB Mutex.
class BaseReferencedVersionBuilder {
public:
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
: version_builder_(new VersionBuilder(
cfd->current()->version_set()->file_options(), cfd->table_cache(),
cfd->current()->storage_info(), cfd->ioptions()->info_log)),
version_(cfd->current()) {
version_->Ref();
}
~BaseReferencedVersionBuilder() {
version_->Unref();
}
VersionBuilder* version_builder() { return version_builder_.get(); }
private:
std::unique_ptr<VersionBuilder> version_builder_;
Version* version_;
};
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname) const {
@ -3570,6 +3549,33 @@ VersionSet::~VersionSet() {
obsolete_files_.clear();
}
void VersionSet::Reset() {
if (column_family_set_) {
Cache* table_cache = column_family_set_->get_table_cache();
WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
WriteController* wc = column_family_set_->write_controller();
column_family_set_.reset(new ColumnFamilySet(dbname_, db_options_,
file_options_, table_cache,
wbm, wc, block_cache_tracer_));
}
db_id_.clear();
next_file_number_.store(2);
min_log_number_to_keep_2pc_.store(0);
manifest_file_number_ = 0;
options_file_number_ = 0;
pending_manifest_file_number_ = 0;
last_sequence_.store(0);
last_allocated_sequence_.store(0);
last_published_sequence_.store(0);
prev_log_number_ = 0;
descriptor_log_.reset();
current_version_number_ = 0;
manifest_writers_.clear();
manifest_file_size_ = 0;
obsolete_files_.clear();
obsolete_manifests_.clear();
}
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) {
// compute new compaction score
@ -4452,8 +4458,6 @@ Status VersionSet::Recover(
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
AtomicGroupReadBuffer read_buffer;
s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
column_families_not_found, builders,
@ -4581,6 +4585,159 @@ Status VersionSet::Recover(
return s;
}
namespace {
class ManifestPicker {
public:
explicit ManifestPicker(const std::string& dbname, FileSystem* fs);
void SeekToFirstManifest();
// REQUIRES Valid() == true
std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
const Status& status() const { return status_; }
private:
const std::string& dbname_;
FileSystem* const fs_;
// MANIFEST file names(s)
std::vector<std::string> manifest_files_;
std::vector<std::string>::const_iterator manifest_file_iter_;
Status status_;
};
ManifestPicker::ManifestPicker(const std::string& dbname, FileSystem* fs)
: dbname_(dbname), fs_(fs) {}
void ManifestPicker::SeekToFirstManifest() {
assert(fs_ != nullptr);
std::vector<std::string> children;
Status s = fs_->GetChildren(dbname_, IOOptions(), &children, /*dbg=*/nullptr);
if (!s.ok()) {
status_ = s;
return;
}
for (const auto& fname : children) {
uint64_t file_num = 0;
FileType file_type;
bool parse_ok = ParseFileName(fname, &file_num, &file_type);
if (parse_ok && file_type == kDescriptorFile) {
manifest_files_.push_back(fname);
}
}
std::sort(manifest_files_.begin(), manifest_files_.end(),
[](const std::string& lhs, const std::string& rhs) {
uint64_t num1 = 0;
uint64_t num2 = 0;
FileType type1;
FileType type2;
bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
#ifndef NDEBUG
assert(parse_ok1);
assert(parse_ok2);
#else
(void)parse_ok1;
(void)parse_ok2;
#endif
return num1 > num2;
});
manifest_file_iter_ = manifest_files_.begin();
}
std::string ManifestPicker::GetNextManifest(uint64_t* number,
std::string* file_name) {
assert(status_.ok());
assert(Valid());
std::string ret;
if (manifest_file_iter_ != manifest_files_.end()) {
ret.assign(dbname_);
if (ret.back() != kFilePathSeparator) {
ret.push_back(kFilePathSeparator);
}
ret.append(*manifest_file_iter_);
if (number) {
FileType type;
bool parse = ParseFileName(*manifest_file_iter_, number, &type);
assert(type == kDescriptorFile);
#ifndef NDEBUG
assert(parse);
#else
(void)parse;
#endif
}
if (file_name) {
*file_name = *manifest_file_iter_;
}
++manifest_file_iter_;
}
return ret;
}
} // namespace
Status VersionSet::TryRecover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool* has_missing_table_file) {
ManifestPicker manifest_picker(dbname_, fs_);
manifest_picker.SeekToFirstManifest();
Status s = manifest_picker.status();
if (!s.ok()) {
return s;
}
if (!manifest_picker.Valid()) {
return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
}
std::string manifest_path =
manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
while (!manifest_path.empty()) {
s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
db_id, has_missing_table_file);
if (s.ok() || !manifest_picker.Valid()) {
break;
}
Reset();
manifest_path =
manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
}
return s;
}
Status VersionSet::TryRecoverFromOneManifest(
const std::string& manifest_path,
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
std::string* db_id, bool* has_missing_table_file) {
ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
manifest_path.c_str());
std::unique_ptr<SequentialFileReader> manifest_file_reader;
Status s;
{
std::unique_ptr<FSSequentialFile> manifest_file;
s = fs_->NewSequentialFile(manifest_path,
fs_->OptimizeForManifestRead(file_options_),
&manifest_file, nullptr);
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path,
db_options_->log_readahead_size));
}
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
/*checksum=*/true, /*log_num=*/0);
{
VersionEditHandlerPointInTime handler_pit(read_only, column_families,
const_cast<VersionSet*>(this));
s = handler_pit.Iterate(reader, db_id);
assert(nullptr != has_missing_table_file);
*has_missing_table_file = handler_pit.HasMissingFiles();
}
return s;
}
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
const std::string& dbname,
FileSystem* fs) {
@ -5518,7 +5675,7 @@ void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
}
ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
assert(edit->is_column_family_add_);
MutableCFOptions dummy_cf_options;
@ -5573,6 +5730,18 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
return total_files_size;
}
Status VersionSet::VerifyFileMetadata(const std::string& fpath,
const FileMetaData& meta) const {
uint64_t fsize = 0;
Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
if (status.ok()) {
if (fsize != meta.fd.GetFileSize()) {
status = Status::Corruption("File size mismatch: " + fpath);
}
}
return status;
}
ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const FileOptions& _file_options,

@ -688,6 +688,8 @@ class Version {
FileSystem* fs_;
friend class ReactiveVersionSet;
friend class VersionSet;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
const InternalKeyComparator* internal_comparator() const {
return storage_info_.internal_comparator_;
@ -876,6 +878,17 @@ class VersionSet {
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, std::string* db_id = nullptr);
Status TryRecover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, std::string* db_id,
bool* has_missing_table_file);
// Try to recover the version set to the most recent consistent state
// recorded in the specified manifest.
Status TryRecoverFromOneManifest(
const std::string& manifest_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, std::string* db_id, bool* has_missing_table_file);
// Reads a manifest file and returns a list of column families in
// column_families.
static Status ListColumnFamilies(std::vector<std::string>* column_families,
@ -1059,6 +1072,8 @@ class VersionSet {
struct ManifestWriter;
friend class Version;
friend class VersionEditHandler;
friend class VersionEditHandlerPointInTime;
friend class DBImpl;
friend class DBImplReadOnly;
@ -1069,6 +1084,8 @@ class VersionSet {
}
};
void Reset();
// Returns approximated offset of a key in a file for a given version.
uint64_t ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
const Slice& key, TableReaderCaller caller);
@ -1091,7 +1108,7 @@ class VersionSet {
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit);
const VersionEdit* edit);
Status ReadAndRecover(
log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
@ -1115,8 +1132,10 @@ class VersionSet {
const VersionEdit& from_edit,
VersionEditParams* version_edit_params);
std::unique_ptr<ColumnFamilySet> column_family_set_;
Status VerifyFileMetadata(const std::string& fpath,
const FileMetaData& meta) const;
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_;
FileSystem* const fs_;
const std::string dbname_;

@ -10,6 +10,7 @@
#include "db/version_set.h"
#include "db/db_impl/db_impl.h"
#include "db/log_writer.h"
#include "env/mock_env.h"
#include "logging/logging.h"
#include "table/mock_table.h"
#include "test_util/testharness.h"
@ -611,16 +612,37 @@ class VersionSetTestBase {
const static std::string kColumnFamilyName3;
int num_initial_edits_;
VersionSetTestBase()
: env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
dbname_(test::PerThreadDBPath("version_set_test")),
db_options_(),
explicit VersionSetTestBase(const std::string& name)
: mem_env_(nullptr),
env_(nullptr),
env_guard_(),
fs_(),
dbname_(test::PerThreadDBPath(name)),
options_(),
db_options_(options_),
cf_options_(options_),
immutable_cf_options_(db_options_, cf_options_),
mutable_cf_options_(cf_options_),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_manager_(db_options_.db_write_buffer_size),
shutting_down_(false),
mock_table_factory_(std::make_shared<mock::MockTableFactory>()) {
const char* test_env_uri = getenv("TEST_ENV_URI");
Env* base_env = nullptr;
if (test_env_uri) {
Status s = Env::LoadEnv(test_env_uri, &base_env, &env_guard_);
EXPECT_OK(s);
EXPECT_NE(Env::Default(), base_env);
} else {
base_env = Env::Default();
}
EXPECT_NE(nullptr, base_env);
if (getenv("MEM_ENV")) {
mem_env_ = new MockEnv(base_env);
}
env_ = mem_env_ ? mem_env_ : base_env;
fs_ = std::make_shared<LegacyFileSystemWrapper>(env_);
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.env = env_;
@ -628,7 +650,7 @@ class VersionSetTestBase {
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_,
/*block_cache_tracer=*/nullptr)),
/*block_cache_tracer=*/nullptr));
reactive_versions_ = std::make_shared<ReactiveVersionSet>(
dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_);
@ -636,15 +658,30 @@ class VersionSetTestBase {
std::numeric_limits<uint64_t>::max());
}
void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
SequenceNumber* last_seqno,
std::unique_ptr<log::Writer>* log_writer) {
virtual ~VersionSetTestBase() {
if (getenv("KEEP_DB")) {
fprintf(stdout, "DB is still at %s\n", dbname_.c_str());
} else {
Options options;
options.env = env_;
EXPECT_OK(DestroyDB(dbname_, options));
}
if (mem_env_) {
delete mem_env_;
mem_env_ = nullptr;
}
}
protected:
virtual void PrepareManifest(
std::vector<ColumnFamilyDescriptor>* column_families,
SequenceNumber* last_seqno, std::unique_ptr<log::Writer>* log_writer) {
assert(column_families != nullptr);
assert(last_seqno != nullptr);
assert(log_writer != nullptr);
VersionEdit new_db;
if (db_options_.write_dbid_to_manifest) {
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
std::unique_ptr<DBImpl> impl(new DBImpl(DBOptions(), dbname_));
std::string db_id;
impl->GetDbIdentityFromIdentityFile(&db_id);
new_db.SetDBId(db_id);
@ -715,12 +752,25 @@ class VersionSetTestBase {
versions_->GetColumnFamilySet()->NumberOfColumnFamilies());
}
void VerifyManifest(std::string* manifest_path) const {
assert(manifest_path != nullptr);
uint64_t manifest_file_number = 0;
Status s = versions_->GetCurrentManifestPath(
dbname_, fs_.get(), manifest_path, &manifest_file_number);
ASSERT_OK(s);
ASSERT_EQ(1, manifest_file_number);
}
MockEnv* mem_env_;
Env* env_;
std::shared_ptr<Env> env_guard_;
std::shared_ptr<FileSystem> fs_;
const std::string dbname_;
EnvOptions env_options_;
Options options_;
ImmutableDBOptions db_options_;
ColumnFamilyOptions cf_options_;
ImmutableCFOptions immutable_cf_options_;
MutableCFOptions mutable_cf_options_;
std::shared_ptr<Cache> table_cache_;
WriteController write_controller_;
@ -738,7 +788,7 @@ const std::string VersionSetTestBase::kColumnFamilyName3 = "charles";
class VersionSetTest : public VersionSetTestBase, public testing::Test {
public:
VersionSetTest() : VersionSetTestBase() {}
VersionSetTest() : VersionSetTestBase("version_set_test") {}
};
TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
@ -780,7 +830,8 @@ TEST_F(VersionSetTest, SameColumnFamilyGroupCommit) {
class VersionSetAtomicGroupTest : public VersionSetTestBase,
public testing::Test {
public:
VersionSetAtomicGroupTest() : VersionSetTestBase() {}
VersionSetAtomicGroupTest()
: VersionSetTestBase("version_set_atomic_group_test") {}
void SetUp() override {
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
@ -1164,7 +1215,8 @@ TEST_F(VersionSetAtomicGroupTest,
class VersionSetTestDropOneCF : public VersionSetTestBase,
public testing::TestWithParam<std::string> {
public:
VersionSetTestDropOneCF() : VersionSetTestBase() {}
VersionSetTestDropOneCF()
: VersionSetTestBase("version_set_test_drop_one_cf") {}
};
// This test simulates the following execution sequence
@ -1279,6 +1331,706 @@ INSTANTIATE_TEST_CASE_P(
testing::Values(VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3));
class EmptyDefaultCfNewManifest : public VersionSetTestBase,
public testing::Test {
public:
EmptyDefaultCfNewManifest() : VersionSetTestBase("version_set_new_db_test") {}
// Emulate DBImpl::NewDB()
void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
SequenceNumber* /*last_seqno*/,
std::unique_ptr<log::Writer>* log_writer) override {
assert(log_writer != nullptr);
VersionEdit new_db;
new_db.SetLogNumber(0);
std::unique_ptr<WritableFile> file;
const std::string manifest_path = DescriptorFileName(dbname_, 1);
Status s = env_->NewWritableFile(
manifest_path, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
manifest_path, env_options_));
log_writer->reset(new log::Writer(std::move(file_writer), 0, true));
std::string record;
ASSERT_TRUE(new_db.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
// Create new column family
VersionEdit new_cf;
new_cf.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
new_cf.SetColumnFamily(1);
new_cf.SetLastSequence(2);
new_cf.SetNextFile(2);
record.clear();
ASSERT_TRUE(new_cf.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
}
protected:
bool write_dbid_to_manifest_ = false;
std::unique_ptr<log::Writer> log_writer_;
};
// Create db, create column family. Cf creation will switch to a new MANIFEST.
// Then reopen db, trying to recover.
TEST_F(EmptyDefaultCfNewManifest, Recover) {
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
column_families.emplace_back(VersionSetTestBase::kColumnFamilyName1,
cf_options_);
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(
manifest_path, column_families, false, &db_id, &has_missing_table_file);
ASSERT_OK(s);
ASSERT_FALSE(has_missing_table_file);
}
class VersionSetTestEmptyDb
: public VersionSetTestBase,
public testing::TestWithParam<
std::tuple<bool, bool, std::vector<std::string>>> {
public:
static const std::string kUnknownColumnFamilyName;
VersionSetTestEmptyDb() : VersionSetTestBase("version_set_test_empty_db") {}
protected:
void PrepareManifest(std::vector<ColumnFamilyDescriptor>* /*column_families*/,
SequenceNumber* /*last_seqno*/,
std::unique_ptr<log::Writer>* log_writer) override {
assert(nullptr != log_writer);
VersionEdit new_db;
if (db_options_.write_dbid_to_manifest) {
std::unique_ptr<DBImpl> impl(new DBImpl(DBOptions(), dbname_));
std::string db_id;
impl->GetDbIdentityFromIdentityFile(&db_id);
new_db.SetDBId(db_id);
}
const std::string manifest_path = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest_path, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(file)),
manifest_path, env_options_));
{
log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
std::string record;
new_db.EncodeTo(&record);
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
}
}
std::unique_ptr<log::Writer> log_writer_;
};
const std::string VersionSetTestEmptyDb::kUnknownColumnFamilyName = "unknown";
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
bool read_only = std::get<1>(GetParam());
const std::vector<std::string> cf_names = std::get<2>(GetParam());
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_);
}
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
read_only, &db_id,
&has_missing_table_file);
auto iter =
std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
if (iter == cf_names.end()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
// Only a subset of column families in the MANIFEST.
VersionEdit new_cf1;
new_cf1.AddColumnFamily(VersionSetTestBase::kColumnFamilyName1);
new_cf1.SetColumnFamily(1);
Status s;
{
std::string record;
new_cf1.EncodeTo(&record);
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
bool read_only = std::get<1>(GetParam());
const std::vector<std::string>& cf_names = std::get<2>(GetParam());
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_);
}
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
read_only, &db_id,
&has_missing_table_file);
auto iter =
std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
if (iter == cf_names.end()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
// Write all column families but no log_number, next_file_number and
// last_sequence.
const std::vector<std::string> all_cf_names = {
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
kColumnFamilyName3};
uint32_t cf_id = 1;
Status s;
for (size_t i = 1; i != all_cf_names.size(); ++i) {
VersionEdit new_cf;
new_cf.AddColumnFamily(all_cf_names[i]);
new_cf.SetColumnFamily(cf_id++);
std::string record;
ASSERT_TRUE(new_cf.EncodeTo(&record));
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
bool read_only = std::get<1>(GetParam());
const std::vector<std::string>& cf_names = std::get<2>(GetParam());
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_);
}
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
read_only, &db_id,
&has_missing_table_file);
auto iter =
std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
if (iter == cf_names.end()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
// Write all column families but no log_number, next_file_number and
// last_sequence.
const std::vector<std::string> all_cf_names = {
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
kColumnFamilyName3};
uint32_t cf_id = 1;
Status s;
for (size_t i = 1; i != all_cf_names.size(); ++i) {
VersionEdit new_cf;
new_cf.AddColumnFamily(all_cf_names[i]);
new_cf.SetColumnFamily(cf_id++);
std::string record;
ASSERT_TRUE(new_cf.EncodeTo(&record));
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
{
VersionEdit tmp_edit;
tmp_edit.SetColumnFamily(4);
tmp_edit.SetLogNumber(0);
tmp_edit.SetNextFile(2);
tmp_edit.SetLastSequence(0);
std::string record;
ASSERT_TRUE(tmp_edit.EncodeTo(&record));
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
bool read_only = std::get<1>(GetParam());
const std::vector<std::string>& cf_names = std::get<2>(GetParam());
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_);
}
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
read_only, &db_id,
&has_missing_table_file);
auto iter =
std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
if (iter == cf_names.end()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else {
ASSERT_TRUE(s.IsCorruption());
}
}
TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
// Write all column families but no log_number, next_file_number and
// last_sequence.
const std::vector<std::string> all_cf_names = {
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
kColumnFamilyName3};
uint32_t cf_id = 1;
Status s;
for (size_t i = 1; i != all_cf_names.size(); ++i) {
VersionEdit new_cf;
new_cf.AddColumnFamily(all_cf_names[i]);
new_cf.SetColumnFamily(cf_id++);
std::string record;
ASSERT_TRUE(new_cf.EncodeTo(&record));
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
{
VersionEdit tmp_edit;
tmp_edit.SetLogNumber(0);
tmp_edit.SetNextFile(2);
tmp_edit.SetLastSequence(0);
std::string record;
ASSERT_TRUE(tmp_edit.EncodeTo(&record));
s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
bool read_only = std::get<1>(GetParam());
const std::vector<std::string>& cf_names = std::get<2>(GetParam());
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : cf_names) {
column_families.emplace_back(cf_name, cf_options_);
}
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families,
read_only, &db_id,
&has_missing_table_file);
auto iter =
std::find(cf_names.begin(), cf_names.end(), kDefaultColumnFamilyName);
if (iter == cf_names.end()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else if (read_only) {
ASSERT_OK(s);
ASSERT_FALSE(has_missing_table_file);
} else if (cf_names.size() == all_cf_names.size()) {
ASSERT_OK(s);
ASSERT_FALSE(has_missing_table_file);
} else if (cf_names.size() < all_cf_names.size()) {
ASSERT_TRUE(s.IsInvalidArgument());
} else {
ASSERT_OK(s);
ASSERT_FALSE(has_missing_table_file);
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(
kUnknownColumnFamilyName);
ASSERT_EQ(nullptr, cfd);
}
}
INSTANTIATE_TEST_CASE_P(
BestEffortRecovery, VersionSetTestEmptyDb,
testing::Combine(
/*write_dbid_to_manifest=*/testing::Bool(),
/*read_only=*/testing::Bool(),
/*cf_names=*/
testing::Values(
std::vector<std::string>(),
std::vector<std::string>({kDefaultColumnFamilyName}),
std::vector<std::string>({VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3}),
std::vector<std::string>({kDefaultColumnFamilyName,
VersionSetTestBase::kColumnFamilyName1}),
std::vector<std::string>({kDefaultColumnFamilyName,
VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3}),
std::vector<std::string>(
{kDefaultColumnFamilyName,
VersionSetTestBase::kColumnFamilyName1,
VersionSetTestBase::kColumnFamilyName2,
VersionSetTestBase::kColumnFamilyName3,
VersionSetTestEmptyDb::kUnknownColumnFamilyName}))));
class VersionSetTestMissingFiles : public VersionSetTestBase,
public testing::Test {
public:
VersionSetTestMissingFiles()
: VersionSetTestBase("version_set_test_missing_files"),
block_based_table_options_(),
table_factory_(std::make_shared<BlockBasedTableFactory>(
block_based_table_options_)),
internal_comparator_(
std::make_shared<InternalKeyComparator>(options_.comparator)) {}
protected:
void PrepareManifest(std::vector<ColumnFamilyDescriptor>* column_families,
SequenceNumber* last_seqno,
std::unique_ptr<log::Writer>* log_writer) override {
assert(column_families != nullptr);
assert(last_seqno != nullptr);
assert(log_writer != nullptr);
const std::string manifest = DescriptorFileName(dbname_, 1);
std::unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
NewLegacyWritableFileWrapper(std::move(file)), manifest, env_options_));
log_writer->reset(new log::Writer(std::move(file_writer), 0, false));
VersionEdit new_db;
if (db_options_.write_dbid_to_manifest) {
std::unique_ptr<DBImpl> impl(new DBImpl(DBOptions(), dbname_));
std::string db_id;
impl->GetDbIdentityFromIdentityFile(&db_id);
new_db.SetDBId(db_id);
}
{
std::string record;
ASSERT_TRUE(new_db.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
}
const std::vector<std::string> cf_names = {
kDefaultColumnFamilyName, kColumnFamilyName1, kColumnFamilyName2,
kColumnFamilyName3};
uint32_t cf_id = 1; // default cf id is 0
cf_options_.table_factory = table_factory_;
for (const auto& cf_name : cf_names) {
column_families->emplace_back(cf_name, cf_options_);
if (cf_name == kDefaultColumnFamilyName) {
continue;
}
VersionEdit new_cf;
new_cf.AddColumnFamily(cf_name);
new_cf.SetColumnFamily(cf_id);
std::string record;
ASSERT_TRUE(new_cf.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
VersionEdit cf_files;
cf_files.SetColumnFamily(cf_id);
cf_files.SetLogNumber(0);
record.clear();
ASSERT_TRUE(cf_files.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
++cf_id;
}
SequenceNumber seq = 2;
{
VersionEdit edit;
edit.SetNextFile(7);
edit.SetLastSequence(seq);
std::string record;
ASSERT_TRUE(edit.EncodeTo(&record));
s = (*log_writer)->AddRecord(record);
ASSERT_OK(s);
}
*last_seqno = seq + 1;
}
struct SstInfo {
uint64_t file_number;
std::string column_family;
std::string key; // the only key
int level = 0;
SstInfo(uint64_t file_num, const std::string& cf_name,
const std::string& _key)
: SstInfo(file_num, cf_name, _key, 0) {}
SstInfo(uint64_t file_num, const std::string& cf_name,
const std::string& _key, int lvl)
: file_number(file_num),
column_family(cf_name),
key(_key),
level(lvl) {}
};
// Create dummy sst, return their metadata. Note that only file name and size
// are used.
void CreateDummyTableFiles(const std::vector<SstInfo>& file_infos,
std::vector<FileMetaData>* file_metas) {
assert(file_metas != nullptr);
for (const auto& info : file_infos) {
uint64_t file_num = info.file_number;
std::string fname = MakeTableFileName(dbname_, file_num);
std::unique_ptr<FSWritableFile> file;
Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> fwriter(
new WritableFileWriter(std::move(file), fname, FileOptions(), env_));
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(table_factory_->NewTableBuilder(
TableBuilderOptions(
immutable_cf_options_, mutable_cf_options_, *internal_comparator_,
&int_tbl_prop_collector_factories, kNoCompression,
/*_sample_for_compression=*/0, CompressionOptions(),
/*_skip_filters=*/false, info.column_family, info.level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
fwriter.get()));
InternalKey ikey(info.key, 0, ValueType::kTypeValue);
builder->Add(ikey.Encode(), "value");
ASSERT_OK(builder->Finish());
fwriter->Flush();
uint64_t file_size = 0;
s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
ASSERT_OK(s);
ASSERT_NE(0, file_size);
FileMetaData meta;
meta = FileMetaData(file_num, /*file_path_id=*/0, file_size, ikey, ikey,
0, 0, false, 0, 0, 0, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
file_metas->emplace_back(meta);
}
}
// This method updates last_sequence_.
void WriteFileAdditionAndDeletionToManifest(
uint32_t cf, const std::vector<std::pair<int, FileMetaData>>& added_files,
const std::vector<std::pair<int, uint64_t>>& deleted_files) {
VersionEdit edit;
edit.SetColumnFamily(cf);
for (const auto& elem : added_files) {
int level = elem.first;
edit.AddFile(level, elem.second);
}
for (const auto& elem : deleted_files) {
int level = elem.first;
edit.DeleteFile(level, elem.second);
}
edit.SetLastSequence(last_seqno_);
++last_seqno_;
assert(log_writer_.get() != nullptr);
std::string record;
ASSERT_TRUE(edit.EncodeTo(&record));
Status s = log_writer_->AddRecord(record);
ASSERT_OK(s);
}
BlockBasedTableOptions block_based_table_options_;
std::shared_ptr<TableFactory> table_factory_;
std::shared_ptr<InternalKeyComparator> internal_comparator_;
std::vector<ColumnFamilyDescriptor> column_families_;
SequenceNumber last_seqno_;
std::unique_ptr<log::Writer> log_writer_;
};
TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
std::vector<SstInfo> existing_files = {
SstInfo(100, kDefaultColumnFamilyName, "a"),
SstInfo(102, kDefaultColumnFamilyName, "b"),
SstInfo(103, kDefaultColumnFamilyName, "c"),
SstInfo(107, kDefaultColumnFamilyName, "d"),
SstInfo(110, kDefaultColumnFamilyName, "e")};
std::vector<FileMetaData> file_metas;
CreateDummyTableFiles(existing_files, &file_metas);
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
std::vector<std::pair<int, FileMetaData>> added_files;
for (uint64_t file_num = 10; file_num < 15; ++file_num) {
std::string smallest_ukey = "a";
std::string largest_ukey = "b";
InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
FileMetaData meta =
FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12,
smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
added_files.emplace_back(0, meta);
}
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
std::vector<std::pair<int, uint64_t>> deleted_files;
deleted_files.emplace_back(0, 10);
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
/*read_only=*/false, &db_id,
&has_missing_table_file);
ASSERT_OK(s);
ASSERT_TRUE(has_missing_table_file);
for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
VersionStorageInfo* vstorage = cfd->current()->storage_info();
const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
ASSERT_TRUE(files.empty());
}
}
TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
std::vector<SstInfo> existing_files = {
SstInfo(100, kDefaultColumnFamilyName, "a"),
SstInfo(102, kDefaultColumnFamilyName, "b"),
SstInfo(103, kDefaultColumnFamilyName, "c"),
SstInfo(107, kDefaultColumnFamilyName, "d"),
SstInfo(110, kDefaultColumnFamilyName, "e")};
std::vector<FileMetaData> file_metas;
CreateDummyTableFiles(existing_files, &file_metas);
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
std::vector<std::pair<int, FileMetaData>> added_files;
for (size_t i = 3; i != 5; ++i) {
added_files.emplace_back(0, file_metas[i]);
}
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
added_files.clear();
for (uint64_t file_num = 120; file_num < 130; ++file_num) {
std::string smallest_ukey = "a";
std::string largest_ukey = "b";
InternalKey smallest_ikey(smallest_ukey, 1, ValueType::kTypeValue);
InternalKey largest_ikey(largest_ukey, 1, ValueType::kTypeValue);
FileMetaData meta =
FileMetaData(file_num, /*file_path_id=*/0, /*file_size=*/12,
smallest_ikey, largest_ikey, 0, 0, false, 0, 0, 0,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
added_files.emplace_back(0, meta);
}
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
/*read_only=*/false, &db_id,
&has_missing_table_file);
ASSERT_OK(s);
ASSERT_TRUE(has_missing_table_file);
for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
VersionStorageInfo* vstorage = cfd->current()->storage_info();
const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
if (cfd->GetName() == kDefaultColumnFamilyName) {
ASSERT_EQ(2, files.size());
for (const auto* fmeta : files) {
if (fmeta->fd.GetNumber() != 107 && fmeta->fd.GetNumber() != 110) {
ASSERT_FALSE(true);
}
}
} else {
ASSERT_TRUE(files.empty());
}
}
}
TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
std::vector<SstInfo> existing_files = {
SstInfo(100, kDefaultColumnFamilyName, "a"),
SstInfo(102, kDefaultColumnFamilyName, "b"),
SstInfo(103, kDefaultColumnFamilyName, "c"),
SstInfo(107, kDefaultColumnFamilyName, "d"),
SstInfo(110, kDefaultColumnFamilyName, "e")};
std::vector<FileMetaData> file_metas;
CreateDummyTableFiles(existing_files, &file_metas);
PrepareManifest(&column_families_, &last_seqno_, &log_writer_);
std::vector<std::pair<int, FileMetaData>> added_files;
for (const auto& meta : file_metas) {
added_files.emplace_back(0, meta);
}
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
std::vector<std::pair<int, uint64_t>> deleted_files;
deleted_files.emplace_back(/*level=*/0, 100);
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
std::string db_id;
bool has_missing_table_file = false;
s = versions_->TryRecoverFromOneManifest(manifest_path, column_families_,
/*read_only=*/false, &db_id,
&has_missing_table_file);
ASSERT_OK(s);
ASSERT_FALSE(has_missing_table_file);
for (ColumnFamilyData* cfd : *(versions_->GetColumnFamilySet())) {
VersionStorageInfo* vstorage = cfd->current()->storage_info();
const std::vector<FileMetaData*>& files = vstorage->LevelFiles(0);
if (cfd->GetName() == kDefaultColumnFamilyName) {
ASSERT_EQ(existing_files.size() - deleted_files.size(), files.size());
bool has_deleted_file = false;
for (const auto* fmeta : files) {
if (fmeta->fd.GetNumber() == 100) {
has_deleted_file = true;
break;
}
}
ASSERT_FALSE(has_deleted_file);
} else {
ASSERT_TRUE(files.empty());
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

12
env/mock_env.cc vendored

@ -10,6 +10,7 @@
#include "env/mock_env.h"
#include <algorithm>
#include <chrono>
#include "file/filename.h"
#include "port/sys_time.h"
#include "util/cast_util.h"
#include "util/murmurhash.h"
@ -747,17 +748,6 @@ Status MockEnv::CorruptBuffer(const std::string& fname) {
return Status::OK();
}
std::string MockEnv::NormalizePath(const std::string path) {
std::string dst;
for (auto c : path) {
if (!dst.empty() && c == '/' && dst.back() == '/') {
continue;
}
dst.push_back(c);
}
return dst;
}
void MockEnv::FakeSleepForMicroseconds(int64_t micros) {
fake_sleep_micros_.fetch_add(micros);
}

2
env/mock_env.h vendored

@ -106,8 +106,6 @@ class MockEnv : public EnvWrapper {
void FakeSleepForMicroseconds(int64_t micros);
private:
std::string NormalizePath(const std::string path);
// Map from filenames to MemFile objects, representing a simple file system.
typedef std::map<std::string, MemFile*> FileSystem;
port::Mutex mutex_;

@ -453,4 +453,16 @@ Status GetInfoLogFiles(Env* env, const std::string& db_log_dir,
return Status::OK();
}
std::string NormalizePath(const std::string& path) {
std::string dst;
for (auto c : path) {
if (!dst.empty() && c == kFilePathSeparator &&
dst.back() == kFilePathSeparator) {
continue;
}
dst.push_back(c);
}
return dst;
}
} // namespace ROCKSDB_NAMESPACE

@ -29,6 +29,12 @@ class Env;
class Directory;
class WritableFileWriter;
#ifdef OS_WIN
const char kFilePathSeparator = '\\';
#else
const char kFilePathSeparator = '/';
#endif
enum FileType {
kLogFile,
kDBLockFile,
@ -183,4 +189,6 @@ extern Status GetInfoLogFiles(Env* env, const std::string& db_log_dir,
const std::string& dbname,
std::string* parent_dir,
std::vector<std::string>* file_names);
extern std::string NormalizePath(const std::string& path);
} // namespace ROCKSDB_NAMESPACE

@ -1130,6 +1130,16 @@ struct DBOptions {
//
// Default: nullptr
std::shared_ptr<FileChecksumFunc> sst_file_checksum_func = nullptr;
// By default, RocksDB recovery fails if any table file referenced in
// MANIFEST are missing after scanning the MANIFEST.
// Best-efforts recovery is another recovery mode that
// tries to restore the database to the most recent point in time without
// missing file.
// Currently not compatible with atomic flush. Furthermore, WAL files will
// not be used for recovery if best_efforts_recovery is true.
// Default: false
bool best_efforts_recovery = false;
};
// Options to control the behavior of a database (passed to DB::Open)

@ -95,7 +95,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
persist_stats_to_disk(options.persist_stats_to_disk),
write_dbid_to_manifest(options.write_dbid_to_manifest),
log_readahead_size(options.log_readahead_size),
sst_file_checksum_func(options.sst_file_checksum_func) {
sst_file_checksum_func(options.sst_file_checksum_func),
best_efforts_recovery(options.best_efforts_recovery) {
}
void ImmutableDBOptions::Dump(Logger* log) const {
@ -250,6 +251,8 @@ void ImmutableDBOptions::Dump(Logger* log) const {
sst_file_checksum_func
? sst_file_checksum_func->Name()
: kUnknownFileChecksumFuncName.c_str());
ROCKS_LOG_HEADER(log, " Options.best_efforts_recovery: %d",
static_cast<int>(best_efforts_recovery));
}
MutableDBOptions::MutableDBOptions()

@ -88,6 +88,7 @@ struct ImmutableDBOptions {
bool write_dbid_to_manifest;
size_t log_readahead_size;
std::shared_ptr<FileChecksumFunc> sst_file_checksum_func;
bool best_efforts_recovery;
};
struct MutableDBOptions {

@ -145,6 +145,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
immutable_db_options.avoid_unnecessary_blocking_io;
options.log_readahead_size = immutable_db_options.log_readahead_size;
options.sst_file_checksum_func = immutable_db_options.sst_file_checksum_func;
options.best_efforts_recovery = immutable_db_options.best_efforts_recovery;
return options;
}
@ -1681,6 +1682,9 @@ std::unordered_map<std::string, OptionTypeInfo>
{"log_readahead_size",
{offsetof(struct DBOptions, log_readahead_size), OptionType::kSizeT,
OptionVerificationType::kNormal, false, 0}},
{"best_efforts_recovery",
{offsetof(struct DBOptions, best_efforts_recovery),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
};
std::unordered_map<std::string, BlockBasedTableOptions::IndexType>

@ -303,7 +303,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"atomic_flush=false;"
"avoid_unnecessary_blocking_io=false;"
"log_readahead_size=0;"
"write_dbid_to_manifest=false",
"write_dbid_to_manifest=false;"
"best_efforts_recovery=false",
new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

@ -59,6 +59,7 @@ LIB_SOURCES = \
db/trim_history_scheduler.cc \
db/version_builder.cc \
db/version_edit.cc \
db/version_edit_handler.cc \
db/version_set.cc \
db/wal_manager.cc \
db/write_batch.cc \

Loading…
Cancel
Save