diff --git a/HISTORY.md b/HISTORY.md index 6d6ad040e..161b2597a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls. * Fix a memory leak when files with range tombstones are read in mmap mode and block cache is enabled * Fix handling of corrupt range tombstone blocks such that corruptions cannot cause deleted keys to reappear +* Lock free MultiGet ## 5.18.0 (11/30/2018) ### New Features diff --git a/db/column_family.h b/db/column_family.h index d570afaef..5ed20604a 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -383,6 +383,8 @@ class ColumnFamilyData { Directory* GetDataDir(size_t path_id) const; + ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 1c7e4a5c0..3094ee30d 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -6,6 +6,7 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +// #include #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" @@ -956,6 +957,184 @@ TEST_F(DBBasicTest, DBCloseFlushError) { Destroy(options); } +TEST_F(DBBasicTest, MultiGetMultiCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + rocksdb::DBImpl* db = reinterpret_cast(db_); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (++get_sv_count == 2) { + // After MultiGet refs a couple of CFs, flush all CFs so MultiGet + // is forced to repeat the process + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val2")); + } + } + if (get_sv_count == 11) { + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + db->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + values = MultiGet(cfs, keys); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2"); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + } +} + +TEST_F(DBBasicTest, MultiGetMultiCFMutex) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + int retries = 0; + bool last_try = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { + last_try = true; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (last_try) { + return; + } + if (++get_sv_count == 2) { + ++retries; + get_sv_count = 0; + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put( + i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val" + std::to_string(retries))); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + values = MultiGet(cfs, keys); + ASSERT_TRUE(last_try); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], + "cf" + std::to_string(j) + "_val" + std::to_string(retries)); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + +TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + rocksdb::DBImpl* db = reinterpret_cast(db_); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (++get_sv_count == 2) { + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val2")); + } + } + if (get_sv_count == 8) { + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + db->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_TRUE( + (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) || + (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete)); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + const Snapshot* snapshot = db_->GetSnapshot(); + values = MultiGet(cfs, keys, snapshot); + db_->ReleaseSnapshot(snapshot); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val"); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 9ab47a9ba..9c485bd53 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1340,33 +1340,96 @@ std::vector DBImpl::MultiGet( struct MultiGetColumnFamilyData { ColumnFamilyData* cfd; SuperVersion* super_version; + MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv) + : cfd(cf), super_version(sv) {} }; - std::unordered_map multiget_cf_data; - // fill up and allocate outside of mutex + std::unordered_map multiget_cf_data( + column_family.size()); for (auto cf : column_family) { auto cfh = reinterpret_cast(cf); auto cfd = cfh->cfd(); if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { - auto mgcfd = new MultiGetColumnFamilyData(); - mgcfd->cfd = cfd; - multiget_cf_data.insert({cfd->GetID(), mgcfd}); + multiget_cf_data.emplace(cfd->GetID(), + MultiGetColumnFamilyData(cfd, nullptr)); } } - mutex_.Lock(); - if (read_options.snapshot != nullptr) { - snapshot = - reinterpret_cast(read_options.snapshot)->number_; - } else { - snapshot = last_seq_same_as_publish_seq_ - ? versions_->LastSequence() - : versions_->LastPublishedSequence(); - } - for (auto mgd_iter : multiget_cf_data) { - mgd_iter.second->super_version = - mgd_iter.second->cfd->GetSuperVersion()->Ref(); + bool last_try = false; + { + // If we end up with the same issue of memtable geting sealed during 2 + // consecutive retries, it means the write rate is very high. In that case + // its probably ok to take the mutex on the 3rd try so we can succeed for + // sure + static const int num_retries = 3; + for (auto i = 0; i < num_retries; ++i) { + last_try = (i == num_retries - 1); + bool retry = false; + + if (i > 0) { + for (auto mgd_iter = multiget_cf_data.begin(); + mgd_iter != multiget_cf_data.end(); ++mgd_iter) { + auto super_version = mgd_iter->second.super_version; + auto cfd = mgd_iter->second.cfd; + if (super_version != nullptr) { + ReturnAndCleanupSuperVersion(cfd, super_version); + } + mgd_iter->second.super_version = nullptr; + } + } + + if (read_options.snapshot == nullptr) { + if (last_try) { + TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); + // We're close to max number of retries. For the last retry, + // acquire the lock so we're sure to succeed + mutex_.Lock(); + } + snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); + } else { + snapshot = reinterpret_cast(read_options.snapshot) + ->number_; + } + + for (auto mgd_iter = multiget_cf_data.begin(); + mgd_iter != multiget_cf_data.end(); ++mgd_iter) { + if (!last_try) { + mgd_iter->second.super_version = + GetAndRefSuperVersion(mgd_iter->second.cfd); + } else { + mgd_iter->second.super_version = + mgd_iter->second.cfd->GetSuperVersion()->Ref(); + } + TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); + if (read_options.snapshot != nullptr || last_try) { + // If user passed a snapshot, then we don't care if a memtable is + // sealed or compaction happens because the snapshot would ensure + // that older key versions are kept around. If this is the last + // retry, then we have the lock so nothing bad can happen + continue; + } + // We could get the earliest sequence number for the whole list of + // memtables, which will include immutable memtables as well, but that + // might be tricky to maintain in case we decide, in future, to do + // memtable compaction. + if (!last_try) { + auto seq = + mgd_iter->second.super_version->mem->GetEarliestSequenceNumber(); + if (seq > snapshot) { + retry = true; + break; + } + } + } + if (!retry) { + if (last_try) { + mutex_.Unlock(); + } + break; + } + } } - mutex_.Unlock(); // Contain a list of merge operations if merge occurs. MergeContext merge_context; @@ -1396,7 +1459,7 @@ std::vector DBImpl::MultiGet( auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; - auto super_version = mgd->super_version; + auto super_version = mgd.super_version; bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_.load(std::memory_order_relaxed)); @@ -1432,24 +1495,14 @@ std::vector DBImpl::MultiGet( PERF_TIMER_GUARD(get_post_process_time); autovector superversions_to_delete; - // TODO(icanadi) do we need lock here or just around Cleanup()? - mutex_.Lock(); for (auto mgd_iter : multiget_cf_data) { auto mgd = mgd_iter.second; - if (mgd->super_version->Unref()) { - mgd->super_version->Cleanup(); - superversions_to_delete.push_back(mgd->super_version); + if (!last_try) { + ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version); + } else { + mgd.cfd->GetSuperVersion()->Unref(); } } - mutex_.Unlock(); - - for (auto td : superversions_to_delete) { - delete td; - } - for (auto mgd : multiget_cf_data) { - delete mgd.second; - } - RecordTick(stats_, NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index d78fbfca9..38efbc436 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -763,6 +763,31 @@ std::string DBTestBase::Get(int cf, const std::string& k, return result; } +std::vector DBTestBase::MultiGet(std::vector cfs, + const std::vector& k, + const Snapshot* snapshot) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::vector handles; + std::vector keys; + std::vector result; + + for (unsigned int i = 0; i < cfs.size(); ++i) { + handles.push_back(handles_[cfs[i]]); + keys.push_back(k[i]); + } + std::vector s = db_->MultiGet(options, handles, keys, &result); + for (unsigned int i = 0; i < s.size(); ++i) { + if (s[i].IsNotFound()) { + result[i] = "NOT_FOUND"; + } else if (!s[i].ok()) { + result[i] = s[i].ToString(); + } + } + return result; +} + Status DBTestBase::Get(const std::string& k, PinnableSlice* v) { ReadOptions options; options.verify_checksums = true; diff --git a/db/db_test_util.h b/db/db_test_util.h index 3638ca7fa..27a20e776 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -840,6 +840,10 @@ class DBTestBase : public testing::Test { Status Get(const std::string& k, PinnableSlice* v); + std::vector MultiGet(std::vector cfs, + const std::vector& k, + const Snapshot* snapshot = nullptr); + uint64_t GetNumSnapshots(); uint64_t GetTimeOldestSnapshots();