Merge branch 'master' into columnfamilies

main
Igor Canadi 11 years ago
commit c1071ed95c
  1. 27
      db/db_impl.cc
  2. 70
      db/log_reader.cc
  3. 12
      db/log_reader.h
  4. 179
      db/log_test.cc
  5. 6
      db/version_set.cc
  6. 25
      include/utilities/backupable_db.h
  7. 274
      utilities/backupable/backupable_db.cc
  8. 115
      utilities/backupable/backupable_db_test.cc

@ -3540,8 +3540,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n"
"------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < current->NumberLevels(); level++) {
@ -3561,9 +3561,21 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
total_bytes_read += bytes_read;
total_bytes_written += stats_[level].bytes_written;
uint64_t stalls = level == 0 ?
(stall_level0_slowdown_count_ +
stall_level0_num_files_count_ +
stall_memtable_compaction_count_) :
stall_leveln_slowdown_count_[level];
double stall_us = level == 0 ?
(stall_level0_slowdown_ +
stall_level0_num_files_ +
stall_memtable_compaction_) :
stall_leveln_slowdown_[level];
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n",
level,
files,
current->NumLevelBytes(level) / 1048576.0,
@ -3585,8 +3597,13 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
stats_[level].files_out_levelnp1,
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
stats_[level].count,
stall_leveln_slowdown_[level] / 1000000.0,
(unsigned long) stall_leveln_slowdown_count_[level]);
(int) ((double) stats_[level].micros /
1000.0 /
(stats_[level].count + 1)),
(double) stall_us / 1000.0 / (stalls + 1),
stall_us / 1000000.0,
(unsigned long) stalls);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);

