Blob DB: not writing sequence number as blob record footer

Summary:
Previously, each time we wrote a blob, we wrote blob_record_header + key + value + blob_record_footer to the blob log. The footer only contains a sequence number and a CRC for that sequence number. The sequence number was used in garbage collection to verify that the value is recent. After #2703 we moved to using optimistic transactions and no longer use the sequence number from the footer, so remove the footer altogether.

There is another use of the sequence number, and we are keeping it: each blob log file keeps track of the sequence number range of the keys in it, and uses that range to check whether the file is referenced by a snapshot before the file is deleted.
Closes https://github.com/facebook/rocksdb/pull/3005

Differential Revision: D6057585

Pulled By: yiwu-arbug

fbshipit-source-id: d6da53c457a316e9723f359a1b47facfc3ffe090
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent 966b32b57c
commit 0552029b5c
  1. 295
      utilities/blob_db/blob_db_impl.cc
  2. 7
      utilities/blob_db/blob_db_impl.h
  3. 37
      utilities/blob_db/blob_db_test.cc
  4. 12
      utilities/blob_db/blob_dump_tool.cc
  5. 8
      utilities/blob_db/blob_file.cc
  6. 5
      utilities/blob_db/blob_file.h
  7. 26
      utilities/blob_db/blob_log_format.cc
  8. 10
      utilities/blob_db/blob_log_format.h
  9. 42
      utilities/blob_db/blob_log_reader.cc
  10. 8
      utilities/blob_db/blob_log_reader.h
  11. 33
      utilities/blob_db/blob_log_writer.cc
  12. 5
      utilities/blob_db/blob_log_writer.h

