@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
} else if ( parsed_params . cmd = = RestoreCommand : : Name ( ) ) {
} else if ( parsed_params . cmd = = RestoreCommand : : Name ( ) ) {
return new RestoreCommand ( parsed_params . cmd_params ,
return new RestoreCommand ( parsed_params . cmd_params ,
parsed_params . option_map , parsed_params . flags ) ;
parsed_params . option_map , parsed_params . flags ) ;
} else if ( parsed_params . cmd = = WriteExternalSstFilesCommand : : Name ( ) ) {
return new WriteExternalSstFilesCommand ( parsed_params . cmd_params ,
parsed_params . option_map ,
parsed_params . flags ) ;
} else if ( parsed_params . cmd = = IngestExternalSstFilesCommand : : Name ( ) ) {
return new IngestExternalSstFilesCommand ( parsed_params . cmd_params ,
parsed_params . option_map ,
parsed_params . flags ) ;
}
}
return nullptr ;
return nullptr ;
}
}
@ -2939,5 +2947,203 @@ void DBFileDumperCommand::DoCommand() {
}
}
}
}
void WriteExternalSstFilesCommand : : Help ( std : : string & ret ) {
ret . append ( " " ) ;
ret . append ( WriteExternalSstFilesCommand : : Name ( ) ) ;
ret . append ( " <output_sst_path> " ) ;
ret . append ( " \n " ) ;
}
WriteExternalSstFilesCommand : : WriteExternalSstFilesCommand (
const std : : vector < std : : string > & params ,
const std : : map < std : : string , std : : string > & options ,
const std : : vector < std : : string > & flags )
: LDBCommand (
options , flags , false /* is_read_only */ ,
BuildCmdLineOptions ( { ARG_HEX , ARG_KEY_HEX , ARG_VALUE_HEX , ARG_FROM ,
ARG_TO , ARG_CREATE_IF_MISSING } ) ) {
create_if_missing_ =
IsFlagPresent ( flags , ARG_CREATE_IF_MISSING ) | |
ParseBooleanOption ( options , ARG_CREATE_IF_MISSING , false ) ;
if ( params . size ( ) ! = 1 ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" output SST file path must be specified " ) ;
} else {
output_sst_path_ = params . at ( 0 ) ;
}
}
void WriteExternalSstFilesCommand : : DoCommand ( ) {
if ( ! db_ ) {
assert ( GetExecuteState ( ) . IsFailed ( ) ) ;
return ;
}
ColumnFamilyHandle * cfh = GetCfHandle ( ) ;
SstFileWriter sst_file_writer ( EnvOptions ( ) , db_ - > GetOptions ( ) , cfh ) ;
Status status = sst_file_writer . Open ( output_sst_path_ ) ;
if ( ! status . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed ( " failed to open SST file: " +
status . ToString ( ) ) ;
return ;
}
int bad_lines = 0 ;
std : : string line ;
std : : ifstream ifs_stdin ( " /dev/stdin " ) ;
std : : istream * istream_p = ifs_stdin . is_open ( ) ? & ifs_stdin : & std : : cin ;
while ( getline ( * istream_p , line , ' \n ' ) ) {
std : : string key ;
std : : string value ;
if ( ParseKeyValue ( line , & key , & value , is_key_hex_ , is_value_hex_ ) ) {
status = sst_file_writer . Put ( key , value ) ;
if ( ! status . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" failed to write record to file: " + status . ToString ( ) ) ;
return ;
}
} else if ( 0 = = line . find ( " Keys in range: " ) ) {
// ignore this line
} else if ( 0 = = line . find ( " Created bg thread 0x " ) ) {
// ignore this line
} else {
bad_lines + + ;
}
}
status = sst_file_writer . Finish ( ) ;
if ( ! status . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" Failed to finish writing to file: " + status . ToString ( ) ) ;
return ;
}
if ( bad_lines > 0 ) {
fprintf ( stderr , " Warning: %d bad lines ignored. \n " , bad_lines ) ;
}
exec_state_ = LDBCommandExecuteResult : : Succeed (
" external SST file written to " + output_sst_path_ ) ;
}
Options WriteExternalSstFilesCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
}
const std : : string IngestExternalSstFilesCommand : : ARG_MOVE_FILES = " move_files " ;
const std : : string IngestExternalSstFilesCommand : : ARG_SNAPSHOT_CONSISTENCY =
" snapshot_consistency " ;
const std : : string IngestExternalSstFilesCommand : : ARG_ALLOW_GLOBAL_SEQNO =
" allow_global_seqno " ;
const std : : string IngestExternalSstFilesCommand : : ARG_ALLOW_BLOCKING_FLUSH =
" allow_blocking_flush " ;
const std : : string IngestExternalSstFilesCommand : : ARG_INGEST_BEHIND =
" ingest_behind " ;
const std : : string IngestExternalSstFilesCommand : : ARG_WRITE_GLOBAL_SEQNO =
" write_global_seqno " ;
void IngestExternalSstFilesCommand : : Help ( std : : string & ret ) {
ret . append ( " " ) ;
ret . append ( IngestExternalSstFilesCommand : : Name ( ) ) ;
ret . append ( " <input_sst_path> " ) ;
ret . append ( " [-- " + ARG_MOVE_FILES + " ] " ) ;
ret . append ( " [-- " + ARG_SNAPSHOT_CONSISTENCY + " ] " ) ;
ret . append ( " [-- " + ARG_ALLOW_GLOBAL_SEQNO + " ] " ) ;
ret . append ( " [-- " + ARG_ALLOW_BLOCKING_FLUSH + " ] " ) ;
ret . append ( " [-- " + ARG_INGEST_BEHIND + " ] " ) ;
ret . append ( " [-- " + ARG_WRITE_GLOBAL_SEQNO + " ] " ) ;
ret . append ( " \n " ) ;
}
IngestExternalSstFilesCommand : : IngestExternalSstFilesCommand (
const std : : vector < std : : string > & params ,
const std : : map < std : : string , std : : string > & options ,
const std : : vector < std : : string > & flags )
: LDBCommand (
options , flags , false /* is_read_only */ ,
BuildCmdLineOptions ( { ARG_MOVE_FILES , ARG_SNAPSHOT_CONSISTENCY ,
ARG_ALLOW_GLOBAL_SEQNO , ARG_CREATE_IF_MISSING ,
ARG_ALLOW_BLOCKING_FLUSH , ARG_INGEST_BEHIND ,
ARG_WRITE_GLOBAL_SEQNO } ) ) ,
move_files_ ( false ) ,
snapshot_consistency_ ( true ) ,
allow_global_seqno_ ( true ) ,
allow_blocking_flush_ ( true ) ,
ingest_behind_ ( false ) ,
write_global_seqno_ ( true ) {
create_if_missing_ =
IsFlagPresent ( flags , ARG_CREATE_IF_MISSING ) | |
ParseBooleanOption ( options , ARG_CREATE_IF_MISSING , false ) ;
move_files_ = IsFlagPresent ( flags , ARG_MOVE_FILES ) | |
ParseBooleanOption ( options , ARG_MOVE_FILES , false ) ;
snapshot_consistency_ =
IsFlagPresent ( flags , ARG_SNAPSHOT_CONSISTENCY ) | |
ParseBooleanOption ( options , ARG_SNAPSHOT_CONSISTENCY , true ) ;
allow_global_seqno_ =
IsFlagPresent ( flags , ARG_ALLOW_GLOBAL_SEQNO ) | |
ParseBooleanOption ( options , ARG_ALLOW_GLOBAL_SEQNO , true ) ;
allow_blocking_flush_ =
IsFlagPresent ( flags , ARG_ALLOW_BLOCKING_FLUSH ) | |
ParseBooleanOption ( options , ARG_ALLOW_BLOCKING_FLUSH , true ) ;
ingest_behind_ = IsFlagPresent ( flags , ARG_INGEST_BEHIND ) | |
ParseBooleanOption ( options , ARG_INGEST_BEHIND , false ) ;
write_global_seqno_ =
IsFlagPresent ( flags , ARG_WRITE_GLOBAL_SEQNO ) | |
ParseBooleanOption ( options , ARG_WRITE_GLOBAL_SEQNO , true ) ;
if ( allow_global_seqno_ ) {
if ( ! write_global_seqno_ ) {
fprintf ( stderr ,
" Warning: not writing global_seqno to the ingested SST can \n "
" prevent older versions of RocksDB from being able to open it \n " ) ;
}
} else {
if ( write_global_seqno_ ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" ldb cannot write global_seqno to the ingested SST when global_seqno "
" is not allowed " ) ;
}
}
if ( params . size ( ) ! = 1 ) {
exec_state_ =
LDBCommandExecuteResult : : Failed ( " input SST path must be specified " ) ;
} else {
input_sst_path_ = params . at ( 0 ) ;
}
}
void IngestExternalSstFilesCommand : : DoCommand ( ) {
if ( ! db_ ) {
assert ( GetExecuteState ( ) . IsFailed ( ) ) ;
return ;
}
if ( GetExecuteState ( ) . IsFailed ( ) ) {
return ;
}
ColumnFamilyHandle * cfh = GetCfHandle ( ) ;
IngestExternalFileOptions ifo ;
ifo . move_files = move_files_ ;
ifo . snapshot_consistency = snapshot_consistency_ ;
ifo . allow_global_seqno = allow_global_seqno_ ;
ifo . allow_blocking_flush = allow_blocking_flush_ ;
ifo . ingest_behind = ingest_behind_ ;
ifo . write_global_seqno = write_global_seqno_ ;
Status status = db_ - > IngestExternalFile ( cfh , { input_sst_path_ } , ifo ) ;
if ( ! status . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" failed to ingest external SST: " + status . ToString ( ) ) ;
} else {
exec_state_ =
LDBCommandExecuteResult : : Succeed ( " external SST files ingested " ) ;
}
}
Options IngestExternalSstFilesCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
}
} // namespace rocksdb
} // namespace rocksdb
# endif // ROCKSDB_LITE
# endif // ROCKSDB_LITE