From b9d6eccac14588d03f083d22d577da0c0285143f Mon Sep 17 00:00:00 2001 From: Anand Ananthabhotla Date: Wed, 2 Jan 2019 11:40:12 -0800 Subject: [PATCH] Lock free MultiGet (#4754) Summary: Avoid locking the DB mutex in order to reference SuperVersions. Instead, we get the thread local cached SuperVersion for each column family in the list. It depends on finding a sequence number that overlaps with all the open memtables. We start with the latest published sequence number, and if any of the memtables is sealed before we can get all the SuperVersions, the process is repeated. After a few times, give up and lock the DB mutex. Tests: 1. Unit tests 2. make check 3. db_bench - TEST_TMPDIR=/dev/shm ./db_bench -use_existing_db=true -benchmarks=readrandom -write_buffer_size=4194304 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=5000000 -reads=1000000 -threads=32 -compression_type=none -cache_size=1048576000 -batch_size=1 -bloom_bits=1 readrandom : 0.167 micros/op 5983920 ops/sec; 426.2 MB/s (1000000 of 1000000 found) Multireadrandom with batch size 1: multireadrandom : 0.176 micros/op 5684033 ops/sec; (1000000 of 1000000 found) Pull Request resolved: https://github.com/facebook/rocksdb/pull/4754 Differential Revision: D13363550 Pulled By: anand1976 fbshipit-source-id: 6243e8de7dbd9c8bb490a8eca385da0c855b1dd4 --- HISTORY.md | 1 + db/column_family.h | 2 + db/db_basic_test.cc | 179 ++++++++++++++++++++++++++++++++++++++++++++ db/db_impl.cc | 119 +++++++++++++++++++++-------- db/db_test_util.cc | 25 +++++++ db/db_test_util.h | 4 + 6 files changed, 297 insertions(+), 33 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 6d6ad040e..161b2597a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,6 +12,7 @@ * Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls. * Fix a memory leak when files with range tombstones are read in mmap mode and block cache is enabled * Fix handling of corrupt range tombstone blocks such that corruptions cannot cause deleted keys to reappear +* Lock free MultiGet ## 5.18.0 (11/30/2018) ### New Features diff --git a/db/column_family.h b/db/column_family.h index d570afaef..5ed20604a 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -383,6 +383,8 @@ class ColumnFamilyData { Directory* GetDataDir(size_t path_id) const; + ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 1c7e4a5c0..3094ee30d 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -6,6 +6,7 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +// #include #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" @@ -956,6 +957,184 @@ TEST_F(DBBasicTest, DBCloseFlushError) { Destroy(options); } +TEST_F(DBBasicTest, MultiGetMultiCF) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + rocksdb::DBImpl* db = reinterpret_cast(db_); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (++get_sv_count == 2) { + // After MultiGet refs a couple of CFs, flush all CFs so MultiGet + // is forced to repeat the process + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val2")); + } + } + if (get_sv_count == 11) { + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + db->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + values = MultiGet(cfs, keys); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val2"); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); + } +} + +TEST_F(DBBasicTest, MultiGetMultiCFMutex) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + int retries = 0; + bool last_try = false; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { + last_try = true; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (last_try) { + return; + } + if (++get_sv_count == 2) { + ++retries; + get_sv_count = 0; + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put( + i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val" + std::to_string(retries))); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + values = MultiGet(cfs, keys); + ASSERT_TRUE(last_try); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], + "cf" + std::to_string(j) + "_val" + std::to_string(retries)); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + +TEST_F(DBBasicTest, MultiGetMultiCFSnapshot) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", + "alyosha", "popovich"}, + options); + + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val")); + } + + int get_sv_count = 0; + rocksdb::DBImpl* db = reinterpret_cast(db_); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { + if (++get_sv_count == 2) { + for (int i = 0; i < 8; ++i) { + ASSERT_OK(Flush(i)); + ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", + "cf" + std::to_string(i) + "_val2")); + } + } + if (get_sv_count == 8) { + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + db->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_TRUE( + (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) || + (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete)); + } + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::vector cfs; + std::vector keys; + std::vector values; + + for (int i = 0; i < 8; ++i) { + cfs.push_back(i); + keys.push_back("cf" + std::to_string(i) + "_key"); + } + + const Snapshot* snapshot = db_->GetSnapshot(); + values = MultiGet(cfs, keys, snapshot); + db_->ReleaseSnapshot(snapshot); + ASSERT_EQ(values.size(), 8); + for (unsigned int j = 0; j < values.size(); ++j) { + ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val"); + } + for (int i = 0; i < 8; ++i) { + auto* cfd = reinterpret_cast( + reinterpret_cast(db_)->GetColumnFamilyHandle(i)) + ->cfd(); + ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 9ab47a9ba..9c485bd53 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1340,33 +1340,96 @@ std::vector DBImpl::MultiGet( struct MultiGetColumnFamilyData { ColumnFamilyData* cfd; SuperVersion* super_version; + MultiGetColumnFamilyData(ColumnFamilyData* cf, SuperVersion* sv) + : cfd(cf), super_version(sv) {} }; - std::unordered_map multiget_cf_data; - // fill up and allocate outside of mutex + std::unordered_map multiget_cf_data( + column_family.size()); for (auto cf : column_family) { auto cfh = reinterpret_cast(cf); auto cfd = cfh->cfd(); if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { - auto mgcfd = new MultiGetColumnFamilyData(); - mgcfd->cfd = cfd; - multiget_cf_data.insert({cfd->GetID(), mgcfd}); + multiget_cf_data.emplace(cfd->GetID(), + MultiGetColumnFamilyData(cfd, nullptr)); } } - mutex_.Lock(); - if (read_options.snapshot != nullptr) { - snapshot = - reinterpret_cast(read_options.snapshot)->number_; - } else { - snapshot = last_seq_same_as_publish_seq_ - ? versions_->LastSequence() - : versions_->LastPublishedSequence(); - } - for (auto mgd_iter : multiget_cf_data) { - mgd_iter.second->super_version = - mgd_iter.second->cfd->GetSuperVersion()->Ref(); + bool last_try = false; + { + // If we end up with the same issue of memtable geting sealed during 2 + // consecutive retries, it means the write rate is very high. In that case + // its probably ok to take the mutex on the 3rd try so we can succeed for + // sure + static const int num_retries = 3; + for (auto i = 0; i < num_retries; ++i) { + last_try = (i == num_retries - 1); + bool retry = false; + + if (i > 0) { + for (auto mgd_iter = multiget_cf_data.begin(); + mgd_iter != multiget_cf_data.end(); ++mgd_iter) { + auto super_version = mgd_iter->second.super_version; + auto cfd = mgd_iter->second.cfd; + if (super_version != nullptr) { + ReturnAndCleanupSuperVersion(cfd, super_version); + } + mgd_iter->second.super_version = nullptr; + } + } + + if (read_options.snapshot == nullptr) { + if (last_try) { + TEST_SYNC_POINT("DBImpl::MultiGet::LastTry"); + // We're close to max number of retries. For the last retry, + // acquire the lock so we're sure to succeed + mutex_.Lock(); + } + snapshot = last_seq_same_as_publish_seq_ + ? versions_->LastSequence() + : versions_->LastPublishedSequence(); + } else { + snapshot = reinterpret_cast(read_options.snapshot) + ->number_; + } + + for (auto mgd_iter = multiget_cf_data.begin(); + mgd_iter != multiget_cf_data.end(); ++mgd_iter) { + if (!last_try) { + mgd_iter->second.super_version = + GetAndRefSuperVersion(mgd_iter->second.cfd); + } else { + mgd_iter->second.super_version = + mgd_iter->second.cfd->GetSuperVersion()->Ref(); + } + TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV"); + if (read_options.snapshot != nullptr || last_try) { + // If user passed a snapshot, then we don't care if a memtable is + // sealed or compaction happens because the snapshot would ensure + // that older key versions are kept around. If this is the last + // retry, then we have the lock so nothing bad can happen + continue; + } + // We could get the earliest sequence number for the whole list of + // memtables, which will include immutable memtables as well, but that + // might be tricky to maintain in case we decide, in future, to do + // memtable compaction. + if (!last_try) { + auto seq = + mgd_iter->second.super_version->mem->GetEarliestSequenceNumber(); + if (seq > snapshot) { + retry = true; + break; + } + } + } + if (!retry) { + if (last_try) { + mutex_.Unlock(); + } + break; + } + } } - mutex_.Unlock(); // Contain a list of merge operations if merge occurs. MergeContext merge_context; @@ -1396,7 +1459,7 @@ std::vector DBImpl::MultiGet( auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; - auto super_version = mgd->super_version; + auto super_version = mgd.super_version; bool skip_memtable = (read_options.read_tier == kPersistedTier && has_unpersisted_data_.load(std::memory_order_relaxed)); @@ -1432,24 +1495,14 @@ std::vector DBImpl::MultiGet( PERF_TIMER_GUARD(get_post_process_time); autovector superversions_to_delete; - // TODO(icanadi) do we need lock here or just around Cleanup()? - mutex_.Lock(); for (auto mgd_iter : multiget_cf_data) { auto mgd = mgd_iter.second; - if (mgd->super_version->Unref()) { - mgd->super_version->Cleanup(); - superversions_to_delete.push_back(mgd->super_version); + if (!last_try) { + ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version); + } else { + mgd.cfd->GetSuperVersion()->Unref(); } } - mutex_.Unlock(); - - for (auto td : superversions_to_delete) { - delete td; - } - for (auto mgd : multiget_cf_data) { - delete mgd.second; - } - RecordTick(stats_, NUMBER_MULTIGET_CALLS); RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index d78fbfca9..38efbc436 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -763,6 +763,31 @@ std::string DBTestBase::Get(int cf, const std::string& k, return result; } +std::vector DBTestBase::MultiGet(std::vector cfs, + const std::vector& k, + const Snapshot* snapshot) { + ReadOptions options; + options.verify_checksums = true; + options.snapshot = snapshot; + std::vector handles; + std::vector keys; + std::vector result; + + for (unsigned int i = 0; i < cfs.size(); ++i) { + handles.push_back(handles_[cfs[i]]); + keys.push_back(k[i]); + } + std::vector s = db_->MultiGet(options, handles, keys, &result); + for (unsigned int i = 0; i < s.size(); ++i) { + if (s[i].IsNotFound()) { + result[i] = "NOT_FOUND"; + } else if (!s[i].ok()) { + result[i] = s[i].ToString(); + } + } + return result; +} + Status DBTestBase::Get(const std::string& k, PinnableSlice* v) { ReadOptions options; options.verify_checksums = true; diff --git a/db/db_test_util.h b/db/db_test_util.h index 3638ca7fa..27a20e776 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -840,6 +840,10 @@ class DBTestBase : public testing::Test { Status Get(const std::string& k, PinnableSlice* v); + std::vector MultiGet(std::vector cfs, + const std::vector& k, + const Snapshot* snapshot = nullptr); + uint64_t GetNumSnapshots(); uint64_t GetTimeOldestSnapshots();