@ -2645,11 +2645,17 @@ struct VersionSet::ManifestWriter {
bool done ;
InstrumentedCondVar cv ;
ColumnFamilyData * cfd ;
const MutableCFOptions mutable_cf_options ;
const autovector < VersionEdit * > & edit_list ;
explicit ManifestWriter ( InstrumentedMutex * mu , ColumnFamilyData * _cfd ,
const MutableCFOptions & cf_options ,
const autovector < VersionEdit * > & e )
: done ( false ) , cv ( mu ) , cfd ( _cfd ) , edit_list ( e ) { }
: done ( false ) ,
cv ( mu ) ,
cfd ( _cfd ) ,
mutable_cf_options ( cf_options ) ,
edit_list ( e ) { }
} ;
VersionSet : : VersionSet ( const std : : string & dbname ,
@ -2724,90 +2730,78 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
v - > next_ - > prev_ = v ;
}
Status VersionSet : : LogAndApply ( ColumnFamilyData * column_family_data ,
const MutableCFOptions & mutable_cf_options ,
const autovector < VersionEdit * > & edit_list ,
InstrumentedMutex * mu , Directory * db_directory ,
bool new_descriptor_log ,
const ColumnFamilyOptions * new_cf_options ) {
mu - > AssertHeld ( ) ;
// num of edits
auto num_edits = edit_list . size ( ) ;
if ( num_edits = = 0 ) {
return Status : : OK ( ) ;
} else if ( num_edits > 1 ) {
# ifndef NDEBUG
// no group commits for column family add or drop
for ( auto & edit : edit_list ) {
assert ( ! edit - > IsColumnFamilyManipulation ( ) ) ;
}
# endif
}
// column_family_data can be nullptr only if this is column_family_add.
// in that case, we also need to specify ColumnFamilyOptions
if ( column_family_data = = nullptr ) {
assert ( num_edits = = 1 ) ;
assert ( edit_list [ 0 ] - > is_column_family_add_ ) ;
assert ( new_cf_options ! = nullptr ) ;
}
Status VersionSet : : ProcessManifestWrites (
std : : deque < ManifestWriter > & writers , InstrumentedMutex * mu ,
Directory * db_directory , bool new_descriptor_log ,
const ColumnFamilyOptions * new_cf_options ) {
assert ( ! writers . empty ( ) ) ;
ManifestWriter & first_writer = writers . front ( ) ;
ManifestWriter * last_writer = & first_writer ;
// queue our request
ManifestWriter w ( mu , column_family_data , edit_list ) ;
manifest_writers_ . push_back ( & w ) ;
while ( ! w . done & & & w ! = manifest_writers_ . front ( ) ) {
w . cv . Wait ( ) ;
}
if ( w . done ) {
return w . status ;
}
if ( column_family_data ! = nullptr & & column_family_data - > IsDropped ( ) ) {
// if column family is dropped by the time we get here, no need to write
// anything to the manifest
manifest_writers_ . pop_front ( ) ;
// Notify new head of write queue
if ( ! manifest_writers_ . empty ( ) ) {
manifest_writers_ . front ( ) - > cv . Signal ( ) ;
}
// we steal this code to also inform about cf-drop
return Status : : ShutdownInProgress ( ) ;
}
assert ( ! manifest_writers_ . empty ( ) ) ;
assert ( manifest_writers_ . front ( ) = = & first_writer ) ;
autovector < VersionEdit * > batch_edits ;
Version * v = nullptr ;
std : : unique_ptr < BaseReferencedVersionBuilder > builder_guard ( nullptr ) ;
// process all requests in the queue
ManifestWriter * last_writer = & w ;
assert ( ! manifest_writers_ . empty ( ) ) ;
assert ( manifest_writers_ . front ( ) = = & w ) ;
if ( w . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) ) {
// no group commits for column family add or drop
LogAndApplyCFHelper ( w . edit_list . front ( ) ) ;
batch_edits . push_back ( w . edit_list . front ( ) ) ;
autovector < Version * > versions ;
autovector < const MutableCFOptions * > mutable_cf_options_ptrs ;
std : : vector < std : : unique_ptr < BaseReferencedVersionBuilder > > builder_guards ;
if ( first_writer . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) ) {
// No group commits for column family add or drop
LogAndApplyCFHelper ( first_writer . edit_list . front ( ) ) ;
batch_edits . push_back ( first_writer . edit_list . front ( ) ) ;
} else {
v = new Version ( column_family_data , this , env_options_ , mutable_cf_options ,
current_version_number_ + + ) ;
builder_guard . reset ( new BaseReferencedVersionBuilder ( column_family_data ) ) ;
auto * builder = builder_guard - > version_builder ( ) ;
for ( const auto & writer : manifest_writers_ ) {
if ( writer - > edit_list . front ( ) - > IsColumnFamilyManipulation ( ) | |
writer - > cfd - > GetID ( ) ! = column_family_data - > GetID ( ) ) {
auto it = manifest_writers_ . cbegin ( ) ;
while ( it ! = manifest_writers_ . cend ( ) ) {
if ( ( * it ) - > edit_list . front ( ) - > IsColumnFamilyManipulation ( ) ) {
// no group commits for column family add or drop
// also, group commits across column families are not supported
break ;
}
last_writer = writer ;
for ( const auto & edit : writer - > edit_list ) {
LogAndApplyHelper ( column_family_data , builder , v , edit , mu ) ;
batch_edits . push_back ( edit ) ;
last_writer = * ( it + + ) ;
assert ( last_writer ! = nullptr ) ;
if ( last_writer - > cfd ! = nullptr & & last_writer - > cfd - > IsDropped ( ) ) {
continue ;
}
// We do a linear search on versions because versions is small.
// TODO(yanqin) maybe consider unordered_map
Version * version = nullptr ;
VersionBuilder * builder = nullptr ;
for ( int i = 0 ; i ! = static_cast < int > ( versions . size ( ) ) ; + + i ) {
uint32_t cf_id = last_writer - > cfd - > GetID ( ) ;
if ( versions [ i ] - > cfd ( ) - > GetID ( ) = = cf_id ) {
version = versions [ i ] ;
assert ( ! builder_guards . empty ( ) & &
builder_guards . size ( ) = = versions . size ( ) ) ;
builder = builder_guards [ i ] - > version_builder ( ) ;
TEST_SYNC_POINT_CALLBACK (
" VersionSet::ProcessManifestWrites:SameColumnFamily " , & cf_id ) ;
break ;
}
}
if ( version = = nullptr ) {
version = new Version ( last_writer - > cfd , this , env_options_ ,
last_writer - > mutable_cf_options ,
current_version_number_ + + ) ;
versions . push_back ( version ) ;
mutable_cf_options_ptrs . push_back ( & last_writer - > mutable_cf_options ) ;
builder_guards . emplace_back (
new BaseReferencedVersionBuilder ( last_writer - > cfd ) ) ;
builder = builder_guards . back ( ) - > version_builder ( ) ;
}
assert ( builder ! = nullptr ) ; // make checker happy
for ( const auto & e : last_writer - > edit_list ) {
LogAndApplyHelper ( last_writer - > cfd , builder , version , e , mu ) ;
batch_edits . push_back ( e ) ;
}
}
builder - > SaveTo ( v - > storage_info ( ) ) ;
for ( int i = 0 ; i < static_cast < int > ( versions . size ( ) ) ; + + i ) {
assert ( ! builder_guards . empty ( ) & &
builder_guards . size ( ) = = versions . size ( ) ) ;
auto * builder = builder_guards [ i ] - > version_builder ( ) ;
builder - > SaveTo ( versions [ i ] - > storage_info ( ) ) ;
}
}
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
uint64_t new_manifest_file_size = 0 ;
Status s ;
@ -2822,39 +2816,39 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
if ( new_descriptor_log ) {
// if we're writing out new snapshot make sure to persist max column family
// if we are writing out new snapshot make sure to persist max column
// family.
if ( column_family_set_ - > GetMaxColumnFamily ( ) > 0 ) {
w . edit_list . front ( ) - > SetMaxColumnFamily (
first_ writer . edit_list . front ( ) - > SetMaxColumnFamily (
column_family_set_ - > GetMaxColumnFamily ( ) ) ;
}
}
// Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued.
{
EnvOptions opt_env_opts = env_ - > OptimizeForManifestWrite ( env_options_ ) ;
// Before releasing mutex, make a copy of mutable_cf_options and pass to
// `PrepareApply` to avoided a potential data race with backgroundflush
MutableCFOptions mutable_cf_options_copy ( mutable_cf_options ) ;
mu - > Unlock ( ) ;
TEST_SYNC_POINT ( " VersionSet::LogAndApply:WriteManifest " ) ;
if ( ! w . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) & &
this - > GetColumnFamilySet ( ) - > get_table_cache ( ) - > GetCapacity ( ) = =
if ( ! first_ writer . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) & &
column_family_set_ - > get_table_cache ( ) - > GetCapacity ( ) = =
TableCache : : kInfiniteCapacity ) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder_guard - > version_builder ( ) - > LoadTableHandlers (
column_family_data - > internal_stats ( ) ,
column_family_data - > ioptions ( ) - > optimize_filters_for_hits ,
true /* prefetch_index_and_filter_in_cache */ ,
mutable_cf_options . prefix_extractor . get ( ) ) ;
for ( int i = 0 ; i < static_cast < int > ( versions . size ( ) ) ; + + i ) {
assert ( ! builder_guards . empty ( ) & &
builder_guards . size ( ) = = versions . size ( ) ) ;
assert ( ! mutable_cf_options_ptrs . empty ( ) & &
builder_guards . size ( ) = = versions . size ( ) ) ;
ColumnFamilyData * cfd = versions [ i ] - > cfd_ ;
builder_guards [ i ] - > version_builder ( ) - > LoadTableHandlers (
cfd - > internal_stats ( ) , cfd - > ioptions ( ) - > optimize_filters_for_hits ,
true /* prefetch_index_and_filter_in_cache */ ,
mutable_cf_options_ptrs [ i ] - > prefix_extractor . get ( ) ) ;
}
}
// This is fine because everything inside of this block is serialized --
// only one thread can be here at the same time
if ( new_descriptor_log ) {
// create manifest file
// create new manifest file
ROCKS_LOG_INFO ( db_options_ - > info_log , " Creating manifest % " PRIu64 " \n " ,
pending_manifest_file_number_ ) ;
unique_ptr < WritableFile > descriptor_file ;
@ -2873,18 +2867,19 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
}
if ( ! w . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) ) {
// This is cpu-heavy operations, which should be called outside mutex.
v - > PrepareApply ( mutable_cf_options_copy , true ) ;
if ( ! first_writer . edit_list . front ( ) - > IsColumnFamilyManipulation ( ) ) {
for ( int i = 0 ; i < static_cast < int > ( versions . size ( ) ) ; + + i ) {
versions [ i ] - > PrepareApply ( * mutable_cf_options_ptrs [ i ] , true ) ;
}
}
// Write new record to MANIFEST log
// Write new records to MANIFEST log
if ( s . ok ( ) ) {
for ( auto & e : batch_edits ) {
std : : string record ;
if ( ! e - > EncodeTo ( & record ) ) {
s = Status : : Corruption (
" Unable to Encode VersionEdit: " + e - > DebugString ( true ) ) ;
s = Status : : Corruption ( " Unable to encode VersionEdit: " +
e - > DebugString ( true ) ) ;
break ;
}
TEST_KILL_RANDOM ( " VersionSet::LogAndApply:BeforeAddRecord " ,
@ -2898,7 +2893,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
s = SyncManifest ( env_ , db_options_ , descriptor_log_ - > file ( ) ) ;
}
if ( ! s . ok ( ) ) {
ROCKS_LOG_ERROR ( db_options_ - > info_log , " MANIFEST write: %s \n " ,
ROCKS_LOG_ERROR ( db_options_ - > info_log , " MANIFEST write %s \n " ,
s . ToString ( ) . c_str ( ) ) ;
}
}
@ -2915,7 +2910,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
new_manifest_file_size = descriptor_log_ - > file ( ) - > GetFileSize ( ) ;
}
if ( w . edit_list . front ( ) - > is_column_family_drop_ ) {
if ( first_ writer . edit_list . front ( ) - > is_column_family_drop_ ) {
TEST_SYNC_POINT ( " VersionSet::LogAndApply::ColumnFamilyDrop:0 " ) ;
TEST_SYNC_POINT ( " VersionSet::LogAndApply::ColumnFamilyDrop:1 " ) ;
TEST_SYNC_POINT ( " VersionSet::LogAndApply::ColumnFamilyDrop:2 " ) ;
@ -2926,25 +2921,24 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
mu - > Lock ( ) ;
}
// Append the old mai nfest file to the obsolete_manifests _ list to be deleted
// Append the old mani fest file to the obsolete_manifest_ list to be deleted
// by PurgeObsoleteFiles later.
if ( s . ok ( ) & & new_descriptor_log ) {
obsolete_manifests_ . emplace_back (
DescriptorFileName ( " " , manifest_file_number_ ) ) ;
}
// Install the new version
// Install the new versions
if ( s . ok ( ) ) {
if ( w . edit_list . front ( ) - > is_column_family_add_ ) {
// no group commit on column family add
if ( first_writer . edit_list . front ( ) - > is_column_family_add_ ) {
assert ( batch_edits . size ( ) = = 1 ) ;
assert ( new_cf_options ! = nullptr ) ;
CreateColumnFamily ( * new_cf_options , w . edit_list . front ( ) ) ;
} else if ( w . edit_list . front ( ) - > is_column_family_drop_ ) {
CreateColumnFamily ( * new_cf_options , first_ writer . edit_list . front ( ) ) ;
} else if ( first_ writer . edit_list . front ( ) - > is_column_family_drop_ ) {
assert ( batch_edits . size ( ) = = 1 ) ;
column_family_data - > SetDropped ( ) ;
if ( column_family_data - > Unref ( ) ) {
delete column_family_data ;
first_writer . cfd - > SetDropped ( ) ;
if ( first_writer . cfd - > Unref ( ) ) {
delete first_writer . cfd ;
}
} else {
uint64_t max_log_number_in_batch = 0 ;
@ -2960,60 +2954,158 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
}
if ( max_log_number_in_batch ! = 0 ) {
assert ( column_family_data - > GetLogNumber ( ) < = max_log_number_in_batch ) ;
column_family_data - > SetLogNumber ( max_log_number_in_batch ) ;
for ( int i = 0 ; i < static_cast < int > ( versions . size ( ) ) ; + + i ) {
ColumnFamilyData * cfd = versions [ i ] - > cfd_ ;
assert ( cfd - > GetLogNumber ( ) < = max_log_number_in_batch ) ;
cfd - > SetLogNumber ( max_log_number_in_batch ) ;
}
}
if ( last_min_log_number_to_keep ! = 0 ) {
// Should only be set in 2PC mode.
MarkMinLogNumberToKeep2PC ( last_min_log_number_to_keep ) ;
}
AppendVersion ( column_family_data , v ) ;
for ( int i = 0 ; i < static_cast < int > ( versions . size ( ) ) ; + + i ) {
ColumnFamilyData * cfd = versions [ i ] - > cfd_ ;
AppendVersion ( cfd , versions [ i ] ) ;
}
}
manifest_file_number_ = pending_manifest_file_number_ ;
manifest_file_size_ = new_manifest_file_size ;
prev_log_number_ = w . edit_list . front ( ) - > prev_log_number_ ;
prev_log_number_ = first_ writer . edit_list . front ( ) - > prev_log_number_ ;
} else {
std : : string version_edits ;
for ( auto & e : batch_edits ) {
version_edits = version_edits + " \n " + e - > DebugString ( true ) ;
version_edits + = ( " \n " + e - > DebugString ( true ) ) ;
}
ROCKS_LOG_ERROR ( db_options_ - > info_log ,
" Error in committing version edit to MANIFEST: %s " ,
version_edits . c_str ( ) ) ;
for ( auto v : versions ) {
delete v ;
}
ROCKS_LOG_ERROR (
db_options_ - > info_log ,
" [%s] Error in committing version edit to MANIFEST: %s " ,
column_family_data ? column_family_data - > GetName ( ) . c_str ( ) : " <null> " ,
version_edits . c_str ( ) ) ;
delete v ;
if ( new_descriptor_log ) {
ROCKS_LOG_INFO ( db_options_ - > info_log , " Deleting manifest % " PRIu64
" current manifest % " PRIu64 " \n " ,
ROCKS_LOG_INFO ( db_options_ - > info_log ,
" Deleting manifest % " PRIu64 " current manifest % " PRIu64
" \n " ,
manifest_file_number_ , pending_manifest_file_number_ ) ;
descriptor_log_ . reset ( ) ;
env_ - > DeleteFile (
DescriptorFileName ( dbname_ , pending_manifest_file_number_ ) ) ;
}
}
pending_manifest_file_number_ = 0 ;
// wake up all the waiting writers
while ( true ) {
ManifestWriter * ready = manifest_writers_ . front ( ) ;
manifest_writers_ . pop_front ( ) ;
if ( ready ! = & w ) {
ready - > status = s ;
ready - > done = true ;
bool need_signal = true ;
for ( const auto & w : writers ) {
if ( & w = = ready ) {
need_signal = false ;
break ;
}
}
ready - > status = s ;
ready - > done = true ;
if ( need_signal ) {
ready - > cv . Signal ( ) ;
}
if ( ready = = last_writer ) break ;
if ( ready = = last_writer ) {
break ;
}
}
// Notify new head of write queue
if ( ! manifest_writers_ . empty ( ) ) {
manifest_writers_ . front ( ) - > cv . Signal ( ) ;
}
return s ;
}
// 'datas' is gramatically incorrect. We still use this notation is to indicate
// that this variable represents a collection of column_family_data.
Status VersionSet : : LogAndApply (
const std : : vector < ColumnFamilyData * > & column_family_datas ,
const std : : vector < MutableCFOptions > & mutable_cf_options_list ,
const std : : vector < autovector < VersionEdit * > > & edit_lists ,
InstrumentedMutex * mu , Directory * db_directory , bool new_descriptor_log ,
const ColumnFamilyOptions * new_cf_options ) {
mu - > AssertHeld ( ) ;
int num_edits = 0 ;
for ( const auto & elist : edit_lists ) {
num_edits + = static_cast < int > ( elist . size ( ) ) ;
}
if ( num_edits = = 0 ) {
return Status : : OK ( ) ;
} else if ( num_edits > 1 ) {
# ifndef NDEBUG
for ( const auto & edit_list : edit_lists ) {
for ( const auto & edit : edit_list ) {
assert ( ! edit - > IsColumnFamilyManipulation ( ) ) ;
}
}
# endif /* ! NDEBUG */
}
int num_cfds = static_cast < int > ( column_family_datas . size ( ) ) ;
if ( num_cfds = = 1 & & column_family_datas [ 0 ] = = nullptr ) {
assert ( edit_lists . size ( ) = = 1 & & edit_lists [ 0 ] . size ( ) = = 1 ) ;
assert ( edit_lists [ 0 ] [ 0 ] - > is_column_family_add_ ) ;
assert ( new_cf_options ! = nullptr ) ;
}
std : : deque < ManifestWriter > writers ;
if ( num_cfds > 0 ) {
assert ( static_cast < size_t > ( num_cfds ) = = mutable_cf_options_list . size ( ) ) ;
assert ( static_cast < size_t > ( num_cfds ) = = edit_lists . size ( ) ) ;
}
for ( int i = 0 ; i < num_cfds ; + + i ) {
writers . emplace_back ( mu , column_family_datas [ i ] , mutable_cf_options_list [ i ] ,
edit_lists [ i ] ) ;
manifest_writers_ . push_back ( & writers [ i ] ) ;
}
assert ( ! writers . empty ( ) ) ;
ManifestWriter & first_writer = writers . front ( ) ;
while ( ! first_writer . done & & & first_writer ! = manifest_writers_ . front ( ) ) {
first_writer . cv . Wait ( ) ;
}
if ( first_writer . done ) {
// All non-CF-manipulation operations can be grouped together and committed
// to MANIFEST. They should all have finished. The status code is stored in
// the first manifest writer.
# ifndef NDEBUG
for ( const auto & writer : writers ) {
assert ( writer . done ) ;
}
# endif /* !NDEBUG */
return first_writer . status ;
}
int num_undropped_cfds = 0 ;
for ( auto cfd : column_family_datas ) {
// if cfd == nullptr, it is a column family add.
if ( cfd = = nullptr | | ! cfd - > IsDropped ( ) ) {
+ + num_undropped_cfds ;
}
}
if ( 0 = = num_undropped_cfds ) {
// TODO (yanqin) maybe use a different status code to denote column family
// drop other than OK and ShutdownInProgress
for ( int i = 0 ; i ! = num_cfds ; + + i ) {
manifest_writers_ . pop_front ( ) ;
}
// Notify new head of manifest write queue.
if ( ! manifest_writers_ . empty ( ) ) {
manifest_writers_ . front ( ) - > cv . Signal ( ) ;
}
return Status : : OK ( ) ;
}
return ProcessManifestWrites ( writers , mu , db_directory , new_descriptor_log ,
new_cf_options ) ;
}
void VersionSet : : LogAndApplyCFHelper ( VersionEdit * edit ) {
assert ( edit - > IsColumnFamilyManipulation ( ) ) ;
edit - > SetNextFile ( next_file_number_ . load ( ) ) ;
@ -4023,7 +4115,7 @@ InternalIterator* VersionSet::MakeInputIterator(
nullptr /* table_reader_ptr */ ,
nullptr /* no per level latency histogram */ ,
true /* for_compaction */ , nullptr /* arena */ ,
false /* skip_filters */ , ( int ) which /* level */ ) ;
false /* skip_filters */ , static_cast < int > ( which ) /* level */ ) ;
}
} else {
// Create concatenating iterator for the files from this level
@ -4034,7 +4126,7 @@ InternalIterator* VersionSet::MakeInputIterator(
false /* should_sample */ ,
nullptr /* no per level latency histogram */ ,
true /* for_compaction */ , false /* skip_filters */ ,
( int ) which /* level */ , range_del_agg ) ;
static_cast < int > ( which ) /* level */ , range_del_agg ) ;
}
}
}