diff --git a/db/db_impl.cc b/db/db_impl.cc
index 8c8ababb6..a061da835 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2556,7 +2556,8 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
 #ifndef ROCKSDB_LITE
 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                        bool cache_only, SequenceNumber* seq,
-                                       bool* found_record_for_key) {
+                                       bool* found_record_for_key,
+                                       bool* is_blob_index) {
   Status s;
   MergeContext merge_context;
   RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(),
@@ -2571,7 +2572,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,

   // Check if there is a record for this key in the latest memtable
   sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
-               read_options);
+               read_options, nullptr /*read_callback*/, is_blob_index);

   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
     // unexpected error reading memtable.
@@ -2590,7 +2591,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,

   // Check if there is a record for this key in the immutable memtables
   sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
-               read_options);
+               read_options, nullptr /*read_callback*/, is_blob_index);

   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
     // unexpected error reading memtable.
@@ -2609,7 +2610,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,

   // Check if there is a record for this key in the immutable memtables
   sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg,
-                          seq, read_options);
+                          seq, read_options, is_blob_index);

   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
     // unexpected error reading memtable.
@@ -2633,7 +2634,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
     // Check tables
     sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
                      &range_del_agg, nullptr /* value_found */,
-                     found_record_for_key, seq);
+                     found_record_for_key, seq, nullptr /*read_callback*/,
+                     is_blob_index);

     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
       // unexpected error reading SST files
diff --git a/db/db_impl.h b/db/db_impl.h
index 30866fba1..99abe2d34 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -96,6 +96,14 @@ class DBImpl : public DB {
   virtual Status Get(const ReadOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
                      PinnableSlice* value) override;
+
+  // Function that Get and KeyMayExist call with no_io true or false
+  // Note: 'value_found' from KeyMayExist propagates here
+  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+                 const Slice& key, PinnableSlice* value,
+                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
+                 bool* is_blob_index = nullptr);
+
   using DB::MultiGet;
   virtual std::vector<Status> MultiGet(
       const ReadOptions& options,
@@ -295,7 +303,8 @@ class DBImpl : public DB {
   // TODO(andrewkr): this API need to be aware of range deletion operations
   Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                  bool cache_only, SequenceNumber* seq,
-                                 bool* found_record_for_key);
+                                 bool* found_record_for_key,
+                                 bool* is_blob_index = nullptr);

   using DB::IngestExternalFile;
   virtual Status IngestExternalFile(
@@ -1272,13 +1281,6 @@ class DBImpl : public DB {

 #endif  // ROCKSDB_LITE

-  // Function that Get and KeyMayExist call with no_io true or false
-  // Note: 'value_found' from KeyMayExist propagates here
-  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
-                 const Slice& key, PinnableSlice* value,
-                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
-                 bool* is_blob_index = nullptr);
-
   bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                               const DBPropertyInfo& property_info,
                               bool is_locked, uint64_t* value);
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 93bc19281..d8eb1fefc 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -109,14 +109,13 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
                      seq, read_opts, callback, is_blob_index);
 }

-bool MemTableListVersion::GetFromHistory(const LookupKey& key,
-                                         std::string* value, Status* s,
-                                         MergeContext* merge_context,
-                                         RangeDelAggregator* range_del_agg,
-                                         SequenceNumber* seq,
-                                         const ReadOptions& read_opts) {
+bool MemTableListVersion::GetFromHistory(
+    const LookupKey& key, std::string* value, Status* s,
+    MergeContext* merge_context, RangeDelAggregator* range_del_agg,
+    SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
   return GetFromList(&memlist_history_, key, value, s, merge_context,
-                     range_del_agg, seq, read_opts);
+                     range_del_agg, seq, read_opts, nullptr /*read_callback*/,
+                     is_blob_index);
 }

 bool MemTableListVersion::GetFromList(
diff --git a/db/memtable_list.h b/db/memtable_list.h
index 52833a245..ca0fd9d77 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -73,14 +73,16 @@ class MemTableListVersion {
   bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
                       MergeContext* merge_context,
                       RangeDelAggregator* range_del_agg, SequenceNumber* seq,
-                      const ReadOptions& read_opts);
+                      const ReadOptions& read_opts,
+                      bool* is_blob_index = nullptr);

   bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
                       MergeContext* merge_context,
                       RangeDelAggregator* range_del_agg,
-                      const ReadOptions& read_opts) {
+                      const ReadOptions& read_opts,
+                      bool* is_blob_index = nullptr) {
     SequenceNumber seq;
     return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
-                          read_opts);
+                          read_opts, is_blob_index);
   }

   Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index f6c6dc6e2..aefa2598b 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -32,8 +32,7 @@
 #include "util/random.h"
 #include "util/sync_point.h"
 #include "util/timer_queue.h"
-#include "utilities/transactions/optimistic_transaction.h"
-#include "utilities/transactions/optimistic_transaction_db_impl.h"
+#include "utilities/blob_db/blob_db_iterator.h"

 namespace {
 int kBlockBasedTableVersionFormat = 2;
@@ -78,7 +77,7 @@ class BlobHandle {

   void EncodeTo(std::string* dst) const;

-  Status DecodeFrom(Slice* input);
+  Status DecodeFrom(const Slice& input);

   void clear();

@@ -109,10 +108,12 @@ void BlobHandle::clear() {
   compression_ = kNoCompression;
 }

-Status BlobHandle::DecodeFrom(Slice* input) {
-  if (GetVarint64(input, &file_number_) && GetVarint64(input, &offset_) &&
-      GetVarint64(input, &size_)) {
-    compression_ = static_cast<CompressionType>(input->data()[0]);
+Status BlobHandle::DecodeFrom(const Slice& input) {
+  Slice s(input);
+  Slice* p = &s;
+  if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) &&
+      GetVarint64(p, &size_)) {
+    compression_ = static_cast<CompressionType>(p->data()[0]);
     return Status::OK();
   } else {
     clear();
@@ -149,8 +150,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
       value_type ==
           CompactionEventListener::CompactionListenerValueType::kValue) {
     BlobHandle handle;
-    Slice lsmval(existing_value);
-    Status s = handle.DecodeFrom(&lsmval);
+    Status s = handle.DecodeFrom(existing_value);
     if (s.ok()) {
       if (impl_->debug_level_ >= 3)
         ROCKS_LOG_INFO(impl_->db_options_.info_log,
@@ -211,8 +211,6 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) {
   env_ = db_->GetEnv();

-  opt_db_.reset(new OptimisticTransactionDBImpl(db, false));
-
   Status s = env_->CreateDirIfMissing(blob_dir_);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
@@ -237,7 +235,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
 BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
     : BlobDB(db),
       db_impl_(static_cast_with_check<DBImpl, DB>(db)),
-      opt_db_(new OptimisticTransactionDBImpl(db, false)),
       wo_set_(false),
       bdb_options_(blob_db_options),
       db_options_(db->GetOptions()),
@@ -827,8 +824,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
       extendTTL(&(bfile->ttl_range_), expiration);
     }

-    return WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
-                                   index_entry);
+    return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id,
+                                            key, index_entry);
   }

   virtual Status DeleteCF(uint32_t column_family_id,
@@ -997,18 +994,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
   std::string headerbuf;
   Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);

-  // this is another more safer way to do it, where you keep the writeLock
-  // for the entire write path. this will increase latency and reduce
-  // throughput
-  // WriteLock lockbfile_w(&bfile->mutex_);
-  // std::shared_ptr<Writer> writer =
-  //     CheckOrCreateWriterLocked(bfile);
-
-  if (debug_level_ >= 3)
-    ROCKS_LOG_DEBUG(
-        db_options_.info_log, ">Adding KEY FILE: %s: KEY: %s VALSZ: %d",
-        bfile->PathName().c_str(), key.ToString().c_str(), value.size());
-
   std::string index_entry;
   Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry);
   if (!s.ok()) {
@@ -1022,20 +1007,25 @@
   }

   WriteBatch batch;
-  batch.Put(key, index_entry);
+  uint32_t column_family_id =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key,
+                                       index_entry);

   // this goes to the base db and can be expensive
-  s = db_->Write(options, &batch);
-
-  // this is the sequence number of the write.
-  SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
-  bfile->ExtendSequenceRange(sn);
-
-  if (expiration != kNoExpiration) {
-    extendTTL(&(bfile->ttl_range_), expiration);
+  if (s.ok()) {
+    s = db_->Write(options, &batch);
   }

   if (s.ok()) {
+    // this is the sequence number of the write.
+    SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
+    bfile->ExtendSequenceRange(sn);
+
+    if (expiration != kNoExpiration) {
+      extendTTL(&(bfile->ttl_range_), expiration);
+    }
+
     s = CloseBlobFileIfNeeded(bfile);
   }
@@ -1112,21 +1102,16 @@ std::vector<Status> BlobDBImpl::MultiGet(
   // fetch and index entry and reading from the file.
   ReadOptions ro(read_options);
   bool snapshot_created = SetSnapshotIfNeeded(&ro);
-  std::vector<std::string> values_lsm;
-  values_lsm.resize(keys.size());
-  auto statuses = db_->MultiGet(ro, keys, &values_lsm);
-  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
-  TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");
-
-  values->resize(keys.size());
-  assert(statuses.size() == keys.size());
-  assert(values_lsm.size() == keys.size());
-  for (size_t i = 0; i < keys.size(); ++i) {
-    if (!statuses[i].ok()) {
-      continue;
-    }
-    Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i]));
-    statuses[i] = s;
+
+  std::vector<Status> statuses;
+  statuses.reserve(keys.size());
+  values->clear();
+  values->reserve(keys.size());
+  PinnableSlice value;
+  for (size_t i = 0; i < keys.size(); i++) {
+    statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
+    values->push_back(value.ToString());
+    value.Reset();
   }
   if (snapshot_created) {
     db_->ReleaseSnapshot(ro.snapshot);
@@ -1143,12 +1128,11 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
   return true;
 }

-Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
-                             std::string* value) {
+Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
+                                PinnableSlice* value) {
   assert(value != nullptr);
-  Slice index_entry_slice(index_entry);
   BlobHandle handle;
-  Status s = handle.DecodeFrom(&index_entry_slice);
+  Status s = handle.DecodeFrom(index_entry);
   if (!s.ok()) return s;

   // offset has to have certain min, as we will read CRC
@@ -1179,9 +1163,8 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
     bfile = hitr->second;
   }

-  // 0 - size
-  if (!handle.size() && value != nullptr) {
-    value->clear();
+  if (handle.size() == 0 && value != nullptr) {
+    value->PinSelf("");
     return Status::OK();
   }

@@ -1189,7 +1172,7 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
   std::shared_ptr<RandomAccessFileReader> reader =
       GetOrOpenRandomAccessReader(bfile, env_, env_options_);

-  std::string* valueptr = value;
+  std::string* valueptr = value->GetSelf();
   std::string value_c;
   if (bdb_options_.compression != kNoCompression) {
     valueptr = &value_c;
@@ -1251,9 +1234,11 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
         blob_value.data(), blob_value.size(), &contents,
         kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
         *(cfh->cfd()->ioptions()));
-    *value = contents.data.ToString();
+    *(value->GetSelf()) = contents.data.ToString();
   }

+  value->PinSelf();
+
   return s;
 }

@@ -1271,13 +1256,16 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
   bool snapshot_created = SetSnapshotIfNeeded(&ro);

   Status s;
-  std::string index_entry;
-  s = db_->Get(ro, key, &index_entry);
+  bool is_blob_index = false;
+  s = db_impl_->GetImpl(ro, column_family, key, value, nullptr /*value_found*/,
+                        nullptr /*read_callback*/, &is_blob_index);
   TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
   TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
   if (s.ok()) {
-    s = CommonGet(key, index_entry, value->GetSelf());
-    value->PinSelf();
+    if (is_blob_index) {
+      PinnableSlice index_entry = std::move(*value);
+      s = GetBlobValue(key, index_entry, value);
+    }
   }
   if (snapshot_created) {
     db_->ReleaseSnapshot(ro.snapshot);
@@ -1285,15 +1273,6 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
   return s;
 }

-Slice BlobDBIterator::value() const {
-  TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
-  TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
-  Slice index_entry = iter_->value();
-  Status s =
-      db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_);
-  return Slice(vpart_);
-}
-
 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
   if (aborted) return std::make_pair(false, -1);
@@ -1411,14 +1390,13 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
   return true;
 }

-bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& lsmValue) {
-  Slice val(lsmValue);
+bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
   BlobHandle handle;
-  Status s = handle.DecodeFrom(&val);
+  Status s = handle.DecodeFrom(index_entry);
   if (!s.ok()) {
     ROCKS_LOG_INFO(db_options_.info_log,
                    "Could not parse lsm val in MarkBlobDeleted %s",
-                   lsmValue.ToString().c_str());
+                   index_entry.ToString().c_str());
     return false;
   }
   bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(),
@@ -1618,7 +1596,52 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
   return std::make_pair(true, -1);
 }

-////////////////////////////////////////////////////////////////////////////////
+// Write callback for garbage collection to check if key has been updated
+// since last read. Similar to how OptimisticTransaction works. See inline
+// comment in GCFileAndUpdateLSM().
+class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
+ public:
+  GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
+                                 SequenceNumber upper_bound)
+      : cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
+
+  virtual Status Callback(DB* db) override {
+    auto* db_impl = reinterpret_cast<DBImpl*>(db);
+    auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
+    SequenceNumber latest_seq = 0;
+    bool found_record_for_key = false;
+    bool is_blob_index = false;
+    Status s = db_impl->GetLatestSequenceForKey(
+        sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
+        &is_blob_index);
+    db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
+    if (!s.ok() && !s.IsNotFound()) {
+      // Error.
+      assert(!s.IsBusy());
+      return s;
+    }
+    if (s.IsNotFound()) {
+      assert(!found_record_for_key);
+      return Status::Busy("Key deleted");
+    }
+    assert(found_record_for_key);
+    assert(is_blob_index);
+    if (latest_seq > upper_bound_) {
+      return Status::Busy("Key overwritten");
+    }
+    return s;
+  }
+
+  virtual bool AllowWriteBatching() override { return false; }
+
+ private:
+  ColumnFamilyData* cfd_;
+  // Key to check
+  Slice key_;
+  // Upper bound of sequence number to proceed.
+  SequenceNumber upper_bound_;
+};
+
 // iterate over the blobs sequentially and check if the blob sequence number
 // is the latest. If it is the latest, preserve it, otherwise delete it
 // if it is TTL based, and the TTL has expired, then
@@ -1631,7 +1654,6 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
 //
 // if it is not TTL based, then we can blow the key if the key has been
 // DELETED in the LSM
-////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
                                       GCStats* gc_stats) {
   uint64_t now = EpochNow();
@@ -1656,14 +1678,14 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,

   bool first_gc = bfptr->gc_once_after_open_;

-  ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_);
+  auto* cfh = bfptr->GetColumnFamily(db_);
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+  auto column_family_id = cfd->GetID();
   bool has_ttl = header.HasTTL();

   // this reads the key but skips the blob
   Reader::ReadLevel shallow = Reader::kReadHeaderKey;

-  assert(opt_db_);
-
   bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second);

   bool no_relocation_lsmdel = false;
@@ -1683,59 +1705,52 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
   BlobLogRecord record;
   std::shared_ptr<BlobFile> newfile;
   std::shared_ptr<Writer> new_writer;
-  Transaction* transaction = nullptr;
   uint64_t blob_offset = 0;
-  bool retry = false;
-
-  static const WriteOptions kGarbageCollectionWriteOptions = []() {
-    WriteOptions write_options;
-    // It is ok to ignore column families that were dropped.
-    write_options.ignore_missing_column_families = true;
-    return write_options;
-  }();

   while (true) {
     assert(s.ok());
-    if (retry) {
-      // Retry in case transaction fail with Status::TryAgain.
-      retry = false;
-    } else {
-      // Read the next blob record.
-      Status read_record_status =
-          reader->ReadRecord(&record, shallow, &blob_offset);
-      // Exit if we reach the end of blob file.
-      // TODO(yiwu): properly handle ReadRecord error.
-      if (!read_record_status.ok()) {
-        break;
-      }
-      gc_stats->blob_count++;
-    }
-    transaction =
-        opt_db_->BeginTransaction(kGarbageCollectionWriteOptions,
-                                  OptimisticTransactionOptions(), transaction);
-
-    std::string index_entry;
-    Status get_status = transaction->GetForUpdate(ReadOptions(), cfh,
-                                                  record.Key(), &index_entry);
-    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate");
-    if (get_status.IsNotFound()) {
-      // Key has been deleted. Drop the blob record.
-      continue;
+    // Read the next blob record.
+    Status read_record_status =
+        reader->ReadRecord(&record, shallow, &blob_offset);
+    // Exit if we reach the end of blob file.
+    // TODO(yiwu): properly handle ReadRecord error.
+    if (!read_record_status.ok()) {
+      break;
     }
-    if (!get_status.ok()) {
+    gc_stats->blob_count++;
+
+    // Similar to OptimisticTransaction, we obtain latest_seq from
+    // base DB, which is guaranteed to be no smaller than the sequence of
+    // current key. We use a WriteCallback to check the key sequence at
+    // write time. If the key sequence is larger than latest_seq, we know
+    // a newer version is inserted and the old blob can be discarded.
+    //
+    // We cannot use OptimisticTransaction because we need to pass
+    // is_blob_index flag to GetImpl.
+    SequenceNumber latest_seq = GetLatestSequenceNumber();
+    bool is_blob_index = false;
+    PinnableSlice index_entry;
+    Status get_status = db_impl_->GetImpl(
+        ReadOptions(), cfh, record.Key(), &index_entry, nullptr /*value_found*/,
+        nullptr /*read_callback*/, &is_blob_index);
+    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
+    if (!get_status.ok() && !get_status.IsNotFound()) {
+      // error
       s = get_status;
       ROCKS_LOG_ERROR(db_options_.info_log,
                       "Error while getting index entry: %s",
                       s.ToString().c_str());
       break;
     }
+    if (get_status.IsNotFound() || !is_blob_index) {
+      // Either the key is deleted or updated with a newer version which is
+      // inlined in the LSM.
+      continue;
+    }

-    // TODO(yiwu): We should have an override of GetForUpdate returning a
-    // PinnableSlice.
-    Slice index_entry_slice(index_entry);
     BlobHandle handle;
-    s = handle.DecodeFrom(&index_entry_slice);
+    s = handle.DecodeFrom(index_entry);
     if (!s.ok()) {
       ROCKS_LOG_ERROR(db_options_.info_log,
                       "Error while decoding index entry: %s",
@@ -1748,21 +1763,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
       continue;
     }

+    GarbageCollectionWriteCallback callback(cfd, record.Key(), latest_seq);
+
     // If key has expired, remove it from base DB.
     if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) {
       gc_stats->num_deletes++;
       gc_stats->deleted_size += record.GetBlobSize();
       TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
-      transaction->Delete(cfh, record.Key());
-      Status delete_status = transaction->Commit();
+      WriteBatch delete_batch;
+      Status delete_status = delete_batch.Delete(record.Key());
+      if (delete_status.ok()) {
+        delete_status = db_impl_->WriteWithCallback(WriteOptions(),
+                                                    &delete_batch, &callback);
+      }
       if (delete_status.ok()) {
         gc_stats->delete_succeeded++;
       } else if (delete_status.IsBusy()) {
         // The key is overwritten in the meanwhile. Drop the blob record.
         gc_stats->overwritten_while_delete++;
-      } else if (delete_status.IsTryAgain()) {
-        // Retry the transaction.
-        retry = true;
       } else {
         // We hit an error.
         s = delete_status;
@@ -1829,29 +1847,27 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
           BlobLogRecord::kHeaderSize + record.Key().size() +
           record.Blob().size();
       TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
-      transaction->Put(cfh, record.Key(), new_index_entry);
-      Status put_status = transaction->Commit();
-      if (put_status.ok()) {
+      WriteBatch rewrite_batch;
+      Status rewrite_status = WriteBatchInternal::PutBlobIndex(
+          &rewrite_batch, column_family_id, record.Key(), new_index_entry);
+      if (rewrite_status.ok()) {
+        rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
+                                                     &rewrite_batch, &callback);
+      }
+      if (rewrite_status.ok()) {
        gc_stats->relocate_succeeded++;
-      } else if (put_status.IsBusy()) {
+      } else if (rewrite_status.IsBusy()) {
         // The key is overwritten in the meanwhile. Drop the blob record.
         gc_stats->overwritten_while_relocate++;
-      } else if (put_status.IsTryAgain()) {
-        // Retry the transaction.
-        // TODO(yiwu): On retry, we can reuse the new blob record.
-        retry = true;
       } else {
         // We hit an error.
-        s = put_status;
+        s = rewrite_status;
         ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
                         s.ToString().c_str());
         break;
       }
     }
   }  // end of ReadRecord loop

-  if (transaction != nullptr) {
-    delete transaction;
-  }
   ROCKS_LOG_INFO(
       db_options_.info_log,
       "%s blob file %" PRIu64
@@ -2195,12 +2211,20 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
 }

 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
+  auto* cfd =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
   // Get a snapshot to avoid blob file get deleted between we
   // fetch and index entry and reading from the file.
-  ReadOptions ro(read_options);
-  bool snapshot_created = SetSnapshotIfNeeded(&ro);
-  return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created,
-                            ro.snapshot);
+  ManagedSnapshot* own_snapshot = nullptr;
+  const Snapshot* snapshot = read_options.snapshot;
+  if (snapshot == nullptr) {
+    own_snapshot = new ManagedSnapshot(db_);
+    snapshot = own_snapshot->snapshot();
+  }
+  auto* iter = db_impl_->NewIteratorImpl(
+      read_options, cfd, snapshot->GetSequenceNumber(),
+      nullptr /*read_callback*/, true /*allow_blob*/);
+  return new BlobDBIterator(own_snapshot, iter, this);
 }

 Status DestroyBlobDB(const std::string& dbname, const Options& options,
diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h
index 5654d05e5..6496c585d 100644
--- a/utilities/blob_db/blob_db_impl.h
+++ b/utilities/blob_db/blob_db_impl.h
@@ -18,6 +18,7 @@
 #include
 #include
+#include "db/db_iter.h"
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/db.h"
 #include "rocksdb/listener.h"
@@ -37,7 +38,6 @@ namespace rocksdb {
 class DBImpl;
 class ColumnFamilyHandle;
 class ColumnFamilyData;
-class OptimisticTransactionDBImpl;
 struct FlushJobInfo;

 namespace blob_db {
@@ -215,9 +215,20 @@ class BlobDBImpl : public BlobDB {
   Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
              const Slice& key, PinnableSlice* value) override;

+  Status GetBlobValue(const Slice& key, const Slice& index_entry,
+                      PinnableSlice* value);
+
   using BlobDB::NewIterator;
   virtual Iterator* NewIterator(const ReadOptions& read_options) override;

+  using BlobDB::NewIterators;
+  virtual Status NewIterators(
+      const ReadOptions& read_options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      std::vector<Iterator*>* iterators) override {
+    return Status::NotSupported("Not implemented");
+  }
+
   using BlobDB::MultiGet;
   virtual std::vector<Status> MultiGet(
       const ReadOptions& read_options,
@@ -269,15 +280,14 @@ class BlobDBImpl : public BlobDB {
 #endif  // !NDEBUG

 private:
+  class GarbageCollectionWriteCallback;
+
   Status OpenPhase1();

   // Create a snapshot if there isn't one in read options.
   // Return true if a snapshot is created.
   bool SetSnapshotIfNeeded(ReadOptions* read_options);

-  Status CommonGet(const Slice& key, const std::string& index_entry,
-                   std::string* value);
-
   Slice GetCompressedSlice(const Slice& raw,
                            std::string* compression_output) const;
@@ -416,10 +426,6 @@ class BlobDBImpl : public BlobDB {
   Env* env_;
   TTLExtractor* ttl_extractor_;

-  // Optimistic Transaction DB used during Garbage collection
-  // for atomicity
-  std::unique_ptr<OptimisticTransactionDBImpl> opt_db_;
-
   // a boolean to capture whether write_options has been set
   std::atomic<bool> wo_set_;
   WriteOptions write_options_;
@@ -527,55 +533,6 @@ class BlobDBImpl : public BlobDB {
   uint32_t debug_level_;
 };

-class BlobDBIterator : public Iterator {
- public:
-  explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot,
-                          const Snapshot* snapshot)
-      : iter_(iter),
-        db_impl_(impl),
-        own_snapshot_(own_snapshot),
-        snapshot_(snapshot) {
-    assert(iter != nullptr);
-    assert(snapshot != nullptr);
-  }
-
-  ~BlobDBIterator() {
-    if (own_snapshot_) {
-      db_impl_->ReleaseSnapshot(snapshot_);
-    }
-    delete iter_;
-  }
-
-  bool Valid() const override { return iter_->Valid(); }
-
-  void SeekToFirst() override { iter_->SeekToFirst(); }
-
-  void SeekToLast() override { iter_->SeekToLast(); }
-
-  void Seek(const Slice& target) override { iter_->Seek(target); }
-
-  void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
-
-  void Next() override { iter_->Next(); }
-
-  void Prev() override { iter_->Prev(); }
-
-  Slice key() const override { return iter_->key(); }
-
-  Slice value() const override;
-
-  Status status() const override { return iter_->status(); }
-
-  // Iterator::Refresh() not supported.
-
- private:
-  Iterator* iter_;
-  BlobDBImpl* db_impl_;
-  bool own_snapshot_;
-  const Snapshot* snapshot_;
-  mutable std::string vpart_;
-};
-
 }  // namespace blob_db
 }  // namespace rocksdb
 #endif  // ROCKSDB_LITE
diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h
new file mode 100644
index 000000000..c8aa1ff17
--- /dev/null
+++ b/utilities/blob_db/blob_db_iterator.h
@@ -0,0 +1,104 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/iterator.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+using rocksdb::ManagedSnapshot;
+
+class BlobDBIterator : public Iterator {
+ public:
+  BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
+                 BlobDBImpl* blob_db)
+      : snapshot_(snapshot), iter_(iter), blob_db_(blob_db) {}
+
+  virtual ~BlobDBIterator() = default;
+
+  bool Valid() const override {
+    if (!iter_->Valid()) {
+      return false;
+    }
+    return status_.ok();
+  }
+
+  Status status() const override {
+    if (!iter_->status().ok()) {
+      return iter_->status();
+    }
+    return status_;
+  }
+
+  void SeekToFirst() override {
+    iter_->SeekToFirst();
+    UpdateBlobValue();
+  }
+
+  void SeekToLast() override {
+    iter_->SeekToLast();
+    UpdateBlobValue();
+  }
+
+  void Seek(const Slice& target) override {
+    iter_->Seek(target);
+    UpdateBlobValue();
+  }
+
+  void SeekForPrev(const Slice& target) override {
+    iter_->SeekForPrev(target);
+    UpdateBlobValue();
+  }
+
+  void Next() override {
+    assert(Valid());
+    iter_->Next();
+    UpdateBlobValue();
+  }
+
+  void Prev() override {
+    assert(Valid());
+    iter_->Prev();
+    UpdateBlobValue();
+  }
+
+  Slice key() const override {
+    assert(Valid());
+    return iter_->key();
+  }
+
+  Slice value() const override {
+    assert(Valid());
+    if (!iter_->IsBlob()) {
+      return iter_->value();
+    }
+    return value_;
+  }
+
+  // Iterator::Refresh() not supported.
+
+ private:
+  void UpdateBlobValue() {
+    TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
+    TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
+    value_.Reset();
+    if (iter_->Valid() && iter_->IsBlob()) {
+      status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
+    }
+  }
+
+  std::unique_ptr<ManagedSnapshot> snapshot_;
+  std::unique_ptr<ArenaWrappedDBIter> iter_;
+  BlobDBImpl* blob_db_;
+  Status status_;
+  PinnableSlice value_;
+};
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // !ROCKSDB_LITE
diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc
index a31e90e0c..535b86a6e 100644
--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -88,9 +88,14 @@ class BlobDBTest : public testing::Test {

   void PutRandom(const std::string &key, Random *rnd,
                  std::map<std::string, std::string> *data = nullptr) {
+    PutRandom(blob_db_, key, rnd, data);
+  }
+
+  void PutRandom(DB *db, const std::string &key, Random *rnd,
+                 std::map<std::string, std::string> *data = nullptr) {
     int len = rnd->Next() % kMaxBlobSize + 1;
     std::string value = test::RandomHumanReadableString(rnd, len);
-    ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value)));
+    ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
     if (data != nullptr) {
       (*data)[key] = value;
     }
@@ -116,9 +121,12 @@ class BlobDBTest : public testing::Test {
   }

   // Verify blob db contain expected data and nothing more.
-  // TODO(yiwu): Verify blob files are consistent with data in LSM.
   void VerifyDB(const std::map<std::string, std::string> &data) {
-    Iterator *iter = blob_db_->NewIterator(ReadOptions());
+    VerifyDB(blob_db_, data);
+  }
+
+  void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
+    Iterator *iter = db->NewIterator(ReadOptions());
     iter->SeekToFirst();
     for (auto &p : data) {
       ASSERT_TRUE(iter->Valid());
@@ -593,7 +601,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
   ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));

   SyncPoint::GetInstance()->LoadDependency(
-      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
+      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
         "BlobDBImpl::PutUntil:Start"},
        {"BlobDBImpl::PutUntil:Finish",
         "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
@@ -630,7 +638,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
   mock_env_->set_now_micros(300 * 1000000);

   SyncPoint::GetInstance()->LoadDependency(
-      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
+      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
         "BlobDBImpl::PutUntil:Start"},
        {"BlobDBImpl::PutUntil:Finish",
         "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
@@ -687,7 +695,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {

 TEST_F(BlobDBTest, ReadWhileGC) {
   // run the same test for Get(), MultiGet() and Iterator each.
-  for (int i = 0; i < 3; i++) {
+  for (int i = 0; i < 2; i++) {
     BlobDBOptions bdb_options;
     bdb_options.disable_background_tasks = true;
     Open(bdb_options);
@@ -710,17 +718,10 @@ TEST_F(BlobDBTest, ReadWhileGC) {
         break;
       case 1:
         SyncPoint::GetInstance()->LoadDependency(
-            {{"BlobDBImpl::MultiGet:AfterIndexEntryGet:1",
+            {{"BlobDBIterator::UpdateBlobValue:Start:1",
               "BlobDBTest::ReadWhileGC:1"},
              {"BlobDBTest::ReadWhileGC:2",
-              "BlobDBImpl::MultiGet:AfterIndexEntryGet:2"}});
-        break;
-      case 2:
-        SyncPoint::GetInstance()->LoadDependency(
-            {{"BlobDBIterator::value:BeforeGetBlob:1",
-              "BlobDBTest::ReadWhileGC:1"},
-             {"BlobDBTest::ReadWhileGC:2",
-              "BlobDBIterator::value:BeforeGetBlob:2"}});
+              "BlobDBIterator::UpdateBlobValue:Start:2"}});
         break;
     }
     SyncPoint::GetInstance()->EnableProcessing();
@@ -735,12 +736,6 @@ TEST_F(BlobDBTest, ReadWhileGC) {
         ASSERT_EQ("bar", value);
         break;
       case 1:
-        statuses = blob_db_->MultiGet(ReadOptions(), {"foo"}, &values);
-        ASSERT_EQ(1, statuses.size());
-        ASSERT_EQ(1, values.size());
-        ASSERT_EQ("bar", values[0]);
-        break;
-      case 2:
         // VerifyDB use iterator to scan the DB.
         VerifyDB({{"foo", "bar"}});
         break;
@@ -834,6 +829,58 @@ TEST_F(BlobDBTest, GetLiveFilesMetaData) {
   VerifyDB(data);
 }

+TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
+  constexpr size_t kNumKey = 20;
+  constexpr size_t kNumIteration = 10;
+  Random rnd(301);
+  std::map<std::string, std::string> data;
+  std::vector<bool> is_blob(kNumKey, false);
+
+  // Write to plain rocksdb.
+  Options options;
+  options.create_if_missing = true;
+  DB *db = nullptr;
+  ASSERT_OK(DB::Open(options, dbname_, &db));
+  for (size_t i = 0; i < kNumIteration; i++) {
+    auto key_index = rnd.Next() % kNumKey;
+    std::string key = "key" + ToString(key_index);
+    PutRandom(db, key, &rnd, &data);
+  }
+  VerifyDB(db, data);
+  delete db;
+  db = nullptr;
+
+  // Open as blob db. Verify it can read existing data.
+  Open();
+  VerifyDB(blob_db_, data);
+  for (size_t i = 0; i < kNumIteration; i++) {
+    auto key_index = rnd.Next() % kNumKey;
+    std::string key = "key" + ToString(key_index);
+    is_blob[key_index] = true;
+    PutRandom(blob_db_, key, &rnd, &data);
+  }
+  VerifyDB(blob_db_, data);
+  delete blob_db_;
+  blob_db_ = nullptr;
+
+  // Verify plain db returns an error for keys written by blob db.
+  ASSERT_OK(DB::Open(options, dbname_, &db));
+  std::string value;
+  for (size_t i = 0; i < kNumKey; i++) {
+    std::string key = "key" + ToString(i);
+    Status s = db->Get(ReadOptions(), key, &value);
+    if (data.count(key) == 0) {
+      ASSERT_TRUE(s.IsNotFound());
+    } else if (is_blob[i]) {
+      ASSERT_TRUE(s.IsNotSupported());
+    } else {
+      ASSERT_OK(s);
+      ASSERT_EQ(data[key], value);
+    }
+  }
+  delete db;
+}
+
 }  // namespace blob_db
 }  // namespace rocksdb
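Reviewer note: a minimal end-to-end sketch of how the new read path is exercised, assuming only the public BlobDB::Open/Put/Get/NewIterator entry points declared in utilities/blob_db/blob_db.h at this revision. The path "/tmp/blob_db_example" and the key/value literals are illustrative, not part of this change. Writes go through PutBlobIndex, Get() resolves blob indexes via is_blob_index + GetBlobValue(), and the iterator resolves them in BlobDBIterator::UpdateBlobValue().

// Sketch only; not part of this diff.
#include <cassert>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "utilities/blob_db/blob_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::blob_db::BlobDBOptions bdb_options;  // defaults
  rocksdb::blob_db::BlobDB* db = nullptr;
  rocksdb::Status s = rocksdb::blob_db::BlobDB::Open(
      options, bdb_options, "/tmp/blob_db_example", &db);
  assert(s.ok());

  // The base DB now stores a tagged blob index entry, not a plain value.
  s = db->Put(rocksdb::WriteOptions(), "key", "long value living in a blob");
  assert(s.ok());

  // Get: DBImpl::GetImpl reports is_blob_index; BlobDBImpl::GetBlobValue
  // then resolves the pinned index entry against the blob file.
  rocksdb::PinnableSlice value;
  s = db->Get(rocksdb::ReadOptions(), db->DefaultColumnFamily(), "key",
              &value);
  assert(s.ok() && value == "long value living in a blob");

  // Iterator: BlobDBIterator::UpdateBlobValue() resolves blob values as the
  // underlying ArenaWrappedDBIter moves, so value() is the real value.
  rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    assert(it->value() == "long value living in a blob");
  }
  assert(it->status().ok());
  delete it;
  delete db;
  return 0;
}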
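Reviewer note: the optimistic-write protocol that replaces OptimisticTransaction in GC, condensed into one place. This is a sketch against the internal API already used above (DBImpl::GetImpl, DBImpl::WriteWithCallback, WriteCallback, WriteBatchInternal::PutBlobIndex); SequenceCheckCallback and RewriteBlobIndex are hypothetical names — GCFileAndUpdateLSM() inlines these steps in its ReadRecord loop with the private GarbageCollectionWriteCallback.

// Sketch only; mirrors, but is not, the code in this diff.
#include "db/db_impl.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"

namespace rocksdb {
namespace blob_db {

// Re-validates the key inside the write path; returns Status::Busy if the
// key was deleted or a newer version appeared after `upper_bound` was read.
class SequenceCheckCallback : public WriteCallback {
 public:
  SequenceCheckCallback(ColumnFamilyData* cfd, const Slice& key,
                        SequenceNumber upper_bound)
      : cfd_(cfd), key_(key), upper_bound_(upper_bound) {}

  Status Callback(DB* db) override {
    auto* db_impl = reinterpret_cast<DBImpl*>(db);
    auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
    SequenceNumber latest_seq = 0;
    bool found_record_for_key = false;
    Status s = db_impl->GetLatestSequenceForKey(
        sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key);
    db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
    if (s.IsNotFound() || (s.ok() && latest_seq > upper_bound_)) {
      return Status::Busy("Key deleted or overwritten");
    }
    return s;  // OK, or a real error
  }

  bool AllowWriteBatching() override { return false; }

 private:
  ColumnFamilyData* cfd_;
  Slice key_;
  SequenceNumber upper_bound_;
};

// One GC relocation under the protocol: sample an upper bound, read, then
// write with the callback. Status::Busy means "drop the old blob record".
inline Status RewriteBlobIndex(DBImpl* db_impl, ColumnFamilyHandle* cfh,
                               ColumnFamilyData* cfd, uint32_t cf_id,
                               const Slice& key,
                               const Slice& new_index_entry) {
  // 1. latest_seq is >= the sequence of any existing version of `key`.
  SequenceNumber latest_seq = db_impl->GetLatestSequenceNumber();

  // 2. Read the current entry; is_blob_index distinguishes a blob index
  //    from a value that a later write inlined into the LSM.
  PinnableSlice index_entry;
  bool is_blob_index = false;
  Status s = db_impl->GetImpl(ReadOptions(), cfh, key, &index_entry,
                              nullptr /*value_found*/,
                              nullptr /*read_callback*/, &is_blob_index);
  if (s.IsNotFound() || (s.ok() && !is_blob_index)) {
    return Status::OK();  // deleted or rewritten inline: nothing to relocate
  }
  if (!s.ok()) {
    return s;
  }

  // 3. Write the relocated index entry; the callback re-checks the sequence
  //    inside the write path, closing the race between steps 1 and 3.
  SequenceCheckCallback callback(cfd, key, latest_seq);
  WriteBatch batch;
  s = WriteBatchInternal::PutBlobIndex(&batch, cf_id, key, new_index_entry);
  if (s.ok()) {
    s = db_impl->WriteWithCallback(WriteOptions(), &batch, &callback);
  }
  if (s.IsBusy()) {
    s = Status::OK();  // overwritten meanwhile; the old blob is just dropped
  }
  return s;
}

}  // namespace blob_db
}  // namespace rocksdb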