Summary: In this diff I present you BackupableDB v1. You can easily use it to backup your DB and it will do incremental snapshots for you. Let's first describe how you would use BackupableDB. It's inheriting StackableDB interface so you can easily construct it with your DB object -- it will add a method RollTheSnapshot() to the DB object. When you call RollTheSnapshot(), current snapshot of the DB will be stored in the backup dir. To restore, you can just call RestoreDBFromBackup() on a BackupableDB (which is a static method) and it will restore all files from the backup dir. In the next version, it will even support automatic backuping every X minutes. There are multiple things you can configure: 1. backup_env and db_env can be different, which is awesome because then you can easily backup to HDFS or wherever you feel like. 2. sync - if true, it *guarantees* backup consistency on machine reboot 3. number of snapshots to keep - this will keep last N snapshots around if you want, for some reason, be able to restore from an earlier snapshot. All the backuping is done in incremental fashion - if we already have 00010.sst, we will not copy it again. *IMPORTANT* -- This is based on assumption that 00010.sst never changes - two files named 00010.sst from the same DB will always be exactly the same. Is this true? I always copy manifest, current and log files. 4. You can decide if you want to flush the memtables before you backup, or you're fine with backing up the log files -- either way, you get a complete and consistent view of the database at a time of backup. 5. More things you can find in BackupableDBOptions Here is the directory structure I use: backup_dir/CURRENT_SNAPSHOT - just 4 bytes holding the latest snapshot 0, 1, 2, ... 
- files containing serialized version of each snapshot - containing a list of files files/*.sst - sst files shared between snapshots - if one snapshot references 00010.sst and another one needs to backup it from the DB, it will just reference the same file files/ 0/, 1/, 2/, ... - snapshot directories containing private snapshot files - current, manifest and log files All the files are ref counted and deleted immediatelly when they get out of scope. Some other stuff in this diff: 1. Added GetEnv() method to the DB. Discussed with @haobo and we agreed that it seems right thing to do. 2. Fixed StackableDB interface. The way it was set up before, I was not able to implement BackupableDB. Test Plan: I have a unittest, but please don't look at this yet. I just hacked it up to help me with debugging. I will write a lot of good tests and update the diff. Also, `make asan_check` Reviewers: dhruba, haobo, emayanke Reviewed By: dhruba CC: leveldb, haobo Differential Revision: https://reviews.facebook.net/D14295main
parent
26bc40a89a
commit
fb9fce4fc3
@ -0,0 +1,128 @@ |
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#pragma once |
||||
#include "utilities/stackable_db.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/status.h" |
||||
|
||||
#include <string> |
||||
#include <map> |
||||
#include <vector> |
||||
|
||||
namespace rocksdb { |
||||
|
||||
// Configuration for BackupableDB / RestoreBackupableDB. All fields are
// plain data; the constructor just forwards its arguments.
struct BackupableDBOptions {
  // Where to keep the backup files. Has to be different than dbname_
  // Best to set this to dbname_ + "/backups"
  // Required
  std::string backup_dir;

  // Backup Env object. It will be used for backup file I/O. If it's
  // nullptr, backups will be written out using DBs Env. If it's
  // non-nullptr, backup's I/O will be performed using this object.
  // If you want to have backups on HDFS, use HDFS Env here!
  // Default: nullptr
  Env* backup_env;

  // Backup info and error messages will be written to info_log
  // if non-nullptr.
  // Default: nullptr
  Logger* info_log;

  // If sync == true, we can guarantee you'll get consistent backup even
  // on a machine crash/reboot. Backup process is slower with sync enabled.
  // If sync == false, we don't guarantee anything on machine reboot. However,
  // chances are some of the backups are consistent.
  // Default: true
  bool sync;

  // If true, it will delete whatever backups there are already
  // Default: false
  bool destroy_old_data;

  explicit BackupableDBOptions(const std::string& _backup_dir,
                               Env* _backup_env = nullptr,
                               Logger* _info_log = nullptr,
                               bool _sync = true,
                               bool _destroy_old_data = false) :
      backup_dir(_backup_dir),
      backup_env(_backup_env),
      info_log(_info_log),
      sync(_sync),
      destroy_old_data(_destroy_old_data) { }
};
||||
|
||||
class BackupEngine; |
||||
|
||||
typedef uint32_t BackupID; |
||||
|
||||
// Identifying information about one backup, as returned by GetBackupInfo().
struct BackupInfo {
  BackupID backup_id;  // unique id of this backup
  int64_t timestamp;   // wall-clock time recorded when the backup started
  uint64_t size;       // total size in bytes of all files in this backup

  BackupInfo() {}
  BackupInfo(BackupID _backup_id, int64_t _timestamp, uint64_t _size)
      : backup_id(_backup_id), timestamp(_timestamp), size(_size) {}
};
||||
|
||||
// Stack your DB with BackupableDB to be able to backup the DB
|
||||
// Stack your DB with BackupableDB to be able to backup the DB
class BackupableDB : public StackableDB {
 public:
  // BackupableDBOptions have to be the same as the ones used in a previous
  // incarnation of the DB
  BackupableDB(DB* db, const BackupableDBOptions& options);
  virtual ~BackupableDB();

  // Captures the state of the database in the latest backup
  // NOT a thread safe call
  Status CreateNewBackup(bool flush_before_backup = false);
  // Returns info about backups in backup_info
  void GetBackupInfo(std::vector<BackupInfo>* backup_info);
  // deletes old backups, keeping latest num_backups_to_keep alive
  Status PurgeOldBackups(uint32_t num_backups_to_keep);
  // deletes a specific backup
  Status DeleteBackup(BackupID backup_id);

 private:
  // Engine that implements all backup operations; defined in
  // backupable_db.cc. Presumably allocated in the constructor and
  // released in the destructor -- TODO confirm against the .cc file.
  BackupEngine* backup_engine_;
};
||||
|
||||
// Use this class to access information about backups and restore from them
|
||||
// Use this class to access information about backups and restore from them
class RestoreBackupableDB {
 public:
  RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options);
  ~RestoreBackupableDB();

  // Returns info about backups in backup_info
  void GetBackupInfo(std::vector<BackupInfo>* backup_info);

  // restore from backup with backup_id
  // IMPORTANT -- if you restore from some backup that is not the latest,
  // you HAVE to delete all the newer backups immediately, before creating
  // new backup on the restored database. Otherwise, your new backups
  // will be corrupted.
  // TODO should we enforce this somehow?
  Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir,
                             const std::string& wal_dir);

  // restore from the latest backup
  Status RestoreDBFromLatestBackup(const std::string& db_dir,
                                   const std::string& wal_dir);
  // deletes old backups, keeping latest num_backups_to_keep alive
  Status PurgeOldBackups(uint32_t num_backups_to_keep);
  // deletes a specific backup
  Status DeleteBackup(BackupID backup_id);

 private:
  // Engine that implements all operations; defined in backupable_db.cc.
  // Presumably owned by this object -- TODO confirm against the .cc file.
  BackupEngine* backup_engine_;
};
||||
|
||||
} // rocksdb namespace
|
@ -0,0 +1,821 @@ |
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "utilities/backupable_db.h" |
||||
#include "db/filename.h" |
||||
#include "util/coding.h" |
||||
#include "rocksdb/transaction_log.h" |
||||
|
||||
#define __STDC_FORMAT_MACROS |
||||
|
||||
#include <inttypes.h> |
||||
#include <algorithm> |
||||
#include <vector> |
||||
#include <map> |
||||
#include <string> |
||||
#include <limits> |
||||
|
||||
namespace rocksdb { |
||||
|
||||
// -------- BackupEngine class ---------
|
||||
// Shared implementation behind BackupableDB and RestoreBackupableDB.
// Keeps an in-memory picture of every backup in the backup directory and
// refcounts shared files so they can be deleted as soon as no backup
// references them.
class BackupEngine {
 public:
  BackupEngine(Env* db_env, const BackupableDBOptions& options);
  ~BackupEngine();
  Status CreateNewBackup(DB* db, bool flush_before_backup = false);
  Status PurgeOldBackups(uint32_t num_backups_to_keep);
  Status DeleteBackup(BackupID backup_id);

  void GetBackupInfo(std::vector<BackupInfo>* backup_info);
  Status RestoreDBFromBackup(BackupID backup_id, const std::string &db_dir,
                             const std::string &wal_dir);
  Status RestoreDBFromLatestBackup(const std::string &db_dir,
                                   const std::string &wal_dir) {
    return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir);
  }

 private:
  // In-memory representation of one backup: the list of files it references
  // plus bookkeeping (timestamp, total size). File reference counts are
  // shared across all backups via the file_refs map passed in.
  class BackupMeta {
   public:
    BackupMeta(const std::string& meta_filename,
               std::unordered_map<std::string, int>* file_refs, Env* env)
        : timestamp_(0), size_(0), meta_filename_(meta_filename),
          file_refs_(file_refs), env_(env) {}

    ~BackupMeta() {}

    // Remember the current wall-clock time as this backup's timestamp.
    void RecordTimestamp() {
      env_->GetCurrentTime(&timestamp_);
    }
    int64_t GetTimestamp() const {
      return timestamp_;
    }
    uint64_t GetSize() const {
      return size_;
    }

    void AddFile(const std::string& filename, uint64_t size);
    void Delete();

    // A deleted or never-populated backup is "empty".
    bool Empty() {
      return files_.empty();
    }

    const std::vector<std::string>& GetFiles() {
      return files_;
    }

    Status LoadFromFile(const std::string& backup_dir);
    Status StoreToFile(bool sync);

   private:
    int64_t timestamp_;
    uint64_t size_;
    std::string const meta_filename_;
    // files with relative paths (without "/" prefix!!)
    std::vector<std::string> files_;
    std::unordered_map<std::string, int>* file_refs_;
    Env* env_;
  };  // BackupMeta

  // Path helpers. Layout under options_.backup_dir:
  //   shared/        -- sst files, shared between backups
  //   private/<id>/  -- per-backup files (manifest, current, WAL)
  //   meta/<id>      -- serialized BackupMeta for backup <id>
  //   LATEST_BACKUP  -- ASCII id of the latest valid backup
  inline std::string GetAbsolutePath(
      const std::string &relative_path = "") const {
    assert(relative_path.size() == 0 || relative_path[0] != '/');
    return options_.backup_dir + "/" + relative_path;
  }
  inline std::string GetPrivateDirRel() const {
    return "private";
  }
  inline std::string GetPrivateFileRel(BackupID backup_id,
                                       const std::string &file = "") const {
    assert(file.size() == 0 || file[0] != '/');
    return GetPrivateDirRel() + "/" + std::to_string(backup_id) + "/" + file;
  }
  inline std::string GetSharedFileRel(const std::string& file = "") const {
    assert(file.size() == 0 || file[0] != '/');
    return "shared/" + file;
  }
  inline std::string GetLatestBackupFile(bool tmp = false) const {
    return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
  }
  inline std::string GetBackupMetaDir() const {
    return GetAbsolutePath("meta");
  }
  inline std::string GetBackupMetaFile(BackupID backup_id) const {
    return GetBackupMetaDir() + "/" + std::to_string(backup_id);
  }

  Status GetLatestBackupFileContents(uint32_t* latest_backup);
  Status PutLatestBackupFileContents(uint32_t latest_backup);
  // if size_limit == 0, there is no size limit, copy everything
  Status CopyFile(const std::string& src,
                  const std::string& dst,
                  Env* src_env,
                  Env* dst_env,
                  bool sync,
                  uint64_t* size = nullptr,
                  uint64_t size_limit = 0);
  // if size_limit == 0, there is no size limit, copy everything
  Status BackupFile(BackupID backup_id,
                    BackupMeta* backup,
                    bool shared,
                    const std::string& src_dir,
                    const std::string& src_fname,  // starts with "/"
                    uint64_t size_limit = 0);
  // Will delete all the files we don't need anymore
  // If full_scan == true, it will do the full scan of files/ directory
  // and delete all the files that are not referenced from backuped_file_refs_
  void GarbageCollection(bool full_scan);

  // backup state data
  BackupID latest_backup_id_;
  std::map<BackupID, BackupMeta> backups_;
  std::unordered_map<std::string, int> backuped_file_refs_;
  std::vector<BackupID> obsolete_backups_;

  // options data
  BackupableDBOptions options_;
  Env* db_env_;
  Env* backup_env_;

  // constants
  static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
  static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL;  // 5MB
};
||||
|
||||
// Recovers backup state from the backup directory: enumerates meta files,
// loads (or destroys) existing backups, determines the latest valid backup
// id, and garbage-collects anything left over from interrupted runs.
BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
    : options_(options),
      db_env_(db_env),
      backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_) {

  // create all the dirs we need
  backup_env_->CreateDirIfMissing(GetAbsolutePath());
  backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
  backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
  backup_env_->CreateDirIfMissing(GetBackupMetaDir());

  std::vector<std::string> backup_meta_files;
  backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
  // create backups_ structure -- one entry per valid meta file name
  for (auto& file : backup_meta_files) {
    BackupID backup_id = 0;
    sscanf(file.c_str(), "%u", &backup_id);
    // a valid meta file name is exactly the decimal backup id, nothing else
    if (backup_id == 0 || file != std::to_string(backup_id)) {
      // invalid file name, delete that
      backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
      continue;
    }
    assert(backups_.find(backup_id) == backups_.end());
    backups_.insert(std::make_pair(
        backup_id, BackupMeta(GetBackupMetaFile(backup_id),
                              &backuped_file_refs_, backup_env_)));
  }

  if (options_.destroy_old_data) {  // Destroy old data
    for (auto& backup : backups_) {
      backup.second.Delete();
      obsolete_backups_.push_back(backup.first);
    }
    backups_.clear();
    // start from beginning
    latest_backup_id_ = 0;
    // GarbageCollection() will do the actual deletion
  } else {  // Load data from storage
    // load the backups if any
    for (auto& backup : backups_) {
      Status s = backup.second.LoadFromFile(options_.backup_dir);
      if (!s.ok()) {
        Log(options_.info_log, "Backup %u corrupted - deleting -- %s",
            backup.first, s.ToString().c_str());
        backup.second.Delete();
        obsolete_backups_.push_back(backup.first);
      }
    }
    // delete obsolete backups from the structure
    for (auto ob : obsolete_backups_) {
      backups_.erase(ob);
    }

    Status s = GetLatestBackupFileContents(&latest_backup_id_);
    // If latest backup file is corrupted or non-existent
    // set latest backup as the biggest backup we have
    // or 0 if we have no backups
    if (!s.ok() ||
        backups_.find(latest_backup_id_) == backups_.end()) {
      auto itr = backups_.end();
      latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first;
    }
  }

  // delete any backups that claim to be later than latest
  // (their install into LATEST_BACKUP never completed, so they are
  // considered half-written)
  for (auto itr = backups_.upper_bound(latest_backup_id_);
       itr != backups_.end();) {
    itr->second.Delete();
    obsolete_backups_.push_back(itr->first);
    itr = backups_.erase(itr);
  }

  PutLatestBackupFileContents(latest_backup_id_);  // Ignore errors
  GarbageCollection(true);
  Log(options_.info_log,
      "Initialized BackupEngine, the latest backup is %u.",
      latest_backup_id_);
}
||||
|
||||
BackupEngine::~BackupEngine() {
  // Make sure buffered log messages reach the info log before shutdown.
  LogFlush(options_.info_log);
}
||||
|
||||
// Creates backup number (latest_backup_id_ + 1) from the live state of db.
// File deletions are disabled for the duration of the copy so the file set
// returned by GetLiveFiles() stays stable. The backup only becomes visible
// once its id is installed into the LATEST_BACKUP file (atomic rename).
Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
  Status s;
  std::vector<std::string> live_files;
  VectorLogPtr live_wal_files;
  uint64_t manifest_file_size = 0;

  s = db->DisableFileDeletions();
  if (s.ok()) {
    // this will return live_files prefixed with "/"
    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
  }
  // if we didn't flush before backup, we need to also get WAL files
  if (s.ok() && !flush_before_backup) {
    // returns file names prefixed with "/"
    s = db->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    // re-enable deletions before bailing out
    db->EnableFileDeletions();
    return s;
  }

  BackupID new_backup_id = latest_backup_id_ + 1;
  assert(backups_.find(new_backup_id) == backups_.end());
  auto ret = backups_.insert(std::make_pair(
      new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
                                &backuped_file_refs_, backup_env_)));
  assert(ret.second == true);
  auto& new_backup = ret.first->second;
  new_backup.RecordTimestamp();

  Log(options_.info_log, "Started the backup process -- creating backup %u",
      new_backup_id);

  // create private dir
  s = backup_env_->CreateDir(GetAbsolutePath(GetPrivateFileRel(new_backup_id)));

  // copy live_files
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    assert(ok);
    // we should only get sst, manifest and current files here
    assert(type == kTableFile ||
           type == kDescriptorFile ||
           type == kCurrentFile);

    // rules:
    // * if it's kTableFile, than it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    //   (the manifest may keep growing while we copy; only the first
    //   manifest_file_size bytes are guaranteed consistent)
    s = BackupFile(new_backup_id,
                   &new_backup,
                   type == kTableFile, /* shared */
                   db->GetName(), /* src_dir */
                   live_files[i], /* src_fname */
                   (type == kDescriptorFile) ? manifest_file_size : 0);
  }

  // copy WAL files
  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
    if (live_wal_files[i]->Type() == kAliveLogFile) {
      // we only care about live log files
      // copy the file into backup_dir/files/<new backup>/
      s = BackupFile(new_backup_id,
                     &new_backup,
                     false, /* not shared */
                     db->GetOptions().wal_dir,
                     live_wal_files[i]->PathName());
    }
  }

  // we copied all the files, enable file deletions
  db->EnableFileDeletions();

  if (s.ok()) {
    // persist the backup metadata on the disk
    s = new_backup.StoreToFile(options_.sync);
  }
  if (s.ok()) {
    // install the newly created backup meta! (atomic)
    s = PutLatestBackupFileContents(new_backup_id);
  }
  if (!s.ok()) {
    // clean all the files we might have created
    Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str());
    backups_.erase(new_backup_id);
    GarbageCollection(true);
    return s;
  }

  // here we know that we succeeded and installed the new backup
  // in the LATEST_BACKUP file
  latest_backup_id_ = new_backup_id;
  Log(options_.info_log, "Backup DONE. All is good");
  return s;
}
||||
|
||||
// Drops the oldest backups until at most num_backups_to_keep remain,
// then garbage-collects any files whose refcount reached zero.
Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
  Log(options_.info_log, "Purging old backups, keeping %u",
      num_backups_to_keep);
  while (backups_.size() > num_backups_to_keep) {
    // backups_ is ordered by id, so begin() is always the oldest
    auto oldest = backups_.begin();
    Log(options_.info_log, "Deleting backup %u", oldest->first);
    oldest->second.Delete();
    obsolete_backups_.push_back(oldest->first);
    backups_.erase(oldest);
  }
  GarbageCollection(false);
  return Status::OK();
}
||||
|
||||
// Deletes the single backup identified by backup_id, then reclaims any
// files no longer referenced by the remaining backups.
Status BackupEngine::DeleteBackup(BackupID backup_id) {
  Log(options_.info_log, "Deleting backup %u", backup_id);
  auto itr = backups_.find(backup_id);
  if (itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  itr->second.Delete();
  obsolete_backups_.push_back(backup_id);
  backups_.erase(itr);
  // refcounts were decremented by Delete(); drop anything that hit zero
  GarbageCollection(false);
  return Status::OK();
}
||||
|
||||
void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) { |
||||
backup_info->reserve(backups_.size()); |
||||
for (auto& backup : backups_) { |
||||
if (!backup.second.Empty()) { |
||||
backup_info->push_back(BackupInfo( |
||||
backup.first, backup.second.GetTimestamp(), backup.second.GetSize())); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Copies every file of backup backup_id back into db_dir / wal_dir.
// Pre-existing files in wal_dir are deleted first so stale WAL logs are
// not replayed into the restored database.
Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
                                         const std::string &db_dir,
                                         const std::string &wal_dir) {
  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  if (backup.Empty()) {
    // placeholder for a deleted/corrupted backup
    return Status::NotFound("Backup not found");
  }

  Log(options_.info_log, "Restoring backup id %u\n", backup_id);

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  // delete log files that might have been already in wal_dir.
  // This is important since they might get replayed to the restored DB,
  // which will then differ from the backuped DB
  std::vector<std::string> wal_dir_children;
  db_env_->GetChildren(wal_dir, &wal_dir_children);  // ignore errors
  for (auto f : wal_dir_children) {
    db_env_->DeleteFile(wal_dir + "/" + f);  // ignore errors
  }

  Status s;
  for (auto& file : backup.GetFiles()) {
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file> or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      return Status::Corruption("Backup corrupted");
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
    // sync=false: a crash mid-restore just means re-running the restore
    s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
    if (!s.ok()) {
      break;
    }
  }

  Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
  return s;
}
||||
|
||||
// latest backup id is an ASCII representation of latest backup id
|
||||
Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) { |
||||
Status s; |
||||
unique_ptr<SequentialFile> file; |
||||
s = backup_env_->NewSequentialFile(GetLatestBackupFile(), |
||||
&file, |
||||
EnvOptions()); |
||||
if (!s.ok()) { |
||||
return s; |
||||
} |
||||
|
||||
char* buf = new char[10]; |
||||
Slice data(buf, 0); |
||||
|
||||
s = file->Read(10, &data, buf); |
||||
|
||||
if (!s.ok() || data.size() == 0) { |
||||
delete[] buf; |
||||
return s.ok() ? Status::Corruption("Latest backup file corrupted") : s; |
||||
} |
||||
|
||||
sscanf(data.data(), "%u", latest_backup); |
||||
if (backup_env_->FileExists(GetBackupMetaFile(*latest_backup)) == false) { |
||||
s = Status::Corruption("Latest backup file corrupted"); |
||||
} |
||||
delete[] buf; |
||||
return Status::OK(); |
||||
} |
||||
|
||||
// this operation HAS to be atomic
|
||||
// writing 4 bytes to the file is atomic alright, but we should *never*
|
||||
// do something like 1. delete file, 2. write new file
|
||||
// We write to a tmp file and then atomically rename
|
||||
Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) { |
||||
Status s; |
||||
unique_ptr<WritableFile> file; |
||||
EnvOptions env_options; |
||||
env_options.use_mmap_writes = false; |
||||
s = backup_env_->NewWritableFile(GetLatestBackupFile(true), |
||||
&file, |
||||
env_options); |
||||
if (!s.ok()) { |
||||
backup_env_->DeleteFile(GetLatestBackupFile(true)); |
||||
return s; |
||||
} |
||||
|
||||
char* file_contents = new char[10]; |
||||
int len = sprintf(file_contents, "%u\n", latest_backup); |
||||
s = file->Append(Slice(file_contents, len)); |
||||
if (s.ok() && options_.sync) { |
||||
file->Sync(); |
||||
} |
||||
if (s.ok()) { |
||||
s = file->Close(); |
||||
} |
||||
if (s.ok()) { |
||||
// atomically replace real file with new tmp
|
||||
s = backup_env_->RenameFile(GetLatestBackupFile(true), |
||||
GetLatestBackupFile(false)); |
||||
} |
||||
return s; |
||||
} |
||||
|
||||
// Copies src (read via src_env) to dst (written via dst_env) in
// copy_file_buffer_size_ chunks.
//   sync       -- fsync the destination before returning
//   size       -- if non-null, receives the number of bytes copied
//   size_limit -- if non-zero, copy at most this many bytes
Status BackupEngine::CopyFile(const std::string& src,
                              const std::string& dst,
                              Env* src_env,
                              Env* dst_env,
                              bool sync,
                              uint64_t* size,
                              uint64_t size_limit) {
  Status s;
  unique_ptr<WritableFile> dst_file;
  unique_ptr<SequentialFile> src_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  if (size != nullptr) {
    *size = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (s.ok()) {
    s = dst_env->NewWritableFile(dst, &dst_file, env_options);
  }
  if (!s.ok()) {
    return s;
  }

  // BUG FIX: the 5MB copy buffer was allocated with new[] and never
  // delete[]d -- leaked on every call. unique_ptr frees it on all paths.
  unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
  Slice data;

  do {
    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
      copy_file_buffer_size_ : size_limit;
    s = src_file->Read(buffer_to_read, &data, buf.get());
    size_limit -= data.size();
    if (size != nullptr) {
      *size += data.size();
    }
    if (s.ok()) {
      s = dst_file->Append(data);
    }
  } while (s.ok() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dst_file->Sync();
  }

  return s;
}
||||
|
||||
// src_fname will always start with "/"
|
||||
Status BackupEngine::BackupFile(BackupID backup_id, |
||||
BackupMeta* backup, |
||||
bool shared, |
||||
const std::string& src_dir, |
||||
const std::string& src_fname, |
||||
uint64_t size_limit) { |
||||
|
||||
assert(src_fname.size() > 0 && src_fname[0] == '/'); |
||||
std::string dst_relative = src_fname.substr(1); |
||||
if (shared) { |
||||
dst_relative = GetSharedFileRel(dst_relative); |
||||
} else { |
||||
dst_relative = GetPrivateFileRel(backup_id, dst_relative); |
||||
} |
||||
std::string dst_path = GetAbsolutePath(dst_relative); |
||||
Status s; |
||||
uint64_t size; |
||||
|
||||
// if it's shared, we also need to check if it exists -- if it does,
|
||||
// no need to copy it again
|
||||
if (shared && backup_env_->FileExists(dst_path)) { |
||||
backup_env_->GetFileSize(dst_path, &size); // Ignore error
|
||||
Log(options_.info_log, "%s already present", src_fname.c_str()); |
||||
} else { |
||||
Log(options_.info_log, "Copying %s", src_fname.c_str()); |
||||
s = CopyFile(src_dir + src_fname, |
||||
dst_path, |
||||
db_env_, |
||||
backup_env_, |
||||
options_.sync, |
||||
&size, |
||||
size_limit); |
||||
} |
||||
if (s.ok()) { |
||||
backup->AddFile(dst_relative, size); |
||||
} |
||||
return s; |
||||
} |
||||
|
||||
// Deletes every backup file whose refcount dropped to zero. With
// full_scan == true it additionally walks shared/ and private/ on disk and
// removes anything not referenced by a live backup (used at startup and
// after a failed backup to clean up leftovers).
void BackupEngine::GarbageCollection(bool full_scan) {
  Log(options_.info_log, "Starting garbage collection");
  std::vector<std::string> to_delete;
  for (auto& itr : backuped_file_refs_) {
    if (itr.second == 0) {
      Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
      Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
          s.ToString().c_str());
      to_delete.push_back(itr.first);
    }
  }
  // erase after the scan -- erasing while iterating would invalidate
  // the iterator
  for (auto& td : to_delete) {
    backuped_file_refs_.erase(td);
  }
  if (!full_scan) {
    // take care of private dirs -- if full_scan == true, then full_scan will
    // take care of them
    for (auto backup_id : obsolete_backups_) {
      std::string private_dir = GetPrivateFileRel(backup_id);
      Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
      Log(options_.info_log, "Deleting private dir %s -- %s",
          private_dir.c_str(), s.ToString().c_str());
    }
    obsolete_backups_.clear();
  }

  if (full_scan) {
    Log(options_.info_log, "Starting full scan garbage collection");
    // delete obsolete shared files
    std::vector<std::string> shared_children;
    backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
                             &shared_children);
    for (auto& child : shared_children) {
      std::string rel_fname = GetSharedFileRel(child);
      // if it's not refcounted, delete it
      if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s", rel_fname.c_str());
        }
      }
    }

    // delete obsolete private files
    std::vector<std::string> private_children;
    backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                             &private_children);
    for (auto& child : private_children) {
      BackupID backup_id = 0;
      sscanf(child.c_str(), "%u", &backup_id);
      if (backup_id == 0 || backups_.find(backup_id) != backups_.end()) {
        // it's either not a number or it's still alive. continue
        continue;
      }
      // here we have to delete the dir and all its children
      // (full_private_path ends with '/' because GetPrivateFileRel is
      // called with an empty file name)
      std::string full_private_path =
          GetAbsolutePath(GetPrivateFileRel(backup_id));
      std::vector<std::string> subchildren;
      backup_env_->GetChildren(full_private_path, &subchildren);
      for (auto& subchild : subchildren) {
        Status s = backup_env_->DeleteFile(full_private_path + subchild);
        if (s.ok()) {
          Log(options_.info_log, "Deleted %s",
              (full_private_path + subchild).c_str());
        }
      }
      // finally delete the private dir
      Status s = backup_env_->DeleteDir(full_private_path);
      Log(options_.info_log, "Deleted dir %s -- %s", full_private_path.c_str(),
          s.ToString().c_str());
    }
  }
}
||||
|
||||
// ------- BackupMeta class --------
|
||||
|
||||
void BackupEngine::BackupMeta::AddFile(const std::string& filename, |
||||
uint64_t size) { |
||||
size_ += size; |
||||
files_.push_back(filename); |
||||
auto itr = file_refs_->find(filename); |
||||
if (itr == file_refs_->end()) { |
||||
file_refs_->insert(std::make_pair(filename, 1)); |
||||
} else { |
||||
++itr->second; // increase refcount if already present
|
||||
} |
||||
} |
||||
|
||||
void BackupEngine::BackupMeta::Delete() { |
||||
for (auto& file : files_) { |
||||
auto itr = file_refs_->find(file); |
||||
assert(itr != file_refs_->end()); |
||||
--(itr->second); // decrease refcount
|
||||
} |
||||
files_.clear(); |
||||
// delete meta file
|
||||
env_->DeleteFile(meta_filename_); |
||||
timestamp_ = 0; |
||||
} |
||||
|
||||
// each backup meta file is of the format:
|
||||
// <timestamp>
|
||||
// <number of files>
|
||||
// <file1>
|
||||
// <file2>
|
||||
// ...
|
||||
// TODO: maybe add checksum?
|
||||
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { |
||||
assert(Empty()); |
||||
Status s; |
||||
unique_ptr<SequentialFile> backup_meta_file; |
||||
s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions()); |
||||
if (!s.ok()) { |
||||
return s; |
||||
} |
||||
|
||||
char* buf = new char[max_backup_meta_file_size_ + 1]; |
||||
Slice data(buf, 0); |
||||
s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf); |
||||
|
||||
if (!s.ok() || data.size() == max_backup_meta_file_size_) { |
||||
delete[] buf; |
||||
return s.ok() ? Status::IOError("File size too big") : s; |
||||
} |
||||
buf[data.size()] = 0; |
||||
|
||||
uint32_t num_files = 0; |
||||
int bytes_read = 0; |
||||
sscanf(data.data(), "%ld%n", ×tamp_, &bytes_read); |
||||
data.remove_prefix(bytes_read + 1); // +1 for '\n'
|
||||
sscanf(data.data(), "%u%n", &num_files, &bytes_read); |
||||
data.remove_prefix(bytes_read + 1); // +1 for '\n'
|
||||
|
||||
for (uint32_t i = 0; s.ok() && i < num_files; ++i) { |
||||
std::string filename = GetSliceUntil(&data, '\n').ToString(); |
||||
uint64_t size; |
||||
s = env_->GetFileSize(backup_dir + "/" + filename, &size); |
||||
AddFile(filename, size); |
||||
} |
||||
|
||||
delete[] buf; |
||||
return s; |
||||
} |
||||
|
||||
// Serializes this backup's metadata (timestamp, file count, file list).
// Writes to "<meta_filename_>.tmp" first and renames at the end, so a
// crash mid-write can never leave a truncated meta file behind. If
// `sync` is true the data is fsync'ed before the rename.
Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
  Status s;
  unique_ptr<WritableFile> backup_meta_file;
  EnvOptions env_options;
  env_options.use_mmap_writes = false;
  s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
                            env_options);
  if (!s.ok()) {
    return s;
  }

  // unique_ptr: the original `new char[]` was never freed (leak on
  // every call), and snprintf truncation was not detected
  std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
  int len = 0;
  const int buf_size = max_backup_meta_file_size_;
  len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
  len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
  for (size_t i = 0; i < files_.size() && len < buf_size; ++i) {
    len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
  }
  if (len >= buf_size) {
    // snprintf returns the length it *wanted* to write; reaching the
    // buffer size means the meta info was truncated -- don't persist it
    return Status::IOError("Backup meta file too big");
  }

  s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));
  if (s.ok() && sync) {
    s = backup_meta_file->Sync();
  }
  if (s.ok()) {
    s = backup_meta_file->Close();
  }
  if (s.ok()) {
    // atomic publish of the new meta file
    s = env_->RenameFile(meta_filename_ + ".tmp", meta_filename_);
  }
  return s;
}
||||
|
||||
// --- BackupableDB methods --------
|
||||
|
||||
// Wraps an already-open DB with backup capability. The engine uses the
// DB's own Env for reading DB files; the (possibly different) Env used
// for the backup directory comes from BackupableDBOptions.
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options) :
    StackableDB(db),
    backup_engine_(new BackupEngine(db->GetEnv(), options)) {}
