From 3ebb7ba7b989df8d000d4f4caebe7b607ace4a93 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 27 Oct 2017 13:14:34 -0700 Subject: [PATCH] Blob DB: update blob file format Summary: Changing blob file format and some code cleanup around the change. The change with blob log format are: * Remove timestamp field in blob file header, blob file footer and blob records. The field is not being use and often confuse with expiration field. * Blob file header now come with column family id, which always equal to default column family id. It leaves room for future support of column family. * Compression field in blob file header now is a standalone byte (instead of compact encode with flags field) * Blob file footer now come with its own crc. * Key length now being uint64_t instead of uint32_t * Blob CRC now checksum both key and value (instead of value only). * Some reordering of the fields. The list of cleanups: * Better inline comments in blob_log_format.h * rename ttlrange_t and snrange_t to ExpirationRange and SequenceRange respectively. * simplify blob_db::Reader * Move crc checking logic to inside blob_log_format.cc Closes https://github.com/facebook/rocksdb/pull/3081 Differential Revision: D6171304 Pulled By: yiwu-arbug fbshipit-source-id: e4373e0d39264441b7e2fbd0caba93ddd99ea2af --- utilities/blob_db/blob_db_impl.cc | 167 ++++++------ utilities/blob_db/blob_db_test.cc | 8 +- utilities/blob_db/blob_dump_tool.cc | 84 ++---- utilities/blob_db/blob_file.cc | 74 +++--- utilities/blob_db/blob_file.h | 55 ++-- utilities/blob_db/blob_log_format.cc | 367 +++++++++------------------ utilities/blob_db/blob_log_format.h | 300 +++++++--------------- utilities/blob_db/blob_log_reader.cc | 130 ++++------ utilities/blob_db/blob_log_reader.h | 17 +- utilities/blob_db/blob_log_writer.cc | 62 ++--- utilities/blob_db/blob_log_writer.h | 28 +- 11 files changed, 458 insertions(+), 834 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 916cb9bf7..cf7782d27 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -37,16 +37,6 @@ namespace { int kBlockBasedTableVersionFormat = 2; - -void extendTTL(rocksdb::blob_db::ttlrange_t* ttl_range, uint64_t ttl) { - ttl_range->first = std::min(ttl_range->first, ttl); - ttl_range->second = std::max(ttl_range->second, ttl); -} - -void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) { - ts_range->first = std::min(ts_range->first, ts); - ts_range->second = std::max(ts_range->second, ts); -} } // end namespace namespace rocksdb { @@ -66,10 +56,12 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( bool blobf_compare_ttl::operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const { - if (lhs->ttl_range_.first < rhs->ttl_range_.first) return true; - - if (lhs->ttl_range_.first > rhs->ttl_range_.first) return false; - + if (lhs->expiration_range_.first < rhs->expiration_range_.first) { + return true; + } + if (lhs->expiration_range_.first > rhs->expiration_range_.first) { + return false; + } return lhs->BlobFileNumber() > rhs->BlobFileNumber(); } @@ -332,6 +324,7 @@ Status BlobDBImpl::OpenAllFiles() { bfpath.c_str(), s1.ToString().c_str(), size_bytes); continue; } + bfptr->SetHasTTL(bfptr->header_.has_ttl); bfptr->header_valid_ = true; std::shared_ptr ra_reader = @@ -355,10 +348,8 @@ Status BlobDBImpl::OpenAllFiles() { "File found incomplete (w/o footer) %s", bfpath.c_str()); // sequentially iterate over the file and read all the records - ttlrange_t ttl_range(std::numeric_limits::max(), - std::numeric_limits::min()); - tsrange_t ts_range(std::numeric_limits::max(), - std::numeric_limits::min()); + ExpirationRange expiration_range(std::numeric_limits::max(), + std::numeric_limits::min()); uint64_t blob_count = 0; BlobLogRecord record; @@ -369,10 +360,10 @@ Status BlobDBImpl::OpenAllFiles() { while (reader->ReadRecord(&record, shallow).ok()) { ++blob_count; if (bfptr->HasTTL()) { - extendTTL(&ttl_range, record.GetTTL()); - } - if (bfptr->HasTimestamp()) { - extendTimestamps(&ts_range, record.GetTimeVal()); + expiration_range.first = + std::min(expiration_range.first, record.expiration); + expiration_range.second = + std::max(expiration_range.second, record.expiration); } record_start = reader->GetNextByte(); } @@ -391,24 +382,21 @@ Status BlobDBImpl::OpenAllFiles() { } bfptr->SetBlobCount(blob_count); - bfptr->SetSNRange({0, 0}); - - if (bfptr->HasTimestamp()) bfptr->set_time_range(ts_range); + bfptr->SetSequenceRange({0, 0}); ROCKS_LOG_INFO(db_options_.info_log, "Blob File: %s blob_count: %" PRIu64 - " size_bytes: %" PRIu64 " ts: %d ttl: %d", - bfpath.c_str(), blob_count, size_bytes, - bfptr->HasTimestamp(), bfptr->HasTTL()); + " size_bytes: %" PRIu64 " has_ttl: %d", + bfpath.c_str(), blob_count, size_bytes, bfptr->HasTTL()); if (bfptr->HasTTL()) { - ttl_range.second = - std::max(ttl_range.second, - ttl_range.first + (uint32_t)bdb_options_.ttl_range_secs); - bfptr->set_ttl_range(ttl_range); + expiration_range.second = std::max( + expiration_range.second, + expiration_range.first + (uint32_t)bdb_options_.ttl_range_secs); + bfptr->set_expiration_range(expiration_range); uint64_t now = EpochNow(); - if (ttl_range.second < now) { + if (expiration_range.second < now) { Status fstatus = CreateWriterLocked(bfptr); if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked(); if (!fstatus.ok()) { @@ -418,10 +406,11 @@ Status BlobDBImpl::OpenAllFiles() { bfpath.c_str(), fstatus.ToString().c_str()); continue; } else { - ROCKS_LOG_ERROR(db_options_.info_log, - "Blob File Closed: %s now: %d ttl_range: (%d, %d)", - bfpath.c_str(), now, ttl_range.first, - ttl_range.second); + ROCKS_LOG_ERROR( + db_options_.info_log, + "Blob File Closed: %s now: %d expiration_range: (%d, %d)", + bfpath.c_str(), now, expiration_range.first, + expiration_range.second); } } else { open_blob_files_.insert(bfptr); @@ -483,9 +472,9 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { } Writer::ElemType et = Writer::kEtNone; - if (bfile->file_size_ == BlobLogHeader::kHeaderSize) { + if (bfile->file_size_ == BlobLogHeader::kSize) { et = Writer::kEtFileHdr; - } else if (bfile->file_size_ > BlobLogHeader::kHeaderSize) { + } else if (bfile->file_size_ > BlobLogHeader::kSize) { et = Writer::kEtRecord; } else if (bfile->file_size_) { ROCKS_LOG_WARN(db_options_.info_log, @@ -507,14 +496,14 @@ std::shared_ptr BlobDBImpl::FindBlobFileLocked( if (open_blob_files_.empty()) return nullptr; std::shared_ptr tmp = std::make_shared(); - tmp->ttl_range_ = std::make_pair(expiration, 0); + tmp->expiration_range_ = std::make_pair(expiration, 0); auto citr = open_blob_files_.equal_range(tmp); if (citr.first == open_blob_files_.end()) { assert(citr.second == open_blob_files_.end()); std::shared_ptr check = *(open_blob_files_.rbegin()); - return (check->ttl_range_.second < expiration) ? nullptr : check; + return (check->expiration_range_.second < expiration) ? nullptr : check; } if (citr.first != citr.second) return *(citr.first); @@ -522,8 +511,8 @@ std::shared_ptr BlobDBImpl::FindBlobFileLocked( auto finditr = citr.second; if (finditr != open_blob_files_.begin()) --finditr; - bool b2 = (*finditr)->ttl_range_.second < expiration; - bool b1 = (*finditr)->ttl_range_.first > expiration; + bool b2 = (*finditr)->expiration_range_.second < expiration; + bool b1 = (*finditr)->expiration_range_.first > expiration; return (b1 || b2) ? nullptr : (*finditr); } @@ -560,9 +549,11 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { return nullptr; } - bfile->file_size_ = BlobLogHeader::kHeaderSize; - bfile->header_.compression_ = bdb_options_.compression; + bfile->file_size_ = BlobLogHeader::kSize; + bfile->header_.compression = bdb_options_.compression; + bfile->header_.has_ttl = false; bfile->header_valid_ = true; + bfile->SetHasTTL(false); // CHECK again WriteLock wl(&mutex_); @@ -603,7 +594,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { uint64_t exp_low = (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs; uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; - ttlrange_t ttl_guess = std::make_pair(exp_low, exp_high); + ExpirationRange expiration_range = std::make_pair(exp_low, exp_high); bfile = NewBlobFile("SelectBlobFileTTL"); assert(bfile); @@ -621,14 +612,16 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { return nullptr; } - bfile->header_.set_ttl_guess(ttl_guess); - bfile->header_.compression_ = bdb_options_.compression; + bfile->header_.expiration_range = expiration_range; + bfile->header_.compression = bdb_options_.compression; + bfile->header_.has_ttl = true; bfile->header_valid_ = true; - bfile->file_size_ = BlobLogHeader::kHeaderSize; + bfile->SetHasTTL(true); + bfile->file_size_ = BlobLogHeader::kSize; // set the first value of the range, since that is // concrete at this time. also necessary to add to open_blob_files_ - bfile->ttl_range_ = ttl_guess; + bfile->expiration_range_ = expiration_range; WriteLock wl(&mutex_); // in case the epoch has shifted in the interim, then check @@ -878,8 +871,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, Slice value_compressed = GetCompressedSlice(value, &compression_output); std::string headerbuf; - Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration, - -1); + Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration); s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, &index_entry); @@ -887,7 +879,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, if (s.ok()) { bfile->ExtendSequenceRange(sequence); if (expiration != kNoExpiration) { - extendTTL(&(bfile->ttl_range_), expiration); + bfile->ExtendExpirationRange(expiration); } s = CloseBlobFileIfNeeded(bfile); if (s.ok()) { @@ -1045,7 +1037,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, // later from the Blob Header, which needs to be also a // valid offset. if (blob_index.offset() < - (BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) { + (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) { if (debug_level_ >= 2) { ROCKS_LOG_ERROR(db_options_.info_log, "Invalid blob index file_number: %" PRIu64 @@ -1085,7 +1077,9 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, valueptr = &value_c; } - // allocate the buffer. This is safe in C++11 + // Allocate the buffer. This is safe in C++11 + // Note that std::string::reserved() does not work, since previous value + // of the buffer can be larger than blob_index.size(). valueptr->resize(blob_index.size()); char* buffer = &(*valueptr)[0]; @@ -1103,6 +1097,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, return Status::NotFound("Blob Not Found as couldnt retrieve Blob"); } + // TODO(yiwu): Add an option to skip crc checking. Slice crc_slice; uint32_t crc_exp; std::string crc_str; @@ -1121,7 +1116,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, return Status::NotFound("Blob Not Found as couldnt retrieve CRC"); } - uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size()); + uint32_t crc = crc32c::Value(key.data(), key.size()); + crc = crc32c::Extend(crc, blob_value.data(), blob_value.size()); crc = crc32c::Mask(crc); // Adjust for storage if (crc != crc_exp) { if (debug_level_ >= 2) { @@ -1134,6 +1130,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, return Status::Corruption("Corruption. Blob CRC mismatch"); } + // TODO(yiwu): Should use compression flag in the blob file instead of + // current compression option. if (bdb_options_.compression != kNoCompression) { BlockContents contents; auto cfh = reinterpret_cast(DefaultColumnFamily()); @@ -1204,7 +1202,7 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(), bfile->deleted_count_, bfile->deleted_size_, - (bfile->ttl_range_.second - epoch_now)); + (bfile->expiration_range_.second - epoch_now)); } // reschedule @@ -1256,7 +1254,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked( const std::shared_ptr& bfile) { assert(bfile->Obsolete()); - SequenceNumber esn = bfile->GetSNRange().first; + SequenceNumber esn = bfile->GetSequenceRange().first; // TODO(yiwu): Here we should check instead if there is an active snapshot // lies between the first sequence in the file, and the last sequence by @@ -1413,7 +1411,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { { ReadLock lockbfile_r(&bfile->mutex_); - if (bfile->ttl_range_.second > epoch_now) continue; + if (bfile->expiration_range_.second > epoch_now) continue; process_files.push_back(bfile); } } @@ -1587,22 +1585,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, bool first_gc = bfptr->gc_once_after_open_; - auto* cfh = bfptr->GetColumnFamily(db_); + auto* cfh = + db_impl_->GetColumnFamilyHandleUnlocked(bfptr->column_family_id()); auto* cfd = reinterpret_cast(cfh)->cfd(); auto column_family_id = cfd->GetID(); - bool has_ttl = header.HasTTL(); + bool has_ttl = header.has_ttl; // this reads the key but skips the blob Reader::ReadLevel shallow = Reader::kReadHeaderKey; - bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second); + bool no_relocation_ttl = + (has_ttl && now >= bfptr->GetExpirationRange().second); bool no_relocation_lsmdel = false; { ReadLock lockbfile_r(&bfptr->mutex_); - no_relocation_lsmdel = (bfptr->GetFileSize() == - (BlobLogHeader::kHeaderSize + bfptr->deleted_size_ + - BlobLogFooter::kFooterSize)); + no_relocation_lsmdel = + (bfptr->GetFileSize() == + (BlobLogHeader::kSize + bfptr->deleted_size_ + BlobLogFooter::kSize)); } bool no_relocation = no_relocation_ttl || no_relocation_lsmdel; @@ -1641,7 +1641,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, bool is_blob_index = false; PinnableSlice index_entry; Status get_status = db_impl_->GetImpl( - ReadOptions(), cfh, record.Key(), &index_entry, nullptr /*value_found*/, + ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); if (!get_status.ok() && !get_status.ok()) { @@ -1672,15 +1672,15 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, continue; } - GarbageCollectionWriteCallback callback(cfd, record.Key(), latest_seq); + GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq); // If key has expired, remove it from base DB. - if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) { + if (no_relocation_ttl || (has_ttl && now >= record.expiration)) { gc_stats->num_deletes++; - gc_stats->deleted_size += record.GetBlobSize(); + gc_stats->deleted_size += record.value_size; TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); WriteBatch delete_batch; - Status delete_status = delete_batch.Delete(record.Key()); + Status delete_status = delete_batch.Delete(record.key); if (delete_status.ok()) { delete_status = db_impl_->WriteWithCallback(WriteOptions(), &delete_batch, &callback); @@ -1719,7 +1719,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, newfile->header_ = std::move(header); // Can't use header beyond this point newfile->header_valid_ = true; - newfile->file_size_ = BlobLogHeader::kHeaderSize; + newfile->file_size_ = BlobLogHeader::kSize; s = new_writer->WriteHeader(newfile->header_); if (!s.ok()) { @@ -1741,21 +1741,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, uint64_t new_blob_offset = 0; uint64_t new_key_offset = 0; // write the blob to the blob log. - s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset, - &new_blob_offset, record.GetTTL()); + s = new_writer->AddRecord(record.key, record.value, record.expiration, + &new_key_offset, &new_blob_offset); BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(), - new_blob_offset, record.Blob().size(), + new_blob_offset, record.value.size(), bdb_options_.compression); newfile->blob_count_++; newfile->file_size_ += - BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size(); + BlobLogRecord::kHeaderSize + record.key.size() + record.value.size(); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); WriteBatch rewrite_batch; Status rewrite_status = WriteBatchInternal::PutBlobIndex( - &rewrite_batch, column_family_id, record.Key(), new_index_entry); + &rewrite_batch, column_family_id, record.key, new_index_entry); if (rewrite_status.ok()) { rewrite_status = db_impl_->WriteWithCallback(WriteOptions(), &rewrite_batch, &callback); @@ -1798,8 +1798,8 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, bool is_oldest_simple_blob_file, std::string* reason) { if (bfile->HasTTL()) { - ttlrange_t ttl_range = bfile->GetTTLRange(); - if (now > ttl_range.second) { + ExpirationRange expiration_range = bfile->GetExpirationRange(); + if (now > expiration_range.second) { *reason = "entire file ttl expired"; return true; } @@ -1942,11 +1942,12 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr bfile) { return false; } - ColumnFamilyHandle* cfh = bfile->GetColumnFamily(db_); + ColumnFamilyHandle* cfh = + db_impl_->GetColumnFamilyHandleUnlocked(bfile->column_family_id()); BlobLogRecord record; Reader::ReadLevel full = Reader::kReadHeaderKeyBlob; while (reader->ReadRecord(&record, full).ok()) { - bdb_options_.gc_evict_cb_fn(cfh, record.Key(), record.Blob()); + bdb_options_.gc_evict_cb_fn(cfh, record.key, record.value); } return true; @@ -2039,15 +2040,15 @@ void BlobDBImpl::FilterSubsetOfFiles( "File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64 " reason='%s'", bfile->PathName().c_str(), now, - bfile->GetTTLRange().second, reason.c_str()); + bfile->GetExpirationRange().second, reason.c_str()); continue; } ROCKS_LOG_INFO(db_options_.info_log, "File has been chosen for GC ttl %s %" PRIu64 " %" PRIu64 " reason='%s'", - bfile->PathName().c_str(), now, bfile->GetTTLRange().second, - reason.c_str()); + bfile->PathName().c_str(), now, + bfile->GetExpirationRange().second, reason.c_str()); to_process->push_back(bfile); } } diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 6f16e5b3d..85507eb5f 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -1012,11 +1012,11 @@ TEST_F(BlobDBTest, InlineSmallValues) { ttl_file = blob_files[1]; } ASSERT_FALSE(non_ttl_file->HasTTL()); - ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSNRange().first); - ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSNRange().second); + ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSequenceRange().first); + ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSequenceRange().second); ASSERT_TRUE(ttl_file->HasTTL()); - ASSERT_EQ(first_ttl_seq, ttl_file->GetSNRange().first); - ASSERT_EQ(last_ttl_seq, ttl_file->GetSNRange().second); + ASSERT_EQ(first_ttl_seq, ttl_file->GetSequenceRange().first); + ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second); } } // namespace blob_db diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index e9b7351bb..b7ae8162d 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -18,7 +18,6 @@ #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "util/coding.h" -#include "util/crc32c.h" #include "util/string_util.h" namespace rocksdb { @@ -92,7 +91,7 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) { Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) { Slice slice; - Status s = Read(0, BlobLogHeader::kHeaderSize, &slice); + Status s = Read(0, BlobLogHeader::kSize, &slice); if (!s.ok()) { return s; } @@ -102,20 +101,19 @@ Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) { return s; } fprintf(stdout, "Blob log header:\n"); - fprintf(stdout, " Magic Number : %" PRIu32 "\n", header.magic_number()); - fprintf(stdout, " Version : %" PRIu32 "\n", header.version()); - CompressionType compression = header.compression(); + fprintf(stdout, " Version : %" PRIu32 "\n", header.version); + fprintf(stdout, " Column Family ID : %" PRIu32 "\n", + header.column_family_id); std::string compression_str; - if (!GetStringFromCompressionType(&compression_str, compression).ok()) { + if (!GetStringFromCompressionType(&compression_str, header.compression) + .ok()) { compression_str = "Unrecongnized compression type (" + - ToString((int)header.compression()) + ")"; - } - fprintf(stdout, " Compression : %s\n", compression_str.c_str()); - fprintf(stdout, " TTL Range : %s\n", - GetString(header.ttl_range()).c_str()); - fprintf(stdout, " Timestamp Range: %s\n", - GetString(header.ts_range()).c_str()); - *offset = BlobLogHeader::kHeaderSize; + ToString((int)header.compression) + ")"; + } + fprintf(stdout, " Compression : %s\n", compression_str.c_str()); + fprintf(stdout, " Expiration range : %s\n", + GetString(header.expiration_range).c_str()); + *offset = BlobLogHeader::kSize; return s; } @@ -126,20 +124,12 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size, fprintf(stdout, "No blob log footer.\n"); return Status::OK(); }; - if (file_size < BlobLogHeader::kHeaderSize + BlobLogFooter::kFooterSize) { + if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) { return no_footer(); } Slice slice; - Status s = Read(file_size - 4, 4, &slice); - if (!s.ok()) { - return s; - } - uint32_t magic_number = DecodeFixed32(slice.data()); - if (magic_number != kMagicNumber) { - return no_footer(); - } - *footer_offset = file_size - BlobLogFooter::kFooterSize; - s = Read(*footer_offset, BlobLogFooter::kFooterSize, &slice); + *footer_offset = file_size - BlobLogFooter::kSize; + Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice); if (!s.ok()) { return s; } @@ -149,13 +139,11 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size, return s; } fprintf(stdout, "Blob log footer:\n"); - fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.GetBlobCount()); - fprintf(stdout, " TTL Range : %s\n", - GetString(footer.GetTTLRange()).c_str()); - fprintf(stdout, " Time Range : %s\n", - GetString(footer.GetTimeRange()).c_str()); - fprintf(stdout, " Sequence Range : %s\n", - GetString(footer.GetSNRange()).c_str()); + fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count); + fprintf(stdout, " Expiration Range : %s\n", + GetString(footer.expiration_range).c_str()); + fprintf(stdout, " Sequence Range : %s\n", + GetString(footer.sequence_range).c_str()); return s; } @@ -173,41 +161,25 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, if (!s.ok()) { return s; } - uint32_t key_size = record.GetKeySize(); - uint64_t blob_size = record.GetBlobSize(); - fprintf(stdout, " key size : %" PRIu32 "\n", key_size); - fprintf(stdout, " blob size : %" PRIu64 "\n", record.GetBlobSize()); - fprintf(stdout, " TTL : %" PRIu64 "\n", record.GetTTL()); - fprintf(stdout, " time : %" PRIu64 "\n", record.GetTimeVal()); - fprintf(stdout, " type : %d, %d\n", record.type(), record.subtype()); - fprintf(stdout, " header CRC : %" PRIu32 "\n", record.header_checksum()); - fprintf(stdout, " CRC : %" PRIu32 "\n", record.checksum()); - uint32_t header_crc = - crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t)); + uint64_t key_size = record.key_size; + uint64_t value_size = record.value_size; + fprintf(stdout, " key size : %" PRIu64 "\n", key_size); + fprintf(stdout, " value size : %" PRIu64 "\n", value_size); + fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration); *offset += BlobLogRecord::kHeaderSize; - s = Read(*offset, key_size + blob_size, &slice); + s = Read(*offset, key_size + value_size, &slice); if (!s.ok()) { return s; } - header_crc = crc32c::Extend(header_crc, slice.data(), key_size); - header_crc = crc32c::Mask(header_crc); - if (header_crc != record.header_checksum()) { - return Status::Corruption("Record header checksum mismatch."); - } - uint32_t blob_crc = crc32c::Extend(0, slice.data() + key_size, blob_size); - blob_crc = crc32c::Mask(blob_crc); - if (blob_crc != record.checksum()) { - return Status::Corruption("Blob checksum mismatch."); - } if (show_key != DisplayType::kNone) { fprintf(stdout, " key : "); DumpSlice(Slice(slice.data(), key_size), show_key); if (show_blob != DisplayType::kNone) { fprintf(stdout, " blob : "); - DumpSlice(Slice(slice.data() + key_size, blob_size), show_blob); + DumpSlice(Slice(slice.data() + key_size, value_size), show_blob); } } - *offset += key_size + blob_size; + *offset += key_size + value_size; return s; } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 9989bacf3..d50256ca6 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -15,6 +15,8 @@ #include #include +#include "db/column_family.h" +#include "db/db_impl.h" #include "db/dbformat.h" #include "util/filename.h" #include "util/logging.h" @@ -27,6 +29,7 @@ namespace blob_db { BlobFile::BlobFile() : parent_(nullptr), file_number_(0), + has_ttl_(false), blob_count_(0), gc_epoch_(-1), file_size_(0), @@ -35,9 +38,8 @@ BlobFile::BlobFile() closed_(false), can_be_deleted_(false), gc_once_after_open_(false), - ttl_range_(std::make_pair(0, 0)), - time_range_(std::make_pair(0, 0)), - sn_range_(std::make_pair(kMaxSequenceNumber, 0)), + expiration_range_({0, 0}), + sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), header_valid_(false) {} @@ -46,6 +48,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) : parent_(p), path_to_dir_(bdir), file_number_(fn), + has_ttl_(false), blob_count_(0), gc_epoch_(-1), file_size_(0), @@ -54,9 +57,8 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) closed_(false), can_be_deleted_(false), gc_once_after_open_(false), - ttl_range_(std::make_pair(0, 0)), - time_range_(std::make_pair(0, 0)), - sn_range_(std::make_pair(kMaxSequenceNumber, 0)), + expiration_range_({0, 0}), + sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), header_valid_(false) {} @@ -72,6 +74,13 @@ BlobFile::~BlobFile() { } } +uint32_t BlobFile::column_family_id() const { + // TODO(yiwu): Should return column family id encoded in blob file after + // we add blob db column family support. + return reinterpret_cast(parent_->DefaultColumnFamily()) + ->GetID(); +} + std::string BlobFile::PathName() const { return BlobFileName(path_to_dir_, file_number_); } @@ -101,13 +110,14 @@ std::string BlobFile::DumpState() const { "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64 " file_size: %" PRIu64 " deleted_count: %" PRIu64 " deleted_size: %" PRIu64 - " closed: %d can_be_deleted: %d ttl_range: (%" PRIu64 ", %" PRIu64 - ") sn_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d", + " closed: %d can_be_deleted: %d expiration_range: (%" PRIu64 + ", %" PRIu64 ") sequence_range: (%" PRIu64 " %" PRIu64 + "), writer: %d reader: %d", path_to_dir_.c_str(), file_number_, blob_count_.load(), gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_, - closed_.load(), can_be_deleted_.load(), ttl_range_.first, - ttl_range_.second, sn_range_.first, sn_range_.second, - (!!log_writer_), (!!ra_file_reader_)); + closed_.load(), can_be_deleted_.load(), expiration_range_.first, + expiration_range_.second, sequence_range_.first, + sequence_range_.second, (!!log_writer_), (!!ra_file_reader_)); return str; } @@ -122,17 +132,18 @@ Status BlobFile::WriteFooterAndCloseLocked() { "File is being closed after footer %s", PathName().c_str()); BlobLogFooter footer; - footer.blob_count_ = blob_count_; - if (HasTTL()) footer.set_ttl_range(ttl_range_); + footer.blob_count = blob_count_; + if (HasTTL()) { + footer.expiration_range = expiration_range_; + } - footer.sn_range_ = sn_range_; - if (HasTimestamp()) footer.set_time_range(time_range_); + footer.sequence_range = sequence_range_; // this will close the file and reset the Writable File Pointer. Status s = log_writer_->AppendFooter(footer); if (s.ok()) { closed_ = true; - file_size_ += BlobLogFooter::kFooterSize; + file_size_ += BlobLogFooter::kSize; } else { ROCKS_LOG_ERROR(parent_->db_options_.info_log, "Failure to read Header for blob-file %s", @@ -144,20 +155,20 @@ Status BlobFile::WriteFooterAndCloseLocked() { } Status BlobFile::ReadFooter(BlobLogFooter* bf) { - if (file_size_ < (BlobLogHeader::kHeaderSize + BlobLogFooter::kFooterSize)) { + if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { return Status::IOError("File does not have footer", PathName()); } - uint64_t footer_offset = file_size_ - BlobLogFooter::kFooterSize; + uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; // assume that ra_file_reader_ is valid before we enter this assert(ra_file_reader_); Slice result; - char scratch[BlobLogFooter::kFooterSize + 10]; - Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kFooterSize, - &result, scratch); + char scratch[BlobLogFooter::kSize + 10]; + Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result, + scratch); if (!s.ok()) return s; - if (result.size() != BlobLogFooter::kFooterSize) { + if (result.size() != BlobLogFooter::kSize) { // should not happen return Status::IOError("EOF reached before footer"); } @@ -167,21 +178,12 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) { } Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { - if (footer.HasTTL() != header_.HasTTL()) { - return Status::Corruption("has_ttl mismatch"); - } - if (footer.HasTimestamp() != header_.HasTimestamp()) { - return Status::Corruption("has_ts mismatch"); - } - // assume that file has been fully fsync'd last_fsync_.store(file_size_); - blob_count_ = footer.GetBlobCount(); - ttl_range_ = footer.GetTTLRange(); - time_range_ = footer.GetTimeRange(); - sn_range_ = footer.GetSNRange(); + blob_count_ = footer.blob_count; + expiration_range_ = footer.expiration_range; + sequence_range_ = footer.sequence_range; closed_ = true; - return Status::OK(); } @@ -229,10 +231,6 @@ std::shared_ptr BlobFile::GetOrOpenRandomAccessReader( return ra_file_reader_; } -ColumnFamilyHandle* BlobFile::GetColumnFamily(DB* db) { - return db->DefaultColumnFamily(); -} - } // namespace blob_db } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index a18bf778a..455383448 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -13,11 +13,14 @@ #include "rocksdb/options.h" #include "util/file_reader_writer.h" #include "utilities/blob_db/blob_log_format.h" +#include "utilities/blob_db/blob_log_reader.h" #include "utilities/blob_db/blob_log_writer.h" namespace rocksdb { namespace blob_db { +class BlobDBImpl; + class BlobFile { friend class BlobDBImpl; friend struct blobf_compare_ttl; @@ -34,6 +37,10 @@ class BlobFile { // after that uint64_t file_number_; + // If true, the keys in this file all has TTL. Otherwise all keys don't + // have TTL. + bool has_ttl_; + // number of blobs in the file std::atomic blob_count_; @@ -62,14 +69,9 @@ class BlobFile { // should this file been gc'd once to reconcile lost deletes/compactions std::atomic gc_once_after_open_; - // et - lt of the blobs - ttlrange_t ttl_range_; - - // et - lt of the timestamp of the KV pairs. - tsrange_t time_range_; + ExpirationRange expiration_range_; - // ESN - LSN of the blobs - snrange_t sn_range_; + SequenceRange sequence_range_; // Sequential/Append writer for blobs std::shared_ptr log_writer_; @@ -96,7 +98,7 @@ class BlobFile { ~BlobFile(); - ColumnFamilyHandle* GetColumnFamily(DB* db); + uint32_t column_family_id() const; // Returns log file's pathname relative to the main db dir // Eg. For a live-log-file = blob_dir/000003.blob @@ -128,30 +130,29 @@ class BlobFile { } // All Get functions which are not atomic, will need ReadLock on the mutex - tsrange_t GetTimeRange() const { - assert(HasTimestamp()); - return time_range_; - } - - ttlrange_t GetTTLRange() const { return ttl_range_; } - snrange_t GetSNRange() const { return sn_range_; } + ExpirationRange GetExpirationRange() const { return expiration_range_; } - bool HasTTL() const { - assert(header_valid_); - return header_.HasTTL(); + void ExtendExpirationRange(uint64_t expiration) { + expiration_range_.first = std::min(expiration_range_.first, expiration); + expiration_range_.second = std::max(expiration_range_.second, expiration); } - bool HasTimestamp() const { - assert(header_valid_); - return header_.HasTimestamp(); + SequenceRange GetSequenceRange() const { return sequence_range_; } + + void SetSequenceRange(SequenceRange sequence_range) { + sequence_range_ = sequence_range; } void ExtendSequenceRange(SequenceNumber sequence) { - sn_range_.first = std::min(sn_range_.first, sequence); - sn_range_.second = std::max(sn_range_.second, sequence); + sequence_range_.first = std::min(sequence_range_.first, sequence); + sequence_range_.second = std::max(sequence_range_.second, sequence); } + bool HasTTL() const { return has_ttl_; } + + void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; } + std::shared_ptr GetWriter() const { return log_writer_; } void Fsync(); @@ -174,11 +175,9 @@ class BlobFile { // previously closed file Status SetFromFooterLocked(const BlobLogFooter& footer); - void set_time_range(const tsrange_t& tr) { time_range_ = tr; } - - void set_ttl_range(const ttlrange_t& ttl) { ttl_range_ = ttl; } - - void SetSNRange(const snrange_t& snr) { sn_range_ = snr; } + void set_expiration_range(const ExpirationRange& expiration_range) { + expiration_range_ = expiration_range; + } // The following functions are atomic, and don't need locks void SetFileSize(uint64_t fs) { file_size_ = fs; } diff --git a/utilities/blob_db/blob_log_format.cc b/utilities/blob_db/blob_log_format.cc index 2e6fa3c63..eb748ac99 100644 --- a/utilities/blob_db/blob_log_format.cc +++ b/utilities/blob_db/blob_log_format.cc @@ -6,284 +6,145 @@ #ifndef ROCKSDB_LITE #include "utilities/blob_db/blob_log_format.h" + #include "util/coding.h" #include "util/crc32c.h" namespace rocksdb { namespace blob_db { -const uint32_t kMagicNumber = 2395959; -const uint32_t kVersion1 = 1; -const size_t kBlockSize = 32768; - -BlobLogHeader::BlobLogHeader() - : magic_number_(kMagicNumber), compression_(kNoCompression) {} - -BlobLogHeader& BlobLogHeader::operator=(BlobLogHeader&& in) noexcept { - if (this != &in) { - magic_number_ = in.magic_number_; - version_ = in.version_; - ttl_guess_ = std::move(in.ttl_guess_); - ts_guess_ = std::move(in.ts_guess_); - compression_ = in.compression_; - } - return *this; +void BlobLogHeader::EncodeTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogHeader::kSize); + PutFixed32(dst, kMagicNumber); + PutFixed32(dst, version); + PutFixed32(dst, column_family_id); + unsigned char flags = (has_ttl ? 1 : 0); + dst->push_back(flags); + dst->push_back(compression); + PutFixed64(dst, expiration_range.first); + PutFixed64(dst, expiration_range.second); } -BlobLogFooter::BlobLogFooter() : magic_number_(kMagicNumber), blob_count_(0) {} - -Status BlobLogFooter::DecodeFrom(const Slice& input) { - Slice slice(input); - uint32_t val; - if (!GetFixed32(&slice, &val)) { - return Status::Corruption("Invalid Blob Footer: flags"); - } - - bool has_ttl = false; - bool has_ts = false; - val >>= 8; - RecordSubType st = static_cast(val); - switch (st) { - case kRegularType: - break; - case kTTLType: - has_ttl = true; - break; - case kTimestampType: - has_ts = true; - break; - default: - return Status::Corruption("Invalid Blob Footer: flags_val"); - } - - if (!GetFixed64(&slice, &blob_count_)) { - return Status::Corruption("Invalid Blob Footer: blob_count"); - } - - ttlrange_t temp_ttl; - if (!GetFixed64(&slice, &temp_ttl.first) || - !GetFixed64(&slice, &temp_ttl.second)) { - return Status::Corruption("Invalid Blob Footer: ttl_range"); - } - if (has_ttl) { - ttl_range_.reset(new ttlrange_t(temp_ttl)); - } - - if (!GetFixed64(&slice, &sn_range_.first) || - !GetFixed64(&slice, &sn_range_.second)) { - return Status::Corruption("Invalid Blob Footer: sn_range"); - } - - tsrange_t temp_ts; - if (!GetFixed64(&slice, &temp_ts.first) || - !GetFixed64(&slice, &temp_ts.second)) { - return Status::Corruption("Invalid Blob Footer: ts_range"); - } - if (has_ts) { - ts_range_.reset(new tsrange_t(temp_ts)); - } - - if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) { - return Status::Corruption("Invalid Blob Footer: magic"); +Status BlobLogHeader::DecodeFrom(Slice src) { + static const std::string kErrorMessage = + "Error while decoding blob log header"; + if (src.size() != BlobLogHeader::kSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob file header size"); + } + uint32_t magic_number; + unsigned char flags; + if (!GetFixed32(&src, &magic_number) || !GetFixed32(&src, &version) || + !GetFixed32(&src, &column_family_id)) { + return Status::Corruption( + kErrorMessage, + "Error decoding magic number, version and column family id"); + } + if (magic_number != kMagicNumber) { + return Status::Corruption(kErrorMessage, "Magic number mismatch"); + } + if (version != kVersion1) { + return Status::Corruption(kErrorMessage, "Unknown header version"); + } + flags = src.data()[0]; + compression = static_cast(src.data()[1]); + has_ttl = (flags & 1) == 1; + src.remove_prefix(2); + if (!GetFixed64(&src, &expiration_range.first) || + !GetFixed64(&src, &expiration_range.second)) { + return Status::Corruption(kErrorMessage, "Error decoding expiration range"); } - return Status::OK(); } -void BlobLogFooter::EncodeTo(std::string* dst) const { - dst->reserve(kFooterSize); - - RecordType rt = kFullType; - RecordSubType st = kRegularType; - if (HasTTL()) { - st = kTTLType; - } else if (HasTimestamp()) { - st = kTimestampType; - } - uint32_t val = static_cast(rt) | (static_cast(st) << 8); - PutFixed32(dst, val); - - PutFixed64(dst, blob_count_); - bool has_ttl = HasTTL(); - bool has_ts = HasTimestamp(); - - if (has_ttl) { - PutFixed64(dst, ttl_range_.get()->first); - PutFixed64(dst, ttl_range_.get()->second); - } else { - PutFixed64(dst, 0); - PutFixed64(dst, 0); - } - PutFixed64(dst, sn_range_.first); - PutFixed64(dst, sn_range_.second); - - if (has_ts) { - PutFixed64(dst, ts_range_.get()->first); - PutFixed64(dst, ts_range_.get()->second); - } else { - PutFixed64(dst, 0); - PutFixed64(dst, 0); - } - - PutFixed32(dst, magic_number_); +void BlobLogFooter::EncodeTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogFooter::kSize); + PutFixed32(dst, kMagicNumber); + PutFixed64(dst, blob_count); + PutFixed64(dst, expiration_range.first); + PutFixed64(dst, expiration_range.second); + PutFixed64(dst, sequence_range.first); + PutFixed64(dst, sequence_range.second); + crc = crc32c::Value(dst->c_str(), dst->size()); + crc = crc32c::Mask(crc); + PutFixed32(dst, crc); } -void BlobLogHeader::EncodeTo(std::string* dst) const { - dst->reserve(kHeaderSize); - - PutFixed32(dst, magic_number_); - - PutFixed32(dst, version_); - - RecordSubType st = kRegularType; - bool has_ttl = HasTTL(); - bool has_ts = HasTimestamp(); - - if (has_ttl) { - st = kTTLType; - } else if (has_ts) { - st = kTimestampType; - } - uint32_t val = - static_cast(st) | (static_cast(compression_) << 8); - PutFixed32(dst, val); - - if (has_ttl) { - PutFixed64(dst, ttl_guess_.get()->first); - PutFixed64(dst, ttl_guess_.get()->second); - } else { - PutFixed64(dst, 0); - PutFixed64(dst, 0); +Status BlobLogFooter::DecodeFrom(Slice src) { + static const std::string kErrorMessage = + "Error while decoding blob log footer"; + if (src.size() != BlobLogFooter::kSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob file footer size"); + } + uint32_t src_crc = 0; + src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - 4); + src_crc = crc32c::Mask(src_crc); + uint32_t magic_number; + if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) || + !GetFixed64(&src, &expiration_range.first) || + !GetFixed64(&src, &expiration_range.second) || + !GetFixed64(&src, &sequence_range.first) || + !GetFixed64(&src, &sequence_range.second) || !GetFixed32(&src, &crc)) { + return Status::Corruption(kErrorMessage, "Error decoding content"); + } + if (magic_number != kMagicNumber) { + return Status::Corruption(kErrorMessage, "Magic number mismatch"); + } + if (src_crc != crc) { + return Status::Corruption(kErrorMessage, "CRC mismatch"); } - - if (has_ts) { - PutFixed64(dst, ts_guess_.get()->first); - PutFixed64(dst, ts_guess_.get()->second); - } else { - PutFixed64(dst, 0); - PutFixed64(dst, 0); - } -} - -Status BlobLogHeader::DecodeFrom(const Slice& input) { - Slice slice(input); - if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) { - return Status::Corruption("Invalid Blob Log Header: magic"); - } - - // as of today, we only support 1 version - if (!GetFixed32(&slice, &version_) || version_ != kVersion1) { - return Status::Corruption("Invalid Blob Log Header: version"); - } - - uint32_t val; - if (!GetFixed32(&slice, &val)) { - return Status::Corruption("Invalid Blob Log Header: subtype"); - } - - bool has_ttl = false; - bool has_ts = false; - RecordSubType st = static_cast(val & 0xff); - compression_ = static_cast((val >> 8) & 0xff); - switch (st) { - case kRegularType: - break; - case kTTLType: - has_ttl = true; - break; - case kTimestampType: - has_ts = true; - break; - default: - return Status::Corruption("Invalid Blob Log Header: subtype_2"); - } - - ttlrange_t temp_ttl; - if (!GetFixed64(&slice, &temp_ttl.first) || - !GetFixed64(&slice, &temp_ttl.second)) { - return Status::Corruption("Invalid Blob Log Header: ttl"); - } - if (has_ttl) { - set_ttl_guess(temp_ttl); - } - - tsrange_t temp_ts; - if (!GetFixed64(&slice, &temp_ts.first) || - !GetFixed64(&slice, &temp_ts.second)) { - return Status::Corruption("Invalid Blob Log Header: timestamp"); - } - if (has_ts) set_ts_guess(temp_ts); - return Status::OK(); } -BlobLogRecord::BlobLogRecord() - : checksum_(0), - header_cksum_(0), - key_size_(0), - blob_size_(0), - time_val_(0), - ttl_val_(0), - type_(0), - subtype_(0) {} - -BlobLogRecord::~BlobLogRecord() {} - -void BlobLogRecord::ResizeKeyBuffer(size_t kbs) { - if (kbs > key_buffer_.size()) { - key_buffer_.resize(kbs); - } -} - -void BlobLogRecord::ResizeBlobBuffer(size_t bbs) { - if (bbs > blob_buffer_.size()) { - blob_buffer_.resize(bbs); - } -} - -void BlobLogRecord::Clear() { - checksum_ = 0; - header_cksum_ = 0; - key_size_ = 0; - blob_size_ = 0; - time_val_ = 0; - ttl_val_ = 0; - type_ = subtype_ = 0; - key_.clear(); - blob_.clear(); +void BlobLogRecord::EncodeHeaderTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogRecord::kHeaderSize + key.size() + value.size()); + PutFixed64(dst, key.size()); + PutFixed64(dst, value.size()); + PutFixed64(dst, expiration); + header_crc = crc32c::Value(dst->c_str(), dst->size()); + header_crc = crc32c::Mask(header_crc); + PutFixed32(dst, header_crc); + blob_crc = crc32c::Value(key.data(), key.size()); + blob_crc = crc32c::Extend(blob_crc, value.data(), value.size()); + blob_crc = crc32c::Mask(blob_crc); + PutFixed32(dst, blob_crc); } -Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) { - Slice input = hdrslice; - if (input.size() < kHeaderSize) { - return Status::Corruption("Invalid Blob Record Header: size"); +Status BlobLogRecord::DecodeHeaderFrom(Slice src) { + static const std::string kErrorMessage = "Error while decoding blob record"; + if (src.size() != BlobLogRecord::kHeaderSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob record header size"); } - - if (!GetFixed32(&input, &key_size_)) { - return Status::Corruption("Invalid Blob Record Header: key_size"); - } - if (!GetFixed64(&input, &blob_size_)) { - return Status::Corruption("Invalid Blob Record Header: blob_size"); + uint32_t src_crc = 0; + src_crc = crc32c::Value(src.data(), BlobLogRecord::kHeaderSize - 8); + src_crc = crc32c::Mask(src_crc); + if (!GetFixed64(&src, &key_size) || !GetFixed64(&src, &value_size) || + !GetFixed64(&src, &expiration) || !GetFixed32(&src, &header_crc) || + !GetFixed32(&src, &blob_crc)) { + return Status::Corruption(kErrorMessage, "Error decoding content"); } - if (!GetFixed64(&input, &ttl_val_)) { - return Status::Corruption("Invalid Blob Record Header: ttl_val"); + if (src_crc != header_crc) { + return Status::Corruption(kErrorMessage, "Header CRC mismatch"); } - if (!GetFixed64(&input, &time_val_)) { - return Status::Corruption("Invalid Blob Record Header: time_val"); - } - - type_ = *(input.data()); - input.remove_prefix(1); - subtype_ = *(input.data()); - input.remove_prefix(1); + return Status::OK(); +} - if (!GetFixed32(&input, &header_cksum_)) { - return Status::Corruption("Invalid Blob Record Header: header_cksum"); +Status BlobLogRecord::CheckBlobCRC() const { + uint32_t expected_crc = 0; + expected_crc = crc32c::Value(key.data(), key.size()); + expected_crc = crc32c::Extend(expected_crc, value.data(), value.size()); + expected_crc = crc32c::Mask(expected_crc); + if (expected_crc != blob_crc) { + return Status::Corruption("Blob CRC mismatch"); } - if (!GetFixed32(&input, &checksum_)) { - return Status::Corruption("Invalid Blob Record Header: checksum"); - } - return Status::OK(); } diff --git a/utilities/blob_db/blob_log_format.h b/utilities/blob_db/blob_log_format.h index c5b96d1b0..1e056aa50 100644 --- a/utilities/blob_db/blob_log_format.h +++ b/utilities/blob_db/blob_log_format.h @@ -9,243 +9,113 @@ #ifndef ROCKSDB_LITE -#include -#include #include -#include -#include #include #include "rocksdb/options.h" +#include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/types.h" namespace rocksdb { - namespace blob_db { -class BlobFile; -class BlobDBImpl; +constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37 +constexpr uint32_t kVersion1 = 1; constexpr uint64_t kNoExpiration = std::numeric_limits::max(); -enum RecordType : uint8_t { - // Zero is reserved for preallocated files - kFullType = 0, - - // For fragments - kFirstType = 1, - kMiddleType = 2, - kLastType = 3, - kMaxRecordType = kLastType -}; - -enum RecordSubType : uint8_t { - kRegularType = 0, - kTTLType = 1, - kTimestampType = 2, -}; - -extern const uint32_t kMagicNumber; - -class Reader; - -using ttlrange_t = std::pair; -using tsrange_t = std::pair; -using snrange_t = std::pair; - -class BlobLogHeader { - friend class BlobFile; - friend class BlobDBImpl; - - private: - uint32_t magic_number_ = 0; - uint32_t version_ = 1; - CompressionType compression_; - std::unique_ptr ttl_guess_; - std::unique_ptr ts_guess_; - - private: - void set_ttl_guess(const ttlrange_t& ttl) { - ttl_guess_.reset(new ttlrange_t(ttl)); - } - - void set_version(uint32_t v) { version_ = v; } - - void set_ts_guess(const tsrange_t& ts) { ts_guess_.reset(new tsrange_t(ts)); } - - public: - // magic number + version + flags + ttl guess + timestamp range = 44 - static const size_t kHeaderSize = 4 + 4 + 4 + 8 * 2 + 8 * 2; - - void EncodeTo(std::string* dst) const; - - Status DecodeFrom(const Slice& input); - - BlobLogHeader(); +using ExpirationRange = std::pair; +using SequenceRange = std::pair; - uint32_t magic_number() const { return magic_number_; } - - uint32_t version() const { return version_; } - - CompressionType compression() const { return compression_; } - - ttlrange_t ttl_range() const { - if (!ttl_guess_) { - return {0, 0}; - } - return *ttl_guess_; - } - - tsrange_t ts_range() const { - if (!ts_guess_) { - return {0, 0}; - } - return *ts_guess_; - } +// Format of blob log file header (30 bytes): +// +// +--------------+---------+---------+-------+-------------+-------------------+ +// | magic number | version | cf id | flags | compression | expiration range | +// +--------------+---------+---------+-------+-------------+-------------------+ +// | Fixed32 | Fixed32 | Fixed32 | char | char | Fixed64 Fixed64 | +// +--------------+---------+---------+-------+-------------+-------------------+ +// +// List of flags: +// has_ttl: Whether the file contain TTL data. +// +// Expiration range in the header is a rough range based on +// blob_db_options.ttl_range_secs. +struct BlobLogHeader { + static constexpr size_t kSize = 30; - bool HasTTL() const { return ttl_guess_ != nullptr; } + uint32_t version = kVersion1; + uint32_t column_family_id; + CompressionType compression; + bool has_ttl; + ExpirationRange expiration_range; - bool HasTimestamp() const { return ts_guess_ != nullptr; } + void EncodeTo(std::string* dst); - BlobLogHeader& operator=(BlobLogHeader&& in) noexcept; + Status DecodeFrom(Slice slice); }; -// Footer encapsulates the fixed information stored at the tail -// end of every blob log file. -class BlobLogFooter { - friend class BlobFile; - - public: - // Use this constructor when you plan to write out the footer using - // EncodeTo(). Never use this constructor with DecodeFrom(). - BlobLogFooter(); - - uint32_t magic_number() const { return magic_number_; } - - void EncodeTo(std::string* dst) const; - - Status DecodeFrom(const Slice& input); - - // convert this object to a human readable form - std::string ToString() const; - - // footer size = 4 byte magic number - // 8 bytes count - // 8, 8 - ttl range - // 8, 8 - sn range - // 8, 8 - ts range - // = 64 - static const size_t kFooterSize = 4 + 4 + 8 + (8 * 2) + (8 * 2) + (8 * 2); - - bool HasTTL() const { return !!ttl_range_; } - - bool HasTimestamp() const { return !!ts_range_; } - - uint64_t GetBlobCount() const { return blob_count_; } - - ttlrange_t GetTTLRange() const { - if (ttl_range_) { - *ttl_range_; - } - return {0, 0}; - } - - tsrange_t GetTimeRange() const { - if (ts_range_) { - return *ts_range_; - } - return {0, 0}; - } - - const snrange_t& GetSNRange() const { return sn_range_; } +// Format of blob log file footer (48 bytes): +// +// +--------------+------------+-------------------+-------------------+------------+ +// | magic number | blob count | expiration range | sequence range | footer CRC | +// +--------------+------------+-------------------+-------------------+------------+ +// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed64 + Fixed64 | Fixed32 | +// +--------------+------------+-------------------+-------------------+------------+ +// +// The footer will be presented only when the blob file is properly closed. +// +// Unlike the same field in file header, expiration range in the footer is the +// range of smallest and largest expiration of the data in this file. +struct BlobLogFooter { + static constexpr size_t kSize = 48; - private: - uint32_t magic_number_ = 0; - uint64_t blob_count_ = 0; + uint64_t blob_count; + ExpirationRange expiration_range; + SequenceRange sequence_range; + uint32_t crc; - std::unique_ptr ttl_range_; - std::unique_ptr ts_range_; - snrange_t sn_range_; + void EncodeTo(std::string* dst); - private: - void set_ttl_range(const ttlrange_t& ttl) { - ttl_range_.reset(new ttlrange_t(ttl)); - } - void set_time_range(const tsrange_t& ts) { - ts_range_.reset(new tsrange_t(ts)); - } + Status DecodeFrom(Slice slice); }; -extern const size_t kBlockSize; - -class BlobLogRecord { - friend class Reader; - - private: - // this might not be set. - uint32_t checksum_; - uint32_t header_cksum_; - uint32_t key_size_; - uint64_t blob_size_; - uint64_t time_val_; - uint64_t ttl_val_; - char type_; - char subtype_; - Slice key_; - Slice blob_; - std::string key_buffer_; - std::string blob_buffer_; - - private: - void Clear(); - - char* GetKeyBuffer() { return &(key_buffer_[0]); } - - char* GetBlobBuffer() { return &(blob_buffer_[0]); } - - void ResizeKeyBuffer(size_t kbs); - - void ResizeBlobBuffer(size_t bbs); - - public: - // Header is - // Key Length ( 4 bytes ), - // Blob Length ( 8 bytes), - // ttl (8 bytes), timestamp (8 bytes), - // type (1 byte), subtype (1 byte) - // header checksum (4 bytes), blob checksum (4 bytes), - // = 42 - static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1; - - public: - BlobLogRecord(); - - ~BlobLogRecord(); - - const Slice& Key() const { return key_; } - - const Slice& Blob() const { return blob_; } - - uint32_t GetKeySize() const { return key_size_; } - - uint64_t GetBlobSize() const { return blob_size_; } - - bool HasTTL() const { - return ttl_val_ != std::numeric_limits::max(); - } - - uint64_t GetTTL() const { return ttl_val_; } - - uint64_t GetTimeVal() const { return time_val_; } - - char type() const { return type_; } - - char subtype() const { return subtype_; } - - uint32_t header_checksum() const { return header_cksum_; } - - uint32_t checksum() const { return checksum_; } - - Status DecodeHeaderFrom(const Slice& hdrslice); +// Blob record format (32 bytes header + key + value): +// +// +------------+--------------+------------+------------+----------+---------+-----------+ +// | key length | value length | expiration | header CRC | blob CRC | key | value | +// +------------+--------------+------------+------------+----------+---------+-----------+ +// | Fixed64 | Fixed64 | Fixed64 | Fixed32 | Fixed32 | key len | value len | +// +------------+--------------+------------+------------+----------+---------+-----------+ +// +// If file has has_ttl = false, expiration field is always 0, and the blob +// doesn't has expiration. +// +// Also note that if compression is used, value is compressed value and value +// length is compressed value length. +// +// Header CRC is the checksum of (key_len + val_len + expiration), while +// blob CRC is the checksum of (key + value). +// +// We could use variable length encoding (Varint64) to save more space, but it +// make reader more complicated. +struct BlobLogRecord { + // header include fields up to blob CRC + static constexpr size_t kHeaderSize = 32; + + uint64_t key_size; + uint64_t value_size; + uint64_t expiration; + uint32_t header_crc; + uint32_t blob_crc; + Slice key; + Slice value; + std::string key_buf; + std::string value_buf; + + void EncodeHeaderTo(std::string* dst); + + Status DecodeHeaderFrom(Slice src); + + Status CheckBlobCRC() const; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_log_reader.cc b/utilities/blob_db/blob_log_reader.cc index 826551d68..a2421b930 100644 --- a/utilities/blob_db/blob_log_reader.cc +++ b/utilities/blob_db/blob_log_reader.cc @@ -7,10 +7,8 @@ #include "utilities/blob_db/blob_log_reader.h" -#include -#include "rocksdb/env.h" -#include "util/coding.h" -#include "util/crc32c.h" +#include + #include "util/file_reader_writer.h" namespace rocksdb { @@ -18,115 +16,79 @@ namespace blob_db { Reader::Reader(std::shared_ptr info_log, unique_ptr&& _file) - : info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) { - backing_store_.resize(kBlockSize); + : info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) {} + +Status Reader::ReadSlice(uint64_t size, Slice* slice, std::string* buf) { + buf->reserve(size); + Status s = file_->Read(size, slice, &(*buf)[0]); + next_byte_ += size; + if (!s.ok()) { + return s; + } + if (slice->size() != size) { + return Status::Corruption("EOF reached while reading record"); + } + return s; } -Reader::~Reader() {} - Status Reader::ReadHeader(BlobLogHeader* header) { assert(file_.get() != nullptr); assert(next_byte_ == 0); - Status status = - file_->Read(BlobLogHeader::kHeaderSize, &buffer_, GetReadBuffer()); - next_byte_ += buffer_.size(); - if (!status.ok()) return status; + Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, &backing_store_); + if (!s.ok()) { + return s; + } - if (buffer_.size() != BlobLogHeader::kHeaderSize) { - return Status::IOError("EOF reached before file header"); + if (buffer_.size() != BlobLogHeader::kSize) { + return Status::Corruption("EOF reached before file header"); } - status = header->DecodeFrom(buffer_); - return status; + return header->DecodeFrom(buffer_); } Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, uint64_t* blob_offset) { - record->Clear(); - buffer_.clear(); - backing_store_[0] = '\0'; - - Status status = - file_->Read(BlobLogRecord::kHeaderSize, &buffer_, GetReadBuffer()); - next_byte_ += buffer_.size(); - if (!status.ok()) return status; + Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, &backing_store_); + if (!s.ok()) { + return s; + } if (buffer_.size() != BlobLogRecord::kHeaderSize) { - return Status::IOError("EOF reached before record header"); + return Status::Corruption("EOF reached before record header"); } - status = record->DecodeHeaderFrom(buffer_); - if (!status.ok()) { - return status; + s = record->DecodeHeaderFrom(buffer_); + if (!s.ok()) { + return s; } - uint32_t header_crc = 0; - uint32_t blob_crc = 0; - size_t crc_data_size = BlobLogRecord::kHeaderSize - 2 * sizeof(uint32_t); - header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size); - - uint64_t kb_size = record->GetKeySize() + record->GetBlobSize(); + uint64_t kb_size = record->key_size + record->value_size; if (blob_offset != nullptr) { - *blob_offset = next_byte_ + record->GetKeySize(); + *blob_offset = next_byte_ + record->key_size; } + switch (level) { case kReadHeader: - file_->Skip(kb_size); + file_->Skip(record->key_size + record->value_size); next_byte_ += kb_size; + break; case kReadHeaderKey: - record->ResizeKeyBuffer(record->GetKeySize()); - status = file_->Read(record->GetKeySize(), &record->key_, - record->GetKeyBuffer()); - next_byte_ += record->key_.size(); - if (!status.ok()) return status; - if (record->key_.size() != record->GetKeySize()) { - return Status::IOError("EOF reached before key read"); - } - - header_crc = - crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize()); - header_crc = crc32c::Mask(header_crc); - if (header_crc != record->header_cksum_) { - return Status::Corruption("Record Checksum mismatch: header_cksum"); - } - - file_->Skip(record->GetBlobSize()); - next_byte_ += record->GetBlobSize(); + s = ReadSlice(record->key_size, &record->key, &record->key_buf); + file_->Skip(record->value_size); + next_byte_ += record->value_size; + break; case kReadHeaderKeyBlob: - record->ResizeKeyBuffer(record->GetKeySize()); - status = file_->Read(record->GetKeySize(), &record->key_, - record->GetKeyBuffer()); - next_byte_ += record->key_.size(); - if (!status.ok()) return status; - if (record->key_.size() != record->GetKeySize()) { - return Status::IOError("EOF reached before key read"); - } - - header_crc = - crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize()); - header_crc = crc32c::Mask(header_crc); - if (header_crc != record->header_cksum_) { - return Status::Corruption("Record Checksum mismatch: header_cksum"); - } - - record->ResizeBlobBuffer(record->GetBlobSize()); - status = file_->Read(record->GetBlobSize(), &record->blob_, - record->GetBlobBuffer()); - next_byte_ += record->blob_.size(); - if (!status.ok()) return status; - if (record->blob_.size() != record->GetBlobSize()) { - return Status::IOError("EOF reached during blob read"); + s = ReadSlice(record->key_size, &record->key, &record->key_buf); + if (s.ok()) { + s = ReadSlice(record->value_size, &record->value, &record->value_buf); } - - blob_crc = - crc32c::Extend(blob_crc, record->blob_.data(), record->blob_.size()); - blob_crc = crc32c::Mask(blob_crc); - if (blob_crc != record->checksum_) { - return Status::Corruption("Blob Checksum mismatch"); + if (s.ok()) { + s = record->CheckBlobCRC(); } + break; } - return status; + return s; } } // namespace blob_db diff --git a/utilities/blob_db/blob_log_reader.h b/utilities/blob_db/blob_log_reader.h index d37e10bc4..9c76b92ae 100644 --- a/utilities/blob_db/blob_log_reader.h +++ b/utilities/blob_db/blob_log_reader.h @@ -7,11 +7,9 @@ #ifndef ROCKSDB_LITE -#include #include #include -#include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "utilities/blob_db/blob_log_format.h" @@ -51,7 +49,11 @@ class Reader { Reader(std::shared_ptr info_log, std::unique_ptr&& file); - ~Reader(); + ~Reader() = default; + + // No copying allowed + Reader(const Reader&) = delete; + Reader& operator=(const Reader&) = delete; Status ReadHeader(BlobLogHeader* header); @@ -64,6 +66,8 @@ class Reader { Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader, uint64_t* blob_offset = nullptr); + Status ReadSlice(uint64_t size, Slice* slice, std::string* buf); + SequentialFileReader* file() { return file_.get(); } void ResetNextByte() { next_byte_ = 0; } @@ -72,9 +76,6 @@ class Reader { const SequentialFileReader* file_reader() const { return file_.get(); } - private: - char* GetReadBuffer() { return &(backing_store_[0]); } - private: std::shared_ptr info_log_; const std::unique_ptr file_; @@ -84,10 +85,6 @@ class Reader { // which byte to read next. For asserting proper usage uint64_t next_byte_; - - // No copying allowed - Reader(const Reader&) = delete; - Reader& operator=(const Reader&) = delete; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_log_writer.cc b/utilities/blob_db/blob_log_writer.cc index f92df8fae..806ca3c95 100644 --- a/utilities/blob_db/blob_log_writer.cc +++ b/utilities/blob_db/blob_log_writer.cc @@ -10,8 +10,8 @@ #include #include "rocksdb/env.h" #include "util/coding.h" -#include "util/crc32c.h" #include "util/file_reader_writer.h" +#include "utilities/blob_db/blob_log_format.h" namespace rocksdb { namespace blob_db { @@ -24,18 +24,11 @@ Writer::Writer(unique_ptr&& dest, uint64_t log_number, bytes_per_sync_(bpsync), next_sync_offset_(0), use_fsync_(use_fs), - last_elem_type_(kEtNone) { - for (int i = 0; i <= kMaxRecordType; i++) { - char t = static_cast(i); - type_crc_[i] = crc32c::Value(&t, 1); - } -} - -Writer::~Writer() {} + last_elem_type_(kEtNone) {} void Writer::Sync() { dest_->Sync(use_fsync_); } -Status Writer::WriteHeader(const BlobLogHeader& header) { +Status Writer::WriteHeader(BlobLogHeader& header) { assert(block_offset_ == 0); assert(last_elem_type_ == kEtNone); std::string str; @@ -50,7 +43,7 @@ Status Writer::WriteHeader(const BlobLogHeader& header) { return s; } -Status Writer::AppendFooter(const BlobLogFooter& footer) { +Status Writer::AppendFooter(BlobLogFooter& footer) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); @@ -69,13 +62,13 @@ Status Writer::AppendFooter(const BlobLogFooter& footer) { } Status Writer::AddRecord(const Slice& key, const Slice& val, - uint64_t* key_offset, uint64_t* blob_offset, - uint64_t ttl) { + uint64_t expiration, uint64_t* key_offset, + uint64_t* blob_offset) { assert(block_offset_ != 0); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); std::string buf; - ConstructBlobHeader(&buf, key, val, ttl, -1); + ConstructBlobHeader(&buf, key, val, expiration); Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); return s; @@ -87,44 +80,19 @@ Status Writer::AddRecord(const Slice& key, const Slice& val, assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); std::string buf; - ConstructBlobHeader(&buf, key, val, 0, -1); + ConstructBlobHeader(&buf, key, val, 0); Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); return s; } -void Writer::ConstructBlobHeader(std::string* headerbuf, const Slice& key, - const Slice& val, uint64_t ttl, int64_t ts) { - headerbuf->reserve(BlobLogRecord::kHeaderSize); - - uint32_t key_size = static_cast(key.size()); - PutFixed32(headerbuf, key_size); - PutFixed64(headerbuf, val.size()); - - PutFixed64(headerbuf, ttl); - PutFixed64(headerbuf, ts); - - RecordType t = kFullType; - headerbuf->push_back(static_cast(t)); - - RecordSubType st = kRegularType; - if (ttl != kNoExpiration) { - st = kTTLType; - } - headerbuf->push_back(static_cast(st)); - - uint32_t header_crc = 0; - header_crc = - crc32c::Extend(header_crc, headerbuf->c_str(), headerbuf->size()); - header_crc = crc32c::Extend(header_crc, key.data(), key.size()); - header_crc = crc32c::Mask(header_crc); - PutFixed32(headerbuf, header_crc); - - uint32_t crc = 0; - // Compute the crc of the record type and the payload. - crc = crc32c::Extend(crc, val.data(), val.size()); - crc = crc32c::Mask(crc); // Adjust for storage - PutFixed32(headerbuf, crc); +void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, + const Slice& val, uint64_t expiration) { + BlobLogRecord record; + record.key = key; + record.value = val; + record.expiration = expiration; + record.EncodeHeaderTo(buf); } Status Writer::EmitPhysicalRecord(const std::string& headerbuf, diff --git a/utilities/blob_db/blob_log_writer.h b/utilities/blob_db/blob_log_writer.h index d67435158..2a1f05e1b 100644 --- a/utilities/blob_db/blob_log_writer.h +++ b/utilities/blob_db/blob_log_writer.h @@ -37,24 +37,29 @@ class Writer { explicit Writer(std::unique_ptr&& dest, uint64_t log_number, uint64_t bpsync, bool use_fsync, uint64_t boffset = 0); - ~Writer(); - static void ConstructBlobHeader(std::string* headerbuf, const Slice& key, - const Slice& val, uint64_t ttl, int64_t ts); + ~Writer() = default; + + // No copying allowed + Writer(const Writer&) = delete; + Writer& operator=(const Writer&) = delete; + + static void ConstructBlobHeader(std::string* buf, const Slice& key, + const Slice& val, uint64_t expiration); Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset, uint64_t* blob_offset); - Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset, - uint64_t* blob_offset, uint64_t ttl); + Status AddRecord(const Slice& key, const Slice& val, uint64_t expiration, + uint64_t* key_offset, uint64_t* blob_offset); Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key, const Slice& val, uint64_t* key_offset, uint64_t* blob_offset); - Status AppendFooter(const BlobLogFooter& footer); + Status AppendFooter(BlobLogFooter& footer); - Status WriteHeader(const BlobLogHeader& header); + Status WriteHeader(BlobLogHeader& header); WritableFileWriter* file() { return dest_.get(); } @@ -76,15 +81,6 @@ class Writer { uint64_t next_sync_offset_; bool use_fsync_; - // crc32c values for all supported record types. These are - // pre-computed to reduce the overhead of computing the crc of the - // record type stored in the header. - uint32_t type_crc_[kMaxRecordType + 1]; - - // No copying allowed - Writer(const Writer&) = delete; - Writer& operator=(const Writer&) = delete; - public: enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter }; ElemType last_elem_type_;