Don't merge WriteBatch-es if WAL is disabled

Summary:
There's no need for WriteImpl to flatten the write batch group
into a single WriteBatch if the WAL is disabled.  This diff moves the
flattening into the WAL step and skips it entirely when it isn't
needed.  It's good for about a 5% speedup on a multi-threaded workload
with no WAL.
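
In sketch form, the shape of the change is (a simplified, self-contained
illustration, not the RocksDB source; FakeBatch and WriteGroup are
made-up names):

    #include <cstdint>
    #include <string>
    #include <vector>

    struct FakeBatch {
      std::string rep;  // serialized entries, stand-in for WriteBatch::rep_
      int count = 0;    // number of entries in the batch
    };

    // Totals for stats and sequence allocation are accumulated per batch,
    // so no up-front merge is needed.  Flattening into a single contiguous
    // record happens only when a WAL record actually has to be written.
    std::string WriteGroup(const std::vector<FakeBatch*>& group,
                           bool disable_wal) {
      int total_count = 0;
      uint64_t total_byte_size = 0;
      for (const auto* b : group) {
        total_count += b->count;
        total_byte_size += b->rep.size();
      }
      (void)total_count;      // feeds stats and sequence numbers
      (void)total_byte_size;  // in the real code

      std::string wal_record;
      if (!disable_wal) {
        for (const auto* b : group) {
          wal_record += b->rep;  // the only copy, paid only by the WAL path
        }
      }
      return wal_record;  // empty when the WAL is disabled
    }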

This diff also adds clarifying comments about the possibility of partial
failure of WriteBatchInternal::InsertInto, and it now always sets bg_error_
if the memtable state diverges from the logged state or if a WriteBatch
is applied only partially.
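
The latch itself is small; these are the relevant lines from the
db_impl.cc hunk below:

    // First failure wins; bg_error_ then stops compaction and fails
    // any further writes.
    if (!status.ok() && bg_error_.ok()) {
      bg_error_ = status;
    }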

Benchmark for speedup:
  db_bench -benchmarks=fillrandom -threads=16 -batch_size=1 -memtablerep=skip_list -value_size=0 --num=200000 -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -disable_auto_compactions --max_write_buffer_number=8 -max_background_flushes=8 --disable_wal --write_buffer_size=160000000

Test Plan: asserts + make check

Reviewers: sdong, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D50583
Branch: main
Nathan Bronson, 9 years ago
parent 56245ddcf5 · commit 6ce42dd075

Changed files:
  db/db_impl.cc (82)
  db/write_batch.cc (74)
  db/write_batch_internal.h (34)

@@ -3993,33 +3993,48 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   // At this point the mutex is unlocked

   if (status.ok()) {
-    WriteBatch* updates = nullptr;
-    if (write_batch_group.size() == 1) {
-      updates = write_batch_group[0];
-    } else {
-      updates = &tmp_batch_;
-      for (size_t i = 0; i < write_batch_group.size(); ++i) {
-        WriteBatchInternal::Append(updates, write_batch_group[i]);
-      }
+    int total_count = 0;
+    uint64_t total_byte_size = 0;
+    for (auto b : write_batch_group) {
+      total_count += WriteBatchInternal::Count(b);
+      total_byte_size = WriteBatchInternal::AppendedByteSize(
+          total_byte_size, WriteBatchInternal::ByteSize(b));
     }

     const SequenceNumber current_sequence = last_sequence + 1;
-    WriteBatchInternal::SetSequence(updates, current_sequence);
-    int my_batch_count = WriteBatchInternal::Count(updates);
-    last_sequence += my_batch_count;
-    const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
+    last_sequence += total_count;
+
     // Record statistics
-    RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
-    RecordTick(stats_, BYTES_WRITTEN, batch_size);
+    RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
+    RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
+    PERF_TIMER_STOP(write_pre_and_post_process_time);
+
     if (write_options.disableWAL) {
       flush_on_destroy_ = true;
     }
-    PERF_TIMER_STOP(write_pre_and_post_process_time);

     uint64_t log_size = 0;
     if (!write_options.disableWAL) {
       PERF_TIMER_GUARD(write_wal_time);
-      Slice log_entry = WriteBatchInternal::Contents(updates);
+
+      WriteBatch* merged_batch = nullptr;
+      if (write_batch_group.size() == 1) {
+        merged_batch = write_batch_group[0];
+      } else {
+        // WAL needs all of the batches flattened into a single batch.
+        // We could avoid copying here with an iov-like AddRecord
+        // interface
+        merged_batch = &tmp_batch_;
+        for (auto b : write_batch_group) {
+          WriteBatchInternal::Append(merged_batch, b);
+        }
+      }
+      WriteBatchInternal::SetSequence(merged_batch, current_sequence);
+
+      assert(WriteBatchInternal::Count(merged_batch) == total_count);
+      assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size);
+
+      Slice log_entry = WriteBatchInternal::Contents(merged_batch);
       status = logs_.back().writer->AddRecord(log_entry);
       total_log_size_ += log_entry.size();
       alive_log_files_.back().AddSize(log_entry.size());
@@ -4049,34 +4064,41 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
           status = directories_.GetWalDir()->Fsync();
         }
       }
+
+      if (merged_batch == &tmp_batch_) {
+        tmp_batch_.Clear();
+      }
     }
     if (status.ok()) {
       PERF_TIMER_GUARD(write_memtable_time);

       status = WriteBatchInternal::InsertInto(
-          updates, column_family_memtables_.get(),
-          write_options.ignore_missing_column_families, 0, this, false);
-      // A non-OK status here indicates iteration failure (either in-memory
-      // writebatch corruption (very bad), or the client specified invalid
-      // column family). This will later on trigger bg_error_.
+          write_batch_group, current_sequence, column_family_memtables_.get(),
+          write_options.ignore_missing_column_families,
+          /*log_number*/ 0, this, /*dont_filter_deletes*/ false);
+
+      // A non-OK status here indicates that the state implied by the
+      // WAL has diverged from the in-memory state.  This could be
+      // because of a corrupt write_batch (very bad), or because the
+      // client specified an invalid column family and didn't specify
+      // ignore_missing_column_families.
       //
-      // Note that existing logic was not sound. Any partial failure writing
-      // into the memtable would result in a state that some write ops might
-      // have succeeded in memtable but Status reports error for all writes.
       // Is setting bg_error_ enough here?  This will at least stop
       // compaction and fail any further writes.
       if (!status.ok() && bg_error_.ok()) {
         bg_error_ = status;
       }

       SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
     }
     PERF_TIMER_START(write_pre_and_post_process_time);
-    if (updates == &tmp_batch_) {
-      tmp_batch_.Clear();
-    }
     mutex_.Lock();
     // internal stats
-    default_cf_internal_stats_->AddDBStats(
-        InternalStats::BYTES_WRITTEN, batch_size);
+    default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN,
+                                           total_byte_size);
     default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
-                                           my_batch_count);
+                                           total_count);
     if (!write_options.disableWAL) {
       if (write_options.sync) {
         default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED,
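
The sequence accounting in the two db_impl.cc hunks above is plain
arithmetic over the whole group: the group's first entry gets
last_sequence + 1, and last_sequence then advances by the total entry
count. A worked example with made-up values:

    // Suppose last_sequence == 100 and the group holds three batches
    // with 2, 5, and 1 entries:
    //   current_sequence = 101       // first entry of batch 0
    //   batch 0 entries get 101..102
    //   batch 1 entries get 103..107
    //   batch 2 entry   gets 108
    //   last_sequence    = 100 + 8 == 108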

@@ -591,6 +591,7 @@ class MemTableInserter : public WriteBatch::Handler {
     }
     return true;
   }
+
   virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                        const Slice& value) override {
     Status seek_status;
@@ -647,8 +648,8 @@ class MemTableInserter : public WriteBatch::Handler {
     return Status::OK();
   }

-  virtual Status DeleteCF(uint32_t column_family_id,
-                          const Slice& key) override {
+  Status DeleteImpl(uint32_t column_family_id, const Slice& key,
+                    ValueType delete_type) {
     Status seek_status;
     if (!SeekToColumnFamily(column_family_id, &seek_status)) {
       ++sequence_;
@@ -671,40 +672,20 @@ class MemTableInserter : public WriteBatch::Handler {
         return Status::OK();
       }
     }
-    mem->Add(sequence_, kTypeDeletion, key, Slice());
+    mem->Add(sequence_, delete_type, key, Slice());
     sequence_++;
     cf_mems_->CheckMemtableFull();
     return Status::OK();
   }

+  virtual Status DeleteCF(uint32_t column_family_id,
+                          const Slice& key) override {
+    return DeleteImpl(column_family_id, key, kTypeDeletion);
+  }
+
   virtual Status SingleDeleteCF(uint32_t column_family_id,
                                 const Slice& key) override {
-    Status seek_status;
-    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
-      ++sequence_;
-      return seek_status;
-    }
-    MemTable* mem = cf_mems_->GetMemTable();
-    auto* moptions = mem->GetMemTableOptions();
-    if (!dont_filter_deletes_ && moptions->filter_deletes) {
-      SnapshotImpl read_from_snapshot;
-      read_from_snapshot.number_ = sequence_;
-      ReadOptions ropts;
-      ropts.snapshot = &read_from_snapshot;
-      std::string value;
-      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
-      if (cf_handle == nullptr) {
-        cf_handle = db_->DefaultColumnFamily();
-      }
-      if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
-        RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
-        return Status::OK();
-      }
-    }
-    mem->Add(sequence_, kTypeSingleDeletion, key, Slice());
-    sequence_++;
-    cf_mems_->CheckMemtableFull();
-    return Status::OK();
+    return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
   }

   virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
@@ -791,18 +772,32 @@ class MemTableInserter : public WriteBatch::Handler {
 // This function can only be called in these conditions:
 // 1) During Recovery()
-// 2) during Write(), in a single-threaded write thread
-// The reason is that it calles ColumnFamilyMemTablesImpl::Seek(), which needs
-// to be called from a single-threaded write thread (or while holding DB mutex)
-Status WriteBatchInternal::InsertInto(const WriteBatch* b,
+// 2) During Write(), in a single-threaded write thread
+// The reason is that it calls memtables->Seek(), which has a stateful cache
+Status WriteBatchInternal::InsertInto(const autovector<WriteBatch*>& batches,
+                                      SequenceNumber sequence,
+                                      ColumnFamilyMemTables* memtables,
+                                      bool ignore_missing_column_families,
+                                      uint64_t log_number, DB* db,
+                                      const bool dont_filter_deletes) {
+  MemTableInserter inserter(sequence, memtables, ignore_missing_column_families,
+                            log_number, db, dont_filter_deletes);
+  Status rv = Status::OK();
+  for (size_t i = 0; i < batches.size() && rv.ok(); ++i) {
+    rv = batches[i]->Iterate(&inserter);
+  }
+  return rv;
+}
+
+Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
                                       ColumnFamilyMemTables* memtables,
                                       bool ignore_missing_column_families,
                                       uint64_t log_number, DB* db,
                                       const bool dont_filter_deletes) {
-  MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables,
+  MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
                             ignore_missing_column_families, log_number, db,
                             dont_filter_deletes);
-  return b->Iterate(&inserter);
+  return batch->Iterate(&inserter);
 }

 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
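
Because the loop in the new InsertInto stops at the first non-OK status,
batches before the failing one are fully applied and the failing batch
may be applied in part, which is why the caller latches bg_error_. A
hedged usage sketch of the new overload (batch_a, batch_b, first_seq,
memtables, and db are illustrative names, not values from this diff):

    autovector<WriteBatch*> group;
    group.push_back(&batch_a);
    group.push_back(&batch_b);
    // Sequence numbers start at first_seq and advance by one per entry
    // across the whole group.
    Status s = WriteBatchInternal::InsertInto(
        group, /*sequence=*/first_seq, memtables,
        /*ignore_missing_column_families=*/false,
        /*log_number=*/0, db, /*dont_filter_deletes=*/false);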
@@ -821,4 +816,13 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
       std::memory_order_relaxed);
 }

+size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
+                                            size_t rightByteSize) {
+  if (leftByteSize == 0 || rightByteSize == 0) {
+    return leftByteSize + rightByteSize;
+  } else {
+    return leftByteSize + rightByteSize - kHeader;
+  }
+}
+
 }  // namespace rocksdb
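
AppendedByteSize lets the totals loop in WriteImpl predict the size of
the merged batch without building it. Every serialized WriteBatch begins
with a fixed header (kHeader: an 8-byte sequence number plus a 4-byte
count, 12 bytes), and Append keeps only the destination's header, so
appending a non-empty batch to a non-empty batch drops one header from
the sum. A worked example:

    // With kHeader == 12:
    //   AppendedByteSize(0, 30)  == 30  // first batch keeps its header
    //   AppendedByteSize(30, 20) == 38  // 30 + 20 - 12
    //   AppendedByteSize(38, 25) == 51  // 38 + 25 - 12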

@@ -12,6 +12,7 @@
 #include "rocksdb/write_batch.h"
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
+#include "util/autovector.h"

 namespace rocksdb {
@@ -112,17 +113,28 @@ class WriteBatchInternal {
   static void SetContents(WriteBatch* batch, const Slice& contents);

-  // Inserts batch entries into memtable
-  // If dont_filter_deletes is false AND options.filter_deletes is true,
-  // then --> Drops deletes in batch if db->KeyMayExist returns false
-  // If ignore_missing_column_families == true. WriteBatch referencing
-  // non-existing column family should be ignored.
-  // However, if ignore_missing_column_families == false, any WriteBatch
-  // referencing non-existing column family will return a InvalidArgument()
-  // failure.
+  // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
+  //
+  // If dont_filter_deletes is false AND options.filter_deletes is true
+  // AND db->KeyMayExist is false, then a Delete won't modify the memtable.
+  //
+  // If ignore_missing_column_families == true, a WriteBatch
+  // referencing a non-existing column family will be ignored.
+  // If ignore_missing_column_families == false, processing of the
+  // batches will be stopped if a reference is found to a non-existing
+  // column family and InvalidArgument() will be returned.  The writes
+  // in batches may be only partially applied at that point.
+  //
   // If log_number is non-zero, the memtable will be updated only if
-  // memtables->GetLogNumber() >= log_number
+  // memtables->GetLogNumber() >= log_number.
+  static Status InsertInto(const autovector<WriteBatch*>& batches,
+                           SequenceNumber sequence,
+                           ColumnFamilyMemTables* memtables,
+                           bool ignore_missing_column_families = false,
+                           uint64_t log_number = 0, DB* db = nullptr,
+                           const bool dont_filter_deletes = true);
+
+  // Convenience form of InsertInto when you have only one batch.
   static Status InsertInto(const WriteBatch* batch,
                            ColumnFamilyMemTables* memtables,
                            bool ignore_missing_column_families = false,
@@ -130,6 +142,10 @@ class WriteBatchInternal {
                            const bool dont_filter_deletes = true);

   static void Append(WriteBatch* dst, const WriteBatch* src);
+
+  // Returns the byte size of appending a WriteBatch with ByteSize
+  // leftByteSize and a WriteBatch with ByteSize rightByteSize.
+  static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
 };

 }  // namespace rocksdb
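
For a single batch whose sequence has already been set, the two
overloads should behave identically; the convenience form just reads the
sequence back out of the batch. A sketch of the equivalence (batch and
memtables are illustrative names):

    autovector<WriteBatch*> one;
    one.push_back(&batch);
    // Equivalent, given matching default arguments:
    WriteBatchInternal::InsertInto(one, WriteBatchInternal::Sequence(&batch),
                                   memtables);
    WriteBatchInternal::InsertInto(&batch, memtables);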