||||
|
||||
// Tears down the backup engine. NOTE(review): the wrapped DB's lifetime
// is assumed to be managed by StackableDB / the caller -- confirm.
BackupableDB::~BackupableDB() {
  delete backup_engine_;
}
||||
|
||||
// Takes a new snapshot of the wrapped DB into the backup directory.
// If flush_before_backup is true, memtables are flushed first so no WAL
// files need to be copied; otherwise live WAL files are backed up too --
// either way the backup is a consistent view of the DB.
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
  return backup_engine_->CreateNewBackup(this, flush_before_backup);
}
||||
|
||||
// Fills backup_info with one entry per existing backup (delegates to
// the engine).
void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}
||||
|
||||
// Keeps only the num_backups_to_keep most recent backups; older ones
// are deleted (backup files are refcounted, so shared files survive as
// long as any live backup references them).
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}
||||
|
||||
// Deletes the single backup identified by backup_id.
Status BackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}
||||
|
||||
// --- RestoreBackupableDB methods ------
|
||||
|
||||
// Standalone handle for restoring/managing backups without opening the
// DB itself. db_env is the Env of the database that will be restored;
// the backup directory's Env comes from BackupableDBOptions.
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
                                         const BackupableDBOptions& options)
    : backup_engine_(new BackupEngine(db_env, options)) {}
