Move compaction picker and internal key comparator to ColumnFamilyData

Summary: The compaction picker and the internal key comparator are specific to each column family (not global), so they should live in ColumnFamilyData.

Test Plan: make check

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15801
main
Igor Canadi 11 years ago
parent 5693db2a02
commit f7489123e2
  1. 22
      db/column_family.cc
  2. 22
      db/column_family.h
  3. 2
      db/compaction_picker.cc
  4. 9
      db/compaction_picker.h
  5. 22
      db/db_impl.cc
  6. 22
      db/internal_stats.cc
  7. 4
      db/internal_stats.h
  8. 132
      db/version_set.cc
  9. 37
      db/version_set.h
  10. 22
      include/rocksdb/options.h
  11. 7
      util/options.cc

@ -14,6 +14,7 @@
#include <algorithm>
#include "db/version_set.h"
#include "db/compaction_picker.h"
namespace rocksdb {
@ -65,6 +66,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
dummy_versions_(dummy_versions),
current_(nullptr),
options_(options),
icmp_(options_.comparator),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
@ -72,7 +74,13 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false) {}
need_slowdown_for_num_level0_files_(false) {
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(new UniversalCompactionPicker(&options_, &icmp_));
} else {
compaction_picker_.reset(new LevelCompactionPicker(&options_, &icmp_));
}
}
ColumnFamilyData::~ColumnFamilyData() {
if (super_version_ != nullptr) {
@ -114,6 +122,18 @@ void ColumnFamilyData::CreateNewMemtable() {
mem_->Ref();
}
// Asks this column family's own compaction picker to choose a compaction
// for the column family's current version (current_) and returns whatever
// the picker returns. See compaction_picker.h for the picker's contract.
Compaction* ColumnFamilyData::PickCompaction() {
return compaction_picker_->PickCompaction(current_);
}
// Forwards a manual range-compaction request to this column family's
// compaction picker, operating on the column family's current version
// (current_). All arguments are passed through unchanged; see
// compaction_picker.h for the meaning of begin/end/compaction_end.
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
begin, end, compaction_end);
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion) {
new_superversion->Init(mem_, imm_.current(), current_);

@ -23,6 +23,9 @@ class Version;
class VersionSet;
class MemTable;
class MemTableListVersion;
class CompactionPicker;
class Compaction;
class InternalKey;
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
@ -62,6 +65,8 @@ class ColumnFamilyData {
uint32_t GetID() const { return id_; }
const std::string& GetName() { return name_; }
int NumberLevels() const { return options_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
@ -75,6 +80,17 @@ class ColumnFamilyData {
void SetCurrent(Version* current);
void CreateNewMemtable();
// See documentation in compaction_picker.h
// Picks level and inputs for a new compaction of this column family's
// current version by delegating to its compaction picker.
// NOTE(review): the picker is expected to return nullptr when there is
// nothing to compact and otherwise a heap-allocated Compaction that the
// caller must delete -- confirm against compaction_picker.h.
Compaction* PickCompaction();
// Builds a compaction for the key range [begin,end] from input_level to
// output_level by delegating to this column family's compaction picker
// (see compaction_picker.h for the authoritative contract).
// NOTE(review): the returned Compaction may cover only part of the
// requested range, in which case *compaction_end is presumably set to the
// next key that still needs compacting (nullptr when the whole range is
// covered); the caller supplies the storage *compaction_end points to --
// confirm against compaction_picker.h.
Compaction* CompactRange(int input_level, int output_level,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
// Returns a non-owning pointer to this column family's compaction picker.
// Ownership stays with the compaction_picker_ unique_ptr member, so callers
// must not delete the returned pointer.
CompactionPicker* compaction_picker() const {
return compaction_picker_.get();
}
// Returns a reference to this column family's internal key comparator
// (the icmp_ member, which the constructor builds from options_.comparator).
const InternalKeyComparator& internal_comparator() const { return icmp_; }
SuperVersion* GetSuperVersion() const { return super_version_; }
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
@ -102,6 +118,8 @@ class ColumnFamilyData {
Version* current_; // == dummy_versions->prev_
ColumnFamilyOptions options_;
const InternalKeyComparator icmp_;
MemTable* mem_;
MemTableList imm_;
SuperVersion* super_version_;
@ -124,6 +142,10 @@ class ColumnFamilyData {
// A flag indicating whether we should delay writes because
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
};
// Thread safe only for reading without a writer. All access should be

@ -41,7 +41,7 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
} // anonymous namespace
CompactionPicker::CompactionPicker(const Options* options,
CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp)
: compactions_in_progress_(options->num_levels),
options_(options),

@ -24,7 +24,8 @@ class Version;
class CompactionPicker {
public:
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
CompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
@ -115,7 +116,7 @@ class CompactionPicker {
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
const Options* const options_;
const ColumnFamilyOptions* const options_;
private:
int num_levels_;
@ -124,7 +125,7 @@ class CompactionPicker {
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const Options* options,
UniversalCompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
@ -141,7 +142,7 @@ class UniversalCompactionPicker : public CompactionPicker {
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const Options* options,
LevelCompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;

@ -1312,7 +1312,10 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
// stop if level i is not empty
if (current->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break;
if (default_cfd_->compaction_picker()->MaxBytesForLevel(i) <
current->NumLevelBytes(level)) {
break;
}
minimum_level = i;
}
@ -1943,8 +1946,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (is_manual) {
ManualCompaction* m = manual_compaction_;
assert(m->in_progress);
c.reset(versions_->CompactRange(
m->input_level, m->output_level, m->begin, m->end, &manual_end));
c.reset(default_cfd_->CompactRange(m->input_level, m->output_level,
m->begin, m->end, &manual_end));
if (!c) {
m->done = true;
}
@ -1959,7 +1962,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
? "(end)"
: manual_end->DebugString().c_str()));
} else if (!options_.disable_auto_compactions) {
c.reset(versions_->PickCompaction());
c.reset(default_cfd_->PickCompaction());
}
Status status;
@ -1983,14 +1986,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp));
versions_->ReleaseCompactionFiles(c.get(), status);
default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
*madeProgress = true;
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact, deletion_state);
CleanupCompaction(compact, status);
versions_->ReleaseCompactionFiles(c.get(), status);
default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
c->ReleaseInputs();
*madeProgress = true;
}
@ -2121,7 +2124,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
// Over-estimate slightly so we don't end up just barely crossing
// the threshold.
compact->outfile->SetPreallocationBlockSize(
1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level()));
1.1 * default_cfd_->compaction_picker()->MaxFileSizeForLevel(
compact->compaction->output_level()));
CompressionType compression_type = GetCompressionType(
options_, compact->compaction->output_level(),
@ -3534,9 +3538,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
return internal_stats_.GetProperty(property, value, versions_.get(),
default_cfd_->current(),
default_cfd_->imm()->size());
return internal_stats_.GetProperty(property, value, default_cfd_);
}
void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,

@ -8,14 +8,14 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/internal_stats.h"
#include "db/column_family.h"
#include <vector>
namespace rocksdb {
bool InternalStats::GetProperty(const Slice& property, std::string* value,
VersionSet* version_set, Version* current,
int immsize) {
ColumnFamilyData* cfd) {
Slice in = property;
Slice prefix("rocksdb.");
if (!in.starts_with(prefix)) return false;
@ -30,7 +30,7 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
} else {
char buf[100];
snprintf(buf, sizeof(buf), "%d",
current->NumLevelFiles(static_cast<int>(level)));
cfd->current()->NumLevelFiles(static_cast<int>(level)));
*value = buf;
return true;
}
@ -43,8 +43,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
for (int level = 0; level < number_levels_; level++) {
snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level,
current->NumLevelFiles(level),
current->NumLevelBytes(level) / 1048576.0);
cfd->current()->NumLevelFiles(level),
cfd->current()->NumLevelBytes(level) / 1048576.0);
value->append(buf);
}
return true;
@ -87,7 +87,7 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
);
value->append(buf);
for (int level = 0; level < number_levels_; level++) {
int files = current->NumLevelFiles(level);
int files = cfd->current()->NumLevelFiles(level);
if (compaction_stats_[level].micros > 0 || files > 0) {
int64_t bytes_read = compaction_stats_[level].bytes_readn +
compaction_stats_[level].bytes_readnp1;
@ -117,9 +117,9 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f "
"%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f "
"%9lu\n",
level, files, current->NumLevelBytes(level) / 1048576.0,
current->NumLevelBytes(level) /
version_set->MaxBytesForLevel(level),
level, files, cfd->current()->NumLevelBytes(level) / 1048576.0,
cfd->current()->NumLevelBytes(level) /
cfd->compaction_picker()->MaxBytesForLevel(level),
compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0,
compaction_stats_[level].bytes_written / 1048576.0,
compaction_stats_[level].bytes_readn / 1048576.0,
@ -285,10 +285,10 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value,
return true;
} else if (in == "sstables") {
*value = current->DebugString();
*value = cfd->current()->DebugString();
return true;
} else if (in == "num-immutable-mem-table") {
*value = std::to_string(immsize);
*value = std::to_string(cfd->imm()->size());
return true;
}

@ -16,6 +16,8 @@
#include <vector>
#include <string>
namespace rocksdb {
// Forward declaration. It must live inside namespace rocksdb so that it
// names rocksdb::ColumnFamilyData (the type InternalStats::GetProperty()
// takes); declared before the namespace it would instead introduce an
// unrelated ::ColumnFamilyData.
class ColumnFamilyData;
class InternalStats {
public:
@ -100,7 +102,7 @@ class InternalStats {
}
bool GetProperty(const Slice& property, std::string* value,
VersionSet* version_set, Version* current, int immsize);
ColumnFamilyData* cfd);
private:
std::vector<CompactionStats> compaction_stats_;

@ -409,8 +409,10 @@ static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
return false;
}
Version::Version(VersionSet* vset, uint64_t version_number)
: vset_(vset),
Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
uint64_t version_number)
: cfd_(cfd),
vset_(vset),
next_(this),
prev_(this),
refs_(0),
@ -434,7 +436,7 @@ void Version::Get(const ReadOptions& options,
bool* value_found) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
auto merge_operator = db_options.merge_operator.get();
auto logger = db_options.info_log;
@ -481,7 +483,7 @@ void Version::Get(const ReadOptions& options,
// On Level-n (n>=1), files are sorted.
// Binary search to find earliest index whose largest key >= ikey.
// We will also stop when the file no longer overlaps ikey
start_index = FindFile(vset_->icmp_, files_[level], ikey);
start_index = FindFile(cfd_->internal_comparator(), files_[level], ikey);
}
// Traverse each relevant file to find the desired key
@ -507,11 +509,12 @@ void Version::Get(const ReadOptions& options,
// Sanity check to make sure that the files are correctly sorted
if (prev_file) {
if (level != 0) {
int comp_sign = vset_->icmp_.Compare(prev_file->largest, f->smallest);
int comp_sign = cfd_->internal_comparator().Compare(
prev_file->largest, f->smallest);
assert(comp_sign < 0);
} else {
// level == 0, the current file cannot be newer than the previous one.
if (vset_->options_->compaction_style == kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
assert(!NewestFirstBySeqNo(f, prev_file));
} else {
assert(!NewestFirst(f, prev_file));
@ -597,7 +600,7 @@ bool Version::UpdateStats(const GetStats& stats) {
void Version::Finalize(std::vector<uint64_t>& size_being_compacted) {
// Pre-sort level0 for Get()
if (vset_->options_->compaction_style == kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
std::sort(files_[0].begin(), files_[0].end(), NewestFirstBySeqNo);
} else {
std::sort(files_[0].begin(), files_[0].end(), NewestFirst);
@ -607,7 +610,7 @@ void Version::Finalize(std::vector<uint64_t>& size_being_compacted) {
int max_score_level = 0;
int num_levels_to_check =
(vset_->options_->compaction_style != kCompactionStyleUniversal)
(cfd_->options()->compaction_style != kCompactionStyleUniversal)
? NumberLevels() - 1
: 1;
@ -633,15 +636,15 @@ void Version::Finalize(std::vector<uint64_t>& size_being_compacted) {
}
// If we are slowing down writes, then we better compact that first
if (numfiles >= vset_->options_->level0_stop_writes_trigger) {
if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
score = 1000000;
// Log(options_->info_log, "XXX score l0 = 1000000000 max");
} else if (numfiles >= vset_->options_->level0_slowdown_writes_trigger) {
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
score = 10000;
// Log(options_->info_log, "XXX score l0 = 1000000 medium");
} else {
score = static_cast<double>(numfiles) /
vset_->options_->level0_file_num_compaction_trigger;
cfd_->options()->level0_file_num_compaction_trigger;
if (score >= 1) {
// Log(options_->info_log, "XXX score l0 = %d least", (int)score);
}
@ -650,7 +653,8 @@ void Version::Finalize(std::vector<uint64_t>& size_being_compacted) {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes =
TotalFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) / vset_->MaxBytesForLevel(level);
score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
if (score > 1) {
// Log(options_->info_log, "XXX score l%d = %d ", level, (int)score);
}
@ -708,7 +712,7 @@ bool CompareSeqnoDescending(const Version::Fsize& first,
void Version::UpdateFilesBySize() {
// No need to sort the highest level because it is never compacted.
int max_level =
(vset_->options_->compaction_style == kCompactionStyleUniversal)
(cfd_->options()->compaction_style == kCompactionStyleUniversal)
? NumberLevels()
: NumberLevels() - 1;
@ -725,7 +729,7 @@ void Version::UpdateFilesBySize() {
}
// sort the top number_of_files_to_sort_ based on file size
if (vset_->options_->compaction_style == kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
int num = temp.size();
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareSeqnoDescending);
@ -772,8 +776,9 @@ bool Version::NeedsCompaction() const {
// TODO(sdong): improve this function to be accurate for universal
// compactions.
int num_levels_to_check =
(vset_->options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1;
(cfd_->options()->compaction_style != kCompactionStyleUniversal)
? NumberLevels() - 1
: 1;
for (int i = 0; i < num_levels_to_check; i++) {
if (compaction_score_[i] >= 1) {
return true;
@ -785,8 +790,9 @@ bool Version::NeedsCompaction() const {
bool Version::OverlapInLevel(int level,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
smallest_user_key, largest_user_key);
return SomeFileOverlapsRange(cfd_->internal_comparator(), (level > 0),
files_[level], smallest_user_key,
largest_user_key);
}
int Version::PickLevelForMemTableOutput(
@ -799,7 +805,7 @@ int Version::PickLevelForMemTableOutput(
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
int max_mem_compact_level = vset_->options_->max_mem_compaction_level;
int max_mem_compact_level = cfd_->options()->max_mem_compaction_level;
while (max_mem_compact_level > 0 && level < max_mem_compact_level) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
@ -810,7 +816,7 @@ int Version::PickLevelForMemTableOutput(
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > vset_->compaction_picker_->MaxGrandParentOverlapBytes(level)) {
if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
@ -841,7 +847,7 @@ void Version::GetOverlappingInputs(int level,
if (file_index) {
*file_index = -1;
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
if (begin != nullptr && end != nullptr && level > 0) {
GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs,
hint_index, file_index);
@ -893,7 +899,7 @@ void Version::GetOverlappingInputsBinarySearch(
int mid = 0;
int max = files_[level].size() -1;
bool foundOverlap = false;
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
// if the caller already knows the index of a file that has overlap,
// then we can skip the binary search.
@ -939,7 +945,7 @@ void Version::ExtendOverlappingInputs(
std::vector<FileMetaData*>* inputs,
unsigned int midIndex) {
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
#ifndef NDEBUG
{
// assert that the file at midIndex overlaps with the range
@ -1003,12 +1009,12 @@ bool Version::HasOverlappingUserKey(
return false;
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->internal_comparator().user_comparator();
const std::vector<FileMetaData*>& files = files_[level];
const size_t kNumFiles = files.size();
// Check the last file in inputs against the file after it
size_t last_file = FindFile(vset_->icmp_, files,
size_t last_file = FindFile(cfd_->internal_comparator(), files,
inputs->back()->largest.Encode());
assert(0 <= last_file && last_file < kNumFiles); // File should exist!
if (last_file < kNumFiles-1) { // If not the last file
@ -1021,7 +1027,7 @@ bool Version::HasOverlappingUserKey(
}
// Check the first file in inputs against the file just before it
size_t first_file = FindFile(vset_->icmp_, files,
size_t first_file = FindFile(cfd_->internal_comparator(), files,
inputs->front()->smallest.Encode());
assert(0 <= first_file && first_file <= last_file); // File should exist!
if (first_file > 0) { // If not first file
@ -1164,17 +1170,17 @@ class VersionSet::Builder {
FileSet* added_files;
};
VersionSet* vset_;
ColumnFamilyData* cfd_;
Version* base_;
LevelState* levels_;
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
Builder(ColumnFamilyData* cfd, Version* base) : cfd_(cfd), base_(base) {
base_->Ref();
levels_ = new LevelState[base->NumberLevels()];
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
cmp.internal_comparator = &cfd_->internal_comparator();
for (int level = 0; level < base->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(cmp);
}
@ -1210,7 +1216,7 @@ class VersionSet::Builder {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
if (cfd_->internal_comparator().Compare(prev_end, this_begin) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
@ -1315,7 +1321,7 @@ class VersionSet::Builder {
CheckConsistency(base_);
CheckConsistency(v);
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
cmp.internal_comparator = &cfd_->internal_comparator();
for (int level = 0; level < base_->NumberLevels(); level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
@ -1354,8 +1360,8 @@ class VersionSet::Builder {
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
f->smallest) < 0);
assert(cfd_->internal_comparator().Compare(
(*files)[files->size() - 1]->largest, f->smallest) < 0);
}
f->refs++;
files->push_back(f);
@ -1382,11 +1388,6 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
manifest_file_size_(0),
storage_options_(storage_options),
storage_options_compactions_(storage_options_) {
if (options_->compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_));
} else {
compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_));
}
}
VersionSet::~VersionSet() {
@ -1439,8 +1440,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
std::vector<VersionEdit*> batch_edits;
Version* v = new Version(this, current_version_number_++);
Builder builder(this, column_family_data->current());
Version* v = new Version(column_family_data, this, current_version_number_++);
Builder builder(column_family_data, column_family_data->current());
// process all requests in the queue
ManifestWriter* last_writer = &w;
@ -1483,7 +1484,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
{
// calculate the amount of data being compacted at every level
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
compaction_picker_->SizeBeingCompacted(size_being_compacted);
column_family_data->compaction_picker()->SizeBeingCompacted(
size_being_compacted);
mu->Unlock();
@ -1679,7 +1681,7 @@ Status VersionSet::Recover(
} else {
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
builders.insert({0, new Builder(this, default_cfd->current())});
builders.insert({0, new Builder(default_cfd, default_cfd->current())});
}
{
@ -1725,7 +1727,7 @@ Status VersionSet::Recover(
ColumnFamilyData* new_cfd =
CreateColumnFamily(cf_options->second, &edit);
builders.insert(
{edit.column_family_, new Builder(this, new_cfd->current())});
{edit.column_family_, new Builder(new_cfd, new_cfd->current())});
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
@ -1815,12 +1817,12 @@ Status VersionSet::Recover(
if (s.ok()) {
for (auto cfd : *column_family_set_) {
Version* v = new Version(this, current_version_number_++);
Version* v = new Version(cfd, this, current_version_number_++);
builders[cfd->GetID()]->SaveTo(v);
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
compaction_picker_->SizeBeingCompacted(size_being_compacted);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted);
AppendVersion(cfd, v);
}
@ -2007,7 +2009,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t prev_log_number = 0;
int count = 0;
// TODO works only for default column family currently
VersionSet::Builder builder(this,
VersionSet::Builder builder(column_family_set_->GetDefault(),
column_family_set_->GetDefault()->current());
{
@ -2084,7 +2086,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
if (s.ok()) {
Version* v = new Version(this, 0);
Version* v = new Version(column_family_set_->GetDefault(), this, 0);
builder.SaveTo(v);
manifest_file_number_ = next_file;
@ -2258,22 +2260,6 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
}
}
Compaction* VersionSet::PickCompaction() {
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current();
return compaction_picker_->PickCompaction(version);
}
Compaction* VersionSet::CompactRange(int input_level, int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current();
return compaction_picker_->CompactRange(version, input_level, output_level,
begin, end, compaction_end);
}
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options;
options.fill_cache = false;
@ -2295,7 +2281,9 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, c->inputs(which)),
new Version::LevelFileNumIterator(
c->input_version()->cfd_->internal_comparator(),
c->inputs(which)),
&GetFileIterator, table_cache_, options, storage_options_,
true /* for compaction */);
}
@ -2307,14 +2295,6 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
return result;
}
double VersionSet::MaxBytesForLevel(int level) {
return compaction_picker_->MaxBytesForLevel(level);
}
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
return compaction_picker_->MaxFileSizeForLevel(level);
}
// verify that the files listed in this compaction are present
// in the current version
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
@ -2365,10 +2345,6 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
return true; // everything good
}
void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
compaction_picker_->ReleaseCompactionFiles(c, status);
}
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData* meta,
ColumnFamilyData** cfd) {
@ -2415,11 +2391,11 @@ ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& options, VersionEdit* edit) {
assert(edit->is_column_family_add_);
Version* dummy_versions = new Version(this);
Version* dummy_versions = new Version(nullptr, this);
auto new_cfd = column_family_set_->CreateColumnFamily(
edit->column_family_name_, edit->column_family_, dummy_versions, options);
AppendVersion(new_cfd, new Version(this, current_version_number_++));
AppendVersion(new_cfd, new Version(new_cfd, this, current_version_number_++));
new_cfd->CreateNewMemtable();
return new_cfd;
}

@ -217,6 +217,7 @@ class Version {
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize();
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
@ -262,7 +263,7 @@ class Version {
// used for debugging and logging purposes only.
uint64_t version_number_;
explicit Version(VersionSet* vset, uint64_t version_number = 0);
Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
~Version();
@ -362,29 +363,6 @@ class VersionSet {
int NumberLevels() const { return num_levels_; }
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
Compaction* PickCompaction();
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
//
// The returned Compaction might not include the whole requested range.
// In that case, compaction_end will be set to the next key that needs
// compacting. In case the compaction will compact the whole range,
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
Compaction* CompactRange(int input_level,
int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);
@ -409,13 +387,6 @@ class VersionSet {
// pick the same files to compact.
bool VerifyCompactionFileConsistency(Compaction* c);
double MaxBytesForLevel(int level);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level);
void ReleaseCompactionFiles(Compaction* c, Status status);
Status GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData* metadata, ColumnFamilyData** cfd);
@ -471,10 +442,6 @@ class VersionSet {
// Opened lazily
unique_ptr<log::Writer> descriptor_log_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
// generates a increasing version number for every new version
uint64_t current_version_number_;

@ -118,6 +118,12 @@ struct ColumnFamilyOptions {
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// Any internal progress/error information generated by the db will
// be written to info_log if it is non-nullptr, or to a file stored
// in the same directory as the DB contents if info_log is nullptr.
// Default: nullptr
shared_ptr<Logger> info_log;
// -------------------
// Parameters that affect performance
@ -316,6 +322,11 @@ struct ColumnFamilyOptions {
// stop building a single file in a level->level+1 compaction.
int max_grandparent_overlap_factor;
// If non-null, then we should collect metrics about database operations
// Statistics objects should not be shared between DB instances as
// it does not use any locks to prevent concurrent updates.
shared_ptr<Statistics> statistics;
// Disable compaction triggered by seek.
// With bloomfilter and fast storage, a miss on one level
// is very cheap if the file handle is cached in table cache
@ -478,12 +489,6 @@ struct DBOptions {
// Default: Env::Default()
Env* env;
// Any internal progress/error information generated by the db will
// be written to info_log if it is non-nullptr, or to a file stored
// in the same directory as the DB contents if info_log is nullptr.
// Default: nullptr
shared_ptr<Logger> info_log;
// Number of open files that can be used by the DB. You may need to
// increase this if your database has a large working set (budget
// one open file per 2MB of working set).
@ -491,11 +496,6 @@ struct DBOptions {
// Default: 1000
int max_open_files;
// If non-null, then we should collect metrics about database operations
// Statistics objects should not be shared between DB instances as
// it does not use any locks to prevent concurrent updates.
shared_ptr<Statistics> statistics;
// If true, then the contents of data files are not synced
// to stable storage. Their contents remain in the OS buffers till the
// OS decides to flush them. This option is good for bulk-loading

@ -32,6 +32,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
info_log(nullptr),
write_buffer_size(4<<20),
max_write_buffer_number(2),
min_write_buffer_number_to_merge(1),
@ -56,6 +57,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
expanded_compaction_factor(25),
source_compaction_factor(1),
max_grandparent_overlap_factor(10),
statistics(nullptr),
disable_seek_compaction(false),
soft_rate_limit(0.0),
hard_rate_limit(0.0),
@ -84,6 +86,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
merge_operator(options.merge_operator),
compaction_filter(options.compaction_filter),
compaction_filter_factory(options.compaction_filter_factory),
info_log(options.info_log),
write_buffer_size(options.write_buffer_size),
max_write_buffer_number(options.max_write_buffer_number),
min_write_buffer_number_to_merge(
@ -113,6 +116,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
statistics(options.statistics),
disable_seek_compaction(options.disable_seek_compaction),
soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit),
@ -145,7 +149,6 @@ DBOptions::DBOptions()
error_if_exists(false),
paranoid_checks(false),
env(Env::Default()),
info_log(nullptr),
max_open_files(1000),
disableDataSync(false),
use_fsync(false),
@ -178,9 +181,7 @@ DBOptions::DBOptions(const Options& options)
error_if_exists(options.error_if_exists),
paranoid_checks(options.paranoid_checks),
env(options.env),
info_log(options.info_log),
max_open_files(options.max_open_files),
statistics(options.statistics),
disableDataSync(options.disableDataSync),
use_fsync(options.use_fsync),
db_stats_log_interval(options.db_stats_log_interval),

Loading…
Cancel
Save