From 91e2aa3ce21773f20161f05da5cffaf335c4cfc4 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 13 Jun 2017 12:37:59 -0700 Subject: [PATCH] write exact sequence number for each put in write batch Summary: At the beginning of write batch write, grab the latest sequence from base db and assume sequence number will increment by 1 for each put and delete, and write the exact sequence number with each put. This is assuming we are the only writer to increment sequence number (no external file ingestion, etc) and there should be no holes in the sequence number. Also having some minor naming changes. Closes https://github.com/facebook/rocksdb/pull/2402 Differential Revision: D5176134 Pulled By: yiwu-arbug fbshipit-source-id: cb4712ee44478d5a2e5951213a10b72f08fe8c88 --- utilities/blob_db/blob_db_impl.cc | 154 +++++++++++++++++------------- utilities/blob_db/blob_db_test.cc | 11 +-- 2 files changed, 89 insertions(+), 76 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index a3122f5c5..fcb7a82a4 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -867,22 +867,33 @@ Status BlobDBImpl::SingleDelete(const WriteOptions& wopts, } Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { - class Handler1 : public WriteBatch::Handler { + class BlobInserter : public WriteBatch::Handler { + private: + BlobDBImpl* impl_; + SequenceNumber sequence_; + WriteBatch updates_blob_; + Status batch_rewrite_status_; + std::shared_ptr last_file_; + bool has_put_; + public: - explicit Handler1(BlobDBImpl* i) : impl(i), previous_put(false) {} + explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) + : impl_(impl), sequence_(seq), has_put_(false) {} + + WriteBatch& updates_blob() { return updates_blob_; } + + Status batch_rewrite_status() { return batch_rewrite_status_; } + + std::shared_ptr& last_file() { return last_file_; } - BlobDBImpl* impl; - WriteBatch updates_blob; - Status batch_rewrite_status; - std::shared_ptr last_file; - bool previous_put; + bool has_put() { return has_put_; } virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value_unc) override { Slice newval; int32_t ttl_val = -1; - if (impl->bdb_options_.extract_ttl_fn) { - impl->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val); + if (impl_->bdb_options_.extract_ttl_fn) { + impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val); } else { newval = value_unc; } @@ -894,22 +905,26 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { expiration = ttl_val + static_cast(cur_t); } std::shared_ptr bfile = - (ttl_val != -1) ? impl->SelectBlobFileTTL(expiration) - : ((last_file) ? last_file : impl->SelectBlobFile()); - if (last_file && last_file != bfile) { - batch_rewrite_status = Status::NotFound("too many blob files"); - return batch_rewrite_status; + (ttl_val != -1) + ? impl_->SelectBlobFileTTL(expiration) + : ((last_file_) ? last_file_ : impl_->SelectBlobFile()); + if (last_file_ && last_file_ != bfile) { + batch_rewrite_status_ = Status::NotFound("too many blob files"); + return batch_rewrite_status_; } if (!bfile) { - batch_rewrite_status = Status::NotFound("blob file not found"); - return batch_rewrite_status; + batch_rewrite_status_ = Status::NotFound("blob file not found"); + return batch_rewrite_status_; } + last_file_ = bfile; + has_put_ = true; + Slice value = value_unc; std::string compression_output; - if (impl->bdb_options_.compression != kNoCompression) { - CompressionType ct = impl->bdb_options_.compression; + if (impl_->bdb_options_.compression != kNoCompression) { + CompressionType ct = impl_->bdb_options_.compression; CompressionOptions compression_opts; value = CompressBlock(value_unc, compression_opts, &ct, kBlockBasedTableVersionFormat, Slice(), @@ -918,96 +933,99 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { std::string headerbuf; Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); - - if (previous_put) { - impl->AppendSN(last_file, 0 /*sequence number*/); - previous_put = false; - } - - last_file = bfile; - std::string index_entry; - Status st = impl->AppendBlob(bfile, headerbuf, key, value, &index_entry); + Status st = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry); + if (st.ok()) { + impl_->AppendSN(last_file_, sequence_); + sequence_++; + } - if (expiration != -1) + if (expiration != -1) { extendTTL(&(bfile->ttl_range_), (uint32_t)expiration); + } if (!st.ok()) { - batch_rewrite_status = st; + batch_rewrite_status_ = st; } else { - previous_put = true; - WriteBatchInternal::Put(&updates_blob, column_family_id, key, + WriteBatchInternal::Put(&updates_blob_, column_family_id, key, index_entry); } return Status::OK(); } - virtual Status MergeCF(uint32_t column_family_id, const Slice& key, - const Slice& value) override { - batch_rewrite_status = - Status::NotSupported("Not supported operation in blob db."); - return batch_rewrite_status; - } - virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { - WriteBatchInternal::Delete(&updates_blob, column_family_id, key); + WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); + sequence_++; return Status::OK(); } - virtual void LogData(const Slice& blob) override { - updates_blob.PutLogData(blob); + virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, + const Slice& /*key*/) override { + batch_rewrite_status_ = + Status::NotSupported("Not supported operation in blob db."); + return batch_rewrite_status_; } - private: - }; + virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + batch_rewrite_status_ = + Status::NotSupported("Not supported operation in blob db."); + return batch_rewrite_status_; + } - Handler1 handler1(this); - updates->Iterate(&handler1); + virtual void LogData(const Slice& blob) override { + updates_blob_.PutLogData(blob); + } + }; - Status s; - SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); + SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1; + BlobInserter blob_inserter(this, sequence); + updates->Iterate(&blob_inserter); - if (!handler1.batch_rewrite_status.ok()) { - return handler1.batch_rewrite_status; - } else { - s = db_->Write(opts, &(handler1.updates_blob)); + if (!blob_inserter.batch_rewrite_status().ok()) { + return blob_inserter.batch_rewrite_status(); } - if (!s.ok()) return s; - - if (handler1.previous_put) { - // this is the sequence number of the write. - SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob) + - WriteBatchInternal::Count(&handler1.updates_blob) - 1; - AppendSN(handler1.last_file, sn); + Status s = db_->Write(opts, &(blob_inserter.updates_blob())); + if (!s.ok()) { + return s; + } - CloseIf(handler1.last_file); + if (blob_inserter.has_put()) { + CloseIf(blob_inserter.last_file()); } // add deleted key to list of keys that have been deleted for book-keeping - class Handler2 : public WriteBatch::Handler { + class DeleteBookkeeper : public WriteBatch::Handler { public: - explicit Handler2(BlobDBImpl* i, const SequenceNumber& sn) - : impl(i), lsn(sn) {} + explicit DeleteBookkeeper(BlobDBImpl* impl, const SequenceNumber& seq) + : impl_(impl), sequence_(seq) {} + + virtual Status PutCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + sequence_++; + return Status::OK(); + } virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { ColumnFamilyHandle* cfh = - impl->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); + impl_->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - impl->delete_keys_q_.enqueue({cfh, key.ToString(), lsn}); + impl_->delete_keys_q_.enqueue({cfh, key.ToString(), sequence_}); + sequence_++; return Status::OK(); } private: - BlobDBImpl* impl; - SequenceNumber lsn; + BlobDBImpl* impl_; + SequenceNumber sequence_; }; // add deleted key to list of keys that have been deleted for book-keeping - Handler2 handler2(this, lsn); - updates->Iterate(&handler2); + DeleteBookkeeper delete_bookkeeper(this, sequence); + updates->Iterate(&delete_bookkeeper); return Status::OK(); } diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 3714661b0..c43554d42 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -513,19 +513,14 @@ TEST_F(BlobDBTest, SequenceNumber) { ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value)); } ASSERT_OK(blobdb_->Write(WriteOptions(), &batch)); - sequence += batch_size; - ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); for (size_t k = 0; k < batch_size; k++) { std::string key = "key" + ToString(i) + "-" + ToString(k); + sequence++; SequenceNumber actual_sequence; ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence)); - // We only write sequence for the last key in a batch. - if (k + 1 < batch_size) { - ASSERT_EQ(0, actual_sequence); - } else { - ASSERT_EQ(sequence, actual_sequence); - } + ASSERT_EQ(sequence, actual_sequence); } + ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber()); } }