@ -26,11 +26,11 @@
namespace rocksdb {
namespace rocksdb {
// -------- BackupEngine class ---------
// -------- BackupEngineImpl class ---------
class BackupEngine {
class BackupEngineImpl : public BackupEngine {
public :
public :
BackupEngine ( Env * db_env , const BackupableDBOptions & options ) ;
BackupEngineImpl ( Env * db_env , const BackupableDBOptions & options ) ;
~ BackupEngine ( ) ;
~ BackupEngineImpl ( ) ;
Status CreateNewBackup ( DB * db , bool flush_before_backup = false ) ;
Status CreateNewBackup ( DB * db , bool flush_before_backup = false ) ;
Status PurgeOldBackups ( uint32_t num_backups_to_keep ) ;
Status PurgeOldBackups ( uint32_t num_backups_to_keep ) ;
Status DeleteBackup ( BackupID backup_id ) ;
Status DeleteBackup ( BackupID backup_id ) ;
@ -188,7 +188,13 @@ class BackupEngine {
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL ; // 5MB
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL ; // 5MB
} ;
} ;
BackupEngine : : BackupEngine ( Env * db_env , const BackupableDBOptions & options )
BackupEngine * CreateNewBackupEngine ( Env * db_env ,
const BackupableDBOptions & options ) {
return new BackupEngineImpl ( db_env , options ) ;
}
BackupEngineImpl : : BackupEngineImpl ( Env * db_env ,
const BackupableDBOptions & options )
: stop_backup_ ( false ) ,
: stop_backup_ ( false ) ,
options_ ( options ) ,
options_ ( options ) ,
db_env_ ( db_env ) ,
db_env_ ( db_env ) ,
@ -271,11 +277,9 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
latest_backup_id_ ) ;
latest_backup_id_ ) ;
}
}
BackupEngine : : ~ BackupEngine ( ) {
BackupEngineImpl : : ~ BackupEngineImpl ( ) { LogFlush ( options_ . info_log ) ; }
LogFlush ( options_ . info_log ) ;
}
void BackupEngine : : DeleteBackupsNewerThan ( uint64_t sequence_number ) {
void BackupEngineImpl : : DeleteBackupsNewerThan ( uint64_t sequence_number ) {
for ( auto backup : backups_ ) {
for ( auto backup : backups_ ) {
if ( backup . second . GetSequenceNumber ( ) > sequence_number ) {
if ( backup . second . GetSequenceNumber ( ) > sequence_number ) {
Log ( options_ . info_log ,
Log ( options_ . info_log ,
@ -295,7 +299,7 @@ void BackupEngine::DeleteBackupsNewerThan(uint64_t sequence_number) {
GarbageCollection ( false ) ;
GarbageCollection ( false ) ;
}
}
Status BackupEngine : : CreateNewBackup ( DB * db , bool flush_before_backup ) {
Status BackupEngineImpl : : CreateNewBackup ( DB * db , bool flush_before_backup ) {
Status s ;
Status s ;
std : : vector < std : : string > live_files ;
std : : vector < std : : string > live_files ;
VectorLogPtr live_wal_files ;
VectorLogPtr live_wal_files ;
@ -405,7 +409,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
return s ;
return s ;
}
}
Status BackupEngine : : PurgeOldBackups ( uint32_t num_backups_to_keep ) {
Status BackupEngineImpl : : PurgeOldBackups ( uint32_t num_backups_to_keep ) {
Log ( options_ . info_log , " Purging old backups, keeping %u " ,
Log ( options_ . info_log , " Purging old backups, keeping %u " ,
num_backups_to_keep ) ;
num_backups_to_keep ) ;
while ( num_backups_to_keep < backups_ . size ( ) ) {
while ( num_backups_to_keep < backups_ . size ( ) ) {
@ -418,7 +422,7 @@ Status BackupEngine::PurgeOldBackups(uint32_t num_backups_to_keep) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
Status BackupEngine : : DeleteBackup ( BackupID backup_id ) {
Status BackupEngineImpl : : DeleteBackup ( BackupID backup_id ) {
Log ( options_ . info_log , " Deleting backup %u " , backup_id ) ;
Log ( options_ . info_log , " Deleting backup %u " , backup_id ) ;
auto backup = backups_ . find ( backup_id ) ;
auto backup = backups_ . find ( backup_id ) ;
if ( backup = = backups_ . end ( ) ) {
if ( backup = = backups_ . end ( ) ) {
@ -431,7 +435,7 @@ Status BackupEngine::DeleteBackup(BackupID backup_id) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
void BackupEngine : : GetBackupInfo ( std : : vector < BackupInfo > * backup_info ) {
void BackupEngineImpl : : GetBackupInfo ( std : : vector < BackupInfo > * backup_info ) {
backup_info - > reserve ( backups_ . size ( ) ) ;
backup_info - > reserve ( backups_ . size ( ) ) ;
for ( auto & backup : backups_ ) {
for ( auto & backup : backups_ ) {
if ( ! backup . second . Empty ( ) ) {
if ( ! backup . second . Empty ( ) ) {
@ -441,9 +445,9 @@ void BackupEngine::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
}
}
}
}
Status BackupEngine : : RestoreDBFromBackup ( BackupID backup_id ,
Status BackupEngineImpl : : RestoreDBFromBackup ( BackupID backup_id ,
const std : : string & db_dir ,
const std : : string & db_dir ,
const std : : string & wal_dir ) {
const std : : string & wal_dir ) {
auto backup_itr = backups_ . find ( backup_id ) ;
auto backup_itr = backups_ . find ( backup_id ) ;
if ( backup_itr = = backups_ . end ( ) ) {
if ( backup_itr = = backups_ . end ( ) ) {
return Status : : NotFound ( " Backup not found " ) ;
return Status : : NotFound ( " Backup not found " ) ;
@ -517,7 +521,7 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
}
}
// latest backup id is an ASCII representation of latest backup id
// latest backup id is an ASCII representation of latest backup id
Status BackupEngine : : GetLatestBackupFileContents ( uint32_t * latest_backup ) {
Status BackupEngineImpl : : GetLatestBackupFileContents ( uint32_t * latest_backup ) {
Status s ;
Status s ;
unique_ptr < SequentialFile > file ;
unique_ptr < SequentialFile > file ;
s = backup_env_ - > NewSequentialFile ( GetLatestBackupFile ( ) ,
s = backup_env_ - > NewSequentialFile ( GetLatestBackupFile ( ) ,
@ -547,7 +551,7 @@ Status BackupEngine::GetLatestBackupFileContents(uint32_t* latest_backup) {
// writing 4 bytes to the file is atomic alright, but we should *never*
// writing 4 bytes to the file is atomic alright, but we should *never*
// do something like 1. delete file, 2. write new file
// do something like 1. delete file, 2. write new file
// We write to a tmp file and then atomically rename
// We write to a tmp file and then atomically rename
Status BackupEngine : : PutLatestBackupFileContents ( uint32_t latest_backup ) {
Status BackupEngineImpl : : PutLatestBackupFileContents ( uint32_t latest_backup ) {
Status s ;
Status s ;
unique_ptr < WritableFile > file ;
unique_ptr < WritableFile > file ;
EnvOptions env_options ;
EnvOptions env_options ;
@ -577,14 +581,11 @@ Status BackupEngine::PutLatestBackupFileContents(uint32_t latest_backup) {
return s ;
return s ;
}
}
Status BackupEngine : : CopyFile ( const std : : string & src ,
Status BackupEngineImpl : : CopyFile ( const std : : string & src ,
const std : : string & dst ,
const std : : string & dst , Env * src_env ,
Env * src_env ,
Env * dst_env , bool sync , uint64_t * size ,
Env * dst_env ,
uint32_t * checksum_value ,
bool sync ,
uint64_t size_limit ) {
uint64_t * size ,
uint32_t * checksum_value ,
uint64_t size_limit ) {
Status s ;
Status s ;
unique_ptr < WritableFile > dst_file ;
unique_ptr < WritableFile > dst_file ;
unique_ptr < SequentialFile > src_file ;
unique_ptr < SequentialFile > src_file ;
@ -644,12 +645,10 @@ Status BackupEngine::CopyFile(const std::string& src,
}
}
// src_fname will always start with "/"
// src_fname will always start with "/"
Status BackupEngine : : BackupFile ( BackupID backup_id ,
Status BackupEngineImpl : : BackupFile ( BackupID backup_id , BackupMeta * backup ,
BackupMeta * backup ,
bool shared , const std : : string & src_dir ,
bool shared ,
const std : : string & src_fname ,
const std : : string & src_dir ,
uint64_t size_limit ) {
const std : : string & src_fname ,
uint64_t size_limit ) {
assert ( src_fname . size ( ) > 0 & & src_fname [ 0 ] = = ' / ' ) ;
assert ( src_fname . size ( ) > 0 & & src_fname [ 0 ] = = ' / ' ) ;
std : : string dst_relative = src_fname . substr ( 1 ) ;
std : : string dst_relative = src_fname . substr ( 1 ) ;
@ -697,10 +696,9 @@ Status BackupEngine::BackupFile(BackupID backup_id,
return s ;
return s ;
}
}
Status BackupEngine : : CalculateChecksum ( const std : : string & src ,
Status BackupEngineImpl : : CalculateChecksum ( const std : : string & src , Env * src_env ,
Env * src_env ,
uint64_t size_limit ,
uint64_t size_limit ,
uint32_t * checksum_value ) {
uint32_t * checksum_value ) {
* checksum_value = 0 ;
* checksum_value = 0 ;
if ( size_limit = = 0 ) {
if ( size_limit = = 0 ) {
size_limit = std : : numeric_limits < uint64_t > : : max ( ) ;
size_limit = std : : numeric_limits < uint64_t > : : max ( ) ;
@ -737,7 +735,7 @@ Status BackupEngine::CalculateChecksum(const std::string& src,
return s ;
return s ;
}
}
void BackupEngine : : GarbageCollection ( bool full_scan ) {
void BackupEngineImpl : : GarbageCollection ( bool full_scan ) {
Log ( options_ . info_log , " Starting garbage collection " ) ;
Log ( options_ . info_log , " Starting garbage collection " ) ;
std : : vector < std : : string > to_delete ;
std : : vector < std : : string > to_delete ;
for ( auto & itr : backuped_file_infos_ ) {
for ( auto & itr : backuped_file_infos_ ) {
@ -817,7 +815,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
// ------- BackupMeta class --------
// ------- BackupMeta class --------
Status BackupEngine : : BackupMeta : : AddFile ( const FileInfo & file_info ) {
Status BackupEngineImpl : : BackupMeta : : AddFile ( const FileInfo & file_info ) {
size_ + = file_info . size ;
size_ + = file_info . size ;
files_ . push_back ( file_info . filename ) ;
files_ . push_back ( file_info . filename ) ;
@ -840,7 +838,7 @@ Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
void BackupEngine : : BackupMeta : : Delete ( ) {
void BackupEngineImpl : : BackupMeta : : Delete ( ) {
for ( const auto & file : files_ ) {
for ( const auto & file : files_ ) {
auto itr = file_infos_ - > find ( file ) ;
auto itr = file_infos_ - > find ( file ) ;
assert ( itr ! = file_infos_ - > end ( ) ) ;
assert ( itr ! = file_infos_ - > end ( ) ) ;
@ -860,7 +858,8 @@ void BackupEngine::BackupMeta::Delete() {
// <file2> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
// ...
// TODO: maybe add checksum?
// TODO: maybe add checksum?
Status BackupEngine : : BackupMeta : : LoadFromFile ( const std : : string & backup_dir ) {
Status BackupEngineImpl : : BackupMeta : : LoadFromFile (
const std : : string & backup_dir ) {
assert ( Empty ( ) ) ;
assert ( Empty ( ) ) ;
Status s ;
Status s ;
unique_ptr < SequentialFile > backup_meta_file ;
unique_ptr < SequentialFile > backup_meta_file ;
@ -927,7 +926,7 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
return s ;
return s ;
}
}
Status BackupEngine : : BackupMeta : : StoreToFile ( bool sync ) {
Status BackupEngineImpl : : BackupMeta : : StoreToFile ( bool sync ) {
Status s ;
Status s ;
unique_ptr < WritableFile > backup_meta_file ;
unique_ptr < WritableFile > backup_meta_file ;
EnvOptions env_options ;
EnvOptions env_options ;
@ -969,7 +968,8 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
// --- BackupableDB methods --------
// --- BackupableDB methods --------
BackupableDB : : BackupableDB ( DB * db , const BackupableDBOptions & options )
BackupableDB : : BackupableDB ( DB * db , const BackupableDBOptions & options )
: StackableDB ( db ) , backup_engine_ ( new BackupEngine ( db - > GetEnv ( ) , options ) ) {
: StackableDB ( db ) ,
backup_engine_ ( new BackupEngineImpl ( db - > GetEnv ( ) , options ) ) {
if ( options . share_table_files ) {
if ( options . share_table_files ) {
backup_engine_ - > DeleteBackupsNewerThan ( GetLatestSequenceNumber ( ) ) ;
backup_engine_ - > DeleteBackupsNewerThan ( GetLatestSequenceNumber ( ) ) ;
}
}
@ -1003,7 +1003,7 @@ void BackupableDB::StopBackup() {
RestoreBackupableDB : : RestoreBackupableDB ( Env * db_env ,
RestoreBackupableDB : : RestoreBackupableDB ( Env * db_env ,
const BackupableDBOptions & options )
const BackupableDBOptions & options )
: backup_engine_ ( new BackupEngine ( db_env , options ) ) { }
: backup_engine_ ( new BackupEngineImpl ( db_env , options ) ) { }
RestoreBackupableDB : : ~ RestoreBackupableDB ( ) {
RestoreBackupableDB : : ~ RestoreBackupableDB ( ) {
delete backup_engine_ ;
delete backup_engine_ ;