Aggregate blob file related changes in VersionBuilder as VersionEdits are applied (#9085)

Summary:
The current VersionBuilder code on mainline keeps track of blob file related
changes ("delta") induced by a series of `VersionEdit`s in the form of
`BlobFileMetaDataDelta` objects. Specifically, `BlobFileMetaDataDelta`
contains the amount of additional garbage generated by compactions, as well
as the set of newly linked/unlinked SSTs. This is very handy for detecting trivial moves,
since in that case the newly linked and unlinked SSTs cancel each other out.
However, this representation does not allow us to easily tell whether a certain
blob file is obsolete after applying a set of `VersionEdit`s or not. In order to
solve this issue, the patch introduces `MutableBlobFileMetaData`, which, in addition
to the delta, also contains the materialized state after applying a set of version edits
(i.e. the total amount of garbage and the resulting set of linked SSTs). This will
enable us to add further consistency checks and to improve certain pieces of
functionality where knowing up front which blob files get obsoleted is beneficial.
(Note: this patch is just the refactoring part; I plan to create separate PRs for
the enhancements.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9085

Test Plan: Ran `make check` and the stress tests in BlobDB mode.

Reviewed By: riversand963

Differential Revision: D31980867

Pulled By: ltamasi

fbshipit-source-id: cc4286778b10900af720423d6b772c77f28a93e3
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent fdf2a0d7eb
commit 44d04582cb
  1. 297
      db/version_builder.cc

@ -86,16 +86,14 @@ class VersionBuilder::Rep {
std::unordered_map<uint64_t, FileMetaData*> added_files;
};
// A class that represents the accumulated changes (like additional garbage or
// newly linked/unlinked SST files) for a given blob file after applying a
// series of VersionEdits.
class BlobFileMetaDataDelta {
public:
bool IsEmpty() const {
return !shared_meta_ && !additional_garbage_count_ &&
!additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
newly_unlinked_ssts_.empty();
}
std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
return shared_meta_;
return !additional_garbage_count_ && !additional_garbage_bytes_ &&
newly_linked_ssts_.empty() && newly_unlinked_ssts_.empty();
}
uint64_t GetAdditionalGarbageCount() const {
@ -114,13 +112,6 @@ class VersionBuilder::Rep {
return newly_unlinked_ssts_;
}
void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
assert(!shared_meta_);
assert(shared_meta);
shared_meta_ = std::move(shared_meta);
}
void AddGarbage(uint64_t count, uint64_t bytes) {
additional_garbage_count_ += count;
additional_garbage_bytes_ += bytes;
@ -159,13 +150,77 @@ class VersionBuilder::Rep {
}
private:
std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
uint64_t additional_garbage_count_ = 0;
uint64_t additional_garbage_bytes_ = 0;
std::unordered_set<uint64_t> newly_linked_ssts_;
std::unordered_set<uint64_t> newly_unlinked_ssts_;
};
// A class that represents the state of a blob file after applying a series of
// VersionEdits. In addition to the resulting state, it also contains the
// delta (see BlobFileMetaDataDelta above). The resulting state can be used to
// identify obsolete blob files, while the delta makes it possible to
// efficiently detect trivial moves.
class MutableBlobFileMetaData {
public:
// To be used for brand new blob files
explicit MutableBlobFileMetaData(
std::shared_ptr<SharedBlobFileMetaData>&& shared_meta)
: shared_meta_(std::move(shared_meta)) {}
// To be used for pre-existing blob files
explicit MutableBlobFileMetaData(
const std::shared_ptr<BlobFileMetaData>& meta)
: shared_meta_(meta->GetSharedMeta()),
linked_ssts_(meta->GetLinkedSsts()),
garbage_blob_count_(meta->GetGarbageBlobCount()),
garbage_blob_bytes_(meta->GetGarbageBlobBytes()) {}
const std::shared_ptr<SharedBlobFileMetaData>& GetSharedMeta() const {
return shared_meta_;
}
bool HasDelta() const { return !delta_.IsEmpty(); }
const std::unordered_set<uint64_t>& GetLinkedSsts() const {
return linked_ssts_;
}
uint64_t GetGarbageBlobCount() const { return garbage_blob_count_; }
uint64_t GetGarbageBlobBytes() const { return garbage_blob_bytes_; }
void AddGarbage(uint64_t count, uint64_t bytes) {
delta_.AddGarbage(count, bytes);
garbage_blob_count_ += count;
garbage_blob_bytes_ += bytes;
}
void LinkSst(uint64_t sst_file_number) {
delta_.LinkSst(sst_file_number);
assert(linked_ssts_.find(sst_file_number) == linked_ssts_.end());
linked_ssts_.emplace(sst_file_number);
}
void UnlinkSst(uint64_t sst_file_number) {
delta_.UnlinkSst(sst_file_number);
assert(linked_ssts_.find(sst_file_number) != linked_ssts_.end());
linked_ssts_.erase(sst_file_number);
}
private:
std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
// Accumulated changes
BlobFileMetaDataDelta delta_;
// Resulting state after applying the changes
BlobFileMetaData::LinkedSsts linked_ssts_;
uint64_t garbage_blob_count_ = 0;
uint64_t garbage_blob_bytes_ = 0;
};
const FileOptions& file_options_;
const ImmutableCFOptions* const ioptions_;
TableCache* table_cache_;
@ -186,8 +241,9 @@ class VersionBuilder::Rep {
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
// Metadata delta for all blob files affected by the series of version edits.
std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
// Mutable metadata objects for all blob files affected by the series of
// version edits.
std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
public:
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
@ -232,29 +288,6 @@ class VersionBuilder::Rep {
}
}
bool IsBlobFileInVersion(uint64_t blob_file_number) const {
auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
if (delta_it != blob_file_meta_deltas_.end()) {
if (delta_it->second.GetSharedMeta()) {
return true;
}
}
assert(base_vstorage_);
const auto& base_blob_files = base_vstorage_->GetBlobFiles();
auto base_it = base_blob_files.find(blob_file_number);
if (base_it != base_blob_files.end()) {
assert(base_it->second);
assert(base_it->second->GetSharedMeta());
return true;
}
return false;
}
using ExpectedLinkedSsts =
std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
@ -426,6 +459,49 @@ class VersionBuilder::Rep {
return true;
}
bool IsBlobFileInVersion(uint64_t blob_file_number) const {
auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
if (mutable_it != mutable_blob_file_metas_.end()) {
return true;
}
assert(base_vstorage_);
const auto& base_blob_files = base_vstorage_->GetBlobFiles();
auto base_it = base_blob_files.find(blob_file_number);
if (base_it != base_blob_files.end()) {
return true;
}
return false;
}
MutableBlobFileMetaData* GetOrCreateMutableBlobFileMetaData(
uint64_t blob_file_number) {
auto mutable_it = mutable_blob_file_metas_.find(blob_file_number);
if (mutable_it != mutable_blob_file_metas_.end()) {
return &mutable_it->second;
}
assert(base_vstorage_);
const auto& base_blob_files = base_vstorage_->GetBlobFiles();
auto base_it = base_blob_files.find(blob_file_number);
if (base_it != base_blob_files.end()) {
assert(base_it->second);
mutable_it = mutable_blob_file_metas_
.emplace(blob_file_number,
MutableBlobFileMetaData(base_it->second))
.first;
return &mutable_it->second;
}
return nullptr;
}
Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
@ -460,8 +536,8 @@ class VersionBuilder::Rep {
blob_file_addition.GetChecksumMethod(),
blob_file_addition.GetChecksumValue(), deleter);
blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
std::move(shared_meta));
mutable_blob_file_metas_.emplace(
blob_file_number, MutableBlobFileMetaData(std::move(shared_meta)));
return Status::OK();
}
@ -469,15 +545,17 @@ class VersionBuilder::Rep {
Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
if (!IsBlobFileInVersion(blob_file_number)) {
MutableBlobFileMetaData* const mutable_meta =
GetOrCreateMutableBlobFileMetaData(blob_file_number);
if (!mutable_meta) {
std::ostringstream oss;
oss << "Blob file #" << blob_file_number << " not found";
return Status::Corruption("VersionBuilder", oss.str());
}
blob_file_meta_deltas_[blob_file_number].AddGarbage(
blob_file_garbage.GetGarbageBlobCount(),
mutable_meta->AddGarbage(blob_file_garbage.GetGarbageBlobCount(),
blob_file_garbage.GetGarbageBlobBytes());
return Status::OK();
@ -573,9 +651,12 @@ class VersionBuilder::Rep {
const uint64_t blob_file_number =
GetOldestBlobFileNumberForTableFile(level, file_number);
if (blob_file_number != kInvalidBlobFileNumber &&
IsBlobFileInVersion(blob_file_number)) {
blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
if (blob_file_number != kInvalidBlobFileNumber) {
MutableBlobFileMetaData* const mutable_meta =
GetOrCreateMutableBlobFileMetaData(blob_file_number);
if (mutable_meta) {
mutable_meta->UnlinkSst(file_number);
}
}
auto& level_state = levels_[level];
@ -640,9 +721,12 @@ class VersionBuilder::Rep {
const uint64_t blob_file_number = f->oldest_blob_file_number;
if (blob_file_number != kInvalidBlobFileNumber &&
IsBlobFileInVersion(blob_file_number)) {
blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
if (blob_file_number != kInvalidBlobFileNumber) {
MutableBlobFileMetaData* const mutable_meta =
GetOrCreateMutableBlobFileMetaData(blob_file_number);
if (mutable_meta) {
mutable_meta->LinkSst(file_number);
}
}
table_file_levels_[file_number] = level;
@ -704,65 +788,11 @@ class VersionBuilder::Rep {
return Status::OK();
}
static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
const BlobFileMetaData::LinkedSsts& base,
const std::unordered_set<uint64_t>& newly_linked,
const std::unordered_set<uint64_t>& newly_unlinked) {
BlobFileMetaData::LinkedSsts result(base);
for (uint64_t sst_file_number : newly_unlinked) {
assert(result.find(sst_file_number) != result.end());
result.erase(sst_file_number);
}
for (uint64_t sst_file_number : newly_linked) {
assert(result.find(sst_file_number) == result.end());
result.emplace(sst_file_number);
}
return result;
}
static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
const BlobFileMetaDataDelta& delta) {
auto shared_meta = delta.GetSharedMeta();
assert(shared_meta);
assert(delta.GetNewlyUnlinkedSsts().empty());
auto meta = BlobFileMetaData::Create(
std::move(shared_meta), delta.GetNewlyLinkedSsts(),
delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
return meta;
}
static std::shared_ptr<BlobFileMetaData>
GetOrCreateMetaDataForExistingBlobFile(
const std::shared_ptr<BlobFileMetaData>& base_meta,
const BlobFileMetaDataDelta& delta) {
assert(base_meta);
assert(!delta.GetSharedMeta());
if (delta.IsEmpty()) {
return base_meta;
}
auto shared_meta = base_meta->GetSharedMeta();
assert(shared_meta);
auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
delta.GetNewlyLinkedSsts(),
delta.GetNewlyUnlinkedSsts());
auto meta = BlobFileMetaData::Create(
std::move(shared_meta), std::move(linked_ssts),
base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
return meta;
static std::shared_ptr<BlobFileMetaData> CreateBlobFileMetaData(
const MutableBlobFileMetaData& mutable_meta) {
return BlobFileMetaData::Create(
mutable_meta.GetSharedMeta(), mutable_meta.GetLinkedSsts(),
mutable_meta.GetGarbageBlobCount(), mutable_meta.GetGarbageBlobBytes());
}
// Add the blob file specified by meta to *vstorage if it is determined to
@ -799,39 +829,52 @@ class VersionBuilder::Rep {
auto base_it = base_blob_files.begin();
const auto base_it_end = base_blob_files.end();
auto delta_it = blob_file_meta_deltas_.begin();
const auto delta_it_end = blob_file_meta_deltas_.end();
auto mutable_it = mutable_blob_file_metas_.begin();
const auto mutable_it_end = mutable_blob_file_metas_.end();
while (base_it != base_it_end && delta_it != delta_it_end) {
while (base_it != base_it_end && mutable_it != mutable_it_end) {
const uint64_t base_blob_file_number = base_it->first;
const uint64_t delta_blob_file_number = delta_it->first;
const uint64_t mutable_blob_file_number = mutable_it->first;
if (base_blob_file_number < delta_blob_file_number) {
if (base_blob_file_number < mutable_blob_file_number) {
const auto& base_meta = base_it->second;
AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
++base_it;
} else if (delta_blob_file_number < base_blob_file_number) {
const auto& delta = delta_it->second;
} else if (mutable_blob_file_number < base_blob_file_number) {
const auto& mutable_meta = mutable_it->second;
auto meta = CreateMetaDataForNewBlobFile(delta);
auto meta = CreateBlobFileMetaData(mutable_meta);
AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
++delta_it;
++mutable_it;
} else {
assert(base_blob_file_number == delta_blob_file_number);
assert(base_blob_file_number == mutable_blob_file_number);
const auto& base_meta = base_it->second;
const auto& delta = delta_it->second;
const auto& mutable_meta = mutable_it->second;
assert(base_meta);
assert(base_meta->GetSharedMeta() == mutable_meta.GetSharedMeta());
auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
if (!mutable_meta.HasDelta()) {
assert(base_meta->GetGarbageBlobCount() ==
mutable_meta.GetGarbageBlobCount());
assert(base_meta->GetGarbageBlobBytes() ==
mutable_meta.GetGarbageBlobBytes());
assert(base_meta->GetLinkedSsts() == mutable_meta.GetLinkedSsts());
AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
} else {
auto meta = CreateBlobFileMetaData(mutable_meta);
AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
}
++base_it;
++delta_it;
++mutable_it;
}
}
@ -843,14 +886,14 @@ class VersionBuilder::Rep {
++base_it;
}
while (delta_it != delta_it_end) {
const auto& delta = delta_it->second;
while (mutable_it != mutable_it_end) {
const auto& mutable_meta = mutable_it->second;
auto meta = CreateMetaDataForNewBlobFile(delta);
auto meta = CreateBlobFileMetaData(mutable_meta);
AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
++delta_it;
++mutable_it;
}
}

Loading…
Cancel
Save