VersionSet cleanup

Summary:
Removed icmp_ from VersionSet (since it's per-column-family, not per-DB-instance)
Unfriended VersionSet and ColumnFamilyData (yay!)
Removed VersionSet::NumberLevels()
Cleaned up DBImpl

Test Plan: make check

Reviewers: dhruba, haobo, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15819
main
Igor Canadi 11 years ago
parent 27a8856c23
commit 29bacb2eb6
  1. 2
      db/column_family.cc
  2. 1
      db/column_family.h
  3. 13
      db/compaction.cc
  4. 59
      db/db_impl.cc
  5. 3
      db/db_test.cc
  6. 75
      db/version_set.cc
  7. 15
      db/version_set.h
  8. 8
      util/ldb_cmd.cc

@ -118,7 +118,7 @@ void ColumnFamilyData::CreateNewMemtable() {
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(current_->vset_->icmp_, options_);
mem_ = new MemTable(icmp_, options_);
mem_->Ref();
}

@ -89,6 +89,7 @@ class ColumnFamilyData {
CompactionPicker* compaction_picker() const {
return compaction_picker_.get();
}
const Comparator* user_comparator() const { return icmp_.user_comparator(); }
const InternalKeyComparator& internal_comparator() const { return icmp_; }
SuperVersion* GetSuperVersion() const { return super_version_; }

@ -79,12 +79,11 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
}
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
@ -105,7 +104,7 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
const InternalKeyComparator* icmp = &cfd_->internal_comparator();
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
@ -143,8 +142,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) {
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occuring, then we are guaranteed that
// all files will be picked in a single compaction
@ -157,8 +155,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) {
return;
}
bottommost_level_ = true;
int num_levels = input_version_->vset_->NumberLevels();
for (int i = output_level() + 1; i < num_levels; i++) {
for (int i = output_level() + 1; i < number_levels_; i++) {
if (input_version_->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;

@ -294,8 +294,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
table_cache_.reset(new TableCache(dbname_, &options_,
storage_options_, table_cache_size));
versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
table_cache_.get(), &internal_comparator_));
versions_.reset(
new VersionSet(dbname_, &options_, storage_options_, table_cache_.get()));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -1127,8 +1127,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
(unsigned long)m->GetLogNumber());
list.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0],
list.size());
Iterator* iter =
NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size());
Log(options_.info_log,
"Level-0 flush table #%lu: started",
(unsigned long)meta.number);
@ -1290,7 +1290,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
for (int level = 0; level <= max_level_with_files; level++) {
// in case the compaction is unversal or if we're compacting the
// bottom-most level, the output level will be the same as input one
if (options_.compaction_style == kCompactionStyleUniversal ||
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
level == max_level_with_files) {
s = RunManualCompaction(cfd, level, level, begin, end);
} else {
@ -1400,15 +1400,27 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
}
int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) {
return options_.num_levels;
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->NumberLevels();
}
int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) {
return options_.max_mem_compaction_level;
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->options()->max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
return options_.level0_stop_writes_trigger;
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
assert(cfd != nullptr);
return cfd->options()->level0_stop_writes_trigger;
}
uint64_t DBImpl::CurrentVersionNumber() const {
@ -1630,7 +1642,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
// For universal compaction, we enforce every manual compaction to compact
// all files.
if (begin == nullptr ||
options_.compaction_style == kCompactionStyleUniversal) {
cfd->options()->compaction_style == kCompactionStyleUniversal) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
@ -2742,8 +2754,9 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
immutable_mems->AddIterators(options, &iterator_list);
// Collect iterators for files in L0 - Ln
version->AddIterators(options, storage_options_, &iterator_list);
Iterator* internal_iter = NewMergingIterator(
&internal_comparator_, &iterator_list[0], iterator_list.size());
Iterator* internal_iter =
NewMergingIterator(&default_cfd_->internal_comparator(),
&iterator_list[0], iterator_list.size());
cleanup->version = version;
cleanup->mu = &mutex_;
cleanup->db = this;
@ -2799,8 +2812,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
immutable_cleanup->db = this;
immutable_cleanup->mu = &mutex_;
immutable_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
immutable_iter = NewMergingIterator(&default_cfd_->internal_comparator(),
&list[0], list.size());
immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup,
nullptr);
@ -3500,7 +3513,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
// (compression, etc) but err on the side of caution.
lfile->SetPreallocationBlockSize(1.1 *
cfd->options()->write_buffer_size);
memtmp = new MemTable(internal_comparator_, *cfd->options());
memtmp = new MemTable(cfd->internal_comparator(), *cfd->options());
new_superversion = new SuperVersion();
}
}
@ -3610,7 +3623,6 @@ Status DBImpl::DeleteFile(std::string name) {
int level;
FileMetaData metadata;
int maxlevel = NumberLevels();
ColumnFamilyData* cfd;
VersionEdit edit;
DeletionState deletion_state(0, true);
@ -3622,7 +3634,7 @@ Status DBImpl::DeleteFile(std::string name) {
name.c_str());
return Status::InvalidArgument("File not found");
}
assert((level > 0) && (level < maxlevel));
assert((level > 0) && (level < cfd->NumberLevels()));
// If the file is being compacted no need to delete.
if (metadata.being_compacted) {
@ -3634,7 +3646,7 @@ Status DBImpl::DeleteFile(std::string name) {
// Only the files in the last level can be deleted externally.
// This is to make sure that any deletion tombstones are not
// lost. Check that the level passed is the last level.
for (int i = level + 1; i < maxlevel; i++) {
for (int i = level + 1; i < cfd->NumberLevels(); i++) {
if (cfd->current()->NumLevelFiles(i) != 0) {
Log(options_.info_log,
"DeleteFile %s FAILED. File not in last level\n", name.c_str());
@ -3820,9 +3832,11 @@ Status DB::OpenWithColumnFamilies(
}
}
if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) {
Version* current = impl->default_cfd_->current();
for (int i = 1; i < impl->NumberLevels(); i++) {
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->options()->compaction_style == kCompactionStyleUniversal) {
Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i);
if (num_files > 0) {
s = Status::InvalidArgument("Not all files are at level 0. Cannot "
@ -3831,6 +3845,11 @@ Status DB::OpenWithColumnFamilies(
}
}
}
if (!s.ok()) {
break;
}
}
}
impl->mutex_.Unlock();

@ -5039,10 +5039,9 @@ void BM_LogAndApply(int iters, int num_base_files) {
port::Mutex mu;
MutexLock l(&mu);
InternalKeyComparator cmp(BytewiseComparator());
Options options;
EnvOptions sopt;
VersionSet vset(dbname, &options, sopt, nullptr, &cmp);
VersionSet vset(dbname, &options, sopt, nullptr);
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
ASSERT_OK(vset.Recover(dummy));

@ -417,7 +417,8 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
next_(this),
prev_(this),
refs_(0),
num_levels_(vset->num_levels_),
// cfd is nullptr if Version is dummy
num_levels_(cfd == nullptr ? 0 : cfd->NumberLevels()),
files_(new std::vector<FileMetaData*>[num_levels_]),
files_by_size_(num_levels_),
next_file_to_compact_by_size_(num_levels_),
@ -1372,19 +1373,16 @@ class VersionSet::Builder {
VersionSet::VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options,
TableCache* table_cache,
const InternalKeyComparator* cmp)
TableCache* table_cache)
: column_family_set_(new ColumnFamilySet()),
env_(options->env),
dbname_(dbname),
options_(options),
table_cache_(table_cache),
icmp_(*cmp),
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
prev_log_number_(0),
num_levels_(options_->num_levels),
current_version_number_(0),
manifest_file_size_(0),
storage_options_(storage_options),
@ -1698,17 +1696,17 @@ Status VersionSet::Recover(
if (!s.ok()) {
break;
}
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
icmp_.user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
// we will delete the column family from
// column_families_not_found.
bool cf_in_not_found =
column_families_not_found.find(edit.column_family_) !=
column_families_not_found.end();
// in builders means that user supplied that column family
// option AND that we encountered column family add record
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
@ -1729,6 +1727,13 @@ Status VersionSet::Recover(
CreateColumnFamily(cf_options->second, &edit);
builders.insert(
{edit.column_family_, new Builder(new_cfd, new_cfd->current())});
if (edit.has_comparator_ &&
edit.comparator_ != new_cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
new_cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
@ -1764,6 +1769,13 @@ Status VersionSet::Recover(
cfd->SetLogNumber(edit.log_number_);
have_log_number = true;
}
if (edit.has_comparator_ &&
edit.comparator_ != cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
break;
}
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
@ -1924,9 +1936,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
"Number of levels needs to be bigger than 1");
}
const InternalKeyComparator cmp(options->comparator);
TableCache tc(dbname, options, storage_options, 10);
VersionSet versions(dbname, options, storage_options, &tc, &cmp);
VersionSet versions(dbname, options, storage_options, &tc);
Status status;
std::vector<ColumnFamilyDescriptor> dummy;
@ -2011,8 +2022,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t prev_log_number = 0;
int count = 0;
// TODO works only for default column family currently
VersionSet::Builder builder(column_family_set_->GetDefault(),
column_family_set_->GetDefault()->current());
ColumnFamilyData* default_cfd = column_family_set_->GetDefault();
VersionSet::Builder builder(default_cfd, default_cfd->current());
{
VersionSet::LogReporter reporter;
@ -2025,11 +2036,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(icmp_.user_comparator()->Name(),
"does not match existing comparator " +
edit.comparator_);
if (edit.column_family_ == 0 && edit.has_comparator_ &&
edit.comparator_ != default_cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
default_cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
}
}
@ -2127,6 +2138,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// no need to explicitly write it
edit.AddColumnFamily(cfd->GetName());
edit.SetColumnFamily(cfd->GetID());
}
edit.SetComparatorName(
cfd->internal_comparator().user_comparator()->Name());
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);
@ -2134,14 +2148,13 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return s;
}
}
}
{
// Save files
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
for (int level = 0; level < NumberLevels(); level++) {
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& f : cfd->current()->files_[level]) {
edit.AddFile(level,
f->number,
@ -2162,13 +2175,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
}
// Save metadata
VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name());
std::string record;
edit.EncodeTo(&record);
return log->AddRecord(record);
return Status::OK();
}
// Opens the mainfest file and reads all records
@ -2205,10 +2212,12 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
for (int level = 0; level < v->NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
if (v->cfd_->internal_comparator().Compare(files[i]->largest, ikey) <=
0) {
// Entire file is before "ikey", so just add the file size
result += files[i]->file_size;
} else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
} else if (v->cfd_->internal_comparator().Compare(files[i]->smallest,
ikey) > 0) {
// Entire file is after "ikey", so ignore
if (level > 0) {
// Files other than level 0 are sorted by meta->smallest, so
@ -2368,7 +2377,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) {
for (int level = 0; level < NumberLevels(); level++) {
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file : cfd->current()->files_[level]) {
LiveFileMetaData filemetadata;
filemetadata.name = TableFileName("", file->number);

@ -201,7 +201,7 @@ class Version {
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
friend struct ColumnFamilyData;
friend class ColumnFamilyData;
friend class CompactionPicker;
friend class LevelCompactionPicker;
friend class UniversalCompactionPicker;
@ -232,7 +232,7 @@ class Version {
// but files in each level are now sorted based on file
// size. The file with the largest size is at the front.
// This vector stores the index of the file from files_.
std::vector< std::vector<int> > files_by_size_;
std::vector<std::vector<int>> files_by_size_;
// An index into files_by_size_ that specifies the first
// file that is not yet compacted
@ -281,8 +281,7 @@ class Version {
class VersionSet {
public:
VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, TableCache* table_cache,
const InternalKeyComparator*);
const EnvOptions& storage_options, TableCache* table_cache);
~VersionSet();
// Apply *edit to the current version to form a new descriptor that
@ -361,8 +360,6 @@ class VersionSet {
return min_log_num;
}
int NumberLevels() const { return num_levels_; }
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
Iterator* MakeInputIterator(Compaction* c);
@ -406,10 +403,7 @@ class VersionSet {
class Builder;
struct ManifestWriter;
friend class Compaction;
friend class Version;
// TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_)
friend class ColumnFamilyData;
struct LogReporter : public log::Reader::Reporter {
Status* status;
@ -431,14 +425,11 @@ class VersionSet {
const std::string dbname_;
const Options* const options_;
TableCache* const table_cache_;
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
std::atomic<uint64_t> last_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
int num_levels_;
// Opened lazily
unique_ptr<log::Writer> descriptor_log_;

@ -536,10 +536,8 @@ void ManifestDumpCommand::DoCommand() {
std::string file(manifestfile);
std::string dbname("dummy");
TableCache* tc = new TableCache(dbname, &options, sopt, 10);
const InternalKeyComparator* cmp =
new InternalKeyComparator(options.comparator);
VersionSet* versions = new VersionSet(dbname, &options, sopt, tc, cmp);
VersionSet* versions = new VersionSet(dbname, &options, sopt, tc);
Status s = versions->DumpManifest(options, file, verbose_, is_key_hex_);
if (!s.ok()) {
printf("Error in processing file %s %s\n", manifestfile.c_str(),
@ -1015,7 +1013,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
EnvOptions soptions;
TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
VersionSet versions(db_path_, &opt, soptions, &tc);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(default_column_family_name,
ColumnFamilyOptions(opt));
@ -1029,7 +1027,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
}
int max = -1;
auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
for (int i = 0; i < versions.NumberLevels(); i++) {
for (int i = 0; i < default_cfd->NumberLevels(); i++) {
if (default_cfd->current()->NumLevelFiles(i)) {
max = i;
}

Loading…
Cancel
Save