Make VersionBuilder unit testable

Summary:
Rename Version::Builder to VersionBuilder and expose its definition to a header.
Make VerisonBuilder not reference Version or ColumnFamilyData, only working with VersionStorageInfo.
Add version_builder_test which has a simple test.

Test Plan: make all check

Reviewers: rven, yhchiang, igor, ljin

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D27969
main
sdong 10 years ago
parent 2b1f23dcae
commit 4d2ba38b65
  1. 4
      Makefile
  2. 6
      db/column_family.cc
  3. 4
      db/compaction.cc
  4. 7
      db/compaction_picker_test.cc
  5. 23
      db/db_impl.cc
  6. 7
      db/db_impl_debug.cc
  7. 2
      db/flush_job.cc
  8. 6
      db/forward_iterator.cc
  9. 6
      db/internal_stats.cc
  10. 40
      db/version_builder.h
  11. 123
      db/version_builder_test.cc
  12. 9
      db/version_edit.h
  13. 346
      db/version_set.cc
  14. 42
      db/version_set.h
  15. 2
      util/ldb_cmd.cc
  16. 2
      utilities/compacted_db/compacted_db_impl.cc

@ -133,6 +133,7 @@ TESTS = \
version_edit_test \
version_set_test \
compaction_picker_test \
version_builder_test \
file_indexer_test \
write_batch_test \
write_controller_test\
@ -464,6 +465,9 @@ version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
compaction_picker_test: db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
version_builder_test: db/version_builder_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/version_builder_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
file_indexer_test : db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -324,7 +324,7 @@ ColumnFamilyData::~ColumnFamilyData() {
void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) {
auto* vstorage = current_->GetStorageInfo();
auto* vstorage = current_->storage_info();
const double score = vstorage->MaxCompactionScore();
const int max_level = vstorage->MaxCompactionScoreLevel();
@ -405,7 +405,7 @@ void ColumnFamilyData::CreateNewMemtable(
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->GetStorageInfo(), log_buffer);
GetName(), mutable_options, current_->storage_info(), log_buffer);
if (result != nullptr) {
result->SetInputVersion(current_);
}
@ -418,7 +418,7 @@ Compaction* ColumnFamilyData::CompactRange(
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, current_->GetStorageInfo(), input_level,
GetName(), mutable_cf_options, current_->storage_info(), input_level,
output_level, output_path_id, begin, end, compaction_end);
if (result != nullptr) {
result->SetInputVersion(current_);

@ -129,7 +129,7 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) {
const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files =
input_version_->GetStorageInfo()->LevelFiles(lvl);
input_version_->storage_info()->LevelFiles(lvl);
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
@ -228,7 +228,7 @@ void Compaction::ReleaseCompactionFiles(Status status) {
void Compaction::ResetNextCompactionIndex() {
assert(input_version_ != nullptr);
input_version_->GetStorageInfo()->ResetNextCompactionIndex(start_level_);
input_version_->storage_info()->ResetNextCompactionIndex(start_level_);
}
namespace {

@ -50,9 +50,8 @@ class CompactionPickerTest {
}
~CompactionPickerTest() {
auto* files = vstorage.GetFiles();
for (int i = 0; i < vstorage.NumberLevels(); i++) {
for (auto* f : files[i]) {
for (auto* f : vstorage.LevelFiles(i)) {
delete f;
}
}
@ -63,13 +62,13 @@ class CompactionPickerTest {
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100) {
assert(level < vstorage.NumberLevels());
auto& files = vstorage.GetFiles()[level];
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->compensated_file_size = file_size;
files.push_back(f);
f->refs = 0;
vstorage.MaybeAddFile(level, f);
}
void UpdateVersionStorageInfo() {

@ -1181,7 +1181,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
cfd->current()->GetStorageInfo()->LevelSummary(&tmp));
cfd->current()->storage_info()->LevelSummary(&tmp));
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
@ -1227,7 +1227,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
MutexLock l(&mutex_);
Version* base = cfd->current();
for (int level = 1; level < cfd->NumberLevels(); level++) {
if (base->GetStorageInfo()->OverlapInLevel(level, begin, end)) {
if (base->storage_info()->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
@ -1305,7 +1305,7 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level) {
mutex_.AssertHeld();
auto* vstorage = cfd->current()->GetStorageInfo();
const auto* vstorage = cfd->current()->storage_info();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
@ -1364,7 +1364,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
for (const auto& f : cfd->current()->GetStorageInfo()->files_[level]) {
for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
@ -1580,7 +1580,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->current()->GetStorageInfo()->NeedsCompaction()) {
if (cfd->current()->storage_info()->NeedsCompaction()) {
is_compaction_needed = true;
break;
}
@ -1956,7 +1956,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
" bytes %s: %s\n",
c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
c->input_version()->GetStorageInfo()->LevelSummary(&tmp));
c->input_version()->storage_info()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else {
@ -2688,7 +2688,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
assert(cfd->current()->GetStorageInfo()->NumLevelFiles(
assert(cfd->current()->storage_info()->NumLevelFiles(
compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
@ -2934,7 +2934,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
cfd->GetName().c_str(),
cfd->current()->GetStorageInfo()->LevelSummary(&tmp),
cfd->current()->storage_info()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1) /
static_cast<double>(stats.micros),
stats.bytes_written / static_cast<double>(stats.micros),
@ -4040,7 +4040,7 @@ Status DBImpl::DeleteFile(std::string name) {
// Only the files in the last level can be deleted externally.
// This is to make sure that any deletion tombstones are not
// lost. Check that the level passed is the last level.
auto* vstoreage = cfd->current()->GetStorageInfo();
auto* vstoreage = cfd->current()->storage_info();
for (int i = level + 1; i < cfd->NumberLevels(); i++) {
if (vstoreage->NumLevelFiles(i) != 0) {
Log(db_options_.info_log,
@ -4049,7 +4049,8 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
// if level == 0, it has to be the oldest file
if (level == 0 && vstoreage->files_[0].back()->fd.GetNumber() != number) {
if (level == 0 &&
vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) {
return Status::InvalidArgument("File in level 0, but not oldest");
}
edit.SetColumnFamily(cfd->GetID());
@ -4302,7 +4303,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
auto* vstorage = cfd->current()->GetStorageInfo();
auto* vstorage = cfd->current()->storage_info();
for (int i = 1; i < vstorage->NumberLevels(); ++i) {
int num_files = vstorage->NumLevelFiles(i);
if (num_files > 0) {

@ -15,8 +15,7 @@ namespace rocksdb {
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes(
0);
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
Iterator* DBImpl::TEST_NewInternalIterator(Arena* arena,
@ -46,7 +45,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
cfd = cfh->cfd();
}
MutexLock l(&mutex_);
return cfd->current()->GetStorageInfo()->MaxNextLevelOverlappingBytes();
return cfd->current()->storage_info()->MaxNextLevelOverlappingBytes();
}
void DBImpl::TEST_GetFilesMetaData(
@ -58,7 +57,7 @@ void DBImpl::TEST_GetFilesMetaData(
metadata->resize(NumberLevels());
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files =
cfd->current()->GetStorageInfo()->LevelFiles(level);
cfd->current()->storage_info()->LevelFiles(level);
(*metadata)[level].clear();
for (const auto& f : files) {

@ -202,7 +202,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->GetStorageInfo()->PickLevelForMemTableOutput(
level = base->storage_info()->PickLevelForMemTableOutput(
mutable_cf_options_, min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),

@ -220,7 +220,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!seek_to_first) {
user_key = ExtractUserKey(internal_key);
}
VersionStorageInfo* vstorage = sv_->current->GetStorageInfo();
const VersionStorageInfo* vstorage = sv_->current->storage_info();
const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
for (uint32_t i = 0; i < l0.size(); ++i) {
if (seek_to_first) {
@ -430,7 +430,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
auto* vstorage = sv_->current->GetStorageInfo();
const auto* vstorage = sv_->current->storage_info();
const auto& l0_files = vstorage->LevelFiles(0);
l0_iters_.reserve(l0_files.size());
for (const auto* l0 : l0_files) {
@ -454,7 +454,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
}
void ForwardIterator::ResetIncompleteIterators() {
const auto& l0_files = sv_->current->GetStorageInfo()->LevelFiles(0);
const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
for (uint32_t i = 0; i < l0_iters_.size(); ++i) {
assert(i < l0_files.size());
if (!l0_iters_[i]->status().IsIncomplete()) {

@ -170,7 +170,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
std::string* value) {
assert(value != nullptr);
auto* current = cfd_->current();
auto* vstorage = current->GetStorageInfo();
const auto* vstorage = current->storage_info();
Slice in = property;
switch (property_type) {
@ -230,7 +230,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type,
bool InternalStats::GetIntProperty(DBPropertyType property_type,
uint64_t* value, DBImpl* db) const {
auto* vstorage = cfd_->current()->GetStorageInfo();
const auto* vstorage = cfd_->current()->storage_info();
switch (property_type) {
case kNumImmutableMemTable:
@ -366,7 +366,7 @@ void InternalStats::DumpDBStats(std::string* value) {
}
void InternalStats::DumpCFStats(std::string* value) {
VersionStorageInfo* vstorage = cfd_->current()->GetStorageInfo();
const VersionStorageInfo* vstorage = cfd_->current()->storage_info();
int num_levels_to_check =
(cfd_->options()->compaction_style != kCompactionStyleUniversal &&

@ -0,0 +1,40 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#pragma once
#include "rocksdb/env.h"
namespace rocksdb {
class TableCache;
class VersionStorageInfo;
class VersionEdit;
class FileMetaData;
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionBuilder {
public:
VersionBuilder(const EnvOptions& env_options, TableCache* table_cache,
VersionStorageInfo* base_vstorage);
~VersionBuilder();
void CheckConsistency(VersionStorageInfo* vstorage);
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
int level);
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
void LoadTableHandlers();
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);
private:
class Rep;
Rep* rep_;
};
} // namespace rocksdb

@ -0,0 +1,123 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <string>
#include "db/version_edit.h"
#include "db/version_set.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
class VersionBuilderTest {
public:
const Comparator* ucmp;
InternalKeyComparator icmp;
Options options;
ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options;
VersionStorageInfo vstorage;
uint32_t file_num;
CompactionOptionsFIFO fifo_options;
std::vector<uint64_t> size_being_compacted;
VersionBuilderTest()
: ucmp(BytewiseComparator()),
icmp(ucmp),
ioptions(options),
mutable_cf_options(options, ioptions),
vstorage(&icmp, ucmp, options.num_levels, kCompactionStyleLevel,
nullptr),
file_num(1) {
mutable_cf_options.RefreshDerivedOptions(ioptions);
size_being_compacted.resize(options.num_levels);
}
~VersionBuilderTest() {
for (int i = 0; i < vstorage.NumberLevels(); i++) {
for (auto* f : vstorage.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
}
InternalKey GetInternalKey(const char* ukey,
SequenceNumber smallest_seq = 100) {
return InternalKey(ukey, smallest_seq, kTypeValue);
}
void Add(int level, uint32_t file_number, const char* smallest,
const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
SequenceNumber smallest_seq = 100,
SequenceNumber largest_seq = 100) {
assert(level < vstorage.NumberLevels());
FileMetaData* f = new FileMetaData;
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = GetInternalKey(smallest, smallest_seq);
f->largest = GetInternalKey(largest, largest_seq);
f->compensated_file_size = file_size;
f->refs = 0;
vstorage.MaybeAddFile(level, f);
}
void UpdateVersionStorageInfo() {
vstorage.ComputeCompactionScore(mutable_cf_options, fifo_options,
size_being_compacted);
vstorage.UpdateFilesBySize();
vstorage.UpdateNumNonEmptyLevels();
vstorage.GenerateFileIndexer();
vstorage.GenerateLevelFilesBrief();
vstorage.SetFinalized();
}
};
TEST(VersionBuilderTest, ApplyAndSaveTo) {
Add(0, 1U, "150", "200", 100U);
// Level 1 score 1.2
Add(1, 66U, "150", "200", 100U);
Add(1, 88U, "201", "300", 100U);
// Level 2 score 1.8. File 7 is the largest. Should be picked
Add(2, 6U, "150", "179", 100U);
Add(2, 7U, "180", "220", 100U);
Add(2, 8U, "221", "300", 100U);
// Level 3 score slightly larger than 1
Add(3, 26U, "150", "170", 100U);
Add(3, 27U, "171", "179", 100U);
Add(3, 28U, "191", "220", 100U);
Add(3, 29U, "221", "300", 100U);
UpdateVersionStorageInfo();
VersionEdit version_edit;
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200);
version_edit.DeleteFile(3, 27U);
EnvOptions env_options;
VersionBuilder version_builder(env_options, nullptr, &vstorage);
VersionStorageInfo new_vstorage(&icmp, ucmp, options.num_levels,
kCompactionStyleLevel, nullptr);
version_builder.Apply(&version_edit);
version_builder.SaveTo(&new_vstorage);
ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2));
ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3));
for (int i = 0; i < new_vstorage.NumberLevels(); i++) {
for (auto* f : new_vstorage.LevelFiles(i)) {
if (--f->refs == 0) {
delete f;
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

@ -217,14 +217,19 @@ class VersionEdit {
bool EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
const DeletedFileSet& GetDeletedFiles() { return deleted_files_; }
const std::vector<std::pair<int, FileMetaData>>& GetNewFiles() {
return new_files_;
}
std::string DebugString(bool hex_key = false) const;
private:
friend class VersionSet;
friend class Version;
typedef std::set< std::pair<int, uint64_t>> DeletedFileSet;
bool GetLevel(Slice* input, int* level, const char** msg);
int max_level_;

@ -29,6 +29,7 @@
#include "db/merge_context.h"
#include "db/table_cache.h"
#include "db/compaction.h"
#include "db/version_builder.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "table/table_reader.h"
@ -315,9 +316,9 @@ Version::~Version() {
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < vstorage_.num_levels_; level++) {
for (size_t i = 0; i < vstorage_.files_[level].size(); i++) {
FileMetaData* f = vstorage_.files_[level][i];
for (int level = 0; level < storage_info_.num_levels_; level++) {
for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
FileMetaData* f = storage_info_.files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
@ -512,6 +513,23 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
bool for_compaction_;
};
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
class BaseReferencedVersionBuilder {
public:
explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
: version_builder_(cfd->current()->version_set()->GetEnvOptions(),
cfd->table_cache(), cfd->current()->storage_info()),
version_(cfd->current()) {
version_->Ref();
}
~BaseReferencedVersionBuilder() { version_->Unref(); }
VersionBuilder* GetVersionBuilder() { return &version_builder_; }
private:
VersionBuilder version_builder_;
Version* version_;
};
} // anonymous namespace
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
@ -565,8 +583,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
}
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
for (int level = 0; level < vstorage_.num_levels_; level++) {
for (const auto& file_meta : vstorage_.files_[level]) {
for (int level = 0; level < storage_info_.num_levels_; level++) {
for (const auto& file_meta : storage_info_.files_[level]) {
auto fname =
TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
@ -587,7 +605,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
size_t Version::GetMemoryUsageByTableReaders() {
size_t total_usage = 0;
for (auto& file_level : vstorage_.level_files_brief_) {
for (auto& file_level : storage_info_.level_files_brief_) {
for (size_t i = 0; i < file_level.num_files; i++) {
total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
vset_->env_options_, cfd_->internal_comparator(),
@ -597,7 +615,7 @@ size_t Version::GetMemoryUsageByTableReaders() {
return total_usage;
}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() {
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
// Estimation will be not accurate when:
// (1) there is merge keys
// (2) keys are directly overwritten
@ -620,11 +638,11 @@ uint64_t VersionStorageInfo::GetEstimatedActiveKeys() {
void Version::AddIterators(const ReadOptions& read_options,
const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder) {
assert(vstorage_.finalized_);
assert(storage_info_.finalized_);
// Merge all level zero files together since they may overlap
for (size_t i = 0; i < vstorage_.level_files_brief_[0].num_files; i++) {
const auto& file = vstorage_.level_files_brief_[0].files[i];
for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(0).files[i];
merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr,
false, merge_iter_builder->GetArena()));
@ -633,15 +651,15 @@ void Version::AddIterators(const ReadOptions& read_options,
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < vstorage_.num_levels_; level++) {
if (vstorage_.level_files_brief_[level].num_files != 0) {
for (int level = 1; level < storage_info_.NumberLevels(); level++) {
if (storage_info_.level_files_brief_[level].num_files != 0) {
merge_iter_builder->AddIterator(NewTwoLevelIterator(
new LevelFileIteratorState(
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), false /* for_compaction */,
cfd_->ioptions()->prefix_extractor != nullptr),
new LevelFileNumIterator(cfd_->internal_comparator(),
&vstorage_.level_files_brief_[level]),
&storage_info_.LevelFilesBrief(level)),
merge_iter_builder->GetArena()));
}
}
@ -689,14 +707,14 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
table_cache_((cfd == nullptr) ? nullptr : cfd->table_cache()),
merge_operator_((cfd == nullptr) ? nullptr
: cfd->ioptions()->merge_operator),
vstorage_((cfd == nullptr) ? nullptr : &cfd->internal_comparator(),
(cfd == nullptr) ? nullptr : cfd->user_comparator(),
cfd == nullptr ? 0 : cfd->NumberLevels(),
cfd == nullptr ? kCompactionStyleLevel
: cfd->ioptions()->compaction_style,
(cfd == nullptr || cfd->current() == nullptr)
? nullptr
: cfd->current()->GetStorageInfo()),
storage_info_((cfd == nullptr) ? nullptr : &cfd->internal_comparator(),
(cfd == nullptr) ? nullptr : cfd->user_comparator(),
cfd == nullptr ? 0 : cfd->NumberLevels(),
cfd == nullptr ? kCompactionStyleLevel
: cfd->ioptions()->compaction_style,
(cfd == nullptr || cfd->current() == nullptr)
? nullptr
: cfd->current()->storage_info()),
vset_(vset),
next_(this),
prev_(this),
@ -715,16 +733,17 @@ void Version::Get(const ReadOptions& read_options,
assert(status->ok() || status->IsMergeInProgress());
GetContext get_context(
GetUserComparator(), merge_operator_, info_log_, db_statistics_,
user_comparator(), merge_operator_, info_log_, db_statistics_,
status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
value, value_found, merge_context);
FilePicker fp(vstorage_.files_, user_key, ikey, &vstorage_.level_files_brief_,
vstorage_.num_non_empty_levels_, &vstorage_.file_indexer_,
GetUserComparator(), GetInternalComparator());
FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
*status = table_cache_->Get(read_options, *GetInternalComparator(), f->fd,
*status = table_cache_->Get(read_options, *internal_comparator(), f->fd,
ikey, &get_context);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
@ -783,13 +802,13 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
UpdateAccumulatedStats();
vstorage_.ComputeCompactionScore(mutable_cf_options,
cfd_->ioptions()->compaction_options_fifo,
size_being_compacted);
vstorage_.UpdateFilesBySize();
vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateFileIndexer();
vstorage_.GenerateLevelFilesBrief();
storage_info_.ComputeCompactionScore(
mutable_cf_options, cfd_->ioptions()->compaction_options_fifo,
size_being_compacted);
storage_info_.UpdateFilesBySize();
storage_info_.UpdateNumNonEmptyLevels();
storage_info_.GenerateFileIndexer();
storage_info_.GenerateLevelFilesBrief();
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
@ -841,11 +860,12 @@ void Version::UpdateAccumulatedStats() {
// will be triggered, which creates higher-level files whose num_deletions
// will be updated here.
for (int level = 0;
level < vstorage_.num_levels_ && init_count < kMaxInitCount; ++level) {
for (auto* file_meta : vstorage_.files_[level]) {
level < storage_info_.num_levels_ && init_count < kMaxInitCount;
++level) {
for (auto* file_meta : storage_info_.files_[level]) {
if (MaybeInitializeFileMetaData(file_meta)) {
// each FileMeta will be initialized only once.
vstorage_.UpdateAccumulatedStats(file_meta);
storage_info_.UpdateAccumulatedStats(file_meta);
if (++init_count >= kMaxInitCount) {
break;
}
@ -855,17 +875,17 @@ void Version::UpdateAccumulatedStats() {
// In case all sampled-files contain only deletion entries, then we
// load the table-property of a file in higher-level to initialize
// that value.
for (int level = vstorage_.num_levels_ - 1;
vstorage_.accumulated_raw_value_size_ == 0 && level >= 0; --level) {
for (int i = static_cast<int>(vstorage_.files_[level].size()) - 1;
vstorage_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
if (MaybeInitializeFileMetaData(vstorage_.files_[level][i])) {
vstorage_.UpdateAccumulatedStats(vstorage_.files_[level][i]);
for (int level = storage_info_.num_levels_ - 1;
storage_info_.accumulated_raw_value_size_ == 0 && level >= 0; --level) {
for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
}
}
}
vstorage_.ComputeCompensatedSizes();
storage_info_.ComputeCompensatedSizes();
}
void VersionStorageInfo::ComputeCompensatedSizes() {
@ -987,6 +1007,18 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
} // anonymous namespace
void VersionStorageInfo::MaybeAddFile(int level, FileMetaData* f) {
assert(level < NumberLevels());
auto* level_files = &files_[level];
// Must not overlap
assert(level <= 0 || level_files->empty() ||
internal_comparator_->Compare(
(*level_files)[level_files->size() - 1]->largest, f->smallest) <
0);
f->refs++;
level_files->push_back(f);
}
void VersionStorageInfo::UpdateNumNonEmptyLevels() {
num_non_empty_levels_ = num_levels_;
for (int i = num_levels_ - 1; i >= 0; i--) {
@ -1379,8 +1411,8 @@ int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
}
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
for (int level = 0; level < vstorage_.NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = vstorage_.files_[level];
for (int level = 0; level < storage_info_.NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = storage_info_.files_[level];
for (const auto& file : files) {
live->push_back(file->fd);
}
@ -1389,7 +1421,7 @@ void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
std::string Version::DebugString(bool hex) const {
std::string r;
for (int level = 0; level < vstorage_.num_levels_; level++) {
for (int level = 0; level < storage_info_.num_levels_; level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
@ -1399,7 +1431,7 @@ std::string Version::DebugString(bool hex) const {
r.append(" --- version# ");
AppendNumberTo(&r, version_number_);
r.append(" ---\n");
const std::vector<FileMetaData*>& files = vstorage_.files_[level];
const std::vector<FileMetaData*>& files = storage_info_.files_[level];
for (size_t i = 0; i < files.size(); i++) {
r.push_back(' ');
AppendNumberTo(&r, files[i]->fd.GetNumber());
@ -1428,10 +1460,7 @@ struct VersionSet::ManifestWriter {
: done(false), cv(mu), cfd(cfd), edit(e) {}
};
// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
class VersionBuilder::Rep {
private:
// Helper to sort files_ in v
// kLevel0 -- NewestFirstBySeqNo
@ -1461,30 +1490,33 @@ class VersionSet::Builder {
FileSet* added_files;
};
ColumnFamilyData* cfd_;
Version* base_;
const EnvOptions& env_options_;
TableCache* table_cache_;
VersionStorageInfo* base_vstorage_;
LevelState* levels_;
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
public:
Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) {
base_->Ref();
levels_ = new LevelState[base_->GetStorageInfo()->NumberLevels()];
Rep(const EnvOptions& env_options, TableCache* table_cache,
VersionStorageInfo* base_vstorage)
: env_options_(env_options),
table_cache_(table_cache),
base_vstorage_(base_vstorage) {
levels_ = new LevelState[base_vstorage_->NumberLevels()];
level_zero_cmp_.sort_method = FileComparator::kLevel0;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator();
level_nonzero_cmp_.internal_comparator =
base_vstorage_->InternalComparator();
levels_[0].added_files = new FileSet(level_zero_cmp_);
for (int level = 1; level < base_->GetStorageInfo()->NumberLevels();
level++) {
for (int level = 1; level < base_vstorage_->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(level_nonzero_cmp_);
}
}
~Builder() {
for (int level = 0; level < base_->GetStorageInfo()->NumberLevels();
level++) {
~Rep() {
for (int level = 0; level < base_vstorage_->NumberLevels(); level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
@ -1498,7 +1530,8 @@ class VersionSet::Builder {
f->refs--;
if (f->refs <= 0) {
if (f->table_reader_handle) {
cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
assert(table_cache_ != nullptr);
table_cache_->ReleaseHandle(f->table_reader_handle);
f->table_reader_handle = nullptr;
}
delete f;
@ -1507,17 +1540,16 @@ class VersionSet::Builder {
}
delete[] levels_;
base_->Unref();
}
void CheckConsistency(Version* v) {
void CheckConsistency(VersionStorageInfo* vstorage) {
#ifndef NDEBUG
// make sure the files are sorted correctly
auto* files = v->GetFiles();
for (int level = 0; level < v->GetStorageInfo()->NumberLevels(); level++) {
for (size_t i = 1; i < files[level].size(); i++) {
auto f1 = files[level][i - 1];
auto f2 = files[level][i];
for (int level = 0; level < vstorage->NumberLevels(); level++) {
auto& level_files = vstorage->LevelFiles(level);
for (size_t i = 1; i < level_files.size(); i++) {
auto f1 = level_files[i - 1];
auto f2 = level_files[i];
if (level == 0) {
assert(level_zero_cmp_(f1, f2));
assert(f1->largest_seqno > f2->largest_seqno);
@ -1525,8 +1557,8 @@ class VersionSet::Builder {
assert(level_nonzero_cmp_(f1, f2));
// Make sure there is no overlap in levels > 0
if (cfd_->internal_comparator().Compare(f1->largest, f2->smallest) >=
0) {
if (vstorage->InternalComparator()->Compare(f1->largest,
f2->smallest) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
(f1->largest).DebugString().c_str(),
(f2->smallest).DebugString().c_str());
@ -1543,10 +1575,9 @@ class VersionSet::Builder {
#ifndef NDEBUG
// a file to be deleted better exist in the previous version
bool found = false;
auto* files = base_->GetFiles();
for (int l = 0; !found && l < base_->GetStorageInfo()->NumberLevels();
l++) {
const std::vector<FileMetaData*>& base_files = files[l];
for (int l = 0; !found && l < base_vstorage_->NumberLevels(); l++) {
const std::vector<FileMetaData*>& base_files =
base_vstorage_->LevelFiles(l);
for (unsigned int i = 0; i < base_files.size(); i++) {
FileMetaData* f = base_files[i];
if (f->fd.GetNumber() == number) {
@ -1558,8 +1589,8 @@ class VersionSet::Builder {
// if the file did not exist in the previous version, then it
// is possibly moved from lower level to higher level in current
// version
for (int l = level + 1;
!found && l < base_->GetStorageInfo()->NumberLevels(); l++) {
for (int l = level + 1; !found && l < base_vstorage_->NumberLevels();
l++) {
const FileSet* added = levels_[l].added_files;
for (FileSet::const_iterator added_iter = added->begin();
added_iter != added->end(); ++added_iter) {
@ -1592,10 +1623,10 @@ class VersionSet::Builder {
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
CheckConsistency(base_);
CheckConsistency(base_vstorage_);
// Delete files
const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
for (const auto& del_file : del) {
const auto level = del_file.first;
const auto number = del_file.second;
@ -1604,7 +1635,7 @@ class VersionSet::Builder {
}
// Add new files
for (const auto& new_file : edit->new_files_) {
for (const auto& new_file : edit->GetNewFiles()) {
const int level = new_file.first;
FileMetaData* f = new FileMetaData(new_file.second);
f->refs = 1;
@ -1615,77 +1646,88 @@ class VersionSet::Builder {
}
// Save the current state in *v.
void SaveTo(Version* v) {
CheckConsistency(base_);
CheckConsistency(v);
void SaveTo(VersionStorageInfo* vstorage) {
CheckConsistency(base_vstorage_);
CheckConsistency(vstorage);
auto* out_files = v->GetFiles();
for (int level = 0; level < base_->GetStorageInfo()->NumberLevels();
level++) {
for (int level = 0; level < base_vstorage_->NumberLevels(); level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const auto& base_files = base_->GetStorageInfo()->LevelFiles(level);
const auto& base_files = base_vstorage_->LevelFiles(level);
auto base_iter = base_files.begin();
auto base_end = base_files.end();
const auto& added_files = *levels_[level].added_files;
out_files[level].reserve(base_files.size() + added_files.size());
vstorage->Reserve(level, base_files.size() + added_files.size());
for (const auto& added : added_files) {
// Add all smaller files listed in base_
for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
base_iter != bpos;
++base_iter) {
MaybeAddFile(v, level, *base_iter);
MaybeAddFile(vstorage, level, *base_iter);
}
MaybeAddFile(v, level, added);
MaybeAddFile(vstorage, level, added);
}
// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
MaybeAddFile(vstorage, level, *base_iter);
}
}
CheckConsistency(v);
CheckConsistency(vstorage);
}
void LoadTableHandlers() {
for (int level = 0; level < cfd_->NumberLevels(); level++) {
assert(table_cache_ != nullptr);
for (int level = 0; level < base_vstorage_->NumberLevels(); level++) {
for (auto& file_meta : *(levels_[level].added_files)) {
assert (!file_meta->table_reader_handle);
cfd_->table_cache()->FindTable(
base_->GetVersionSet()->env_options_, cfd_->internal_comparator(),
assert(!file_meta->table_reader_handle);
table_cache_->FindTable(
env_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, false);
if (file_meta->table_reader_handle != nullptr) {
// Load table_reader
file_meta->fd.table_reader =
cfd_->table_cache()->GetTableReaderFromHandle(
file_meta->table_reader_handle);
}
file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
file_meta->table_reader_handle);
}
}
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
// File is deleted: do nothing
} else {
auto* files = v->GetFiles();
auto* level_files = &files[level];
if (level > 0 && !level_files->empty()) {
// Must not overlap
assert(cfd_->internal_comparator().Compare(
(*level_files)[level_files->size() - 1]->largest,
f->smallest) < 0);
}
f->refs++;
level_files->push_back(f);
vstorage->MaybeAddFile(level, f);
}
}
};
VersionBuilder::VersionBuilder(const EnvOptions& env_options,
TableCache* table_cache,
VersionStorageInfo* base_vstorage)
: rep_(new Rep(env_options, table_cache, base_vstorage)) {}
VersionBuilder::~VersionBuilder() { delete rep_; }
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
rep_->CheckConsistency(vstorage);
}
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
uint64_t number, int level) {
rep_->CheckConsistencyForDeletes(edit, number, level);
}
void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
}
void VersionBuilder::LoadTableHandlers() { rep_->LoadTableHandlers(); }
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
FileMetaData* f) {
rep_->MaybeAddFile(vstorage, level, f);
}
VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteController* write_controller)
@ -1717,7 +1759,7 @@ VersionSet::~VersionSet() {
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) {
// Mark v finalized
v->vstorage_.SetFinalized();
v->storage_info_.SetFinalized();
// Make "v" current
assert(v->refs_ == 0);
@ -1773,7 +1815,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
std::vector<VersionEdit*> batch_edits;
Version* v = nullptr;
std::unique_ptr<Builder> builder(nullptr);
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(nullptr);
// process all requests in the queue
ManifestWriter* last_writer = &w;
@ -1785,7 +1827,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
batch_edits.push_back(edit);
} else {
v = new Version(column_family_data, this, current_version_number_++);
builder.reset(new Builder(column_family_data));
builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data));
auto* builder = builder_guard->GetVersionBuilder();
for (const auto& writer : manifest_writers_) {
if (writer->edit->IsColumnFamilyManipulation() ||
writer->cfd->GetID() != column_family_data->GetID()) {
@ -1794,11 +1837,10 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
break;
}
last_writer = writer;
LogAndApplyHelper(column_family_data, builder.get(), v, last_writer->edit,
mu);
LogAndApplyHelper(column_family_data, builder, v, last_writer->edit, mu);
batch_edits.push_back(last_writer->edit);
}
builder->SaveTo(v);
builder->SaveTo(v->storage_info());
}
// Initialize new descriptor log file if necessary by creating
@ -1828,7 +1870,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
{
std::vector<uint64_t> size_being_compacted;
if (!edit->IsColumnFamilyManipulation()) {
size_being_compacted.resize(v->GetStorageInfo()->NumberLevels() - 1);
size_being_compacted.resize(v->storage_info()->NumberLevels() - 1);
// calculate the amount of data being compacted at every level
column_family_data->compaction_picker()->SizeBeingCompacted(
size_being_compacted);
@ -1840,7 +1882,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
db_options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder->LoadTableHandlers();
builder_guard->GetVersionBuilder()->LoadTableHandlers();
}
// This is fine because everything inside of this block is serialized --
@ -2019,9 +2061,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
}
}
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
Version* v, VersionEdit* edit,
port::Mutex* mu) {
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, Version* v,
VersionEdit* edit, port::Mutex* mu) {
mu->AssertHeld();
assert(!edit->IsColumnFamilyManipulation());
@ -2097,7 +2139,7 @@ Status VersionSet::Recover(
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
uint32_t max_column_family = 0;
std::unordered_map<uint32_t, Builder*> builders;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@ -2109,7 +2151,7 @@ Status VersionSet::Recover(
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
builders.insert({0, new Builder(default_cfd)});
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
{
VersionSet::LogReporter reporter;
@ -2155,7 +2197,8 @@ Status VersionSet::Recover(
{edit.column_family_, edit.column_family_name_});
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
builders.insert({edit.column_family_, new Builder(cfd)});
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
@ -2188,8 +2231,7 @@ Status VersionSet::Recover(
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
if (edit.max_level_ >=
cfd->current()->GetStorageInfo()->NumberLevels()) {
if (edit.max_level_ >= cfd->current()->storage_info()->NumberLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
@ -2200,7 +2242,7 @@ Status VersionSet::Recover(
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->Apply(&edit);
builder->second->GetVersionBuilder()->Apply(&edit);
}
if (cfd != nullptr) {
@ -2280,7 +2322,7 @@ Status VersionSet::Recover(
for (auto cfd : *column_family_set_) {
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second;
auto builder = builders_iter->second->GetVersionBuilder();
if (db_options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
@ -2289,11 +2331,11 @@ Status VersionSet::Recover(
}
Version* v = new Version(cfd, this, current_version_number_++);
builder->SaveTo(v);
builder->SaveTo(v->storage_info());
// Install recovered version
std::vector<uint64_t> size_being_compacted(
v->GetStorageInfo()->NumberLevels() - 1);
v->storage_info()->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
AppendVersion(cfd, v);
@ -2425,7 +2467,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
Version* current_version =
versions.GetColumnFamilySet()->GetDefault()->current();
auto* vstorage = current_version->GetStorageInfo();
auto* vstorage = current_version->storage_info();
int current_levels = vstorage->NumberLevels();
if (current_levels <= new_levels) {
@ -2454,18 +2496,17 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
}
}
std::vector<FileMetaData*>* old_files_list = vstorage->GetFiles();
// we need to allocate an array with the old number of levels size to
// avoid SIGSEGV in WriteSnapshot()
// however, all levels bigger or equal to new_levels will be empty
std::vector<FileMetaData*>* new_files_list =
new std::vector<FileMetaData*>[current_levels];
for (int i = 0; i < new_levels - 1; i++) {
new_files_list[i] = old_files_list[i];
new_files_list[i] = vstorage->LevelFiles(i);
}
if (first_nonempty_level > 0) {
new_files_list[new_levels - 1] = old_files_list[first_nonempty_level];
new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
}
delete[] vstorage -> files_;
@ -2498,7 +2539,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t prev_log_number = 0;
int count = 0;
std::unordered_map<uint32_t, std::string> comparators;
std::unordered_map<uint32_t, Builder*> builders;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family
VersionEdit default_cf_edit;
@ -2506,7 +2547,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
builders.insert({0, new Builder(default_cfd)});
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
{
VersionSet::LogReporter reporter;
@ -2545,7 +2586,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
break;
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
builders.insert({edit.column_family_, new Builder(cfd)});
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
} else if (edit.is_column_family_drop_) {
if (!cf_in_builders) {
s = Status::Corruption(
@ -2577,7 +2619,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->Apply(&edit);
builder->second->GetVersionBuilder()->Apply(&edit);
}
if (cfd != nullptr && edit.has_log_number_) {
@ -2624,12 +2666,12 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
for (auto cfd : *column_family_set_) {
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second;
auto builder = builders_iter->second->GetVersionBuilder();
Version* v = new Version(cfd, this, current_version_number_++);
builder->SaveTo(v);
builder->SaveTo(v->storage_info());
std::vector<uint64_t> size_being_compacted(
v->GetStorageInfo()->NumberLevels() - 1);
v->storage_info()->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
delete builder;
@ -2706,8 +2748,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
edit.SetColumnFamily(cfd->GetID());
for (int level = 0; level < cfd->NumberLevels(); level++) {
auto* files = cfd->current()->GetFiles();
for (const auto& f : files[level]) {
for (const auto& f :
cfd->current()->storage_info()->LevelFiles(level)) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
@ -2762,7 +2804,7 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_number,
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
auto* vstorage = v->GetStorageInfo();
const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = vstorage->LevelFiles(level);
for (size_t i = 0; i < files.size(); i++) {
@ -2803,7 +2845,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
auto* vstorage = v->GetStorageInfo();
const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->NumberLevels(); level++) {
total_files += vstorage->LevelFiles(level).size();
}
@ -2817,7 +2859,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
auto* vstorage = v->GetStorageInfo();
const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->NumberLevels(); level++) {
for (const auto& f : vstorage->LevelFiles(level)) {
live_list->push_back(f->fd);
@ -2875,7 +2917,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
Version* version = c->column_family_data()->current();
VersionStorageInfo* vstorage = version->GetStorageInfo();
const VersionStorageInfo* vstorage = version->storage_info();
if (c->input_version() != version) {
Log(db_options_->info_log,
"[%s] VerifyCompactionFileConsistency version mismatch",
@ -2927,7 +2969,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
ColumnFamilyData** cfd) {
for (auto cfd_iter : *column_family_set_) {
Version* version = cfd_iter->current();
auto* vstorage = version->GetStorageInfo();
const auto* vstorage = version->storage_info();
for (int level = 0; level < vstorage->NumberLevels(); level++) {
for (const auto& file : vstorage->LevelFiles(level)) {
if (file->fd.GetNumber() == number) {
@ -2944,9 +2986,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) {
auto* files = cfd->current()->GetFiles();
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file : files[level]) {
for (const auto& file :
cfd->current()->storage_info()->LevelFiles(level)) {
LiveFileMetaData filemetadata;
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();

@ -26,6 +26,7 @@
#include <atomic>
#include <limits>
#include "db/dbformat.h"
#include "db/version_builder.h"
#include "db/version_edit.h"
#include "port/port.h"
#include "db/table_cache.h"
@ -91,6 +92,10 @@ class VersionStorageInfo {
VersionStorageInfo* src_vstorage);
~VersionStorageInfo();
void Reserve(int level, size_t size) { files_[level].reserve(size); }
void MaybeAddFile(int level, FileMetaData* f);
void SetFinalized() { finalized_ = true; }
// Update num_non_empty_levels_.
@ -197,7 +202,6 @@ class VersionStorageInfo {
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
const std::vector<FileMetaData*>& LevelFiles(int level) const {
assert(finalized_);
return files_[level];
}
@ -249,8 +253,6 @@ class VersionStorageInfo {
// in a specified level. Uses *scratch as backing store.
const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const;
std::vector<FileMetaData*>* GetFiles() { return files_; }
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();
@ -269,7 +271,7 @@ class VersionStorageInfo {
(accumulated_raw_key_size_ + accumulated_raw_value_size_);
}
uint64_t GetEstimatedActiveKeys();
uint64_t GetEstimatedActiveKeys() const;
// re-initializes the index that is used to offset into files_by_size_
// to find the next compaction candidate file.
@ -277,6 +279,10 @@ class VersionStorageInfo {
next_file_to_compact_by_size_[level] = 0;
}
const InternalKeyComparator* InternalComparator() {
return internal_comparator_;
}
private:
const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_;
@ -374,8 +380,6 @@ class Version {
// and return true. Otherwise, return false.
bool Unref();
std::vector<FileMetaData*>* GetFiles() { return vstorage_.GetFiles(); }
// Add all files listed in the current version to *live.
void AddLiveFiles(std::vector<FileDescriptor>* live);
@ -385,10 +389,6 @@ class Version {
// Returns the version nuber of this version
uint64_t GetVersionNumber() const { return version_number_; }
uint64_t GetAverageValueSize() const {
return vstorage_.GetAverageValueSize();
}
// REQUIRES: lock is held
// On success, "tp" will contains the table properties of the file
// specified in "file_meta". If the file name of "file_meta" is
@ -405,7 +405,7 @@ class Version {
Status GetPropertiesOfAllTables(TablePropertiesCollection* props);
uint64_t GetEstimatedActiveKeys() {
return vstorage_.GetEstimatedActiveKeys();
return storage_info_.GetEstimatedActiveKeys();
}
size_t GetMemoryUsageByTableReaders();
@ -418,16 +418,18 @@ class Version {
return next_;
}
VersionStorageInfo* GetStorageInfo() { return &vstorage_; }
VersionStorageInfo* storage_info() { return &storage_info_; }
VersionSet* version_set() { return vset_; }
private:
friend class VersionSet;
const InternalKeyComparator* GetInternalComparator() const {
return vstorage_.internal_comparator_;
const InternalKeyComparator* internal_comparator() const {
return storage_info_.internal_comparator_;
}
const Comparator* GetUserComparator() const {
return vstorage_.user_comparator_;
const Comparator* user_comparator() const {
return storage_info_.user_comparator_;
}
bool PrefixMayMatch(const ReadOptions& read_options, Iterator* level_iter,
@ -446,15 +448,13 @@ class Version {
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize();
VersionSet* GetVersionSet() { return vset_; }
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
Logger* info_log_;
Statistics* db_statistics_;
TableCache* table_cache_;
const MergeOperator* merge_operator_;
VersionStorageInfo vstorage_;
VersionStorageInfo storage_info_;
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
@ -602,9 +602,9 @@ class VersionSet {
void GetObsoleteFiles(std::vector<FileMetaData*>* files);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
const EnvOptions& GetEnvOptions() { return env_options_; }
private:
class Builder;
struct ManifestWriter;
friend class Version;
@ -664,7 +664,7 @@ class VersionSet {
void operator=(const VersionSet&);
void LogAndApplyCFHelper(VersionEdit* edit);
void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v,
void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v,
VersionEdit* edit, port::Mutex* mu);
};

@ -1125,7 +1125,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
int max = -1;
auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
for (int i = 0; i < default_cfd->NumberLevels(); i++) {
if (default_cfd->current()->GetStorageInfo()->NumLevelFiles(i)) {
if (default_cfd->current()->storage_info()->NumLevelFiles(i)) {
max = i;
}
}

@ -104,7 +104,7 @@ Status CompactedDBImpl::Init(const Options& options) {
}
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
auto* vstorage = version_->GetStorageInfo();
auto* vstorage = version_->storage_info();
const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
// L0 should not have files
if (l0.num_files > 1) {

Loading…
Cancel
Save