@ -379,7 +379,6 @@ class DBImpl : public DB {
WriteBatch * my_batch ,
WriteBatch * my_batch ,
WriteCallback * callback ) ;
WriteCallback * callback ) ;
// Returns the sequence number that is guaranteed to be smaller than or equal
// Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into the current
// to the sequence number of any key that could be inserted into the current
// memtables. It can then be assumed that any write with a larger (or equal)
// memtables. It can then be assumed that any write with a larger (or equal)
@ -1007,7 +1006,10 @@ class DBImpl : public DB {
friend class DBBlobIndexTest ;
friend class DBBlobIndexTest ;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test ;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test ;
# endif
# endif
struct CompactionState ;
struct CompactionState ;
struct PrepickedCompaction ;
struct PurgeFileInfo ;
struct WriteContext {
struct WriteContext {
SuperVersionContext superversion_context ;
SuperVersionContext superversion_context ;
@ -1024,8 +1026,138 @@ class DBImpl : public DB {
}
}
} ;
} ;
struct PrepickedCompaction ;
// Class to maintain directories for all database paths other than main one.
struct PurgeFileInfo ;
class Directories {
public :
Status SetDirectories ( Env * env , const std : : string & dbname ,
const std : : string & wal_dir ,
const std : : vector < DbPath > & data_paths ) ;
Directory * GetDataDir ( size_t path_id ) const ;
Directory * GetWalDir ( ) {
if ( wal_dir_ ) {
return wal_dir_ . get ( ) ;
}
return db_dir_ . get ( ) ;
}
Directory * GetDbDir ( ) { return db_dir_ . get ( ) ; }
private :
std : : unique_ptr < Directory > db_dir_ ;
std : : vector < std : : unique_ptr < Directory > > data_dirs_ ;
std : : unique_ptr < Directory > wal_dir_ ;
} ;
// Per-log-file record: the file number, a running byte total, and a flag
// telling whether the file is currently being flushed.
struct LogFileNumberSize {
  // Starts with a zero byte total and getting_flushed == false.
  explicit LogFileNumberSize(uint64_t _number)
      : number(_number) {
  }

  // Accumulates new_size into the running byte total.
  void AddSize(uint64_t new_size) {
    size += new_size;
  }

