@ -203,13 +203,10 @@ CompressionType GetCompressionFlush(const Options& options) {
}
}
DBImpl : : DBImpl ( const Options & options , const std : : string & dbname )
DBImpl : : DBImpl ( const DB Options& options , const std : : string & dbname )
: env_ ( options . env ) ,
dbname_ ( dbname ) ,
internal_comparator_ ( options . comparator ) ,
options_ ( SanitizeOptions ( dbname , & internal_comparator_ ,
& internal_filter_policy_ , options ) ) ,
internal_filter_policy_ ( options . filter_policy ) ,
options_ ( SanitizeOptions ( dbname , options ) ) ,
// Reserve ten files or so for other uses and give the rest to TableCache.
table_cache_ ( NewLRUCache ( options_ . max_open_files - 10 ,
options_ . table_cache_numshardbits ,
@ -245,7 +242,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
new ColumnFamilyMemTablesImpl ( versions_ - > GetColumnFamilySet ( ) ) ) ;
dumpLeveldbBuildVersion ( options_ . info_log . get ( ) ) ;
options_ . Dump ( options_ . info_log . get ( ) ) ;
// TODO(icanadi) dump DBOptions and ColumnFamilyOptions separately
// options_.Dump(options_.info_log.get());
char name [ 100 ] ;
Status st = env_ - > GetHostName ( name , 100L ) ;
@ -322,7 +320,6 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
Status DBImpl : : NewDB ( ) {
VersionEdit new_db ;
new_db . SetComparatorName ( internal_comparator_ . user_comparator ( ) - > Name ( ) ) ;
new_db . SetLogNumber ( 0 ) ;
new_db . SetNextFile ( 2 ) ;
new_db . SetLastSequence ( 0 ) ;
@ -909,8 +906,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// filter out all the column families that have already
// flushed memtables with log_number
column_family_memtables_ - > SetLogNumber ( log_number ) ;
// TODO(icanadi) options_
status = WriteBatchInternal : : InsertInto (
& batch , column_family_memtables_ . get ( ) , & options_ ) ;
& batch , column_family_memtables_ . get ( ) , default_cfd_ - > full_options ( ) ) ;
column_family_memtables_ - > SetLogNumber ( 0 ) ;
MaybeIgnoreError ( & status ) ;
@ -1013,10 +1011,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
Status s ;
{
mutex_ . Unlock ( ) ;
s = BuildTable ( dbname_ , env_ , options_ , storage_options_ ,
s = BuildTable ( dbname_ , env_ , * cfd - > full_options ( ) , storage_options_ ,
cfd - > table_cache ( ) , iter , & meta , cfd - > user_comparator ( ) ,
newest_snapshot , earliest_seqno_in_memtable ,
GetCompressionFlush ( options_ ) ) ;
GetCompressionFlush ( * cfd - > full_options ( ) ) ) ;
LogFlush ( options_ . info_log ) ;
mutex_ . Lock ( ) ;
}
@ -1078,10 +1076,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
" Level-0 flush table #%lu: started " ,
( unsigned long ) meta . number ) ;
s = BuildTable ( dbname_ , env_ , options_ , storage_options_ ,
s = BuildTable ( dbname_ , env_ , * cfd - > full_options ( ) , storage_options_ ,
cfd - > table_cache ( ) , iter , & meta , cfd - > user_comparator ( ) ,
newest_snapshot , earliest_seqno_in_memtable ,
GetCompressionFlush ( options_ ) ) ;
GetCompressionFlush ( * cfd - > full_options ( ) ) ) ;
LogFlush ( options_ . info_log ) ;
delete iter ;
Log ( options_ . info_log , " Level-0 flush table #%lu: %lu bytes %s " ,
@ -1643,9 +1641,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
Status DBImpl : : TEST_CompactRange ( int level ,
const Slice * begin ,
const Slice * end ) {
int output_level = ( options_ . compaction_style = = kCompactionStyleUniversal )
? level
: level + 1 ;
int output_level =
( default_cfd_ - > options ( ) - > compaction_style = = kCompactionStyleUniversal )
? level
: level + 1 ;
return RunManualCompaction ( default_cfd_ , level , output_level , begin , end ) ;
}
@ -1763,7 +1762,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl : : BackgroundCallFlush ( ) {
bool madeProgress = false ;
DeletionState deletion_state ( options_ . max_write_buffer_number , true ) ;
DeletionState deletion_state ( default_cfd_ - > options ( ) - > max_write_buffer_number ,
true ) ;
assert ( bg_flush_scheduled_ ) ;
MutexLock l ( & mutex_ ) ;
@ -1814,7 +1814,8 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
void DBImpl : : BackgroundCallCompaction ( ) {
bool madeProgress = false ;
DeletionState deletion_state ( options_ . max_write_buffer_number , true ) ;
DeletionState deletion_state ( default_cfd_ - > options ( ) - > max_write_buffer_number ,
true ) ;
MaybeDumpStats ( ) ;
@ -1921,14 +1922,16 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
( ( m - > done | | manual_end = = nullptr )
? " (end) "
: manual_end - > DebugString ( ) . c_str ( ) ) ) ;
} else if ( ! options_ . disable_auto_compactions ) {
} else {
for ( auto cfd : * versions_ - > GetColumnFamilySet ( ) ) {
c . reset ( cfd - > PickCompaction ( ) ) ;
if ( c ! = nullptr ) {
// update statistics
MeasureTime ( options_ . statistics . get ( ) , NUM_FILES_IN_SINGLE_COMPACTION ,
c - > inputs ( 0 ) - > size ( ) ) ;
break ;
if ( ! cfd - > options ( ) - > disable_auto_compactions ) {
c . reset ( cfd - > PickCompaction ( ) ) ;
if ( c ! = nullptr ) {
// update statistics
MeasureTime ( options_ . statistics . get ( ) , NUM_FILES_IN_SINGLE_COMPACTION ,
c - > inputs ( 0 ) - > size ( ) ) ;
break ;
}
}
}
}
@ -2005,7 +2008,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
// Universal compaction should always compact the whole range
assert ( options_ . compaction_style ! = kCompactionStyleUniversal ) ;
assert ( m - > cfd - > options ( ) - > compaction_style ! = kCompactionStyleUniversal ) ;
m - > tmp_storage = * manual_end ;
m - > begin = & m - > tmp_storage ;
}
@ -2097,11 +2100,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
compact - > compaction - > output_level ( ) ) ) ;
CompressionType compression_type = GetCompressionType (
options_ , compact - > compaction - > output_level ( ) ,
* cfd - > full_options ( ) , compact - > compaction - > output_level ( ) ,
compact - > compaction - > enable_compression ( ) ) ;
compact - > builder . reset (
GetTableBuilder ( options_ , compact - > outfile . get ( ) , compression_type ) ) ;
compact - > builder . reset ( GetTableBuilder (
* cfd - > full_options ( ) , compact - > outfile . get ( ) , compression_type ) ) ;
}
LogFlush ( options_ . info_log ) ;
return s ;
@ -2286,15 +2289,16 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
SequenceNumber visible_in_snapshot = kMaxSequenceNumber ;
std : : string compaction_filter_value ;
std : : vector < char > delete_key ; // for compaction filter
MergeHelper merge ( cfd - > user_comparator ( ) , options_ . merge_operator . get ( ) ,
options_ . info_log . get ( ) ,
false /* internal key corruption is expected */ ) ;
MergeHelper merge (
cfd - > user_comparator ( ) , cfd - > options ( ) - > merge_operator . get ( ) ,
options_ . info_log . get ( ) , false /* internal key corruption is expected */ ) ;
auto compaction_filter = cfd - > options ( ) - > compaction_filter ;
std : : unique_ptr < CompactionFilter > compaction_filter_from_factory = nullptr ;
if ( ! compaction_filter ) {
auto context = compact - > GetFilterContext ( ) ;
compaction_filter_from_factory =
options_ . compaction_filter_factory - > CreateCompactionFilter ( context ) ;
cfd - > options ( ) - > compaction_filter_factory - > CreateCompactionFilter (
context ) ;
compaction_filter = compaction_filter_from_factory . get ( ) ;
}
@ -2706,8 +2710,9 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
Iterator * mutable_iter = super_version - > mem - > NewIterator ( options ) ;
// create a DBIter that only uses memtable content; see NewIterator()
mutable_iter = NewDBIterator ( & dbname_ , env_ , options_ , cfd - > user_comparator ( ) ,
mutable_iter , kMaxSequenceNumber ) ;
mutable_iter =
NewDBIterator ( & dbname_ , env_ , * cfd - > full_options ( ) ,
cfd - > user_comparator ( ) , mutable_iter , kMaxSequenceNumber ) ;
std : : vector < Iterator * > list ;
super_version - > imm - > AddIterators ( options , & list ) ;
@ -2717,8 +2722,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter =
NewDBIterator ( & dbname_ , env_ , options_ , cfd - > user_comparator ( ) ,
immutable_iter , kMaxSequenceNumber ) ;
NewDBIterator ( & dbname_ , env_ , * cfd - > full_options ( ) ,
cfd - > user_comparator ( ) , immutable_iter , kMaxSequenceNumber ) ;
// register cleanups
mutable_iter - > RegisterCleanup ( CleanupIteratorState ,
@ -2801,21 +2806,23 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey ( key , snapshot ) ;
if ( get_version - > mem - > Get ( lkey , value , & s , merge_context , options_ ) ) {
if ( get_version - > mem - > Get ( lkey , value , & s , merge_context ,
* cfd - > full_options ( ) ) ) {
// Done
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_HIT ) ;
} else if ( get_version - > imm - > Get ( lkey , value , & s , merge_context , options_ ) ) {
} else if ( get_version - > imm - > Get ( lkey , value , & s , merge_context ,
* cfd - > full_options ( ) ) ) {
// Done
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_HIT ) ;
} else {
get_version - > current - > Get ( options , lkey , value , & s , & merge_context , & stats ,
options_ , value_found ) ;
* cfd - > full_options ( ) , value_found ) ;
have_stat_update = true ;
RecordTick ( options_ . statistics . get ( ) , MEMTABLE_MISS ) ;
}
bool delete_get_version = false ;
if ( ! options_ . disable_seek_compaction & & have_stat_update ) {
if ( ! cfd - > options ( ) - > disable_seek_compaction & & have_stat_update ) {
mutex_ . Lock ( ) ;
if ( get_version - > current - > UpdateStats ( stats ) ) {
MaybeScheduleFlushOrCompaction ( ) ;
@ -2852,6 +2859,7 @@ std::vector<Status> DBImpl::MultiGet(
SequenceNumber snapshot ;
struct MultiGetColumnFamilyData {
ColumnFamilyData * cfd ;
SuperVersion * super_version ;
Version : : GetStats stats ;
bool have_stat_update = false ;
@ -2873,6 +2881,7 @@ std::vector<Status> DBImpl::MultiGet(
for ( auto mgd_iter : multiget_cf_data ) {
auto cfd = versions_ - > GetColumnFamilySet ( ) - > GetColumnFamily ( mgd_iter . first ) ;
assert ( cfd ! = nullptr ) ;
mgd_iter . second - > cfd = cfd ;
mgd_iter . second - > super_version = cfd - > GetSuperVersion ( ) - > Ref ( ) ;
}
mutex_ . Unlock ( ) ;
@ -2902,14 +2911,16 @@ std::vector<Status> DBImpl::MultiGet(
assert ( mgd_iter ! = multiget_cf_data . end ( ) ) ;
auto mgd = mgd_iter - > second ;
auto super_version = mgd - > super_version ;
if ( super_version - > mem - > Get ( lkey , value , & s , merge_context , options_ ) ) {
auto cfd = mgd - > cfd ;
if ( super_version - > mem - > Get ( lkey , value , & s , merge_context ,
* cfd - > full_options ( ) ) ) {
// Done
} else if ( super_version - > imm - > Get ( lkey , value , & s , merge_context ,
options_ ) ) {
* cfd - > full_options ( ) ) ) {
// Done
} else {
super_version - > current - > Get ( options , lkey , value , & s , & merge_context ,
& mgd - > stats , options_ ) ;
& mgd - > stats , * cfd - > full_options ( ) ) ;
mgd - > have_stat_update = true ;
}
@ -2924,7 +2935,8 @@ std::vector<Status> DBImpl::MultiGet(
mutex_ . Lock ( ) ;
for ( auto mgd_iter : multiget_cf_data ) {
auto mgd = mgd_iter . second ;
if ( ! options_ . disable_seek_compaction & & mgd - > have_stat_update ) {
auto cfd = mgd - > cfd ;
if ( ! cfd - > options ( ) - > disable_seek_compaction & & mgd - > have_stat_update ) {
if ( mgd - > super_version - > current - > UpdateStats ( mgd - > stats ) ) {
schedule_flush_or_compaction = true ;
}
@ -3037,7 +3049,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
iter = NewInternalIterator ( options , cfd , super_version ) ;
iter = NewDBIterator (
& dbname_ , env_ , options_ , cfd - > user_comparator ( ) , iter ,
& dbname_ , env_ , * cfd - > full_options ( ) , cfd - > user_comparator ( ) , iter ,
( options . snapshot ! = nullptr
? reinterpret_cast < const SnapshotImpl * > ( options . snapshot ) - > number_
: latest_snapshot ) ) ;
@ -3047,7 +3059,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// use extra wrapper to exclude any keys from the results which
// don't begin with the prefix
iter = new PrefixFilterIterator ( iter , * options . prefix ,
options_ . prefix_extractor ) ;
cfd - > options ( ) - > prefix_extractor ) ;
}
return iter ;
}
@ -3080,7 +3092,11 @@ Status DBImpl::Put(const WriteOptions& o,
Status DBImpl : : Merge ( const WriteOptions & o ,
const ColumnFamilyHandle & column_family , const Slice & key ,
const Slice & val ) {
if ( ! options_ . merge_operator ) {
mutex_ . Lock ( ) ;
auto cfd = versions_ - > GetColumnFamilySet ( ) - > GetColumnFamily ( column_family . id ) ;
mutex_ . Unlock ( ) ;
assert ( cfd ! = nullptr ) ;
if ( ! cfd - > options ( ) - > merge_operator ) {
return Status : : NotSupported ( " Provide a merge_operator when opening DB " ) ;
} else {
return DB : : Merge ( o , column_family , key , val ) ;
@ -3186,9 +3202,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// We'll need to add a spinlock for reading that we also lock when we
// write to a column family (only on column family add/drop, which is
// a very rare action)
// TODO(icanadi) options_
status = WriteBatchInternal : : InsertInto (
updates , column_family_memtables_ . get ( ) , & options_ , this ,
options_ . filter_deletes ) ;
updates , column_family_memtables_ . get ( ) ,
default_cfd_ - > full_options ( ) , this ,
default_cfd_ - > options ( ) - > filter_deletes ) ;
if ( ! status . ok ( ) ) {
// Panic for in-memory corruptions
@ -3390,7 +3408,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
RecordTick ( options_ . statistics . get ( ) , STALL_L0_NUM_FILES_MICROS , stall ) ;
cfd - > internal_stats ( ) - > RecordWriteStall ( InternalStats : : LEVEL0_NUM_FILES ,
stall ) ;
} else if ( allow_hard_rate_limit_delay & & options_ . hard_rate_limit > 1.0 & &
} else if ( allow_hard_rate_limit_delay & &
cfd - > options ( ) - > hard_rate_limit > 1.0 & &
( score = cfd - > current ( ) - > MaxCompactionScore ( ) ) >
cfd - > options ( ) - > hard_rate_limit ) {
// Delay a write when the compaction score for any level is too large.
@ -3495,7 +3514,7 @@ Env* DBImpl::GetEnv() const {
const Options & DBImpl : : GetOptions ( const ColumnFamilyHandle & column_family )
const {
return options_ ;
return * default_cfd_ - > full_options ( ) ;
}
bool DBImpl : : GetProperty ( const ColumnFamilyHandle & column_family ,
@ -3699,24 +3718,18 @@ Status DB::OpenWithColumnFamilies(
std : : vector < ColumnFamilyHandle > * handles , DB * * dbptr ) {
* dbptr = nullptr ;
EnvOptions soptions ;
// TODO temporary until we change DBImpl to accept
// DBOptions instead of Options
ColumnFamilyOptions default_column_family_options ;
for ( auto cfd : column_families ) {
if ( cfd . name = = default_column_family_name ) {
default_column_family_options = cfd . options ;
break ;
}
}
// default options
Options options ( db_options , default_column_family_options ) ;
if ( options . block_cache ! = nullptr & & options . no_block_cache ) {
return Status : : InvalidArgument (
" no_block_cache is true while block_cache is not nullptr " ) ;
size_t max_write_buffer_size = 0 ;
for ( auto cf : column_families ) {
max_write_buffer_size =
std : : max ( max_write_buffer_size , cf . options . write_buffer_size ) ;
if ( cf . options . block_cache ! = nullptr & & cf . options . no_block_cache ) {
return Status : : InvalidArgument (
" no_block_cache is true while block_cache is not nullptr " ) ;
}
}
DBImpl * impl = new DBImpl ( options , dbname ) ;
DBImpl * impl = new DBImpl ( db_ options, dbname ) ;
Status s = impl - > env_ - > CreateDirIfMissing ( impl - > options_ . wal_dir ) ;
if ( ! s . ok ( ) ) {
delete impl ;
@ -3732,6 +3745,7 @@ Status DB::OpenWithColumnFamilies(
// Handles create_if_missing, error_if_exists
s = impl - > Recover ( column_families ) ;
if ( s . ok ( ) ) {
lfile - > SetPreallocationBlockSize ( 1.1 * max_write_buffer_size ) ;
uint64_t new_log_number = impl - > versions_ - > NewFileNumber ( ) ;
unique_ptr < WritableFile > lfile ;
soptions . use_mmap_writes = false ;
@ -3741,7 +3755,6 @@ Status DB::OpenWithColumnFamilies(
soptions
) ;
if ( s . ok ( ) ) {
lfile - > SetPreallocationBlockSize ( 1.1 * impl - > options_ . write_buffer_size ) ;
VersionEdit edit ;
impl - > logfile_number_ = new_log_number ;
impl - > log_ . reset ( new log : : Writer ( std : : move ( lfile ) ) ) ;