||||
|
||||
// Releases the underlying backup engine.
RestoreBackupableDB::~RestoreBackupableDB() {
  delete backup_engine_;
}
||||
|
||||
// Fills backup_info with one entry per existing backup (delegates to
// the engine).
void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
  backup_engine_->GetBackupInfo(backup_info);
}
||||
|
||||
// Restores backup `backup_id`: data files are placed in db_dir and log
// files in wal_dir (the two may be the same directory).
Status RestoreBackupableDB::RestoreDBFromBackup(BackupID backup_id,
                                                const std::string& db_dir,
                                                const std::string& wal_dir) {
  return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir);
}
||||
|
||||
// Convenience wrapper: restores the most recent backup into
// db_dir/wal_dir.
Status
RestoreBackupableDB::RestoreDBFromLatestBackup(const std::string& db_dir,
                                               const std::string& wal_dir) {
  return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir);
}
||||
|
||||
// Keeps only the num_backups_to_keep most recent backups; older ones
// are deleted.
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
  return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}
||||
|
||||
// Deletes the single backup identified by backup_id.
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
  return backup_engine_->DeleteBackup(backup_id);
}
||||
|
||||
} // namespace rocksdb
|
@ -0,0 +1,625 @@ |
||||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "rocksdb/transaction_log.h" |
||||
#include "utilities/utility_db.h" |
||||
#include "utilities/backupable_db.h" |
||||
#include "util/testharness.h" |
||||
#include "util/random.h" |
||||
#include "util/testutil.h" |
||||
#include "util/auto_roll_logger.h" |
||||
|
||||
#include <string> |
||||
#include <algorithm> |
||||
|
||||
namespace rocksdb { |
||||
|
||||
namespace { |
||||
|
||||
using std::unique_ptr; |
||||
|
||||
class DummyDB : public StackableDB { |
||||
public: |
||||
/* implicit */ |
||||
DummyDB(const Options& options, const std::string& dbname) |
||||
: StackableDB(nullptr), options_(options), dbname_(dbname), |
||||
deletions_enabled_(true) {} |
||||
|
||||
virtual const std::string& GetName() const override { |
||||
return dbname_; |
||||
} |
||||
|
||||
virtual Env* GetEnv() const override { |
||||
return options_.env; |
||||
} |
||||
|
||||
virtual const Options& GetOptions() const override { |
||||
return options_; |
||||
} |
||||
|
||||
virtual Status EnableFileDeletions() override { |
||||
ASSERT_TRUE(!deletions_enabled_); |
||||
deletions_enabled_ = true; |
||||
return Status::OK(); |
||||
} |
||||
|
||||
virtual Status DisableFileDeletions() override { |
||||
ASSERT_TRUE(deletions_enabled_); |
||||
deletions_enabled_ = false; |
||||
return Status::OK(); |
||||
} |
||||
|
||||
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs, |
||||
bool flush_memtable = true) override { |
||||
ASSERT_TRUE(!deletions_enabled_); |
||||
vec = live_files_; |
||||
*mfs = 100; |
||||
return Status::OK(); |
||||
} |
||||
|
||||
class DummyLogFile : public LogFile { |
||||
public: |
||||
/* implicit */ |
||||
DummyLogFile(const std::string& path, bool alive = true) |
||||
: path_(path), alive_(alive) {} |
||||
|
||||
virtual std::string PathName() const override { |
||||
return path_; |
||||
} |
||||
|
||||
virtual uint64_t LogNumber() const { |
||||
// what business do you have calling this method?
|
||||
ASSERT_TRUE(false); |
||||
return 0; |
||||
} |
||||
|
||||
virtual WalFileType Type() const override { |
||||
return alive_ ? kAliveLogFile : kArchivedLogFile; |
||||
} |
||||
|
||||
virtual SequenceNumber StartSequence() const { |
||||
// backupabledb should not need this method
|
||||
ASSERT_TRUE(false); |
||||
return 0; |
||||
} |
||||
|
||||
virtual uint64_t SizeFileBytes() const { |
||||
// backupabledb should not need this method
|
||||
ASSERT_TRUE(false); |
||||
return 0; |
||||
} |
||||
|
||||
private: |
||||
std::string path_; |
||||
bool alive_; |
||||
}; // DummyLogFile
|
||||
|
||||
virtual Status GetSortedWalFiles(VectorLogPtr& files) override { |
||||
ASSERT_TRUE(!deletions_enabled_); |
||||
files.resize(wal_files_.size()); |
||||
for (size_t i = 0; i < files.size(); ++i) { |
||||
files[i].reset( |
||||
new DummyLogFile(wal_files_[i].first, wal_files_[i].second)); |
||||
} |
||||
return Status::OK(); |
||||
} |
||||
|
||||
std::vector<std::string> live_files_; |
||||
// pair<filename, alive?>
|
||||
std::vector<std::pair<std::string, bool>> wal_files_; |
||||
private: |
||||
Options options_; |
||||
std::string dbname_; |
||||
bool deletions_enabled_; |
||||
}; // DummyDB
|
||||
|
||||
class TestEnv : public EnvWrapper { |
||||
public: |
||||
explicit TestEnv(Env* t) : EnvWrapper(t) {} |
||||
|
||||
class DummySequentialFile : public SequentialFile { |
||||
virtual Status Read(size_t n, Slice* result, char* scratch) { |
||||
size_t read_size = (n > size_left) ? size_left : n; |
||||
*result = Slice(scratch, read_size); |
||||
size_left -= read_size; |
||||
return Status::OK(); |
||||
} |
||||
|
||||
virtual Status Skip(uint64_t n) { |
||||
size_left = (n > size_left) ? size_left - n : 0; |
||||
return Status::OK(); |
||||
} |
||||
private: |
||||
size_t size_left = 200; |
||||
}; |
||||
|
||||
Status NewSequentialFile(const std::string& f, |
||||
unique_ptr<SequentialFile>* r, |
||||
const EnvOptions& options) { |
||||
opened_files_.push_back(f); |
||||
if (dummy_sequential_file_) { |
||||
r->reset(new TestEnv::DummySequentialFile()); |
||||
return Status::OK(); |
||||
} else { |
||||
return EnvWrapper::NewSequentialFile(f, r, options); |
||||
} |
||||
} |
||||
|
||||
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r, |
||||
const EnvOptions& options) { |
||||
if (limit_written_files_ <= 0) { |
||||
return Status::IOError("Sorry, can't do this"); |
||||
} |
||||
limit_written_files_--; |
||||
return EnvWrapper::NewWritableFile(f, r, options); |
||||
} |
||||
|
||||
void AssertOpenedFiles(std::vector<std::string>& should_have_opened) { |
||||
sort(should_have_opened.begin(), should_have_opened.end()); |
||||
sort(opened_files_.begin(), opened_files_.end()); |
||||
ASSERT_TRUE(opened_files_ == should_have_opened); |
||||
} |
||||
|
||||
void ClearOpenedFiles() { |
||||
opened_files_.clear(); |
||||
} |
||||
|
||||
void SetLimitWrittenFiles(uint64_t limit) { |
||||
limit_written_files_ = limit; |
||||
} |
||||
|
||||
void SetDummySequentialFile(bool dummy_sequential_file) { |
||||
dummy_sequential_file_ = dummy_sequential_file; |
||||
} |
||||
|
||||
private: |
||||
bool dummy_sequential_file_ = false; |
||||
std::vector<std::string> opened_files_; |
||||
uint64_t limit_written_files_ = 1000000; |
||||
}; // TestEnv
|
||||
|
||||
class FileManager : public EnvWrapper { |
||||
public: |
||||
explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {} |
||||
|
||||
Status DeleteRandomFileInDir(const std::string dir) { |
||||
std::vector<std::string> children; |
||||
GetChildren(dir, &children); |
||||
if (children.size() <= 2) { // . and ..
|
||||
return Status::NotFound(""); |
||||
} |
||||
while (true) { |
||||
int i = rnd_.Next() % children.size(); |
||||
if (children[i] != "." && children[i] != "..") { |
||||
return DeleteFile(dir + "/" + children[i]); |
||||
} |
||||
} |
||||
// should never get here
|
||||
assert(false); |
||||
return Status::NotFound(""); |
||||
} |
||||
|
||||
Status CorruptFile(const std::string& fname, uint64_t bytes_to_corrupt) { |
||||
uint64_t size; |
||||
Status s = GetFileSize(fname, &size); |
||||
if (!s.ok()) { |
||||
return s; |
||||
} |
||||
unique_ptr<RandomRWFile> file; |
||||
EnvOptions env_options; |
||||
env_options.use_mmap_writes = false; |
||||
s = NewRandomRWFile(fname, &file, env_options); |
||||
if (!s.ok()) { |
||||
return s; |
||||
} |
||||
|
||||
for (uint64_t i = 0; s.ok() && i < bytes_to_corrupt; ++i) { |
||||
std::string tmp; |
||||
// write one random byte to a random position
|
||||
s = file->Write(rnd_.Next() % size, test::RandomString(&rnd_, 1, &tmp)); |
||||
} |
||||
return s; |
||||
} |
||||
|
||||
Status WriteToFile(const std::string& fname, const std::string& data) { |
||||
unique_ptr<WritableFile> file; |
||||
EnvOptions env_options; |
||||
env_options.use_mmap_writes = false; |
||||
Status s = EnvWrapper::NewWritableFile(fname, &file, env_options); |
||||
if (!s.ok()) { |
||||
return s; |
||||
} |
||||
return file->Append(Slice(data)); |
||||
} |
||||
private: |
||||
Random rnd_; |
||||
}; // FileManager
|
||||
|
||||
// utility functions
|
||||
// Inserts the deterministic key range [from, to):
// "testkey<i>" -> "testvalue<i>".
static void FillDB(DB* db, int from, int to) {
  int i = from;
  while (i < to) {
    const std::string suffix = std::to_string(i);
    const std::string key = "testkey" + suffix;
    const std::string value = "testvalue" + suffix;
    ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
    ++i;
  }
}
||||
|
||||
// Verifies every key in [from, to) is present with its expected value.
static void AssertExists(DB* db, int from, int to) {
  for (int i = from; i < to; ++i) {
    std::string key = "testkey" + std::to_string(i);
    std::string value;
    Status s = db->Get(ReadOptions(), Slice(key), &value);
    // BUG FIX: the Get() status was computed and silently dropped; a
    // failed read left `value` empty and produced a confusing value
    // mismatch instead of reporting the actual error
    ASSERT_OK(s);
    ASSERT_EQ(value, "testvalue" + std::to_string(i));
  }
}
||||
|
||||
// Verifies that no key in [from, to) is present in the DB.
static void AssertEmpty(DB* db, int from, int to) {
  for (int i = from; i < to; ++i) {
    const std::string key = "testkey" + std::to_string(i);
    // scratch starts out holding the would-be value; Get must not find it
    std::string scratch = "testvalue" + std::to_string(i);
    Status lookup = db->Get(ReadOptions(), Slice(key), &scratch);
    ASSERT_TRUE(lookup.IsNotFound());
  }
}
||||
|
||||
// Test fixture: wires a real DB (or a DummyDB) through TestEnv wrappers
// so tests can fail writes, count opened files and corrupt backup files
// via file_manager_.
class BackupableDBTest {
 public:
  BackupableDBTest() {
    // set up files
    dbname_ = test::TmpDir() + "/backupable_db";
    backupdir_ = test::TmpDir() + "/backupable_db_backup";

    // set up envs -- separate TestEnv wrappers for the DB side and the
    // backup side so each can be sabotaged independently
    env_ = Env::Default();
    test_db_env_.reset(new TestEnv(env_));
    test_backup_env_.reset(new TestEnv(env_));
    file_manager_.reset(new FileManager(env_));

    // set up db options
    options_.create_if_missing = true;
    options_.paranoid_checks = true;
    options_.write_buffer_size = 1 << 19; // 512KB
    options_.env = test_db_env_.get();
    options_.wal_dir = dbname_;
    // set up backup db options
    CreateLoggerFromOptions(dbname_, backupdir_, env_,
                            Options(), &logger);
    backupable_options_.reset(new BackupableDBOptions(
        backupdir_, test_backup_env_.get(), logger.get(), true));

    // delete old files in db
    DestroyDB(dbname_, Options());
  }

