@ -692,7 +692,6 @@ class MemTableInserter : public WriteBatch::Handler {
// log number that all Memtables inserted into should reference
// log number that all Memtables inserted into should reference
uint64_t log_number_ref_ ;
uint64_t log_number_ref_ ;
DBImpl * db_ ;
DBImpl * db_ ;
const bool dont_filter_deletes_ ;
const bool concurrent_memtable_writes_ ;
const bool concurrent_memtable_writes_ ;
// current recovered transaction we are rebuilding (recovery)
// current recovered transaction we are rebuilding (recovery)
WriteBatch * rebuilding_trx_ ;
WriteBatch * rebuilding_trx_ ;
@ -702,7 +701,6 @@ class MemTableInserter : public WriteBatch::Handler {
FlushScheduler * flush_scheduler ,
FlushScheduler * flush_scheduler ,
bool ignore_missing_column_families ,
bool ignore_missing_column_families ,
uint64_t recovering_log_number , DB * db ,
uint64_t recovering_log_number , DB * db ,
const bool dont_filter_deletes ,
bool concurrent_memtable_writes )
bool concurrent_memtable_writes )
: sequence_ ( sequence ) ,
: sequence_ ( sequence ) ,
cf_mems_ ( cf_mems ) ,
cf_mems_ ( cf_mems ) ,
@ -711,13 +709,9 @@ class MemTableInserter : public WriteBatch::Handler {
recovering_log_number_ ( recovering_log_number ) ,
recovering_log_number_ ( recovering_log_number ) ,
log_number_ref_ ( 0 ) ,
log_number_ref_ ( 0 ) ,
db_ ( reinterpret_cast < DBImpl * > ( db ) ) ,
db_ ( reinterpret_cast < DBImpl * > ( db ) ) ,
dont_filter_deletes_ ( dont_filter_deletes ) ,
concurrent_memtable_writes_ ( concurrent_memtable_writes ) ,
concurrent_memtable_writes_ ( concurrent_memtable_writes ) ,
rebuilding_trx_ ( nullptr ) {
rebuilding_trx_ ( nullptr ) {
assert ( cf_mems_ ) ;
assert ( cf_mems_ ) ;
if ( ! dont_filter_deletes_ ) {
assert ( db_ ) ;
}
}
}
void set_log_number_ref ( uint64_t log ) { log_number_ref_ = log ; }
void set_log_number_ref ( uint64_t log ) { log_number_ref_ = log ; }
@ -827,23 +821,6 @@ class MemTableInserter : public WriteBatch::Handler {
Status DeleteImpl ( uint32_t column_family_id , const Slice & key ,
Status DeleteImpl ( uint32_t column_family_id , const Slice & key ,
ValueType delete_type ) {
ValueType delete_type ) {
MemTable * mem = cf_mems_ - > GetMemTable ( ) ;
MemTable * mem = cf_mems_ - > GetMemTable ( ) ;
auto * moptions = mem - > GetMemTableOptions ( ) ;
if ( ! dont_filter_deletes_ & & moptions - > filter_deletes ) {
assert ( ! concurrent_memtable_writes_ ) ;
SnapshotImpl read_from_snapshot ;
read_from_snapshot . number_ = sequence_ ;
ReadOptions ropts ;
ropts . snapshot = & read_from_snapshot ;
std : : string value ;
auto cf_handle = cf_mems_ - > GetColumnFamilyHandle ( ) ;
if ( cf_handle = = nullptr ) {
cf_handle = db_ - > DefaultColumnFamily ( ) ;
}
if ( ! db_ - > KeyMayExist ( ropts , cf_handle , key , & value ) ) {
RecordTick ( moptions - > statistics , NUMBER_FILTERED_DELETES ) ;
return Status : : OK ( ) ;
}
}
mem - > Add ( sequence_ , delete_type , key , Slice ( ) , concurrent_memtable_writes_ ) ;
mem - > Add ( sequence_ , delete_type , key , Slice ( ) , concurrent_memtable_writes_ ) ;
sequence_ + + ;
sequence_ + + ;
CheckMemtableFull ( ) ;
CheckMemtableFull ( ) ;
@ -1080,10 +1057,10 @@ Status WriteBatchInternal::InsertInto(
const autovector < WriteThread : : Writer * > & writers , SequenceNumber sequence ,
const autovector < WriteThread : : Writer * > & writers , SequenceNumber sequence ,
ColumnFamilyMemTables * memtables , FlushScheduler * flush_scheduler ,
ColumnFamilyMemTables * memtables , FlushScheduler * flush_scheduler ,
bool ignore_missing_column_families , uint64_t log_number , DB * db ,
bool ignore_missing_column_families , uint64_t log_number , DB * db ,
const bool dont_filter_deletes , bool concurrent_memtable_writes ) {
bool concurrent_memtable_writes ) {
MemTableInserter inserter ( sequence , memtables , flush_scheduler ,
MemTableInserter inserter ( sequence , memtables , flush_scheduler ,
ignore_missing_column_families , log_number , db ,
ignore_missing_column_families , log_number , db ,
dont_filter_deletes , concurrent_memtable_writes ) ;
concurrent_memtable_writes ) ;
for ( size_t i = 0 ; i < writers . size ( ) ; i + + ) {
for ( size_t i = 0 ; i < writers . size ( ) ; i + + ) {
auto w = writers [ i ] ;
auto w = writers [ i ] ;
if ( ! w - > ShouldWriteToMemtable ( ) ) {
if ( ! w - > ShouldWriteToMemtable ( ) ) {
@ -1103,26 +1080,26 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
FlushScheduler * flush_scheduler ,
FlushScheduler * flush_scheduler ,
bool ignore_missing_column_families ,
bool ignore_missing_column_families ,
uint64_t log_number , DB * db ,
uint64_t log_number , DB * db ,
const bool dont_filter_deletes ,
bool concurrent_memtable_writes ) {
bool concurrent_memtable_writes ) {
MemTableInserter inserter ( WriteBatchInternal : : Sequence ( writer - > batch ) ,
MemTableInserter inserter ( WriteBatchInternal : : Sequence ( writer - > batch ) ,
memtables , flush_scheduler ,
memtables , flush_scheduler ,
ignore_missing_column_families , log_number , db ,
ignore_missing_column_families , log_number , db ,
dont_filter_deletes , concurrent_memtable_writes ) ;
concurrent_memtable_writes ) ;
assert ( writer - > ShouldWriteToMemtable ( ) ) ;
assert ( writer - > ShouldWriteToMemtable ( ) ) ;
inserter . set_log_number_ref ( writer - > log_ref ) ;
inserter . set_log_number_ref ( writer - > log_ref ) ;
return writer - > batch - > Iterate ( & inserter ) ;
return writer - > batch - > Iterate ( & inserter ) ;
}
}
Status WriteBatchInternal : : InsertInto (
Status WriteBatchInternal : : InsertInto ( const WriteBatch * batch ,
const WriteBatch * batch , ColumnFamilyMemTables * memtables ,
ColumnFamilyMemTables * memtables ,
FlushScheduler * flush_scheduler , bool ignore_missing_column_families ,
FlushScheduler * flush_scheduler ,
uint64_t log_number , DB * db , const bool dont_filter_deletes ,
bool ignore_missing_column_families ,
bool concurrent_memtable_writes , SequenceNumber * last_seq_used ) {
uint64_t log_number , DB * db ,
bool concurrent_memtable_writes ,
SequenceNumber * last_seq_used ) {
MemTableInserter inserter ( WriteBatchInternal : : Sequence ( batch ) , memtables ,
MemTableInserter inserter ( WriteBatchInternal : : Sequence ( batch ) , memtables ,
flush_scheduler , ignore_missing_column_families ,
flush_scheduler , ignore_missing_column_families ,
log_number , db , dont_filter_deletes ,
log_number , db , concurrent_memtable_writes ) ;
concurrent_memtable_writes ) ;
Status s = batch - > Iterate ( & inserter ) ;
Status s = batch - > Iterate ( & inserter ) ;
if ( last_seq_used ! = nullptr ) {
if ( last_seq_used ! = nullptr ) {
* last_seq_used = inserter . get_final_sequence ( ) ;
* last_seq_used = inserter . get_final_sequence ( ) ;