@@ -1,4 +1,4 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
@@ -15,6 +15,9 @@
#include <inttypes.h>
#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif
#include <algorithm>
#include <climits>
@@ -28,8 +31,10 @@
#include <utility>
#include <vector>
#include "db/auto_roll_logger.h"
#include "db/builder.h"
#include "db/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
@@ -51,6 +56,7 @@
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "db/writebuffer.h"
#include "db/xfunc_test_points.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "port/likely.h"
@@ -58,7 +64,6 @@
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/sst_file_writer.h"
@@ -72,19 +77,18 @@
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/auto_roll_logger.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/db_info_dumper.h"
#include "util/file_reader_writer.h"
#include "util/file_util.h"
#include "util/iostats_context_imp.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/sst_file_manager_impl.h"
#include "util/options_helper.h"
#include "util/options_parser.h"
#include "util/perf_context_imp.h"
@@ -142,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
      result.info_log = nullptr;
    }
  }
  if (result.base_background_compactions == -1) {
    result.base_background_compactions = result.max_background_compactions;
  }
  if (result.base_background_compactions > result.max_background_compactions) {
    result.base_background_compactions = result.max_background_compactions;
  }
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
@@ -265,13 +275,14 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
          db_options_.delete_obsolete_files_period_micros),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      flush_on_destroy_(false),
      has_unpersisted_data_(false),
      env_options_(db_options_),
#ifndef ROCKSDB_LITE
      wal_manager_(db_options_, env_options_),
#endif  // ROCKSDB_LITE
      event_logger_(db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false) {
  env_->GetAbsolutePath(dbname, &db_absolute_path_);
@@ -312,7 +323,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
DBImpl::~DBImpl() {
  mutex_.Lock();
  if (!shutting_down_.load(std::memory_order_acquire) && flush_on_destroy_) {
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
        cfd->Ref();
@@ -484,23 +496,22 @@ void DBImpl::MaybeDumpStats() {
    last_stats_dump_time_microsec_ = now_micros;
#ifndef ROCKSDB_LITE
    bool tmp1 = false;
    bool tmp2 = false;
    DBPropertyType cf_property_type =
        GetPropertyType(DB::Properties::kCFStats, &tmp1, &tmp2);
    DBPropertyType db_property_type =
        GetPropertyType(DB::Properties::kDBStats, &tmp1, &tmp2);
    const DBPropertyInfo* cf_property_info =
        GetPropertyInfo(DB::Properties::kCFStats);
    assert(cf_property_info != nullptr);
    const DBPropertyInfo* db_property_info =
        GetPropertyInfo(DB::Properties::kDBStats);
    assert(db_property_info != nullptr);
    std::string stats;
    {
      InstrumentedMutexLock l(&mutex_);
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        cfd->internal_stats()->GetStringProperty(cf_property_type,
                                                 DB::Properties::kCFStats,
                                                 &stats);
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStats, &stats);
      }
      default_cf_internal_stats_->GetStringProperty(db_property_type,
                                                    DB::Properties::kDBStats,
                                                    &stats);
      default_cf_internal_stats_->GetStringProperty(
          *db_property_info, DB::Properties::kDBStats, &stats);
    }
    Log(InfoLogLevel::WARN_LEVEL,
        db_options_.info_log, "------- DUMPING STATS -------");
@@ -561,6 +572,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
  // Get obsolete files. This function will also update the list of
  // pending files in VersionSet().
  versions_->GetObsoleteFiles(&job_context->sst_delete_files,
                              &job_context->manifest_delete_files,
                              job_context->min_pending_output);
  // store the current filenum, lognum, etc
@@ -678,9 +690,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
  }
  auto candidate_files = state.full_scan_candidate_files;
  candidate_files.reserve(candidate_files.size() +
                          state.sst_delete_files.size() +
                          state.log_delete_files.size());
  candidate_files.reserve(
      candidate_files.size() + state.sst_delete_files.size() +
      state.log_delete_files.size() + state.manifest_delete_files.size());
  // We may ignore the dbname when generating the file names.
  const char* kDumbDbName = "";
  for (auto file : state.sst_delete_files) {
@@ -696,6 +708,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
                                   0);
    }
  }
  for (const auto& filename : state.manifest_delete_files) {
    candidate_files.emplace_back(filename, 0);
  }
  // dedup state.candidate_files so we don't try to delete the same
  // file twice
@@ -782,8 +797,8 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
    }
