@ -276,6 +276,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
} else if ( parsed_params . cmd = = ListFileRangeDeletesCommand : : Name ( ) ) {
return new ListFileRangeDeletesCommand ( parsed_params . option_map ,
parsed_params . flags ) ;
} else if ( parsed_params . cmd = = UnsafeRemoveSstFileCommand : : Name ( ) ) {
return new UnsafeRemoveSstFileCommand ( parsed_params . cmd_params ,
parsed_params . option_map ,
parsed_params . flags ) ;
}
return nullptr ;
}
@ -370,34 +374,7 @@ LDBCommand::LDBCommand(const std::map<std::string, std::string>& options,
}
void LDBCommand : : OpenDB ( ) {
if ( ! create_if_missing_ & & try_load_options_ ) {
config_options_ . env = options_ . env ;
Status s = LoadLatestOptions ( config_options_ , db_path_ , & options_ ,
& column_families_ ) ;
if ( ! s . ok ( ) & & ! s . IsNotFound ( ) ) {
// Option file exists but load option file error.
std : : string msg = s . ToString ( ) ;
exec_state_ = LDBCommandExecuteResult : : Failed ( msg ) ;
db_ = nullptr ;
return ;
}
if ( options_ . env - > FileExists ( options_ . wal_dir ) . IsNotFound ( ) ) {
options_ . wal_dir = db_path_ ;
fprintf (
stderr ,
" wal_dir loaded from the option file doesn't exist. Ignore it. \n " ) ;
}
// If merge operator is not set, set a string append operator. There is
// no harm doing it.
for ( auto & cf_entry : column_families_ ) {
if ( ! cf_entry . options . merge_operator ) {
cf_entry . options . merge_operator =
MergeOperators : : CreateStringAppendOperator ( ' : ' ) ;
}
}
}
options_ = PrepareOptionsForOpenDB ( ) ;
PrepareOptions ( ) ;
if ( ! exec_state_ . IsNotStarted ( ) ) {
return ;
}
@ -425,21 +402,6 @@ void LDBCommand::OpenDB() {
}
db_ = db_ttl_ ;
} else {
if ( column_families_ . empty ( ) ) {
// Try to figure out column family lists
std : : vector < std : : string > cf_list ;
st = DB : : ListColumnFamilies ( options_ , db_path_ , & cf_list ) ;
// There is possible the DB doesn't exist yet, for "create if not
// "existing case". The failure is ignored here. We rely on DB::Open()
// to give us the correct error message for problem with opening
// existing DB.
if ( st . ok ( ) & & cf_list . size ( ) > 1 ) {
// Ignore single column family DB.
for ( auto cf_name : cf_list ) {
column_families_ . emplace_back ( cf_name , options_ ) ;
}
}
}
if ( is_read_only_ & & secondary_path_ . empty ( ) ) {
if ( column_families_ . empty ( ) ) {
st = DB : : OpenForReadOnly ( options_ , db_path_ , & db_ ) ;
@ -585,22 +547,8 @@ bool LDBCommand::ParseStringOption(
return false ;
}
Options LDBCommand : : PrepareOptionsForOpenDB ( ) {
ColumnFamilyOptions * cf_opts ;
auto column_families_iter =
std : : find_if ( column_families_ . begin ( ) , column_families_ . end ( ) ,
[ this ] ( const ColumnFamilyDescriptor & cf_desc ) {
return cf_desc . name = = column_family_name_ ;
} ) ;
if ( column_families_iter ! = column_families_ . end ( ) ) {
cf_opts = & column_families_iter - > options ;
} else {
cf_opts = static_cast < ColumnFamilyOptions * > ( & options_ ) ;
}
DBOptions * db_opts = static_cast < DBOptions * > ( & options_ ) ;
db_opts - > create_if_missing = false ;
std : : map < std : : string , std : : string > : : const_iterator itr ;
void LDBCommand : : OverrideBaseOptions ( ) {
options_ . create_if_missing = false ;
BlockBasedTableOptions table_options ;
bool use_table_options = false ;
@ -626,35 +574,35 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
}
}
cf_opts - > force_consistency_checks = force_consistency_checks_ ;
options_ . force_consistency_checks = force_consistency_checks_ ;
if ( use_table_options ) {
cf_opts - > table_factory . reset ( NewBlockBasedTableFactory ( table_options ) ) ;
options_ . table_factory . reset ( NewBlockBasedTableFactory ( table_options ) ) ;
}
itr = option_map_ . find ( ARG_AUTO_COMPACTION ) ;
auto itr = option_map_ . find ( ARG_AUTO_COMPACTION ) ;
if ( itr ! = option_map_ . end ( ) ) {
cf_opts - > disable_auto_compactions = ! StringToBool ( itr - > second ) ;
options_ . disable_auto_compactions = ! StringToBool ( itr - > second ) ;
}
itr = option_map_ . find ( ARG_COMPRESSION_TYPE ) ;
if ( itr ! = option_map_ . end ( ) ) {
std : : string comp = itr - > second ;
if ( comp = = " no " ) {
cf_opts - > compression = kNoCompression ;
options_ . compression = kNoCompression ;
} else if ( comp = = " snappy " ) {
cf_opts - > compression = kSnappyCompression ;
options_ . compression = kSnappyCompression ;
} else if ( comp = = " zlib " ) {
cf_opts - > compression = kZlibCompression ;
options_ . compression = kZlibCompression ;
} else if ( comp = = " bzip2 " ) {
cf_opts - > compression = kBZip2Compression ;
options_ . compression = kBZip2Compression ;
} else if ( comp = = " lz4 " ) {
cf_opts - > compression = kLZ4Compression ;
options_ . compression = kLZ4Compression ;
} else if ( comp = = " lz4hc " ) {
cf_opts - > compression = kLZ4HCCompression ;
options_ . compression = kLZ4HCCompression ;
} else if ( comp = = " xpress " ) {
cf_opts - > compression = kXpressCompression ;
options_ . compression = kXpressCompression ;
} else if ( comp = = " zstd " ) {
cf_opts - > compression = kZSTD ;
options_ . compression = kZSTD ;
} else {
// Unknown compression.
exec_state_ =
@ -666,7 +614,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
if ( ParseIntOption ( option_map_ , ARG_COMPRESSION_MAX_DICT_BYTES ,
compression_max_dict_bytes , exec_state_ ) ) {
if ( compression_max_dict_bytes > = 0 ) {
cf_opts - > compression_opts . max_dict_bytes = compression_max_dict_bytes ;
options_ . compression_opts . max_dict_bytes = compression_max_dict_bytes ;
} else {
exec_state_ = LDBCommandExecuteResult : : Failed (
ARG_COMPRESSION_MAX_DICT_BYTES + " must be >= 0. " ) ;
@ -677,7 +625,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
if ( ParseIntOption ( option_map_ , ARG_DB_WRITE_BUFFER_SIZE ,
db_write_buffer_size , exec_state_ ) ) {
if ( db_write_buffer_size > = 0 ) {
db_opts - > db_write_buffer_size = db_write_buffer_size ;
options_ . db_write_buffer_size = db_write_buffer_size ;
} else {
exec_state_ = LDBCommandExecuteResult : : Failed ( ARG_DB_WRITE_BUFFER_SIZE +
" must be >= 0. " ) ;
@ -688,7 +636,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
if ( ParseIntOption ( option_map_ , ARG_WRITE_BUFFER_SIZE , write_buffer_size ,
exec_state_ ) ) {
if ( write_buffer_size > 0 ) {
cf_opts - > write_buffer_size = write_buffer_size ;
options_ . write_buffer_size = write_buffer_size ;
} else {
exec_state_ = LDBCommandExecuteResult : : Failed ( ARG_WRITE_BUFFER_SIZE +
" must be > 0. " ) ;
@ -698,15 +646,15 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
int file_size ;
if ( ParseIntOption ( option_map_ , ARG_FILE_SIZE , file_size , exec_state_ ) ) {
if ( file_size > 0 ) {
cf_opts - > target_file_size_base = file_size ;
options_ . target_file_size_base = file_size ;
} else {
exec_state_ =
LDBCommandExecuteResult : : Failed ( ARG_FILE_SIZE + " must be > 0. " ) ;
}
}
if ( db_opts - > db_paths . size ( ) = = 0 ) {
db_opts - > db_paths . emplace_back ( db_path_ ,
if ( options_ . db_paths . size ( ) = = 0 ) {
options_ . db_paths . emplace_back ( db_path_ ,
std : : numeric_limits < uint64_t > : : max ( ) ) ;
}
@ -714,18 +662,83 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
if ( ParseIntOption ( option_map_ , ARG_FIX_PREFIX_LEN , fix_prefix_len ,
exec_state_ ) ) {
if ( fix_prefix_len > 0 ) {
cf_opts - > prefix_extractor . reset (
options_ . prefix_extractor . reset (
NewFixedPrefixTransform ( static_cast < size_t > ( fix_prefix_len ) ) ) ;
} else {
exec_state_ =
LDBCommandExecuteResult : : Failed ( ARG_FIX_PREFIX_LEN + " must be > 0. " ) ;
}
}
}
// TODO(ajkr): this return value doesn't reflect the CF options changed, so
// subcommands that rely on this won't see the effect of CF-related CLI args.
// Such subcommands need to be changed to properly support CFs.
return options_ ;
// First, initializes the options state using the OPTIONS file when enabled.
// Second, overrides the options according to the CLI arguments and the
// specific subcommand being run.
void LDBCommand : : PrepareOptions ( ) {
if ( ! create_if_missing_ & & try_load_options_ ) {
config_options_ . env = options_ . env ;
Status s = LoadLatestOptions ( config_options_ , db_path_ , & options_ ,
& column_families_ ) ;
if ( ! s . ok ( ) & & ! s . IsNotFound ( ) ) {
// Option file exists but load option file error.
std : : string msg = s . ToString ( ) ;
exec_state_ = LDBCommandExecuteResult : : Failed ( msg ) ;
db_ = nullptr ;
return ;
}
if ( options_ . env - > FileExists ( options_ . wal_dir ) . IsNotFound ( ) ) {
options_ . wal_dir = db_path_ ;
fprintf (
stderr ,
" wal_dir loaded from the option file doesn't exist. Ignore it. \n " ) ;
}
// If merge operator is not set, set a string append operator.
for ( auto & cf_entry : column_families_ ) {
if ( ! cf_entry . options . merge_operator ) {
cf_entry . options . merge_operator =
MergeOperators : : CreateStringAppendOperator ( ' : ' ) ;
}
}
}
OverrideBaseOptions ( ) ;
if ( exec_state_ . IsFailed ( ) ) {
return ;
}
if ( column_families_ . empty ( ) ) {
// Reads the MANIFEST to figure out what column families exist. In this
// case, the option overrides from the CLI argument/specific subcommand
// apply to all column families.
std : : vector < std : : string > cf_list ;
Status st = DB : : ListColumnFamilies ( options_ , db_path_ , & cf_list ) ;
// It is possible the DB doesn't exist yet, for "create if not
// existing" case. The failure is ignored here. We rely on DB::Open()
// to give us the correct error message for problem with opening
// existing DB.
if ( st . ok ( ) & & cf_list . size ( ) > 1 ) {
// Ignore single column family DB.
for ( auto cf_name : cf_list ) {
column_families_ . emplace_back ( cf_name , options_ ) ;
}
}
} else {
// We got column families from the OPTIONS file. In this case, the option
// overrides from the CLI argument/specific subcommand only apply to the
// column family specified by `--column_family_name`.
auto column_families_iter =
std : : find_if ( column_families_ . begin ( ) , column_families_ . end ( ) ,
[ this ] ( const ColumnFamilyDescriptor & cf_desc ) {
return cf_desc . name = = column_family_name_ ;
} ) ;
if ( column_families_iter = = column_families_ . end ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" Non-existing column family " + column_family_name_ ) ;
return ;
}
column_families_iter - > options = options_ ;
}
}
bool LDBCommand : : ParseKeyValue ( const std : : string & line , std : : string * key ,
@ -964,13 +977,12 @@ void DBLoaderCommand::Help(std::string& ret) {
ret . append ( " \n " ) ;
}
Options DBLoaderCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
void DBLoaderCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . create_if_missing = create_if_missing_ ;
if ( bulk_load_ ) {
opt . PrepareForBulkLoad ( ) ;
options_ . PrepareForBulkLoad ( ) ;
}
return opt ;
}
void DBLoaderCommand : : DoCommand ( ) {
@ -1872,14 +1884,14 @@ void ReduceDBLevelsCommand::Help(std::string& ret) {
ret . append ( " \n " ) ;
}
Options ReduceDBLevelsCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . num_levels = old_levels_ ;
opt . max_bytes_for_level_multiplier_additional . resize ( opt . num_levels , 1 ) ;
void ReduceDBLevelsCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . num_levels = old_levels_ ;
options_ . max_bytes_for_level_multiplier_additional . resize ( options_ . num_levels ,
1 ) ;
// Disable size compaction
opt . max_bytes_for_level_base = 1ULL < < 50 ;
opt . max_bytes_for_level_multiplier = 1 ;
return opt ;
options_ . max_bytes_for_level_base = 1ULL < < 50 ;
options_ . max_bytes_for_level_multiplier = 1 ;
}
Status ReduceDBLevelsCommand : : GetOldNumOfLevels ( Options & opt ,
@ -1924,9 +1936,9 @@ void ReduceDBLevelsCommand::DoCommand() {
}
Status st ;
Options opt = PrepareOptionsForOpenDB ( ) ;
PrepareOptions ( ) ;
int old_level_num = - 1 ;
st = GetOldNumOfLevels ( opt , & old_level_num ) ;
st = GetOldNumOfLevels ( options_ , & old_level_num ) ;
if ( ! st . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed ( st . ToString ( ) ) ;
return ;
@ -1953,7 +1965,8 @@ void ReduceDBLevelsCommand::DoCommand() {
CloseDB ( ) ;
EnvOptions soptions ;
st = VersionSet : : ReduceNumberOfLevels ( db_path_ , & opt , soptions , new_levels_ ) ;
st = VersionSet : : ReduceNumberOfLevels ( db_path_ , & options_ , soptions ,
new_levels_ ) ;
if ( ! st . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed ( st . ToString ( ) ) ;
return ;
@ -2020,21 +2033,19 @@ void ChangeCompactionStyleCommand::Help(std::string& ret) {
ret . append ( " \n " ) ;
}
Options ChangeCompactionStyleCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
void ChangeCompactionStyleCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
if ( old_compaction_style_ = = kCompactionStyleLevel & &
new_compaction_style_ = = kCompactionStyleUniversal ) {
// In order to convert from level compaction to universal compaction, we
// need to compact all data into a single file and move it to level 0.
opt . disable_auto_compactions = true ;
opt . target_file_size_base = INT_MAX ;
opt . target_file_size_multiplier = 1 ;
opt . max_bytes_for_level_base = INT_MAX ;
opt . max_bytes_for_level_multiplier = 1 ;
options_ . disable_auto_compactions = true ;
options_ . target_file_size_base = INT_MAX ;
options_ . target_file_size_multiplier = 1 ;
options_ . max_bytes_for_level_base = INT_MAX ;
options_ . max_bytes_for_level_multiplier = 1 ;
}
return opt ;
}
void ChangeCompactionStyleCommand : : DoCommand ( ) {
@ -2481,10 +2492,9 @@ void BatchPutCommand::DoCommand() {
}
}
Options BatchPutCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
void BatchPutCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . create_if_missing = create_if_missing_ ;
}
// ----------------------------------------------------------------------------
@ -2762,10 +2772,9 @@ void PutCommand::DoCommand() {
}
}
Options PutCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
void PutCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . create_if_missing = create_if_missing_ ;
}
// ----------------------------------------------------------------------------
@ -2927,10 +2936,14 @@ void RepairCommand::Help(std::string& ret) {
ret . append ( " \n " ) ;
}
void RepairCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . info_log . reset ( new StderrLogger ( InfoLogLevel : : WARN_LEVEL ) ) ;
}
void RepairCommand : : DoCommand ( ) {
Options options = PrepareOptionsForOpenDB ( ) ;
options . info_log . reset ( new StderrLogger ( InfoLogLevel : : WARN_LEVEL ) ) ;
Status status = RepairDB ( db_path_ , options ) ;
PrepareOptions ( ) ;
Status status = RepairDB ( db_path_ , options_ ) ;
if ( status . ok ( ) ) {
fprintf ( stdout , " OK \n " ) ;
} else {
@ -3273,10 +3286,9 @@ void WriteExternalSstFilesCommand::DoCommand() {
" external SST file written to " + output_sst_path_ ) ;
}
Options WriteExternalSstFilesCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
void WriteExternalSstFilesCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . create_if_missing = create_if_missing_ ;
}
const std : : string IngestExternalSstFilesCommand : : ARG_MOVE_FILES = " move_files " ;
@ -3388,10 +3400,9 @@ void IngestExternalSstFilesCommand::DoCommand() {
}
}
Options IngestExternalSstFilesCommand : : PrepareOptionsForOpenDB ( ) {
Options opt = LDBCommand : : PrepareOptionsForOpenDB ( ) ;
opt . create_if_missing = create_if_missing_ ;
return opt ;
void IngestExternalSstFilesCommand : : OverrideBaseOptions ( ) {
LDBCommand : : OverrideBaseOptions ( ) ;
options_ . create_if_missing = create_if_missing_ ;
}
ListFileRangeDeletesCommand : : ListFileRangeDeletesCommand (
@ -3440,8 +3451,87 @@ void ListFileRangeDeletesCommand::DoCommand() {
TEST_SYNC_POINT_CALLBACK (
" ListFileRangeDeletesCommand::DoCommand:BeforePrint " , & out_str ) ;
fprintf ( stdout , " %s \n " , out_str . c_str ( ) ) ;
}
}
void UnsafeRemoveSstFileCommand : : Help ( std : : string & ret ) {
ret . append ( " " ) ;
ret . append ( UnsafeRemoveSstFileCommand : : Name ( ) ) ;
ret . append ( " <SST file number> " ) ;
ret . append ( " \n " ) ;
ret . append ( " MUST NOT be used on a live DB. " ) ;
ret . append ( " \n " ) ;
}
UnsafeRemoveSstFileCommand : : UnsafeRemoveSstFileCommand (
const std : : vector < std : : string > & params ,
const std : : map < std : : string , std : : string > & options ,
const std : : vector < std : : string > & flags )
: LDBCommand ( options , flags , false /* is_read_only */ ,
BuildCmdLineOptions ( { } ) ) {
if ( params . size ( ) ! = 1 ) {
exec_state_ =
LDBCommandExecuteResult : : Failed ( " SST file number must be specified " ) ;
} else {
exec_state_ = LDBCommandExecuteResult : : Failed ( st . ToString ( ) ) ;
char * endptr = nullptr ;
sst_file_number_ = strtoull ( params . at ( 0 ) . c_str ( ) , & endptr , 10 /* base */ ) ;
if ( endptr = = nullptr | | * endptr ! = ' \0 ' ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" Failed to parse SST file number " + params . at ( 0 ) ) ;
}
}
}
void UnsafeRemoveSstFileCommand : : DoCommand ( ) {
// Instead of opening a `DB` and calling `DeleteFile()`, this implementation
// uses the underlying `VersionSet` API to read and modify the MANIFEST. This
// allows us to use the user's real options, while not having to worry about
// the DB persisting new SST files via flush/compaction or attempting to read/
// compact files which may fail, particularly for the file we intend to remove
// (the user may want to remove an already deleted file from MANIFEST).
PrepareOptions ( ) ;
if ( options_ . db_paths . empty ( ) ) {
// `VersionSet` expects options that have been through `SanitizeOptions()`,
// which would sanitize an empty `db_paths`.
options_ . db_paths . emplace_back ( db_path_ , 0 /* target_size */ ) ;
}
WriteController wc ( options_ . delayed_write_rate ) ;
WriteBufferManager wb ( options_ . db_write_buffer_size ) ;
ImmutableDBOptions immutable_db_options ( options_ ) ;
std : : shared_ptr < Cache > tc (
NewLRUCache ( 1 < < 20 /* capacity */ , options_ . table_cache_numshardbits ) ) ;
EnvOptions sopt ;
VersionSet versions ( db_path_ , & immutable_db_options , sopt , tc . get ( ) , & wb , & wc ,
/*block_cache_tracer=*/ nullptr , /*io_tracer=*/ nullptr ) ;
Status s = versions . Recover ( column_families_ ) ;
ColumnFamilyData * cfd = nullptr ;
int level = - 1 ;
if ( s . ok ( ) ) {
FileMetaData * metadata = nullptr ;
s = versions . GetMetadataForFile ( sst_file_number_ , & level , & metadata , & cfd ) ;
}
if ( s . ok ( ) ) {
VersionEdit edit ;
edit . SetColumnFamily ( cfd - > GetID ( ) ) ;
edit . DeleteFile ( level , sst_file_number_ ) ;
// Use `mutex` to imitate a locked DB mutex when calling `LogAndApply()`.
InstrumentedMutex mutex ;
mutex . Lock ( ) ;
s = versions . LogAndApply ( cfd , * cfd - > GetLatestMutableCFOptions ( ) , & edit ,
& mutex , nullptr /* db_directory */ ,
false /* new_descriptor_log */ ) ;
mutex . Unlock ( ) ;
}
if ( ! s . ok ( ) ) {
exec_state_ = LDBCommandExecuteResult : : Failed (
" failed to unsafely remove SST file: " + s . ToString ( ) ) ;
} else {
exec_state_ = LDBCommandExecuteResult : : Succeed ( " unsafely removed SST file " ) ;
}
}