CompactedDBImpl::MultiGet() for better CuckooTable performance

Summary:
Add the MultiGet API to allow prefetching.
With a file size of 1.5GB, I configured a 0.9 hash ratio, which fits 115M keys
per file and results in 2 hash functions. With that setup, the lookup QPS is
~4.9M/s vs. ~3M/s for Get().
It is tricky to set the parameters right. Since file size is determined by a
power-of-two factor, the number of keys per file is fixed. With a big file
size (and thus fewer files), we have a greater chance of wasting a lot of
space in the last file, which lowers space utilization. Using a smaller file
size improves the situation, but that harms lookup speed.
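
For context, a minimal usage sketch (not part of the commit): it assumes a
CuckooTable-backed database that has already been fully compacted into a
single level, and that opening it read-only with max_open_files == -1 takes
the CompactedDBImpl path. The path and key names are placeholders.

#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::Options options;
  options.max_open_files = -1;   // assumption: lets the read-only open try CompactedDBImpl
  options.allow_mmap_reads = true;  // the CuckooTable reader requires mmap reads

  // CuckooTable tuned as described above: 0.9 hash table ratio.
  rocksdb::CuckooTableOptions cuckoo;
  cuckoo.hash_table_ratio = 0.9;
  options.table_factory.reset(rocksdb::NewCuckooTableFactory(cuckoo));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::OpenForReadOnly(options, "/tmp/cuckoo_db", &db);  // placeholder path
  if (!s.ok()) return 1;

  // Batch the point lookups: one MultiGet call lets the table reader
  // prefetch the hash buckets for every key before any lookup blocks.
  std::vector<rocksdb::Slice> keys = {"key1", "key2", "key3"};  // placeholders
  std::vector<std::string> values;
  std::vector<rocksdb::Status> statuses =
      db->MultiGet(rocksdb::ReadOptions(), keys, &values);

  size_t found = 0;
  for (const auto& st : statuses) {
    if (st.ok()) ++found;  // statuses[i] is OK() or NotFound(); values[i] parallels keys[i]
  }
  delete db;
  return 0;
}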

Test Plan: db_bench

Reviewers: yhchiang, sdong, igor

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23673
Branch: main
Author: Lei Jin (10 years ago)
Parent: 3c68006109
Commit: fbd2dafc9f
Changed files:
  1. db/db_bench.cc (13)
  2. db/db_test.cc (18)
  3. utilities/compacted_db/compacted_db_impl.cc (54)
  4. utilities/compacted_db/compacted_db_impl.h (7)

@@ -167,6 +167,8 @@ DEFINE_int32(value_size, 100, "Size of each value");
 DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
 
+DEFINE_int64(batch_size, 1, "Batch size");
+
 static bool ValidateKeySize(const char* flagname, int32_t value) {
   return true;
 }
@@ -1265,6 +1267,8 @@ class Benchmark {
       } else if (name == Slice("readrandomfast")) {
         method = &Benchmark::ReadRandomFast;
       } else if (name == Slice("multireadrandom")) {
+        entries_per_batch_ = FLAGS_batch_size;
+        fprintf(stderr, "entries_per_batch_ = %ld\n", entries_per_batch_);
         method = &Benchmark::MultiReadRandom;
       } else if (name == Slice("readmissing")) {
         ++key_size_;
@@ -2076,6 +2080,7 @@ class Benchmark {
   void ReadRandomFast(ThreadState* thread) {
     int64_t read = 0;
     int64_t found = 0;
+    int64_t nonexist = 0;
     ReadOptions options(FLAGS_verify_checksum, true);
     Slice key = AllocateKey();
     std::unique_ptr<const char[]> key_guard(key.data());
@@ -2096,13 +2101,17 @@ class Benchmark {
         if (db->Get(options, key, &value).ok()) {
           ++found;
         }
+        if (key_rand >= FLAGS_num) {
+          ++nonexist;
+        }
       }
       thread->stats.FinishedOps(db, 100);
     } while (!duration.Done(100));
 
     char msg[100];
-    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
-             found, read);
+    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
+             "issued %" PRIu64 " non-exist keys)\n",
+             found, read, nonexist);
     thread->stats.AddMessage(msg);
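
The multireadrandom wiring above amounts to the following per-batch pattern.
This is a simplified sketch of what the benchmark loop exercises under
--batch_size, not db_bench's actual code; the helper name is hypothetical and
key generation and stats reporting are elided.

#include <string>
#include <vector>
#include "rocksdb/db.h"

// Hypothetical helper (not in db_bench): issue one batch of lookups and
// return how many keys were found, mirroring the benchmark's found counter.
size_t MultiReadBatch(rocksdb::DB* db,
                      const std::vector<rocksdb::Slice>& batch) {
  std::vector<std::string> values;
  // A single MultiGet covers the whole batch, which is what lets the
  // CuckooTable reader prefetch every key's buckets before lookups start.
  std::vector<rocksdb::Status> statuses =
      db->MultiGet(rocksdb::ReadOptions(), batch, &values);
  size_t found = 0;
  for (const auto& st : statuses) {
    if (st.ok()) ++found;
  }
  return found;
}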

@@ -1342,6 +1342,24 @@ TEST(DBTest, CompactedDB) {
   ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
   ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
   ASSERT_EQ("NOT_FOUND", Get("kkk"));
+
+  // MultiGet
+  std::vector<std::string> values;
+  std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
+      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
+                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
+      &values);
+  ASSERT_EQ(status_list.size(), 6);
+  ASSERT_EQ(values.size(), 6);
+  ASSERT_OK(status_list[0]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
+  ASSERT_TRUE(status_list[1].IsNotFound());
+  ASSERT_OK(status_list[2]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
+  ASSERT_TRUE(status_list[3].IsNotFound());
+  ASSERT_OK(status_list[4]);
+  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
+  ASSERT_TRUE(status_list[5].IsNotFound());
 }
 
 // Make sure that when options.block_cache is set, after a new table is

@@ -23,8 +23,7 @@ CompactedDBImpl::CompactedDBImpl(
 CompactedDBImpl::~CompactedDBImpl() {
 }
 
-Status CompactedDBImpl::Get(const ReadOptions& options,
-    ColumnFamilyHandle*, const Slice& key, std::string* value) {
+size_t CompactedDBImpl::FindFile(const Slice& key) {
   size_t left = 0;
   size_t right = files_.num_files - 1;
   while (left < right) {
@@ -40,7 +39,12 @@ Status CompactedDBImpl::Get(const ReadOptions& options,
       right = mid;
     }
   }
-  const FdWithKeyRange& f = files_.files[right];
+  return right;
+}
+
+Status CompactedDBImpl::Get(const ReadOptions& options,
+    ColumnFamilyHandle*, const Slice& key, std::string* value) {
+  const FdWithKeyRange& f = files_.files[FindFile(key)];
 
   bool value_found;
   MergeContext merge_context;
@@ -64,6 +68,50 @@ Status CompactedDBImpl::Get(const ReadOptions& options,
   return Status::NotFound();
 }
 
+std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
+    const std::vector<ColumnFamilyHandle*>&,
+    const std::vector<Slice>& keys, std::vector<std::string>* values) {
+  autovector<TableReader*, 16> reader_list;
+  for (const auto& key : keys) {
+    const FdWithKeyRange& f = files_.files[FindFile(key)];
+    if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
+      reader_list.push_back(nullptr);
+    } else {
+      LookupKey lkey(key, kMaxSequenceNumber);
+      f.fd.table_reader->Prepare(lkey.internal_key());
+      reader_list.push_back(f.fd.table_reader);
+    }
+  }
+  std::vector<Status> statuses(keys.size(), Status::NotFound());
+  values->resize(keys.size());
+  bool value_found;
+  MergeContext merge_context;
+  Version::Saver saver;
+  saver.ucmp = user_comparator_;
+  saver.value_found = &value_found;
+  saver.merge_operator = nullptr;
+  saver.merge_context = &merge_context;
+  saver.logger = info_log_;
+  saver.statistics = statistics_;
+  int idx = 0;
+  for (auto* r : reader_list) {
+    if (r != nullptr) {
+      saver.state = Version::kNotFound;
+      saver.user_key = keys[idx];
+      saver.value = &(*values)[idx];
+      LookupKey lkey(keys[idx], kMaxSequenceNumber);
+      r->Get(options, lkey.internal_key(),
+             reinterpret_cast<void*>(&saver), SaveValue,
+             MarkKeyMayExist);
+      if (saver.state == Version::kFound) {
+        statuses[idx] = Status::OK();
+      }
+    }
+    ++idx;
+  }
+  return statuses;
+}
+
 Status CompactedDBImpl::Init(const Options& options) {
   mutex_.Lock();
   ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
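
The speedup comes from the two-pass shape of MultiGet() above: the first loop
calls TableReader::Prepare() for every key, which in the CuckooTable reader
amounts to prefetching the candidate hash buckets, so the second loop's probes
mostly hit cache instead of stalling one at a time. A self-contained toy
showing the same prefetch-then-probe pattern (uses a GCC/Clang builtin; not
RocksDB code):

#include <cstdint>
#include <vector>

// Toy analogue of MultiGet's two passes over a batch of keys.
uint64_t BatchedProbe(const std::vector<uint64_t>& table,
                      const std::vector<size_t>& slots) {
  // Pass 1: issue a prefetch for every slot we are about to read
  // (the analogue of TableReader::Prepare on each key).
  for (size_t s : slots) {
    __builtin_prefetch(&table[s]);
  }
  // Pass 2: the actual reads; the loads are already in flight, so the
  // probes stall far less than an interleaved fetch-then-read loop would.
  uint64_t sum = 0;
  for (size_t s : slots) {
    sum += table[s];
  }
  return sum;
}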

@@ -24,6 +24,12 @@ class CompactedDBImpl : public DBImpl {
   virtual Status Get(const ReadOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
                      std::string* value) override;
+  using DB::MultiGet;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>&,
+      const std::vector<Slice>& keys, std::vector<std::string>* values)
+      override;
 
   using DBImpl::Put;
   virtual Status Put(const WriteOptions& options,
@@ -74,6 +80,7 @@ class CompactedDBImpl : public DBImpl {
  private:
   friend class DB;
+  inline size_t FindFile(const Slice& key);
   Status Init(const Options& options);
 
   ColumnFamilyData* cfd_;
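
A side note on the header change: overriding one MultiGet overload would hide
DB's other MultiGet overloads inside CompactedDBImpl, so the
`using DB::MultiGet;` declaration re-exposes them. A toy illustration of that
C++ name-hiding rule (unrelated classes, not RocksDB):

#include <iostream>

struct Base {
  void f(int) { std::cout << "Base::f(int)\n"; }
  virtual void f(double) { std::cout << "Base::f(double)\n"; }
  virtual ~Base() = default;
};

struct Derived : Base {
  using Base::f;  // without this, Derived::f(double) would hide Base::f(int)
  void f(double) override { std::cout << "Derived::f(double)\n"; }
};

int main() {
  Derived d;
  d.f(1);    // still resolves to Base::f(int) thanks to the using-declaration
  d.f(1.0);  // calls the override
  return 0;
}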