#endif  // !ROCKSDB_LITE
    Status file_deletion_status;
    if (type == kTableFile && path_id == 0) {
      file_deletion_status = DeleteOrMoveToTrash(&db_options_, fname);
    if (type == kTableFile) {
      file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id);
    } else {
      file_deletion_status = env_->DeleteFile(fname);
    }
@@ -814,7 +829,8 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
  // Delete old info log files.
  size_t old_info_log_file_count = old_info_log_files.size();
  if (old_info_log_file_count >= db_options_.keep_log_file_num) {
  if (old_info_log_file_count != 0 &&
      old_info_log_file_count >= db_options_.keep_log_file_num) {
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
    size_t end = old_info_log_file_count - db_options_.keep_log_file_num;
    for (unsigned int i = 0; i <= end; i++) {
@@ -1393,9 +1409,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
@@ -1517,13 +1533,26 @@ Status DBImpl::FlushMemTableToOutputFile(
      bg_error_ = s;
    }
    RecordFlushIOStats();
#ifndef ROCKSDB_LITE
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
                           job_context->job_id, flush_job.GetTableProperties());
  }
#endif  // ROCKSDB_LITE
    auto sfm =
        static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(db_options_.db_paths[0].path,
                                                file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
        bg_error_ = Status::IOError("Max allowed space was reached");
        TEST_SYNC_POINT(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached");
      }
    }
  }
  return s;
}
@@ -1813,13 +1842,16 @@ Status DBImpl::CompactFilesImpl(
  std::vector<SequenceNumber> snapshot_seqs =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);
  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();
  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJob compaction_job(
      job_context->job_id, c.get(), db_options_, env_options_, versions_.get(),
      &shutting_down_, log_buffer, directories_.GetDbDir(),
      directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
      earliest_write_conflict_snapshot, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
      snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
      nullptr);  // Here we pass a nullptr for CompactionJobStats because
                 // CompactFiles does not trigger OnCompactionCompleted(),
@@ -1834,21 +1866,35 @@ Status DBImpl::CompactFilesImpl(
                 // support for CompactFiles, we should have CompactFiles API
                 // pass a pointer of CompactionJobStats as the out-value
                 // instead of using EventListener.
  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  {
    CompactionOptionsFIFO dummy_compaction_options_fifo;
    version->storage_info()->ComputeCompactionScore(
        *c->mutable_cf_options(), dummy_compaction_options_fifo);
  }
  compaction_job.Prepare();
  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();
  Status status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    InstallSuperVersionAndScheduleWorkWrapper(
        c->column_family_data(), job_context, *c->mutable_cf_options());
  }
  c->ReleaseCompactionFiles(s);
  c.reset();
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  if (status.ok()) {
    // Done
@@ -1864,6 +1910,8 @@ Status DBImpl::CompactFilesImpl(
    }
  }
  c.reset();
  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
@@ -1875,10 +1923,11 @@ Status DBImpl::CompactFilesImpl(
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_work_paused_++;
  bg_compaction_paused_++;
  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}
