@ -1236,9 +1236,12 @@ Status DBImpl::Recover(
SetTickerCount ( stats_ , SEQUENCE_NUMBER , versions_ - > LastSequence ( ) ) ;
}
// Initial value
max_total_in_memory_state_ = 0 ;
for ( auto cfd : * versions_ - > GetColumnFamilySet ( ) ) {
max_total_in_memory_state_ + = cfd - > options ( ) - > write_buffer_size *
cfd - > options ( ) - > max_write_buffer_number ;
auto * mutable_cf_options = cfd - > GetLatestMutableCFOptions ( ) ;
max_total_in_memory_state_ + = mutable_cf_options - > write_buffer_size *
mutable_cf_options - > max_write_buffer_number ;
}
return s ;
@ -1803,8 +1806,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
status = versions_ - > LogAndApply ( cfd ,
mutable_cf_options , & edit , & mutex_ , db_directory_ . get ( ) ) ;
superversion_to_free = cfd - > InstallSuperVersion (
new_superversion , & mutex_ , mutable_cf_options ) ;
superversion_to_free = InstallSuperVersion (
cfd , new_superversion , mutable_cf_options ) ;
new_superversion = nullptr ;
Log ( db_options_ . info_log , " [%s] LogAndApply: %s \n " , cfd - > GetName ( ) . c_str ( ) ,
@ -1840,10 +1843,10 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
return cfh - > cfd ( ) - > options ( ) - > level0_stop_writes_trigger ;
}
// Flushes the memtable of the column family referred to by `column_family`.
//
// flush_options: forwarded unchanged to FlushMemTable().
// column_family: handle that is cast to ColumnFamilyHandleImpl to reach the
//                underlying ColumnFamilyData.
// Returns the Status produced by FlushMemTable().
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return FlushMemTable(cfh->cfd(), flush_options);
}
SequenceNumber DBImpl : : GetLatestSequenceNumber ( ) const {
@ -1933,7 +1936,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
}
Status DBImpl : : FlushMemTable ( ColumnFamilyData * cfd ,
const FlushOptions & options ) {
const FlushOptions & flush_ options) {
Status s ;
{
WriteContext context ;
@ -1957,7 +1960,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
write_thread_ . ExitWriteThread ( & w , & w , s ) ;
}
if ( s . ok ( ) & & options . wait ) {
if ( s . ok ( ) & & flush_ options. wait ) {
// Wait until the memtable flush completes
s = WaitForFlushMemTable ( cfd ) ;
}
@ -3441,7 +3444,7 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
}
} // namespace
Iterator * DBImpl : : NewInternalIterator ( const ReadOptions & options ,
Iterator * DBImpl : : NewInternalIterator ( const ReadOptions & read_ options,
ColumnFamilyData * cfd ,
SuperVersion * super_version ,
Arena * arena ) {
@ -3451,11 +3454,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
MergeIteratorBuilder merge_iter_builder ( & cfd - > internal_comparator ( ) , arena ) ;
// Collect iterator for mutable mem
merge_iter_builder . AddIterator (
super_version - > mem - > NewIterator ( options , arena ) ) ;
super_version - > mem - > NewIterator ( read_ options, arena ) ) ;
// Collect all needed child iterators for immutable memtables
super_version - > imm - > AddIterators ( options , & merge_iter_builder ) ;
super_version - > imm - > AddIterators ( read_ options, & merge_iter_builder ) ;
// Collect iterators for files in L0 - Ln
super_version - > current - > AddIterators ( options , env_options_ ,
super_version - > current - > AddIterators ( read_ options, env_options_ ,
& merge_iter_builder ) ;
internal_iter = merge_iter_builder . Finish ( ) ;
IterState * cleanup = new IterState ( this , & mutex_ , super_version ) ;
@ -3468,10 +3471,10 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
return default_cf_handle_ ;
}
// Point lookup of `key` in `column_family`.
//
// This is a thin wrapper: all arguments are forwarded to GetImpl(), and its
// Status is returned as-is. On the paths that produce a value, it is written
// through `value` by GetImpl().
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   std::string* value) {
  return GetImpl(read_options, column_family, key, value);
}
// DeletionState gets created and destructed outside of the lock -- we
@ -3488,17 +3491,39 @@ void DBImpl::InstallSuperVersion(
ColumnFamilyData * cfd , DeletionState & deletion_state ,
const MutableCFOptions & mutable_cf_options ) {
mutex_ . AssertHeld ( ) ;
// if new_superversion == nullptr, it means somebody already used it
SuperVersion * new_superversion =
( deletion_state . new_superversion ! = nullptr ) ?
deletion_state . new_superversion : new SuperVersion ( ) ;
SuperVersion * old_superversion =
cfd - > InstallSuperVersion ( new_superversion , & mutex_ , mutable_cf_options ) ;
InstallSuperVersion ( cfd , deletion_state . new_superversion ,
mutable_cf_options ) ;
deletion_state . new_superversion = nullptr ;
deletion_state . superversions_to_free . push_back ( old_superversion ) ;
}
Status DBImpl : : GetImpl ( const ReadOptions & options ,
// Installs a new SuperVersion on `cfd` and keeps DB-wide bookkeeping in sync.
//
// cfd:                column family whose SuperVersion is being replaced.
// new_sv:             SuperVersion to install; when nullptr a fresh one is
//                     allocated here.
// mutable_cf_options: options snapshot passed to the column family together
//                     with the new SuperVersion.
//
// Returns the SuperVersion that was installed before this call (may be
// nullptr). Callers in this file either delete it or queue it for freeing.
//
// REQUIRES: mutex_ is held (asserted below).
SuperVersion* DBImpl::InstallSuperVersion(
    ColumnFamilyData* cfd, SuperVersion* new_sv,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();
  auto* old = cfd->InstallSuperVersion(
      new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);

  // New options may have been picked up by this SuperVersion, and they can
  // change the flush/compaction trigger conditions, so give the scheduler a
  // chance to react.
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_: drop the contribution of the options
  // attached to the previous SuperVersion (if any) and add the contribution
  // of the options being installed.
  //
  // The local must be a wide unsigned type. `auto x = 0` would deduce `int`
  // and narrow the byte-count product, corrupting the running total for
  // large write buffers.
  uint64_t old_memtable_size = 0;
  if (old) {
    old_memtable_size = old->mutable_cf_options.write_buffer_size *
                        old->mutable_cf_options.max_write_buffer_number;
  }
  max_total_in_memory_state_ =
      max_total_in_memory_state_ - old_memtable_size +
      mutable_cf_options.write_buffer_size *
          mutable_cf_options.max_write_buffer_number;
  return old;
}
Status DBImpl : : GetImpl ( const ReadOptions & read_options ,
ColumnFamilyHandle * column_family , const Slice & key ,
std : : string * value , bool * value_found ) {
StopWatch sw ( env_ , stats_ , DB_GET ) ;
@ -3508,8 +3533,9 @@ Status DBImpl::GetImpl(const ReadOptions& options,
auto cfd = cfh - > cfd ( ) ;
SequenceNumber snapshot ;
if ( options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
if ( read_options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > (
read_options . snapshot ) - > number_ ;
} else {
snapshot = versions_ - > LastSequence ( ) ;
}
@ -3535,7 +3561,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
RecordTick ( stats_ , MEMTABLE_HIT ) ;
} else {
PERF_TIMER_GUARD ( get_from_output_files_time ) ;
sv - > current - > Get ( options , lkey , value , & s , & merge_context , value_found ) ;
sv - > current - > Get ( read_options , lkey , value , & s , & merge_context ,
value_found ) ;
RecordTick ( stats_ , MEMTABLE_MISS ) ;
}
@ -3551,7 +3578,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
}
std : : vector < Status > DBImpl : : MultiGet (
const ReadOptions & options ,
const ReadOptions & read_ options,
const std : : vector < ColumnFamilyHandle * > & column_family ,
const std : : vector < Slice > & keys , std : : vector < std : : string > * values ) {
@ -3577,8 +3604,9 @@ std::vector<Status> DBImpl::MultiGet(
}
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
if ( read_options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > (
read_options . snapshot ) - > number_ ;
} else {
snapshot = versions_ - > LastSequence ( ) ;
}
@ -3621,7 +3649,8 @@ std::vector<Status> DBImpl::MultiGet(
// Done
} else {
PERF_TIMER_GUARD ( get_from_output_files_time ) ;
super_version - > current - > Get ( options , lkey , value , & s , & merge_context ) ;
super_version - > current - > Get ( read_options , lkey , value , & s ,
& merge_context ) ;
}
if ( s . ok ( ) ) {
@ -3659,7 +3688,7 @@ std::vector<Status> DBImpl::MultiGet(
return stat_list ;
}
Status DBImpl : : CreateColumnFamily ( const ColumnFamilyOptions & options ,
Status DBImpl : : CreateColumnFamily ( const ColumnFamilyOptions & cf_ options,
const std : : string & column_family_name ,
ColumnFamilyHandle * * handle ) {
* handle = nullptr ;
@ -3674,26 +3703,23 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
uint32_t new_id = versions_ - > GetColumnFamilySet ( ) - > GetNextColumnFamilyID ( ) ;
edit . SetColumnFamily ( new_id ) ;
edit . SetLogNumber ( logfile_number_ ) ;
edit . SetComparatorName ( options . comparator - > Name ( ) ) ;
edit . SetComparatorName ( cf_ options. comparator - > Name ( ) ) ;
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Options opt ( db_options_ , options ) ;
Options opt ( db_options_ , cf_ options) ;
Status s = versions_ - > LogAndApply ( nullptr ,
MutableCFOptions ( opt , ImmutableCFOptions ( opt ) ) ,
& edit , & mutex_ , db_directory_ . get ( ) , false , & options ) ;
& edit , & mutex_ , db_directory_ . get ( ) , false , & cf_ options) ;
if ( s . ok ( ) ) {
single_column_family_mode_ = false ;
auto cfd =
versions_ - > GetColumnFamilySet ( ) - > GetColumnFamily ( column_family_name ) ;
assert ( cfd ! = nullptr ) ;
delete cfd - > InstallSuperVersion ( new SuperVersion ( ) , & mutex_ ,
* cfd - > GetLatestMutableCFOptions ( ) ) ;
delete InstallSuperVersion ( cfd , nullptr , * cfd - > GetLatestMutableCFOptions ( ) ) ;
* handle = new ColumnFamilyHandleImpl ( cfd , this , & mutex_ ) ;
Log ( db_options_ . info_log , " Created column family [%s] (ID %u) " ,
column_family_name . c_str ( ) , ( unsigned ) cfd - > GetID ( ) ) ;
max_total_in_memory_state_ + = cfd - > options ( ) - > write_buffer_size *
cfd - > options ( ) - > max_write_buffer_number ;
} else {
Log ( db_options_ . info_log , " Creating column family [%s] FAILED -- %s " ,
column_family_name . c_str ( ) , s . ToString ( ) . c_str ( ) ) ;
@ -3712,7 +3738,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit . DropColumnFamily ( ) ;
edit . SetColumnFamily ( cfd - > GetID ( ) ) ;
Status s ;
{
MutexLock l ( & mutex_ ) ;
@ -3732,8 +3757,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
if ( s . ok ( ) ) {
assert ( cfd - > IsDropped ( ) ) ;
max_total_in_memory_state_ - = cfd - > options ( ) - > write_buffer_size *
cfd - > options ( ) - > max_write_buffer_number ;
auto * mutable_cf_options = cfd - > GetLatestMutableCFOptions ( ) ;
max_total_in_memory_state_ - = mutable_cf_options - > write_buffer_size *
mutable_cf_options - > max_write_buffer_number ;
Log ( db_options_ . info_log , " Dropped column family with id %u \n " ,
cfd - > GetID ( ) ) ;
} else {
@ -3745,14 +3771,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
return s ;
}
bool DBImpl : : KeyMayExist ( const ReadOptions & options ,
bool DBImpl : : KeyMayExist ( const ReadOptions & read_ options,
ColumnFamilyHandle * column_family , const Slice & key ,
std : : string * value , bool * value_found ) {
if ( value_found ! = nullptr ) {
// falsify later if key-may-exist but can't fetch value
* value_found = true ;
}
ReadOptions roptions = options ;
ReadOptions roptions = read_ options;
roptions . read_tier = kBlockCacheTier ; // read from block cache only
auto s = GetImpl ( roptions , column_family , key , value , value_found ) ;
@ -3941,23 +3967,23 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
}
}
// Deletes `key` from `column_family`.
//
// Delegates directly to the base-class DB::Delete() with the same arguments
// and returns its Status.
Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}
Status DBImpl : : Write ( const WriteOptions & options , WriteBatch * my_batch ) {
Status DBImpl : : Write ( const WriteOptions & write_ options, WriteBatch * my_batch ) {
if ( my_batch = = nullptr ) {
return Status : : Corruption ( " Batch is nullptr! " ) ;
}
PERF_TIMER_GUARD ( write_pre_and_post_process_time ) ;
WriteThread : : Writer w ( & mutex_ ) ;
w . batch = my_batch ;
w . sync = options . sync ;
w . disableWAL = options . disableWAL ;
w . sync = write_ options. sync ;
w . disableWAL = write_ options. disableWAL ;
w . in_batch_group = false ;
w . done = false ;
w . timeout_hint_us = options . timeout_hint_us ;
w . timeout_hint_us = write_ options. timeout_hint_us ;
uint64_t expiration_time = 0 ;
bool has_timeout = false ;
@ -3968,7 +3994,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
has_timeout = true ;
}
if ( ! options . disableWAL ) {
if ( ! write_ options. disableWAL ) {
RecordTick ( stats_ , WRITE_WITH_WAL ) ;
default_cf_internal_stats_ - > AddDBStats ( InternalStats : : WRITE_WITH_WAL , 1 ) ;
}
@ -4074,13 +4100,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// Record statistics
RecordTick ( stats_ , NUMBER_KEYS_WRITTEN , my_batch_count ) ;
RecordTick ( stats_ , BYTES_WRITTEN , WriteBatchInternal : : ByteSize ( updates ) ) ;
if ( options . disableWAL ) {
if ( write_ options. disableWAL ) {
flush_on_destroy_ = true ;
}
PERF_TIMER_STOP ( write_pre_and_post_process_time ) ;
uint64_t log_size = 0 ;
if ( ! options . disableWAL ) {
if ( ! write_ options. disableWAL ) {
PERF_TIMER_GUARD ( write_wal_time ) ;
Slice log_entry = WriteBatchInternal : : Contents ( updates ) ;
status = log_ - > AddRecord ( log_entry ) ;
@ -4089,7 +4115,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
log_empty_ = false ;
log_size = log_entry . size ( ) ;
RecordTick ( stats_ , WAL_FILE_BYTES , log_size ) ;
if ( status . ok ( ) & & options . sync ) {
if ( status . ok ( ) & & write_ options. sync ) {
RecordTick ( stats_ , WAL_FILE_SYNCED ) ;
StopWatch sw ( env_ , stats_ , WAL_FILE_SYNC_MICROS ) ;
if ( db_options_ . use_fsync ) {
@ -4104,7 +4130,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = WriteBatchInternal : : InsertInto (
updates , column_family_memtables_ . get ( ) ,
options . ignore_missing_column_families , 0 , this , false ) ;
write_ options. ignore_missing_column_families , 0 , this , false ) ;
// A non-OK status here indicates iteration failure (either in-memory
// writebatch corruption (very bad), or the client specified invalid
// column family). This will later on trigger bg_error_.
@ -4123,7 +4149,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// internal stats
default_cf_internal_stats_ - > AddDBStats (
InternalStats : : BYTES_WRITTEN , batch_size ) ;
if ( ! options . disableWAL ) {
if ( ! write_ options. disableWAL ) {
default_cf_internal_stats_ - > AddDBStats (
InternalStats : : WAL_FILE_SYNCED , 1 ) ;
default_cf_internal_stats_ - > AddDBStats (
@ -4221,8 +4247,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
if ( s . ok ( ) ) {
// Our final size should be less than write_buffer_size
// (compression, etc) but err on the side of caution.
lfile - > SetPreallocationBlockSize ( 1.1 *
cfd - > options ( ) - > write_buffer_size ) ;
lfile - > SetPreallocationBlockSize (
1.1 * mutable_cf_options . write_buffer_size ) ;
new_log = new log : : Writer ( std : : move ( lfile ) ) ;
}
}
@ -4270,7 +4296,7 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
new_mem - > Ref ( ) ;
cfd - > SetMemtable ( new_mem ) ;
context - > superversions_to_free_ . push_back (
cfd - > InstallSuperVersion ( new_superversion , & mutex_ , mutable_cf_options ) ) ;
InstallSuperVersion ( cfd , new_superversion , mutable_cf_options ) ) ;
return s ;
}
@ -4616,7 +4642,7 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
}
// Default implementation -- returns not supported status
Status DB : : CreateColumnFamily ( const ColumnFamilyOptions & options ,
Status DB : : CreateColumnFamily ( const ColumnFamilyOptions & cf_ options,
const std : : string & column_family_name ,
ColumnFamilyHandle * * handle ) {
return Status : : NotSupported ( " " ) ;
@ -4739,8 +4765,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if ( s . ok ( ) ) {
for ( auto cfd : * impl - > versions_ - > GetColumnFamilySet ( ) ) {
delete cfd - > InstallSuperVersion ( new SuperVersion ( ) , & impl - > mutex_ ,
* cfd - > GetLatestMutableCFOptions ( ) ) ;
delete impl - > InstallSuperVersion (
cfd , nullptr , * cfd - > GetLatestMutableCFOptions ( ) ) ;
}
impl - > alive_log_files_ . push_back (
DBImpl : : LogFileNumberSize ( impl - > logfile_number_ ) ) ;