@ -11,6 +11,7 @@
# include "rocksdb/utilities/backupable_db.h"
# include "db/filename.h"
# include "util/channel.h"
# include "util/coding.h"
# include "util/crc32c.h"
# include "util/logging.h"
@ -30,11 +31,13 @@
# include <string>
# include <limits>
# include <atomic>
# include <future>
# include <thread>
# include <unordered_map>
# include <unordered_set>
namespace rocksdb {
namespace {
class BackupRateLimiter {
public :
BackupRateLimiter ( Env * env , uint64_t max_bytes_per_second ,
@ -75,7 +78,6 @@ class BackupRateLimiter {
uint64_t bytes_since_start_ ;
static const uint64_t kMicrosInSecond = 1000 * 1000LL ;
} ;
} // namespace
void BackupStatistics : : IncrementNumberSuccessBackup ( ) {
number_success_backup + + ;
@ -99,18 +101,21 @@ std::string BackupStatistics::ToString() const {
}
void BackupableDBOptions : : Dump ( Logger * logger ) const {
Log ( logger , " Options.backup_dir: %s " , backup_dir . c_str ( ) ) ;
Log ( logger , " Options.backup_env: %p " , backup_env ) ;
Log ( logger , " Options.share_table_files: %d " ,
Log ( logger , " Options.backup_dir: %s " , backup_dir . c_str ( ) ) ;
Log ( logger , " Options.backup_env: %p " , backup_env ) ;
Log ( logger , " Options.share_table_files: %d " ,
static_cast < int > ( share_table_files ) ) ;
Log ( logger , " Options.info_log: %p " , info_log ) ;
Log ( logger , " Options.sync: %d " , static_cast < int > ( sync ) ) ;
Log ( logger , " Options.destroy_old_data: %d " ,
Log ( logger , " Options.info_log: %p " , info_log ) ;
Log ( logger , " Options.sync: %d " , static_cast < int > ( sync ) ) ;
Log ( logger , " Options.destroy_old_data: %d " ,
static_cast < int > ( destroy_old_data ) ) ;
Log ( logger , " Options.backup_log_files: %d " ,
Log ( logger , " Options.backup_log_files: %d " ,
static_cast < int > ( backup_log_files ) ) ;
Log ( logger , " Options.backup_rate_limit: % " PRIu64 , backup_rate_limit ) ;
Log ( logger , " Options.restore_rate_limit: % " PRIu64 , restore_rate_limit ) ;
Log ( logger , " Options.backup_rate_limit: % " PRIu64 , backup_rate_limit ) ;
Log ( logger , " Options.restore_rate_limit: % " PRIu64 ,
restore_rate_limit ) ;
Log ( logger , " Options.max_background_operations: %d " ,
max_background_operations ) ;
}
// -------- BackupEngineImpl class ---------
@ -303,21 +308,92 @@ class BackupEngineImpl : public BackupEngine {
uint64_t * size = nullptr ,
uint32_t * checksum_value = nullptr ,
uint64_t size_limit = 0 ) ;
// if size_limit == 0, there is no size limit, copy everything
Status BackupFile ( BackupID backup_id ,
BackupMeta * backup ,
bool shared ,
const std : : string & src_dir ,
const std : : string & src_fname , // starts with "/"
BackupRateLimiter * rate_limiter ,
uint64_t size_limit = 0 ,
bool shared_checksum = false ) ;
Status CalculateChecksum ( const std : : string & src ,
Env * src_env ,
uint64_t size_limit ,
uint32_t * checksum_value ) ;
struct CopyResult {
uint64_t size ;
uint32_t checksum_value ;
Status status ;
} ;
struct CopyWorkItem {
std : : string src_path ;
std : : string dst_path ;
Env * src_env ;
Env * dst_env ;
bool sync ;
BackupRateLimiter * rate_limiter ;
uint64_t size_limit ;
std : : promise < CopyResult > result ;
CopyWorkItem ( ) { }
CopyWorkItem ( std : : string _src_path ,
std : : string _dst_path ,
Env * _src_env ,
Env * _dst_env ,
bool _sync ,
BackupRateLimiter * _rate_limiter ,
uint64_t _size_limit )
: src_path ( std : : move ( _src_path ) ) ,
dst_path ( std : : move ( _dst_path ) ) ,
src_env ( _src_env ) ,
dst_env ( _dst_env ) ,
sync ( _sync ) ,
rate_limiter ( _rate_limiter ) ,
size_limit ( _size_limit ) { }
} ;
struct BackupAfterCopyWorkItem {
std : : future < CopyResult > result ;
bool shared ;
bool needed_to_copy ;
Env * backup_env ;
std : : string dst_path_tmp ;
std : : string dst_path ;
std : : string dst_relative ;
BackupAfterCopyWorkItem ( ) { }
BackupAfterCopyWorkItem ( std : : future < CopyResult > _result ,
bool _shared ,
bool _needed_to_copy ,
Env * _backup_env ,
std : : string _dst_path_tmp ,
std : : string _dst_path ,
std : : string _dst_relative )
: result ( std : : move ( _result ) ) ,
shared ( _shared ) ,
needed_to_copy ( _needed_to_copy ) ,
backup_env ( _backup_env ) ,
dst_path_tmp ( std : : move ( _dst_path_tmp ) ) ,
dst_path ( std : : move ( _dst_path ) ) ,
dst_relative ( std : : move ( _dst_relative ) ) { }
} ;
struct RestoreAfterCopyWorkItem {
std : : future < CopyResult > result ;
uint32_t checksum_value ;
RestoreAfterCopyWorkItem ( ) { }
RestoreAfterCopyWorkItem ( std : : future < CopyResult > _result ,
uint32_t _checksum_value )
: result ( std : : move ( _result ) ) ,
checksum_value ( _checksum_value ) { }
} ;
channel < CopyWorkItem > files_to_copy_ ;
std : : vector < std : : thread > threads_ ;
Status AddBackupFileWorkItem (
std : : unordered_set < std : : string > & live_dst_paths ,
std : : vector < BackupAfterCopyWorkItem > & backup_items_to_finish ,
BackupID backup_id ,
bool shared ,
const std : : string & src_dir ,
const std : : string & src_fname , // starts with "/"
BackupRateLimiter * rate_limiter ,
uint64_t size_limit = 0 ,
bool shared_checksum = false ) ;
// backup state data
BackupID latest_backup_id_ ;
std : : map < BackupID , unique_ptr < BackupMeta > > backups_ ;
@ -365,6 +441,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
backup_env_ ( options . backup_env ! = nullptr ? options . backup_env : db_env_ ) ,
copy_file_buffer_size_ ( kDefaultCopyFileBufferSize ) ,
read_only_ ( read_only ) {
// set up threads perform copies from files_to_copy_ in the background
for ( int t = 0 ; t < options_ . max_background_operations ; t + + ) {
threads_ . emplace_back ( [ & ] ( ) {
CopyWorkItem work_item ;
while ( files_to_copy_ . read ( work_item ) ) {
CopyResult result ;
result . status = CopyFile ( work_item . src_path ,
work_item . dst_path ,
work_item . src_env ,
work_item . dst_env ,
work_item . sync ,
work_item . rate_limiter ,
& result . size ,
& result . checksum_value ,
work_item . size_limit ) ;
work_item . result . set_value ( std : : move ( result ) ) ;
}
} ) ;
}
if ( read_only_ ) {
Log ( options_ . info_log , " Starting read_only backup engine " ) ;
}
@ -487,9 +584,20 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
Log ( options_ . info_log , " Initialized BackupEngine " ) ;
}
BackupEngineImpl : : ~ BackupEngineImpl ( ) { LogFlush ( options_ . info_log ) ; }
BackupEngineImpl : : ~ BackupEngineImpl ( ) {
files_to_copy_ . sendEof ( ) ;
for ( int i = 0 ; i < options_ . max_background_operations ; i + + ) {
threads_ [ i ] . join ( ) ;
}
LogFlush ( options_ . info_log ) ;
}
Status BackupEngineImpl : : CreateNewBackup ( DB * db , bool flush_before_backup ) {
if ( options_ . max_background_operations > 1 & &
options_ . backup_rate_limit ! = 0 ) {
return Status : : InvalidArgument (
" Multi-threaded backups cannot use a backup_rate_limit " ) ;
}
assert ( ! read_only_ ) ;
Status s ;
std : : vector < std : : string > live_files ;
@ -539,7 +647,15 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
options_ . backup_rate_limit , copy_file_buffer_size_ ) ) ;
}
// copy live_files
// A set into which we will insert the dst_paths that are calculated for live
// files and live WAL files.
// This is used to check whether a live files shares a dst_path with another
// live file.
std : : unordered_set < std : : string > live_dst_paths ;
live_dst_paths . reserve ( live_files . size ( ) + live_wal_files . size ( ) ) ;
std : : vector < BackupAfterCopyWorkItem > backup_items_to_finish ;
// Add a CopyWorkItem to the channel for each live file
for ( size_t i = 0 ; s . ok ( ) & & i < live_files . size ( ) ; + + i ) {
uint64_t number ;
FileType type ;
@ -555,27 +671,49 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
// rules:
// * if it's kTableFile, then it's shared
// * if it's kDescriptorFile, limit the size to manifest_file_size
s = BackupFile ( new_backup_id ,
new_backup . get ( ) ,
options_ . share_table_files & & type = = kTableFile ,
db - > GetName ( ) , /* src_dir */
live_files [ i ] , /* src_fname */
rate_limiter . get ( ) ,
( type = = kDescriptorFile ) ? manifest_file_size : 0 ,
options_ . share_files_with_checksum & & type = = kTableFile ) ;
}
// copy WAL files
s = AddBackupFileWorkItem (
live_dst_paths ,
backup_items_to_finish ,
new_backup_id ,
options_ . share_table_files & & type = = kTableFile ,
db - > GetName ( ) ,
live_files [ i ] ,
rate_limiter . get ( ) ,
( type = = kDescriptorFile ) ? manifest_file_size : 0 ,
options_ . share_files_with_checksum & & type = = kTableFile ) ;
}
// Add a CopyWorkItem to the channel for each WAL file
for ( size_t i = 0 ; s . ok ( ) & & i < live_wal_files . size ( ) ; + + i ) {
if ( live_wal_files [ i ] - > Type ( ) = = kAliveLogFile ) {
// we only care about live log files
// copy the file into backup_dir/files/<new backup>/
s = BackupFile ( new_backup_id ,
new_backup . get ( ) ,
false , /* not shared */
db - > GetOptions ( ) . wal_dir ,
live_wal_files [ i ] - > PathName ( ) ,
rate_limiter . get ( ) ) ;
s = AddBackupFileWorkItem ( live_dst_paths ,
backup_items_to_finish ,
new_backup_id ,
false , /* not shared */
db - > GetOptions ( ) . wal_dir ,
live_wal_files [ i ] - > PathName ( ) ,
rate_limiter . get ( ) ) ;
}
}
Status item_status ;
for ( auto & item : backup_items_to_finish ) {
item . result . wait ( ) ;
auto result = item . result . get ( ) ;
item_status = result . status ;
if ( item_status . ok ( ) & & item . shared & & item . needed_to_copy ) {
item_status = item . backup_env - > RenameFile ( item . dst_path_tmp ,
item . dst_path ) ;
}
if ( item_status . ok ( ) ) {
item_status = new_backup . get ( ) - > AddFile (
std : : make_shared < FileInfo > ( item . dst_relative ,
result . size ,
result . checksum_value ) ) ;
}
if ( ! item_status . ok ( ) ) {
s = item_status ;
}
}
@ -737,6 +875,11 @@ BackupEngineImpl::GetCorruptedBackups(
Status BackupEngineImpl : : RestoreDBFromBackup (
BackupID backup_id , const std : : string & db_dir , const std : : string & wal_dir ,
const RestoreOptions & restore_options ) {
if ( options_ . max_background_operations > 1 & &
options_ . restore_rate_limit ! = 0 ) {
return Status : : InvalidArgument (
" Multi-threaded restores cannot use a restore_rate_limit " ) ;
}
auto corrupt_itr = corrupt_backups_ . find ( backup_id ) ;
if ( corrupt_itr ! = corrupt_backups_ . end ( ) ) {
return corrupt_itr - > second . first ;
@ -794,6 +937,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
options_ . restore_rate_limit , copy_file_buffer_size_ ) ) ;
}
Status s ;
std : : vector < RestoreAfterCopyWorkItem > restore_items_to_finish ;
for ( const auto & file_info : backup - > GetFiles ( ) ) {
const std : : string & file = file_info - > filename ;
std : : string dst ;
@ -823,14 +967,30 @@ Status BackupEngineImpl::RestoreDBFromBackup(
" / " + dst ;
Log ( options_ . info_log , " Restoring %s to %s \n " , file . c_str ( ) , dst . c_str ( ) ) ;
uint32_t checksum_value ;
s = CopyFile ( GetAbsolutePath ( file ) , dst , backup_env_ , db_env_ , false ,
rate_limiter . get ( ) , nullptr /* size */ , & checksum_value ) ;
if ( ! s . ok ( ) ) {
CopyWorkItem copy_work_item ( GetAbsolutePath ( file ) ,
dst ,
backup_env_ ,
db_env_ ,
false ,
rate_limiter . get ( ) ,
0 /* size_limit */ ) ;
RestoreAfterCopyWorkItem after_copy_work_item (
copy_work_item . result . get_future ( ) ,
file_info - > checksum_value ) ;
files_to_copy_ . write ( std : : move ( copy_work_item ) ) ;
restore_items_to_finish . push_back ( std : : move ( after_copy_work_item ) ) ;
}
Status item_status ;
for ( auto & item : restore_items_to_finish ) {
item . result . wait ( ) ;
auto result = item . result . get ( ) ;
item_status = result . status ;
// Note: It is possible that both of the following bad-status cases occur
// during copying. But, we only return one status.
if ( ! item_status . ok ( ) ) {
s = item_status ;
break ;
}
if ( file_info - > checksum_value ! = checksum_value ) {
} else if ( item . checksum_value ! = result . checksum_value ) {
s = Status : : Corruption ( " Checksum check failed " ) ;
break ;
}
@ -972,13 +1132,16 @@ Status BackupEngineImpl::CopyFile(
}
// src_fname will always start with "/"
Status BackupEngineImpl : : BackupFile ( BackupID backup_id , BackupMeta * backup ,
bool shared , const std : : string & src_dir ,
const std : : string & src_fname ,
BackupRateLimiter * rate_limiter ,
uint64_t size_limit ,
bool shared_checksum ) {
Status BackupEngineImpl : : AddBackupFileWorkItem (
std : : unordered_set < std : : string > & live_dst_paths ,
std : : vector < BackupAfterCopyWorkItem > & backup_items_to_finish ,
BackupID backup_id ,
bool shared ,
const std : : string & src_dir ,
const std : : string & src_fname ,
BackupRateLimiter * rate_limiter ,
uint64_t size_limit ,
bool shared_checksum ) {
assert ( src_fname . size ( ) > 0 & & src_fname [ 0 ] = = ' / ' ) ;
std : : string dst_relative = src_fname . substr ( 1 ) ;
std : : string dst_relative_tmp ;
@ -1012,17 +1175,20 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
std : : string dst_path = GetAbsolutePath ( dst_relative ) ;
std : : string dst_path_tmp = GetAbsolutePath ( dst_relative_tmp ) ;
// if it's shared, we also need to check if it exists -- if it does,
// no need to copy it again
// if it's shared, we also need to check if it exists -- if it does, no need
// to copy it again.
bool need_to_copy = true ;
if ( shared & & backup_env_ - > FileExists ( dst_path ) ) {
// true if dst_path is the same path as another live file
const bool same_path =
live_dst_paths . find ( dst_path ) ! = live_dst_paths . end ( ) ;
if ( shared & & ( backup_env_ - > FileExists ( dst_path ) | | same_path ) ) {
need_to_copy = false ;
if ( shared_checksum ) {
Log ( options_ . info_log ,
" %s already present, with checksum %u and size % " PRIu64 ,
src_fname . c_str ( ) , checksum_value , size ) ;
} else if ( backuped_file_infos_ . find ( dst_relative ) = =
backuped_file_infos_ . end ( ) ) {
backuped_file_infos_ . end ( ) & & ! same_path ) {
// file already exists, but it's not referenced by any backup. overwrite
// the file
Log ( options_ . info_log ,
@ -1040,25 +1206,44 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
& checksum_value ) ;
}
}
live_dst_paths . insert ( dst_path ) ;
if ( need_to_copy ) {
Log ( options_ . info_log , " Copying %s to %s " , src_fname . c_str ( ) ,
dst_path_tmp . c_str ( ) ) ;
s = CopyFile ( src_dir + src_fname ,
dst_path_tmp ,
db_env_ ,
backup_env_ ,
options_ . sync ,
rate_limiter ,
& size ,
& checksum_value ,
size_limit ) ;
if ( s . ok ( ) & & shared ) {
s = backup_env_ - > RenameFile ( dst_path_tmp , dst_path ) ;
}
}
if ( s . ok ( ) ) {
s = backup - > AddFile ( std : : make_shared < FileInfo > (
dst_relative , size , checksum_value ) ) ;
dst_path_tmp . c_str ( ) ) ;
CopyWorkItem copy_work_item ( src_dir + src_fname ,
dst_path_tmp ,
db_env_ ,
backup_env_ ,
options_ . sync ,
rate_limiter ,
size_limit ) ;
BackupAfterCopyWorkItem after_copy_work_item (
copy_work_item . result . get_future ( ) ,
shared ,
need_to_copy ,
backup_env_ ,
dst_path_tmp ,
dst_path ,
dst_relative ) ;
files_to_copy_ . write ( std : : move ( copy_work_item ) ) ;
backup_items_to_finish . push_back ( std : : move ( after_copy_work_item ) ) ;
} else {
std : : promise < CopyResult > promise_result ;
BackupAfterCopyWorkItem after_copy_work_item (
promise_result . get_future ( ) ,
shared ,
need_to_copy ,
backup_env_ ,
dst_path_tmp ,
dst_path ,
dst_relative ) ;
backup_items_to_finish . push_back ( std : : move ( after_copy_work_item ) ) ;
CopyResult result ;
result . status = s ;
result . size = size ;
result . checksum_value = checksum_value ;
promise_result . set_value ( std : : move ( result ) ) ;
}
return s ;
}