Summary: We expect the persistent read cache to perform at speeds upto 8 GB/s. In order to accomplish that, we need build a index mechanism which operate in the order of multiple millions per sec rate. This patch provide the basic data structure to accomplish that: (1) Hash table implementation with lock contention spread It is based on the StripedHashSet<T> implementation in The Art of multiprocessor programming by Maurice Henry & Nir Shavit (2) LRU implementation Place holder algorithm for further optimizing (3) Evictable Hash Table implementation Building block for building index data structure that evicts data like files etc TODO: (1) Figure if the sharded hash table and LRU can be used instead (2) Figure if we need to support configurable eviction algorithm for EvictableHashTable Test Plan: Run unit tests Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D55785main
parent
43afd72bee
commit
1f0142ce19
@ -0,0 +1,226 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
#pragma once |
||||||
|
|
||||||
|
#include <assert.h> |
||||||
|
#include <sys/mman.h> |
||||||
|
#include <list> |
||||||
|
#include <vector> |
||||||
|
|
||||||
|
#include "include/rocksdb/env.h" |
||||||
|
#include "port/port_posix.h" |
||||||
|
#include "util/mutexlock.h" |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
// HashTable<T, Hash, Equal>
|
||||||
|
//
|
||||||
|
// Traditional implementation of hash table with syncronization built on top
|
||||||
|
// don't perform very well in multi-core scenarios. This is an implementation
|
||||||
|
// designed for multi-core scenarios with high lock contention.
|
||||||
|
//
|
||||||
|
// |<-------- alpha ------------->|
|
||||||
|
// Buckets Collision list
|
||||||
|
// ---- +----+ +---+---+--- ...... ---+---+---+
|
||||||
|
// / | |--->| | | | | |
|
||||||
|
// / +----+ +---+---+--- ...... ---+---+---+
|
||||||
|
// / | |
|
||||||
|
// Locks/ +----+
|
||||||
|
// +--+/ . .
|
||||||
|
// | | . .
|
||||||
|
// +--+ . .
|
||||||
|
// | | . .
|
||||||
|
// +--+ . .
|
||||||
|
// | | . .
|
||||||
|
// +--+ . .
|
||||||
|
// \ +----+
|
||||||
|
// \ | |
|
||||||
|
// \ +----+
|
||||||
|
// \ | |
|
||||||
|
// \---- +----+
|
||||||
|
//
|
||||||
|
// The lock contention is spread over an array of locks. This helps improve
|
||||||
|
// concurrent access. The spine is designed for a certain capacity and load
|
||||||
|
// factor. When the capacity planning is done correctly we can expect
|
||||||
|
// O(load_factor = 1) insert, access and remove time.
|
||||||
|
//
|
||||||
|
// Micro benchmark on debug build gives about .5 Million/sec rate of insert,
|
||||||
|
// erase and lookup in parallel (total of about 1.5 Million ops/sec). If the
|
||||||
|
// blocks were of 4K, the hash table can support a virtual throughput of
|
||||||
|
// 6 GB/s.
|
||||||
|
//
|
||||||
|
// T Object type (contains both key and value)
|
||||||
|
// Hash Function that returns an hash from type T
|
||||||
|
// Equal Returns if two objects are equal
|
||||||
|
// (We need explicit equal for pointer type)
|
||||||
|
//
|
||||||
|
template <class T, class Hash, class Equal> |
||||||
|
class HashTable { |
||||||
|
public: |
||||||
|
explicit HashTable(const size_t capacity = 1024 * 1024, |
||||||
|
const float load_factor = 2.0, const uint32_t nlocks = 256) |
||||||
|
: nbuckets_(load_factor ? capacity / load_factor : 0), nlocks_(nlocks) { |
||||||
|
// pre-conditions
|
||||||
|
assert(capacity); |
||||||
|
assert(load_factor); |
||||||
|
assert(nbuckets_); |
||||||
|
assert(nlocks_); |
||||||
|
|
||||||
|
buckets_.reset(new Bucket[nbuckets_]); |
||||||
|
mlock(buckets_.get(), nbuckets_ * sizeof(Bucket)); |
||||||
|
|
||||||
|
// initialize locks
|
||||||
|
locks_.reset(new port::RWMutex[nlocks_]); |
||||||
|
mlock(locks_.get(), nlocks_ * sizeof(port::RWMutex)); |
||||||
|
|
||||||
|
// post-conditions
|
||||||
|
assert(buckets_); |
||||||
|
assert(locks_); |
||||||
|
} |
||||||
|
|
||||||
|
virtual ~HashTable() { AssertEmptyBuckets(); } |
||||||
|
|
||||||
|
//
|
||||||
|
// Insert given record to hash table
|
||||||
|
//
|
||||||
|
bool Insert(const T& t) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
const uint32_t bucket_idx = h % nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % nlocks_; |
||||||
|
|
||||||
|
WriteLock _(&locks_[lock_idx]); |
||||||
|
auto& bucket = buckets_[bucket_idx]; |
||||||
|
return Insert(&bucket, t); |
||||||
|
} |
||||||
|
|
||||||
|
// Lookup hash table
|
||||||
|
//
|
||||||
|
// Please note that read lock should be held by the caller. This is because
|
||||||
|
// the caller owns the data, and should hold the read lock as long as he
|
||||||
|
// operates on the data.
|
||||||
|
bool Find(const T& t, T* ret, port::RWMutex** ret_lock) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
const uint32_t bucket_idx = h % nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % nlocks_; |
||||||
|
|
||||||
|
port::RWMutex& lock = locks_[lock_idx]; |
||||||
|
lock.ReadLock(); |
||||||
|
|
||||||
|
auto& bucket = buckets_[bucket_idx]; |
||||||
|
if (Find(&bucket, t, ret)) { |
||||||
|
*ret_lock = &lock; |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
lock.ReadUnlock(); |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// Erase a given key from the hash table
|
||||||
|
//
|
||||||
|
bool Erase(const T& t, T* ret) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
const uint32_t bucket_idx = h % nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % nlocks_; |
||||||
|
|
||||||
|
WriteLock _(&locks_[lock_idx]); |
||||||
|
|
||||||
|
auto& bucket = buckets_[bucket_idx]; |
||||||
|
return Erase(&bucket, t, ret); |
||||||
|
} |
||||||
|
|
||||||
|
// Fetch the mutex associated with a key
|
||||||
|
// This call is used to hold the lock for a given data for extended period of
|
||||||
|
// time.
|
||||||
|
port::RWMutex* GetMutex(const T& t) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
const uint32_t bucket_idx = h % nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % nlocks_; |
||||||
|
|
||||||
|
return &locks_[lock_idx]; |
||||||
|
} |
||||||
|
|
||||||
|
void Clear(void (*fn)(T)) { |
||||||
|
for (uint32_t i = 0; i < nbuckets_; ++i) { |
||||||
|
const uint32_t lock_idx = i % nlocks_; |
||||||
|
WriteLock _(&locks_[lock_idx]); |
||||||
|
for (auto& t : buckets_[i].list_) { |
||||||
|
(*fn)(t); |
||||||
|
} |
||||||
|
buckets_[i].list_.clear(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
protected: |
||||||
|
// Models bucket of keys that hash to the same bucket number
|
||||||
|
struct Bucket { |
||||||
|
std::list<T> list_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Substitute for std::find with custom comparator operator
|
||||||
|
typename std::list<T>::iterator Find(std::list<T>* list, const T& t) { |
||||||
|
for (auto it = list->begin(); it != list->end(); ++it) { |
||||||
|
if (Equal()(*it, t)) { |
||||||
|
return it; |
||||||
|
} |
||||||
|
} |
||||||
|
return list->end(); |
||||||
|
} |
||||||
|
|
||||||
|
bool Insert(Bucket* bucket, const T& t) { |
||||||
|
// Check if the key already exists
|
||||||
|
auto it = Find(&bucket->list_, t); |
||||||
|
if (it != bucket->list_.end()) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
// insert to bucket
|
||||||
|
bucket->list_.push_back(t); |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
bool Find(Bucket* bucket, const T& t, T* ret) { |
||||||
|
auto it = Find(&bucket->list_, t); |
||||||
|
if (it != bucket->list_.end()) { |
||||||
|
if (ret) { |
||||||
|
*ret = *it; |
||||||
|
} |
||||||
|
return true; |
||||||
|
} |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
bool Erase(Bucket* bucket, const T& t, T* ret) { |
||||||
|
auto it = Find(&bucket->list_, t); |
||||||
|
if (it != bucket->list_.end()) { |
||||||
|
if (ret) { |
||||||
|
*ret = *it; |
||||||
|
} |
||||||
|
|
||||||
|
bucket->list_.erase(it); |
||||||
|
return true; |
||||||
|
} |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
// assert that all buckets are empty
|
||||||
|
void AssertEmptyBuckets() { |
||||||
|
#ifndef NDEBUG |
||||||
|
for (size_t i = 0; i < nbuckets_; ++i) { |
||||||
|
WriteLock _(&locks_[i % nlocks_]); |
||||||
|
assert(buckets_[i].list_.empty()); |
||||||
|
} |
||||||
|
#endif |
||||||
|
} |
||||||
|
|
||||||
|
const uint32_t nbuckets_; // No. of buckets in the spine
|
||||||
|
std::unique_ptr<Bucket[]> buckets_; // Spine of the hash buckets
|
||||||
|
const uint32_t nlocks_; // No. of locks
|
||||||
|
std::unique_ptr<port::RWMutex[]> locks_; // Granular locks
|
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace rocksdb
|
@ -0,0 +1,294 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <gflags/gflags.h> |
||||||
|
#include <sys/time.h> |
||||||
|
#include <unistd.h> |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
#include <functional> |
||||||
|
#include <string> |
||||||
|
#include <unordered_map> |
||||||
|
|
||||||
|
#include "include/rocksdb/env.h" |
||||||
|
#include "port/port_posix.h" |
||||||
|
#include "util/mutexlock.h" |
||||||
|
#include "utilities/persistent_cache/hash_table.h" |
||||||
|
|
||||||
|
using std::string; |
||||||
|
|
||||||
|
DEFINE_int32(nsec, 10, "nsec"); |
||||||
|
DEFINE_int32(nthread_write, 1, "insert %"); |
||||||
|
DEFINE_int32(nthread_read, 1, "lookup %"); |
||||||
|
DEFINE_int32(nthread_erase, 1, "erase %"); |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
//
|
||||||
|
// HashTableImpl interface
|
||||||
|
//
|
||||||
|
// Abstraction of a hash table implementation
|
||||||
|
template <class Key, class Value> |
||||||
|
class HashTableImpl { |
||||||
|
public: |
||||||
|
virtual ~HashTableImpl() {} |
||||||
|
|
||||||
|
virtual bool Insert(const Key& key, const Value& val) = 0; |
||||||
|
virtual bool Erase(const Key& key) = 0; |
||||||
|
virtual bool Lookup(const Key& key, Value* val) = 0; |
||||||
|
}; |
||||||
|
|
||||||
|
// HashTableBenchmark
|
||||||
|
//
|
||||||
|
// Abstraction to test a given hash table implementation. The test mostly
|
||||||
|
// focus on insert, lookup and erase. The test can operate in test mode and
|
||||||
|
// benchmark mode.
|
||||||
|
class HashTableBenchmark { |
||||||
|
public: |
||||||
|
explicit HashTableBenchmark(HashTableImpl<size_t, std::string>* impl, |
||||||
|
const size_t sec = 10, |
||||||
|
const size_t nthread_write = 1, |
||||||
|
const size_t nthread_read = 1, |
||||||
|
const size_t nthread_erase = 1) |
||||||
|
: impl_(impl), |
||||||
|
sec_(sec), |
||||||
|
ninserts_(0), |
||||||
|
nreads_(0), |
||||||
|
nerases_(0), |
||||||
|
nerases_failed_(0), |
||||||
|
quit_(false) { |
||||||
|
Prepop(); |
||||||
|
|
||||||
|
StartThreads(nthread_write, WriteMain); |
||||||
|
StartThreads(nthread_read, ReadMain); |
||||||
|
StartThreads(nthread_erase, EraseMain); |
||||||
|
|
||||||
|
uint64_t start = NowInMillSec(); |
||||||
|
while (!quit_) { |
||||||
|
quit_ = NowInMillSec() - start > sec_ * 1000; |
||||||
|
/* sleep override */ sleep(1); |
||||||
|
} |
||||||
|
|
||||||
|
Env* env = Env::Default(); |
||||||
|
env->WaitForJoin(); |
||||||
|
|
||||||
|
if (sec_) { |
||||||
|
printf("Result \n"); |
||||||
|
printf("====== \n"); |
||||||
|
printf("insert/sec = %f \n", ninserts_ / static_cast<double>(sec_)); |
||||||
|
printf("read/sec = %f \n", nreads_ / static_cast<double>(sec_)); |
||||||
|
printf("erases/sec = %f \n", nerases_ / static_cast<double>(sec_)); |
||||||
|
const uint64_t ops = ninserts_ + nreads_ + nerases_; |
||||||
|
printf("ops/sec = %f \n", ops / static_cast<double>(sec_)); |
||||||
|
printf("erase fail = %d (%f%%)\n", static_cast<int>(nerases_failed_), |
||||||
|
static_cast<float>(nerases_failed_ / nerases_ * 100)); |
||||||
|
printf("====== \n"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void RunWrite() { |
||||||
|
while (!quit_) { |
||||||
|
size_t k = insert_key_++; |
||||||
|
std::string tmp(1000, k % 255); |
||||||
|
bool status = impl_->Insert(k, tmp); |
||||||
|
assert(status); |
||||||
|
ninserts_++; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void RunRead() { |
||||||
|
while (!quit_) { |
||||||
|
std::string s; |
||||||
|
size_t k = random() % max_prepop_key; |
||||||
|
bool status = impl_->Lookup(k, &s); |
||||||
|
assert(status); |
||||||
|
assert(s == std::string(1000, k % 255)); |
||||||
|
nreads_++; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void RunErase() { |
||||||
|
while (!quit_) { |
||||||
|
size_t k = erase_key_++; |
||||||
|
bool status = impl_->Erase(k); |
||||||
|
nerases_failed_ += !status; |
||||||
|
nerases_++; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
// Start threads for a given function
|
||||||
|
void StartThreads(const size_t n, void (*fn)(void*)) { |
||||||
|
Env* env = Env::Default(); |
||||||
|
for (size_t i = 0; i < n; ++i) { |
||||||
|
env->StartThread(fn, this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Prepop the hash table with 1M keys
|
||||||
|
void Prepop() { |
||||||
|
for (size_t i = 0; i < max_prepop_key; ++i) { |
||||||
|
bool status = impl_->Insert(i, std::string(1000, i % 255)); |
||||||
|
assert(status); |
||||||
|
} |
||||||
|
|
||||||
|
erase_key_ = insert_key_ = max_prepop_key; |
||||||
|
|
||||||
|
for (size_t i = 0; i < 10 * max_prepop_key; ++i) { |
||||||
|
bool status = impl_->Insert(insert_key_++, std::string(1000, 'x')); |
||||||
|
assert(status); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
static uint64_t NowInMillSec() { |
||||||
|
timeval tv; |
||||||
|
gettimeofday(&tv, /*tz=*/nullptr); |
||||||
|
return tv.tv_sec * 1000 + tv.tv_usec / 1000; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// Wrapper functions for thread entry
|
||||||
|
//
|
||||||
|
static void WriteMain(void* args) { |
||||||
|
reinterpret_cast<HashTableBenchmark*>(args)->RunWrite(); |
||||||
|
} |
||||||
|
|
||||||
|
static void ReadMain(void* args) { |
||||||
|
reinterpret_cast<HashTableBenchmark*>(args)->RunRead(); |
||||||
|
} |
||||||
|
|
||||||
|
static void EraseMain(void* args) { |
||||||
|
reinterpret_cast<HashTableBenchmark*>(args)->RunErase(); |
||||||
|
} |
||||||
|
|
||||||
|
HashTableImpl<size_t, std::string>* impl_; // Implementation to test
|
||||||
|
const size_t sec_; // Test time
|
||||||
|
const size_t max_prepop_key = 1ULL * 1024 * 1024; // Max prepop key
|
||||||
|
std::atomic<size_t> insert_key_; // Last inserted key
|
||||||
|
std::atomic<size_t> erase_key_; // Erase key
|
||||||
|
std::atomic<size_t> ninserts_; // Number of inserts
|
||||||
|
std::atomic<size_t> nreads_; // Number of reads
|
||||||
|
std::atomic<size_t> nerases_; // Number of erases
|
||||||
|
std::atomic<size_t> nerases_failed_; // Number of erases failed
|
||||||
|
bool quit_; // Should the threads quit ?
|
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// SimpleImpl
|
||||||
|
// Lock safe unordered_map implementation
|
||||||
|
class SimpleImpl : public HashTableImpl<size_t, string> { |
||||||
|
public: |
||||||
|
bool Insert(const size_t& key, const string& val) override { |
||||||
|
WriteLock _(&rwlock_); |
||||||
|
map_.insert(make_pair(key, val)); |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
bool Erase(const size_t& key) override { |
||||||
|
WriteLock _(&rwlock_); |
||||||
|
auto it = map_.find(key); |
||||||
|
if (it == map_.end()) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
map_.erase(it); |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
bool Lookup(const size_t& key, string* val) override { |
||||||
|
ReadLock _(&rwlock_); |
||||||
|
auto it = map_.find(key); |
||||||
|
if (it != map_.end()) { |
||||||
|
*val = it->second; |
||||||
|
} |
||||||
|
return it != map_.end(); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
port::RWMutex rwlock_; |
||||||
|
std::unordered_map<size_t, string> map_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// GranularLockImpl
|
||||||
|
// Thread safe custom RocksDB implementation of hash table with granular
|
||||||
|
// locking
|
||||||
|
class GranularLockImpl : public HashTableImpl<size_t, string> { |
||||||
|
public: |
||||||
|
bool Insert(const size_t& key, const string& val) override { |
||||||
|
Node n(key, val); |
||||||
|
return impl_.Insert(n); |
||||||
|
} |
||||||
|
|
||||||
|
bool Erase(const size_t& key) override { |
||||||
|
Node n(key, string()); |
||||||
|
return impl_.Erase(n, nullptr); |
||||||
|
} |
||||||
|
|
||||||
|
bool Lookup(const size_t& key, string* val) override { |
||||||
|
Node n(key, string()); |
||||||
|
port::RWMutex* rlock; |
||||||
|
bool status = impl_.Find(n, &n, &rlock); |
||||||
|
if (status) { |
||||||
|
ReadUnlock _(rlock); |
||||||
|
*val = n.val_; |
||||||
|
} |
||||||
|
return status; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
struct Node { |
||||||
|
explicit Node(const size_t key, const string& val) : key_(key), val_(val) {} |
||||||
|
|
||||||
|
size_t key_ = 0; |
||||||
|
string val_; |
||||||
|
}; |
||||||
|
|
||||||
|
struct Hash { |
||||||
|
uint64_t operator()(const Node& node) { |
||||||
|
return std::hash<uint64_t>()(node.key_); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
struct Equal { |
||||||
|
bool operator()(const Node& lhs, const Node& rhs) { |
||||||
|
return lhs.key_ == rhs.key_; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
HashTable<Node, Hash, Equal> impl_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
//
|
||||||
|
// main
|
||||||
|
//
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + |
||||||
|
" [OPTIONS]..."); |
||||||
|
google::ParseCommandLineFlags(&argc, &argv, false); |
||||||
|
|
||||||
|
//
|
||||||
|
// Micro benchmark unordered_map
|
||||||
|
//
|
||||||
|
printf("Micro benchmarking std::unordered_map \n"); |
||||||
|
{ |
||||||
|
rocksdb::SimpleImpl impl; |
||||||
|
rocksdb::HashTableBenchmark _(&impl, FLAGS_nsec, FLAGS_nthread_write, |
||||||
|
FLAGS_nthread_read, FLAGS_nthread_erase); |
||||||
|
} |
||||||
|
//
|
||||||
|
// Micro benchmark scalable hash table
|
||||||
|
//
|
||||||
|
printf("Micro benchmarking scalable hash map \n"); |
||||||
|
{ |
||||||
|
rocksdb::GranularLockImpl impl; |
||||||
|
rocksdb::HashTableBenchmark _(&impl, FLAGS_nsec, FLAGS_nthread_write, |
||||||
|
FLAGS_nthread_read, FLAGS_nthread_erase); |
||||||
|
} |
||||||
|
|
||||||
|
return 0; |
||||||
|
} |
@ -0,0 +1,160 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
#pragma once |
||||||
|
|
||||||
|
#include "utilities/persistent_cache/hash_table.h" |
||||||
|
#include "utilities/persistent_cache/lrulist.h" |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
// Evictable Hash Table
|
||||||
|
//
|
||||||
|
// Hash table index where least accessed (or one of the least accessed) elements
|
||||||
|
// can be evicted.
|
||||||
|
//
|
||||||
|
// Please note EvictableHashTable can only be created for pointer type objects
|
||||||
|
template <class T, class Hash, class Equal> |
||||||
|
class EvictableHashTable : private HashTable<T*, Hash, Equal> { |
||||||
|
public: |
||||||
|
typedef HashTable<T*, Hash, Equal> hash_table; |
||||||
|
|
||||||
|
explicit EvictableHashTable(const size_t capacity = 1024 * 1024, |
||||||
|
const float load_factor = 2.0, |
||||||
|
const uint32_t nlocks = 256) |
||||||
|
: HashTable<T*, Hash, Equal>(capacity, load_factor, nlocks), |
||||||
|
lru_lists_(new LRUList<T>[hash_table::nlocks_]) { |
||||||
|
assert(lru_lists_); |
||||||
|
} |
||||||
|
|
||||||
|
virtual ~EvictableHashTable() { AssertEmptyLRU(); } |
||||||
|
|
||||||
|
//
|
||||||
|
// Insert given record to hash table (and LRU list)
|
||||||
|
//
|
||||||
|
bool Insert(T* t) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
typename hash_table::Bucket& bucket = GetBucket(h); |
||||||
|
LRUListType& lru = GetLRUList(h); |
||||||
|
port::RWMutex& lock = GetMutex(h); |
||||||
|
|
||||||
|
WriteLock _(&lock); |
||||||
|
if (hash_table::Insert(&bucket, t)) { |
||||||
|
lru.Push(t); |
||||||
|
return true; |
||||||
|
} |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// Lookup hash table
|
||||||
|
//
|
||||||
|
// Please note that read lock should be held by the caller. This is because
|
||||||
|
// the caller owns the data, and should hold the read lock as long as he
|
||||||
|
// operates on the data.
|
||||||
|
bool Find(T* t, T** ret) { |
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
typename hash_table::Bucket& bucket = GetBucket(h); |
||||||
|
LRUListType& lru = GetLRUList(h); |
||||||
|
port::RWMutex& lock = GetMutex(h); |
||||||
|
|
||||||
|
ReadLock _(&lock); |
||||||
|
if (hash_table::Find(bucket, t, ret)) { |
||||||
|
++(*ret)->refs_; |
||||||
|
lru.Touch(*ret); |
||||||
|
return true; |
||||||
|
} |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// Evict one of the least recently used object
|
||||||
|
//
|
||||||
|
T* Evict(const std::function<void(T*)>& fn = nullptr) { |
||||||
|
const size_t start_idx = random() % hash_table::nlocks_; |
||||||
|
T* t = nullptr; |
||||||
|
|
||||||
|
// iterate from start_idx .. 0 .. start_idx
|
||||||
|
for (size_t i = 0; !t && i < hash_table::nlocks_; ++i) { |
||||||
|
const size_t idx = (start_idx + i) % hash_table::nlocks_; |
||||||
|
|
||||||
|
WriteLock _(&hash_table::locks_[idx]); |
||||||
|
LRUListType& lru = lru_lists_[idx]; |
||||||
|
if (!lru.IsEmpty() && (t = lru.Pop())) { |
||||||
|
assert(!t->refs_); |
||||||
|
// We got an item to evict, erase from the bucket
|
||||||
|
const uint64_t h = Hash()(t); |
||||||
|
typename hash_table::Bucket& bucket = GetBucket(h); |
||||||
|
T* tmp = nullptr; |
||||||
|
bool status = hash_table::Erase(&bucket, t, &tmp); |
||||||
|
assert(t == tmp); |
||||||
|
(void)status; |
||||||
|
assert(status); |
||||||
|
if (fn) { |
||||||
|
fn(t); |
||||||
|
} |
||||||
|
break; |
||||||
|
} |
||||||
|
assert(!t); |
||||||
|
} |
||||||
|
return t; |
||||||
|
} |
||||||
|
|
||||||
|
void Clear(void (*fn)(T*)) { |
||||||
|
for (uint32_t i = 0; i < hash_table::nbuckets_; ++i) { |
||||||
|
const uint32_t lock_idx = i % hash_table::nlocks_; |
||||||
|
WriteLock _(&hash_table::locks_[lock_idx]); |
||||||
|
auto& lru_list = lru_lists_[lock_idx]; |
||||||
|
auto& bucket = hash_table::buckets_[i]; |
||||||
|
for (auto* t : bucket.list_) { |
||||||
|
lru_list.Unlink(t); |
||||||
|
(*fn)(t); |
||||||
|
} |
||||||
|
bucket.list_.clear(); |
||||||
|
} |
||||||
|
// make sure that all LRU lists are emptied
|
||||||
|
AssertEmptyLRU(); |
||||||
|
} |
||||||
|
|
||||||
|
void AssertEmptyLRU() { |
||||||
|
#ifndef NDEBUG |
||||||
|
for (uint32_t i = 0; i < hash_table::nlocks_; ++i) { |
||||||
|
WriteLock _(&hash_table::locks_[i]); |
||||||
|
auto& lru_list = lru_lists_[i]; |
||||||
|
assert(lru_list.IsEmpty()); |
||||||
|
} |
||||||
|
#endif |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// Fetch the mutex associated with a key
|
||||||
|
// This call is used to hold the lock for a given data for extended period of
|
||||||
|
// time.
|
||||||
|
port::RWMutex* GetMutex(T* t) { return hash_table::GetMutex(t); } |
||||||
|
|
||||||
|
private: |
||||||
|
typedef LRUList<T> LRUListType; |
||||||
|
|
||||||
|
typename hash_table::Bucket& GetBucket(const uint64_t h) { |
||||||
|
const uint32_t bucket_idx = h % hash_table::nbuckets_; |
||||||
|
return hash_table::buckets_[bucket_idx]; |
||||||
|
} |
||||||
|
|
||||||
|
LRUListType& GetLRUList(const uint64_t h) { |
||||||
|
const uint32_t bucket_idx = h % hash_table::nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % hash_table::nlocks_; |
||||||
|
return lru_lists_[lock_idx]; |
||||||
|
} |
||||||
|
|
||||||
|
port::RWMutex& GetMutex(const uint64_t h) { |
||||||
|
const uint32_t bucket_idx = h % hash_table::nbuckets_; |
||||||
|
const uint32_t lock_idx = bucket_idx % hash_table::nlocks_; |
||||||
|
return hash_table::locks_[lock_idx]; |
||||||
|
} |
||||||
|
|
||||||
|
std::unique_ptr<LRUListType[]> lru_lists_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace rocksdb
|
@ -0,0 +1,152 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
#include <iostream> |
||||||
|
#include <set> |
||||||
|
#include <string> |
||||||
|
|
||||||
|
#include "db/db_test_util.h" |
||||||
|
#include "util/arena.h" |
||||||
|
#include "util/testharness.h" |
||||||
|
#include "utilities/persistent_cache/hash_table.h" |
||||||
|
#include "utilities/persistent_cache/hash_table_evictable.h" |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
struct HashTableTest : public testing::Test { |
||||||
|
~HashTableTest() { map_.Clear(&HashTableTest::ClearNode); } |
||||||
|
|
||||||
|
struct Node { |
||||||
|
Node() {} |
||||||
|
explicit Node(const uint64_t key, const std::string& val = std::string()) |
||||||
|
: key_(key), val_(val) {} |
||||||
|
|
||||||
|
uint64_t key_ = 0; |
||||||
|
std::string val_; |
||||||
|
}; |
||||||
|
|
||||||
|
struct Equal { |
||||||
|
bool operator()(const Node& lhs, const Node& rhs) { |
||||||
|
return lhs.key_ == rhs.key_; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
struct Hash { |
||||||
|
uint64_t operator()(const Node& node) { |
||||||
|
return std::hash<uint64_t>()(node.key_); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
static void ClearNode(Node node) {} |
||||||
|
|
||||||
|
HashTable<Node, Hash, Equal> map_; |
||||||
|
}; |
||||||
|
|
||||||
|
struct EvictableHashTableTest : public testing::Test { |
||||||
|
~EvictableHashTableTest() { map_.Clear(&EvictableHashTableTest::ClearNode); } |
||||||
|
|
||||||
|
struct Node : LRUElement<Node> { |
||||||
|
Node() {} |
||||||
|
explicit Node(const uint64_t key, const std::string& val = std::string()) |
||||||
|
: key_(key), val_(val) {} |
||||||
|
|
||||||
|
uint64_t key_ = 0; |
||||||
|
std::string val_; |
||||||
|
std::atomic<uint32_t> refs_{0}; |
||||||
|
}; |
||||||
|
|
||||||
|
struct Equal { |
||||||
|
bool operator()(const Node* lhs, const Node* rhs) { |
||||||
|
return lhs->key_ == rhs->key_; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
struct Hash { |
||||||
|
uint64_t operator()(const Node* node) { |
||||||
|
return std::hash<uint64_t>()(node->key_); |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
static void ClearNode(Node* node) {} |
||||||
|
|
||||||
|
EvictableHashTable<Node, Hash, Equal> map_; |
||||||
|
}; |
||||||
|
|
||||||
|
TEST_F(HashTableTest, TestInsert) { |
||||||
|
const uint64_t max_keys = 1024 * 1024; |
||||||
|
|
||||||
|
// insert
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
map_.Insert(Node(k, std::string(1000, k % 255))); |
||||||
|
} |
||||||
|
|
||||||
|
// verify
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
Node val; |
||||||
|
port::RWMutex* rlock; |
||||||
|
assert(map_.Find(Node(k), &val, &rlock)); |
||||||
|
rlock->ReadUnlock(); |
||||||
|
assert(val.val_ == std::string(1000, k % 255)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
TEST_F(HashTableTest, TestErase) { |
||||||
|
const uint64_t max_keys = 1024 * 1024; |
||||||
|
// insert
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
map_.Insert(Node(k, std::string(1000, k % 255))); |
||||||
|
} |
||||||
|
|
||||||
|
// erase a few keys randomly
|
||||||
|
std::set<uint64_t> erased; |
||||||
|
for (int i = 0; i < 1024; ++i) { |
||||||
|
uint64_t k = random() % max_keys; |
||||||
|
if (erased.find(k) != erased.end()) { |
||||||
|
continue; |
||||||
|
} |
||||||
|
assert(map_.Erase(Node(k), /*ret=*/nullptr)); |
||||||
|
erased.insert(k); |
||||||
|
} |
||||||
|
|
||||||
|
// verify
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
Node val; |
||||||
|
port::RWMutex* rlock = nullptr; |
||||||
|
bool status = map_.Find(Node(k), &val, &rlock); |
||||||
|
if (erased.find(k) == erased.end()) { |
||||||
|
assert(status); |
||||||
|
rlock->ReadUnlock(); |
||||||
|
assert(val.val_ == std::string(1000, k % 255)); |
||||||
|
} else { |
||||||
|
assert(!status); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
TEST_F(EvictableHashTableTest, TestEvict) { |
||||||
|
const uint64_t max_keys = 1024 * 1024; |
||||||
|
|
||||||
|
// insert
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
map_.Insert(new Node(k, std::string(1000, k % 255))); |
||||||
|
} |
||||||
|
|
||||||
|
// verify
|
||||||
|
for (uint64_t k = 0; k < max_keys; ++k) { |
||||||
|
Node* val = map_.Evict(); |
||||||
|
// unfortunately we can't predict eviction value since it is from any one of
|
||||||
|
// the lock stripe
|
||||||
|
assert(val); |
||||||
|
assert(val->val_ == std::string(1000, val->key_ % 255)); |
||||||
|
delete val; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
return RUN_ALL_TESTS(); |
||||||
|
} |
@ -0,0 +1,170 @@ |
|||||||
|
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under the BSD-style license found in the
|
||||||
|
// LICENSE file in the root directory of this source tree. An additional grant
|
||||||
|
// of patent rights can be found in the PATENTS file in the same directory.
|
||||||
|
//
|
||||||
|
#pragma once |
||||||
|
|
||||||
|
#include <atomic> |
||||||
|
|
||||||
|
#include "util/mutexlock.h" |
||||||
|
|
||||||
|
namespace rocksdb { |
||||||
|
|
||||||
|
// LRU element definition
|
||||||
|
//
|
||||||
|
// Any object that needs to be part of the LRU algorithm should extend this
|
||||||
|
// class
|
||||||
|
template <class T> |
||||||
|
struct LRUElement { |
||||||
|
explicit LRUElement() : next_(nullptr), prev_(nullptr), refs_(0) {} |
||||||
|
|
||||||
|
virtual ~LRUElement() { assert(!refs_); } |
||||||
|
|
||||||
|
T* next_; |
||||||
|
T* prev_; |
||||||
|
std::atomic<size_t> refs_; |
||||||
|
}; |
||||||
|
|
||||||
|
// LRU implementation
|
||||||
|
//
|
||||||
|
// In place LRU implementation. There is no copy or allocation involved when
|
||||||
|
// inserting or removing an element. This makes the data structure slim
|
||||||
|
template <class T> |
||||||
|
class LRUList { |
||||||
|
public: |
||||||
|
virtual ~LRUList() { |
||||||
|
MutexLock _(&lock_); |
||||||
|
assert(!head_); |
||||||
|
assert(!tail_); |
||||||
|
} |
||||||
|
|
||||||
|
// Push element into the LRU at the cold end
|
||||||
|
inline void Push(T* const t) { |
||||||
|
assert(t); |
||||||
|
assert(!t->next_); |
||||||
|
assert(!t->prev_); |
||||||
|
|
||||||
|
MutexLock _(&lock_); |
||||||
|
|
||||||
|
assert((!head_ && !tail_) || (head_ && tail_)); |
||||||
|
assert(!head_ || !head_->prev_); |
||||||
|
assert(!tail_ || !tail_->next_); |
||||||
|
|
||||||
|
t->next_ = head_; |
||||||
|
if (head_) { |
||||||
|
head_->prev_ = t; |
||||||
|
} |
||||||
|
|
||||||
|
head_ = t; |
||||||
|
if (!tail_) { |
||||||
|
tail_ = t; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
// Unlink the element from the LRU
|
||||||
|
inline void Unlink(T* const t) { |
||||||
|
MutexLock _(&lock_); |
||||||
|
UnlinkImpl(t); |
||||||
|
} |
||||||
|
|
||||||
|
// Evict an element from the LRU
|
||||||
|
inline T* Pop() { |
||||||
|
MutexLock _(&lock_); |
||||||
|
|
||||||
|
assert(tail_ && head_); |
||||||
|
assert(!tail_->next_); |
||||||
|
assert(!head_->prev_); |
||||||
|
|
||||||
|
T* t = head_; |
||||||
|
while (t && t->refs_) { |
||||||
|
t = t->next_; |
||||||
|
} |
||||||
|
|
||||||
|
if (!t) { |
||||||
|
// nothing can be evicted
|
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
|
||||||
|
assert(!t->refs_); |
||||||
|
|
||||||
|
// unlike the element
|
||||||
|
UnlinkImpl(t); |
||||||
|
return t; |
||||||
|
} |
||||||
|
|
||||||
|
// Move the element from the front of the list to the back of the list
|
||||||
|
inline void Touch(T* const t) { |
||||||
|
MutexLock _(&lock_); |
||||||
|
UnlinkImpl(t); |
||||||
|
PushBackImpl(t); |
||||||
|
} |
||||||
|
|
||||||
|
// Check if the LRU is empty
|
||||||
|
inline bool IsEmpty() const { |
||||||
|
MutexLock _(&lock_); |
||||||
|
return !head_ && !tail_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
// Unlink an element from the LRU
|
||||||
|
void UnlinkImpl(T* const t) { |
||||||
|
assert(t); |
||||||
|
|
||||||
|
lock_.AssertHeld(); |
||||||
|
|
||||||
|
assert(head_ && tail_); |
||||||
|
assert(t->prev_ || head_ == t); |
||||||
|
assert(t->next_ || tail_ == t); |
||||||
|
|
||||||
|
if (t->prev_) { |
||||||
|
t->prev_->next_ = t->next_; |
||||||
|
} |
||||||
|
if (t->next_) { |
||||||
|
t->next_->prev_ = t->prev_; |
||||||
|
} |
||||||
|
|
||||||
|
if (tail_ == t) { |
||||||
|
tail_ = tail_->prev_; |
||||||
|
} |
||||||
|
if (head_ == t) { |
||||||
|
head_ = head_->next_; |
||||||
|
} |
||||||
|
|
||||||
|
t->next_ = t->prev_ = nullptr; |
||||||
|
} |
||||||
|
|
||||||
|
// Insert an element at the hot end
|
||||||
|
inline void PushBack(T* const t) { |
||||||
|
MutexLock _(&lock_); |
||||||
|
PushBackImpl(t); |
||||||
|
} |
||||||
|
|
||||||
|
inline void PushBackImpl(T* const t) { |
||||||
|
assert(t); |
||||||
|
assert(!t->next_); |
||||||
|
assert(!t->prev_); |
||||||
|
|
||||||
|
lock_.AssertHeld(); |
||||||
|
|
||||||
|
assert((!head_ && !tail_) || (head_ && tail_)); |
||||||
|
assert(!head_ || !head_->prev_); |
||||||
|
assert(!tail_ || !tail_->next_); |
||||||
|
|
||||||
|
t->prev_ = tail_; |
||||||
|
if (tail_) { |
||||||
|
tail_->next_ = t; |
||||||
|
} |
||||||
|
|
||||||
|
tail_ = t; |
||||||
|
if (!head_) { |
||||||
|
head_ = tail_; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
mutable port::Mutex lock_; // syncronization primitive
|
||||||
|
T* head_ = nullptr; // front (cold)
|
||||||
|
T* tail_ = nullptr; // back (hot)
|
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace rocksdb
|
Loading…
Reference in new issue