@ -47,12 +47,6 @@ void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) {
ts_range->first = std::min(ts_range->first, ts); ts_range->first = std::min(ts_range->first, ts);
ts_range->second = std::max(ts_range->second, ts); ts_range->second = std::max(ts_range->second, ts);
} }
void extendSN(rocksdb::blob_db::snrange_t* sn_range,
rocksdb::SequenceNumber sn) {
sn_range->first = std::min(sn_range->first, sn);
sn_range->second = std::max(sn_range->second, sn);
}
} // end namespace } // end namespace
namespace rocksdb { namespace rocksdb {
@ -438,12 +432,10 @@ Status BlobDBImpl::OpenAllFiles() {
std::numeric_limits<uint32_t>::min()); std::numeric_limits<uint32_t>::min());
tsrange_t ts_range(std::numeric_limits<uint32_t>::max(), tsrange_t ts_range(std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::min()); std::numeric_limits<uint32_t>::min());
snrange_t sn_range(std::numeric_limits<SequenceNumber>::max(),
std::numeric_limits<SequenceNumber>::min());
uint64_t blob_count = 0; uint64_t blob_count = 0;
BlobLogRecord record; BlobLogRecord record;
Reader::ReadLevel shallow = Reader::kReadHdrKeyFooter; Reader::ReadLevel shallow = Reader::kReadHeaderKey;
uint64_t record_start = reader->GetNextByte(); uint64_t record_start = reader->GetNextByte();
// TODO(arahut) - when we detect corruption, we should truncate // TODO(arahut) - when we detect corruption, we should truncate
@ -455,7 +447,6 @@ Status BlobDBImpl::OpenAllFiles() {
if (bfptr->HasTimestamp()) { if (bfptr->HasTimestamp()) {
extendTimestamps(&ts_range, record.GetTimeVal()); extendTimestamps(&ts_range, record.GetTimeVal());
} }
extendSN(&sn_range, record.GetSN());
record_start = reader->GetNextByte(); record_start = reader->GetNextByte();
} }
@ -473,16 +464,15 @@ Status BlobDBImpl::OpenAllFiles() {
} }
bfptr->SetBlobCount(blob_count); bfptr->SetBlobCount(blob_count);
bfptr->SetSNRange(sn_range); bfptr->SetSNRange({0, 0});
if (bfptr->HasTimestamp()) bfptr->set_time_range(ts_range); if (bfptr->HasTimestamp()) bfptr->set_time_range(ts_range);
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,
"Blob File: %s blob_count: %" PRIu64 "Blob File: %s blob_count: %" PRIu64
" size_bytes: %" PRIu64 " size_bytes: %" PRIu64 " ts: %d ttl: %d",
" sn_range: (%d, %d) ts: %d ttl: %d", bfpath.c_str(), blob_count, size_bytes,
bfpath.c_str(), blob_count, size_bytes, sn_range.first, bfptr->HasTimestamp(), bfptr->HasTTL());
sn_range.second, bfptr->HasTimestamp(), bfptr->HasTTL());
if (bfptr->HasTTL()) { if (bfptr->HasTTL()) {
ttl_range.second = ttl_range.second =
@ -566,11 +556,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
} }
Writer::ElemType et = Writer::kEtNone; Writer::ElemType et = Writer::kEtNone;
if (bfile->file_size_ == BlobLogHeader::kHeaderSize) if (bfile->file_size_ == BlobLogHeader::kHeaderSize) {
et = Writer::kEtFileHdr; et = Writer::kEtFileHdr;
else if (bfile->file_size_ > BlobLogHeader::kHeaderSize) } else if (bfile->file_size_ > BlobLogHeader::kHeaderSize) {
et = Writer::kEtFooter; et = Writer::kEtRecord;
else if (bfile->file_size_) { } else if (bfile->file_size_) {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"Open blob file: %s with wrong size: %d", fpath.c_str(), "Open blob file: %s with wrong size: %d", fpath.c_str(),
boffset); boffset);
@ -772,14 +762,13 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
BlobDBImpl* impl_; BlobDBImpl* impl_;
SequenceNumber sequence_; SequenceNumber sequence_;
WriteBatch updates_blob_; WriteBatch updates_blob_;
Status batch_rewrite_status_;
std::shared_ptr<BlobFile> last_file_; std::shared_ptr<BlobFile> last_file_;
bool has_put_; bool has_put_;
std::string new_value_; std::string new_value_;
uint32_t default_cf_id_; uint32_t default_cf_id_;
public: public:
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
: impl_(impl), : impl_(impl),
sequence_(seq), sequence_(seq),
has_put_(false), has_put_(false),
@ -788,9 +777,9 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
->cfd() ->cfd()
->GetID()) {} ->GetID()) {}
WriteBatch& updates_blob() { return updates_blob_; } SequenceNumber sequence() { return sequence_; }
Status batch_rewrite_status() { return batch_rewrite_status_; } WriteBatch* updates_blob() { return &updates_blob_; }
std::shared_ptr<BlobFile>& last_file() { return last_file_; } std::shared_ptr<BlobFile>& last_file() { return last_file_; }
@ -799,9 +788,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key, virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value_slice) override { const Slice& value_slice) override {
if (column_family_id != default_cf_id_) { if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported( return Status::NotSupported(
"Blob DB doesn't support non-default column family."); "Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
} }
Slice value_unc; Slice value_unc;
uint64_t expiration = uint64_t expiration =
@ -812,13 +800,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
? impl_->SelectBlobFileTTL(expiration) ? impl_->SelectBlobFileTTL(expiration)
: ((last_file_) ? last_file_ : impl_->SelectBlobFile()); : ((last_file_) ? last_file_ : impl_->SelectBlobFile());
if (last_file_ && last_file_ != bfile) { if (last_file_ && last_file_ != bfile) {
batch_rewrite_status_ = Status::NotFound("too many blob files"); return Status::NotFound("too many blob files");
return batch_rewrite_status_;
} }
if (!bfile) { if (!bfile) {
batch_rewrite_status_ = Status::NotFound("blob file not found"); return Status::NotFound("blob file not found");
return batch_rewrite_status_;
} }
last_file_ = bfile; last_file_ = bfile;
@ -830,31 +816,26 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
std::string headerbuf; std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
std::string index_entry; std::string index_entry;
Status st = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry); Status s = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
if (st.ok()) { if (!s.ok()) {
impl_->AppendSN(last_file_, sequence_); return s;
sequence_++;
} }
bfile->ExtendSequenceRange(sequence_);
sequence_++;
if (expiration != kNoExpiration) { if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration); extendTTL(&(bfile->ttl_range_), expiration);
} }
if (!st.ok()) { return WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
batch_rewrite_status_ = st; index_entry);
} else {
WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
index_entry);
}
return Status::OK();
} }
virtual Status DeleteCF(uint32_t column_family_id, virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override { const Slice& key) override {
if (column_family_id != default_cf_id_) { if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported( return Status::NotSupported(
"Blob DB doesn't support non-default column family."); "Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
} }
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
sequence_++; sequence_++;
@ -864,27 +845,23 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status DeleteRange(uint32_t column_family_id, virtual Status DeleteRange(uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key) { const Slice& begin_key, const Slice& end_key) {
if (column_family_id != default_cf_id_) { if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported( return Status::NotSupported(
"Blob DB doesn't support non-default column family."); "Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
} }
WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id, WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id,
begin_key, end_key); begin_key, end_key);
sequence_++;
return Status::OK(); return Status::OK();
} }
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
const Slice& /*key*/) override { const Slice& /*key*/) override {
batch_rewrite_status_ = return Status::NotSupported("Not supported operation in blob db.");
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
} }
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
const Slice& /*value*/) override { const Slice& /*value*/) override {
batch_rewrite_status_ = return Status::NotSupported("Not supported operation in blob db.");
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
} }
virtual void LogData(const Slice& blob) override { virtual void LogData(const Slice& blob) override {
@ -894,19 +871,20 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
MutexLock l(&write_mutex_); MutexLock l(&write_mutex_);
SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1; SequenceNumber current_seq = db_impl_->GetLatestSequenceNumber() + 1;
BlobInserter blob_inserter(this, sequence); BlobInserter blob_inserter(this, current_seq);
updates->Iterate(&blob_inserter); Status s = updates->Iterate(&blob_inserter);
if (!s.ok()) {
if (!blob_inserter.batch_rewrite_status().ok()) { return s;
return blob_inserter.batch_rewrite_status();
} }
s = db_->Write(opts, blob_inserter.updates_blob());
Status s = db_->Write(opts, &(blob_inserter.updates_blob()));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
assert(current_seq ==
WriteBatchInternal::Sequence(blob_inserter.updates_blob()));
assert(blob_inserter.sequence() ==
current_seq + WriteBatchInternal::Count(blob_inserter.updates_blob()));
if (blob_inserter.has_put()) { if (blob_inserter.has_put()) {
s = CloseBlobFileIfNeeded(blob_inserter.last_file()); s = CloseBlobFileIfNeeded(blob_inserter.last_file());
if (!s.ok()) { if (!s.ok()) {
@ -942,7 +920,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
}; };
// add deleted key to list of keys that have been deleted for book-keeping // add deleted key to list of keys that have been deleted for book-keeping
DeleteBookkeeper delete_bookkeeper(this, sequence); DeleteBookkeeper delete_bookkeeper(this, current_seq);
updates->Iterate(&delete_bookkeeper); updates->Iterate(&delete_bookkeeper);
return Status::OK(); return Status::OK();
@ -1051,20 +1029,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
// this is the sequence number of the write. // this is the sequence number of the write.
SequenceNumber sn = WriteBatchInternal::Sequence(&batch); SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
bfile->ExtendSequenceRange(sn);
if (debug_level_ >= 3)
ROCKS_LOG_INFO(db_options_.info_log, "<Adding KEY FILE: %s: KEY: %s SN: %d",
bfile->PathName().c_str(), key.ToString().c_str(), sn);
s = AppendSN(bfile, sn);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to append SN to FILE: %s: KEY: %s VALSZ: %d"
" status: '%s' blob_file: '%s'",
bfile->PathName().c_str(), key.ToString().c_str(),
value.size(), s.ToString().c_str(),
bfile->DumpState().c_str());
}
if (expiration != kNoExpiration) { if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration); extendTTL(&(bfile->ttl_range_), expiration);
@ -1140,32 +1105,6 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
return s; return s;
} }
Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
const SequenceNumber& sn) {
Status s;
{
WriteLock lockbfile_w(&bfile->mutex_);
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) return Status::IOError("Failed to create blob writer");
s = writer->AddRecordFooter(sn);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid status in AppendSN: %s status: '%s'",
bfile->PathName().c_str(), s.ToString().c_str());
return s;
}
if (sn != std::numeric_limits<SequenceNumber>::max())
extendSN(&(bfile->sn_range_), sn);
}
bfile->file_size_ += BlobLogRecord::kFooterSize;
last_period_write_ += BlobLogRecord::kFooterSize;
total_blob_space_ += BlobLogRecord::kFooterSize;
return s;
}
std::vector<Status> BlobDBImpl::MultiGet( std::vector<Status> BlobDBImpl::MultiGet(
const ReadOptions& read_options, const ReadOptions& read_options,
const std::vector<Slice>& keys, std::vector<std::string>* values) { const std::vector<Slice>& keys, std::vector<std::string>* values) {
@ -1205,7 +1144,8 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
} }
Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::string* value, SequenceNumber* sequence) { std::string* value) {
assert(value != nullptr);
Slice index_entry_slice(index_entry); Slice index_entry_slice(index_entry);
BlobHandle handle; BlobHandle handle;
Status s = handle.DecodeFrom(&index_entry_slice); Status s = handle.DecodeFrom(&index_entry_slice);
@ -1249,90 +1189,69 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::shared_ptr<RandomAccessFileReader> reader = std::shared_ptr<RandomAccessFileReader> reader =
GetOrOpenRandomAccessReader(bfile, env_, env_options_); GetOrOpenRandomAccessReader(bfile, env_, env_options_);
if (value != nullptr) { std::string* valueptr = value;
std::string* valueptr = value; std::string value_c;
std::string value_c; if (bdb_options_.compression != kNoCompression) {
if (bdb_options_.compression != kNoCompression) { valueptr = &value_c;
valueptr = &value_c; }
}
// allocate the buffer. This is safe in C++11 // allocate the buffer. This is safe in C++11
valueptr->resize(handle.size()); valueptr->resize(handle.size());
char* buffer = &(*valueptr)[0]; char* buffer = &(*valueptr)[0];
Slice blob_value;
s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
if (!s.ok() || blob_value.size() != handle.size()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to read blob from file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
static_cast<int>(blob_value.size()), key.data(),
s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
}
Slice crc_slice; Slice blob_value;
uint32_t crc_exp; s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
std::string crc_str; if (!s.ok() || blob_value.size() != handle.size()) {
crc_str.resize(sizeof(uint32_t)); if (debug_level_ >= 2) {
char* crc_buffer = &(crc_str[0]); ROCKS_LOG_ERROR(db_options_.info_log,
s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)), "Failed to read blob from file: %s blob_offset: %" PRIu64
sizeof(uint32_t), &crc_slice, crc_buffer); " blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) { bfile->PathName().c_str(), handle.offset(), handle.size(),
if (debug_level_ >= 2) { static_cast<int>(blob_value.size()), key.data(),
ROCKS_LOG_ERROR( s.ToString().c_str());
db_options_.info_log,
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
key.data(), s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
} }
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
}
uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size()); Slice crc_slice;
crc = crc32c::Mask(crc); // Adjust for storage uint32_t crc_exp;
if (crc != crc_exp) { std::string crc_str;
if (debug_level_ >= 2) { crc_str.resize(sizeof(uint32_t));
ROCKS_LOG_ERROR(db_options_.info_log, char* crc_buffer = &(crc_str[0]);
"Blob crc mismatch file: %s blob_offset: %" PRIu64 s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
" blob_size: %" PRIu64 " key: %s status: '%s'", sizeof(uint32_t), &crc_slice, crc_buffer);
bfile->PathName().c_str(), handle.offset(), if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
handle.size(), key.data(), s.ToString().c_str()); if (debug_level_ >= 2) {
} ROCKS_LOG_ERROR(db_options_.info_log,
return Status::Corruption("Corruption. Blob CRC mismatch"); "Failed to fetch blob crc file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
key.data(), s.ToString().c_str());
} }
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
}
if (bdb_options_.compression != kNoCompression) { uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size());
BlockContents contents; crc = crc32c::Mask(crc); // Adjust for storage
auto cfh = if (crc != crc_exp) {
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); if (debug_level_ >= 2) {
s = UncompressBlockContentsForCompressionType( ROCKS_LOG_ERROR(db_options_.info_log,
blob_value.data(), blob_value.size(), &contents, "Blob crc mismatch file: %s blob_offset: %" PRIu64
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, " blob_size: %" PRIu64 " key: %s status: '%s'",
*(cfh->cfd()->ioptions())); bfile->PathName().c_str(), handle.offset(), handle.size(),
*value = contents.data.ToString(); key.data(), s.ToString().c_str());
} }
return Status::Corruption("Corruption. Blob CRC mismatch");
} }
if (sequence != nullptr) { if (bdb_options_.compression != kNoCompression) {
char buffer[BlobLogRecord::kFooterSize]; BlockContents contents;
Slice footer_slice; auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
s = reader->Read(handle.offset() + handle.size(), s = UncompressBlockContentsForCompressionType(
BlobLogRecord::kFooterSize, &footer_slice, buffer); blob_value.data(), blob_value.size(), &contents,
if (!s.ok()) { kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
return s; *(cfh->cfd()->ioptions()));
} *value = contents.data.ToString();
BlobLogRecord record;
s = record.DecodeFooterFrom(footer_slice);
if (!s.ok()) {
return s;
}
*sequence = record.GetSN();
} }
return s; return s;
@ -1488,8 +1407,7 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
WriteLock lockbfile_w(&bfile->mutex_); WriteLock lockbfile_w(&bfile->mutex_);
bfile->deleted_count_++; bfile->deleted_count_++;
bfile->deleted_size_ += key_size + blob_size + BlobLogRecord::kHeaderSize + bfile->deleted_size_ += key_size + blob_size + BlobLogRecord::kHeaderSize;
BlobLogRecord::kFooterSize;
return true; return true;
} }
@ -1742,7 +1660,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool has_ttl = header.HasTTL(); bool has_ttl = header.HasTTL();
// this reads the key but skips the blob // this reads the key but skips the blob
Reader::ReadLevel shallow = Reader::kReadHdrKeyFooter; Reader::ReadLevel shallow = Reader::kReadHeaderKey;
assert(opt_db_); assert(opt_db_);
@ -1759,7 +1677,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool no_relocation = no_relocation_ttl || no_relocation_lsmdel; bool no_relocation = no_relocation_ttl || no_relocation_lsmdel;
if (!no_relocation) { if (!no_relocation) {
// read the blob because you have to write it back to new file // read the blob because you have to write it back to new file
shallow = Reader::kReadHdrKeyBlobFooter; shallow = Reader::kReadHeaderKeyBlob;
} }
BlobLogRecord record; BlobLogRecord record;
@ -1906,10 +1824,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
new_handle.set_compression(bdb_options_.compression); new_handle.set_compression(bdb_options_.compression);
new_handle.EncodeTo(&new_index_entry); new_handle.EncodeTo(&new_index_entry);
new_writer->AddRecordFooter(record.GetSN());
newfile->blob_count_++; newfile->blob_count_++;
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + newfile->file_size_ +=
record.Blob().size() + BlobLogRecord::kFooterSize; BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
transaction->Put(cfh, record.Key(), new_index_entry); transaction->Put(cfh, record.Key(), new_index_entry);
@ -2105,7 +2022,7 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
ColumnFamilyHandle* cfh = bfile->GetColumnFamily(db_); ColumnFamilyHandle* cfh = bfile->GetColumnFamily(db_);
BlobLogRecord record; BlobLogRecord record;
Reader::ReadLevel full = Reader::kReadHdrKeyBlobFooter; Reader::ReadLevel full = Reader::kReadHeaderKeyBlob;
while (reader->ReadRecord(&record, full).ok()) { while (reader->ReadRecord(&record, full).ok()) {
bdb_options_.gc_evict_cb_fn(cfh, record.Key(), record.Blob()); bdb_options_.gc_evict_cb_fn(cfh, record.Key(), record.Blob());
} }
@ -2320,16 +2237,6 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
} }
#ifndef NDEBUG #ifndef NDEBUG
Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
SequenceNumber* sequence) {
std::string index_entry;
Status s = db_->Get(ReadOptions(), key, &index_entry);
if (!s.ok()) {
return s;
}
return CommonGet(key, index_entry, nullptr, sequence);
}
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const { std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
ReadLock l(&mutex_); ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> blob_files; std::vector<std::shared_ptr<BlobFile>> blob_files;

@ -252,8 +252,6 @@ class BlobDBImpl : public BlobDB {
~BlobDBImpl(); ~BlobDBImpl();
#ifndef NDEBUG #ifndef NDEBUG
Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const; std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const; std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
@ -278,7 +276,7 @@ class BlobDBImpl : public BlobDB {
bool SetSnapshotIfNeeded(ReadOptions* read_options); bool SetSnapshotIfNeeded(ReadOptions* read_options);
Status CommonGet(const Slice& key, const std::string& index_entry, Status CommonGet(const Slice& key, const std::string& index_entry,
std::string* value, SequenceNumber* sequence = nullptr); std::string* value);
Slice GetCompressedSlice(const Slice& raw, Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const; std::string* compression_output) const;
@ -310,9 +308,6 @@ class BlobDBImpl : public BlobDB {
const std::string& headerbuf, const Slice& key, const std::string& headerbuf, const Slice& key,
const Slice& value, std::string* index_entry); const Slice& value, std::string* index_entry);
Status AppendSN(const std::shared_ptr<BlobFile>& bfile,
const SequenceNumber& sn);
// find an existing blob log file based on the expiration unix epoch // find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr // if such a file does not exist, return nullptr
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration); std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);