@@ -1888,7 +1937,11 @@ Status DBImpl::ContinueBackgroundWork() {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
@@ -2188,6 +2241,9 @@ Status DBImpl::SyncWAL() {
    status = directories_.GetWalDir()->Fsync();
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
@@ -2215,7 +2271,8 @@ void DBImpl::MarkLogsSynced(
      ++it;
    }
  }
  assert(logs_.empty() || (logs_.size() == 1 && !logs_[0].getting_synced));
  assert(logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));
  log_sync_cv_.SignalAll();
}
@@ -2453,25 +2510,32 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
    env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  }
  auto bg_compactions_allowed = BGCompactionsAllowed();
  // special case -- if max_background_flushes == 0, then schedule flush on a
  // compaction thread
  if (db_options_.max_background_flushes == 0) {
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
               db_options_.max_background_compactions) {
               bg_compactions_allowed) {
      unscheduled_flushes_--;
      bg_flush_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
    }
  }
  if (bg_compaction_paused_ > 0) {
    // we paused the background compaction
    return;
  }
  if (HasExclusiveManualCompaction()) {
    // only manual compactions are allowed to run. don't schedule automatic
    // compactions
    return;
  }
  while (bg_compaction_scheduled_ < db_options_.max_background_compactions &&
  while (bg_compaction_scheduled_ < bg_compactions_allowed &&
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
@@ -2483,6 +2547,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
  }
}
int DBImpl::BGCompactionsAllowed() const {
  if (write_controller_.NeedSpeedupCompaction()) {
    return db_options_.max_background_compactions;
  } else {
    return db_options_.base_background_compactions;
  }
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  assert(!cfd->pending_compaction());
  cfd->Ref();
@@ -2595,10 +2667,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
    LogToBuffer(
        log_buffer,
        "Calling FlushMemTableToOutputFile with column "
        "family [%s], flush slots available %d, compaction slots available %d",
        cfd->GetName().c_str(),
        db_options_.max_background_flushes - bg_flush_scheduled_,
        db_options_.max_background_compactions - bg_compaction_scheduled_);
        "family [%s], flush slots available %d, compaction slots allowed %d, "
        "compaction slots scheduled %d",
        cfd->GetName().c_str(), db_options_.max_background_flushes,
        bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_);
    status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                       job_context, log_buffer);
    if (cfd->Unref()) {
@@ -2911,7 +2983,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(c->column_family_data());
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        c->column_family_data()->options()->enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
    compaction_job_stats.num_input_files = c->num_input_files(0);
@@ -2980,8 +3054,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    CompactionJob compaction_job(
        job_context->job_id, c.get(), db_options_, env_options_,
        versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
        directories_.GetDataDir(c->output_path_id()), stats_, snapshot_seqs,
        earliest_write_conflict_snapshot, table_cache_, &event_logger_,
        directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
        &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
        table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->compaction_measure_io_stats, dbname_,
        &compaction_job_stats);
@@ -2992,7 +3067,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();
    status = compaction_job.Install(*c->mutable_cf_options(), &mutex_);
    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWorkWrapper(
          c->column_family_data(), job_context, *c->mutable_cf_options());
@@ -3294,13 +3369,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);
  bool skip_memtable =
      (read_options.read_tier == kPersistedTier && has_unpersisted_data_);
  bool done = false;
  if (!skip_memtable) {
    if (sv->mem->Get(lkey, value, &s, &merge_context)) {
      // Done
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    } else if (sv->imm->Get(lkey, value, &s, &merge_context)) {
      // Done
      done = true;
      RecordTick(stats_, MEMTABLE_HIT);
    } else {
    }
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, value, &s, &merge_context,
                     value_found);
@@ -3314,6 +3395,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
    RecordTick(stats_, NUMBER_KEYS_READ);
    RecordTick(stats_, BYTES_READ, value->size());
    MeasureTime(stats_, BYTES_PER_READ, value->size());
  }
  return s;
}
@@ -3384,14 +3466,23 @@ std::vector<Status> DBImpl::MultiGet(
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd->super_version;
    bool skip_memtable =
        (read_options.read_tier == kPersistedTier && has_unpersisted_data_);
    bool done = false;
    if (!skip_memtable) {
      if (super_version->mem->Get(lkey, value, &s, &merge_context)) {
        // Done
        done = true;
        // TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
      } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) {
        // Done
      } else {
        done = true;
        // TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
      }
    }
    if (!done) {
      PERF_TIMER_GUARD(get_from_output_files_time);
      super_version->current->Get(read_options, lkey, value, &s,
                                  &merge_context);
      // TODO(?): RecordTick(stats_, MEMTABLE_MISS)?
    }
    if (s.ok()) {
@@ -3424,6 +3515,7 @@ std::vector<Status> DBImpl::MultiGet(
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
  MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read);
  PERF_TIMER_STOP(get_post_process_time);
  return stat_list;
@@ -3516,6 +3608,9 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  if (file_info->num_entries == 0) {
    return Status::InvalidArgument("File contain no entries");
  }
  if (file_info->version != 1) {
    return Status::InvalidArgument("Generated table version is not supported");
  }
@@ -3536,8 +3631,16 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
    return Status::InvalidArgument(
        "Non zero sequence numbers are not supported");
  }
  // Generate a location for the new table
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
  std::list<uint64_t>::iterator pending_outputs_inserted_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs();
    meta.fd =
        FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
  }
  std::string db_fname = TableFileName(
      db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
@@ -3550,6 +3653,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
  } else {
    status = CopyFile(env_, file_info->file_path, db_fname, 0);
  }
  TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
  if (!status.ok()) {
    return status;
  }
@@ -3613,6 +3717,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
      delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
                                                mutable_cf_options);
    }
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  }
  if (!status.ok()) {
@@ -3826,6 +3931,10 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options,
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
@@ -3857,8 +3966,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
        env_, *cfd->ioptions(), cfd->user_comparator(), iter,
        kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_options.iterate_upper_bound, read_options.prefix_same_as_start,
        read_options.pin_data);
        sv->version_number, read_options.iterate_upper_bound,
        read_options.prefix_same_as_start, read_options.pin_data);
