@ -241,6 +241,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mem_ ( new MemTable ( internal_comparator_ , mem_rep_factory_ ,
NumberLevels ( ) , options_ ) ) ,
logfile_number_ ( 0 ) ,
super_version_ ( nullptr ) ,
tmp_batch_ ( ) ,
bg_compaction_scheduled_ ( 0 ) ,
bg_flush_scheduled_ ( 0 ) ,
@ -316,6 +317,13 @@ DBImpl::~DBImpl() {
bg_logstats_scheduled_ ) {
bg_cv_ . Wait ( ) ;
}
if ( super_version_ ! = nullptr ) {
bool is_last_reference __attribute__ ( ( unused ) ) ;
is_last_reference = super_version_ - > Unref ( ) ;
assert ( is_last_reference ) ;
super_version_ - > Cleanup ( ) ;
delete super_version_ ;
}
mutex_ . Unlock ( ) ;
if ( db_lock_ ! = nullptr ) {
@ -345,6 +353,13 @@ void DBImpl::TEST_Destroy_DBImpl() {
bg_logstats_scheduled_ ) {
bg_cv_ . Wait ( ) ;
}
if ( super_version_ ! = nullptr ) {
bool is_last_reference __attribute__ ( ( unused ) ) ;
is_last_reference = super_version_ - > Unref ( ) ;
assert ( is_last_reference ) ;
super_version_ - > Cleanup ( ) ;
delete super_version_ ;
}
// Prevent new compactions from occuring.
bg_work_gate_closed_ = true ;
@ -443,6 +458,49 @@ void DBImpl::MaybeDumpStats() {
}
}
// DBImpl::SuperVersion methods
DBImpl : : SuperVersion : : SuperVersion ( const int num_memtables ) {
to_delete . resize ( num_memtables ) ;
}
DBImpl : : SuperVersion : : ~ SuperVersion ( ) {
for ( auto td : to_delete ) {
delete td ;
}
}
DBImpl : : SuperVersion * DBImpl : : SuperVersion : : Ref ( ) {
refs . fetch_add ( 1 , std : : memory_order_relaxed ) ;
return this ;
}
bool DBImpl : : SuperVersion : : Unref ( ) {
assert ( refs > 0 ) ;
// fetch_sub returns the previous value of ref
return refs . fetch_sub ( 1 , std : : memory_order_relaxed ) = = 1 ;
}
void DBImpl : : SuperVersion : : Cleanup ( ) {
assert ( refs . load ( std : : memory_order_relaxed ) = = 0 ) ;
imm . UnrefAll ( & to_delete ) ;
MemTable * m = mem - > Unref ( ) ;
if ( m ! = nullptr ) {
to_delete . push_back ( m ) ;
}
current - > Unref ( ) ;
}
void DBImpl : : SuperVersion : : Init ( MemTable * new_mem , const MemTableList & new_imm ,
Version * new_current ) {
mem = new_mem ;
imm = new_imm ;
current = new_current ;
mem - > Ref ( ) ;
imm . RefAll ( ) ;
current - > Ref ( ) ;
refs . store ( 1 , std : : memory_order_relaxed ) ;
}
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
@ -518,11 +576,6 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// It is not necessary to hold the mutex when invoking this method.
void DBImpl : : PurgeObsoleteFiles ( DeletionState & state ) {
// free pending memtables
for ( auto m : state . memtables_to_free ) {
delete m ;
}
// check if there is anything to do
if ( ! state . all_files . size ( ) & &
! state . sst_delete_files . size ( ) & &
@ -1188,6 +1241,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
file_number , pending_outputs_ , & deletion_state . memtables_to_free ) ;
if ( s . ok ( ) ) {
InstallSuperVersion ( deletion_state ) ;
if ( madeProgress ) {
* madeProgress = 1 ;
}
@ -1247,11 +1301,17 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
void DBImpl : : ReFitLevel ( int level , int target_level ) {
assert ( level < NumberLevels ( ) ) ;
MutexLock l ( & mutex_ ) ;
SuperVersion * superversion_to_free = nullptr ;
SuperVersion * new_superversion =
new SuperVersion ( options_ . max_write_buffer_number ) ;
mutex_ . Lock ( ) ;
// only allow one thread refitting
if ( refitting_level_ ) {
mutex_ . Unlock ( ) ;
Log ( options_ . info_log , " ReFitLevel: another thread is refitting " ) ;
delete new_superversion ;
return ;
}
refitting_level_ = true ;
@ -1287,6 +1347,8 @@ void DBImpl::ReFitLevel(int level, int target_level) {
edit . DebugString ( ) . data ( ) ) ;
auto status = versions_ - > LogAndApply ( & edit , & mutex_ ) ;
superversion_to_free = InstallSuperVersion ( new_superversion ) ;
new_superversion = nullptr ;
Log ( options_ . info_log , " LogAndApply: %s \n " , status . ToString ( ) . data ( ) ) ;
@ -1298,6 +1360,10 @@ void DBImpl::ReFitLevel(int level, int target_level) {
refitting_level_ = false ;
bg_work_gate_closed_ = false ;
mutex_ . Unlock ( ) ;
delete superversion_to_free ;
delete new_superversion ;
}
int DBImpl : : NumberLevels ( ) {
@ -1671,7 +1737,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl : : BackgroundCallFlush ( ) {
bool madeProgress = false ;
DeletionState deletion_state ( options_ . max_write_buffer_number ) ;
DeletionState deletion_state ( options_ . max_write_buffer_number , true ) ;
assert ( bg_flush_scheduled_ ) ;
MutexLock l ( & mutex_ ) ;
@ -1717,7 +1783,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
void DBImpl : : BackgroundCallCompaction ( ) {
bool madeProgress = false ;
DeletionState deletion_state ( options_ . max_write_buffer_number ) ;
DeletionState deletion_state ( options_ . max_write_buffer_number , true ) ;
MaybeDumpStats ( ) ;
@ -1770,7 +1836,7 @@ void DBImpl::BackgroundCallCompaction() {
}
Status DBImpl : : BackgroundCompaction ( bool * madeProgress ,
DeletionState & deletion_state ) {
DeletionState & deletion_state ) {
* madeProgress = false ;
mutex_ . AssertHeld ( ) ;
@ -1823,6 +1889,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
f - > smallest , f - > largest ,
f - > smallest_seqno , f - > largest_seqno ) ;
status = versions_ - > LogAndApply ( c - > edit ( ) , & mutex_ ) ;
InstallSuperVersion ( deletion_state ) ;
VersionSet : : LevelSummaryStorage tmp ;
Log ( options_ . info_log , " Moved #%lld to level-%d %lld bytes %s: %s \n " ,
static_cast < unsigned long long > ( f - > number ) ,
@ -2484,6 +2551,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if ( status . ok ( ) ) {
status = InstallCompactionResults ( compact ) ;
InstallSuperVersion ( deletion_state ) ;
}
VersionSet : : LevelSummaryStorage tmp ;
Log ( options_ . info_log ,
@ -2588,6 +2656,44 @@ Status DBImpl::Get(const ReadOptions& options,
return GetImpl ( options , key , value ) ;
}
// DeletionState gets created and destructed outside of the lock -- we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete one SuperVersion() outside of the lock -- superversion_to_free
//
// However, if InstallSuperVersion() gets called twice with the same,
// deletion_state, we can't reuse the SuperVersion() that got malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl : : InstallSuperVersion ( DeletionState & deletion_state ) {
// if new_superversion == nullptr, it means somebody already used it
SuperVersion * new_superversion =
( deletion_state . new_superversion ! = nullptr ) ?
deletion_state . new_superversion : new SuperVersion ( ) ;
SuperVersion * old_superversion = InstallSuperVersion ( new_superversion ) ;
deletion_state . new_superversion = nullptr ;
if ( deletion_state . superversion_to_free ! = nullptr ) {
// somebody already put it there
delete old_superversion ;
} else {
deletion_state . superversion_to_free = old_superversion ;
}
}
DBImpl : : SuperVersion * DBImpl : : InstallSuperVersion (
SuperVersion * new_superversion ) {
mutex_ . AssertHeld ( ) ;
new_superversion - > Init ( mem_ , imm_ , versions_ - > current ( ) ) ;
SuperVersion * old_superversion = super_version_ ;
super_version_ = new_superversion ;
if ( old_superversion ! = nullptr & & old_superversion - > Unref ( ) ) {
old_superversion - > Cleanup ( ) ;
return old_superversion ; // will let caller delete outside of mutex
}
return nullptr ;
}
Status DBImpl : : GetImpl ( const ReadOptions & options ,
const Slice & key ,
std : : string * value ,
@ -2596,27 +2702,20 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StopWatch sw ( env_ , options_ . statistics . get ( ) , DB_GET ) ;
SequenceNumber snapshot ;
std : : vector < MemTable * > to_delete ;
mutex_ . Lock ( ) ;
if ( options . snapshot ! = nullptr ) {
snapshot = reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_ ;
} else {
snapshot = versions_ - > LastSequence ( ) ;
}
MemTable * mem = mem_ ;
MemTableList imm = imm_ ;
Version * current = versions_ - > current ( ) ;
mem - > Ref ( ) ;
imm . RefAll ( ) ;
current - > Ref ( ) ;
// Unlock while reading from files and memtables
// This can be replaced by using atomics and spinlock instead of big mutex
mutex_ . Lock ( ) ;
SuperVersion * get_version = super_version_ - > Ref ( ) ;
mutex_ . Unlock ( ) ;
bool have_stat_update = false ;
Version : : GetStats stats ;
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context ;
@ -2624,32 +2723,41 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey ( key , snapshot ) ;
if ( mem - > Get ( lkey , value , & s , merge_context , options_ ) ) {
if ( get_version - > mem - > Get ( lkey , value , & s , merge_context , options_ ) ) {
// Done
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_HIT ) ;
} else if ( imm . Get ( lkey , value , & s , merge_context , options_ ) ) {
} else if ( get_version - > imm . Get ( lkey , value , & s , merge_context , options_ ) ) {
// Done
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_HIT ) ;
} else {
current - > Get ( options , lkey , value , & s , & merge_context , & stats ,
options_ , value_found ) ;
get_version - > current - > Get ( options , lkey , value , & s , & merge_context , & stats ,
options_ , value_found ) ;
have_stat_update = true ;
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_MISS ) ;
}
mutex_ . Lock ( ) ;
if ( ! options_ . disable_seek_compaction & &
have_stat_update & & current - > UpdateStats ( stats ) ) {
MaybeScheduleFlushOrCompaction ( ) ;
bool delete_get_version = false ;
if ( ! options_ . disable_seek_compaction & & have_stat_update ) {
mutex_ . Lock ( ) ;
if ( get_version - > current - > UpdateStats ( stats ) ) {
MaybeScheduleFlushOrCompaction ( ) ;
}
if ( get_version - > Unref ( ) ) {
get_version - > Cleanup ( ) ;
delete_get_version = true ;
}
mutex_ . Unlock ( ) ;
} else {
if ( get_version - > Unref ( ) ) {
mutex_ . Lock ( ) ;
get_version - > Cleanup ( ) ;
mutex_ . Unlock ( ) ;
delete_get_version = true ;
}
}
if ( delete_get_version ) {
delete get_version ;
}
MemTable * m = mem - > Unref ( ) ;
imm . UnrefAll ( & to_delete ) ;
current - > Unref ( ) ;
mutex_ . Unlock ( ) ;
// free up all obsolete memtables outside the mutex
delete m ;
for ( MemTable * v : to_delete ) delete v ;
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick ( options_ . statistics . get ( ) , NUMBER_KEYS_READ ) ;
@ -2813,7 +2921,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w . done = false ;
StopWatch sw ( env_ , options_ . statistics . get ( ) , DB_WRITE ) ;
MutexLock l ( & mutex_ ) ;
mutex_ . Lock ( ) ;
writers_ . push_back ( & w ) ;
while ( ! w . done & & & w ! = writers_ . front ( ) ) {
w . cv . Wait ( ) ;
@ -2824,6 +2932,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
if ( w . done ) {
mutex_ . Unlock ( ) ;
RecordTick ( options_ . statistics . get ( ) , WRITE_DONE_BY_OTHER , 1 ) ;
return w . status ;
} else {
@ -2831,7 +2940,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite ( my_batch = = nullptr ) ;
SuperVersion * superversion_to_free = nullptr ;
Status status = MakeRoomForWrite ( my_batch = = nullptr , & superversion_to_free ) ;
uint64_t last_sequence = versions_ - > LastSequence ( ) ;
Writer * last_writer = & w ;
if ( status . ok ( ) & & my_batch ! = nullptr ) { // nullptr batch is for compactions
@ -2919,6 +3029,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if ( ! writers_ . empty ( ) ) {
writers_ . front ( ) - > cv . Signal ( ) ;
}
mutex_ . Unlock ( ) ;
delete superversion_to_free ;
return status ;
}
@ -3011,7 +3123,8 @@ uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl : : MakeRoomForWrite ( bool force ) {
Status DBImpl : : MakeRoomForWrite ( bool force ,
SuperVersion * * superversion_to_free ) {
mutex_ . AssertHeld ( ) ;
assert ( ! writers_ . empty ( ) ) ;
bool allow_delay = ! force ;
@ -3020,6 +3133,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
uint64_t rate_limit_delay_millis = 0 ;
Status s ;
double score ;
* superversion_to_free = nullptr ;
while ( true ) {
if ( ! bg_error_ . ok ( ) ) {
@ -3146,6 +3260,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Do this without holding the dbmutex lock.
assert ( versions_ - > PrevLogNumber ( ) = = 0 ) ;
uint64_t new_log_number = versions_ - > NewFileNumber ( ) ;
SuperVersion * new_superversion = nullptr ;
mutex_ . Unlock ( ) ;
{
EnvOptions soptions ( storage_options_ ) ;
@ -3162,6 +3277,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
lfile - > SetPreallocationBlockSize ( 1.1 * options_ . write_buffer_size ) ;
memtmp = new MemTable (
internal_comparator_ , mem_rep_factory_ , NumberLevels ( ) , options_ ) ;
new_superversion = new SuperVersion ( options_ . max_write_buffer_number ) ;
}
}
mutex_ . Lock ( ) ;
@ -3186,6 +3302,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mem_ - > SetLogNumber ( logfile_number_ ) ;
force = false ; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction ( ) ;
* superversion_to_free = InstallSuperVersion ( new_superversion ) ;
}
}
return s ;
@ -3541,7 +3658,7 @@ Status DBImpl::DeleteFile(std::string name) {
FileMetaData metadata ;
int maxlevel = NumberLevels ( ) ;
VersionEdit edit ( maxlevel ) ;
DeletionState deletion_state ;
DeletionState deletion_state ( 0 , true ) ;
{
MutexLock l ( & mutex_ ) ;
status = versions_ - > GetMetadataForFile ( number , & level , & metadata ) ;
@ -3571,14 +3688,14 @@ Status DBImpl::DeleteFile(std::string name) {
}
edit . DeleteFile ( level , number ) ;
status = versions_ - > LogAndApply ( & edit , & mutex_ ) ;
if ( status . ok ( ) ) {
InstallSuperVersion ( deletion_state ) ;
}
FindObsoleteFiles ( deletion_state , false ) ;
} // lock released here
LogFlush ( options_ . info_log ) ;
if ( status . ok ( ) ) {
// remove files outside the db-lock
PurgeObsoleteFiles ( deletion_state ) ;
}
// remove files outside the db-lock
PurgeObsoleteFiles ( deletion_state ) ;
return status ;
}
@ -3678,6 +3795,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
s = impl - > versions_ - > LogAndApply ( & edit , & impl - > mutex_ ) ;
}
if ( s . ok ( ) ) {
delete impl - > InstallSuperVersion ( new DBImpl : : SuperVersion ( ) ) ;
impl - > mem_ - > SetLogNumber ( impl - > logfile_number_ ) ;
impl - > DeleteObsoleteFiles ( ) ;
impl - > MaybeScheduleFlushOrCompaction ( ) ;