@ -541,43 +541,6 @@ TEST_F(BlobDBTest, MultipleWriters) {
VerifyDB(data); VerifyDB(data);
} }
// Test sequence number store in blob file is correct.
TEST_F(BlobDBTest, SequenceNumber) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber();
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
for (int i = 0; i < 100; i++) {
std::string key = "key" + ToString(i);
PutRandom(key, &rnd);
sequence += 1;
ASSERT_EQ(sequence, blob_db_->GetLatestSequenceNumber());
SequenceNumber actual_sequence = 0;
ASSERT_OK(blob_db_impl->TEST_GetSequenceNumber(key, &actual_sequence));
ASSERT_EQ(sequence, actual_sequence);
}
for (int i = 0; i < 100; i++) {
WriteBatch batch;
size_t batch_size = rnd.Next() % 10 + 1;
for (size_t k = 0; k < batch_size; k++) {
std::string value = test::RandomHumanReadableString(&rnd, 1000);
ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value));
}
ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
for (size_t k = 0; k < batch_size; k++) {
std::string key = "key" + ToString(i) + "-" + ToString(k);
sequence++;
SequenceNumber actual_sequence;
ASSERT_OK(blob_db_impl->TEST_GetSequenceNumber(key, &actual_sequence));
ASSERT_EQ(sequence, actual_sequence);
}
ASSERT_EQ(sequence, blob_db_->GetLatestSequenceNumber());
}
}
TEST_F(BlobDBTest, GCAfterOverwriteKeys) { TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
Random rnd(301); Random rnd(301);
BlobDBOptions bdb_options; BlobDBOptions bdb_options;

@ -185,7 +185,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
uint32_t header_crc = uint32_t header_crc =
crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t)); crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t));
*offset += BlobLogRecord::kHeaderSize; *offset += BlobLogRecord::kHeaderSize;
s = Read(*offset, key_size + blob_size + BlobLogRecord::kFooterSize, &slice); s = Read(*offset, key_size + blob_size, &slice);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -207,15 +207,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
DumpSlice(Slice(slice.data() + key_size, blob_size), show_blob); DumpSlice(Slice(slice.data() + key_size, blob_size), show_blob);
} }
} }
Slice footer_slice(slice.data() + record.GetKeySize() + record.GetBlobSize(), *offset += key_size + blob_size;
BlobLogRecord::kFooterSize);
s = record.DecodeFooterFrom(footer_slice);
if (!s.ok()) {
return s;
}
fprintf(stdout, " footer CRC : %" PRIu32 "\n", record.footer_checksum());
fprintf(stdout, " sequence : %" PRIu64 "\n", record.GetSN());
*offset += key_size + blob_size + BlobLogRecord::kFooterSize;
return s; return s;
} }

