|
|
@ -300,6 +300,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
DBImpl::~DBImpl() { |
|
|
|
DBImpl::~DBImpl() { |
|
|
|
|
|
|
|
std::vector<MemTable*> to_delete; |
|
|
|
|
|
|
|
to_delete.reserve(options_.max_write_buffer_number); |
|
|
|
|
|
|
|
|
|
|
|
// Wait for background work to finish
|
|
|
|
// Wait for background work to finish
|
|
|
|
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { |
|
|
|
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) { |
|
|
|
FlushMemTable(FlushOptions()); |
|
|
|
FlushMemTable(FlushOptions()); |
|
|
@ -317,8 +320,14 @@ DBImpl::~DBImpl() { |
|
|
|
env_->UnlockFile(db_lock_); |
|
|
|
env_->UnlockFile(db_lock_); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (mem_ != nullptr) mem_->Unref(); |
|
|
|
if (mem_ != nullptr) { |
|
|
|
imm_.UnrefAll(); |
|
|
|
delete mem_->Unref(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
imm_.UnrefAll(&to_delete); |
|
|
|
|
|
|
|
for (MemTable* m: to_delete) { |
|
|
|
|
|
|
|
delete m; |
|
|
|
|
|
|
|
} |
|
|
|
LogFlush(options_.info_log); |
|
|
|
LogFlush(options_.info_log); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -404,7 +413,7 @@ const Status DBImpl::CreateArchivalDirectory() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void DBImpl::PrintStatistics() { |
|
|
|
void DBImpl::PrintStatistics() { |
|
|
|
auto dbstats = options_.statistics; |
|
|
|
auto dbstats = options_.statistics.get(); |
|
|
|
if (dbstats) { |
|
|
|
if (dbstats) { |
|
|
|
Log(options_.info_log, |
|
|
|
Log(options_.info_log, |
|
|
|
"STATISTCS:\n %s", |
|
|
|
"STATISTCS:\n %s", |
|
|
@ -860,7 +869,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, |
|
|
|
if (versions_->LastSequence() < max_sequence) { |
|
|
|
if (versions_->LastSequence() < max_sequence) { |
|
|
|
versions_->SetLastSequence(max_sequence); |
|
|
|
versions_->SetLastSequence(max_sequence); |
|
|
|
} |
|
|
|
} |
|
|
|
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, |
|
|
|
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, |
|
|
|
versions_->LastSequence()); |
|
|
|
versions_->LastSequence()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -954,7 +963,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, |
|
|
|
// file-systems cause the DB::Open() to fail.
|
|
|
|
// file-systems cause the DB::Open() to fail.
|
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
mem->Unref(); |
|
|
|
delete mem->Unref(); |
|
|
|
mem = nullptr; |
|
|
|
mem = nullptr; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -965,7 +974,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, |
|
|
|
// file-systems cause the DB::Open() to fail.
|
|
|
|
// file-systems cause the DB::Open() to fail.
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (mem != nullptr && !external_table) mem->Unref(); |
|
|
|
if (mem != nullptr && !external_table) { |
|
|
|
|
|
|
|
delete mem->Unref(); |
|
|
|
|
|
|
|
} |
|
|
|
return status; |
|
|
|
return status; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1297,7 +1308,7 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { |
|
|
|
Status DBImpl::GetUpdatesSince(SequenceNumber seq, |
|
|
|
Status DBImpl::GetUpdatesSince(SequenceNumber seq, |
|
|
|
unique_ptr<TransactionLogIterator>* iter) { |
|
|
|
unique_ptr<TransactionLogIterator>* iter) { |
|
|
|
|
|
|
|
|
|
|
|
RecordTick(options_.statistics, GET_UPDATES_SINCE_CALLS); |
|
|
|
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS); |
|
|
|
if (seq > versions_->LastSequence()) { |
|
|
|
if (seq > versions_->LastSequence()) { |
|
|
|
return Status::IOError("Requested sequence not yet written in the db"); |
|
|
|
return Status::IOError("Requested sequence not yet written in the db"); |
|
|
|
} |
|
|
|
} |
|
|
@ -1971,10 +1982,12 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, |
|
|
|
// Finish and check for file errors
|
|
|
|
// Finish and check for file errors
|
|
|
|
if (s.ok() && !options_.disableDataSync) { |
|
|
|
if (s.ok() && !options_.disableDataSync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
StopWatch sw(env_, options_.statistics, COMPACTION_OUTFILE_SYNC_MICROS); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
|
|
|
|
COMPACTION_OUTFILE_SYNC_MICROS); |
|
|
|
s = compact->outfile->Fsync(); |
|
|
|
s = compact->outfile->Fsync(); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
StopWatch sw(env_, options_.statistics, COMPACTION_OUTFILE_SYNC_MICROS); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
|
|
|
|
COMPACTION_OUTFILE_SYNC_MICROS); |
|
|
|
s = compact->outfile->Sync(); |
|
|
|
s = compact->outfile->Sync(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -2212,7 +2225,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
ParseInternalKey(key, &ikey); |
|
|
|
ParseInternalKey(key, &ikey); |
|
|
|
// no value associated with delete
|
|
|
|
// no value associated with delete
|
|
|
|
value.clear(); |
|
|
|
value.clear(); |
|
|
|
RecordTick(options_.statistics, COMPACTION_KEY_DROP_USER); |
|
|
|
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER); |
|
|
|
} else if (value_changed) { |
|
|
|
} else if (value_changed) { |
|
|
|
value = compaction_filter_value; |
|
|
|
value = compaction_filter_value; |
|
|
|
} |
|
|
|
} |
|
|
@ -2238,7 +2251,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
// TODO: why not > ?
|
|
|
|
// TODO: why not > ?
|
|
|
|
assert(last_sequence_for_key >= ikey.sequence); |
|
|
|
assert(last_sequence_for_key >= ikey.sequence); |
|
|
|
drop = true; // (A)
|
|
|
|
drop = true; // (A)
|
|
|
|
RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY); |
|
|
|
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY); |
|
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
|
ikey.sequence <= earliest_snapshot && |
|
|
|
ikey.sequence <= earliest_snapshot && |
|
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { |
|
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { |
|
|
@ -2250,7 +2263,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
// few iterations of this loop (by rule (A) above).
|
|
|
|
// few iterations of this loop (by rule (A) above).
|
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
|
drop = true; |
|
|
|
drop = true; |
|
|
|
RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE); |
|
|
|
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_OBSOLETE); |
|
|
|
} else if (ikey.type == kTypeMerge) { |
|
|
|
} else if (ikey.type == kTypeMerge) { |
|
|
|
// We know the merge type entry is not hidden, otherwise we would
|
|
|
|
// We know the merge type entry is not hidden, otherwise we would
|
|
|
|
// have hit (A)
|
|
|
|
// have hit (A)
|
|
|
@ -2259,7 +2272,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
// logic could also be nicely re-used for memtable flush purge
|
|
|
|
// logic could also be nicely re-used for memtable flush purge
|
|
|
|
// optimization in BuildTable.
|
|
|
|
// optimization in BuildTable.
|
|
|
|
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level, |
|
|
|
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level, |
|
|
|
options_.statistics); |
|
|
|
options_.statistics.get()); |
|
|
|
current_entry_is_merging = true; |
|
|
|
current_entry_is_merging = true; |
|
|
|
if (merge.IsSuccess()) { |
|
|
|
if (merge.IsSuccess()) { |
|
|
|
// Successfully found Put/Delete/(end-of-key-range) while merging
|
|
|
|
// Successfully found Put/Delete/(end-of-key-range) while merging
|
|
|
@ -2412,8 +2425,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
|
|
|
|
|
|
|
|
CompactionStats stats; |
|
|
|
CompactionStats stats; |
|
|
|
stats.micros = env_->NowMicros() - start_micros - imm_micros; |
|
|
|
stats.micros = env_->NowMicros() - start_micros - imm_micros; |
|
|
|
if (options_.statistics) { |
|
|
|
if (options_.statistics.get()) { |
|
|
|
options_.statistics->measureTime(COMPACTION_TIME, stats.micros); |
|
|
|
options_.statistics.get()->measureTime(COMPACTION_TIME, stats.micros); |
|
|
|
} |
|
|
|
} |
|
|
|
stats.files_in_leveln = compact->compaction->num_input_files(0); |
|
|
|
stats.files_in_leveln = compact->compaction->num_input_files(0); |
|
|
|
stats.files_in_levelnp1 = compact->compaction->num_input_files(1); |
|
|
|
stats.files_in_levelnp1 = compact->compaction->num_input_files(1); |
|
|
@ -2478,9 +2491,14 @@ struct IterState { |
|
|
|
|
|
|
|
|
|
|
|
static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
IterState* state = reinterpret_cast<IterState*>(arg1); |
|
|
|
IterState* state = reinterpret_cast<IterState*>(arg1); |
|
|
|
|
|
|
|
std::vector<MemTable*> to_delete; |
|
|
|
|
|
|
|
to_delete.reserve(state->mem.size()); |
|
|
|
state->mu->Lock(); |
|
|
|
state->mu->Lock(); |
|
|
|
for (unsigned int i = 0; i < state->mem.size(); i++) { |
|
|
|
for (unsigned int i = 0; i < state->mem.size(); i++) { |
|
|
|
state->mem[i]->Unref(); |
|
|
|
MemTable* m = state->mem[i]->Unref(); |
|
|
|
|
|
|
|
if (m != nullptr) { |
|
|
|
|
|
|
|
to_delete.push_back(m); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
state->version->Unref(); |
|
|
|
state->version->Unref(); |
|
|
|
// delete only the sst obsolete files
|
|
|
|
// delete only the sst obsolete files
|
|
|
@ -2489,6 +2507,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
state->db->FindObsoleteFiles(deletion_state, false, true); |
|
|
|
state->db->FindObsoleteFiles(deletion_state, false, true); |
|
|
|
state->mu->Unlock(); |
|
|
|
state->mu->Unlock(); |
|
|
|
state->db->PurgeObsoleteFiles(deletion_state); |
|
|
|
state->db->PurgeObsoleteFiles(deletion_state); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// delete obsolete memtables outside the db-mutex
|
|
|
|
|
|
|
|
for (MemTable* m : to_delete) delete m; |
|
|
|
delete state; |
|
|
|
delete state; |
|
|
|
} |
|
|
|
} |
|
|
|
} // namespace
|
|
|
|
} // namespace
|
|
|
@ -2554,10 +2575,12 @@ Status DBImpl::GetImpl(const ReadOptions& options, |
|
|
|
bool* value_found) { |
|
|
|
bool* value_found) { |
|
|
|
Status s; |
|
|
|
Status s; |
|
|
|
|
|
|
|
|
|
|
|
StopWatch sw(env_, options_.statistics, DB_GET); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), DB_GET); |
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
SequenceNumber snapshot; |
|
|
|
SequenceNumber snapshot; |
|
|
|
|
|
|
|
std::vector<MemTable*> to_delete; |
|
|
|
|
|
|
|
to_delete.reserve(options_.max_write_buffer_number); |
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; |
|
|
|
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; |
|
|
@ -2608,28 +2631,35 @@ Status DBImpl::GetImpl(const ReadOptions& options, |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
} |
|
|
|
} |
|
|
|
mem->Unref(); |
|
|
|
MemTable* m = mem->Unref(); |
|
|
|
imm.UnrefAll(); |
|
|
|
imm.UnrefAll(&to_delete); |
|
|
|
current->Unref(); |
|
|
|
current->Unref(); |
|
|
|
mutex_.Unlock(); |
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// free up all obsolete memtables outside the mutex
|
|
|
|
|
|
|
|
delete m; |
|
|
|
|
|
|
|
for (MemTable* v: to_delete) delete v; |
|
|
|
|
|
|
|
|
|
|
|
LogFlush(options_.info_log); |
|
|
|
LogFlush(options_.info_log); |
|
|
|
// Note, tickers are atomic now - no lock protection needed any more.
|
|
|
|
// Note, tickers are atomic now - no lock protection needed any more.
|
|
|
|
RecordTick(options_.statistics, NUMBER_KEYS_READ); |
|
|
|
|
|
|
|
RecordTick(options_.statistics, BYTES_READ, value->size()); |
|
|
|
|
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ); |
|
|
|
|
|
|
|
RecordTick(options_.statistics.get(), BYTES_READ, value->size()); |
|
|
|
|
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
return s; |
|
|
|
return s; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
const std::vector<Slice>& keys, |
|
|
|
const std::vector<Slice>& keys, |
|
|
|
std::vector<std::string>* values) { |
|
|
|
std::vector<std::string>* values) { |
|
|
|
|
|
|
|
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET); |
|
|
|
StopWatch sw(env_, options_.statistics, DB_MULTIGET); |
|
|
|
|
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
StopWatchNano snapshot_timer(env_, false); |
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
|
|
|
|
|
|
|
|
SequenceNumber snapshot; |
|
|
|
SequenceNumber snapshot; |
|
|
|
|
|
|
|
std::vector<MemTable*> to_delete; |
|
|
|
|
|
|
|
to_delete.reserve(options_.max_write_buffer_number); |
|
|
|
|
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; |
|
|
|
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; |
|
|
@ -2694,15 +2724,20 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
} |
|
|
|
} |
|
|
|
mem->Unref(); |
|
|
|
MemTable* m = mem->Unref(); |
|
|
|
imm.UnrefAll(); |
|
|
|
imm.UnrefAll(&to_delete); |
|
|
|
current->Unref(); |
|
|
|
current->Unref(); |
|
|
|
mutex_.Unlock(); |
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// free up all obsolete memtables outside the mutex
|
|
|
|
|
|
|
|
delete m; |
|
|
|
|
|
|
|
for (MemTable* v: to_delete) delete v; |
|
|
|
|
|
|
|
|
|
|
|
LogFlush(options_.info_log); |
|
|
|
LogFlush(options_.info_log); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS); |
|
|
|
|
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys); |
|
|
|
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); |
|
|
|
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead); |
|
|
|
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); |
|
|
|
|
|
|
|
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytesRead); |
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); |
|
|
|
|
|
|
|
|
|
|
|
return statList; |
|
|
|
return statList; |
|
|
@ -2780,7 +2815,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
w.disableWAL = options.disableWAL; |
|
|
|
w.disableWAL = options.disableWAL; |
|
|
|
w.done = false; |
|
|
|
w.done = false; |
|
|
|
|
|
|
|
|
|
|
|
StopWatch sw(env_, options_.statistics, DB_WRITE); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), DB_WRITE); |
|
|
|
MutexLock l(&mutex_); |
|
|
|
MutexLock l(&mutex_); |
|
|
|
writers_.push_back(&w); |
|
|
|
writers_.push_back(&w); |
|
|
|
while (!w.done && &w != writers_.front()) { |
|
|
|
while (!w.done && &w != writers_.front()) { |
|
|
@ -2813,8 +2848,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
int my_batch_count = WriteBatchInternal::Count(updates); |
|
|
|
int my_batch_count = WriteBatchInternal::Count(updates); |
|
|
|
last_sequence += my_batch_count; |
|
|
|
last_sequence += my_batch_count; |
|
|
|
// Record statistics
|
|
|
|
// Record statistics
|
|
|
|
RecordTick(options_.statistics, NUMBER_KEYS_WRITTEN, my_batch_count); |
|
|
|
RecordTick(options_.statistics.get(), |
|
|
|
RecordTick(options_.statistics, |
|
|
|
NUMBER_KEYS_WRITTEN, my_batch_count); |
|
|
|
|
|
|
|
RecordTick(options_.statistics.get(), |
|
|
|
BYTES_WRITTEN, |
|
|
|
BYTES_WRITTEN, |
|
|
|
WriteBatchInternal::ByteSize(updates)); |
|
|
|
WriteBatchInternal::ByteSize(updates)); |
|
|
|
if (options.disableWAL) { |
|
|
|
if (options.disableWAL) { |
|
|
@ -2829,10 +2865,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); |
|
|
|
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); |
|
|
|
if (status.ok() && options.sync) { |
|
|
|
if (status.ok() && options.sync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
if (options_.use_fsync) { |
|
|
|
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS); |
|
|
|
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); |
|
|
|
status = log_->file()->Fsync(); |
|
|
|
status = log_->file()->Fsync(); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS); |
|
|
|
StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS); |
|
|
|
status = log_->file()->Sync(); |
|
|
|
status = log_->file()->Sync(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
@ -2851,7 +2887,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { |
|
|
|
// have succeeded in memtable but Status reports error for all writes.
|
|
|
|
// have succeeded in memtable but Status reports error for all writes.
|
|
|
|
throw std::runtime_error("In memory WriteBatch corruption!"); |
|
|
|
throw std::runtime_error("In memory WriteBatch corruption!"); |
|
|
|
} |
|
|
|
} |
|
|
|
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence); |
|
|
|
SetTickerCount(options_.statistics.get(), |
|
|
|
|
|
|
|
SEQUENCE_NUMBER, last_sequence); |
|
|
|
} |
|
|
|
} |
|
|
|
StartPerfTimer(&pre_post_process_timer); |
|
|
|
StartPerfTimer(&pre_post_process_timer); |
|
|
|
LogFlush(options_.info_log); |
|
|
|
LogFlush(options_.info_log); |
|
|
@ -3003,7 +3040,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
mutex_.Unlock(); |
|
|
|
mutex_.Unlock(); |
|
|
|
uint64_t delayed; |
|
|
|
uint64_t delayed; |
|
|
|
{ |
|
|
|
{ |
|
|
|
StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT); |
|
|
|
env_->SleepForMicroseconds( |
|
|
|
env_->SleepForMicroseconds( |
|
|
|
SlowdownAmount(versions_->NumLevelFiles(0), |
|
|
|
SlowdownAmount(versions_->NumLevelFiles(0), |
|
|
|
options_.level0_slowdown_writes_trigger, |
|
|
|
options_.level0_slowdown_writes_trigger, |
|
|
@ -3011,7 +3048,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
); |
|
|
|
); |
|
|
|
delayed = sw.ElapsedMicros(); |
|
|
|
delayed = sw.ElapsedMicros(); |
|
|
|
} |
|
|
|
} |
|
|
|
RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); |
|
|
|
RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed); |
|
|
|
stall_level0_slowdown_ += delayed; |
|
|
|
stall_level0_slowdown_ += delayed; |
|
|
|
stall_level0_slowdown_count_++; |
|
|
|
stall_level0_slowdown_count_++; |
|
|
|
allow_delay = false; // Do not delay a single write more than once
|
|
|
|
allow_delay = false; // Do not delay a single write more than once
|
|
|
@ -3031,12 +3068,13 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
Log(options_.info_log, "wait for memtable compaction...\n"); |
|
|
|
Log(options_.info_log, "wait for memtable compaction...\n"); |
|
|
|
uint64_t stall; |
|
|
|
uint64_t stall; |
|
|
|
{ |
|
|
|
{ |
|
|
|
StopWatch sw(env_, options_.statistics, |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
STALL_MEMTABLE_COMPACTION_COUNT); |
|
|
|
STALL_MEMTABLE_COMPACTION_COUNT); |
|
|
|
bg_cv_.Wait(); |
|
|
|
bg_cv_.Wait(); |
|
|
|
stall = sw.ElapsedMicros(); |
|
|
|
stall = sw.ElapsedMicros(); |
|
|
|
} |
|
|
|
} |
|
|
|
RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall); |
|
|
|
RecordTick(options_.statistics.get(), |
|
|
|
|
|
|
|
STALL_MEMTABLE_COMPACTION_MICROS, stall); |
|
|
|
stall_memtable_compaction_ += stall; |
|
|
|
stall_memtable_compaction_ += stall; |
|
|
|
stall_memtable_compaction_count_++; |
|
|
|
stall_memtable_compaction_count_++; |
|
|
|
} else if (versions_->NumLevelFiles(0) >= |
|
|
|
} else if (versions_->NumLevelFiles(0) >= |
|
|
@ -3046,11 +3084,12 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
Log(options_.info_log, "wait for fewer level0 files...\n"); |
|
|
|
Log(options_.info_log, "wait for fewer level0 files...\n"); |
|
|
|
uint64_t stall; |
|
|
|
uint64_t stall; |
|
|
|
{ |
|
|
|
{ |
|
|
|
StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
|
|
|
|
STALL_L0_NUM_FILES_COUNT); |
|
|
|
bg_cv_.Wait(); |
|
|
|
bg_cv_.Wait(); |
|
|
|
stall = sw.ElapsedMicros(); |
|
|
|
stall = sw.ElapsedMicros(); |
|
|
|
} |
|
|
|
} |
|
|
|
RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall); |
|
|
|
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); |
|
|
|
stall_level0_num_files_ += stall; |
|
|
|
stall_level0_num_files_ += stall; |
|
|
|
stall_level0_num_files_count_++; |
|
|
|
stall_level0_num_files_count_++; |
|
|
|
} else if ( |
|
|
|
} else if ( |
|
|
@ -3062,7 +3101,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
mutex_.Unlock(); |
|
|
|
mutex_.Unlock(); |
|
|
|
uint64_t delayed; |
|
|
|
uint64_t delayed; |
|
|
|
{ |
|
|
|
{ |
|
|
|
StopWatch sw(env_, options_.statistics, HARD_RATE_LIMIT_DELAY_COUNT); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
|
|
|
|
HARD_RATE_LIMIT_DELAY_COUNT); |
|
|
|
env_->SleepForMicroseconds(1000); |
|
|
|
env_->SleepForMicroseconds(1000); |
|
|
|
delayed = sw.ElapsedMicros(); |
|
|
|
delayed = sw.ElapsedMicros(); |
|
|
|
} |
|
|
|
} |
|
|
@ -3071,7 +3111,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
// Make sure the following value doesn't round to zero.
|
|
|
|
// Make sure the following value doesn't round to zero.
|
|
|
|
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); |
|
|
|
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); |
|
|
|
rate_limit_delay_millis += rate_limit; |
|
|
|
rate_limit_delay_millis += rate_limit; |
|
|
|
RecordTick(options_.statistics, RATE_LIMIT_DELAY_MILLIS, rate_limit); |
|
|
|
RecordTick(options_.statistics.get(), |
|
|
|
|
|
|
|
RATE_LIMIT_DELAY_MILLIS, rate_limit); |
|
|
|
if (options_.rate_limit_delay_max_milliseconds > 0 && |
|
|
|
if (options_.rate_limit_delay_max_milliseconds > 0 && |
|
|
|
rate_limit_delay_millis >= |
|
|
|
rate_limit_delay_millis >= |
|
|
|
(unsigned)options_.rate_limit_delay_max_milliseconds) { |
|
|
|
(unsigned)options_.rate_limit_delay_max_milliseconds) { |
|
|
@ -3086,7 +3127,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
// TODO: add statistics
|
|
|
|
// TODO: add statistics
|
|
|
|
mutex_.Unlock(); |
|
|
|
mutex_.Unlock(); |
|
|
|
{ |
|
|
|
{ |
|
|
|
StopWatch sw(env_, options_.statistics, SOFT_RATE_LIMIT_DELAY_COUNT); |
|
|
|
StopWatch sw(env_, options_.statistics.get(), |
|
|
|
|
|
|
|
SOFT_RATE_LIMIT_DELAY_COUNT); |
|
|
|
env_->SleepForMicroseconds(SlowdownAmount( |
|
|
|
env_->SleepForMicroseconds(SlowdownAmount( |
|
|
|
score, |
|
|
|
score, |
|
|
|
options_.soft_rate_limit, |
|
|
|
options_.soft_rate_limit, |
|
|
@ -3096,27 +3138,40 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
} |
|
|
|
} |
|
|
|
allow_soft_rate_limit_delay = false; |
|
|
|
allow_soft_rate_limit_delay = false; |
|
|
|
mutex_.Lock(); |
|
|
|
mutex_.Lock(); |
|
|
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
} else { |
|
|
|
// Attempt to switch to a new memtable and trigger compaction of old
|
|
|
|
unique_ptr<WritableFile> lfile; |
|
|
|
DelayLoggingAndReset(); |
|
|
|
MemTable* memtmp = nullptr; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Attempt to switch to a new memtable and trigger compaction of old.
|
|
|
|
|
|
|
|
// Do this without holding the dbmutex lock.
|
|
|
|
assert(versions_->PrevLogNumber() == 0); |
|
|
|
assert(versions_->PrevLogNumber() == 0); |
|
|
|
uint64_t new_log_number = versions_->NewFileNumber(); |
|
|
|
uint64_t new_log_number = versions_->NewFileNumber(); |
|
|
|
unique_ptr<WritableFile> lfile; |
|
|
|
mutex_.Unlock(); |
|
|
|
EnvOptions soptions(storage_options_); |
|
|
|
{ |
|
|
|
soptions.use_mmap_writes = false; |
|
|
|
EnvOptions soptions(storage_options_); |
|
|
|
s = env_->NewWritableFile( |
|
|
|
soptions.use_mmap_writes = false; |
|
|
|
|
|
|
|
DelayLoggingAndReset(); |
|
|
|
|
|
|
|
s = env_->NewWritableFile( |
|
|
|
LogFileName(options_.wal_dir, new_log_number), |
|
|
|
LogFileName(options_.wal_dir, new_log_number), |
|
|
|
&lfile, |
|
|
|
&lfile, |
|
|
|
soptions |
|
|
|
soptions |
|
|
|
); |
|
|
|
); |
|
|
|
|
|
|
|
if (s.ok()) { |
|
|
|
|
|
|
|
// Our final size should be less than write_buffer_size
|
|
|
|
|
|
|
|
// (compression, etc) but err on the side of caution.
|
|
|
|
|
|
|
|
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); |
|
|
|
|
|
|
|
memtmp = new MemTable( |
|
|
|
|
|
|
|
internal_comparator_, mem_rep_factory_, NumberLevels(), options_); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
if (!s.ok()) { |
|
|
|
if (!s.ok()) { |
|
|
|
// Avoid chewing through file number space in a tight loop.
|
|
|
|
// Avoid chewing through file number space in a tight loop.
|
|
|
|
versions_->ReuseFileNumber(new_log_number); |
|
|
|
versions_->ReuseFileNumber(new_log_number); |
|
|
|
|
|
|
|
assert (!memtmp); |
|
|
|
break; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
// Our final size should be less than write_buffer_size
|
|
|
|
|
|
|
|
// (compression, etc) but err on the side of caution.
|
|
|
|
|
|
|
|
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); |
|
|
|
|
|
|
|
logfile_number_ = new_log_number; |
|
|
|
logfile_number_ = new_log_number; |
|
|
|
log_.reset(new log::Writer(std::move(lfile))); |
|
|
|
log_.reset(new log::Writer(std::move(lfile))); |
|
|
|
mem_->SetNextLogNumber(logfile_number_); |
|
|
|
mem_->SetNextLogNumber(logfile_number_); |
|
|
@ -3124,8 +3179,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
if (force) { |
|
|
|
if (force) { |
|
|
|
imm_.FlushRequested(); |
|
|
|
imm_.FlushRequested(); |
|
|
|
} |
|
|
|
} |
|
|
|
mem_ = new MemTable( |
|
|
|
mem_ = memtmp; |
|
|
|
internal_comparator_, mem_rep_factory_, NumberLevels(), options_); |
|
|
|
|
|
|
|
mem_->Ref(); |
|
|
|
mem_->Ref(); |
|
|
|
Log(options_.info_log, |
|
|
|
Log(options_.info_log, |
|
|
|
"New memtable created with log file: #%lu\n", |
|
|
|
"New memtable created with log file: #%lu\n", |
|
|
@ -3138,6 +3192,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
return s; |
|
|
|
return s; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Env* DBImpl::GetEnv() const { |
|
|
|
|
|
|
|
return env_; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const Options& DBImpl::GetOptions() const { |
|
|
|
|
|
|
|
return options_; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool DBImpl::GetProperty(const Slice& property, std::string* value) { |
|
|
|
bool DBImpl::GetProperty(const Slice& property, std::string* value) { |
|
|
|
value->clear(); |
|
|
|
value->clear(); |
|
|
|
|
|
|
|
|
|
|
|