@@ -75,16 +75,30 @@ struct JobContext;
struct ExternalSstFileInfo;
struct MemTableInfo;
// While DB is the public interface of RocksDB, DBImpl is the actual class
// implementing it. It's the entrance to the core RocksDB engine.
// All other DB implementations, e.g. TransactionDB, BlobDB, etc., wrap a
// DBImpl internally.
// Other than functions implementing the DB interface, some public
// functions are there for other internal components to call. For
// example, TransactionDB directly calls DBImpl::WriteImpl() and
// BlobDB directly calls DBImpl::GetImpl(). Some other functions
// are for sub-components to call. For example, ColumnFamilyHandleImpl
// calls DBImpl::FindObsoleteFiles().
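// For illustration only (a sketch, not actual TransactionDB code), such a
// wrapper might bypass the public DB::Write() and forward a batch straight
// to the engine:
//
//   Status MyWrapperDB::Write(const WriteOptions& opts, WriteBatch* updates) {
//     return db_impl_->WriteImpl(opts, updates, /*callback=*/nullptr);
//   }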
//
// Since it's a very large class, the definitions of its functions are
// split among several db_impl_*.cc files in addition to db_impl.cc.
class DBImpl : public DB {
 public:
  DBImpl(const DBOptions& options, const std::string& dbname,
         const bool seq_per_batch = false, const bool batch_per_txn = true);
  virtual ~DBImpl();
  // ---- Implementations of the DB interface ----
  using DB::Resume;
  virtual Status Resume() override;
  // Implementations of the DB interface
  using DB::Put;
  virtual Status Put(const WriteOptions& options,
                     ColumnFamilyHandle* column_family, const Slice& key,
@@ -110,13 +124,6 @@ class DBImpl : public DB {
                     ColumnFamilyHandle* column_family, const Slice& key,
                     PinnableSlice* value) override;
  // Function that Get and KeyMayExist call with no_io true or false
  // Note: 'value_found' from KeyMayExist propagates here
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, PinnableSlice* value,
                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
                 bool* is_blob_index = nullptr);
  using DB::MultiGet;
  virtual std::vector<Status> MultiGet(
      const ReadOptions& options,
@@ -174,12 +181,6 @@ class DBImpl : public DB {
      const ReadOptions& options,
      const std::vector<ColumnFamilyHandle*>& column_families,
      std::vector<Iterator*>* iterators) override;
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
                                      ColumnFamilyData* cfd,
                                      SequenceNumber snapshot,
                                      ReadCallback* read_callback,
                                      bool allow_blob = false,
                                      bool allow_refresh = true);
  virtual const Snapshot* GetSnapshot() override;
  virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
@@ -259,23 +260,19 @@ class DBImpl : public DB {
  virtual Status UnlockWAL() override;
  virtual SequenceNumber GetLatestSequenceNumber() const override;
  virtual SequenceNumber GetLastPublishedSequence() const {
    if (last_seq_same_as_publish_seq_) {
      return versions_->LastSequence();
    } else {
      return versions_->LastPublishedSequence();
    }
  }
  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
  // the second write queue otherwise.
  virtual void SetLastPublishedSequence(SequenceNumber seq);
  // Returns LastSequence in last_seq_same_as_publish_seq_
  // mode and LastAllocatedSequence otherwise. This is useful when visibility
  // depends also on data written to the WAL but not to the memtable.
  SequenceNumber TEST_GetLastVisibleSequence() const;
  virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
  virtual Status GetDbIdentity(std::string& identity) const override;
  ColumnFamilyHandle* DefaultColumnFamily() const override;
  virtual Status Close() override;
  Status GetStatsHistory(
      uint64_t start_time, uint64_t end_time,
      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
#ifndef ROCKSDB_LITE
  using DB::ResetStats;
  virtual Status ResetStats() override;
@@ -313,12 +310,76 @@ class DBImpl : public DB {
  Status PromoteL0(ColumnFamilyHandle* column_family,
                   int target_level) override;
  using DB::IngestExternalFile;
  virtual Status IngestExternalFile(
      ColumnFamilyHandle* column_family,
      const std::vector<std::string>& external_files,
      const IngestExternalFileOptions& ingestion_options) override;
  using DB::IngestExternalFiles;
  virtual Status IngestExternalFiles(
      const std::vector<IngestExternalFileArg>& args) override;
  virtual Status VerifyChecksum() override;
  using DB::StartTrace;
  virtual Status StartTrace(
      const TraceOptions& options,
      std::unique_ptr<TraceWriter>&& trace_writer) override;
  using DB::EndTrace;
  virtual Status EndTrace() override;
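  // A minimal usage sketch, assuming the trace writer comes from
  // NewFileTraceWriter() (declared in rocksdb/trace_reader_writer.h):
  //
  //   std::unique_ptr<TraceWriter> trace_writer;
  //   s = NewFileTraceWriter(env, EnvOptions(), trace_file, &trace_writer);
  //   s = db->StartTrace(TraceOptions(), std::move(trace_writer));
  //   // ... run the workload to be captured ...
  //   s = db->EndTrace();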
  using DB::GetPropertiesOfAllTables;
  virtual Status GetPropertiesOfAllTables(
      ColumnFamilyHandle* column_family,
      TablePropertiesCollection* props) override;
  virtual Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
      TablePropertiesCollection* props) override;
#endif  // ROCKSDB_LITE
// ---- End of implementations of the DB interface ----
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, PinnableSlice* value,
                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
                 bool* is_blob_index = nullptr);
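  // In its simplest form, the public Get() is a thin wrapper over GetImpl();
  // a sketch, with all optional arguments left at their defaults:
  //
  //   Status DBImpl::Get(const ReadOptions& read_options,
  //                      ColumnFamilyHandle* column_family, const Slice& key,
  //                      PinnableSlice* value) {
  //     return GetImpl(read_options, column_family, key, value);
  //   }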
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
                                      ColumnFamilyData* cfd,
                                      SequenceNumber snapshot,
                                      ReadCallback* read_callback,
                                      bool allow_blob = false,
                                      bool allow_refresh = true);
  virtual SequenceNumber GetLastPublishedSequence() const {
    if (last_seq_same_as_publish_seq_) {
      return versions_->LastSequence();
    } else {
      return versions_->LastPublishedSequence();
    }
  }
  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
  // the second write queue otherwise.
  virtual void SetLastPublishedSequence(SequenceNumber seq);
  // Returns LastSequence in last_seq_same_as_publish_seq_
  // mode and LastAllocatedSequence otherwise. This is useful when visibility
  // depends also on data written to the WAL but not to the memtable.
  SequenceNumber TEST_GetLastVisibleSequence() const;
#ifndef ROCKSDB_LITE
  // Similar to Write() but will call the callback once on the single write
  // thread to determine whether it is safe to perform the write.
  virtual Status WriteWithCallback(const WriteOptions& write_options,
                                   WriteBatch* my_batch,
                                   WriteCallback* callback);
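  // A minimal sketch of such a callback (the WriteCallback interface is
  // declared in db/write_callback.h; the class name here is illustrative):
  //
  //   class MyConflictCheck : public WriteCallback {
  //    public:
  //     // Return non-OK from the write thread to abort the write, e.g.
  //     // after a conflict check, as optimistic transactions do.
  //     Status Callback(DB* db) override { return Status::OK(); }
  //     bool AllowWriteBatching() override { return false; }
  //   };
  //
  //   MyConflictCheck callback;
  //   Status s = db_impl->WriteWithCallback(WriteOptions(), &batch, &callback);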
  // Returns the sequence number that is guaranteed to be smaller than or equal
  // to the sequence number of any key that could be inserted into the current
  // memtables. It can then be assumed that any write with a larger (or equal)
@@ -360,25 +421,6 @@ class DBImpl : public DB {
                                 bool* found_record_for_key,
                                 bool* is_blob_index = nullptr);
  using DB::IngestExternalFile;
  virtual Status IngestExternalFile(
      ColumnFamilyHandle* column_family,
      const std::vector<std::string>& external_files,
      const IngestExternalFileOptions& ingestion_options) override;
  using DB::IngestExternalFiles;
  virtual Status IngestExternalFiles(
      const std::vector<IngestExternalFileArg>& args) override;
  virtual Status VerifyChecksum() override;
  using DB::StartTrace;
  virtual Status StartTrace(
      const TraceOptions& options,
      std::unique_ptr<TraceWriter>&& trace_writer) override;
  using DB::EndTrace;
  virtual Status EndTrace() override;
  Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
  Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
#endif  // ROCKSDB_LITE
@@ -393,8 +435,6 @@ class DBImpl : public DB {
  // match to our in-memory records
  virtual Status CheckConsistency();
  virtual Status GetDbIdentity(std::string& identity) const override;
// max_file_num_to_ignore allows bottom level compaction to filter out newly
// compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
// disable the filtering
@@ -416,102 +456,6 @@ class DBImpl : public DB {
    return &logs_with_prep_tracker_;
  }
#ifndef NDEBUG
// Extra methods (for testing) that are not in the public DB interface
// Implemented in db_impl_debug.cc
  // Compact any files in the named level that overlap [*begin, *end]
  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
                           ColumnFamilyHandle* column_family = nullptr,
                           bool disallow_trivial_move = false);
  void TEST_SwitchWAL();
  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
  bool TEST_IsLogGettingFlushed() {
    return alive_log_files_.begin()->getting_flushed;
  }
  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
  // Force current memtable contents to be flushed.
  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
                            ColumnFamilyHandle* cfh = nullptr);
  // Wait for memtable flush to finish.
  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
  // Wait for any compaction. The bool parameter additionally waits for
  // unscheduledCompactions_ == 0; it exists only for the special
  // CancelledCompactions test.
  Status TEST_WaitForCompact(bool waitUnscheduled = false);
  // Return the maximum overlapping data (in bytes) at next level for any
  // file at a level >= 1.
  int64_t TEST_MaxNextLevelOverlappingBytes(
      ColumnFamilyHandle* column_family = nullptr);
  // Return the current manifest file number.
  uint64_t TEST_Current_Manifest_FileNo();
  // Returns the number that'll be assigned to the next file that's created.
  uint64_t TEST_Current_Next_FileNo();
  // Get total level-0 file size. Only for testing.
  uint64_t TEST_GetLevel0TotalSize();
  void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
                             std::vector<std::vector<FileMetaData>>* metadata);
  void TEST_LockMutex();
  void TEST_UnlockMutex();
  // REQUIRES: mutex locked
  void* TEST_BeginWrite();
  // REQUIRES: mutex locked
  // Pass the pointer that you got from TEST_BeginWrite()
  void TEST_EndWrite(void* w);
  uint64_t TEST_MaxTotalInMemoryState() const {
    return max_total_in_memory_state_;
  }
  size_t TEST_LogsToFreeSize();
  uint64_t TEST_LogfileNumber();
  uint64_t TEST_total_log_size() const { return total_log_size_; }
  // Returns column family name to ImmutableCFOptions map.
  Status TEST_GetAllImmutableCFOptions(
      std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
  // Return the latest MutableCFOptions of a column family
  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
                                        MutableCFOptions* mutable_cf_options);
  Cache* TEST_table_cache() { return table_cache_.get(); }
  WriteController& TEST_write_controler() { return write_controller_; }
  uint64_t TEST_FindMinLogContainingOutstandingPrep();
  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
  size_t TEST_PreparedSectionCompletedSize();
  size_t TEST_LogsWithPrepSize();
  int TEST_BGCompactionsAllowed() const;
  int TEST_BGFlushesAllowed() const;
  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
  void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
  void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
  bool TEST_IsPersistentStatsEnabled() const;
  size_t TEST_EstiamteStatsHistorySize() const;
#endif  // NDEBUG
  struct BGJobLimits {
    int max_flushes;
    int max_compactions;
@@ -555,12 +499,15 @@ class DBImpl : public DB {
  void PurgeObsoleteFiles(JobContext& background_context,
                          bool schedule_only = false);
  // Schedule a background job to actually delete obsolete files.
  void SchedulePurge();
  ColumnFamilyHandle* DefaultColumnFamily() const override;
  const SnapshotList& snapshots() const { return snapshots_; }
  // Load the list of snapshots that are no newer than `max_seq` into
  // `snap_vector`, in ascending order.
  // `oldest_write_conflict_snapshot` is filled with the oldest snapshot
  // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true.
  void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
                     SequenceNumber* oldest_write_conflict_snapshot,
                     const SequenceNumber& max_seq) const {
@@ -572,6 +519,10 @@ class DBImpl : public DB {
    return immutable_db_options_;
  }
  // Cancel all background jobs, including flush, compaction, background
  // purging, stats dumping threads, etc. If `wait` is true, wait for the
  // running jobs to abort or finish before returning; otherwise, only
  // send the signals.
  void CancelAllBackgroundWork(bool wait);
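  // Typical shutdown sketch: stop background activity before tearing the DB
  // down so that no job races with destruction. (The public helper
  // rocksdb::CancelAllBackgroundWork(db, wait) in convenience.h ends up here.)
  //
  //   db_impl->CancelAllBackgroundWork(/*wait=*/true);
  //   delete db_impl;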
// Find Super version and reference it. Based on options, it might return
@@ -748,6 +699,8 @@ class DBImpl : public DB {
  InstrumentedMutex* mutex() const { return &mutex_; }
  // Initialize a brand new DB. The DB directory is expected to be empty before
  // calling it.
  Status NewDB();
// This is to be used only by internal rocksdb classes.
@@ -756,21 +709,109 @@ class DBImpl : public DB {
      std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
      const bool seq_per_batch, const bool batch_per_txn);
  virtual Status Close() override;
  static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
                                      std::unique_ptr<Directory>* directory);
  Status GetStatsHistory(
      uint64_t start_time, uint64_t end_time,
      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
  // Find the stats map from stats_history_ with the smallest timestamp in
  // the range [start_time, end_time)
  bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
                       uint64_t* new_time,
                       std::map<std::string, uint64_t>* stats_map);
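  // A usage sketch for reading the collected stats history back (assumes
  // stats persistence was enabled via stats_persist_period_sec):
  //
  //   std::unique_ptr<StatsHistoryIterator> it;
  //   Status s = db->GetStatsHistory(0 /* start_time */,
  //                                  port::kMaxUint64 /* end_time */, &it);
  //   for (; s.ok() && it->Valid(); it->Next()) {
  //     uint64_t when = it->GetStatsTime();
  //     const std::map<std::string, uint64_t>& stats = it->GetStatsMap();
  //     // ... consume `when` and `stats` ...
  //   }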
#ifndef NDEBUG
  // Compact any files in the named level that overlap [*begin, *end]
  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
                           ColumnFamilyHandle* column_family = nullptr,
                           bool disallow_trivial_move = false);
  void TEST_SwitchWAL();
  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
  bool TEST_IsLogGettingFlushed() {
    return alive_log_files_.begin()->getting_flushed;
  }
  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
  // Force current memtable contents to be flushed.
  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
                            ColumnFamilyHandle* cfh = nullptr);
  // Wait for memtable flush to finish.
  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
  // Wait for any compaction. The bool parameter additionally waits for
  // unscheduledCompactions_ == 0; it exists only for the special
  // CancelledCompactions test.
  Status TEST_WaitForCompact(bool waitUnscheduled = false);
  // Return the maximum overlapping data (in bytes) at next level for any
  // file at a level >= 1.
  int64_t TEST_MaxNextLevelOverlappingBytes(
      ColumnFamilyHandle* column_family = nullptr);
  // Return the current manifest file number.
  uint64_t TEST_Current_Manifest_FileNo();
  // Returns the number that'll be assigned to the next file that's created.
  uint64_t TEST_Current_Next_FileNo();
  // Get total level-0 file size. Only for testing.
  uint64_t TEST_GetLevel0TotalSize();
  void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
                             std::vector<std::vector<FileMetaData>>* metadata);
  void TEST_LockMutex();
  void TEST_UnlockMutex();
  // REQUIRES: mutex locked
  void* TEST_BeginWrite();
  // REQUIRES: mutex locked
  // Pass the pointer that you got from TEST_BeginWrite()
  void TEST_EndWrite(void* w);
  uint64_t TEST_MaxTotalInMemoryState() const {
    return max_total_in_memory_state_;
  }
  size_t TEST_LogsToFreeSize();
  uint64_t TEST_LogfileNumber();
  uint64_t TEST_total_log_size() const { return total_log_size_; }
  // Returns column family name to ImmutableCFOptions map.
  Status TEST_GetAllImmutableCFOptions(
      std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
  // Return the latest MutableCFOptions of a column family
  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
                                        MutableCFOptions* mutable_cf_options);
  Cache* TEST_table_cache() { return table_cache_.get(); }
  WriteController& TEST_write_controler() { return write_controller_; }
  uint64_t TEST_FindMinLogContainingOutstandingPrep();
  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
  size_t TEST_PreparedSectionCompletedSize();
  size_t TEST_LogsWithPrepSize();
  int TEST_BGCompactionsAllowed() const;
  int TEST_BGFlushesAllowed() const;
  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
  void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
  void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
  bool TEST_IsPersistentStatsEnabled() const;
  size_t TEST_EstiamteStatsHistorySize() const;
#endif  // NDEBUG
 protected:
  Env* const env_;
  const std::string dbname_;
@@ -1700,16 +1741,6 @@ class DBImpl : public DB {
      ColumnFamilyData* cfd, SuperVersionContext* sv_context,
      const MutableCFOptions& mutable_cf_options);
#ifndef ROCKSDB_LITE
  using DB::GetPropertiesOfAllTables;
  virtual Status GetPropertiesOfAllTables(
      ColumnFamilyHandle* column_family,
      TablePropertiesCollection* props) override;
  virtual Status GetPropertiesOfTablesInRange(
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
      TablePropertiesCollection* props) override;
#endif  // ROCKSDB_LITE
  bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                              const DBPropertyInfo& property_info,