@@ -486,108 +486,108 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
      ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
  assert(success);

  auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
  auto* index_entry =
      new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
                                     key.data() - wb_data.data(), key.size());
  skip_list.Insert(index_entry);
}
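
// Clear() drops both the underlying WriteBatch contents and the search index.
// ClearIndex() destroys the skip list and arena in place and re-creates them
// with placement new; the index entries are arena-allocated, so destroying the
// arena releases them all at once before the index is rebuilt.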
void WriteBatchWithIndex::Rep::Clear() {
  write_batch.Clear();
  ClearIndex();
}

void WriteBatchWithIndex::Rep::ClearIndex() {
  skip_list.~WriteBatchEntrySkipList();
  arena.~Arena();
  new (&arena) Arena();
  new (&skip_list) WriteBatchEntrySkipList(comparator, &arena);
  last_entry_offset = 0;
}
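
// ReBuildIndex() rebuilds the skip-list index from the serialized contents of
// write_batch: it clears the existing index, then walks every record,
// re-indexing the key-bearing entries and skipping metadata-only records.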
Status WriteBatchWithIndex::Rep::ReBuildIndex() {
  Status s;

  ClearIndex();

  if (write_batch.Count() == 0) {
    // Nothing to re-index
    return s;
  }

  size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch);

  Slice input(write_batch.Data());
  input.remove_prefix(offset);

  // Loop through all entries in Rep and add each one to the index
  int found = 0;
  while (s.ok() && !input.empty()) {
    Slice key, value, blob, xid;
    uint32_t column_family_id = 0;  // default
    char tag = 0;

    // set offset of current entry for call to AddNewEntry()
    last_entry_offset = input.data() - write_batch.Data().data();

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key,
                                 &value, &blob, &xid);
    if (!s.ok()) {
      break;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        found++;
        if (!UpdateExistingEntryWithCfId(column_family_id, key)) {
          AddNewEntry(column_family_id);
        }
        break;
      case kTypeLogData:
      case kTypeBeginPrepareXID:
      case kTypeBeginPersistedPrepareXID:
      case kTypeEndPrepareXID:
      case kTypeCommitXID:
      case kTypeRollbackXID:
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag in ReBuildIndex",
                                  ToString(static_cast<unsigned int>(tag)));
    }
  }

  if (s.ok() && found != write_batch.Count()) {
    s = Status::Corruption("WriteBatch has wrong count");
  }

  return s;
}
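
// Construction simply forwards the comparator, initial reservation,
// overwrite_key flag, and max_bytes limit to Rep, which owns the WriteBatch
// and the skip-list index.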
WriteBatchWithIndex::WriteBatchWithIndex(
    const Comparator* default_index_comparator, size_t reserved_bytes,
    bool overwrite_key, size_t max_bytes)
    : rep(new Rep(default_index_comparator, reserved_bytes, max_bytes,
                  overwrite_key)) {}

WriteBatchWithIndex::~WriteBatchWithIndex() {}

WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; }
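
// HasDuplicateKeys() reports whether the batch contains more than one update
// for the same key; obsolete_offsets tracks entries that were superseded by a
// later write to that key.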
bool WriteBatchWithIndex::HasDuplicateKeys() {
  return rep->obsolete_offsets.size() > 0;
}
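
// NewIterator() without a column family argument iterates the entries indexed
// under the default column family (id 0).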
WBWIIterator* WriteBatchWithIndex::NewIterator() {
  return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch);
}

WBWIIterator* WriteBatchWithIndex::NewIterator(