@ -3620,10 +3620,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Status status ;
// refcounting cfd in iteration
bool dead_cfd = false ;
autovector < SuperVersion * > superversions_to_free ;
autovector < log : : Writer * > logs_to_free ;
for ( auto cfd : * versions_ - > GetColumnFamilySet ( ) ) {
cfd - > Ref ( ) ;
// May temporarily unlock and wait.
status = MakeRoomForWrite ( cfd , my_batch = = nullptr ) ;
status = MakeRoomForWrite ( cfd , my_batch = = nullptr , & superversions_to_free ,
& logs_to_free ) ;
if ( cfd - > Unref ( ) ) {
dead_cfd = true ;
}
@ -3736,6 +3739,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
writers_ . front ( ) - > cv . Signal ( ) ;
}
mutex_ . Unlock ( ) ;
for ( auto & sv : superversions_to_free ) {
delete sv ;
}
for ( auto & log : logs_to_free ) {
delete log ;
}
PERF_TIMER_STOP ( write_pre_and_post_process_time ) ;
return status ;
}
@ -3822,7 +3833,10 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl : : MakeRoomForWrite ( ColumnFamilyData * cfd , bool force ) {
Status DBImpl : : MakeRoomForWrite (
ColumnFamilyData * cfd , bool force ,
autovector < SuperVersion * > * superversions_to_free ,
autovector < log : : Writer * > * logs_to_free ) {
mutex_ . AssertHeld ( ) ;
assert ( ! writers_ . empty ( ) ) ;
bool allow_delay = ! force ;
@ -3992,8 +4006,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
if ( creating_new_log ) {
logfile_number_ = new_log_number ;
assert ( new_log ! = nullptr ) ;
// TODO(icanadi) delete outside of mutex
delete log_ . release ( ) ;
logs_to_free - > push_back ( log_ . release ( ) ) ;
log_ . reset ( new_log ) ;
log_empty_ = true ;
alive_log_files_ . push_back ( logfile_number_ ) ;
@ -4019,8 +4032,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
cfd - > GetName ( ) . c_str ( ) , ( unsigned long ) logfile_number_ ) ;
force = false ; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction ( ) ;
// TODO(icanadi) delete outside of mutex
delete cfd - > InstallSuperVersion ( new_superversion , & mutex_ ) ;
superversions_to_free - > push_back (
cfd - > InstallSuperVersion ( new_superversion , & mutex_ ) ) ;
}
}
return s ;