fix DBImpl::NewInternalIterator super-version leak on failure

Summary:
Close #2955
Closes https://github.com/facebook/rocksdb/pull/2960

Differential Revision: D5962872

Pulled By: yiwu-arbug

fbshipit-source-id: a6472d5c015bea3dc476c572ff5a5c90259e6059
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 019aa7074c
commit fb4ae4d810
  1. 33
      db/db_impl.cc
  2. 3
      db/db_impl.h
  3. 13
      db/db_iterator_test.cc
  4. 18
      table/merging_iterator.cc
  5. 2
      table/merging_iterator.h

@ -894,6 +894,7 @@ InternalIterator* DBImpl::NewInternalIterator(
range_del_agg); range_del_agg);
} }
} }
TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
if (s.ok()) { if (s.ok()) {
// Collect iterators for files in L0 - Ln // Collect iterators for files in L0 - Ln
if (read_options.read_tier != kMemtableTier) { if (read_options.read_tier != kMemtableTier) {
@ -907,8 +908,10 @@ InternalIterator* DBImpl::NewInternalIterator(
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
return internal_iter; return internal_iter;
} else {
CleanupSuperVersion(super_version);
} }
return NewErrorInternalIterator(s); return NewErrorInternalIterator(s, arena);
} }
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
@ -1819,21 +1822,23 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
return GetAndRefSuperVersion(cfd); return GetAndRefSuperVersion(cfd);
} }
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
// Release SuperVersion
if (sv->Unref()) {
{
InstrumentedMutexLock l(&mutex_);
sv->Cleanup();
}
delete sv;
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
}
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
SuperVersion* sv) { SuperVersion* sv) {
bool unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv); if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
CleanupSuperVersion(sv);
if (unref_sv) {
// Release SuperVersion
if (sv->Unref()) {
{
InstrumentedMutexLock l(&mutex_);
sv->Cleanup();
}
delete sv;
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
}
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
} }
} }

@ -477,6 +477,9 @@ class DBImpl : public DB {
// mutex is held. // mutex is held.
SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id); SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
// Un-reference the super version and clean it up if it is the last reference.
void CleanupSuperVersion(SuperVersion* sv);
// Un-reference the super version and return it to thread local cache if // Un-reference the super version and return it to thread local cache if
// needed. If it is the last reference of the super version. Clean it up // needed. If it is the last reference of the super version. Clean it up
// after un-referencing it. // after un-referencing it.

@ -1997,6 +1997,19 @@ TEST_F(DBIteratorTest, Refresh) {
iter.reset(); iter.reset();
} }
TEST_F(DBIteratorTest, CreationFailure) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::NewInternalIterator:StatusCallback", [](void* arg) {
*(reinterpret_cast<Status*>(arg)) = Status::Corruption("test status");
});
SyncPoint::GetInstance()->EnableProcessing();
Iterator* iter = db_->NewIterator(ReadOptions());
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsCorruption());
delete iter;
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -387,10 +387,20 @@ MergeIteratorBuilder::MergeIteratorBuilder(
new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode); new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode);
} }
MergeIteratorBuilder::~MergeIteratorBuilder() {
if (first_iter != nullptr) {
first_iter->~InternalIterator();
}
if (merge_iter != nullptr) {
merge_iter->~MergingIterator();
}
}
void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
if (!use_merging_iter && first_iter != nullptr) { if (!use_merging_iter && first_iter != nullptr) {
merge_iter->AddIterator(first_iter); merge_iter->AddIterator(first_iter);
use_merging_iter = true; use_merging_iter = true;
first_iter = nullptr;
} }
if (use_merging_iter) { if (use_merging_iter) {
merge_iter->AddIterator(iter); merge_iter->AddIterator(iter);
@ -400,13 +410,15 @@ void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {
} }
InternalIterator* MergeIteratorBuilder::Finish() { InternalIterator* MergeIteratorBuilder::Finish() {
InternalIterator* ret = nullptr;
if (!use_merging_iter) { if (!use_merging_iter) {
return first_iter; ret = first_iter;
first_iter = nullptr;
} else { } else {
auto ret = merge_iter; ret = merge_iter;
merge_iter = nullptr; merge_iter = nullptr;
return ret;
} }
return ret;
} }
} // namespace rocksdb } // namespace rocksdb

@ -40,7 +40,7 @@ class MergeIteratorBuilder {
// arena: where the merging iterator needs to be allocated from. // arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder(const InternalKeyComparator* comparator, explicit MergeIteratorBuilder(const InternalKeyComparator* comparator,
Arena* arena, bool prefix_seek_mode = false); Arena* arena, bool prefix_seek_mode = false);
~MergeIteratorBuilder() {} ~MergeIteratorBuilder();
// Add iter to the merging iterator. // Add iter to the merging iterator.
void AddIterator(InternalIterator* iter); void AddIterator(InternalIterator* iter);

Loading…
Cancel
Save