diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index a3122f5c5..fcb7a82a4 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -867,22 +867,33 @@ Status BlobDBImpl::SingleDelete(const WriteOptions& wopts, } Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { - class Handler1 : public WriteBatch::Handler { + class BlobInserter : public WriteBatch::Handler { + private: + BlobDBImpl* impl_; + SequenceNumber sequence_; + WriteBatch updates_blob_; + Status batch_rewrite_status_; + std::shared_ptr last_file_; + bool has_put_; + public: - explicit Handler1(BlobDBImpl* i) : impl(i), previous_put(false) {} + explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) + : impl_(impl), sequence_(seq), has_put_(false) {} + + WriteBatch& updates_blob() { return updates_blob_; } + + Status batch_rewrite_status() { return batch_rewrite_status_; } + + std::shared_ptr& last_file() { return last_file_; } - BlobDBImpl* impl; - WriteBatch updates_blob; - Status batch_rewrite_status; - std::shared_ptr last_file; - bool previous_put; + bool has_put() { return has_put_; } virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value_unc) override { Slice newval; int32_t ttl_val = -1; - if (impl->bdb_options_.extract_ttl_fn) { - impl->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val); + if (impl_->bdb_options_.extract_ttl_fn) { + impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val); } else { newval = value_unc; } @@ -894,22 +905,26 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { expiration = ttl_val + static_cast(cur_t); } std::shared_ptr bfile = - (ttl_val != -1) ? impl->SelectBlobFileTTL(expiration) - : ((last_file) ? last_file : impl->SelectBlobFile()); - if (last_file && last_file != bfile) { - batch_rewrite_status = Status::NotFound("too many blob files"); - return batch_rewrite_status; + (ttl_val != -1) + ? impl_->SelectBlobFileTTL(expiration) + : ((last_file_) ? last_file_ : impl_->SelectBlobFile()); + if (last_file_ && last_file_ != bfile) { + batch_rewrite_status_ = Status::NotFound("too many blob files"); + return batch_rewrite_status_; } if (!bfile) { - batch_rewrite_status = Status::NotFound("blob file not found"); - return batch_rewrite_status; + batch_rewrite_status_ = Status::NotFound("blob file not found"); + return batch_rewrite_status_; } + last_file_ = bfile; + has_put_ = true; + Slice value = value_unc; std::string compression_output; - if (impl->bdb_options_.compression != kNoCompression) { - CompressionType ct = impl->bdb_options_.compression; + if (impl_->bdb_options_.compression != kNoCompression) { + CompressionType ct = impl_->bdb_options_.compression; CompressionOptions compression_opts; value = CompressBlock(value_unc, compression_opts, &ct, kBlockBasedTableVersionFormat, Slice(), @@ -918,96 +933,99 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { std::string headerbuf; Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); - - if (previous_put) { - impl->AppendSN(last_file, 0 /*sequence number*/); - previous_put = false; - } - - last_file = bfile; - std::string index_entry; - Status st = impl->AppendBlob(bfile, headerbuf, key, value, &index_entry); + Status st = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry); + if (st.ok()) { + impl_->AppendSN(last_file_, sequence_); + sequence_++; + } - if (expiration != -1) + if (expiration != -1) { extendTTL(&(bfile->ttl_range_), (uint32_t)expiration); + } if (!st.ok()) { - batch_rewrite_status = st; + batch_rewrite_status_ = st; } else { - previous_put = true; - WriteBatchInternal::Put(&updates_blob, column_family_id, key, + WriteBatchInternal::Put(&updates_blob_, column_family_id, key, index_entry); } return Status::OK(); } - virtual Status MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - batch_rewrite_status = - Status::NotSupported("Not supported operation in blob db."); - return batch_rewrite_status; - } - virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { - WriteBatchInternal::Delete(&updates_blob, column_family_id, key); + WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); + sequence_++; return Status::OK(); } - virtual void LogData(const Slice& blob) override { - updates_blob.PutLogData(blob); + virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, + const Slice& /*key*/) override { + batch_rewrite_status_ = + Status::NotSupported("Not supported operation in blob db."); + return batch_rewrite_status_; } - private: - }; + virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + batch_rewrite_status_ = + Status::NotSupported("Not supported operation in blob db."); + return batch_rewrite_status_; + } - Handler1 handler1(this); - updates->Iterate(&handler1); + virtual void LogData(const Slice& blob) override { + updates_blob_.PutLogData(blob); + } + }; - Status s; - SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); + SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1; + BlobInserter blob_inserter(this, sequence); + updates->Iterate(&blob_inserter); - if (!handler1.batch_rewrite_status.ok()) { - return handler1.batch_rewrite_status; - } else { - s = db_->Write(opts, &(handler1.updates_blob)); + if (!blob_inserter.batch_rewrite_status().ok()) { + return blob_inserter.batch_rewrite_status(); } - if (!s.ok()) return s; - - if (handler1.previous_put) { - // this is the sequence number of the write. - SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob) + - WriteBatchInternal::Count(&handler1.updates_blob) - 1; - AppendSN(handler1.last_file, sn); + Status s = db_->Write(opts, &(blob_inserter.updates_blob())); + if (!s.ok()) { + return s; + } - CloseIf(handler1.last_file); + if (blob_inserter.has_put()) { + CloseIf(blob_inserter.last_file()); } // add deleted key to list of keys that have been deleted for book-keeping - class Handler2 : public WriteBatch::Handler { + class DeleteBookkeeper : public WriteBatch::Handler { public: - explicit Handler2(BlobDBImpl* i, const SequenceNumber& sn) - : impl(i), lsn(sn) {} + explicit DeleteBookkeeper(BlobDBImpl* impl, const SequenceNumber& seq) + : impl_(impl), sequence_(seq) {} + + virtual Status PutCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + sequence_++; + return Status::OK(); + } virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { ColumnFamilyHandle* cfh = - impl->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); + impl_->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - impl->delete_keys_q_.enqueue({cfh, key.ToString(), lsn}); + impl_->delete_keys_q_.enqueue({cfh, key.ToString(), sequence_}); + sequence_++; return Status::OK(); } private: - BlobDBImpl* impl; - SequenceNumber lsn; + BlobDBImpl* impl_; + SequenceNumber sequence_; }; // add deleted key to list of keys that have been deleted for book-keeping - Handler2 handler2(this, lsn); - updates->Iterate(&handler2); + DeleteBookkeeper delete_bookkeeper(this, sequence); + updates->Iterate(&delete_bookkeeper); return Status::OK(); } diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 3714661b0..c43554d42 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -513,19 +513,14 @@ TEST_F(BlobDBTest, SequenceNumber) { ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value)); } ASSERT_OK(blobdb_->Write(WriteOptions(), &batch)); - sequence += batch_size; - ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); for (size_t k = 0; k < batch_size; k++) { std::string key = "key" + ToString(i) + "-" + ToString(k); + sequence++; SequenceNumber actual_sequence; ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence)); - // We only write sequence for the last key in a batch. - if (k + 1 < batch_size) { - ASSERT_EQ(0, actual_sequence); - } else { - ASSERT_EQ(sequence, actual_sequence); - } + ASSERT_EQ(sequence, actual_sequence); } + ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); } }