write exact sequence number for each put in write batch

Summary:
At the beginning of write batch write, grab the latest sequence from base db and assume sequence number will increment by 1 for each put and delete, and write the exact sequence number with each put. This is assuming we are the only writer to increment sequence number (no external file ingestion, etc) and there should be no holes in the sequence number.

Also having some minor naming changes.
Closes https://github.com/facebook/rocksdb/pull/2402

Differential Revision: D5176134

Pulled By: yiwu-arbug

fbshipit-source-id: cb4712ee44478d5a2e5951213a10b72f08fe8c88
main
Yi Wu 8 years ago committed by Facebook Github Bot
parent 6f4154d693
commit 91e2aa3ce2
  1. 154
      utilities/blob_db/blob_db_impl.cc
  2. 11
      utilities/blob_db/blob_db_test.cc

@ -867,22 +867,33 @@ Status BlobDBImpl::SingleDelete(const WriteOptions& wopts,
}
Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
class Handler1 : public WriteBatch::Handler {
class BlobInserter : public WriteBatch::Handler {
private:
BlobDBImpl* impl_;
SequenceNumber sequence_;
WriteBatch updates_blob_;
Status batch_rewrite_status_;
std::shared_ptr<BlobFile> last_file_;
bool has_put_;
public:
explicit Handler1(BlobDBImpl* i) : impl(i), previous_put(false) {}
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
: impl_(impl), sequence_(seq), has_put_(false) {}
WriteBatch& updates_blob() { return updates_blob_; }
Status batch_rewrite_status() { return batch_rewrite_status_; }
std::shared_ptr<BlobFile>& last_file() { return last_file_; }
BlobDBImpl* impl;
WriteBatch updates_blob;
Status batch_rewrite_status;
std::shared_ptr<BlobFile> last_file;
bool previous_put;
bool has_put() { return has_put_; }
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value_unc) override {
Slice newval;
int32_t ttl_val = -1;
if (impl->bdb_options_.extract_ttl_fn) {
impl->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val);
if (impl_->bdb_options_.extract_ttl_fn) {
impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val);
} else {
newval = value_unc;
}
@ -894,22 +905,26 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
expiration = ttl_val + static_cast<int32_t>(cur_t);
}
std::shared_ptr<BlobFile> bfile =
(ttl_val != -1) ? impl->SelectBlobFileTTL(expiration)
: ((last_file) ? last_file : impl->SelectBlobFile());
if (last_file && last_file != bfile) {
batch_rewrite_status = Status::NotFound("too many blob files");
return batch_rewrite_status;
(ttl_val != -1)
? impl_->SelectBlobFileTTL(expiration)
: ((last_file_) ? last_file_ : impl_->SelectBlobFile());
if (last_file_ && last_file_ != bfile) {
batch_rewrite_status_ = Status::NotFound("too many blob files");
return batch_rewrite_status_;
}
if (!bfile) {
batch_rewrite_status = Status::NotFound("blob file not found");
return batch_rewrite_status;
batch_rewrite_status_ = Status::NotFound("blob file not found");
return batch_rewrite_status_;
}
last_file_ = bfile;
has_put_ = true;
Slice value = value_unc;
std::string compression_output;
if (impl->bdb_options_.compression != kNoCompression) {
CompressionType ct = impl->bdb_options_.compression;
if (impl_->bdb_options_.compression != kNoCompression) {
CompressionType ct = impl_->bdb_options_.compression;
CompressionOptions compression_opts;
value = CompressBlock(value_unc, compression_opts, &ct,
kBlockBasedTableVersionFormat, Slice(),
@ -918,96 +933,99 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
if (previous_put) {
impl->AppendSN(last_file, 0 /*sequence number*/);
previous_put = false;
}
last_file = bfile;
std::string index_entry;
Status st = impl->AppendBlob(bfile, headerbuf, key, value, &index_entry);
Status st = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
if (st.ok()) {
impl_->AppendSN(last_file_, sequence_);
sequence_++;
}
if (expiration != -1)
if (expiration != -1) {
extendTTL(&(bfile->ttl_range_), (uint32_t)expiration);
}
if (!st.ok()) {
batch_rewrite_status = st;
batch_rewrite_status_ = st;
} else {
previous_put = true;
WriteBatchInternal::Put(&updates_blob, column_family_id, key,
WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
index_entry);
}
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
batch_rewrite_status =
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status;
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
WriteBatchInternal::Delete(&updates_blob, column_family_id, key);
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
sequence_++;
return Status::OK();
}
virtual void LogData(const Slice& blob) override {
updates_blob.PutLogData(blob);
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
const Slice& /*key*/) override {
batch_rewrite_status_ =
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
}
private:
};
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
const Slice& /*value*/) override {
batch_rewrite_status_ =
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
}
Handler1 handler1(this);
updates->Iterate(&handler1);
virtual void LogData(const Slice& blob) override {
updates_blob_.PutLogData(blob);
}
};
Status s;
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1;
BlobInserter blob_inserter(this, sequence);
updates->Iterate(&blob_inserter);
if (!handler1.batch_rewrite_status.ok()) {
return handler1.batch_rewrite_status;
} else {
s = db_->Write(opts, &(handler1.updates_blob));
if (!blob_inserter.batch_rewrite_status().ok()) {
return blob_inserter.batch_rewrite_status();
}
if (!s.ok()) return s;
if (handler1.previous_put) {
// this is the sequence number of the write.
SequenceNumber sn = WriteBatchInternal::Sequence(&handler1.updates_blob) +
WriteBatchInternal::Count(&handler1.updates_blob) - 1;
AppendSN(handler1.last_file, sn);
Status s = db_->Write(opts, &(blob_inserter.updates_blob()));
if (!s.ok()) {
return s;
}
CloseIf(handler1.last_file);
if (blob_inserter.has_put()) {
CloseIf(blob_inserter.last_file());
}
// add deleted key to list of keys that have been deleted for book-keeping
class Handler2 : public WriteBatch::Handler {
class DeleteBookkeeper : public WriteBatch::Handler {
public:
explicit Handler2(BlobDBImpl* i, const SequenceNumber& sn)
: impl(i), lsn(sn) {}
explicit DeleteBookkeeper(BlobDBImpl* impl, const SequenceNumber& seq)
: impl_(impl), sequence_(seq) {}
virtual Status PutCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
const Slice& /*value*/) override {
sequence_++;
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
ColumnFamilyHandle* cfh =
impl->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
impl_->db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
impl->delete_keys_q_.enqueue({cfh, key.ToString(), lsn});
impl_->delete_keys_q_.enqueue({cfh, key.ToString(), sequence_});
sequence_++;
return Status::OK();
}
private:
BlobDBImpl* impl;
SequenceNumber lsn;
BlobDBImpl* impl_;
SequenceNumber sequence_;
};
// add deleted key to list of keys that have been deleted for book-keeping
Handler2 handler2(this, lsn);
updates->Iterate(&handler2);
DeleteBookkeeper delete_bookkeeper(this, sequence);
updates->Iterate(&delete_bookkeeper);
return Status::OK();
}

@ -513,19 +513,14 @@ TEST_F(BlobDBTest, SequenceNumber) {
ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value));
}
ASSERT_OK(blobdb_->Write(WriteOptions(), &batch));
sequence += batch_size;
ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber());
for (size_t k = 0; k < batch_size; k++) {
std::string key = "key" + ToString(i) + "-" + ToString(k);
sequence++;
SequenceNumber actual_sequence;
ASSERT_OK(blobdb_impl->TEST_GetSequenceNumber(key, &actual_sequence));
// We only write sequence for the last key in a batch.
if (k + 1 < batch_size) {
ASSERT_EQ(0, actual_sequence);
} else {
ASSERT_EQ(sequence, actual_sequence);
}
ASSERT_EQ(sequence, actual_sequence);
}
ASSERT_EQ(sequence, blobdb_->GetLatestSequenceNumber());
}
}

Loading…
Cancel
Save