|
|
|
@ -1083,6 +1083,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, |
|
|
|
|
// Since we already recovered log_number, we want all logs
|
|
|
|
|
// with numbers `<= log_number` (includes this one) to be ignored
|
|
|
|
|
edit.SetLogNumber(log_number + 1); |
|
|
|
|
// we must mark the next log number as used, even though it's
|
|
|
|
|
// not actually used. that is because VersionSet assumes
|
|
|
|
|
// VersionSet::next_file_number_ always to be strictly greater than any log
|
|
|
|
|
// number
|
|
|
|
|
versions_->MarkFileNumberUsed(log_number + 1); |
|
|
|
|
status = versions_->LogAndApply(&edit, &mutex_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2676,33 +2681,29 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
struct IterState { |
|
|
|
|
IterState(DBImpl* db, port::Mutex* mu, DBImpl::SuperVersion* super_version) |
|
|
|
|
: db(db), mu(mu), super_version(super_version) {} |
|
|
|
|
|
|
|
|
|
DBImpl* db; |
|
|
|
|
port::Mutex* mu; |
|
|
|
|
Version* version = nullptr; |
|
|
|
|
MemTable* mem = nullptr; |
|
|
|
|
MemTableListVersion* imm = nullptr; |
|
|
|
|
DBImpl *db; |
|
|
|
|
DBImpl::SuperVersion* super_version; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
|
IterState* state = reinterpret_cast<IterState*>(arg1); |
|
|
|
|
DBImpl::DeletionState deletion_state; |
|
|
|
|
state->mu->Lock(); |
|
|
|
|
if (state->mem) { // not set for immutable iterator
|
|
|
|
|
MemTable* m = state->mem->Unref(); |
|
|
|
|
if (m != nullptr) { |
|
|
|
|
deletion_state.memtables_to_free.push_back(m); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (state->version) { // not set for memtable-only iterator
|
|
|
|
|
state->version->Unref(); |
|
|
|
|
} |
|
|
|
|
if (state->imm) { // not set for memtable-only iterator
|
|
|
|
|
state->imm->Unref(&deletion_state.memtables_to_free); |
|
|
|
|
DBImpl::DeletionState deletion_state(state->db->GetOptions(). |
|
|
|
|
max_write_buffer_number); |
|
|
|
|
|
|
|
|
|
bool need_cleanup = state->super_version->Unref(); |
|
|
|
|
if (need_cleanup) { |
|
|
|
|
state->mu->Lock(); |
|
|
|
|
state->super_version->Cleanup(); |
|
|
|
|
state->db->FindObsoleteFiles(deletion_state, false, true); |
|
|
|
|
state->mu->Unlock(); |
|
|
|
|
|
|
|
|
|
delete state->super_version; |
|
|
|
|
state->db->PurgeObsoleteFiles(deletion_state); |
|
|
|
|
} |
|
|
|
|
// fast path FindObsoleteFiles
|
|
|
|
|
state->db->FindObsoleteFiles(deletion_state, false, true); |
|
|
|
|
state->mu->Unlock(); |
|
|
|
|
state->db->PurgeObsoleteFiles(deletion_state); |
|
|
|
|
|
|
|
|
|
delete state; |
|
|
|
|
} |
|
|
|
@ -2710,36 +2711,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) { |
|
|
|
|
|
|
|
|
|
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, |
|
|
|
|
SequenceNumber* latest_snapshot) { |
|
|
|
|
IterState* cleanup = new IterState; |
|
|
|
|
MemTable* mutable_mem; |
|
|
|
|
MemTableListVersion* immutable_mems; |
|
|
|
|
Version* version; |
|
|
|
|
|
|
|
|
|
// Collect together all needed child iterators for mem
|
|
|
|
|
mutex_.Lock(); |
|
|
|
|
*latest_snapshot = versions_->LastSequence(); |
|
|
|
|
mem_->Ref(); |
|
|
|
|
mutable_mem = mem_; |
|
|
|
|
// Collect together all needed child iterators for imm_
|
|
|
|
|
immutable_mems = imm_.current(); |
|
|
|
|
immutable_mems->Ref(); |
|
|
|
|
versions_->current()->Ref(); |
|
|
|
|
version = versions_->current(); |
|
|
|
|
SuperVersion* super_version = super_version_->Ref(); |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
std::vector<Iterator*> iterator_list; |
|
|
|
|
iterator_list.push_back(mutable_mem->NewIterator(options)); |
|
|
|
|
cleanup->mem = mutable_mem; |
|
|
|
|
cleanup->imm = immutable_mems; |
|
|
|
|
// Collect iterator for mutable mem
|
|
|
|
|
iterator_list.push_back(super_version->mem->NewIterator(options)); |
|
|
|
|
// Collect all needed child iterators for immutable memtables
|
|
|
|
|
immutable_mems->AddIterators(options, &iterator_list); |
|
|
|
|
super_version->imm->AddIterators(options, &iterator_list); |
|
|
|
|
// Collect iterators for files in L0 - Ln
|
|
|
|
|
version->AddIterators(options, storage_options_, &iterator_list); |
|
|
|
|
super_version->current->AddIterators(options, storage_options_, |
|
|
|
|
&iterator_list); |
|
|
|
|
Iterator* internal_iter = NewMergingIterator( |
|
|
|
|
env_, &internal_comparator_, &iterator_list[0], iterator_list.size()); |
|
|
|
|
cleanup->version = version; |
|
|
|
|
cleanup->mu = &mutex_; |
|
|
|
|
cleanup->db = this; |
|
|
|
|
|
|
|
|
|
IterState* cleanup = new IterState(this, &mutex_, super_version); |
|
|
|
|
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); |
|
|
|
|
|
|
|
|
|
return internal_iter; |
|
|
|
@ -2754,53 +2742,36 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair( |
|
|
|
|
const ReadOptions& options, |
|
|
|
|
uint64_t* superversion_number) { |
|
|
|
|
|
|
|
|
|
MemTable* mutable_mem; |
|
|
|
|
MemTableListVersion* immutable_mems; |
|
|
|
|
Version* version; |
|
|
|
|
|
|
|
|
|
// get all child iterators and bump their refcounts under lock
|
|
|
|
|
mutex_.Lock(); |
|
|
|
|
mutable_mem = mem_; |
|
|
|
|
mutable_mem->Ref(); |
|
|
|
|
immutable_mems = imm_.current(); |
|
|
|
|
immutable_mems->Ref(); |
|
|
|
|
version = versions_->current(); |
|
|
|
|
version->Ref(); |
|
|
|
|
SuperVersion* super_version = super_version_->Ref(); |
|
|
|
|
if (superversion_number != nullptr) { |
|
|
|
|
*superversion_number = CurrentVersionNumber(); |
|
|
|
|
} |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
Iterator* mutable_iter = mutable_mem->NewIterator(options); |
|
|
|
|
IterState* mutable_cleanup = new IterState(); |
|
|
|
|
mutable_cleanup->mem = mutable_mem; |
|
|
|
|
mutable_cleanup->db = this; |
|
|
|
|
mutable_cleanup->mu = &mutex_; |
|
|
|
|
mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); |
|
|
|
|
|
|
|
|
|
Iterator* mutable_iter = super_version->mem->NewIterator(options); |
|
|
|
|
// create a DBIter that only uses memtable content; see NewIterator()
|
|
|
|
|
mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), |
|
|
|
|
mutable_iter, kMaxSequenceNumber); |
|
|
|
|
|
|
|
|
|
Iterator* immutable_iter; |
|
|
|
|
IterState* immutable_cleanup = new IterState(); |
|
|
|
|
std::vector<Iterator*> list; |
|
|
|
|
immutable_mems->AddIterators(options, &list); |
|
|
|
|
immutable_cleanup->imm = immutable_mems; |
|
|
|
|
version->AddIterators(options, storage_options_, &list); |
|
|
|
|
immutable_cleanup->version = version; |
|
|
|
|
immutable_cleanup->db = this; |
|
|
|
|
immutable_cleanup->mu = &mutex_; |
|
|
|
|
|
|
|
|
|
immutable_iter = |
|
|
|
|
NewMergingIterator(env_, &internal_comparator_, &list[0], list.size()); |
|
|
|
|
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, |
|
|
|
|
nullptr); |
|
|
|
|
super_version->imm->AddIterators(options, &list); |
|
|
|
|
super_version->current->AddIterators(options, storage_options_, &list); |
|
|
|
|
Iterator* immutable_iter = |
|
|
|
|
NewMergingIterator(env_, &internal_comparator_, &list[0], list.size()); |
|
|
|
|
|
|
|
|
|
// create a DBIter that only uses memtable content; see NewIterator()
|
|
|
|
|
immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), |
|
|
|
|
immutable_iter, kMaxSequenceNumber); |
|
|
|
|
|
|
|
|
|
// register cleanups
|
|
|
|
|
mutable_iter->RegisterCleanup(CleanupIteratorState, |
|
|
|
|
new IterState(this, &mutex_, super_version), nullptr); |
|
|
|
|
|
|
|
|
|
// bump the ref one more time since it will be Unref'ed twice
|
|
|
|
|
immutable_iter->RegisterCleanup(CleanupIteratorState, |
|
|
|
|
new IterState(this, &mutex_, super_version->Ref()), nullptr); |
|
|
|
|
|
|
|
|
|
return std::make_pair(mutable_iter, immutable_iter); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2946,7 +2917,6 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
|
StartPerfTimer(&snapshot_timer); |
|
|
|
|
|
|
|
|
|
SequenceNumber snapshot; |
|
|
|
|
autovector<MemTable*> to_delete; |
|
|
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
|
|
if (options.snapshot != nullptr) { |
|
|
|
@ -2955,16 +2925,9 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
|
snapshot = versions_->LastSequence(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
MemTable* mem = mem_; |
|
|
|
|
MemTableListVersion* imm = imm_.current(); |
|
|
|
|
Version* current = versions_->current(); |
|
|
|
|
mem->Ref(); |
|
|
|
|
imm->Ref(); |
|
|
|
|
current->Ref(); |
|
|
|
|
|
|
|
|
|
// Unlock while reading from files and memtables
|
|
|
|
|
|
|
|
|
|
SuperVersion* get_version = super_version_->Ref(); |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
bool have_stat_update = false; |
|
|
|
|
Version::GetStats stats; |
|
|
|
|
|
|
|
|
@ -2990,12 +2953,14 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
|
std::string* value = &(*values)[i]; |
|
|
|
|
|
|
|
|
|
LookupKey lkey(keys[i], snapshot); |
|
|
|
|
if (mem->Get(lkey, value, &s, merge_context, options_)) { |
|
|
|
|
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { |
|
|
|
|
// Done
|
|
|
|
|
} else if (imm->Get(lkey, value, &s, merge_context, options_)) { |
|
|
|
|
} else if (get_version->imm->Get(lkey, value, &s, merge_context, |
|
|
|
|
options_)) { |
|
|
|
|
// Done
|
|
|
|
|
} else { |
|
|
|
|
current->Get(options, lkey, value, &s, &merge_context, &stats, options_); |
|
|
|
|
get_version->current->Get(options, lkey, value, &s, &merge_context, |
|
|
|
|
&stats, options_); |
|
|
|
|
have_stat_update = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3007,19 +2972,28 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options, |
|
|
|
|
// Post processing (decrement reference counts and record statistics)
|
|
|
|
|
StopWatchNano post_process_timer(env_, false); |
|
|
|
|
StartPerfTimer(&post_process_timer); |
|
|
|
|
mutex_.Lock(); |
|
|
|
|
if (!options_.disable_seek_compaction && |
|
|
|
|
have_stat_update && current->UpdateStats(stats)) { |
|
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
|
bool delete_get_version = false; |
|
|
|
|
if (!options_.disable_seek_compaction && have_stat_update) { |
|
|
|
|
mutex_.Lock(); |
|
|
|
|
if (get_version->current->UpdateStats(stats)) { |
|
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
|
} |
|
|
|
|
if (get_version->Unref()) { |
|
|
|
|
get_version->Cleanup(); |
|
|
|
|
delete_get_version = true; |
|
|
|
|
} |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
} else { |
|
|
|
|
if (get_version->Unref()) { |
|
|
|
|
mutex_.Lock(); |
|
|
|
|
get_version->Cleanup(); |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
delete_get_version = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (delete_get_version) { |
|
|
|
|
delete get_version; |
|
|
|
|
} |
|
|
|
|
MemTable* m = mem->Unref(); |
|
|
|
|
imm->Unref(&to_delete); |
|
|
|
|
current->Unref(); |
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
// free up all obsolete memtables outside the mutex
|
|
|
|
|
delete m; |
|
|
|
|
for (MemTable* v: to_delete) delete v; |
|
|
|
|
|
|
|
|
|
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS); |
|
|
|
|
RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys); |
|
|
|
|