@ -5,8 +5,14 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_file.h" #include "utilities/blob_db/blob_file.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <stdio.h> #include <stdio.h>
#include <cinttypes>
#include <algorithm>
#include <memory> #include <memory>
#include "util/filename.h" #include "util/filename.h"

@ -147,6 +147,11 @@ class BlobFile {
return header_.HasTimestamp(); return header_.HasTimestamp();
} }
void ExtendSequenceRange(SequenceNumber sequence) {
sn_range_.first = std::min(sn_range_.first, sequence);
sn_range_.second = std::max(sn_range_.second, sequence);
}
std::shared_ptr<Writer> GetWriter() const { return log_writer_; } std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
void Fsync(); void Fsync();

@ -224,7 +224,6 @@ BlobLogRecord::BlobLogRecord()
blob_size_(0), blob_size_(0),
time_val_(0), time_val_(0),
ttl_val_(0), ttl_val_(0),
sn_(0),
type_(0), type_(0),
subtype_(0) {} subtype_(0) {}
@ -249,7 +248,6 @@ void BlobLogRecord::Clear() {
blob_size_ = 0; blob_size_ = 0;
time_val_ = 0; time_val_ = 0;
ttl_val_ = 0; ttl_val_ = 0;
sn_ = 0;
type_ = subtype_ = 0; type_ = subtype_ = 0;
key_.clear(); key_.clear();
blob_.clear(); blob_.clear();
@ -289,30 +287,6 @@ Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
return Status::OK(); return Status::OK();
} }
Status BlobLogRecord::DecodeFooterFrom(const Slice& footerslice) {
Slice input = footerslice;
if (input.size() < kFooterSize) {
return Status::Corruption("Invalid Blob Record Footer: size");
}
uint32_t f_crc = crc32c::Extend(0, input.data(), 8);
f_crc = crc32c::Mask(f_crc);
if (!GetFixed64(&input, &sn_)) {
return Status::Corruption("Invalid Blob Record Footer: sn");
}
if (!GetFixed32(&input, &footer_cksum_)) {
return Status::Corruption("Invalid Blob Record Footer: cksum");
}
if (f_crc != footer_cksum_) {
return Status::Corruption("Record Checksum mismatch: footer_cksum");
}
return Status::OK();
}
} // namespace blob_db } // namespace blob_db
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -188,8 +188,6 @@ class BlobLogRecord {
uint64_t blob_size_; uint64_t blob_size_;
uint64_t time_val_; uint64_t time_val_;
uint64_t ttl_val_; uint64_t ttl_val_;
SequenceNumber sn_;
uint32_t footer_cksum_;
char type_; char type_;
char subtype_; char subtype_;
Slice key_; Slice key_;
@ -218,8 +216,6 @@ class BlobLogRecord {
// = 42 // = 42
static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1; static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1;
static const size_t kFooterSize = 8 + 4;
public: public:
BlobLogRecord(); BlobLogRecord();
@ -245,17 +241,11 @@ class BlobLogRecord {
char subtype() const { return subtype_; } char subtype() const { return subtype_; }
SequenceNumber GetSN() const { return sn_; }
uint32_t header_checksum() const { return header_cksum_; } uint32_t header_checksum() const { return header_cksum_; }
uint32_t checksum() const { return checksum_; } uint32_t checksum() const { return checksum_; }
uint32_t footer_checksum() const { return footer_cksum_; }
Status DecodeHeaderFrom(const Slice& hdrslice); Status DecodeHeaderFrom(const Slice& hdrslice);
Status DecodeFooterFrom(const Slice& footerslice);
}; };
} // namespace blob_db } // namespace blob_db