  // Opens a BackupableDB over either a real DB or a DummyDB. The
  // BackupableDB takes ownership of the inner DB either way.
  void OpenBackupableDB(bool destroy_old_data = false, bool dummy = false) {
    // reset all the defaults
    test_backup_env_->SetLimitWrittenFiles(1000000);
    test_db_env_->SetLimitWrittenFiles(1000000);
    test_db_env_->SetDummySequentialFile(dummy);

    DB* db;
    if (dummy) {
      dummy_db_ = new DummyDB(options_, dbname_);
      db = dummy_db_;
    } else {
      ASSERT_OK(DB::Open(options_, dbname_, &db));
    }
    backupable_options_->destroy_old_data = destroy_old_data;
    db_.reset(new BackupableDB(db, *backupable_options_));
  }

  void CloseBackupableDB() {
    db_.reset(nullptr);
  }

  // Opens a restore handle over the same backup directory; never
  // destroys existing backup data.
  void OpenRestoreDB() {
    backupable_options_->destroy_old_data = false;
    restore_db_.reset(
        new RestoreBackupableDB(test_db_env_.get(), *backupable_options_));
  }

  void CloseRestoreDB() {
    restore_db_.reset(nullptr);
  }

  // restores backup backup_id and asserts the existence of
  // [start_exist, end_exist> and not-existence of
  // [end_exist, end>
  //
  // if backup_id == 0, it means restore from latest
  // if end == 0, don't check AssertEmpty
  void AssertBackupConsistency(BackupID backup_id, uint32_t start_exist,
                               uint32_t end_exist, uint32_t end = 0) {
    // transparently open (and later close) the restore handle if the
    // caller didn't already have one open
    bool opened_restore = false;
    if (restore_db_.get() == nullptr) {
      opened_restore = true;
      OpenRestoreDB();
    }
    if (backup_id > 0) {
      ASSERT_OK(restore_db_->RestoreDBFromBackup(backup_id, dbname_, dbname_));
    } else {
      ASSERT_OK(restore_db_->RestoreDBFromLatestBackup(dbname_, dbname_));
    }
    OpenBackupableDB();
    AssertExists(db_.get(), start_exist, end_exist);
    if (end != 0) {
      AssertEmpty(db_.get(), end_exist, end);
    }
    CloseBackupableDB();
    if (opened_restore) {
      CloseRestoreDB();
    }
  }