#endif
  } else {
    SequenceNumber latest_snapshot = versions_->LastSequence();
@@ -3915,8 +4024,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
    ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
        env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
        sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_options.iterate_upper_bound, read_options.prefix_same_as_start,
        read_options.pin_data);
        sv->version_number, read_options.iterate_upper_bound,
        read_options.prefix_same_as_start, read_options.pin_data);
    InternalIterator* internal_iter =
        NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
@@ -3932,6 +4041,10 @@ Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new,
@@ -3965,8 +4078,8 @@ Status DBImpl::NewIterators(
      iterators->push_back(NewDBIterator(
          env_, *cfd->ioptions(), cfd->user_comparator(), iter,
          kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
          false, read_options.pin_data));
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          sv->version_number, nullptr, false, read_options.pin_data));
    }
#endif
  } else {
@@ -3985,8 +4098,8 @@ Status DBImpl::NewIterators(
      ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
          env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
          sv->mutable_cf_options.max_sequential_skip_in_iterations, nullptr,
          false, read_options.pin_data);
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          sv->version_number, nullptr, false, read_options.pin_data);
      InternalIterator* internal_iter =
          NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
      db_iter->SetIterUnderDBIter(internal_iter);
@@ -4078,7 +4191,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
  }
  Status status;
  bool callback_failed = false;
  bool xfunc_attempted_write = false;
  XFUNC_TEST("transaction", "transaction_xftest_write_impl",
@@ -4096,7 +4208,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
  w.sync = write_options.sync;
  w.disableWAL = write_options.disableWAL;
  w.in_batch_group = false;
  w.has_callback = (callback != nullptr) ? true : false;
  w.callback = callback;
  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
@@ -4109,6 +4221,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
    // we are a non-leader in a parallel group
    PERF_TIMER_GUARD(write_memtable_time);
    if (!w.CallbackFailed()) {
      ColumnFamilyMemTablesImpl column_family_memtables(
          versions_->GetColumnFamilySet());
      WriteBatchInternal::SetSequence(w.batch, w.sequence);
@@ -4116,21 +4229,24 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
          w.batch, &column_family_memtables, &flush_scheduler_,
          write_options.ignore_missing_column_families, 0 /*log_number*/, this,
          true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/);
    }
    if (write_thread_.CompleteParallelWorker(&w)) {
      // we're responsible for early exit
      auto last_sequence = w.parallel_group->last_writer->sequence;
      auto last_sequence = w.parallel_group->last_sequence;
      SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
      versions_->SetLastSequence(last_sequence);
      write_thread_.EarlyExitParallelGroup(&w);
    }
    assert(w.state == WriteThread::STATE_COMPLETED);
    // STATE_COMPLETED conditional below handles exit
    status = w.FinalStatus();
  }
  if (w.state == WriteThread::STATE_COMPLETED) {
    // write is complete and leader has updated sequence
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.status;
    return w.FinalStatus();
  }
  // else we are the leader of the write batch group
  assert(w.state == WriteThread::STATE_GROUP_LEADER);
@@ -4236,7 +4352,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
  uint64_t last_sequence = versions_->LastSequence();
  WriteThread::Writer* last_writer = &w;
  autovector<WriteBatch*> write_batch_group;
  autovector<WriteThread::Writer*> write_group;
  bool need_log_sync = !write_options.disableWAL && write_options.sync;
  bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
