BuildBatchGroup -- memcpy outside of lock

Summary: When building batch group, don't actually build a new batch since it requires heavy-weight mem copy and malloc. Only store references to the batches and build the batch group without lock held.

Test Plan:
`make check`

I am also planning to run performance tests. The workload that will benefit from this change is readwhilewriting. I will post the results once I have them.

Reviewers: dhruba, haobo, kailiu

Reviewed By: haobo

CC: leveldb, xjin

Differential Revision: https://reviews.facebook.net/D15063
main
Igor Canadi 11 years ago
parent 481c77e526
commit 7d9f21cf23
  1. 38
      db/db_impl.cc
  2. 4
      db/db_impl.h
  3. 63
      db/db_test.cc

@ -56,6 +56,7 @@
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/autovector.h"
namespace rocksdb {
@ -2969,12 +2970,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
// TODO: BuildBatchGroup physically concatenate/copy all write batches into
// a new one. Mem copy is done with the lock held. Ideally, we only need
// the lock to obtain the last_writer and the references to all batches.
// Creation (copy) of the merged batch could have been done outside of the
// lock protected region.
WriteBatch* updates = BuildBatchGroup(&last_writer);
autovector<WriteBatch*> write_batch_group;
BuildBatchGroup(&last_writer, &write_batch_group);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
@ -2982,6 +2979,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// into mem_.
{
mutex_.Unlock();
WriteBatch* updates = nullptr;
if (write_batch_group.size() == 1) {
updates = write_batch_group[0];
} else {
updates = &tmp_batch_;
for (size_t i = 0; i < write_batch_group.size(); ++i) {
WriteBatchInternal::Append(updates, write_batch_group[i]);
}
}
const SequenceNumber current_sequence = last_sequence + 1;
WriteBatchInternal::SetSequence(updates, current_sequence);
int my_batch_count = WriteBatchInternal::Count(updates);
@ -3027,12 +3034,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
SetTickerCount(options_.statistics.get(),
SEQUENCE_NUMBER, last_sequence);
}
if (updates == &tmp_batch_) tmp_batch_.Clear();
mutex_.Lock();
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
}
}
if (updates == &tmp_batch_) tmp_batch_.Clear();
}
if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
@ -3060,13 +3067,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
void DBImpl::BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group) {
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != nullptr);
assert(first->batch != nullptr);
size_t size = WriteBatchInternal::ByteSize(first->batch);
write_batch_group->push_back(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
@ -3099,18 +3107,10 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
break;
}
// Append to *reuslt
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = &tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
write_batch_group->push_back(w->batch);
}
*last_writer = w;
}
return result;
}
// This function computes the amount of time in microseconds by which a write

@ -22,6 +22,7 @@
#include "port/port.h"
#include "util/stats_logger.h"
#include "memtablelist.h"
#include "util/autovector.h"
namespace rocksdb {
@ -291,7 +292,8 @@ class DBImpl : public DB {
// the superversion outside of mutex
Status MakeRoomForWrite(bool force /* compact even if there is room? */,
SuperVersion** superversion_to_free);
WriteBatch* BuildBatchGroup(Writer** last_writer);
void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);
// Force current memtable contents to be flushed.
Status FlushMemTable(const FlushOptions& options);

@ -4333,6 +4333,69 @@ TEST(DBTest, MultiThreaded) {
} while (ChangeOptions());
}
// Group commit test:
namespace {
static const int kGCNumThreads = 4;
static const int kGCNumKeys = 1000;
struct GCThread {
DB* db;
int id;
std::atomic<bool> done;
};
static void GCThreadBody(void* arg) {
GCThread* t = reinterpret_cast<GCThread*>(arg);
int id = t->id;
DB* db = t->db;
WriteOptions wo;
for (int i = 0; i < kGCNumKeys; ++i) {
std::string kv(std::to_string(i + id * kGCNumKeys));
ASSERT_OK(db->Put(wo, kv, kv));
}
t->done = true;
}
} // namespace
TEST(DBTest, GroupCommitTest) {
do {
// Start threads
GCThread thread[kGCNumThreads];
for (int id = 0; id < kGCNumThreads; id++) {
thread[id].id = id;
thread[id].db = db_;
thread[id].done = false;
env_->StartThread(GCThreadBody, &thread[id]);
}
for (int id = 0; id < kGCNumThreads; id++) {
while (thread[id].done == false) {
env_->SleepForMicroseconds(100000);
}
}
std::vector<std::string> expected_db;
for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) {
expected_db.push_back(std::to_string(i));
}
sort(expected_db.begin(), expected_db.end());
Iterator* itr = db_->NewIterator(ReadOptions());
itr->SeekToFirst();
for (auto x : expected_db) {
ASSERT_TRUE(itr->Valid());
ASSERT_EQ(itr->key().ToString(), x);
ASSERT_EQ(itr->value().ToString(), x);
itr->Next();
}
ASSERT_TRUE(!itr->Valid());
} while (ChangeOptions());
}
namespace {
typedef std::map<std::string, std::string> KVMap;
}

Loading…
Cancel
Save