diff --git a/db/db_impl.cc b/db/db_impl.cc index a250dbbcb..155f9096b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3993,33 +3993,48 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // At this point the mutex is unlocked if (status.ok()) { - WriteBatch* updates = nullptr; - if (write_batch_group.size() == 1) { - updates = write_batch_group[0]; - } else { - updates = &tmp_batch_; - for (size_t i = 0; i < write_batch_group.size(); ++i) { - WriteBatchInternal::Append(updates, write_batch_group[i]); - } + int total_count = 0; + uint64_t total_byte_size = 0; + for (auto b : write_batch_group) { + total_count += WriteBatchInternal::Count(b); + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(b)); } const SequenceNumber current_sequence = last_sequence + 1; - WriteBatchInternal::SetSequence(updates, current_sequence); - int my_batch_count = WriteBatchInternal::Count(updates); - last_sequence += my_batch_count; - const uint64_t batch_size = WriteBatchInternal::ByteSize(updates); + last_sequence += total_count; + // Record statistics - RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count); - RecordTick(stats_, BYTES_WRITTEN, batch_size); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + PERF_TIMER_STOP(write_pre_and_post_process_time); + if (write_options.disableWAL) { flush_on_destroy_ = true; } - PERF_TIMER_STOP(write_pre_and_post_process_time); uint64_t log_size = 0; if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - Slice log_entry = WriteBatchInternal::Contents(updates); + + WriteBatch* merged_batch = nullptr; + if (write_batch_group.size() == 1) { + merged_batch = write_batch_group[0]; + } else { + // WAL needs all of the batches flattened into a single batch. 
+ // We could avoid copying here with an iov-like AddRecord + // interface + merged_batch = &tmp_batch_; + for (auto b : write_batch_group) { + WriteBatchInternal::Append(merged_batch, b); + } + } + WriteBatchInternal::SetSequence(merged_batch, current_sequence); + + assert(WriteBatchInternal::Count(merged_batch) == total_count); + assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size); + + Slice log_entry = WriteBatchInternal::Contents(merged_batch); status = logs_.back().writer->AddRecord(log_entry); total_log_size_ += log_entry.size(); alive_log_files_.back().AddSize(log_entry.size()); @@ -4049,34 +4064,41 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = directories_.GetWalDir()->Fsync(); } } + + if (merged_batch == &tmp_batch_) { + tmp_batch_.Clear(); + } } if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); status = WriteBatchInternal::InsertInto( - updates, column_family_memtables_.get(), - write_options.ignore_missing_column_families, 0, this, false); - // A non-OK status here indicates iteration failure (either in-memory - // writebatch corruption (very bad), or the client specified invalid - // column family). This will later on trigger bg_error_. + write_batch_group, current_sequence, column_family_memtables_.get(), + write_options.ignore_missing_column_families, + /*log_number*/ 0, this, /*dont_filter_deletes*/ false); + + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. // - // Note that existing logic was not sound. Any partial failure writing - // into the memtable would result in a state that some write ops might - // have succeeded in memtable but Status reports error for all writes. + // Is setting bg_error_ enough here? 
This will at least stop + // compaction and fail any further writes. + if (!status.ok() && bg_error_.ok()) { + bg_error_ = status; + } SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); } PERF_TIMER_START(write_pre_and_post_process_time); - if (updates == &tmp_batch_) { - tmp_batch_.Clear(); - } mutex_.Lock(); // internal stats - default_cf_internal_stats_->AddDBStats( - InternalStats::BYTES_WRITTEN, batch_size); + default_cf_internal_stats_->AddDBStats(InternalStats::BYTES_WRITTEN, + total_byte_size); default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, - my_batch_count); + total_count); if (!write_options.disableWAL) { if (write_options.sync) { default_cf_internal_stats_->AddDBStats(InternalStats::WAL_FILE_SYNCED, diff --git a/db/write_batch.cc b/db/write_batch.cc index 925a05efd..e6e1acab3 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -591,6 +591,7 @@ class MemTableInserter : public WriteBatch::Handler { } return true; } + virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { Status seek_status; @@ -647,8 +648,8 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { + Status DeleteImpl(uint32_t column_family_id, const Slice& key, + ValueType delete_type) { Status seek_status; if (!SeekToColumnFamily(column_family_id, &seek_status)) { ++sequence_; @@ -671,40 +672,20 @@ class MemTableInserter : public WriteBatch::Handler { return Status::OK(); } } - mem->Add(sequence_, kTypeDeletion, key, Slice()); + mem->Add(sequence_, delete_type, key, Slice()); sequence_++; cf_mems_->CheckMemtableFull(); return Status::OK(); } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + return DeleteImpl(column_family_id, key, kTypeDeletion); + } + virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override { - Status seek_status; - if 
(!SeekToColumnFamily(column_family_id, &seek_status)) { - ++sequence_; - return seek_status; - } - MemTable* mem = cf_mems_->GetMemTable(); - auto* moptions = mem->GetMemTableOptions(); - if (!dont_filter_deletes_ && moptions->filter_deletes) { - SnapshotImpl read_from_snapshot; - read_from_snapshot.number_ = sequence_; - ReadOptions ropts; - ropts.snapshot = &read_from_snapshot; - std::string value; - auto cf_handle = cf_mems_->GetColumnFamilyHandle(); - if (cf_handle == nullptr) { - cf_handle = db_->DefaultColumnFamily(); - } - if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) { - RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES); - return Status::OK(); - } - } - mem->Add(sequence_, kTypeSingleDeletion, key, Slice()); - sequence_++; - cf_mems_->CheckMemtableFull(); - return Status::OK(); + return DeleteImpl(column_family_id, key, kTypeSingleDeletion); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, @@ -791,18 +772,32 @@ class MemTableInserter : public WriteBatch::Handler { // This function can only be called in these conditions: // 1) During Recovery() -// 2) during Write(), in a single-threaded write thread -// The reason is that it calles ColumnFamilyMemTablesImpl::Seek(), which needs -// to be called from a single-threaded write thread (or while holding DB mutex) -Status WriteBatchInternal::InsertInto(const WriteBatch* b, +// 2) During Write(), in a single-threaded write thread +// The reason is that it calls memtables->Seek(), which has a stateful cache +Status WriteBatchInternal::InsertInto(const autovector& batches, + SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + bool ignore_missing_column_families, + uint64_t log_number, DB* db, + const bool dont_filter_deletes) { + MemTableInserter inserter(sequence, memtables, ignore_missing_column_families, + log_number, db, dont_filter_deletes); + Status rv = Status::OK(); + for (size_t i = 0; i < batches.size() && rv.ok(); ++i) { + rv = batches[i]->Iterate(&inserter); 
+ } + return rv; +} + +Status WriteBatchInternal::InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, bool ignore_missing_column_families, uint64_t log_number, DB* db, const bool dont_filter_deletes) { - MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, + MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, ignore_missing_column_families, log_number, db, dont_filter_deletes); - return b->Iterate(&inserter); + return batch->Iterate(&inserter); } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { @@ -821,4 +816,13 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { std::memory_order_relaxed); } +size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize, + size_t rightByteSize) { + if (leftByteSize == 0 || rightByteSize == 0) { + return leftByteSize + rightByteSize; + } else { + return leftByteSize + rightByteSize - kHeader; + } +} + } // namespace rocksdb diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 04db461a0..3ae4edc7a 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -12,6 +12,7 @@ #include "rocksdb/write_batch.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "util/autovector.h" namespace rocksdb { @@ -112,17 +113,28 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); - // Inserts batch entries into memtable - // If dont_filter_deletes is false AND options.filter_deletes is true, - // then --> Drops deletes in batch if db->KeyMayExist returns false - // If ignore_missing_column_families == true. WriteBatch referencing - // non-existing column family should be ignored. - // However, if ignore_missing_column_families == false, any WriteBatch - // referencing non-existing column family will return a InvalidArgument() - // failure. + // Inserts batches[i] into memtable, for i in 0..batches.size()-1 inclusive. 
+ // + // If dont_filter_deletes is false AND options.filter_deletes is true + // AND db->KeyMayExist returns false, then a Delete won't modify the memtable. + // + // If ignore_missing_column_families == true, a WriteBatch + // referencing a non-existing column family will be ignored. + // If ignore_missing_column_families == false, processing of the + // batches will be stopped if a reference is found to a non-existing + // column family and InvalidArgument() will be returned. The writes + // in batches may be only partially applied at that point. // // If log_number is non-zero, the memtable will be updated only if - // memtables->GetLogNumber() >= log_number + // memtables->GetLogNumber() >= log_number. + static Status InsertInto(const autovector& batches, + SequenceNumber sequence, + ColumnFamilyMemTables* memtables, + bool ignore_missing_column_families = false, + uint64_t log_number = 0, DB* db = nullptr, + const bool dont_filter_deletes = true); + + // Convenience form of InsertInto when you have only one batch static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, bool ignore_missing_column_families = false, @@ -130,6 +142,10 @@ class WriteBatchInternal { const bool dont_filter_deletes = true); static void Append(WriteBatch* dst, const WriteBatch* src); + + // Returns the ByteSize of the batch that results from appending a + // WriteBatch of ByteSize rightByteSize to one of ByteSize leftByteSize + static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); }; } // namespace rocksdb