@@ -349,6 +349,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       shutting_down_(nullptr),
       bg_cv_(&mutex_),
       logfile_number_(0),
+      log_empty_(true),
       default_cf_handle_(nullptr),
       tmp_batch_(),
       bg_schedule_needed_(false),
@@ -463,10 +464,6 @@ DBImpl::~DBImpl() {
   LogFlush(options_.info_log);
 }

-uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
-  return versions_->ManifestFileNumber();
-}
-
 Status DBImpl::NewDB() {
   VersionEdit new_db;
   new_db.SetLogNumber(0);
@@ -770,6 +767,7 @@ void DBImpl::DeleteObsoleteFiles() {
   }
 }

+#ifndef ROCKSDB_LITE
 // 1. Go through all archived files and
 //    a. if ttl is enabled, delete outdated files
 //    b. if archive size limit is enabled, delete empty files,
@@ -894,11 +892,178 @@ void DBImpl::PurgeObsoleteWALFiles() {
   }
 }

+namespace {
+struct CompareLogByPointer {
+  bool operator()(const unique_ptr<LogFile>& a, const unique_ptr<LogFile>& b) {
+    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
+    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
+    return *a_impl < *b_impl;
+  }
+};
+}  // namespace
+
+Status DBImpl::GetSortedWalsOfType(const std::string& path,
+                                   VectorLogPtr& log_files,
+                                   WalFileType log_type) {
+  std::vector<std::string> all_files;
+  const Status status = env_->GetChildren(path, &all_files);
+  if (!status.ok()) {
+    return status;
+  }
+  log_files.reserve(all_files.size());
+  for (const auto& f : all_files) {
+    uint64_t number;
+    FileType type;
+    if (ParseFileName(f, &number, &type) && type == kLogFile) {
+      WriteBatch batch;
+      Status s = ReadFirstRecord(log_type, number, &batch);
+      if (!s.ok()) {
+        if (CheckWalFileExistsAndEmpty(log_type, number)) {
+          continue;
+        }
+        return s;
+      }
+
+      uint64_t size_bytes;
+      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
+      if (!s.ok()) {
+        return s;
+      }
+
+      log_files.push_back(std::move(unique_ptr<LogFile>(
+          new LogFileImpl(number, log_type,
+                          WriteBatchInternal::Sequence(&batch), size_bytes))));
+    }
+  }
+
+  CompareLogByPointer compare_log_files;
+  std::sort(log_files.begin(), log_files.end(), compare_log_files);
+  return status;
+}
+
+Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
+                                      const SequenceNumber target) {
+  int64_t start = 0;  // signed to avoid overflow when target is < first file.
+  int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
+  // Binary search. Avoid opening all files.
+  while (end >= start) {
+    int64_t mid = start + (end - start) / 2;  // Avoid overflow.
+    SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
+    if (current_seq_num == target) {
+      end = mid;
+      break;
+    } else if (current_seq_num < target) {
+      start = mid + 1;
+    } else {
+      end = mid - 1;
+    }
+  }
+  // end could be -ve.
+  size_t start_index = std::max(static_cast<int64_t>(0), end);
+  // The last wal file is always included
+  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
+  return Status::OK();
+}
+
+bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
+                                        const uint64_t number) {
+  const std::string fname = (type == kAliveLogFile)
+                                ? LogFileName(options_.wal_dir, number)
+                                : ArchivedLogFileName(options_.wal_dir, number);
+  uint64_t file_size;
+  Status s = env_->GetFileSize(fname, &file_size);
+  return (s.ok() && (file_size == 0));
+}
+
+Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
+                               WriteBatch* const result) {
+  if (type == kAliveLogFile) {
+    std::string fname = LogFileName(options_.wal_dir, number);
+    Status status = ReadFirstLine(fname, result);
+    if (status.ok() || env_->FileExists(fname)) {
+      // return OK or any error that is not caused by a non-existing file
+      return status;
+    }
+
+    // check if the file got moved to archive.
+    std::string archived_file = ArchivedLogFileName(options_.wal_dir, number);
+    Status s = ReadFirstLine(archived_file, result);
+    if (s.ok() || env_->FileExists(archived_file)) {
+      return s;
+    }
+    return Status::NotFound("Log File has been deleted: " + archived_file);
+  } else if (type == kArchivedLogFile) {
+    std::string fname = ArchivedLogFileName(options_.wal_dir, number);
+    Status status = ReadFirstLine(fname, result);
+    return status;
+  }
+  return Status::NotSupported("File Type Not Known: " + std::to_string(type));
+}
+
+Status DBImpl::ReadFirstLine(const std::string& fname,
+                             WriteBatch* const batch) {
+  struct LogReporter : public log::Reader::Reporter {
+    Env* env;
+    Logger* info_log;
+    const char* fname;
+
+    Status* status;
+    bool ignore_error;  // true if options_.paranoid_checks==false
+    virtual void Corruption(size_t bytes, const Status& s) {
+      Log(info_log, "%s%s: dropping %d bytes; %s",
+          (this->ignore_error ? "(ignoring error) " : ""), fname,
+          static_cast<int>(bytes), s.ToString().c_str());
+      if (this->status->ok()) {
+        // only keep the first error
+        *this->status = s;
+      }
+    }
+  };
+
+  unique_ptr<SequentialFile> file;
+  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
+
+  if (!status.ok()) {
+    return status;
+  }
+
+  LogReporter reporter;
+  reporter.env = env_;
+  reporter.info_log = options_.info_log.get();
+  reporter.fname = fname.c_str();
+  reporter.status = &status;
+  reporter.ignore_error = !options_.paranoid_checks;
+  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
+                     0 /*initial_offset*/);
+  std::string scratch;
+  Slice record;
+
+  if (reader.ReadRecord(&record, &scratch) &&
+      (status.ok() || !options_.paranoid_checks)) {
+    if (record.size() < 12) {
+      reporter.Corruption(record.size(),
+                          Status::Corruption("log record too small"));
+      // TODO read records till the first non-corrupt entry?
+    } else {
+      WriteBatchInternal::SetContents(batch, record);
+      return Status::OK();
+    }
+  }
+
+  // ReadRecord returns false on EOF, which is deemed as OK() by Reader
+  if (status.ok()) {
+    status = Status::Corruption("eof reached");
+  }
+  return status;
+}
+#endif  // ROCKSDB_LITE
+
 Status DBImpl::Recover(
     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
     bool error_if_log_file_exist) {
   mutex_.AssertHeld();

+  bool is_new_db = false;
   assert(db_lock_ == nullptr);
   if (!read_only) {
     // We call CreateDirIfMissing() as the directory may already exist (if we
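
The retention step added above is the interesting part: RetainProbableWalFiles() binary-searches
the sorted WAL list for the last file whose start sequence is at or below the target, then drops
every file before it. A minimal standalone sketch of that search, using plain uint64_t start
sequences in place of LogFile objects (names here are illustrative, not part of the patch):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Index of the first WAL that may contain `target`: the last file whose
    // start sequence is <= target. The last file is always retained.
    size_t FirstProbableWal(const std::vector<uint64_t>& starts,
                            uint64_t target) {
      int64_t lo = 0;  // signed: `hi` can reach -1 when target < starts[0]
      int64_t hi = static_cast<int64_t>(starts.size()) - 1;
      while (hi >= lo) {
        int64_t mid = lo + (hi - lo) / 2;  // avoids overflow
        if (starts[mid] == target) {
          hi = mid;
          break;
        } else if (starts[mid] < target) {
          lo = mid + 1;
        } else {
          hi = mid - 1;
        }
      }
      return static_cast<size_t>(std::max<int64_t>(0, hi));
    }

    // FirstProbableWal({10, 20, 30}, 25) == 1: the file starting at 20 may
    // hold sequence 25. FirstProbableWal({10, 20, 30}, 5) == 0.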
@@ -927,6 +1092,7 @@ Status DBImpl::Recover(
     if (options_.create_if_missing) {
       // TODO: add merge_operator name check
       s = NewDB();
+      is_new_db = true;
       if (!s.ok()) {
         return s;
       }
@@ -977,10 +1143,15 @@ Status DBImpl::Recover(
     for (size_t i = 0; i < filenames.size(); i++) {
       uint64_t number;
       FileType type;
-      if (ParseFileName(filenames[i], &number, &type)
-          && type == kLogFile
-          && ((number >= min_log) || (number == prev_log))) {
-        logs.push_back(number);
+      if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
+        if (is_new_db) {
+          return Status::Corruption(
+              "While creating a new Db, wal_dir contains "
+              "existing log file: ",
+              filenames[i]);
+        } else if ((number >= min_log) || (number == prev_log)) {
+          logs.push_back(number);
+        }
       }
     }
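
For reference, the new filter in Recover() distilled to its decision table, as a hedged sketch
(the free-standing function and names are for exposition only):

    #include <cstdint>

    enum class WalScan { kReplay, kSkip, kCorruption };

    WalScan ClassifyWal(bool is_new_db, uint64_t number, uint64_t min_log,
                        uint64_t prev_log) {
      if (is_new_db) {
        // A freshly created DB must start from an empty wal_dir; a leftover
        // log file here would otherwise be silently dropped data.
        return WalScan::kCorruption;
      }
      if (number >= min_log || number == prev_log) {
        return WalScan::kReplay;  // may hold updates newer than the manifest
      }
      return WalScan::kSkip;  // already flushed; obsolete
    }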
@@ -1532,198 +1703,6 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
   return versions_->LastSequence();
 }

-Status DBImpl::GetUpdatesSince(
-    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
-    const TransactionLogIterator::ReadOptions& read_options) {
-  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
-  if (seq > versions_->LastSequence()) {
-    return Status::NotFound(
-        "Requested sequence not yet written in the db");
-  }
-  // Get all sorted Wal Files.
-  // Do binary search and open files and find the seq number.
-  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
-  Status s = GetSortedWalFiles(*wal_files);
-  if (!s.ok()) {
-    return s;
-  }
-
-  s = RetainProbableWalFiles(*wal_files, seq);
-  if (!s.ok()) {
-    return s;
-  }
-  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
-                                             read_options, storage_options_,
-                                             seq, std::move(wal_files), this));
-  return (*iter)->status();
-}
-
-Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
-                                      const SequenceNumber target) {
-  long start = 0;  // signed to avoid overflow when target is < first file.
-  long end = static_cast<long>(all_logs.size()) - 1;
-  // Binary search. Avoid opening all files.
-  while (end >= start) {
-    long mid = start + (end - start) / 2;  // Avoid overflow.
-    SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
-    if (current_seq_num == target) {
-      end = mid;
-      break;
-    } else if (current_seq_num < target) {
-      start = mid + 1;
-    } else {
-      end = mid - 1;
-    }
-  }
-  size_t start_index = std::max(0l, end);  // end could be -ve.
-  // The last wal file is always included
-  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
-  return Status::OK();
-}
-
-bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
-                                        const uint64_t number) {
-  const std::string fname = (type == kAliveLogFile) ?
-    LogFileName(options_.wal_dir, number) :
-    ArchivedLogFileName(options_.wal_dir, number);
-  uint64_t file_size;
-  Status s = env_->GetFileSize(fname, &file_size);
-  return (s.ok() && (file_size == 0));
-}
-
-Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
-                               WriteBatch* const result) {
-  if (type == kAliveLogFile) {
-    std::string fname = LogFileName(options_.wal_dir, number);
-    Status status = ReadFirstLine(fname, result);
-    if (status.ok() || env_->FileExists(fname)) {
-      // return OK or any error that is not caused by a non-existing file
-      return status;
-    }
-
-    // check if the file got moved to archive.
-    std::string archived_file =
-      ArchivedLogFileName(options_.wal_dir, number);
-    Status s = ReadFirstLine(archived_file, result);
-    if (s.ok() || env_->FileExists(archived_file)) {
-      return s;
-    }
-    return Status::NotFound("Log File has been deleted: " + archived_file);
-  } else if (type == kArchivedLogFile) {
-    std::string fname = ArchivedLogFileName(options_.wal_dir, number);
-    Status status = ReadFirstLine(fname, result);
-    return status;
-  }
-  return Status::NotSupported("File Type Not Known: " + std::to_string(type));
-}
-
-Status DBImpl::ReadFirstLine(const std::string& fname,
-                             WriteBatch* const batch) {
-  struct LogReporter : public log::Reader::Reporter {
-    Env* env;
-    Logger* info_log;
-    const char* fname;
-
-    Status* status;
-    bool ignore_error;  // true if options_.paranoid_checks==false
-    virtual void Corruption(size_t bytes, const Status& s) {
-      Log(info_log, "%s%s: dropping %d bytes; %s",
-          (this->ignore_error ? "(ignoring error) " : ""),
-          fname, static_cast<int>(bytes), s.ToString().c_str());
-      if (this->status->ok()) {
-        // only keep the first error
-        *this->status = s;
-      }
-    }
-  };
-
-  unique_ptr<SequentialFile> file;
-  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
-
-  if (!status.ok()) {
-    return status;
-  }
-
-  LogReporter reporter;
-  reporter.env = env_;
-  reporter.info_log = options_.info_log.get();
-  reporter.fname = fname.c_str();
-  reporter.status = &status;
-  reporter.ignore_error = !options_.paranoid_checks;
-  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
-                     0 /*initial_offset*/);
-  std::string scratch;
-  Slice record;
-
-  if (reader.ReadRecord(&record, &scratch) &&
-      (status.ok() || !options_.paranoid_checks)) {
-    if (record.size() < 12) {
-      reporter.Corruption(
-          record.size(), Status::Corruption("log record too small"));
-      // TODO read records till the first non-corrupt entry?
-    } else {
-      WriteBatchInternal::SetContents(batch, record);
-      return Status::OK();
-    }
-  }
-
-  // ReadRecord returns false on EOF, which is deemed as OK() by Reader
-  if (status.ok()) {
-    status = Status::Corruption("eof reached");
-  }
-  return status;
-}
-
-struct CompareLogByPointer {
-  bool operator()(const unique_ptr<LogFile>& a,
-                  const unique_ptr<LogFile>& b) {
-    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
-    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
-    return *a_impl < *b_impl;
-  }
-};
-
-Status DBImpl::GetSortedWalsOfType(const std::string& path,
-                                   VectorLogPtr& log_files, WalFileType log_type) {
-  std::vector<std::string> all_files;
-  const Status status = env_->GetChildren(path, &all_files);
-  if (!status.ok()) {
-    return status;
-  }
-  log_files.reserve(all_files.size());
-  for (const auto& f : all_files) {
-    uint64_t number;
-    FileType type;
-    if (ParseFileName(f, &number, &type) && type == kLogFile) {
-      WriteBatch batch;
-      Status s = ReadFirstRecord(log_type, number, &batch);
-      if (!s.ok()) {
-        if (CheckWalFileExistsAndEmpty(log_type, number)) {
-          continue;
-        }
-        return s;
-      }
-
-      uint64_t size_bytes;
-      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
-      if (!s.ok()) {
-        return s;
-      }
-
-      log_files.push_back(std::move(unique_ptr<LogFile>(new LogFileImpl(
-          number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes))));
-    }
-  }
-
-  CompareLogByPointer compare_log_files;
-  std::sort(log_files.begin(), log_files.end(), compare_log_files);
-  return status;
-}
-
 Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                    int output_level, const Slice* begin,
                                    const Slice* end) {
@@ -1797,23 +1776,6 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   return manual.status;
 }

-Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
-                                 const Slice* end,
-                                 ColumnFamilyHandle* column_family) {
-  ColumnFamilyData* cfd;
-  if (column_family == nullptr) {
-    cfd = default_cf_handle_->cfd();
-  } else {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    cfd = cfh->cfd();
-  }
-  int output_level =
-      (cfd->options()->compaction_style == kCompactionStyleUniversal)
-          ? level
-          : level + 1;
-  return RunManualCompaction(cfd, level, output_level, begin, end);
-}
-
 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                              const FlushOptions& options) {
   // nullptr batch means just wait for earlier writes to be done
@@ -1838,38 +1800,6 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
   return s;
 }

-Status DBImpl::TEST_FlushMemTable(bool wait) {
-  FlushOptions fo;
-  fo.wait = wait;
-  return FlushMemTable(default_cf_handle_->cfd(), fo);
-}
-
-Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
-  ColumnFamilyData* cfd;
-  if (column_family == nullptr) {
-    cfd = default_cf_handle_->cfd();
-  } else {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    cfd = cfh->cfd();
-  }
-  return WaitForFlushMemTable(cfd);
-}
-
-Status DBImpl::TEST_WaitForCompact() {
-  // Wait until the compaction completes
-
-  // TODO: a bug here. This function actually does not necessarily
-  // wait for compact. It actually waits for scheduled compaction
-  // OR flush to finish.
-
-  MutexLock l(&mutex_);
-  while ((bg_compaction_scheduled_ || bg_flush_scheduled_) &&
-         bg_error_.ok()) {
-    bg_cv_.Wait();
-  }
-  return bg_error_;
-}
-
 void DBImpl::MaybeScheduleFlushOrCompaction() {
   mutex_.AssertHeld();
   bg_schedule_needed_ = false;
@@ -2025,16 +1955,6 @@ void DBImpl::BackgroundCallFlush() {
   }
 }

-void DBImpl::TEST_PurgeObsoleteteWAL() {
-  PurgeObsoleteWALFiles();
-}
-
-uint64_t DBImpl::TEST_GetLevel0TotalSize() {
-  MutexLock l(&mutex_);
-  return default_cf_handle_->cfd()->current()->NumLevelBytes(0);
-}
-
 void DBImpl::BackgroundCallCompaction() {
   bool madeProgress = false;
   DeletionState deletion_state(true);
@@ -3221,36 +3141,6 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
   return default_cf_handle_;
 }

-Iterator* DBImpl::TEST_NewInternalIterator(ColumnFamilyHandle* column_family) {
-  ColumnFamilyData* cfd;
-  if (column_family == nullptr) {
-    cfd = default_cf_handle_->cfd();
-  } else {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    cfd = cfh->cfd();
-  }
-
-  mutex_.Lock();
-  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
-  mutex_.Unlock();
-  ReadOptions roptions;
-  roptions.prefix_seek = true;
-  return NewInternalIterator(roptions, cfd, super_version);
-}
-
-int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
-    ColumnFamilyHandle* column_family) {
-  ColumnFamilyData* cfd;
-  if (column_family == nullptr) {
-    cfd = default_cf_handle_->cfd();
-  } else {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    cfd = cfh->cfd();
-  }
-  MutexLock l(&mutex_);
-  return cfd->current()->MaxNextLevelOverlappingBytes();
-}
-
 Status DBImpl::Get(const ReadOptions& options,
                    ColumnFamilyHandle* column_family, const Slice& key,
                    std::string* value) {
@@ -3589,7 +3479,12 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
   Iterator* iter;

   if (options.tailing) {
+#ifdef ROCKSDB_LITE
+    // not supported in lite version
+    return nullptr;
+#else
     iter = new TailingIterator(env_, this, options, cfd);
+#endif
   } else {
     SequenceNumber latest_snapshot = versions_->LastSequence();
     SuperVersion* sv = nullptr;
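
In lite builds NewIterator() now returns nullptr for tailing reads instead of referencing the
compiled-out TailingIterator, so callers have to check the pointer. A hypothetical caller-side
fallback (the helper name and fallback policy are illustrative, not part of the patch):

    #include <memory>
    #include "rocksdb/db.h"

    // Prefer a tailing iterator; fall back to a snapshot iterator when the
    // build (e.g. ROCKSDB_LITE) hands back nullptr for tailing reads.
    std::unique_ptr<rocksdb::Iterator> OpenTailingOrSnapshot(
        rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf) {
      rocksdb::ReadOptions ro;
      ro.tailing = true;
      std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro, cf));
      if (!it) {
        ro.tailing = false;  // tailing support compiled out
        it.reset(db->NewIterator(ro, cf));
      }
      return it;
    }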
@@ -3641,10 +3536,15 @@ Status DBImpl::NewIterators(
   }

   if (options.tailing) {
+#ifdef ROCKSDB_LITE
+    return Status::InvalidArgument(
+        "Tailing iterator not supported in RocksDB lite");
+#else
     for (auto cfh : column_families) {
       auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
       iterators->push_back(new TailingIterator(env_, this, options, cfd));
     }
+#endif
   } else {
     for (size_t i = 0; i < column_families.size(); ++i) {
       auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i]);
@@ -3782,6 +3682,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
         PERF_TIMER_START(write_wal_time);
         Slice log_entry = WriteBatchInternal::Contents(updates);
         status = log_->AddRecord(log_entry);
+        log_empty_ = false;
         RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
         RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
         if (status.ok() && options.sync) {
@@ -4057,57 +3958,66 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
       // Attempt to switch to a new memtable and trigger flush of old.
       // Do this without holding the dbmutex lock.
       assert(versions_->PrevLogNumber() == 0);
-      uint64_t new_log_number = versions_->NewFileNumber();
+      bool creating_new_log = !log_empty_;
+      uint64_t new_log_number =
+          creating_new_log ? versions_->NewFileNumber() : logfile_number_;
       SuperVersion* new_superversion = nullptr;
       mutex_.Unlock();
       {
         DelayLoggingAndReset();
-        s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
-                                  &lfile,
-                                  env_->OptimizeForLogWrite(storage_options_));
+        if (creating_new_log) {
+          s = env_->NewWritableFile(
+              LogFileName(options_.wal_dir, new_log_number), &lfile,
+              env_->OptimizeForLogWrite(storage_options_));
+          if (s.ok()) {
+            // Our final size should be less than write_buffer_size
+            // (compression, etc) but err on the side of caution.
+            lfile->SetPreallocationBlockSize(1.1 *
+                                             cfd->options()->write_buffer_size);
+            new_log = new log::Writer(std::move(lfile));
+          }
+        }
+
         if (s.ok()) {
-          // Our final size should be less than write_buffer_size
-          // (compression, etc) but err on the side of caution.
-          lfile->SetPreallocationBlockSize(1.1 *
-                                           cfd->options()->write_buffer_size);
-          new_log = new log::Writer(std::move(lfile));
           new_mem = new MemTable(cfd->internal_comparator(), *cfd->options());
           new_superversion = new SuperVersion();
         }
-        Log(options_.info_log,
-            "New memtable created with log file: #%lu\n",
-            (unsigned long)new_log_number);
       }
       mutex_.Lock();
       if (!s.ok()) {
+        // how do we fail if we're not creating new log?
+        assert(creating_new_log);
         // Avoid chewing through file number space in a tight loop.
         versions_->ReuseFileNumber(new_log_number);
         assert(!new_mem);
         assert(!new_log);
         break;
       }
-      logfile_number_ = new_log_number;
-      assert(new_log != nullptr);
-      // TODO(icanadi) delete outside of mutex
-      delete log_.release();
-      log_.reset(new_log);
+      if (creating_new_log) {
+        logfile_number_ = new_log_number;
+        assert(new_log != nullptr);
+        // TODO(icanadi) delete outside of mutex
+        delete log_.release();
+        log_.reset(new_log);
+        log_empty_ = true;
+        alive_log_files_.push_back(logfile_number_);
+        for (auto cfd : *versions_->GetColumnFamilySet()) {
+          // all this is just optimization to delete logs that
+          // are no longer needed -- if CF is empty, that means it
+          // doesn't need that particular log to stay alive, so we just
+          // advance the log number. no need to persist this in the manifest
+          if (cfd->mem()->GetFirstSequenceNumber() == 0 &&
+              cfd->imm()->size() == 0) {
+            cfd->SetLogNumber(logfile_number_);
+          }
+        }
+      }
       cfd->mem()->SetNextLogNumber(logfile_number_);
       cfd->imm()->Add(cfd->mem());
       if (force) {
         cfd->imm()->FlushRequested();
       }
       new_mem->Ref();
-      alive_log_files_.push_back(logfile_number_);
-      for (auto cfd : *versions_->GetColumnFamilySet()) {
-        // all this is just optimization to delete logs that
-        // are no longer needed -- if CF is empty, that means it
-        // doesn't need that particular log to stay alive, so we just
-        // advance the log number. no need to persist this in the manifest
-        if (cfd->mem()->GetFirstSequenceNumber() == 0 &&
-            cfd->imm()->size() == 0) {
-          cfd->SetLogNumber(logfile_number_);
-        }
-      }
       cfd->SetMemtable(new_mem);
       Log(options_.info_log,
           "[CF %" PRIu32 "] New memtable created with log file: #%lu\n",
@@ -4121,6 +4031,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
   return s;
 }

+#ifndef ROCKSDB_LITE
 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                         TablePropertiesCollection* props) {
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
@@ -4141,6 +4052,7 @@ Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,

   return s;
 }
+#endif  // ROCKSDB_LITE

 const std::string& DBImpl::GetName() const {
   return dbname_;
@@ -4200,6 +4112,34 @@ inline void DBImpl::DelayLoggingAndReset() {
   }
 }

+#ifndef ROCKSDB_LITE
+Status DBImpl::GetUpdatesSince(
+    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
+    const TransactionLogIterator::ReadOptions& read_options) {
+  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
+  if (seq > versions_->LastSequence()) {
+    return Status::NotFound("Requested sequence not yet written in the db");
+  }
+  // Get all sorted Wal Files.
+  // Do binary search and open files and find the seq number.
+  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
+  Status s = GetSortedWalFiles(*wal_files);
+  if (!s.ok()) {
+    return s;
+  }
+
+  s = RetainProbableWalFiles(*wal_files, seq);
+  if (!s.ok()) {
+    return s;
+  }
+  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
+                                             read_options, storage_options_,
+                                             seq, std::move(wal_files), this));
+  return (*iter)->status();
+}
+
 Status DBImpl::DeleteFile(std::string name) {
   uint64_t number;
   FileType type;
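
With GetUpdatesSince() now compiled only outside ROCKSDB_LITE, a typical replay loop over the
returned iterator looks roughly like the sketch below (assuming an open DB* and a starting
sequence number; error handling abbreviated):

    #include <memory>
    #include "rocksdb/db.h"
    #include "rocksdb/transaction_log.h"

    rocksdb::Status ReplaySince(rocksdb::DB* db, rocksdb::SequenceNumber seq) {
      std::unique_ptr<rocksdb::TransactionLogIterator> iter;
      rocksdb::Status s = db->GetUpdatesSince(seq, &iter);
      if (!s.ok()) {
        return s;  // e.g. NotFound when seq is past the last sequence
      }
      for (; iter->Valid(); iter->Next()) {
        rocksdb::BatchResult batch = iter->GetBatch();
        // batch.sequence is the first sequence number in the batch;
        // apply or ship batch.writeBatchPtr here.
      }
      return iter->status();
    }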
@@ -4283,6 +4223,7 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
   MutexLock l(&mutex_);
   versions_->GetLiveFilesMetaData(metadata);
 }
+#endif  // ROCKSDB_LITE

 Status DBImpl::CheckConsistency() {
   mutex_.AssertHeld();
@@ -4311,23 +4252,6 @@ Status DBImpl::CheckConsistency() {
   }
 }

-void DBImpl::TEST_GetFilesMetaData(
-    ColumnFamilyHandle* column_family,
-    std::vector<std::vector<FileMetaData>>* metadata) {
-  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-  auto cfd = cfh->cfd();
-  MutexLock l(&mutex_);
-  metadata->resize(NumberLevels());
-  for (int level = 0; level < NumberLevels(); level++) {
-    const std::vector<FileMetaData*>& files = cfd->current()->files_[level];
-    (*metadata)[level].clear();
-    for (const auto& f : files) {
-      (*metadata)[level].push_back(*f);
-    }
-  }
-}
-
 Status DBImpl::GetDbIdentity(std::string& identity) {
   std::string idfilename = IdentityFileName(dbname_);
   unique_ptr<SequentialFile> idfile;