@ -9,8 +9,6 @@
# ifndef ROCKSDB_LITE
# include "rocksdb/utilities/backupable_db.h"
# include <stdlib.h>
# include <algorithm>
@ -39,10 +37,12 @@
# include "rocksdb/transaction_log.h"
# include "table/sst_file_dumper.h"
# include "test_util/sync_point.h"
# include "util/cast_util.h"
# include "util/channel.h"
# include "util/coding.h"
# include "util/crc32c.h"
# include "util/string_util.h"
# include "utilities/backupable/backupable_db_impl.h"
# include "utilities/checkpoint/checkpoint_impl.h"
namespace ROCKSDB_NAMESPACE {
@ -189,7 +189,8 @@ class BackupEngineImpl : public BackupEngine {
int refs ;
const std : : string filename ;
const uint64_t size ;
const std : : string checksum_hex ;
// crc32c checksum as hex. empty == unknown / unavailable
std : : string checksum_hex ;
// DB identities
// db_id is obtained for potential usage in the future but not used
// currently
@ -259,8 +260,11 @@ class BackupEngineImpl : public BackupEngine {
// @param abs_path_to_size Pre-fetched file sizes (bytes).
Status LoadFromFile (
const std : : string & backup_dir ,
const std : : unordered_map < std : : string , uint64_t > & abs_path_to_size ) ;
Status StoreToFile ( bool sync ) ;
const std : : unordered_map < std : : string , uint64_t > & abs_path_to_size ,
Logger * info_log ,
std : : unordered_set < std : : string > * reported_ignored_fields ) ;
Status StoreToFile (
bool sync , const TEST_FutureSchemaVersion2Options * test_future_options ) ;
std : : string GetInfoString ( ) {
std : : ostringstream ss ;
@ -329,14 +333,12 @@ class BackupEngineImpl : public BackupEngine {
sid . empty ( ) ;
}
inline std : : string GetSharedFileWithChecksum (
const std : : string & file , bool has_checksum ,
const std : : string & checksum_hex , const uint64_t file_size ,
const std : : string & db_session_id ) const {
const std : : string & file , const std : : string & checksum_hex ,
const uint64_t file_size , const std : : string & db_session_id ) const {
assert ( file . size ( ) = = 0 | | file [ 0 ] ! = ' / ' ) ;
std : : string file_copy = file ;
if ( UseLegacyNaming ( db_session_id ) ) {
assert ( has_checksum ) ;
( void ) has_checksum ;
assert ( ! checksum_hex . empty ( ) ) ;
file_copy . insert ( file_copy . find_last_of ( ' . ' ) ,
" _ " + ToString ( ChecksumHexToInt32 ( checksum_hex ) ) + " _ " +
ToString ( file_size ) ) ;
@ -422,7 +424,6 @@ class BackupEngineImpl : public BackupEngine {
uint64_t size_limit ;
std : : promise < CopyOrCreateResult > result ;
std : : function < void ( ) > progress_callback ;
bool verify_checksum_after_work ;
std : : string src_checksum_func_name ;
std : : string src_checksum_hex ;
std : : string db_id ;
@ -438,7 +439,6 @@ class BackupEngineImpl : public BackupEngine {
sync ( false ) ,
rate_limiter ( nullptr ) ,
size_limit ( 0 ) ,
verify_checksum_after_work ( false ) ,
src_checksum_func_name ( kUnknownFileChecksumFuncName ) ,
src_checksum_hex ( " " ) ,
db_id ( " " ) ,
@ -463,7 +463,6 @@ class BackupEngineImpl : public BackupEngine {
size_limit = o . size_limit ;
result = std : : move ( o . result ) ;
progress_callback = std : : move ( o . progress_callback ) ;
verify_checksum_after_work = o . verify_checksum_after_work ;
src_checksum_func_name = std : : move ( o . src_checksum_func_name ) ;
src_checksum_hex = std : : move ( o . src_checksum_hex ) ;
db_id = std : : move ( o . db_id ) ;
@ -476,7 +475,6 @@ class BackupEngineImpl : public BackupEngine {
Env * _src_env , Env * _dst_env , EnvOptions _src_env_options , bool _sync ,
RateLimiter * _rate_limiter , uint64_t _size_limit ,
std : : function < void ( ) > _progress_callback = [ ] ( ) { } ,
bool _verify_checksum_after_work = false ,
const std : : string & _src_checksum_func_name =
kUnknownFileChecksumFuncName ,
const std : : string & _src_checksum_hex = " " ,
@ -491,7 +489,6 @@ class BackupEngineImpl : public BackupEngine {
rate_limiter ( _rate_limiter ) ,
size_limit ( _size_limit ) ,
progress_callback ( _progress_callback ) ,
verify_checksum_after_work ( _verify_checksum_after_work ) ,
src_checksum_func_name ( _src_checksum_func_name ) ,
src_checksum_hex ( _src_checksum_hex ) ,
db_id ( _db_id ) ,
@ -622,7 +619,11 @@ class BackupEngineImpl : public BackupEngine {
size_t copy_file_buffer_size_ ;
bool read_only_ ;
BackupStatistics backup_statistics_ ;
std : : unordered_set < std : : string > reported_ignored_fields_ ;
static const size_t kMaxAppMetaSize = 1024 * 1024 ; // 1MB
public :
std : : unique_ptr < TEST_FutureSchemaVersion2Options > test_future_options_ ;
} ;
Status BackupEngine : : Open ( const BackupableDBOptions & options , Env * env ,
@ -781,9 +782,12 @@ Status BackupEngineImpl::Initialize() {
for ( const auto & rel_dir :
{ GetSharedFileRel ( ) , GetSharedFileWithChecksumRel ( ) } ) {
const auto abs_dir = GetAbsolutePath ( rel_dir ) ;
// TODO: What do do on error?
InsertPathnameToSizeBytes ( abs_dir , backup_env_ , & abs_path_to_size )
. PermitUncheckedError ( ) ;
Status s =
InsertPathnameToSizeBytes ( abs_dir , backup_env_ , & abs_path_to_size ) ;
if ( ! s . ok ( ) ) {
// I/O error likely impacting all backups
return s ;
}
}
// load the backups if any, until valid_backups_to_open of the latest
// non-corrupted backups have been successfully opened.
@ -805,10 +809,11 @@ Status BackupEngineImpl::Initialize() {
GetAbsolutePath ( GetPrivateFileRel ( backup_iter - > first ) ) , backup_env_ ,
& abs_path_to_size ) ;
if ( s . ok ( ) ) {
s = backup_iter - > second - > LoadFromFile ( options_ . backup_dir ,
abs_path_to_size ) ;
s = backup_iter - > second - > LoadFromFile (
options_ . backup_dir , abs_path_to_size , options_ . info_log ,
& reported_ignored_fields_ ) ;
}
if ( s . IsCorruption ( ) ) {
if ( s . IsCorruption ( ) | | s . IsNotSupported ( ) ) {
ROCKS_LOG_INFO ( options_ . info_log , " Backup %u corrupted -- %s " ,
backup_iter - > first , s . ToString ( ) . c_str ( ) ) ;
corrupt_backups_ . insert (
@ -884,9 +889,9 @@ Status BackupEngineImpl::Initialize() {
work_item . progress_callback ) ;
result . db_id = work_item . db_id ;
result . db_session_id = work_item . db_session_id ;
if ( result . status . ok ( ) & & work_item . verify_checksum_after_work ) {
if ( result . status . ok ( ) & & ! work_item . src_checksum_hex . empty ( ) ) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.verify_checksum_after_work being true means
// db manifest; work_item.src_checksum_hex not empty means
// backup engine has calculated its crc32c checksum for the table
// file; therefore, we are able to compare the checksums.
if ( work_item . src_checksum_func_name = =
@ -901,6 +906,7 @@ Status BackupEngineImpl::Initialize() {
work_item . dst_path + " : " + checksum_info ) ;
}
} else {
// FIXME(peterd): dead code?
std : : string checksum_function_info (
" Existing checksum function is " +
work_item . src_checksum_func_name +
@ -1108,7 +1114,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
if ( s . ok ( ) ) {
// persist the backup metadata on the disk
s = new_backup - > StoreToFile ( options_ . sync ) ;
s = new_backup - > StoreToFile ( options_ . sync , test_future_options_ . get ( ) ) ;
}
if ( s . ok ( ) & & options_ . sync ) {
std : : unique_ptr < Directory > backup_private_directory ;
@ -1426,7 +1432,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
if ( ! item_status . ok ( ) ) {
s = item_status ;
break ;
} else if ( item . checksum_hex ! = result . checksum_hex ) {
} else if ( ! item . checksum_hex . empty ( ) & &
item . checksum_hex ! = result . checksum_hex ) {
s = Status : : Corruption ( " Checksum check failed " ) ;
break ;
}
@ -1484,7 +1491,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
return Status : : Corruption ( " File corrupted: File size mismatch for " +
abs_path + " : " + size_info ) ;
}
if ( verify_with_checksum ) {
if ( verify_with_checksum & & ! file_info - > checksum_hex . empty ( ) ) {
// verify file checksum
std : : string checksum_hex ;
ROCKS_LOG_INFO ( options_ . info_log , " Verifying %s checksum... \n " ,
@ -1618,11 +1625,10 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
std : : string dst_relative = fname . substr ( 1 ) ;
std : : string dst_relative_tmp ;
std : : string checksum_hex ;
std : : string db_id ;
std : : string db_session_id ;
// whether the checksum for a table file is available
bool has_checksum = false ;
// crc32c checksum in hex. empty == unavailable / unknown
std : : string checksum_hex ;
// Whenever a default checksum function name is passed in, we will compares
// the corresponding checksum values after copying. Note that only table files
@ -1640,7 +1646,6 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
return Status : : Aborted ( " Unknown checksum value for " + fname ) ;
}
checksum_hex = ChecksumStrToHex ( src_checksum_str ) ;
has_checksum = true ;
}
// Step 1: Prepare the relative path to destination
@ -1657,13 +1662,12 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// If db session id is available, we will not calculate the checksum
// since the session id should suffice to avoid file name collision in
// the shared_checksum directory.
if ( ! has_checksum & & db_session_id . empty ( ) ) {
if ( checksum_hex . empty ( ) & & db_session_id . empty ( ) ) {
Status s = ReadFileAndComputeChecksum (
src_dir + fname , db_env_ , src_env_options , size_limit , & checksum_hex ) ;
if ( ! s . ok ( ) ) {
return s ;
}
has_checksum = true ;
}
if ( size_bytes = = port : : kMaxUint64 ) {
return Status : : NotFound ( " File missing: " + src_dir + fname ) ;
@ -1678,8 +1682,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// shared_checksum/<file_number>_<db_session_id>.sst
// Otherwise, dst_relative is of the form
// shared_checksum/<file_number>_<checksum>_<size>.sst
dst_relative = GetSharedFileWithChecksum (
dst_relative , has_checksum , checksum_hex , size_bytes , db_session_id ) ;
dst_relative = GetSharedFileWithChecksum ( dst_relative , checksum_hex ,
size_bytes , db_session_id ) ;
dst_relative_tmp = GetSharedFileWithChecksumRel ( dst_relative , true ) ;
dst_relative = GetSharedFileWithChecksumRel ( dst_relative , false ) ;
} else if ( shared ) {
@ -1742,14 +1746,20 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
backup_env_ - > DeleteFile ( final_dest_path ) . PermitUncheckedError ( ) ;
} else {
// file exists and referenced
if ( ! has_checksum ) {
if ( ! same_path ) {
if ( checksum_hex . empty ( ) ) {
// same_path should not happen for a standard DB, so OK to
// read file contents to check for checksum mismatch between
// two files from same DB getting same name.
// For compatibility with future meta file that might not have
// crc32c checksum available, consider it might be empty, but
// we don't currently generate meta file without crc32c checksum.
// Therefore we have to read & compute it if we don't have it.
if ( ! same_path & & ! find_result - > second - > checksum_hex . empty ( ) ) {
assert ( find_result ! = backuped_file_infos_ . end ( ) ) ;
// Note: to save I/O on incremental backups, we copy prior known
// checksum of the file instead of reading entire file contents
// to recompute it.
checksum_hex = find_result - > second - > checksum_hex ;
has_checksum = true ;
// Regarding corruption detection, consider:
// (a) the DB file is corrupt (since previous backup) and the backup
// file is OK: we failed to detect, but the backup is safe. DB can
@ -1765,9 +1775,6 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// ID, but even in that case, we double check the file sizes in
// BackupMeta::AddFile.
} else {
// same_path should not happen for a standard DB, so OK to
// read file contents to check for checksum mismatch between
// two files from same DB getting same name.
Status s = ReadFileAndComputeChecksum ( src_dir + fname , db_env_ ,
src_env_options , size_limit ,
& checksum_hex ) ;
@ -1798,8 +1805,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
CopyOrCreateWorkItem copy_or_create_work_item (
src_dir . empty ( ) ? " " : src_dir + fname , * copy_dest_path , contents ,
db_env_ , backup_env_ , src_env_options , options_ . sync , rate_limiter ,
size_limit , progress_callback , has_checksum , src_checksum_func_name ,
checksum_hex , db_id , db_session_id ) ;
size_limit , progress_callback , src_checksum_func_name , checksum_hex ,
db_id , db_session_id ) ;
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item (
copy_or_create_work_item . result . get_future ( ) , shared , need_to_copy ,
backup_env_ , temp_dest_path , final_dest_path , dst_relative ) ;
@ -2091,10 +2098,16 @@ Status BackupEngineImpl::BackupMeta::AddFile(
" backups or backing up to a different backup directory. " ) ;
return Status : : Corruption ( msg ) ;
}
// Note: to save I/O, this check will pass trivially on already backed
// up files that don't have the checksum in their name. And it should
// never fail for files that do have checksum in their name.
if ( itr - > second - > checksum_hex ! = file_info - > checksum_hex ) {
if ( file_info - > checksum_hex . empty ( ) ) {
// No checksum available to check
} else if ( itr - > second - > checksum_hex . empty ( ) ) {
// Remember checksum if newly acquired
itr - > second - > checksum_hex = file_info - > checksum_hex ;
} else if ( itr - > second - > checksum_hex ! = file_info - > checksum_hex ) {
// Note: to save I/O, these will be equal trivially on already backed
// up files that don't have the checksum in their name. And it should
// never fail for files that do have checksum in their name.
// Should never reach here, but produce an appropriate corruption
// message in case we do in a release build.
assert ( false ) ;
@ -2135,20 +2148,85 @@ Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
return s ;
}
Slice kMetaDataPrefix ( " metadata " ) ;
// Constants for backup meta file schema (see LoadFromFile)
namespace {
const std : : string kSchemaVersionPrefix { " schema_version " } ;
const std : : string kFooterMarker { " // FOOTER " } ;
const std : : string kAppMetaDataFieldName { " metadata " } ;
// WART: The checksums are crc32c but named "crc32"
const std : : string kFileCrc32cFieldName { " crc32 " } ;
const std : : string kFileSizeFieldName { " size " } ;
// each backup meta file is of the format:
// Marks a (future) field that should cause failure if not recognized.
// Other fields are assumed to be ignorable. For example, in the future
// we might add
// ni::file_name_escape uri_percent
// to indicate all file names have had spaces and special characters
// escaped using a URI percent encoding.
const std : : string kNonIgnorableFieldPrefix { " ni:: " } ;
} // namespace
// Each backup meta file is of the format (schema version 1):
//----------------------------------------------------------
// <timestamp>
// <seq number>
// <metadata(literal string)> <metadata> (optional)
// metadata <metadata> (optional)
// <number of files>
// <file1> <crc32(literal string)> <crc32c_value>
// <file2> <crc32(literal string)> <crc32c_value>
// <file1> crc32 <crc32c_as_unsigned_decimal >
// <file2> crc32 <crc32c_as_unsigned_decimal >
// ...
//----------------------------------------------------------
//
// For schema version 2.x (not in public APIs, but
// forward-compatibility started):
//----------------------------------------------------------
// schema_version <ver>
// <timestamp>
// <seq number>
// [<field name> <field data>]
// ...
// <number of files>
// <file1>( <field name> <field data no spaces>)*
// <file2>( <field name> <field data no spaces>)*
// ...
// [// FOOTER]
// [<field name> <field data>]
// ...
//----------------------------------------------------------
// where
// <ver> ::= [0-9]+([.][0-9]+)
// <field name> ::= [A-Za-z_][A-Za-z_0-9.]+
// <field data> is anything but newline
// <field data no spaces> is anything but space and newline
// Although "// FOOTER" wouldn't strictly be required as a delimiter
// given the number of files is included, it is there for parsing
// sanity in case of corruption. It is only required if followed
// by footer fields, such as a checksum of the meta file (so far).
// Unrecognized fields are ignored, to support schema evolution on
// non-critical features with forward compatibility. Update schema
// major version for breaking changes. Schema minor versions are indicated
// only for diagnostic/debugging purposes.
//
// Fields in schema version 2.0:
// * Top-level meta fields:
// * Only "metadata" as in schema version 1
// * File meta fields:
// * "crc32" - a crc32c checksum as in schema version 1
// * "size" - the size of the file (new)
// * Footer meta fields:
// * None yet (future use for meta file checksum anticipated)
//
Status BackupEngineImpl : : BackupMeta : : LoadFromFile (
const std : : string & backup_dir ,
const std : : unordered_map < std : : string , uint64_t > & abs_path_to_size ) {
const std : : unordered_map < std : : string , uint64_t > & abs_path_to_size ,
Logger * info_log ,
std : : unordered_set < std : : string > * reported_ignored_fields ) {
assert ( reported_ignored_fields ) ;
assert ( Empty ( ) ) ;
std : : unique_ptr < LineFileReader > backup_meta_reader ;
{
Status s =
@ -2159,49 +2237,90 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
}
}
// If we don't read an explicit schema_version, that implies version 1,
// which is what we call the original backup meta schema.
int schema_major_version = 1 ;
// Failures handled at the end
std : : string line ;
if ( backup_meta_reader - > ReadLine ( & line ) ) {
if ( StartsWith ( line , kSchemaVersionPrefix ) ) {
std : : string ver = line . substr ( kSchemaVersionPrefix . size ( ) ) ;
if ( ver = = " 2 " | | StartsWith ( ver , " 2. " ) ) {
schema_major_version = 2 ;
} else {
return Status : : NotSupported (
" Unsupported/unrecognized schema version: " + ver ) ;
}
line . clear ( ) ;
} else if ( line . empty ( ) ) {
return Status : : Corruption ( " Unexpected empty line " ) ;
}
}
if ( ! line . empty ( ) | | backup_meta_reader - > ReadLine ( & line ) ) {
timestamp_ = std : : strtoull ( line . c_str ( ) , nullptr , /*base*/ 10 ) ;
}
if ( backup_meta_reader - > ReadLine ( & line ) ) {
sequence_number_ = std : : strtoull ( line . c_str ( ) , nullptr , /*base*/ 10 ) ;
}
if ( backup_meta_reader - > ReadLine ( & line ) ) {
Slice data = line ;
if ( data . starts_with ( kMetaDataPrefix ) ) {
uint32_t num_files = UINT32_MAX ;
while ( backup_meta_reader - > ReadLine ( & line ) ) {
if ( line . empty ( ) ) {
return Status : : Corruption ( " Unexpected empty line " ) ;
}
// Number -> number of files -> exit loop reading optional meta fields
if ( line [ 0 ] > = ' 0 ' & & line [ 0 ] < = ' 9 ' ) {
num_files = static_cast < uint32_t > ( strtoul ( line . c_str ( ) , nullptr , 10 ) ) ;
break ;
}
// else, must be a meta field assignment
auto space_pos = line . find_first_of ( ' ' ) ;
if ( space_pos = = std : : string : : npos ) {
return Status : : Corruption ( " Expected number of files or meta field " ) ;
}
std : : string field_name = line . substr ( 0 , space_pos ) ;
std : : string field_data = line . substr ( space_pos + 1 ) ;
if ( field_name = = kAppMetaDataFieldName ) {
// app metadata present
data . remove_prefix ( kMetaDataPrefix . size ( ) ) ;
bool decode_success = data . DecodeHex ( & app_metadata_ ) ;
bool decode_success = Slice ( field_data ) . DecodeHex ( & app_metadata_ ) ;
if ( ! decode_success ) {
return Status : : Corruption (
" Failed to decode stored hex encoded app metadata " ) ;
}
line . clear ( ) ;
} else if ( schema_major_version < 2 ) {
return Status : : Corruption ( " Expected number of files or \" " +
kAppMetaDataFieldName + " \" field " ) ;
} else if ( StartsWith ( field_name , kNonIgnorableFieldPrefix ) ) {
return Status : : NotSupported ( " Unrecognized non-ignorable meta field " +
field_name + " (from future version?) " ) ;
} else {
// process the line below
// Warn the first time we see any particular unrecognized meta field
if ( reported_ignored_fields - > insert ( " meta: " + field_name ) . second ) {
ROCKS_LOG_WARN ( info_log , " Ignoring unrecognized backup meta field %s " ,
field_name . c_str ( ) ) ;
}
}
} else {
line . clear ( ) ;
}
uint32_t num_files = UINT32_MAX ;
if ( ! line . empty ( ) | | backup_meta_reader - > ReadLine ( & line ) ) {
num_files = static_cast < uint32_t > ( strtoul ( line . c_str ( ) , nullptr , 10 ) ) ;
}
std : : vector < std : : shared_ptr < FileInfo > > files ;
bool footer_present = false ;
while ( backup_meta_reader - > ReadLine ( & line ) ) {
std : : vector < std : : string > components = StringSplit ( line , ' ' ) ;
if ( components . size ( ) < 1 ) {
return Status : : Corruption ( " Empty line instead of file entry. " ) ;
}
if ( schema_major_version > = 2 & & components . size ( ) = = 2 & &
line = = kFooterMarker ) {
footer_present = true ;
break ;
}
const std : : string & filename = components [ 0 ] ;
uint64_t size ;
uint64_t actual_ size;
const std : : shared_ptr < FileInfo > file_info = GetFile ( filename ) ;
if ( file_info ) {
size = file_info - > size ;
actual_ size = file_info - > size ;
} else {
std : : string abs_path = backup_dir + " / " + filename ;
auto e = abs_path_to_size . find ( abs_path ) ;
@ -2209,34 +2328,89 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
return Status : : Corruption ( " Pathname in meta file not found on disk: " +
abs_path ) ;
}
size = e - > second ;
actual_ size = e - > second ;
}
if ( components . size ( ) < 3 ) {
return Status : : Corruption ( " File checksum is missing for " + filename +
" in " + meta_filename_ ) ;
if ( schema_major_version > = 2 ) {
if ( components . size ( ) % 2 ! = 1 ) {
return Status : : Corruption (
" Bad number of line components for file entry. " ) ;
}
} else {
// Check restricted original schema
if ( components . size ( ) < 3 ) {
return Status : : Corruption ( " File checksum is missing for " + filename +
" in " + meta_filename_ ) ;
}
if ( components [ 1 ] ! = kFileCrc32cFieldName ) {
return Status : : Corruption ( " Unknown checksum type for " + filename +
" in " + meta_filename_ ) ;
}
if ( components . size ( ) > 3 ) {
return Status : : Corruption ( " Extra data for entry " + filename + " in " +
meta_filename_ ) ;
}
}
// WART: The checksums are crc32c, not original crc32
if ( components [ 1 ] ! = " crc32 " ) {
return Status : : Corruption ( " Unknown checksum type for " + filename +
" in " + meta_filename_ ) ;
std : : string checksum_hex ;
for ( unsigned i = 1 ; i < components . size ( ) ; i + = 2 ) {
const std : : string & field_name = components [ i ] ;
const std : : string & field_data = components [ i + 1 ] ;
if ( field_name = = kFileCrc32cFieldName ) {
uint32_t checksum_value =
static_cast < uint32_t > ( strtoul ( field_data . c_str ( ) , nullptr , 10 ) ) ;
if ( field_data ! = ROCKSDB_NAMESPACE : : ToString ( checksum_value ) ) {
return Status : : Corruption ( " Invalid checksum value for " + filename +
" in " + meta_filename_ ) ;
}
checksum_hex = ChecksumInt32ToHex ( checksum_value ) ;
} else if ( field_name = = kFileSizeFieldName ) {
uint64_t ex_size =
std : : strtoull ( field_data . c_str ( ) , nullptr , /*base*/ 10 ) ;
if ( ex_size ! = actual_size ) {
return Status : : Corruption ( " For file " + filename + " expected size " +
ToString ( ex_size ) + " but found size " +
ToString ( actual_size ) ) ;
}
} else if ( StartsWith ( field_name , kNonIgnorableFieldPrefix ) ) {
return Status : : NotSupported ( " Unrecognized non-ignorable file field " +
field_name + " (from future version?) " ) ;
} else {
// Warn the first time we see any particular unrecognized file field
if ( reported_ignored_fields - > insert ( " file: " + field_name ) . second ) {
ROCKS_LOG_WARN ( info_log , " Ignoring unrecognized backup file field %s " ,
field_name . c_str ( ) ) ;
}
}
}
uint32_t checksum_value =
static_cast < uint32_t > ( strtoul ( components [ 2 ] . c_str ( ) , nullptr , 10 ) ) ;
if ( components [ 2 ] ! = ROCKSDB_NAMESPACE : : ToString ( checksum_value ) ) {
return Status : : Corruption ( " Invalid checksum value for " + filename +
" in " + meta_filename_ ) ;
}
files . emplace_back ( new FileInfo ( filename , actual_size , checksum_hex ) ) ;
}
if ( components . size ( ) > 3 ) {
return Status : : Corruption ( " Extra data for entry " + filename + " in " +
meta_filename_ ) ;
if ( footer_present ) {
assert ( schema_major_version > = 2 ) ;
while ( backup_meta_reader - > ReadLine ( & line ) ) {
if ( line . empty ( ) ) {
return Status : : Corruption ( " Unexpected empty line " ) ;
}
auto space_pos = line . find_first_of ( ' ' ) ;
if ( space_pos = = std : : string : : npos ) {
return Status : : Corruption ( " Expected footer field " ) ;
}
std : : string field_name = line . substr ( 0 , space_pos ) ;
std : : string field_data = line . substr ( space_pos + 1 ) ;
if ( StartsWith ( field_name , kNonIgnorableFieldPrefix ) ) {
return Status : : NotSupported ( " Unrecognized non-ignorable field " +
field_name + " (from future version?) " ) ;
} else if ( reported_ignored_fields - > insert ( " footer: " + field_name )
. second ) {
// Warn the first time we see any particular unrecognized footer field
ROCKS_LOG_WARN ( info_log ,
" Ignoring unrecognized backup meta footer field %s " ,
field_name . c_str ( ) ) ;
}
}
files . emplace_back (
new FileInfo ( filename , size , ChecksumInt32ToHex ( checksum_value ) ) ) ;
}
{
@ -2263,7 +2437,8 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
return Status : : OK ( ) ;
}
Status BackupEngineImpl : : BackupMeta : : StoreToFile ( bool sync ) {
Status BackupEngineImpl : : BackupMeta : : StoreToFile (
bool sync , const TEST_FutureSchemaVersion2Options * test_future_options ) {
Status s ;
std : : unique_ptr < WritableFile > backup_meta_file ;
EnvOptions env_options ;
@ -2275,21 +2450,48 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
}
std : : ostringstream buf ;
if ( test_future_options ) {
buf < < kSchemaVersionPrefix < < test_future_options - > version < < " \n " ;
}
buf < < static_cast < unsigned long long > ( timestamp_ ) < < " \n " ;
buf < < sequence_number_ < < " \n " ;
if ( ! app_metadata_ . empty ( ) ) {
std : : string hex_encoded_metadata =
Slice ( app_metadata_ ) . ToString ( /* hex */ true ) ;
buf < < kMetaDataPrefix . ToString ( ) < < hex_encoded_metadata < < " \n " ;
buf < < kAppMetaDataFieldName < < " " < < hex_encoded_metadata < < " \n " ;
}
if ( test_future_options ) {
for ( auto & e : test_future_options - > meta_fields ) {
buf < < e . first < < " " < < e . second < < " \n " ;
}
}
buf < < files_ . size ( ) < < " \n " ;
for ( const auto & file : files_ ) {
// use crc32c for now, switch to something else if needed
// WART: The checksums are crc32c, not original crc32
buf < < file - > filename < < " crc32 " < < ChecksumHexToInt32 ( file - > checksum_hex )
< < " \n " ;
buf < < file - > filename ;
if ( test_future_options = = nullptr | |
test_future_options - > crc32c_checksums ) {
// use crc32c for now, switch to something else if needed
buf < < " " < < kFileCrc32cFieldName < < " "
< < ChecksumHexToInt32 ( file - > checksum_hex ) ;
}
if ( test_future_options & & test_future_options - > file_sizes ) {
buf < < " " < < kFileSizeFieldName < < " " < < ToString ( file - > size ) ;
}
if ( test_future_options ) {
for ( auto & e : test_future_options - > file_fields ) {
buf < < " " < < e . first < < " " < < e . second ;
}
}
buf < < " \n " ;
}
if ( test_future_options & & ! test_future_options - > footer_fields . empty ( ) ) {
buf < < kFooterMarker < < " \n " ;
for ( auto & e : test_future_options - > footer_fields ) {
buf < < e . first < < " " < < e . second < < " \n " ;
}
}
s = backup_meta_file - > Append ( Slice ( buf . str ( ) ) ) ;
@ -2368,6 +2570,13 @@ Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
return Status : : OK ( ) ;
}
void TEST_EnableWriteFutureSchemaVersion2 (
BackupEngine * engine , const TEST_FutureSchemaVersion2Options & options ) {
BackupEngineImpl * impl = static_cast_with_check < BackupEngineImpl > ( engine ) ;
impl - > test_future_options_ . reset (
new TEST_FutureSchemaVersion2Options ( options ) ) ;
}
} // namespace ROCKSDB_NAMESPACE
# endif // ROCKSDB_LITE