  uint64_t number;
  uint64_t size = 0;
  bool getting_flushed = false;
};
struct LogWriterNumber {
// pass ownership of _writer
LogWriterNumber ( uint64_t _number , log : : Writer * _writer )
: number ( _number ) , writer ( _writer ) { }
log : : Writer * ReleaseWriter ( ) {
auto * w = writer ;
writer = nullptr ;
return w ;
}
Status ClearWriter ( ) {
Status s = writer - > WriteBuffer ( ) ;
delete writer ;
writer = nullptr ;
return s ;
}
uint64_t number ;
// Visual Studio doesn't support deque's member to be noncopyable because
// of a std::unique_ptr as a member.
log : : Writer * writer ; // own
// true for some prefix of logs_
bool getting_synced = false ;
} ;
// PurgeFileInfo is a structure to hold information of files to be deleted in
// purge_queue_
struct PurgeFileInfo {
std : : string fname ;
std : : string dir_to_sync ;
FileType type ;
uint64_t number ;
int job_id ;
PurgeFileInfo ( std : : string fn , std : : string d , FileType t , uint64_t num ,
int jid )
: fname ( fn ) , dir_to_sync ( d ) , type ( t ) , number ( num ) , job_id ( jid ) { }
} ;
// Argument required by background flush thread.
struct BGFlushArg {
BGFlushArg ( )
: cfd_ ( nullptr ) , max_memtable_id_ ( 0 ) , superversion_context_ ( nullptr ) { }
BGFlushArg ( ColumnFamilyData * cfd , uint64_t max_memtable_id ,
SuperVersionContext * superversion_context )
: cfd_ ( cfd ) ,
max_memtable_id_ ( max_memtable_id ) ,
superversion_context_ ( superversion_context ) { }
// Column family to flush.
ColumnFamilyData * cfd_ ;
// Maximum ID of memtable to flush. In this column family, memtables with
// IDs smaller than this value must be flushed before this flush completes.
uint64_t max_memtable_id_ ;
// Pointer to a SuperVersionContext object. After flush completes, RocksDB
// installs a new superversion for the column family. This operation
// requires a SuperVersionContext object (currently embedded in JobContext).
SuperVersionContext * superversion_context_ ;
} ;
// Argument passed to flush thread. Plain aggregate with no constructor, so
// both members are uninitialized unless the creator sets them.
struct FlushThreadArg {
// DBImpl instance carried by the argument.
DBImpl * db_ ;
// Env::Priority value carried with db_ (presumably the priority of the pool
// the flush runs in -- confirm against the code that fills this in).
Env : : Priority thread_pri_ ;
} ;
// Information for a manual compaction: the request parameters together with
// the status/progress fields below. Plain aggregate with no constructor, so
// the scalar and pointer members are uninitialized unless the creator sets
// them.
struct ManualCompactionState {
ColumnFamilyData * cfd ;
int input_level ;
int output_level ;
uint32_t output_path_id ;
Status status ;
bool done ;
bool in_progress ; // compaction request being processed?
bool incomplete ; // only part of requested range compacted
// NOTE(review): the original comment was cut off after "only one manual";
// presumably it meant "only one manual compaction at a time" -- confirm.
bool exclusive ; // current behavior of only one manual
bool disallow_trivial_move ; // Force actual compaction to run
const InternalKey * begin ; // nullptr means beginning of key range
const InternalKey * end ; // nullptr means end of key range
InternalKey * manual_end ; // how far we are compacting
InternalKey tmp_storage ; // Used to keep track of compaction progress
InternalKey tmp_storage1 ; // Used to keep track of compaction progress
} ;
// Bundles a Compaction with its optional manual-compaction state and a task
// limiter token. Ownership differs per member; see the notes on each field.
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.
Compaction * compaction ;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
ManualCompactionState * manual_compaction_state ; // nullptr if non-manual
// task limiter token is requested during compaction picking.
std : : unique_ptr < TaskLimiterToken > task_token ;
} ;
// Pairs a DBImpl with a PrepickedCompaction (presumably the argument handed
// to the background compaction entry point -- confirm against the scheduler).
// The two pointers have different ownership; see each field.
struct CompactionArg {
// caller retains ownership of `db`.
DBImpl * db ;
// background compaction takes ownership of `prepicked_compaction`.
PrepickedCompaction * prepicked_compaction ;
} ;
Status ResumeImpl ( ) ;
Status ResumeImpl ( ) ;
@ -1079,34 +1211,6 @@ class DBImpl : public DB {
SnapshotChecker * snapshot_checker , LogBuffer * log_buffer ,
SnapshotChecker * snapshot_checker , LogBuffer * log_buffer ,
Env : : Priority thread_pri ) ;
Env : : Priority thread_pri ) ;
// Argument required by background flush thread.
struct BGFlushArg {
BGFlushArg ( )
: cfd_ ( nullptr ) , max_memtable_id_ ( 0 ) , superversion_context_ ( nullptr ) { }
BGFlushArg ( ColumnFamilyData * cfd , uint64_t max_memtable_id ,
SuperVersionContext * superversion_context )
: cfd_ ( cfd ) ,
max_memtable_id_ ( max_memtable_id ) ,
superversion_context_ ( superversion_context ) { }
// Column family to flush.
ColumnFamilyData * cfd_ ;
// Maximum ID of memtable to flush. In this column family, memtables with
// IDs smaller than this value must be flushed before this flush completes.
uint64_t max_memtable_id_ ;
// Pointer to a SuperVersionContext object. After flush completes, RocksDB
// installs a new superversion for the column family. This operation
// requires a SuperVersionContext object (currently embedded in JobContext).
SuperVersionContext * superversion_context_ ;
} ;
// Argument passed to flush thread. Plain aggregate with no constructor, so
// both members are uninitialized unless the creator sets them.
struct FlushThreadArg {
// DBImpl instance carried by the argument.
DBImpl * db_ ;
// Env::Priority value carried with db_ (presumably the priority of the pool
// the flush runs in -- confirm against the code that fills this in).
Env : : Priority thread_pri_ ;
} ;
// Flush the memtables of (multiple) column families to multiple files on
// Flush the memtables of (multiple) column families to multiple files on
// persistent storage.
// persistent storage.
Status FlushMemTablesToOutputFiles (
Status FlushMemTablesToOutputFiles (
@ -1345,6 +1449,57 @@ class DBImpl : public DB {
void WaitForBackgroundWork ( ) ;
void WaitForBackgroundWork ( ) ;
// No copying allowed
DBImpl ( const DBImpl & ) ;
void operator = ( const DBImpl & ) ;
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function. Background threads carry
// sv_context which can have new_superversion already
// allocated.
// All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork (
ColumnFamilyData * cfd , SuperVersionContext * sv_context ,
const MutableCFOptions & mutable_cf_options ) ;
bool GetIntPropertyInternal ( ColumnFamilyData * cfd ,
const DBPropertyInfo & property_info ,
bool is_locked , uint64_t * value ) ;
bool GetPropertyHandleOptionsStatistics ( std : : string * value ) ;
bool HasPendingManualCompaction ( ) ;
bool HasExclusiveManualCompaction ( ) ;
void AddManualCompaction ( ManualCompactionState * m ) ;
void RemoveManualCompaction ( ManualCompactionState * m ) ;
bool ShouldntRunManualCompaction ( ManualCompactionState * m ) ;
bool HaveManualCompaction ( ColumnFamilyData * cfd ) ;
bool MCOverlap ( ManualCompactionState * m , ManualCompactionState * m1 ) ;
# ifndef ROCKSDB_LITE
void BuildCompactionJobInfo ( const ColumnFamilyData * cfd , Compaction * c ,
const Status & st ,
const CompactionJobStats & compaction_job_stats ,
const int job_id , const Version * current ,
CompactionJobInfo * compaction_job_info ) const ;
// Reserve the next 'num' file numbers for to-be-ingested external SST files,
// and return the current file_number in 'next_file_number'.
// Write a version edit to the MANIFEST.
Status ReserveFileNumbersBeforeIngestion (
ColumnFamilyData * cfd , uint64_t num ,
std : : list < uint64_t > : : iterator * pending_output_elem ,
uint64_t * next_file_number ) ;
# endif //! ROCKSDB_LITE
bool ShouldPurge ( uint64_t file_number ) const ;
void MarkAsGrabbedForPurge ( uint64_t file_number ) ;
size_t GetWalPreallocateBlockSize ( uint64_t write_buffer_size ) const ;
// Returns the fixed hint Env::WLTH_SHORT; nothing is actually computed.
// (Presumably the hint is applied to WAL files, per the name -- confirm.)
Env : : WriteLifeTimeHint CalculateWALWriteHint ( ) { return Env : : WLTH_SHORT ; }
Status CreateWAL ( uint64_t log_file_num , uint64_t recycle_log_number ,
size_t preallocate_block_size , log : : Writer * * new_log ) ;
// table_cache_ provides its own synchronization
// table_cache_ provides its own synchronization
std : : shared_ptr < Cache > table_cache_ ;
std : : shared_ptr < Cache > table_cache_ ;
@ -1390,37 +1545,7 @@ class DBImpl : public DB {
// expensive mutex_ lock during WAL write, which updates log_empty_.
// expensive mutex_ lock during WAL write, which updates log_empty_.
bool log_empty_ ;
bool log_empty_ ;
// Per-log-file record: the file number, a running byte total, and a flag
// telling whether the file is currently being flushed.
struct LogFileNumberSize {
  // Starts with a zero byte total and getting_flushed == false.
  explicit LogFileNumberSize(uint64_t _number)
      : number(_number) {
  }

  // Accumulates new_size into the running byte total.
  void AddSize(uint64_t new_size) {
    size += new_size;
  }

  uint64_t number;
  uint64_t size = 0;
  bool getting_flushed = false;
};
struct LogWriterNumber {
// pass ownership of _writer
LogWriterNumber ( uint64_t _number , log : : Writer * _writer )
: number ( _number ) , writer ( _writer ) { }
log : : Writer * ReleaseWriter ( ) {
auto * w = writer ;
writer = nullptr ;
return w ;
}
Status ClearWriter ( ) {
Status s = writer - > WriteBuffer ( ) ;
delete writer ;
writer = nullptr ;
return s ;
}
uint64_t number ;
// Visual Studio doesn't support deque's member to be noncopyable because
// of a std::unique_ptr as a member.
log : : Writer * writer ; // own
// true for some prefix of logs_
bool getting_synced = false ;
} ;
// Without two_write_queues, read and writes to alive_log_files_ are
// Without two_write_queues, read and writes to alive_log_files_ are
// protected by mutex_. However since back() is never popped, and push_back()
// protected by mutex_. However since back() is never popped, and push_back()
// is done only from write_thread_, the same thread can access the item
// is done only from write_thread_, the same thread can access the item
@ -1467,30 +1592,6 @@ class DBImpl : public DB {
bool stats_slice_initialized_ = false ;
bool stats_slice_initialized_ = false ;
// Class to maintain directories for all database paths other than main one.
class Directories {
public :
Status SetDirectories ( Env * env , const std : : string & dbname ,
const std : : string & wal_dir ,
const std : : vector < DbPath > & data_paths ) ;
Directory * GetDataDir ( size_t path_id ) const ;
Directory * GetWalDir ( ) {
if ( wal_dir_ ) {
return wal_dir_ . get ( ) ;
}
return db_dir_ . get ( ) ;
}
Directory * GetDbDir ( ) { return db_dir_ . get ( ) ; }
private :
std : : unique_ptr < Directory > db_dir_ ;
std : : vector < std : : unique_ptr < Directory > > data_dirs_ ;
std : : unique_ptr < Directory > wal_dir_ ;
} ;
Directories directories_ ;
Directories directories_ ;
WriteBufferManager * write_buffer_manager_ ;
WriteBufferManager * write_buffer_manager_ ;
@ -1526,19 +1627,6 @@ class DBImpl : public DB {
// State is protected with db mutex.
// State is protected with db mutex.
std : : list < uint64_t > pending_outputs_ ;
std : : list < uint64_t > pending_outputs_ ;
// PurgeFileInfo is a structure to hold information of files to be deleted in
// purge_queue_
struct PurgeFileInfo {
std : : string fname ;
std : : string dir_to_sync ;
FileType type ;
uint64_t number ;
int job_id ;
PurgeFileInfo ( std : : string fn , std : : string d , FileType t , uint64_t num ,
int jid )
: fname ( fn ) , dir_to_sync ( d ) , type ( t ) , number ( num ) , job_id ( jid ) { }
} ;
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition
// A column family is inserted into flush_queue_ when it satisfies condition
@ -1595,42 +1683,8 @@ class DBImpl : public DB {
// number of background obsolete file purge jobs, submitted to the HIGH pool
// number of background obsolete file purge jobs, submitted to the HIGH pool
int bg_purge_scheduled_ ;
int bg_purge_scheduled_ ;
// Information for a manual compaction: the request parameters together with
// the status/progress fields below. Plain aggregate with no constructor, so
// the scalar and pointer members are uninitialized unless the creator sets
// them.
struct ManualCompactionState {
ColumnFamilyData * cfd ;
int input_level ;
int output_level ;
uint32_t output_path_id ;
Status status ;
bool done ;
bool in_progress ; // compaction request being processed?
bool incomplete ; // only part of requested range compacted
// NOTE(review): the original comment was cut off after "only one manual";
// presumably it meant "only one manual compaction at a time" -- confirm.
bool exclusive ; // current behavior of only one manual
bool disallow_trivial_move ; // Force actual compaction to run
const InternalKey * begin ; // nullptr means beginning of key range
const InternalKey * end ; // nullptr means end of key range
InternalKey * manual_end ; // how far we are compacting
InternalKey tmp_storage ; // Used to keep track of compaction progress
InternalKey tmp_storage1 ; // Used to keep track of compaction progress
} ;
// Bundles a Compaction with its optional manual-compaction state and a task
// limiter token. Ownership differs per member; see the notes on each field.
struct PrepickedCompaction {
// background compaction takes ownership of `compaction`.
Compaction * compaction ;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
ManualCompactionState * manual_compaction_state ; // nullptr if non-manual
// task limiter token is requested during compaction picking.
std : : unique_ptr < TaskLimiterToken > task_token ;
} ;
std : : deque < ManualCompactionState * > manual_compaction_dequeue_ ;
std : : deque < ManualCompactionState * > manual_compaction_dequeue_ ;
// Pairs a DBImpl with a PrepickedCompaction (presumably the argument handed
// to the background compaction entry point -- confirm against the scheduler).
// The two pointers have different ownership; see each field.
struct CompactionArg {
// caller retains ownership of `db`.
DBImpl * db ;
// background compaction takes ownership of `prepicked_compaction`.
PrepickedCompaction * prepicked_compaction ;
} ;
// shall we disable deletion of obsolete files
// shall we disable deletion of obsolete files
// if 0 the deletion is enabled.
// if 0 the deletion is enabled.
// if non-zero, files will not be getting deleted
// if non-zero, files will not be getting deleted
@ -1726,58 +1780,6 @@ class DBImpl : public DB {
// REQUIRES: mutex locked
// REQUIRES: mutex locked
std : : unique_ptr < rocksdb : : RepeatableThread > thread_persist_stats_ ;
std : : unique_ptr < rocksdb : : RepeatableThread > thread_persist_stats_ ;
// No copying allowed
DBImpl ( const DBImpl & ) ;
void operator = ( const DBImpl & ) ;
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function. Background threads carry
// sv_context which can have new_superversion already
// allocated.
// All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork (
ColumnFamilyData * cfd , SuperVersionContext * sv_context ,
const MutableCFOptions & mutable_cf_options ) ;
bool GetIntPropertyInternal ( ColumnFamilyData * cfd ,
const DBPropertyInfo & property_info ,
bool is_locked , uint64_t * value ) ;
bool GetPropertyHandleOptionsStatistics ( std : : string * value ) ;
bool HasPendingManualCompaction ( ) ;
bool HasExclusiveManualCompaction ( ) ;
void AddManualCompaction ( ManualCompactionState * m ) ;
void RemoveManualCompaction ( ManualCompactionState * m ) ;
bool ShouldntRunManualCompaction ( ManualCompactionState * m ) ;
bool HaveManualCompaction ( ColumnFamilyData * cfd ) ;
bool MCOverlap ( ManualCompactionState * m , ManualCompactionState * m1 ) ;
# ifndef ROCKSDB_LITE
void BuildCompactionJobInfo ( const ColumnFamilyData * cfd , Compaction * c ,
const Status & st ,
const CompactionJobStats & compaction_job_stats ,
const int job_id , const Version * current ,
CompactionJobInfo * compaction_job_info ) const ;
// Reserve the next 'num' file numbers for to-be-ingested external SST files,
// and return the current file_number in 'next_file_number'.
// Write a version edit to the MANIFEST.
Status ReserveFileNumbersBeforeIngestion (
ColumnFamilyData * cfd , uint64_t num ,
std : : list < uint64_t > : : iterator * pending_output_elem ,
uint64_t * next_file_number ) ;
# endif //! ROCKSDB_LITE
bool ShouldPurge ( uint64_t file_number ) const ;
void MarkAsGrabbedForPurge ( uint64_t file_number ) ;
size_t GetWalPreallocateBlockSize ( uint64_t write_buffer_size ) const ;
// Returns the fixed hint Env::WLTH_SHORT; nothing is actually computed.
// (Presumably the hint is applied to WAL files, per the name -- confirm.)
Env : : WriteLifeTimeHint CalculateWALWriteHint ( ) { return Env : : WLTH_SHORT ; }
Status CreateWAL ( uint64_t log_file_num , uint64_t recycle_log_number ,
size_t preallocate_block_size , log : : Writer * * new_log ) ;
// When set, we use a separate queue for writes that don't write to memtable.
// When set, we use a separate queue for writes that don't write to memtable.
// In 2PC these are the writes at Prepare phase.
// In 2PC these are the writes at Prepare phase.
const bool two_write_queues_ ;
const bool two_write_queues_ ;