@ -464,10 +464,6 @@ DBImpl::~DBImpl() {
LogFlush ( options_ . info_log ) ;
}
uint64_t DBImpl : : TEST_Current_Manifest_FileNo ( ) {
return versions_ - > ManifestFileNumber ( ) ;
}
Status DBImpl : : NewDB ( ) {
VersionEdit new_db ;
new_db . SetLogNumber ( 0 ) ;
@ -771,6 +767,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
}
# ifndef ROCKSDB_LITE
// 1. Go through all archived files and
// a. if ttl is enabled, delete outdated files
// b. if archive size limit is enabled, delete empty files,
@ -895,6 +892,169 @@ void DBImpl::PurgeObsoleteWALFiles() {
}
}
// Scans directory `path` for log files and appends one LogFileImpl per usable
// file to `log_files`, then sorts `log_files` with CompareLogByPointer.
//
// For every directory entry that parses as a log file:
//   * the first record is read via ReadFirstRecord(log_type, ...); if that
//     fails, the file is skipped when CheckWalFileExistsAndEmpty() says it is
//     present and empty, otherwise the error is returned immediately;
//   * the file size is fetched from the Env; a failure is returned
//     immediately.
// On an early error return, entries appended so far remain in `log_files`.
//
// Returns the GetChildren() error if the directory cannot be listed,
// otherwise the (OK) listing status after sorting.
Status DBImpl::GetSortedWalsOfType(const std::string& path,
                                   VectorLogPtr& log_files,
                                   WalFileType log_type) {
  std::vector<std::string> children;
  const Status list_status = env_->GetChildren(path, &children);
  if (!list_status.ok()) {
    return list_status;
  }

  log_files.reserve(children.size());
  for (const auto& child : children) {
    uint64_t file_number;
    FileType file_type;
    if (!ParseFileName(child, &file_number, &file_type) ||
        file_type != kLogFile) {
      continue;  // not a log file name
    }

    WriteBatch first_batch;
    Status s = ReadFirstRecord(log_type, file_number, &first_batch);
    if (!s.ok()) {
      if (CheckWalFileExistsAndEmpty(log_type, file_number)) {
        continue;  // an empty file carries no records; ignore it
      }
      return s;
    }

    uint64_t size_bytes;
    s = env_->GetFileSize(LogFileName(path, file_number), &size_bytes);
    if (!s.ok()) {
      return s;
    }

    unique_ptr<LogFile> entry(
        new LogFileImpl(file_number, log_type,
                        WriteBatchInternal::Sequence(&first_batch),
                        size_bytes));
    log_files.push_back(std::move(entry));
  }

  std::sort(log_files.begin(), log_files.end(), CompareLogByPointer());
  return list_status;
}
// Trims `all_logs` (expected to be ordered by start sequence) so that it
// begins at the file most likely to contain sequence number `target`.
//
// A binary search over StartSequence() locates either the file whose start
// sequence equals `target`, or the last file whose start sequence is below
// it. Every file before that position is erased. If `target` is smaller than
// the first file's start sequence, nothing is erased. The last file is never
// erased.
//
// Always returns Status::OK().
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
                                      const SequenceNumber target) {
  // Signed indices: `end` legitimately becomes -1 when target precedes the
  // first file (or when all_logs is empty).
  int64_t start = 0;
  int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
  // Binary search; avoids opening all files.
  while (end >= start) {
    const int64_t mid = start + (end - start) / 2;  // avoid overflow
    const SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
    if (current_seq_num == target) {
      end = mid;
      break;
    } else if (current_seq_num < target) {
      start = mid + 1;
    } else {
      end = mid - 1;
    }
  }
  // Clamp the possibly negative `end` to 0. Both std::max arguments must have
  // the same type: a `0l` literal only matches int64_t on platforms where
  // int64_t is `long`, and fails to compile where it is `long long`.
  const size_t start_index =
      static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
  // The last wal file is always included.
  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
  return Status::OK();
}
bool DBImpl : : CheckWalFileExistsAndEmpty ( const WalFileType type ,
const uint64_t number ) {
const std : : string fname = ( type = = kAliveLogFile )
? LogFileName ( options_ . wal_dir , number )
: ArchivedLogFileName ( options_ . wal_dir , number ) ;
uint64_t file_size ;
Status s = env_ - > GetFileSize ( fname , & file_size ) ;
return ( s . ok ( ) & & ( file_size = = 0 ) ) ;
}
// Reads the first record of WAL file `number` into `*result`.
//
//  * kArchivedLogFile: reads the archived file and returns that status.
//  * kAliveLogFile: reads the live file; if that fails and the live file no
//    longer exists, falls back to the archived location. NotFound is returned
//    when the file exists in neither place.
//  * any other type: NotSupported.
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
                               WriteBatch* const result) {
  if (type == kArchivedLogFile) {
    const std::string archived_path =
        ArchivedLogFileName(options_.wal_dir, number);
    return ReadFirstLine(archived_path, result);
  }
  if (type != kAliveLogFile) {
    return Status::NotSupported(" File Type Not Known: " +
                                std::to_string(type));
  }

  const std::string live_path = LogFileName(options_.wal_dir, number);
  Status live_status = ReadFirstLine(live_path, result);
  if (live_status.ok() || env_->FileExists(live_path)) {
    // Either success, or an error that is not explained by a missing file.
    return live_status;
  }

  // The live file is gone; check whether it was moved to the archive.
  const std::string archived_path =
      ArchivedLogFileName(options_.wal_dir, number);
  Status archived_status = ReadFirstLine(archived_path, result);
  if (archived_status.ok() || env_->FileExists(archived_path)) {
    return archived_status;
  }
  return Status::NotFound(" Log File has been deleted: " + archived_path);
}
// Opens `fname` as a sequential file and loads its first log record into
// `*batch`.
//
// Returns:
//  * the NewSequentialFile() error if the file cannot be opened;
//  * OK once a record of at least 12 bytes has been stored in `*batch`
//    (12 is presumably the write-batch header size -- TODO confirm);
//  * the first corruption reported by the reader, or a "record too small"
//    corruption, otherwise;
//  * Corruption(" eof reached ") when no record was read and no corruption
//    was reported.
// With options_.paranoid_checks disabled, a record that was read is still
// accepted even if the reader reported a corruption along the way.
Status DBImpl::ReadFirstLine(const std::string& fname,
                             WriteBatch* const batch) {
  // Reporter given to the log reader: logs every corruption and remembers the
  // first one in *status.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;
    bool ignore_error;  // true if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      const char* prefix = this->ignore_error ? " (ignoring error) " : " ";
      Log(info_log, " %s%s: dropping %d bytes; %s ", prefix, fname,
          static_cast<int>(bytes), s.ToString().c_str());
      if (!this->status->ok()) {
        return;  // only keep the first error
      }
      *this->status = s;
    }
  };

  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
  if (!status.ok()) {
    return status;
  }

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = &status;
  reporter.ignore_error = !options_.paranoid_checks;

  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                     0 /*initial_offset*/);
  std::string scratch;
  Slice record;
  const bool got_record = reader.ReadRecord(&record, &scratch);
  if (got_record && (status.ok() || !options_.paranoid_checks)) {
    if (record.size() >= 12) {
      WriteBatchInternal::SetContents(batch, record);
      return Status::OK();
    }
    reporter.Corruption(record.size(),
                        Status::Corruption(" log record too small "));
    // TODO: keep reading records until the first non-corrupt entry?
  }

  // ReadRecord returns false on EOF, which the Reader itself treats as OK().
  if (status.ok()) {
    status = Status::Corruption(" eof reached ");
  }
  return status;
}
struct CompareLogByPointer {
bool operator ( ) ( const unique_ptr < LogFile > & a , const unique_ptr < LogFile > & b ) {
LogFileImpl * a_impl = dynamic_cast < LogFileImpl * > ( a . get ( ) ) ;
LogFileImpl * b_impl = dynamic_cast < LogFileImpl * > ( b . get ( ) ) ;
return * a_impl < * b_impl ;
}
} ;
# endif // ROCKSDB_LITE
Status DBImpl : : Recover (
const std : : vector < ColumnFamilyDescriptor > & column_families , bool read_only ,
bool error_if_log_file_exist ) {
@ -1533,198 +1693,6 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_ - > LastSequence ( ) ;
}
// Creates a transaction-log iterator positioned for sequence number `seq`
// and stores it in `*iter`.
//
// Returns NotFound if `seq` is beyond the last sequence known to the version
// set, the error from collecting or trimming the WAL list if either fails,
// and otherwise the freshly created iterator's own status.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);

  const bool not_written_yet = seq > versions_->LastSequence();
  if (not_written_yet) {
    return Status::NotFound(" Requested sequence not yet written in the db ");
  }

  // Collect all WAL files in sorted order, then drop the leading files that
  // cannot contain `seq`.
  std::unique_ptr<VectorLogPtr> candidates(new VectorLogPtr);
  Status s = GetSortedWalFiles(*candidates);
  if (s.ok()) {
    s = RetainProbableWalFiles(*candidates, seq);
  }
  if (!s.ok()) {
    return s;
  }

  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
                                             read_options, storage_options_,
                                             seq, std::move(candidates), this));
  return (*iter)->status();
}
// Drops from the front of `all_logs` every file that precedes the one most
// likely to hold sequence number `target`.
//
// The list is binary-searched on StartSequence(). The search stops on an
// exact match, or settles on the last file starting below `target`. When
// `target` is below the first file's start sequence nothing is removed, and
// the final file is never removed. Always returns Status::OK().
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
                                      const SequenceNumber target) {
  // Signed bounds: `hi` drops to -1 when target is below the first file.
  long lo = 0;
  long hi = static_cast<long>(all_logs.size()) - 1;

  // Binary search so that not every file has to be inspected.
  while (lo <= hi) {
    const long probe = lo + (hi - lo) / 2;  // overflow-safe midpoint
    const SequenceNumber probe_seq = all_logs.at(probe)->StartSequence();
    if (probe_seq < target) {
      lo = probe + 1;
    } else if (probe_seq > target) {
      hi = probe - 1;
    } else {
      hi = probe;
      break;
    }
  }

  // `hi` may be negative; clamp it to the first index.
  const size_t first_kept = (hi < 0) ? 0 : static_cast<size_t>(hi);
  // The last wal file is always included.
  all_logs.erase(all_logs.begin(), all_logs.begin() + first_kept);
  return Status::OK();
}
bool DBImpl : : CheckWalFileExistsAndEmpty ( const WalFileType type ,
const uint64_t number ) {
const std : : string fname = ( type = = kAliveLogFile ) ?
LogFileName ( options_ . wal_dir , number ) :
ArchivedLogFileName ( options_ . wal_dir , number ) ;
uint64_t file_size ;
Status s = env_ - > GetFileSize ( fname , & file_size ) ;
return ( s . ok ( ) & & ( file_size = = 0 ) ) ;
}
// Loads the first record of WAL file `number` into `*result`.
//
// An archived file is read directly. A live file is read from its live
// location first; only when that read fails AND the live file does not exist
// is the archived location tried. If neither location has the file the
// result is NotFound. Any other `type` yields NotSupported.
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
                               WriteBatch* const result) {
  const bool alive = (type == kAliveLogFile);
  const bool archived = (type == kArchivedLogFile);

  if (!alive && !archived) {
    return Status::NotSupported(" File Type Not Known: " +
                                std::to_string(type));
  }

  if (archived) {
    return ReadFirstLine(ArchivedLogFileName(options_.wal_dir, number),
                         result);
  }

  // Live log file.
  std::string alive_name = LogFileName(options_.wal_dir, number);
  Status alive_read = ReadFirstLine(alive_name, result);
  const bool alive_settled = alive_read.ok() || env_->FileExists(alive_name);
  if (alive_settled) {
    // OK, or a failure that is not caused by the file being absent.
    return alive_read;
  }

  // The file may have been moved to the archive in the meantime.
  std::string archive_name = ArchivedLogFileName(options_.wal_dir, number);
  Status archive_read = ReadFirstLine(archive_name, result);
  const bool archive_settled =
      archive_read.ok() || env_->FileExists(archive_name);
  if (archive_settled) {
    return archive_read;
  }
  return Status::NotFound(" Log File has been deleted: " + archive_name);
}
// Reads the first log record of file `fname` and installs it as the contents
// of `*batch`.
//
// Outcomes:
//  * file cannot be opened        -> the NewSequentialFile() status;
//  * record read, size >= 12      -> OK, `*batch` updated
//    (NOTE(review): 12 looks like the minimum batch header length -- confirm);
//  * record read, size < 12       -> Corruption (" log record too small "
//    unless an earlier corruption was already recorded);
//  * nothing read, no corruption  -> Corruption(" eof reached ");
//  * otherwise                    -> the first corruption the reader reported.
// A reader-reported corruption does not reject a record that was read when
// options_.paranoid_checks is false.
Status DBImpl::ReadFirstLine(const std::string& fname,
                             WriteBatch* const batch) {
  // Corruption sink for log::Reader; stores only the first error it sees.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;
    bool ignore_error;  // true if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, " %s%s: dropping %d bytes; %s ",
          (this->ignore_error ? " (ignoring error) " : " "), fname,
          static_cast<int>(bytes), s.ToString().c_str());
      const bool first_error = this->status->ok();
      if (first_error) {
        *this->status = s;
      }
    }
  };

  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
  if (!status.ok()) {
    return status;
  }

  const bool paranoid = options_.paranoid_checks;

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = &status;
  reporter.ignore_error = !paranoid;

  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                     0 /*initial_offset*/);
  std::string scratch;
  Slice record;
  if (reader.ReadRecord(&record, &scratch) && (status.ok() || !paranoid)) {
    const bool too_small = record.size() < 12;
    if (!too_small) {
      WriteBatchInternal::SetContents(batch, record);
      return Status::OK();
    }
    reporter.Corruption(record.size(),
                        Status::Corruption(" log record too small "));
    // TODO: read records until the first non-corrupt entry?
  }

  // A false return from ReadRecord at EOF leaves `status` OK, so convert that
  // case into an explicit error.
  if (status.ok()) {
    status = Status::Corruption(" eof reached ");
  }
  return status;
}
struct CompareLogByPointer {
bool operator ( ) ( const unique_ptr < LogFile > & a ,
const unique_ptr < LogFile > & b ) {
LogFileImpl * a_impl = dynamic_cast < LogFileImpl * > ( a . get ( ) ) ;
LogFileImpl * b_impl = dynamic_cast < LogFileImpl * > ( b . get ( ) ) ;
return * a_impl < * b_impl ;
}
} ;
// Builds the sorted list of WAL files of kind `log_type` that live in `path`.
//
// Each directory entry recognised as a log file is probed with
// ReadFirstRecord(); the sequence of that first batch and the file's size are
// recorded in a new LogFileImpl appended to `log_files`. Files whose first
// record cannot be read are skipped if they exist and are empty; any other
// read failure, or a failure to obtain the file size, is returned at once
// (leaving already-appended entries in `log_files`). After the scan the list
// is sorted with CompareLogByPointer.
//
// Returns the directory-listing status (OK on success).
Status DBImpl::GetSortedWalsOfType(const std::string& path,
                                   VectorLogPtr& log_files,
                                   WalFileType log_type) {
  std::vector<std::string> dir_entries;
  const Status dir_status = env_->GetChildren(path, &dir_entries);
  if (!dir_status.ok()) {
    return dir_status;
  }

  log_files.reserve(dir_entries.size());
  for (const auto& entry_name : dir_entries) {
    uint64_t log_number;
    FileType parsed_type;
    const bool is_log = ParseFileName(entry_name, &log_number, &parsed_type) &&
                        parsed_type == kLogFile;
    if (is_log) {
      WriteBatch head;
      Status s = ReadFirstRecord(log_type, log_number, &head);
      if (!s.ok()) {
        if (!CheckWalFileExistsAndEmpty(log_type, log_number)) {
          return s;
        }
        continue;  // present but empty: nothing to record
      }

      uint64_t size_bytes;
      s = env_->GetFileSize(LogFileName(path, log_number), &size_bytes);
      if (!s.ok()) {
        return s;
      }

      const SequenceNumber first_seq = WriteBatchInternal::Sequence(&head);
      log_files.push_back(unique_ptr<LogFile>(
          new LogFileImpl(log_number, log_type, first_seq, size_bytes)));
    }
  }

  CompareLogByPointer by_log_order;
  std::sort(log_files.begin(), log_files.end(), by_log_order);
  return dir_status;
}
Status DBImpl : : RunManualCompaction ( ColumnFamilyData * cfd , int input_level ,
int output_level , const Slice * begin ,
const Slice * end ) {
@ -1798,23 +1766,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
return manual . status ;
}
// Test hook: runs a manual compaction of `level` over [begin, end] for the
// given column family (the default one when `column_family` is nullptr).
// The output level is `level` itself under universal compaction style and
// `level + 1` otherwise.
Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
                                 const Slice* end,
                                 ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd = nullptr;
  if (column_family != nullptr) {
    cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  } else {
    cfd = default_cf_handle_->cfd();
  }

  const bool universal =
      cfd->options()->compaction_style == kCompactionStyleUniversal;
  const int output_level = universal ? level : level + 1;
  return RunManualCompaction(cfd, level, output_level, begin, end);
}
Status DBImpl : : FlushMemTable ( ColumnFamilyData * cfd ,
const FlushOptions & options ) {
// nullptr batch means just wait for earlier writes to be done
@ -1839,38 +1790,6 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
return s ;
}
// Test hook: flushes the default column family's memtable, passing `wait`
// through as FlushOptions::wait.
Status DBImpl::TEST_FlushMemTable(bool wait) {
  FlushOptions flush_opts;
  flush_opts.wait = wait;
  auto default_cfd = default_cf_handle_->cfd();
  return FlushMemTable(default_cfd, flush_opts);
}
// Test hook: forwards to WaitForFlushMemTable() for the given column family,
// or for the default column family when `column_family` is nullptr.
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return WaitForFlushMemTable(default_cf_handle_->cfd());
  }
  auto handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return WaitForFlushMemTable(handle_impl->cfd());
}
// Test hook: blocks on bg_cv_ (holding mutex_) until no background compaction
// or flush is scheduled, or until bg_error_ is no longer OK, then returns
// bg_error_.
//
// TODO: known bug -- despite the name, this does not necessarily wait for a
// compaction. It waits for scheduled compaction OR flush work to finish.
Status DBImpl::TEST_WaitForCompact() {
  MutexLock l(&mutex_);
  for (;;) {
    const bool bg_work_pending =
        bg_compaction_scheduled_ || bg_flush_scheduled_;
    if (!bg_work_pending || !bg_error_.ok()) {
      break;
    }
    bg_cv_.Wait();
  }
  return bg_error_;
}
void DBImpl : : MaybeScheduleFlushOrCompaction ( ) {
mutex_ . AssertHeld ( ) ;
bg_schedule_needed_ = false ;
@ -2026,16 +1945,6 @@ void DBImpl::BackgroundCallFlush() {
}
}
void DBImpl : : TEST_PurgeObsoleteteWAL ( ) {
PurgeObsoleteWALFiles ( ) ;
}
uint64_t DBImpl : : TEST_GetLevel0TotalSize ( ) {
MutexLock l ( & mutex_ ) ;
return default_cf_handle_ - > cfd ( ) - > current ( ) - > NumLevelBytes ( 0 ) ;
}
void DBImpl : : BackgroundCallCompaction ( ) {
bool madeProgress = false ;
DeletionState deletion_state ( true ) ;
@ -3222,36 +3131,6 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
return default_cf_handle_ ;
}
// Test hook: builds an internal iterator over the given column family (the
// default one when `column_family` is nullptr). A reference on the column
// family's super version is taken while mutex_ is held and handed to
// NewInternalIterator() together with read options that have prefix_seek
// enabled.
Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd = nullptr;
  if (column_family != nullptr) {
    cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  } else {
    cfd = default_cf_handle_->cfd();
  }

  SuperVersion* super_version = nullptr;
  {
    // Hold the mutex only for the duration of the Ref().
    MutexLock l(&mutex_);
    super_version = cfd->GetSuperVersion()->Ref();
  }

  ReadOptions roptions;
  roptions.prefix_seek = true;
  return NewInternalIterator(roptions, cfd, super_version);
}
int64_t DBImpl : : TEST_MaxNextLevelOverlappingBytes (
ColumnFamilyHandle * column_family ) {
ColumnFamilyData * cfd ;
if ( column_family = = nullptr ) {
cfd = default_cf_handle_ - > cfd ( ) ;
} else {
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_family ) ;
cfd = cfh - > cfd ( ) ;
}
MutexLock l ( & mutex_ ) ;
return cfd - > current ( ) - > MaxNextLevelOverlappingBytes ( ) ;
}
Status DBImpl : : Get ( const ReadOptions & options ,
ColumnFamilyHandle * column_family , const Slice & key ,
std : : string * value ) {
@ -3590,7 +3469,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
Iterator * iter ;
if ( options . tailing ) {
# ifdef ROCKSDB_LITE
// not supported in lite version
return nullptr ;
# else
iter = new TailingIterator ( env_ , this , options , cfd ) ;
# endif
} else {
SequenceNumber latest_snapshot = versions_ - > LastSequence ( ) ;
SuperVersion * sv = nullptr ;
@ -3642,10 +3526,15 @@ Status DBImpl::NewIterators(
}
if ( options . tailing ) {
# ifdef ROCKSDB_LITE
return Status : : InvalidArgument (
" Tailing interator not supported in RocksDB lite " ) ;
# else
for ( auto cfh : column_families ) {
auto cfd = reinterpret_cast < ColumnFamilyHandleImpl * > ( cfh ) - > cfd ( ) ;
iterators - > push_back ( new TailingIterator ( env_ , this , options , cfd ) ) ;
}
# endif
} else {
for ( size_t i = 0 ; i < column_families . size ( ) ; + + i ) {
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_families [ i ] ) ;
@ -4132,6 +4021,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
return s ;
}
# ifndef ROCKSDB_LITE
Status DBImpl : : GetPropertiesOfAllTables ( ColumnFamilyHandle * column_family ,
TablePropertiesCollection * props ) {
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_family ) ;
@ -4152,6 +4042,7 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
return s ;
}
# endif // ROCKSDB_LITE
const std : : string & DBImpl : : GetName ( ) const {
return dbname_ ;
@ -4211,6 +4102,34 @@ inline void DBImpl::DelayLoggingAndReset() {
}
}
# ifndef ROCKSDB_LITE
// Produces, in `*iter`, a TransactionLogIteratorImpl that starts from
// sequence number `seq`.
//
// Steps: bump the GET_UPDATES_SINCE_CALLS ticker; reject `seq` values above
// the version set's last sequence with NotFound; gather the sorted WAL files;
// discard leading files that cannot contain `seq`; construct the iterator.
// Errors from the gather/discard steps are returned unchanged; on success the
// new iterator's status() is returned.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
  if (versions_->LastSequence() < seq) {
    return Status::NotFound(" Requested sequence not yet written in the db ");
  }

  // Get all sorted WAL files, then binary-search them for the starting file.
  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
  VectorLogPtr& sorted_wals = *wal_files;

  Status s = GetSortedWalFiles(sorted_wals);
  if (!s.ok()) {
    return s;
  }

  s = RetainProbableWalFiles(sorted_wals, seq);
  if (!s.ok()) {
    return s;
  }

  TransactionLogIterator* log_iter = new TransactionLogIteratorImpl(
      options_.wal_dir, &options_, read_options, storage_options_, seq,
      std::move(wal_files), this);
  iter->reset(log_iter);
  return (*iter)->status();
}
Status DBImpl : : DeleteFile ( std : : string name ) {
uint64_t number ;
FileType type ;
@ -4294,6 +4213,7 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
MutexLock l ( & mutex_ ) ;
versions_ - > GetLiveFilesMetaData ( metadata ) ;
}
# endif // ROCKSDB_LITE
Status DBImpl : : CheckConsistency ( ) {
mutex_ . AssertHeld ( ) ;
@ -4322,23 +4242,6 @@ Status DBImpl::CheckConsistency() {
}
}
void DBImpl : : TEST_GetFilesMetaData (
ColumnFamilyHandle * column_family ,
std : : vector < std : : vector < FileMetaData > > * metadata ) {
auto cfh = reinterpret_cast < ColumnFamilyHandleImpl * > ( column_family ) ;
auto cfd = cfh - > cfd ( ) ;
MutexLock l ( & mutex_ ) ;
metadata - > resize ( NumberLevels ( ) ) ;
for ( int level = 0 ; level < NumberLevels ( ) ; level + + ) {
const std : : vector < FileMetaData * > & files = cfd - > current ( ) - > files_ [ level ] ;
( * metadata ) [ level ] . clear ( ) ;
for ( const auto & f : files ) {
( * metadata ) [ level ] . push_back ( * f ) ;
}
}
}
Status DBImpl : : GetDbIdentity ( std : : string & identity ) {
std : : string idfilename = IdentityFileName ( dbname_ ) ;
unique_ptr < SequentialFile > idfile ;