diff --git a/db/db_impl.cc b/db/db_impl.cc index 2aeb45fb8..bbf559d99 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -955,7 +955,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, WriteBatchInternal::SetContents(&batch, record); status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), log_number); + &batch, column_family_memtables_.get(), true, log_number); MaybeIgnoreError(&status); if (!status.ok()) { @@ -3311,7 +3311,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { StartPerfTimer(&write_memtable_timer); status = WriteBatchInternal::InsertInto( - updates, column_family_memtables_.get(), 0, this, false); + updates, column_family_memtables_.get(), false, 0, this, false); BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer); if (!status.ok()) { diff --git a/db/write_batch.cc b/db/write_batch.cc index 680bd1ccd..5747c5302 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -234,14 +234,17 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; ColumnFamilyMemTables* cf_mems_; + bool recovery_; uint64_t log_number_; DBImpl* db_; const bool dont_filter_deletes_; MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, - uint64_t log_number, DB* db, const bool dont_filter_deletes) + bool recovery, uint64_t log_number, DB* db, + const bool dont_filter_deletes) : sequence_(sequence), cf_mems_(cf_mems), + recovery_(recovery), log_number_(log_number), db_(reinterpret_cast(db)), dont_filter_deletes_(dont_filter_deletes) { @@ -251,19 +254,39 @@ class MemTableInserter : public WriteBatch::Handler { } } - bool IgnoreUpdate() { - return log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber(); - } - - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value) { + bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { bool found = cf_mems_->Seek(column_family_id); + if 
(recovery_ && (!found || log_number_ < cf_mems_->GetLogNumber())) { + // If in recovery environment: + // * If column family was not found, it might mean that the WAL write + // batch references a column family that was dropped after the + // insert. We don't want to fail the whole write batch in that case -- we + // just ignore the update. + // * If log_number_ < cf_mems_->GetLogNumber(), this means that column + // family already contains updates from this log. We can't apply updates + // twice because of update-in-place or merge workloads -- ignore the + // update. + *s = Status::OK(); + return false; + } if (!found) { - return Status::InvalidArgument( + assert(!recovery_); + // If the column family was not found in non-recovery environment + // (client's write code-path), we have to fail the write and return + // the failure status to the client. + *s = Status::InvalidArgument( "Invalid column family specified in write batch"); + return false; } - if (IgnoreUpdate()) { - return Status::OK(); + return true; + } + + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -315,13 +338,10 @@ class MemTableInserter : public WriteBatch::Handler { virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - bool found = cf_mems_->Seek(column_family_id); - if (!found) { - return Status::InvalidArgument( - "Invalid column family specified in write batch"); - } - if (IgnoreUpdate()) { - return Status::OK(); + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -387,13 +407,10 @@ class MemTableInserter : public 
WriteBatch::Handler { } virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { - bool found = cf_mems_->Seek(column_family_id); - if (!found) { - return Status::InvalidArgument( - "Invalid column family specified in write batch"); - } - if (IgnoreUpdate()) { - return Status::OK(); + Status seek_status; + if (!SeekToColumnFamily(column_family_id, &seek_status)) { + ++sequence_; + return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); const Options* options = cf_mems_->GetFullOptions(); @@ -421,10 +438,10 @@ class MemTableInserter : public WriteBatch::Handler { Status WriteBatchInternal::InsertInto(const WriteBatch* b, ColumnFamilyMemTables* memtables, - uint64_t log_number, DB* db, - const bool dont_filter_deletes) { + bool recovery, uint64_t log_number, + DB* db, const bool dont_filter_deletes) { MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, - log_number, db, dont_filter_deletes); + recovery, log_number, db, dont_filter_deletes); return b->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 7db21f1a3..e5ee045cd 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -90,12 +90,18 @@ class WriteBatchInternal { // Inserts batch entries into memtable // If dont_filter_deletes is false AND options.filter_deletes is true, // then --> Drops deletes in batch if db->KeyMayExist returns false - // If log_number is not-null, the memtable will be updated only if + // If recovery == true, this means InsertInto is executed on a recovery + // code-path. WriteBatch referencing a dropped column family can be + // found on a recovery code-path and should be ignored (recovery should not + // fail). Additionally, the memtable will be updated only if // log_number >= memtables->GetLogNumber() - // See MemTableInserter::IgnoreUpdate() + // However, if recovery == false, any WriteBatch referencing + // a non-existing column family will return a failure. 
Also, log_number is + // ignored in that case static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, - uint64_t log_number = 0, DB* db = nullptr, + bool recovery = false, uint64_t log_number = 0, + DB* db = nullptr, const bool dont_filter_deletes = true); static void Append(WriteBatch* dst, const WriteBatch* src);