  // files
  std::string dbname_;
  std::string backupdir_;

  // envs
  Env* env_;
  unique_ptr<TestEnv> test_db_env_;
  unique_ptr<TestEnv> test_backup_env_;
  unique_ptr<FileManager> file_manager_;

  // all the dbs!
  DummyDB* dummy_db_; // BackupableDB owns dummy_db_
  unique_ptr<BackupableDB> db_;
  unique_ptr<RestoreBackupableDB> restore_db_;

  // options
  Options options_;
  unique_ptr<BackupableDBOptions> backupable_options_;
  std::shared_ptr<Logger> logger;
}; // BackupableDBTest
|
||||
|
||||
// Prefixes every entry of v with `path`, in place.
void AppendPath(const std::string& path, std::vector<std::string>& v) {
  for (size_t i = 0; i < v.size(); ++i) {
    v[i] = path + v[i];
  }
}
||||
|
||||
// this will make sure that backup does not copy the same file twice:
// uses DummyDB + dummy sequential files so we can count exactly which
// files the engine opens and how many it writes.
TEST(BackupableDBTest, NoDoubleCopy) {
  OpenBackupableDB(true, true);

  // should write 5 DB files + LATEST_BACKUP + one meta file
  test_backup_env_->SetLimitWrittenFiles(7);
  test_db_env_->ClearOpenedFiles();
  test_db_env_->SetLimitWrittenFiles(0);
  dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
                             "/CURRENT",   "/MANIFEST-01" };
  // one alive WAL (must be copied) and one archived (must be skipped)
  dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
  ASSERT_OK(db_->CreateNewBackup(false));
  std::vector<std::string> should_have_openened = dummy_db_->live_files_;
  should_have_openened.push_back("/00011.log");
  AppendPath(dbname_, should_have_openened);
  test_db_env_->AssertOpenedFiles(should_have_openened);

  // should write 4 new DB files + LATEST_BACKUP + one meta file
  // should not write/copy 00010.sst, since it's already there!
  test_backup_env_->SetLimitWrittenFiles(6);
  test_db_env_->ClearOpenedFiles();
  dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
                             "/CURRENT",   "/MANIFEST-01" };
  dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
  ASSERT_OK(db_->CreateNewBackup(false));
  // should not open 00010.sst - it's already there
  should_have_openened = { "/00015.sst", "/CURRENT",
                           "/MANIFEST-01", "/00011.log" };
  AppendPath(dbname_, should_have_openened);
  test_db_env_->AssertOpenedFiles(should_have_openened);

  // deleting backup 1 must only remove files referenced by no other
  // backup: 00010.sst is shared with backup 2 and must survive
  ASSERT_OK(db_->DeleteBackup(1));
  ASSERT_EQ(true,
            test_backup_env_->FileExists(backupdir_ + "/shared/00010.sst"));
  // 00011.sst was only in backup 1, should be deleted
  ASSERT_EQ(false,
            test_backup_env_->FileExists(backupdir_ + "/shared/00011.sst"));
  ASSERT_EQ(true,
            test_backup_env_->FileExists(backupdir_ + "/shared/00015.sst"));

  // MANIFEST file size should be only 100 (DummyDB reports a 100-byte
  // manifest; dummy sequential files serve 200 bytes)
  uint64_t size;
  test_backup_env_->GetFileSize(backupdir_ + "/private/2/MANIFEST-01", &size);
  ASSERT_EQ(100, size);
  test_backup_env_->GetFileSize(backupdir_ + "/shared/00015.sst", &size);
  ASSERT_EQ(200, size);
}
||||
|
||||
// test various kind of corruptions that may happen:
// 1. Not able to write a file for backup - that backup should fail,
//    everything else should work
// 2. Corrupted/deleted LATEST_BACKUP - everything should work fine
// 3. Corrupted backup meta file or missing backuped file - we should
//    not be able to open that backup, but all other backups should be
//    fine
TEST(BackupableDBTest, CorruptionsTest) {
  const int keys_iteration = 20000;
  Random rnd(6);
  Status s;

  OpenBackupableDB(true);
  // create five backups
  for (int i = 0; i < 5; ++i) {
    FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
    // randomize flush_before_backup -- results must not depend on it
    ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
  }

  // ---------- case 1. - fail a write -----------
  // try creating backup 6, but fail a write
  FillDB(db_.get(), keys_iteration * 5, keys_iteration * 6);
  test_backup_env_->SetLimitWrittenFiles(2);
  // should fail
  s = db_->CreateNewBackup(!!(rnd.Next() % 2));
  ASSERT_TRUE(!s.ok());
  test_backup_env_->SetLimitWrittenFiles(1000000);
  // latest backup should have all the keys
  CloseBackupableDB();
  AssertBackupConsistency(0, 0, keys_iteration * 5, keys_iteration * 6);

  // ---------- case 2. - corrupt/delete latest backup -----------
  ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/LATEST_BACKUP", 2));
  AssertBackupConsistency(0, 0, keys_iteration * 5);
  ASSERT_OK(file_manager_->DeleteFile(backupdir_ + "/LATEST_BACKUP"));
  AssertBackupConsistency(0, 0, keys_iteration * 5);
  // create backup 6, point LATEST_BACKUP to 5
  OpenBackupableDB();
  FillDB(db_.get(), keys_iteration * 5, keys_iteration * 6);
  ASSERT_OK(db_->CreateNewBackup(false));
  CloseBackupableDB();
  ASSERT_OK(file_manager_->WriteToFile(backupdir_ + "/LATEST_BACKUP", "5"));
  AssertBackupConsistency(0, 0, keys_iteration * 5, keys_iteration * 6);
  // assert that all 6 data is gone!
  ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/6") == false);
  ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/private/6") == false);

  // --------- case 3. corrupted backup meta or missing backuped file ----
  ASSERT_OK(file_manager_->CorruptFile(backupdir_ + "/meta/5", 3));
  // since 5 meta is now corrupted, latest backup should be 4
  AssertBackupConsistency(0, 0, keys_iteration * 4, keys_iteration * 5);
  OpenRestoreDB();
  // restoring the corrupted backup explicitly must fail
  s = restore_db_->RestoreDBFromBackup(5, dbname_, dbname_);
  ASSERT_TRUE(!s.ok());
  CloseRestoreDB();
  ASSERT_OK(file_manager_->DeleteRandomFileInDir(backupdir_ + "/private/4"));
  // 4 is corrupted, 3 is the latest backup now
  AssertBackupConsistency(0, 0, keys_iteration * 3, keys_iteration * 5);
  OpenRestoreDB();
  s = restore_db_->RestoreDBFromBackup(4, dbname_, dbname_);
  CloseRestoreDB();
  ASSERT_TRUE(!s.ok());

  // new backup should be 4!
  OpenBackupableDB();
  FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4);
  ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
  CloseBackupableDB();
  AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5);
}
||||
|
||||
// open DB, write, close DB, backup, restore, repeat
TEST(BackupableDBTest, OfflineIntegrationTest) {
  // has to be a big number, so that it triggers the memtable flush
  const int keys_iteration = 20000;
  const int max_key = keys_iteration * 4 + 10;
  // first iter -- flush before backup
  // second iter -- don't flush before backup
  for (int iter = 0; iter < 2; ++iter) {
    // delete old data
    DestroyDB(dbname_, Options());
    bool destroy_data = true;

    // every iteration --
    // 1. insert new data in the DB
    // 2. backup the DB
    // 3. destroy the db
    // 4. restore the db, check everything is still there
    for (int i = 0; i < 5; ++i) {
      // in last iteration, put smaller amount of data,
      int fill_up_to = std::min(keys_iteration * (i + 1), max_key);
      // ---- insert new data and back up ----
      OpenBackupableDB(destroy_data);
      destroy_data = false;
      FillDB(db_.get(), keys_iteration * i, fill_up_to);
      ASSERT_OK(db_->CreateNewBackup(iter == 0));
      CloseBackupableDB();
      DestroyDB(dbname_, Options());

      // ---- make sure it's empty ----
      OpenBackupableDB();
      AssertEmpty(db_.get(), 0, fill_up_to);
      CloseBackupableDB();

      // ---- restore the DB ----
      OpenRestoreDB();
      if (i >= 3) { // test purge old backups
        // when i == 4, purge to only 1 backup
        // when i == 3, purge to 2 backups
        ASSERT_OK(restore_db_->PurgeOldBackups(5 - i));
      }
      // ---- make sure the data is there ---
      AssertBackupConsistency(0, 0, fill_up_to, max_key);
      CloseRestoreDB();
    }
  }
}
||||
|
||||
// open DB, write, backup, write, backup, close, restore
TEST(BackupableDBTest, OnlineIntegrationTest) {
  // has to be a big number, so that it triggers the memtable flush
  const int keys_iteration = 20000;
  const int max_key = keys_iteration * 4 + 10;
  Random rnd(7);
  // delete old data
  DestroyDB(dbname_, Options());

  OpenBackupableDB(true);
  // write some data, backup, repeat
  for (int i = 0; i < 5; ++i) {
    if (i == 4) {
      // delete backup number 2, online delete!
      OpenRestoreDB();
      ASSERT_OK(restore_db_->DeleteBackup(2));
      CloseRestoreDB();
    }
    // in last iteration, put smaller amount of data,
    // so that backups can share sst files
    int fill_up_to = std::min(keys_iteration * (i + 1), max_key);
    FillDB(db_.get(), keys_iteration * i, fill_up_to);
    // we should get consistent results with flush_before_backup
    // set to both true and false
    ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
  }
  // close and destroy
  CloseBackupableDB();
  DestroyDB(dbname_, Options());

  // ---- make sure it's empty ----
  OpenBackupableDB();
  AssertEmpty(db_.get(), 0, max_key);
  CloseBackupableDB();

  // ---- restore every backup and verify all the data is there ----
  OpenRestoreDB();
  for (int i = 1; i <= 5; ++i) {
    if (i == 2) {
      // we deleted backup 2
      Status s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_);
      ASSERT_TRUE(!s.ok());
    } else {
      int fill_up_to = std::min(keys_iteration * i, max_key);
      AssertBackupConsistency(i, 0, fill_up_to, max_key);
    }
  }

  // delete some backups -- this should leave only backups 3 and 5 alive
  ASSERT_OK(restore_db_->DeleteBackup(4));
  ASSERT_OK(restore_db_->PurgeOldBackups(2));

  std::vector<BackupInfo> backup_info;
  restore_db_->GetBackupInfo(&backup_info);
  ASSERT_EQ(2, backup_info.size());

  // check backup 3
  AssertBackupConsistency(3, 0, 3 * keys_iteration, max_key);
  // check backup 5
  AssertBackupConsistency(5, 0, max_key);

  CloseRestoreDB();
}
||||
|
||||
} // anon namespace
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
// Test entry point -- delegates to the shared test-harness runner.
int main(int argc, char** argv) {
  return rocksdb::test::RunAllTests();
}
Loading…
Reference in new issue