@ -69,21 +69,11 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
*blob_offset = next_byte_ + record->GetKeySize(); *blob_offset = next_byte_ + record->GetKeySize();
} }
switch (level) { switch (level) {
case kReadHdrFooter: case kReadHeader:
file_->Skip(kb_size); file_->Skip(kb_size);
next_byte_ += kb_size; next_byte_ += kb_size;
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached before record footer");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyFooter: case kReadHeaderKey:
record->ResizeKeyBuffer(record->GetKeySize()); record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_, status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer()); record->GetKeyBuffer());
@ -103,18 +93,7 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
file_->Skip(record->GetBlobSize()); file_->Skip(record->GetBlobSize());
next_byte_ += record->GetBlobSize(); next_byte_ += record->GetBlobSize();
status = case kReadHeaderKeyBlob:
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyBlobFooter:
record->ResizeKeyBuffer(record->GetKeySize()); record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_, status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer()); record->GetKeyBuffer());
@ -146,21 +125,8 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
if (blob_crc != record->checksum_) { if (blob_crc != record->checksum_) {
return Status::Corruption("Blob Checksum mismatch"); return Status::Corruption("Blob Checksum mismatch");
} }
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during blob footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
default:
assert(0);
return status;
} }
return status;
} }
} // namespace blob_db } // namespace blob_db