@ -28,6 +28,8 @@ Reader::Reader(unique_ptr<SequentialFile>&& file, Reporter* reporter,
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
read_error_(false),
eof_offset_(0),
last_record_offset_(0),
end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() {
return last_record_offset_;
}
// Clears the EOF flag so the reader looks at the file again, and re-aligns the
// reader with the block structure of the log.
//
// Does nothing if read_error_ is already set. If the previous EOF fell inside
// a block (eof_offset_ != 0), the rest of that block is read from the file and
// appended to the not-yet-consumed bytes, so that buffer_ refers to one
// contiguous region of backing_store_.
void Reader::UnmarkEOF() {
if (read_error_) {
return;
}
eof_ = false;
// eof_offset_ == 0 means no partially read block is recorded, so there is
// nothing to re-align.
if (eof_offset_ == 0) {
return;
}
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
// of a block.
//
// consumed_bytes + buffer_size() + remaining == kBlockSize
// consumed_bytes: part of the partial block no longer covered by buffer_.
// remaining: bytes still missing to complete the block.
size_t consumed_bytes = eof_offset_ - buffer_.size();
size_t remaining = kBlockSize - eof_offset_;
// backing_store_ is used to concatenate what is left in buffer_ and
// the remainder of the block. If buffer_ already uses backing_store_,
// we just append the new data.
if (buffer_.data() != backing_store_ + consumed_bytes) {
// Buffer_ does not use backing_store_ for storage.
// Copy what is left in buffer_ to backing_store.
memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size());
}
// Ask the file for the missing tail of the block, placing it directly behind
// the bytes that were read before the EOF.
Slice read_buffer;
Status status = file_->Read(remaining, &read_buffer,
backing_store_ + eof_offset_);
// The bytes delivered by this read are accounted for even if the read failed.
size_t added = read_buffer.size();
end_of_buffer_offset_ += added;
if (!status.ok()) {
// Report whatever the failed read delivered as dropped. buffer_ is left as
// it was, and read_error_ makes later calls to this function return
// immediately.
if (added > 0) {
ReportDrop(added, status);
}
read_error_ = true;
return;
}
if (read_buffer.data() != backing_store_ + eof_offset_) {
// Read did not write to backing_store_
memmove(backing_store_ + eof_offset_, read_buffer.data(),
read_buffer.size());
}
// buffer_ now spans the previously unconsumed bytes plus the new data.
buffer_ = Slice(backing_store_ + consumed_bytes,
eof_offset_ + added - consumed_bytes);
if (added < remaining) {
// The block is still incomplete: stay at EOF and remember the new offset.
eof_ = true;
eof_offset_ += added;
} else {
// The block was completed; no partial block is pending any more.
eof_offset_ = 0;
}
}
void Reader::ReportCorruption(size_t bytes, const char* reason) {
ReportDrop(bytes, Status::Corruption(reason));
}
@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < (size_t)kHeaderSize) {
if (!eof_) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) {
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
read_error_ = true;
return kEof;
} else if (buffer_.size() < (size_t)kBlockSize) {
eof_ = true;
eof_offset_ = buffer_.size();
}
continue;
} else if (buffer_.size() == 0) {

@ -69,9 +69,10 @@ class Reader {
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
void UnmarkEOF() {
eof_ = false;
}
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();
SequentialFile* file() { return file_.get(); }
@ -82,6 +83,11 @@ class Reader {
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;

@ -47,36 +47,93 @@ class LogTest {
public:
std::string contents_;
explicit StringDest(Slice& reader_contents) :
WritableFile(),
contents_(""),
reader_contents_(reader_contents),
last_flush_(0) {
reader_contents_ = Slice(contents_.data(), 0);
};
virtual Status Close() { return Status::OK(); }
virtual Status Flush() { return Status::OK(); }
// Makes everything appended so far visible through reader_contents_.
//
// reader_contents_ is re-pointed into contents_ so that it starts at the first
// byte the reader has not consumed yet and extends to the end of contents_.
virtual Status Flush() {
  ASSERT_TRUE(reader_contents_.size() <= last_flush_);
  // Bytes of the previously flushed data that the reader no longer holds.
  const size_t consumed = last_flush_ - reader_contents_.size();
  const size_t total = contents_.size();
  const char* const unread_begin = contents_.data() + consumed;
  reader_contents_ = Slice(unread_begin, total - consumed);
  last_flush_ = total;
  return Status::OK();
}
virtual Status Sync() { return Status::OK(); }
virtual Status Append(const Slice& slice) {
contents_.append(slice.data(), slice.size());
return Status::OK();
}
// Removes the last `bytes` bytes from the written contents and shrinks the
// reader-side slice by the same amount. The flush mark is moved to the new
// end of contents_.
void Drop(size_t bytes) {
  const size_t kept = contents_.size() - bytes;
  contents_.resize(kept);
  const size_t visible = reader_contents_.size() - bytes;
  reader_contents_ = Slice(reader_contents_.data(), visible);
  last_flush_ = contents_.size();
}
private:
Slice& reader_contents_;
size_t last_flush_;
};
class StringSource : public SequentialFile {
public:
Slice contents_;
Slice& contents_;
bool force_error_;
size_t force_error_position_;
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
StringSource() : force_error_(false), returned_partial_(false) { }
explicit StringSource(Slice& contents) :
contents_(contents),
force_error_(false),
force_error_position_(0),
force_eof_(false),
force_eof_position_(0),
returned_partial_(false) { }
virtual Status Read(size_t n, Slice* result, char* scratch) {
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
if (force_error_) {
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
if (force_error_position_ >= n) {
force_error_position_ -= n;
} else {
*result = Slice(contents_.data(), force_error_position_);
contents_.remove_prefix(force_error_position_);
force_error_ = false;
returned_partial_ = true;
return Status::Corruption("read error");
}
}
if (contents_.size() < n) {
n = contents_.size();
returned_partial_ = true;
}
*result = Slice(contents_.data(), n);
if (force_eof_) {
if (force_eof_position_ >= n) {
force_eof_position_ -= n;
} else {
force_eof_ = false;
n = force_eof_position_;
returned_partial_ = true;
}
}
// By using scratch we ensure that caller has control over the
// lifetime of result.data()
memcpy(scratch, contents_.data(), n);
*result = Slice(scratch, n);
contents_.remove_prefix(n);
return Status::OK();
}
@ -123,10 +180,10 @@ class LogTest {
src->contents_ = dest_contents();
}
Slice reader_contents_;
unique_ptr<StringDest> dest_holder_;
unique_ptr<StringSource> source_holder_;
ReportCollector report_;
bool reading_;
Writer writer_;
Reader reader_;
@ -135,16 +192,15 @@ class LogTest {
static uint64_t initial_offset_last_record_offsets_[];
public:
LogTest() : dest_holder_(new StringDest),
source_holder_(new StringSource),
reading_(false),
LogTest() : reader_contents_(),
dest_holder_(new StringDest(reader_contents_)),
source_holder_(new StringSource(reader_contents_)),
writer_(std::move(dest_holder_)),
reader_(std::move(source_holder_), &report_, true/*checksum*/,
0/*initial_offset*/) {
}
void Write(const std::string& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read";
writer_.AddRecord(Slice(msg));
}
@ -153,10 +209,6 @@ class LogTest {
}
std::string Read() {
if (!reading_) {
reading_ = true;
reset_source_contents();
}
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
@ -175,7 +227,9 @@ class LogTest {
}
void ShrinkSize(int bytes) {
dest_contents().resize(dest_contents().size() - bytes);
auto dest = dynamic_cast<StringDest*>(writer_.file());
assert(dest);
dest->Drop(bytes);
}
void FixChecksum(int header_offset, int len) {
@ -185,9 +239,10 @@ class LogTest {
EncodeFixed32(&dest_contents()[header_offset], crc);
}
void ForceError() {
void ForceError(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_error_ = true;
src->force_error_position_ = position;
}
size_t DroppedBytes() const {
@ -198,6 +253,22 @@ class LogTest {
return report_.message_;
}
void ForceEOF(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
auto src = dynamic_cast<StringSource*>(reader_.file());
src->returned_partial_ = false;
reader_.UnmarkEOF();
}
// Reports whether the reader under test currently considers itself at EOF.
bool IsEOF() {
  const bool at_eof = reader_.IsEOF();
  return at_eof;
}
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
if (report_.message_.find(msg) == std::string::npos) {
@ -217,9 +288,7 @@ class LogTest {
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
WrittenBytes() + offset_past_end));
@ -231,9 +300,7 @@ class LogTest {
void CheckInitialOffsetRecord(uint64_t initial_offset,
int expected_record_offset) {
WriteInitialOffsetLog();
reading_ = true;
unique_ptr<StringSource> source(new StringSource);
source->contents_ = dest_contents();
unique_ptr<StringSource> source(new StringSource(reader_contents_));
unique_ptr<Reader> offset_reader(
new Reader(std::move(source), &report_, true/*checksum*/,
initial_offset));
@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) {
CheckOffsetPastEndReturnsNoRecords(5);
}
// EOF is forced part-way through the written data, then cleared; reading must
// continue with the next record, and records written after an EOF must become
// readable once the EOF is cleared again.
TEST(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
// Cut the source short after 3 + kHeaderSize + 2 bytes -- presumably the whole
// "foo" record (header + 3 payload bytes) plus 2 bytes of the "bar" record.
ForceEOF(3 + kHeaderSize + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_TRUE(IsEOF());
ASSERT_EQ("EOF", Read());
// Append more data after the reader has reported EOF.
Write("xxx");
UnmarkEOF();
ASSERT_EQ("xxx", Read());
ASSERT_TRUE(IsEOF());
}
// Same scenario as the single-block case, but with records whose payload is
// larger than several blocks.
TEST(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
// Payload size: the payload capacity of num_full_blocks blocks plus 25 bytes.
size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
// The forced EOF position equals num_full_blocks * kBlockSize + 35, i.e. it
// falls 35 bytes into the block following the five full ones.
ForceEOF(n + num_full_blocks * kHeaderSize + 10);
ASSERT_EQ(BigString("foo", n), Read());
ASSERT_TRUE(IsEOF());
UnmarkEOF();
ASSERT_EQ(BigString("bar", n), Read());
ASSERT_TRUE(IsEOF());
// Append another large record after the reader has reported EOF.
Write(BigString("xxx", n));
UnmarkEOF();
ASSERT_EQ(BigString("xxx", n), Read());
ASSERT_TRUE(IsEOF());
}
// A read error inside UnmarkEOF() must not lose records that are already
// buffered, but must stop the reader from making further progress.
TEST(LogTest, ClearEofError) {
// If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return
// false to indicate that it cannot read any further.
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
ASSERT_TRUE(IsEOF());
Write("xxx");
// Position 0: the source's next Read() fails without delivering any bytes.
ForceError(0);
UnmarkEOF();
// "bar" was already buffered; "xxx" is never returned.
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
}
// Variant of ClearEofError in which the failing read delivers some bytes
// before the error: those bytes must be reported as dropped.
TEST(LogTest, ClearEofError2) {
Write("foo");
Write("bar");
UnmarkEOF();
ASSERT_EQ("foo", Read());
Write("xxx");
// Position 3: the source's failing Read() hands back 3 bytes along with the
// error status.
ForceError(3);
UnmarkEOF();
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
// Exactly the 3 bytes delivered by the failed read are reported as dropped,
// with the source's error message.
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("read error"));
}
} // namespace log
} // namespace rocksdb

@ -1553,8 +1553,10 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
}
// find offset in manifest file where this version is stored.
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
if (s.ok()) {
// find offset in manifest file where this version is stored.
new_manifest_file_size = descriptor_log_->file()->GetFileSize();
}
LogFlush(options_->info_log);
mu->Lock();

@ -68,8 +68,6 @@ struct BackupableDBOptions {
destroy_old_data(_destroy_old_data) { }
};
class BackupEngine;
typedef uint32_t BackupID;
struct BackupInfo {
@ -82,6 +80,29 @@ struct BackupInfo {
: backup_id(_backup_id), timestamp(_timestamp), size(_size) {}
};
// Please see the documentation in BackupableDB and RestoreBackupableDB
//
// Abstract interface for creating, listing, deleting and restoring backups.
// Every operation is pure virtual; instances are obtained through
// NewBackupEngine().
class BackupEngine {
public:
virtual ~BackupEngine() {}
// Creates an engine for the given environment and options.
// NOTE(review): returns a raw pointer; presumably the caller owns it and must
// delete it -- confirm.
static BackupEngine* NewBackupEngine(Env* db_env,
const BackupableDBOptions& options);
// Creates a new backup of `db`.
// NOTE(review): flush_before_backup presumably flushes in-memory data before
// the files are copied -- confirm against the implementation.
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0;
// Deletes old backups so that `num_backups_to_keep` of them remain.
virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
// Deletes the backup identified by `backup_id`.
virtual Status DeleteBackup(BackupID backup_id) = 0;
// Requests that a backup in progress be stopped.
virtual void StopBackup() = 0;
// Fills *backup_info with information about the existing backups.
virtual void GetBackupInfo(std::vector<BackupInfo>* backup_info) = 0;
// Restores the backup `backup_id`, using `db_dir` and `wal_dir` as targets.
virtual Status RestoreDBFromBackup(BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) = 0;
// Like RestoreDBFromBackup(), but for the latest backup.
virtual Status RestoreDBFromLatestBackup(const std::string& db_dir,
const std::string& wal_dir) = 0;
// Deletes backups that are newer than `sequence_number`.
virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0;
};
// Stack your DB with BackupableDB to be able to backup the DB
class BackupableDB : public StackableDB {
public:

@ -10,6 +10,7 @@
#include "utilities/backupable_db.h"
#include "db/filename.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "rocksdb/transaction_log.h"
#define __STDC_FORMAT_MACROS
@ -25,11 +26,11 @@
namespace rocksdb {
// -------- BackupEngine class ---------
class BackupEngine {
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
public:
BackupEngine(Env* db_env, const BackupableDBOptions& options);
~BackupEngine();
BackupEngineImpl(Env* db_env, const BackupableDBOptions& options);
~BackupEngineImpl();
Status CreateNewBackup(DB* db, bool flush_before_backup = false);
Status PurgeOldBackups(uint32_t num_backups_to_keep);
Status DeleteBackup(BackupID backup_id);
@ -48,12 +49,22 @@ class BackupEngine {
void DeleteBackupsNewerThan(uint64_t sequence_number);
private:
// Per-file bookkeeping record: reference count, relative file name, size and
// checksum of one backed-up file.
struct FileInfo {
  // Number of references to this file; a freshly built record has none.
  int refs = 0;
  // Name and size are fixed at construction.
  const std::string filename;
  const uint64_t size;
  // Checksum recorded for the file contents.
  uint32_t checksum_value;

  FileInfo(const std::string& name, uint64_t bytes, uint32_t crc)
      : filename(name), size(bytes), checksum_value(crc) {}
};
class BackupMeta {
public:
BackupMeta(const std::string& meta_filename,
std::unordered_map<std::string, int>* file_refs, Env* env)
std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
: timestamp_(0), size_(0), meta_filename_(meta_filename),
file_refs_(file_refs), env_(env) {}
file_infos_(file_infos), env_(env) {}
~BackupMeta() {}
@ -73,7 +84,8 @@ class BackupEngine {
return sequence_number_;
}
void AddFile(const std::string& filename, uint64_t size);
Status AddFile(const FileInfo& file_info);
void Delete();
bool Empty() {
@ -96,7 +108,7 @@ class BackupEngine {
std::string const meta_filename_;
// files with relative paths (without "/" prefix!!)
std::vector<std::string> files_;
std::unordered_map<std::string, int>* file_refs_;
std::unordered_map<std::string, FileInfo>* file_infos_;
Env* env_;
static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
@ -141,6 +153,7 @@ class BackupEngine {
Env* dst_env,
bool sync,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0);
// if size_limit == 0, there is no size limit, copy everything
Status BackupFile(BackupID backup_id,
@ -149,15 +162,21 @@ class BackupEngine {
const std::string& src_dir,
const std::string& src_fname, // starts with "/"
uint64_t size_limit = 0);
Status CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value);
// Will delete all the files we don't need anymore
// If full_scan == true, it will do the full scan of files/ directory
// and delete all the files that are not referenced from backuped_file_refs_
// and delete all the files that are not referenced from backuped_file_infos_
void GarbageCollection(bool full_scan);
// backup state data
BackupID latest_backup_id_;
std::map<BackupID, BackupMeta> backups_;
std::unordered_map<std::string, int> backuped_file_refs_;
std::unordered_map<std::string, FileInfo> backuped_file_infos_;
std::vector<BackupID> obsolete_backups_;
std::atomic<bool> stop_backup_;
@ -169,7 +188,13 @@ class BackupEngine {
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB
};
BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
// Factory for the BackupEngine interface: builds the concrete
// BackupEngineImpl on the heap and returns it through the base-class pointer.
BackupEngine* BackupEngine::NewBackupEngine(
    Env* db_env, const BackupableDBOptions& options) {
  BackupEngine* const engine = new BackupEngineImpl(db_env, options);
  return engine;
}
BackupEngineImpl::BackupEngineImpl(Env* db_env,
const BackupableDBOptions& options)
: stop_backup_(false),
options_(options),
db_env_(db_env),
@ -198,7 +223,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
assert(backups_.find(backup_id) == backups_.end());
backups_.insert(std::make_pair(
backup_id, BackupMeta(GetBackupMetaFile(backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
}
if (options_.destroy_old_data) { // Destory old data
@ -252,11 +277,9 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
latest_backup_id_);
}
BackupEngine::~BackupEngine() {
LogFlush(options_.info_log);
}
// Flushes the info log when the engine is destroyed.
BackupEngineImpl::~BackupEngineImpl() {
  LogFlush(options_.info_log);
}
void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) {
for (auto backup : backups_) {
if (backup.second.GetSequenceNumber() > sequence_number) {
Log(options_.info_log,
@ -276,7 +299,7 @@ void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
GarbageCollection(false);
}
Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
Status s;
std::vector<std::string> live_files;
VectorLogPtr live_wal_files;
@ -302,7 +325,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
assert(backups_.find(new_backup_id) == backups_.end());
auto ret = backups_.insert(std::make_pair(
new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
assert(ret.second == true);
auto& new_backup = ret.first->second;
new_backup.RecordTimestamp();
@ -386,7 +409,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
return s;
}
Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
Log(options_.info_log, "Purging old backups, keeping %u",
num_backups_to_keep);
while (num_backups_to_keep < backups_.size()) {
@ -399,7 +422,7 @@ Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
return Status::OK();
}
Status BackupEngine::DeleteBackup(BackupID backup_id) {
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
Log(options_.info_log, "Deleting backup %u", backup_id);
auto backup = backups_.find(backup_id);
if (backup == backups_.end()) {
@ -412,7 +435,7 @@ Status BackupEngine::DeleteBackup(BackupID backup_id) {
return Status::OK();
}
void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
backup_info->reserve(backups_.size());
for (auto& backup : backups_) {
if (!backup.second.Empty()) {
@ -422,9 +445,9 @@ void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
}
}
Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
const std::string &db_dir,
const std::string &wal_dir) {
Status BackupEngineImpl::RestoreDBFromBackup(BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) {
auto backup_itr = backups_.find(backup_id);
if (backup_itr == backups_.end()) {
return Status::NotFound("Backup not found");
@ -478,10 +501,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
"/" + dst;
Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
uint32_t checksum_value;
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
nullptr /* size */, &checksum_value);
if (!s.ok()) {
break;
}
const auto iter = backuped_file_infos_.find(file);
assert(iter != backuped_file_infos_.end());
if (iter->second.checksum_value != checksum_value) {
s = Status::Corruption("Checksum check failed");
break;
}
}
Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
@ -489,7 +521,7 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
}
// latest backup id is an ASCII representation of latest backup id
Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
Status BackupEngineImpl::GetLatestBackupFileContents(uint32_t* latest_backup) {
Status s;
unique_ptr<SequentialFile> file;
s = backup_env_->NewSequentialFile(GetLatestBackupFile(),
@ -519,7 +551,7 @@ Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
// writing 4 bytes to the file is atomic alright, but we should *never*
// do something like 1. delete file, 2. write new file
// We write to a tmp file and then atomically rename
Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
Status s;
unique_ptr<WritableFile> file;
EnvOptions env_options;
@ -549,13 +581,11 @@ Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
return s;
}
Status BackupEngine::CopyFile(const std::string& src,
const std::string& dst,
Env* src_env,
Env* dst_env,
bool sync,
uint64_t* size,
uint64_t size_limit) {
Status BackupEngineImpl::CopyFile(const std::string& src,
const std::string& dst, Env* src_env,
Env* dst_env, bool sync, uint64_t* size,
uint32_t* checksum_value,
uint64_t size_limit) {
Status s;
unique_ptr<WritableFile> dst_file;
unique_ptr<SequentialFile> src_file;
@ -564,6 +594,9 @@ Status BackupEngine::CopyFile(const std::string& src,
if (size != nullptr) {
*size = 0;
}
if (checksum_value != nullptr) {
*checksum_value = 0;
}
// Check if size limit is set. if not, set it to very big number
if (size_limit == 0) {
@ -589,12 +622,19 @@ Status BackupEngine::CopyFile(const std::string& src,
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
size_limit -= data.size();
if (!s.ok()) {
return s;
}
if (size != nullptr) {
*size += data.size();
}
if (s.ok()) {
s = dst_file->Append(data);
if (checksum_value != nullptr) {
*checksum_value = crc32c::Extend(*checksum_value, data.data(),
data.size());
}
s = dst_file->Append(data);
} while (s.ok() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) {
@ -605,12 +645,10 @@ Status BackupEngine::CopyFile(const std::string& src,
}
// src_fname will always start with "/"
Status BackupEngine::BackupFile(BackupID backup_id,
BackupMeta* backup,
bool shared,
const std::string& src_dir,
const std::string& src_fname,
uint64_t size_limit) {
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
bool shared, const std::string& src_dir,
const std::string& src_fname,
uint64_t size_limit) {
assert(src_fname.size() > 0 && src_fname[0] == '/');
std::string dst_relative = src_fname.substr(1);
@ -629,9 +667,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
// if it's shared, we also need to check if it exists -- if it does,
// no need to copy it again
uint32_t checksum_value = 0;
if (shared && backup_env_->FileExists(dst_path)) {
backup_env_->GetFileSize(dst_path, &size); // Ignore error
Log(options_.info_log, "%s already present", src_fname.c_str());
Log(options_.info_log, "%s already present, calculate checksum",
src_fname.c_str());
s = CalculateChecksum(src_dir + src_fname,
db_env_,
size_limit,
&checksum_value);
} else {
Log(options_.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(src_dir + src_fname,
@ -640,22 +684,62 @@ Status BackupEngine::BackupFile(BackupID backup_id,
backup_env_,
options_.sync,
&size,
&checksum_value,
size_limit);
if (s.ok() && shared) {
s = backup_env_->RenameFile(dst_path_tmp, dst_path);
}
}
if (s.ok()) {
backup->AddFile(dst_relative, size);
s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
}
return s;
}
void BackupEngine::GarbageCollection(bool full_scan) {
// Computes the crc32c checksum of file `src` (opened through `src_env`)
// without copying it anywhere.
//
// At most `size_limit` bytes are read; size_limit == 0 means "no limit".
// *checksum_value is reset to 0 first and then extended chunk by chunk, so on
// a non-OK return it holds the checksum of the chunks processed so far.
//
// Returns the failing status if the file cannot be opened or read,
// Status::Incomplete if stop_backup_ is found set before a chunk is read, and
// the (OK) status of the last read otherwise.
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
                                           uint64_t size_limit,
                                           uint32_t* checksum_value) {
  *checksum_value = 0;

  uint64_t bytes_left = size_limit;
  if (bytes_left == 0) {
    bytes_left = std::numeric_limits<uint64_t>::max();
  }

  EnvOptions env_options;
  env_options.use_mmap_writes = false;

  std::unique_ptr<SequentialFile> src_file;
  Status s = src_env->NewSequentialFile(src, &src_file, env_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<char[]> scratch(new char[copy_file_buffer_size_]);
  Slice chunk;
  for (;;) {
    // Honour a stop request before touching the file again.
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }

    // Read a full buffer unless fewer bytes remain under the limit.
    size_t want = copy_file_buffer_size_;
    if (bytes_left <= want) {
      want = static_cast<size_t>(bytes_left);
    }

    s = src_file->Read(want, &chunk, scratch.get());
    if (!s.ok()) {
      return s;
    }

    bytes_left -= chunk.size();
    *checksum_value =
        crc32c::Extend(*checksum_value, chunk.data(), chunk.size());

    // An empty read or an exhausted limit ends the scan.
    if (chunk.size() == 0 || bytes_left == 0) {
      break;
    }
  }
  return s;
}
void BackupEngineImpl::GarbageCollection(bool full_scan) {
Log(options_.info_log, "Starting garbage collection");
std::vector<std::string> to_delete;
for (auto& itr : backuped_file_refs_) {
if (itr.second == 0) {
for (auto& itr : backuped_file_infos_) {
if (itr.second.refs == 0) {
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
s.ToString().c_str());
@ -663,7 +747,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
}
}
for (auto& td : to_delete) {
backuped_file_refs_.erase(td);
backuped_file_infos_.erase(td);
}
if (!full_scan) {
// take care of private dirs -- if full_scan == true, then full_scan will
@ -686,7 +770,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
for (auto& child : shared_children) {
std::string rel_fname = GetSharedFileRel(child);
// if it's not refcounted, delete it
if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
// this might be a directory, but DeleteFile will just fail in that
// case, so we're good
Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
@ -731,23 +815,34 @@ void BackupEngine::GarbageCollection(bool full_scan) {
// ------- BackupMeta class --------
void BackupEngine::BackupMeta::AddFile(const std::string& filename,
uint64_t size) {
size_ += size;
files_.push_back(filename);
auto itr = file_refs_->find(filename);
if (itr == file_refs_->end()) {
file_refs_->insert(std::make_pair(filename, 1));
Status BackupEngineImpl::BackupMeta::AddFile(const FileInfo& file_info) {
size_ += file_info.size;
files_.push_back(file_info.filename);
auto itr = file_infos_->find(file_info.filename);
if (itr == file_infos_->end()) {
auto ret = file_infos_->insert({file_info.filename, file_info});
if (ret.second) {
ret.first->second.refs = 1;
} else {
// if this happens, something is seriously wrong
return Status::Corruption("In memory metadata insertion error");
}
} else {
++itr->second; // increase refcount if already present
if (itr->second.checksum_value != file_info.checksum_value) {
return Status::Corruption("Checksum mismatch for existing backup file");
}
++itr->second.refs; // increase refcount if already present
}
return Status::OK();
}
void BackupEngine::BackupMeta::Delete() {
for (auto& file : files_) {
auto itr = file_refs_->find(file);
assert(itr != file_refs_->end());
--(itr->second); // decrease refcount
void BackupEngineImpl::BackupMeta::Delete() {
for (const auto& file : files_) {
auto itr = file_infos_->find(file);
assert(itr != file_infos_->end());
--(itr->second.refs); // decrease refcount
}
files_.clear();
// delete meta file
@ -759,11 +854,12 @@ void BackupEngine::BackupMeta::Delete() {
// <timestamp>
// <seq number>
// <number of files>
// <file1>
// <file2>
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
// TODO: maybe add checksum?
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir) {
assert(Empty());
Status s;
unique_ptr<SequentialFile> backup_meta_file;
@ -790,25 +886,47 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
sscanf(data.data(), "%u%n", &num_files, &bytes_read);
data.remove_prefix(bytes_read + 1); // +1 for '\n'
std::vector<std::pair<std::string, uint64_t>> files;
std::vector<FileInfo> files;
for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
std::string filename = GetSliceUntil(&data, '\n').ToString();
auto line = GetSliceUntil(&data, '\n');
std::string filename = GetSliceUntil(&line, ' ').ToString();
uint64_t size;
s = env_->GetFileSize(backup_dir + "/" + filename, &size);
files.push_back(std::make_pair(filename, size));
if (line.empty()) {
return Status::Corruption("File checksum is missing");
}
uint32_t checksum_value = 0;
if (line.starts_with("crc32 ")) {
line.remove_prefix(6);
sscanf(line.data(), "%u", &checksum_value);
if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
line.size() - 1) != 0) {
return Status::Corruption("Invalid checksum value");
}
} else {
return Status::Corruption("Unknown checksum type");
}
files.emplace_back(filename, size, checksum_value);
}
if (s.ok()) {
for (auto file : files) {
AddFile(file.first, file.second);
for (const auto& file_info : files) {
s = AddFile(file_info);
if (!s.ok()) {
break;
}
}
}
return s;
}
Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
Status s;
unique_ptr<WritableFile> backup_meta_file;
EnvOptions env_options;
@ -825,8 +943,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
sequence_number_);
len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
for (size_t i = 0; i < files_.size(); ++i) {
len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
for (const auto& file : files_) {
const auto& iter = file_infos_->find(file);
assert(iter != file_infos_->end());
// use crc32 for now, switch to something else if needed
len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
file.c_str(), iter->second.checksum_value);
}
s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));
@ -845,7 +968,8 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
// --- BackupableDB methods --------
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
: StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) {
: StackableDB(db),
backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {
if (options.share_table_files) {
backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
}
@ -879,7 +1003,7 @@ void BackupableDB::StopBackup() {
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
const BackupableDBOptions& options)
: backup_engine_(new BackupEngine(db_env, options)) {}
: backup_engine_(new BackupEngineImpl(db_env, options)) {}
RestoreBackupableDB::~RestoreBackupableDB() {
delete backup_engine_;

@ -156,7 +156,6 @@ class TestEnv : public EnvWrapper {
Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
opened_files_.push_back(f);
if (dummy_sequential_file_) {
r->reset(new TestEnv::DummySequentialFile());
return Status::OK();
@ -167,6 +166,7 @@ class TestEnv : public EnvWrapper {
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& options) {
written_files_.push_back(f);
if (limit_written_files_ <= 0) {
return Status::IOError("Sorry, can't do this");
}
@ -174,14 +174,14 @@ class TestEnv : public EnvWrapper {
return EnvWrapper::NewWritableFile(f, r, options);
}
void AssertOpenedFiles(std::vector<std::string>& should_have_opened) {
sort(should_have_opened.begin(), should_have_opened.end());
sort(opened_files_.begin(), opened_files_.end());
ASSERT_TRUE(opened_files_ == should_have_opened);
void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
sort(should_have_written.begin(), should_have_written.end());
sort(written_files_.begin(), written_files_.end());
ASSERT_TRUE(written_files_ == should_have_written);
}
void ClearOpenedFiles() {
opened_files_.clear();
void ClearWrittenFiles() {
written_files_.clear();
}
void SetLimitWrittenFiles(uint64_t limit) {
@ -194,7 +194,7 @@ class TestEnv : public EnvWrapper {
private:
bool dummy_sequential_file_ = false;
std::vector<std::string> opened_files_;
std::vector<std::string> written_files_;
uint64_t limit_written_files_ = 1000000;
}; // TestEnv
@ -241,6 +241,46 @@ class FileManager : public EnvWrapper {
return s;
}
// Test helper: rewrites the backup meta file `fname` so that the checksum
// recorded for one of its entries is damaged.
//
// Entries are located by searching for the " crc32 " marker; the text between
// the marker and the next '\n' (or end of file) is treated as the checksum
// value. One entry is selected using rnd_ and its value is modified:
//   * appear_valid == true:  a one-character value gets a '0' appended,
//     otherwise the second character of the value is removed.
//   * appear_valid == false: the first character of the value is replaced
//     with 'a'.
//
// Returns Corruption("checksum not found") when the marker is absent and
// Corruption("bad CRC32 checksum value") when the selected entry has an empty
// value. Otherwise returns the first failing status from reading, deleting or
// rewriting the file. The existing file is deleted only after the new
// contents have been prepared, so a validation failure leaves it untouched.
Status CorruptChecksum(const std::string& fname, bool appear_valid) {
  std::string metadata;
  Status s = ReadFileToString(this, fname, &metadata);
  if (!s.ok()) {
    return s;
  }

  // Collect the offset of every checksum marker in the file. The search
  // resumes on the marker's trailing space, as adjacent markers may share it.
  const std::string marker = " crc32 ";
  std::vector<size_t> positions;
  for (size_t pos = metadata.find(marker); pos != std::string::npos;
       pos = metadata.find(marker, pos + marker.size() - 1)) {
    positions.push_back(pos);
  }
  if (positions.empty()) {
    return Status::Corruption("checksum not found");
  }

  // [value_begin, value_end) is the checksum text of the selected entry.
  const size_t value_begin =
      positions[rnd_.Next() % positions.size()] + marker.size();
  size_t value_end = metadata.find('\n', value_begin);
  if (value_end == std::string::npos) {
    value_end = metadata.size();
  }
  if (value_end == value_begin) {
    // Nothing follows the marker; indexing into the value would be
    // out of range.
    return Status::Corruption("bad CRC32 checksum value");
  }

  if (appear_valid) {
    if (value_end - value_begin == 1) {
      // single digit value, safe to insert one more digit
      metadata.insert(value_end, 1, '0');
    } else {
      metadata.erase(value_begin + 1, 1);
    }
  } else {
    metadata[value_begin] = 'a';
  }

  s = DeleteFile(fname);
  if (!s.ok()) {
    return s;
  }
  return WriteToFile(fname, metadata);
}
Status WriteToFile(const std::string& fname, const std::string& data) {
unique_ptr<WritableFile> file;
EnvOptions env_options;
@ -251,6 +291,7 @@ class FileManager : public EnvWrapper {
}
return file->Append(Slice(data));
}
private:
Random rnd_;
}; // FileManager
@ -414,30 +455,43 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// should write 5 DB files + LATEST_BACKUP + one meta file
test_backup_env_->SetLimitWrittenFiles(7);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
test_db_env_->SetLimitWrittenFiles(0);
dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
std::vector<std::string> should_have_openened = dummy_db_->live_files_;
should_have_openened.push_back("/00011.log");
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
std::vector<std::string> should_have_written = {
"/shared/00010.sst.tmp",
"/shared/00011.sst.tmp",
"/private/1.tmp/CURRENT",
"/private/1.tmp/MANIFEST-01",
"/private/1.tmp/00011.log",
"/meta/1.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
// should write 4 new DB files + LATEST_BACKUP + one meta file
// should not write/copy 00010.sst, since it's already there!
test_backup_env_->SetLimitWrittenFiles(6);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
// should not open 00010.sst - it's already there
should_have_openened = { "/00015.sst", "/CURRENT",
"/MANIFEST-01", "/00011.log" };
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
should_have_written = {
"/shared/00015.sst.tmp",
"/private/2.tmp/CURRENT",
"/private/2.tmp/MANIFEST-01",
"/private/2.tmp/00011.log",
"/meta/2.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
ASSERT_OK(db_->DeleteBackup(1));
ASSERT_EQ(true,
@ -465,6 +519,8 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// 3. Corrupted backup meta file or missing backed-up file - we should
// not be able to open that backup, but all other backups should be
// fine
// 4. Corrupted checksum value - if the checksum is not a valid uint32_t,
// db open should fail; otherwise, the restore process aborts.
TEST(BackupableDBTest, CorruptionsTest) {
const int keys_iteration = 5000;
Random rnd(6);
@ -521,12 +577,29 @@ TEST(BackupableDBTest, CorruptionsTest) {
CloseRestoreDB();
ASSERT_TRUE(!s.ok());
// new backup should be 4!
// --------- case 4. corrupted checksum value ----
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false));
// checksum of backup 3 is an invalid value, this can be detected at
// db open time, and it reverts to the previous backup automatically
AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5);
// checksum of the backup 2 appears to be valid, this can cause checksum
// mismatch and abort restore process
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true));
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
OpenRestoreDB();
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_);
ASSERT_TRUE(!s.ok());
ASSERT_OK(restore_db_->DeleteBackup(2));
CloseRestoreDB();
AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5);
// new backup should be 2!
OpenBackupableDB();
FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4);
FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2);
ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
CloseBackupableDB();
AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5);
AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5);
}
// open DB, write, close DB, backup, restore, repeat

Loading…
Cancel
Save