diff --git a/db/db_impl.cc b/db/db_impl.cc index 4440ed74d..900e6b053 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1682,12 +1682,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { delete casted_s; } -bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound, - SequenceNumber upper_bound) { - InstrumentedMutexLock l(&mutex_); - return snapshots_.HasSnapshotInRange(lower_bound, upper_bound); -} - #ifndef ROCKSDB_LITE Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, TablePropertiesCollection* props) { diff --git a/db/db_impl.h b/db/db_impl.h index 3db65048b..140720946 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -229,10 +229,6 @@ class DBImpl : public DB { virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override; - // Whether there is an active snapshot in range [lower_bound, upper_bound). - bool HasActiveSnapshotInRange(SequenceNumber lower_bound, - SequenceNumber upper_bound); - #ifndef ROCKSDB_LITE using DB::ResetStats; virtual Status ResetStats() override; diff --git a/db/db_write_test.cc b/db/db_write_test.cc index d21bfe473..1a27f470e 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -39,52 +39,6 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) { ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument()); } -// Sequence number should be return through input write batch. -TEST_P(DBWriteTest, ReturnSeuqneceNumber) { - Random rnd(4422); - Open(); - for (int i = 0; i < 100; i++) { - WriteBatch batch; - batch.Put("key" + ToString(i), test::RandomHumanReadableString(&rnd, 10)); - ASSERT_OK(dbfull()->Write(WriteOptions(), &batch)); - ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), - WriteBatchInternal::Sequence(&batch)); - } -} - -TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { - constexpr size_t kThreads = 16; - constexpr size_t kNumKeys = 1000; - Open(); - ASSERT_EQ(0, dbfull()->GetLatestSequenceNumber()); - // Check each sequence is used once and only once. - std::vector flags(kNumKeys * kThreads + 1); - for (size_t i = 0; i < flags.size(); i++) { - flags[i].clear(); - } - auto writer = [&](size_t id) { - Random rnd(4422 + static_cast(id)); - for (size_t k = 0; k < kNumKeys; k++) { - WriteBatch batch; - batch.Put("key" + ToString(id) + "-" + ToString(k), - test::RandomHumanReadableString(&rnd, 10)); - ASSERT_OK(dbfull()->Write(WriteOptions(), &batch)); - SequenceNumber sequence = WriteBatchInternal::Sequence(&batch); - ASSERT_GT(sequence, 0); - ASSERT_LE(sequence, kNumKeys * kThreads); - // The sequence isn't consumed by someone else. - ASSERT_FALSE(flags[sequence].test_and_set()); - } - }; - std::vector threads; - for (size_t i = 0; i < kThreads; i++) { - threads.emplace_back(writer, i); - } - for (size_t i = 0; i < kThreads; i++) { - threads[i].join(); - } -} - TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { constexpr int kNumThreads = 5; std::unique_ptr mock_env( diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 7dc405931..ad9c1a9fb 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -108,22 +108,6 @@ class SnapshotList { return ret; } - // Whether there is an active snapshot in range [lower_bound, upper_bound). - bool HasSnapshotInRange(SequenceNumber lower_bound, - SequenceNumber upper_bound) { - if (empty()) { - return false; - } - const SnapshotImpl* s = &list_; - while (s->next_ != &list_) { - if (s->next_->number_ >= lower_bound) { - return s->next_->number_ < upper_bound; - } - s = s->next_; - } - return false; - } - // get the sequence number of the most recent snapshot SequenceNumber GetNewest() { if (empty()) { diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index f78210bb9..a8e24863e 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -571,18 +571,14 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { const WriteOptions& options_; BlobDBImpl* blob_db_impl_; uint32_t default_cf_id_; - SequenceNumber sequence_; WriteBatch batch_; public: BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl, - uint32_t default_cf_id, SequenceNumber seq) + uint32_t default_cf_id) : options_(options), blob_db_impl_(blob_db_impl), - default_cf_id_(default_cf_id), - sequence_(seq) {} - - SequenceNumber sequence() { return sequence_; } + default_cf_id_(default_cf_id) {} WriteBatch* batch() { return &batch_; } @@ -597,8 +593,7 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { uint64_t expiration = blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value); Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice, - expiration, sequence_, &batch_); - sequence_++; + expiration, &batch_); return s; } @@ -609,7 +604,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { "Blob DB doesn't support non-default column family."); } Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key); - sequence_++; return s; } @@ -621,7 +615,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { } Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id, begin_key, end_key); - sequence_++; return s; } @@ -643,12 +636,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { RecordTick(statistics_, BLOB_DB_NUM_WRITE); uint32_t default_cf_id = reinterpret_cast(DefaultColumnFamily())->GetID(); - // TODO(yiwu): In case there are multiple writers the latest sequence would - // not be the actually sequence we are writting. Need to get the sequence - // from write batch after DB write instead. - SequenceNumber current_seq = GetLatestSequenceNumber() + 1; Status s; - BlobInserter blob_inserter(options, this, default_cf_id, current_seq); + BlobInserter blob_inserter(options, this, default_cf_id); { // Release write_mutex_ before DB write to avoid race condition with // flush begin listener, which also require write_mutex_ to sync @@ -693,6 +682,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (bdb_options_.enable_garbage_collection) { // add deleted key to list of keys that have been deleted for book-keeping + SequenceNumber current_seq = + WriteBatchInternal::Sequence(blob_inserter.batch()); DeleteBookkeeper delete_bookkeeper(this, current_seq); s = updates->Iterate(&delete_bookkeeper); } @@ -761,11 +752,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, // flush begin listener, which also require write_mutex_ to sync // blob files. MutexLock l(&write_mutex_); - // TODO(yiwu): In case there are multiple writers the latest sequence would - // not be the actually sequence we are writting. Need to get the sequence - // from write batch after DB write instead. - SequenceNumber sequence = GetLatestSequenceNumber() + 1; - s = PutBlobValue(options, key, value, expiration, sequence, &batch); + s = PutBlobValue(options, key, value, expiration, &batch); } if (s.ok()) { s = db_->Write(options, &batch); @@ -776,7 +763,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration, - SequenceNumber sequence, WriteBatch* batch) { + WriteBatch* batch) { Status s; std::string index_entry; uint32_t column_family_id = @@ -817,7 +804,6 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, } if (s.ok()) { - bfile->ExtendSequenceRange(sequence); if (expiration != kNoExpiration) { bfile->ExtendExpirationRange(expiration); } @@ -898,7 +884,7 @@ bool BlobDBImpl::EvictOldestBlobFile() { total_blob_space_.load(), bdb_options_.blob_dir_size, oldest_file->BlobFileNumber(), expiration_range.first, expiration_range.second); - oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second); + oldest_file->MarkObsolete(GetLatestSequenceNumber()); obsolete_files_.push_back(oldest_file); oldest_file_evicted_.store(true); RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); @@ -1271,9 +1257,26 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { bool BlobDBImpl::VisibleToActiveSnapshot( const std::shared_ptr& bfile) { assert(bfile->Obsolete()); - SequenceNumber first_sequence = bfile->GetSequenceRange().first; + + // We check whether the oldest snapshot is no less than the last sequence + // by the time the blob file become obsolete. If so, the blob file is not + // visible to all existing snapshots. + // + // If we keep track of the earliest sequence of the keys in the blob file, + // we could instead check if there's a snapshot falls in range + // [earliest_sequence, obsolete_sequence). But doing so will make the + // implementation more complicated. SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence(); - return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence); + SequenceNumber oldest_snapshot = 0; + { + // Need to lock DBImpl mutex before access snapshot list. + InstrumentedMutexLock l(db_impl_->mutex()); + auto snapshots = db_impl_->snapshots(); + if (!snapshots.empty()) { + oldest_snapshot = snapshots.oldest()->GetSequenceNumber(); + } + } + return oldest_snapshot < obsolete_sequence; } bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, @@ -1757,8 +1760,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, &rewrite_batch, &callback); } if (rewrite_status.ok()) { - newfile->ExtendSequenceRange( - WriteBatchInternal::Sequence(&rewrite_batch)); gc_stats->num_keys_relocated++; gc_stats->bytes_relocated += record.record_size(); } else if (rewrite_status.IsBusy()) { @@ -1775,10 +1776,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } // end of ReadRecord loop if (s.ok()) { - SequenceNumber obsolete_sequence = - newfile == nullptr ? bfptr->GetSequenceRange().second + 1 - : newfile->GetSequenceRange().second; - bfptr->MarkObsolete(obsolete_sequence); + bfptr->MarkObsolete(GetLatestSequenceNumber()); if (!first_gc) { WriteLock wl(&mutex_); obsolete_files_.push_back(bfptr); diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 3e361cd18..f259038d5 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -284,7 +284,7 @@ class BlobDBImpl : public BlobDB { Status PutBlobValue(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration, - SequenceNumber sequence, WriteBatch* batch); + WriteBatch* batch); Status AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 73ec6ca47..c0c0fc490 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -901,20 +901,18 @@ TEST_F(BlobDBTest, SnapshotAndGarbageCollection) { ASSERT_EQ(1, gc_stats.blob_count); if (delete_key) { ASSERT_EQ(0, gc_stats.num_keys_relocated); - ASSERT_EQ(bfile->GetSequenceRange().second + 1, - bfile->GetObsoleteSequence()); } else { ASSERT_EQ(1, gc_stats.num_keys_relocated); - ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), - bfile->GetObsoleteSequence()); } + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), + bfile->GetObsoleteSequence()); if (i == 3) { snapshot = blob_db_->GetSnapshot(); } size_t num_files = delete_key ? 3 : 4; ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size()); blob_db_impl()->TEST_DeleteObsoleteFiles(); - if (i == 0 || i == 3 || (i == 2 && delete_key)) { + if (i == 3) { // The snapshot shouldn't see data in bfile ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size()); blob_db_->ReleaseSnapshot(snapshot); @@ -1111,10 +1109,6 @@ TEST_F(BlobDBTest, InlineSmallValues) { Open(bdb_options, options); std::map data; std::map versions; - SequenceNumber first_non_ttl_seq = kMaxSequenceNumber; - SequenceNumber first_ttl_seq = kMaxSequenceNumber; - SequenceNumber last_non_ttl_seq = 0; - SequenceNumber last_ttl_seq = 0; for (size_t i = 0; i < 1000; i++) { bool is_small_value = rnd.Next() % 2; bool has_ttl = rnd.Next() % 2; @@ -1134,15 +1128,6 @@ TEST_F(BlobDBTest, InlineSmallValues) { versions[key] = KeyVersion(key, value, sequence, (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex); - if (!is_small_value) { - if (!has_ttl) { - first_non_ttl_seq = std::min(first_non_ttl_seq, sequence); - last_non_ttl_seq = std::max(last_non_ttl_seq, sequence); - } else { - first_ttl_seq = std::min(first_ttl_seq, sequence); - last_ttl_seq = std::max(last_ttl_seq, sequence); - } - } } VerifyDB(data); VerifyBaseDB(versions); @@ -1159,11 +1144,7 @@ TEST_F(BlobDBTest, InlineSmallValues) { ttl_file = blob_files[1]; } ASSERT_FALSE(non_ttl_file->HasTTL()); - ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSequenceRange().first); - ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSequenceRange().second); ASSERT_TRUE(ttl_file->HasTTL()); - ASSERT_EQ(first_ttl_seq, ttl_file->GetSequenceRange().first); - ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second); } TEST_F(BlobDBTest, CompactionFilterNotSupported) { diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index b7ae8162d..7d3895bcc 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -142,8 +142,6 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size, fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count); fprintf(stdout, " Expiration Range : %s\n", GetString(footer.expiration_range).c_str()); - fprintf(stdout, " Sequence Range : %s\n", - GetString(footer.sequence_range).c_str()); return s; } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 18c17516c..324a9521d 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -43,7 +43,6 @@ BlobFile::BlobFile() obsolete_(false), gc_once_after_open_(false), expiration_range_({0, 0}), - sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), header_valid_(false), @@ -67,7 +66,6 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, obsolete_(false), gc_once_after_open_(false), expiration_range_({0, 0}), - sequence_range_({kMaxSequenceNumber, 0}), last_access_(-1), last_fsync_(0), header_valid_(false), @@ -116,12 +114,11 @@ std::string BlobFile::DumpState() const { " file_size: %" PRIu64 " deleted_count: %" PRIu64 " deleted_size: %" PRIu64 " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 - ") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d", + "), writer: %d reader: %d", path_to_dir_.c_str(), file_number_, blob_count_.load(), gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_, closed_.load(), obsolete_.load(), expiration_range_.first, - expiration_range_.second, sequence_range_.first, - sequence_range_.second, (!!log_writer_), (!!ra_file_reader_)); + expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); return str; } @@ -144,8 +141,6 @@ Status BlobFile::WriteFooterAndCloseLocked() { footer.expiration_range = expiration_range_; } - footer.sequence_range = sequence_range_; - // this will close the file and reset the Writable File Pointer. Status s = log_writer_->AppendFooter(footer); if (s.ok()) { @@ -185,7 +180,6 @@ Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { last_fsync_.store(file_size_); blob_count_ = footer.blob_count; expiration_range_ = footer.expiration_range; - sequence_range_ = footer.sequence_range; closed_ = true; return Status::OK(); } diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index cc5a8c3e3..0dac911c0 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -84,8 +84,6 @@ class BlobFile { ExpirationRange expiration_range_; - SequenceRange sequence_range_; - // Sequential/Append writer for blobs std::shared_ptr log_writer_; @@ -177,17 +175,6 @@ class BlobFile { expiration_range_.second = std::max(expiration_range_.second, expiration); } - SequenceRange GetSequenceRange() const { return sequence_range_; } - - void SetSequenceRange(SequenceRange sequence_range) { - sequence_range_ = sequence_range; - } - - void ExtendSequenceRange(SequenceNumber sequence) { - sequence_range_.first = std::min(sequence_range_.first, sequence); - sequence_range_.second = std::max(sequence_range_.second, sequence); - } - bool HasTTL() const { return has_ttl_; } void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; } diff --git a/utilities/blob_db/blob_log_format.cc b/utilities/blob_db/blob_log_format.cc index eb748ac99..2bf702848 100644 --- a/utilities/blob_db/blob_log_format.cc +++ b/utilities/blob_db/blob_log_format.cc @@ -67,8 +67,6 @@ void BlobLogFooter::EncodeTo(std::string* dst) { PutFixed64(dst, blob_count); PutFixed64(dst, expiration_range.first); PutFixed64(dst, expiration_range.second); - PutFixed64(dst, sequence_range.first); - PutFixed64(dst, sequence_range.second); crc = crc32c::Value(dst->c_str(), dst->size()); crc = crc32c::Mask(crc); PutFixed32(dst, crc); @@ -82,14 +80,12 @@ Status BlobLogFooter::DecodeFrom(Slice src) { "Unexpected blob file footer size"); } uint32_t src_crc = 0; - src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - 4); + src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - sizeof(uint32_t)); src_crc = crc32c::Mask(src_crc); uint32_t magic_number; if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) || !GetFixed64(&src, &expiration_range.first) || - !GetFixed64(&src, &expiration_range.second) || - !GetFixed64(&src, &sequence_range.first) || - !GetFixed64(&src, &sequence_range.second) || !GetFixed32(&src, &crc)) { + !GetFixed64(&src, &expiration_range.second) || !GetFixed32(&src, &crc)) { return Status::Corruption(kErrorMessage, "Error decoding content"); } if (magic_number != kMagicNumber) { diff --git a/utilities/blob_db/blob_log_format.h b/utilities/blob_db/blob_log_format.h index a770aa765..3e1b686aa 100644 --- a/utilities/blob_db/blob_log_format.h +++ b/utilities/blob_db/blob_log_format.h @@ -24,7 +24,6 @@ constexpr uint32_t kVersion1 = 1; constexpr uint64_t kNoExpiration = std::numeric_limits::max(); using ExpirationRange = std::pair; -using SequenceRange = std::pair; // Format of blob log file header (30 bytes): // @@ -53,24 +52,23 @@ struct BlobLogHeader { Status DecodeFrom(Slice slice); }; -// Format of blob log file footer (48 bytes): +// Format of blob log file footer (32 bytes): // -// +--------------+------------+-------------------+-------------------+------------+ -// | magic number | blob count | expiration range | sequence range | footer CRC | -// +--------------+------------+-------------------+-------------------+------------+ -// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed64 + Fixed64 | Fixed32 | -// +--------------+------------+-------------------+-------------------+------------+ +// +--------------+------------+-------------------+------------+ +// | magic number | blob count | expiration range | footer CRC | +// +--------------+------------+-------------------+------------+ +// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed32 | +// +--------------+------------+-------------------+------------+ // // The footer will be presented only when the blob file is properly closed. // // Unlike the same field in file header, expiration range in the footer is the // range of smallest and largest expiration of the data in this file. struct BlobLogFooter { - static constexpr size_t kSize = 48; + static constexpr size_t kSize = 32; uint64_t blob_count = 0; ExpirationRange expiration_range = std::make_pair(0, 0); - SequenceRange sequence_range = std::make_pair(0, 0); uint32_t crc = 0; void EncodeTo(std::string* dst);