@ -26,17 +26,60 @@
namespace rocksdb {
namespace {
class RateLimiter {
public :
RateLimiter ( Env * env , uint64_t max_bytes_per_second , uint64_t bytes_per_check )
: env_ ( env ) ,
max_bytes_per_second_ ( max_bytes_per_second ) ,
bytes_per_check_ ( bytes_per_check ) ,
micros_start_time_ ( env - > NowMicros ( ) ) ,
bytes_since_start_ ( 0 ) { }
void ReportAndWait ( uint64_t bytes_since_last_call ) {
bytes_since_start_ + = bytes_since_last_call ;
if ( bytes_since_start_ < bytes_per_check_ ) {
// not enough bytes to be rate-limited
return ;
}
uint64_t now = env_ - > NowMicros ( ) ;
uint64_t interval = now - micros_start_time_ ;
uint64_t should_take_micros =
( bytes_since_start_ * kMicrosInSecond ) / max_bytes_per_second_ ;
if ( should_take_micros > interval ) {
env_ - > SleepForMicroseconds ( should_take_micros - interval ) ;
now = env_ - > NowMicros ( ) ;
}
// reset interval
micros_start_time_ = now ;
bytes_since_start_ = 0 ;
}
private :
Env * env_ ;
uint64_t max_bytes_per_second_ ;
uint64_t bytes_per_check_ ;
uint64_t micros_start_time_ ;
uint64_t bytes_since_start_ ;
static const uint64_t kMicrosInSecond = 1000 * 1000LL ;
} ;
} // namespace
void BackupableDBOptions : : Dump ( Logger * logger ) const {
Log ( logger , " Options.backup_dir: %s " , backup_dir . c_str ( ) ) ;
Log ( logger , " Options.backup_env: %p " , backup_env ) ;
Log ( logger , " Options.share_table_files: %d " ,
Log ( logger , " Options.backup_dir: %s " , backup_dir . c_str ( ) ) ;
Log ( logger , " Options.backup_env: %p " , backup_env ) ;
Log ( logger , " Options.share_table_files: %d" ,
static_cast < int > ( share_table_files ) ) ;
Log ( logger , " Options.info_log: %p " , info_log ) ;
Log ( logger , " Options.sync: %d " , static_cast < int > ( sync ) ) ;
Log ( logger , " Options.destroy_old_data: %d " ,
Log ( logger , " Options.info_log: %p " , info_log ) ;
Log ( logger , " Options.sync: %d " , static_cast < int > ( sync ) ) ;
Log ( logger , " Options.destroy_old_data: %d " ,
static_cast < int > ( destroy_old_data ) ) ;
Log ( logger , " Options.backup_log_files: %d " ,
Log ( logger , " Options.backup_log_files: %d " ,
static_cast < int > ( backup_log_files ) ) ;
Log ( logger , " Options.backup_rate_limit: % " PRIu64 , backup_rate_limit ) ;
Log ( logger , " Options.restore_rate_limit: % " PRIu64 , restore_rate_limit ) ;
}
// -------- BackupEngineImpl class ---------
@ -170,6 +213,7 @@ class BackupEngineImpl : public BackupEngine {
Env * src_env ,
Env * dst_env ,
bool sync ,
RateLimiter * rate_limiter ,
uint64_t * size = nullptr ,
uint32_t * checksum_value = nullptr ,
uint64_t size_limit = 0 ) ;
@ -179,6 +223,7 @@ class BackupEngineImpl : public BackupEngine {
bool shared ,
const std : : string & src_dir ,
const std : : string & src_fname , // starts with "/"
RateLimiter * rate_limiter ,
uint64_t size_limit = 0 ) ;
Status CalculateChecksum ( const std : : string & src ,
@ -209,7 +254,8 @@ class BackupEngineImpl : public BackupEngine {
unique_ptr < Directory > meta_directory_ ;
unique_ptr < Directory > private_directory_ ;
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL ; // 5MB
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL ; // 5MB
size_t copy_file_buffer_size_ ;
} ;
BackupEngine * BackupEngine : : NewBackupEngine (
@ -222,9 +268,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
: stop_backup_ ( false ) ,
options_ ( options ) ,
db_env_ ( db_env ) ,
backup_env_ ( options . backup_env ! = nullptr ? options . backup_env
: db_env_ ) {
backup_env_ ( options . backup_env ! = nullptr ? options . backup_env : db_env_ ) ,
copy_file_buffer_size_ ( kDefaultCopyFileBufferSize ) {
options_ . Dump ( options_ . info_log ) ;
// create all the dirs we need
@ -350,6 +395,13 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
s = backup_env_ - > CreateDir (
GetAbsolutePath ( GetPrivateFileRel ( new_backup_id , true ) ) ) ;
unique_ptr < RateLimiter > rate_limiter ;
if ( options_ . backup_rate_limit > 0 ) {
copy_file_buffer_size_ = options_ . backup_rate_limit / 10 ;
rate_limiter . reset ( new RateLimiter ( db_env_ , options_ . backup_rate_limit ,
copy_file_buffer_size_ ) ) ;
}
// copy live_files
for ( size_t i = 0 ; s . ok ( ) & & i < live_files . size ( ) ; + + i ) {
uint64_t number ;
@ -371,6 +423,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
options_ . share_table_files & & type = = kTableFile ,
db - > GetName ( ) , /* src_dir */
live_files [ i ] , /* src_fname */
rate_limiter . get ( ) ,
( type = = kDescriptorFile ) ? manifest_file_size : 0 ) ;
}
@ -383,7 +436,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
& new_backup ,
false , /* not shared */
db - > GetOptions ( ) . wal_dir ,
live_wal_files [ i ] - > PathName ( ) ) ;
live_wal_files [ i ] - > PathName ( ) ,
rate_limiter . get ( ) ) ;
}
}
@ -527,6 +581,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(
DeleteChildren ( db_dir ) ;
}
unique_ptr < RateLimiter > rate_limiter ;
if ( options_ . restore_rate_limit > 0 ) {
copy_file_buffer_size_ = options_ . restore_rate_limit / 10 ;
rate_limiter . reset ( new RateLimiter ( db_env_ , options_ . restore_rate_limit ,
copy_file_buffer_size_ ) ) ;
}
Status s ;
for ( auto & file : backup . GetFiles ( ) ) {
std : : string dst ;
@ -551,7 +611,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
Log ( options_ . info_log , " Restoring %s to %s \n " , file . c_str ( ) , dst . c_str ( ) ) ;
uint32_t checksum_value ;
s = CopyFile ( GetAbsolutePath ( file ) , dst , backup_env_ , db_env_ , false ,
nullptr /* size */ , & checksum_value ) ;
rate_limiter . get ( ) , nullptr /* size */ , & checksum_value ) ;
if ( ! s . ok ( ) ) {
break ;
}
@ -631,7 +691,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
Status BackupEngineImpl : : CopyFile ( const std : : string & src ,
const std : : string & dst , Env * src_env ,
Env * dst_env , bool sync , uint64_t * size ,
Env * dst_env , bool sync ,
RateLimiter * rate_limiter , uint64_t * size ,
uint32_t * checksum_value ,
uint64_t size_limit ) {
Status s ;
@ -684,6 +745,9 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
data . size ( ) ) ;
}
s = dst_file - > Append ( data ) ;
if ( rate_limiter ! = nullptr ) {
rate_limiter - > ReportAndWait ( data . size ( ) ) ;
}
} while ( s . ok ( ) & & data . size ( ) > 0 & & size_limit > 0 ) ;
if ( s . ok ( ) & & sync ) {
@ -697,6 +761,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
Status BackupEngineImpl : : BackupFile ( BackupID backup_id , BackupMeta * backup ,
bool shared , const std : : string & src_dir ,
const std : : string & src_fname ,
RateLimiter * rate_limiter ,
uint64_t size_limit ) {
assert ( src_fname . size ( ) > 0 & & src_fname [ 0 ] = = ' / ' ) ;
@ -732,6 +797,7 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
db_env_ ,
backup_env_ ,
options_ . sync ,
rate_limiter ,
& size ,
& checksum_value ,
size_limit ) ;