@ -32,9 +32,9 @@ namespace blob_db {
class Reader { class Reader {
public: public:
enum ReadLevel { enum ReadLevel {
kReadHdrFooter, kReadHeader,
kReadHdrKeyFooter, kReadHeaderKey,
kReadHdrKeyBlobFooter, kReadHeaderKeyBlob,
}; };
// Create a reader that will return log records from "*file". // Create a reader that will return log records from "*file".
@ -61,7 +61,7 @@ class Reader {
// will only be valid until the next mutating operation on this // will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch. // reader or the next mutation to *scratch.
// If blob_offset is non-null, return offset of the blob through it. // If blob_offset is non-null, return offset of the blob through it.
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter, Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
uint64_t* blob_offset = nullptr); uint64_t* blob_offset = nullptr);
SequentialFileReader* file() { return file_.get(); } SequentialFileReader* file() { return file_.get(); }

@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the // This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_writer.h" #include "utilities/blob_db/blob_log_writer.h"
@ -53,7 +52,7 @@ Status Writer::WriteHeader(const BlobLogHeader& header) {
Status Writer::AppendFooter(const BlobLogFooter& footer) { Status Writer::AppendFooter(const BlobLogFooter& footer) {
assert(block_offset_ != 0); assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string str; std::string str;
footer.EncodeTo(&str); footer.EncodeTo(&str);
@ -73,7 +72,7 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset, uint64_t* key_offset, uint64_t* blob_offset,
uint64_t ttl) { uint64_t ttl) {
assert(block_offset_ != 0); assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf; std::string buf;
ConstructBlobHeader(&buf, key, val, ttl, -1); ConstructBlobHeader(&buf, key, val, ttl, -1);
@ -85,7 +84,7 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
Status Writer::AddRecord(const Slice& key, const Slice& val, Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) { uint64_t* key_offset, uint64_t* blob_offset) {
assert(block_offset_ != 0); assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf; std::string buf;
ConstructBlobHeader(&buf, key, val, -1, -1); ConstructBlobHeader(&buf, key, val, -1, -1);
@ -134,7 +133,12 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
Status s = dest_->Append(Slice(headerbuf)); Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) { if (s.ok()) {
s = dest_->Append(key); s = dest_->Append(key);
if (s.ok()) s = dest_->Append(val); }
if (s.ok()) {
s = dest_->Append(val);
}
if (s.ok()) {
s = dest_->Flush();
} }
*key_offset = block_offset_ + BlobLogRecord::kHeaderSize; *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
@ -144,25 +148,6 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
return s; return s;
} }
Status Writer::AddRecordFooter(const SequenceNumber& seq) {
assert(last_elem_type_ == kEtRecord);
std::string buf;
PutFixed64(&buf, seq);
uint32_t footer_crc = crc32c::Extend(0, buf.c_str(), buf.size());
footer_crc = crc32c::Mask(footer_crc);
PutFixed32(&buf, footer_crc);
Status s = dest_->Append(Slice(buf));
block_offset_ += BlobLogRecord::kFooterSize;
if (s.ok()) dest_->Flush();
last_elem_type_ = kEtFooter;
return s;
}
} // namespace blob_db } // namespace blob_db
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the // This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License // COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
//
#pragma once #pragma once
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -53,8 +52,6 @@ class Writer {
const Slice& val, uint64_t* key_offset, const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset); uint64_t* blob_offset);
Status AddRecordFooter(const SequenceNumber& sn);
Status AppendFooter(const BlobLogFooter& footer); Status AppendFooter(const BlobLogFooter& footer);
Status WriteHeader(const BlobLogHeader& header); Status WriteHeader(const BlobLogHeader& header);
@ -89,7 +86,7 @@ class Writer {
Writer& operator=(const Writer&) = delete; Writer& operator=(const Writer&) = delete;
public: public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFooter, kEtFileFooter }; enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
ElemType last_elem_type_; ElemType last_elem_type_;
}; };

Loading…
Cancel
Save