@@ -4255,24 +4371,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables
    mutex_.Unlock();
    if (callback != nullptr) {
      // If this write has a validation callback, check to see if this write
      // is able to be written. Must be called on the write thread.
      status = callback->Callback(this);
      callback_failed = true;
    }
  } else {
    mutex_.Unlock();
  }
  // At this point the mutex is unlocked
  bool exit_completed_early = false;
  last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(
      &w, &last_writer, &write_batch_group);
  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);
  if (status.ok()) {
    // Rules for when we can update the memtable concurrently
@@ -4288,15 +4395,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
    // assumed to be true. Rule 4 is checked for each batch. We could
    // relax rules 2 and 3 if we could prevent write batches from referring
    // more than once to a particular key.
    bool parallel = db_options_.allow_concurrent_memtable_write &&
                    write_batch_group.size() > 1;
    bool parallel =
        db_options_.allow_concurrent_memtable_write && write_group.size() > 1;
    int total_count = 0;
    uint64_t total_byte_size = 0;
    for (auto b : write_batch_group) {
      total_count += WriteBatchInternal::Count(b);
    for (auto writer : write_group) {
      if (writer->CheckCallback(this)) {
        total_count += WriteBatchInternal::Count(writer->batch);
        total_byte_size = WriteBatchInternal::AppendedByteSize(
            total_byte_size, WriteBatchInternal::ByteSize(b));
        parallel = parallel && !b->HasMerge();
            total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
        parallel = parallel && !writer->batch->HasMerge();
      }
    }
    const SequenceNumber current_sequence = last_sequence + 1;
@@ -4305,10 +4414,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
    // Record statistics
    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    if (write_options.disableWAL) {
      flush_on_destroy_ = true;
      has_unpersisted_data_ = true;
    }
    uint64_t log_size = 0;
@@ -4316,21 +4426,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
      PERF_TIMER_GUARD(write_wal_time);
      WriteBatch* merged_batch = nullptr;
      if (write_batch_group.size() == 1) {
        merged_batch = write_batch_group[0];
      if (write_group.size() == 1 && !write_group[0]->CallbackFailed()) {
        merged_batch = write_group[0]->batch;
      } else {
        // WAL needs all of the batches flattened into a single batch.
        // We could avoid copying here with an iov-like AddRecord
        // interface
        merged_batch = &tmp_batch_;
        for (auto b : write_batch_group) {
          WriteBatchInternal::Append(merged_batch, b);
        for (auto writer : write_group) {
          if (!writer->CallbackFailed()) {
            WriteBatchInternal::Append(merged_batch, writer->batch);
          }
        }
      }
      WriteBatchInternal::SetSequence(merged_batch, current_sequence);
      assert(WriteBatchInternal::Count(merged_batch) == total_count);
      assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size);
      Slice log_entry = WriteBatchInternal::Contents(merged_batch);
      status = logs_.back().writer->AddRecord(log_entry);
@@ -4385,7 +4496,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
        }
        stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
      }
      uint64_t for_other = write_batch_group.size() - 1;
      uint64_t for_other = write_group.size() - 1;
      if (for_other > 0) {
        stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
        if (!write_options.disableWAL) {
@@ -4396,18 +4507,28 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
      if (!parallel) {
        status = WriteBatchInternal::InsertInto(
            write_batch_group, current_sequence, column_family_memtables_.get(),
            write_group, current_sequence, column_family_memtables_.get(),
            &flush_scheduler_, write_options.ignore_missing_column_families,
            0 /*log_number*/, this, false /*dont_filter_deletes*/);
        if (status.ok()) {
          // There were no write failures. Set leader's status
          // in case the write callback returned a non-ok status.
          status = w.FinalStatus();
        }
      } else {
        WriteThread::ParallelGroup pg;
        pg.leader = &w;
        pg.last_writer = last_writer;
        pg.last_sequence = last_sequence;
        pg.early_exit_allowed = !need_log_sync;
        pg.running.store(static_cast<uint32_t>(write_batch_group.size()),
        pg.running.store(static_cast<uint32_t>(write_group.size()),
                         std::memory_order_relaxed);
        write_thread_.LaunchParallelFollowers(&pg, current_sequence);
        if (!w.CallbackFailed()) {
          // do leader write
          ColumnFamilyMemTablesImpl column_family_memtables(
              versions_->GetColumnFamilySet());
          assert(w.sequence == current_sequence);
@@ -4417,20 +4538,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
              write_options.ignore_missing_column_families, 0 /*log_number*/,
              this, true /*dont_filter_deletes*/,
              true /*concurrent_memtable_writes*/);
        }
        assert(last_writer->sequence == last_sequence);
        // CompleteParallelWorker returns true if this thread should
        // handle exit, false means somebody else did
        exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
        status = w.status;
        assert(status.ok() || !exit_completed_early);
        status = w.FinalStatus();
      }
      if (status.ok() && !exit_completed_early) {
      if (!exit_completed_early && w.status.ok()) {
        SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
        versions_->SetLastSequence(last_sequence);
        if (!need_log_sync) {
          write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
          write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
          exit_completed_early = true;
        }
      }
@@ -4443,14 +4563,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
      //
      // Is setting bg_error_ enough here? This will at least stop
      // compaction and fail any further writes.
      if (!status.ok() && bg_error_.ok()) {
      if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) {
        bg_error_ = status;
      }
    }
  }
  PERF_TIMER_START(write_pre_and_post_process_time);
  if (db_options_.paranoid_checks && !status.ok() && !callback_failed &&
  if (db_options_.paranoid_checks && !status.ok() && !w.CallbackFailed() &&
      !status.IsBusy()) {
    mutex_.Lock();
    if (bg_error_.ok()) {
@@ -4466,7 +4586,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
  }
  if (!exit_completed_early) {
    write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
    write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
  }
  return status;
@@ -4678,53 +4798,51 @@ const DBOptions& DBImpl::GetDBOptions() const { return db_options_; }
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  bool is_int_property = false;
  bool need_out_of_mutex = false;
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (is_int_property) {
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_int) {
    uint64_t int_value;
    bool ret_value = GetIntPropertyInternal(
        cfd, property_type, need_out_of_mutex, false, &int_value);
    bool ret_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (ret_value) {
      *value = ToString(int_value);
    }
    return ret_value;
  } else {
  } else if (property_info->handle_string) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(property_type, property,
    return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                    value);
  }
  // Shouldn't reach here since exactly one of handle_string and handle_int
  // should be non-nullptr.
  assert(false);
  return false;
}
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  bool is_int_property = false;
  bool need_out_of_mutex = false;
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);
  if (!is_int_property) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return GetIntPropertyInternal(cfd, property_type, need_out_of_mutex, false,
                                value);
  return GetIntPropertyInternal(cfd, *property_info, false, value);
}
bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    DBPropertyType property_type,
                                    bool need_out_of_mutex, bool is_locked,
                                    uint64_t* value) {
  if (!need_out_of_mutex) {
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);
  if (!property_info.need_out_of_mutex) {
    if (is_locked) {
      mutex_.AssertHeld();
      return cfd->internal_stats()->GetIntProperty(property_type, value, this);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetIntProperty(property_type, value, this);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    }
  } else {
    SuperVersion* sv = nullptr;
@@ -4735,7 +4853,7 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
    }
    bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_type, sv->current, value);
        property_info, sv->current, value);
    if (!is_locked) {
      ReturnAndCleanupSuperVersion(cfd, sv);
@@ -4747,11 +4865,8 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
bool DBImpl::GetAggregatedIntProperty(const Slice& property,
                                      uint64_t* aggregated_value) {
  bool need_out_of_mutex;
  bool is_int_property;
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);
  if (!is_int_property) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
@@ -4761,8 +4876,7 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
  InstrumentedMutexLock l(&mutex_);
  uint64_t value;
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    if (GetIntPropertyInternal(cfd, property_type, need_out_of_mutex, true,
                               &value)) {
    if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
      sum += value;
    } else {
      return false;
@@ -5414,6 +5528,25 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
  }
  impl->mutex_.Unlock();
  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] when the DB is opened.
    auto& db_path = impl->db_options_.db_paths[0];
    std::vector<std::string> existing_files;
    impl->db_options_.env->GetChildren(db_path.path, &existing_files);
    for (auto& file_name : existing_files) {
      uint64_t file_number;
      FileType file_type;
      std::string file_path = db_path.path + "/" + file_name;
      if (ParseFileName(file_name, &file_number, &file_type) &&
          file_type == kTableFile) {
        sfm->OnAddFile(file_path);
      }
    }
  }
  if (s.ok()) {
    Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
        impl);
@@ -5473,7 +5606,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
        if (type == kMetaDatabase) {
          del = DestroyDB(path_to_delete, options);
        } else if (type == kTableFile) {
          del = DeleteOrMoveToTrash(&options, path_to_delete);
          del = DeleteSSTFile(&options, path_to_delete, 0);
        } else {
          del = env->DeleteFile(path_to_delete);
        }
@@ -5489,13 +5622,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type == kTableFile) {  // Lock file will be deleted at end
        Status del;
        std::string table_path = db_path.path + "/" + filenames[i];
        if (path_id == 0) {
          del = DeleteOrMoveToTrash(&options, table_path);
        } else {
          del = env->DeleteFile(table_path);
        }
        Status del = DeleteSSTFile(&options, table_path,
                                   static_cast<uint32_t>(path_id));
        if (result.ok() && !del.ok()) {
          result = del;
        }
@@ -5650,7 +5779,8 @@ Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
void DBImpl::NewThreadStatusCfInfo(
    ColumnFamilyData* cfd) const {
  if (db_options_.enable_thread_tracking) {
    ThreadStatusUtil::NewColumnFamilyInfo(this, cfd);
    ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
                                          cfd->ioptions()->env);
  }
}