[column families] Get rid of VersionSet::current_ and keep current Version for each column family

Summary:
The biggest change here is getting rid of current_ Version and adding a column_family_data->current Version to each column family.

I have also fixed some smaller things in VersionSet that made it easier to implement Column family support.

Test Plan: make check

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15105
main
Igor Canadi 11 years ago
parent 19e3ee64ac
commit d076cef347
  1. 21
      db/db_impl.cc
  2. 490
      db/version_set.cc
  3. 93
      db/version_set.h
  4. 3
      db/version_set_reduce_num_levels.cc

@ -906,7 +906,7 @@ Status DBImpl::Recover(
} }
auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second); auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second);
assert(cf_data_iter != versions_->column_family_data_.end()); assert(cf_data_iter != versions_->column_family_data_.end());
cf_data_iter->second.options = cf.options; cf_data_iter->second->options = cf.options;
} }
SequenceNumber max_sequence(0); SequenceNumber max_sequence(0);
@ -2888,15 +2888,19 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
Status s = versions_->LogAndApply(&edit, &mutex_); Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) { if (s.ok()) {
// add to internal data structures // add to internal data structures
versions_->column_families_[column_family_name] = handle->id; versions_->CreateColumnFamily(options, &edit);
versions_->column_family_data_.insert(
{handle->id,
VersionSet::ColumnFamilyData(column_family_name, options)});
} }
return s; return s;
} }
Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
// TODO this is not good. implement some sort of refcounting
// column family data and only delete when refcount goes to 0
// We don't want to delete column family if there is a compaction going on,
// or if there are some outstanding iterators
if (column_family.id == 0) {
return Status::InvalidArgument("Can't drop default column family");
}
VersionEdit edit(0); VersionEdit edit(0);
edit.DropColumnFamily(); edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id); edit.SetColumnFamily(column_family.id);
@ -2908,10 +2912,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
Status s = versions_->LogAndApply(&edit, &mutex_); Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) { if (s.ok()) {
// remove from internal data structures // remove from internal data structures
auto cf_iter = versions_->column_families_.find(data_iter->second.name); versions_->DropColumnFamily(&edit);
assert(cf_iter != versions_->column_families_.end());
versions_->column_families_.erase(cf_iter);
versions_->column_family_data_.erase(data_iter);
} }
return s; return s;
} }
@ -3931,7 +3932,7 @@ Status DB::OpenWithColumnFamilies(
} }
impl->mutex_.Unlock(); impl->mutex_.Unlock();
if (options.compaction_style == kCompactionStyleUniversal) { if (s.ok() && options.compaction_style == kCompactionStyleUniversal) {
int num_files; int num_files;
for (int i = 1; i < impl->NumberLevels(); i++) { for (int i = 1; i < impl->NumberLevels(); i++) {
num_files = impl->versions_->NumLevelFiles(i); num_files = impl->versions_->NumLevelFiles(i);

File diff suppressed because it is too large Load Diff

@ -149,6 +149,7 @@ class Version {
friend class Compaction; friend class Compaction;
friend class VersionSet; friend class VersionSet;
friend class DBImpl; friend class DBImpl;
friend struct ColumnFamilyData;
class LevelFileNumIterator; class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, Iterator* NewConcatenatingIterator(const ReadOptions&,
@ -197,9 +198,6 @@ class Version {
double max_compaction_score_; // max score in l1 to ln-1 double max_compaction_score_; // max score in l1 to ln-1
int max_compaction_score_level_; // level on which max score occurs int max_compaction_score_level_; // level on which max score occurs
// The offset in the manifest file where this version is stored.
uint64_t offset_manifest_file_;
// A version number that uniquely represents this version. This is // A version number that uniquely represents this version. This is
// used for debugging and logging purposes only. // used for debugging and logging purposes only.
uint64_t version_number_; uint64_t version_number_;
@ -219,6 +217,20 @@ class Version {
void operator=(const Version&); void operator=(const Version&);
}; };
// column family metadata
struct ColumnFamilyData {
std::string name;
Version dummy_versions; // Head of circular doubly-linked list of versions.
Version* current; // == dummy_versions.prev_
ColumnFamilyOptions options;
ColumnFamilyData(const std::string& name,
VersionSet* vset,
const ColumnFamilyOptions& options)
: name(name), dummy_versions(vset), current(nullptr), options(options) {}
~ColumnFamilyData() {}
};
class VersionSet { class VersionSet {
public: public:
VersionSet(const std::string& dbname, VersionSet(const std::string& dbname,
@ -233,9 +245,18 @@ class VersionSet {
// current version. Will release *mu while actually writing to the file. // current version. Will release *mu while actually writing to the file.
// REQUIRES: *mu is held on entry. // REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply() // REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu, Status LogAndApply(ColumnFamilyData* column_family_data,
VersionEdit* edit,
port::Mutex* mu,
bool new_descriptor_log = false); bool new_descriptor_log = false);
Status LogAndApply(VersionEdit* edit,
port::Mutex* mu,
bool new_descriptor_log = false) {
return LogAndApply(
column_family_data_.find(0)->second, edit, mu, new_descriptor_log);
}
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.
Status Recover(); Status Recover();
@ -248,7 +269,10 @@ class VersionSet {
Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu); Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu);
// Return the current version. // Return the current version.
Version* current() const { return current_; } Version* current() const {
// TODO this only works for default column family now
return column_family_data_.find(0)->second->current;
}
// Return the current manifest file number // Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; } uint64_t ManifestFileNumber() const { return manifest_file_number_; }
@ -327,11 +351,13 @@ class VersionSet {
// ending up with nothing to do. We can improve it later. // ending up with nothing to do. We can improve it later.
// TODO: improve this function to be accurate for universal // TODO: improve this function to be accurate for universal
// compactions. // compactions.
// TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current;
int num_levels_to_check = int num_levels_to_check =
(options_->compaction_style != kCompactionStyleUniversal) ? (options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1; NumberLevels() - 1 : 1;
for (int i = 0; i < num_levels_to_check; i++) { for (int i = 0; i < num_levels_to_check; i++) {
if (current_->compaction_score_[i] >= 1) { if (version->compaction_score_[i] >= 1) {
return true; return true;
} }
} }
@ -339,18 +365,23 @@ class VersionSet {
} }
// Returns true iff some level needs a compaction. // Returns true iff some level needs a compaction.
bool NeedsCompaction() const { bool NeedsCompaction() const {
return ((current_->file_to_compact_ != nullptr) || // TODO this only works for default column family now
NeedsSizeCompaction()); Version* version = column_family_data_.find(0)->second->current;
return ((version->file_to_compact_ != nullptr) || NeedsSizeCompaction());
} }
// Returns the maxmimum compaction score for levels 1 to max // Returns the maxmimum compaction score for levels 1 to max
double MaxCompactionScore() const { double MaxCompactionScore() const {
return current_->max_compaction_score_; // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current;
return version->max_compaction_score_;
} }
// See field declaration // See field declaration
int MaxCompactionScoreLevel() const { int MaxCompactionScoreLevel() const {
return current_->max_compaction_score_level_; // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current;
return version->max_compaction_score_level_;
} }
// Add all files listed in any live version to *live. // Add all files listed in any live version to *live.
@ -383,10 +414,12 @@ class VersionSet {
// Return a human-readable short (single-line) summary of files // Return a human-readable short (single-line) summary of files
// in a specified level. Uses *scratch as backing store. // in a specified level. Uses *scratch as backing store.
const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; const char* LevelFileSummary(Version* version,
FileSummaryStorage* scratch,
int level) const;
// Return the size of the current manifest file // Return the size of the current manifest file
const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } const uint64_t ManifestFileSize() { return manifest_file_size_; }
// For the specfied level, pick a compaction. // For the specfied level, pick a compaction.
// Returns nullptr if there is no compaction to be done. // Returns nullptr if there is no compaction to be done.
@ -436,17 +469,13 @@ class VersionSet {
void GetObsoleteFiles(std::vector<FileMetaData*>* files); void GetObsoleteFiles(std::vector<FileMetaData*>* files);
// column family metadata ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options,
struct ColumnFamilyData { VersionEdit* edit);
std::string name;
ColumnFamilyOptions options; void DropColumnFamily(VersionEdit* edit);
explicit ColumnFamilyData(const std::string& name) : name(name) {}
ColumnFamilyData(const std::string& name,
const ColumnFamilyOptions& options)
: name(name), options(options) {}
};
std::unordered_map<std::string, uint32_t> column_families_; std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData> column_family_data_; std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_; uint32_t max_column_family_;
private: private:
@ -476,7 +505,7 @@ class VersionSet {
// Save current contents to *log // Save current contents to *log
Status WriteSnapshot(log::Writer* log); Status WriteSnapshot(log::Writer* log);
void AppendVersion(Version* v); void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
bool ManifestContains(const std::string& record) const; bool ManifestContains(const std::string& record) const;
@ -499,8 +528,6 @@ class VersionSet {
// Opened lazily // Opened lazily
unique_ptr<log::Writer> descriptor_log_; unique_ptr<log::Writer> descriptor_log_;
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_
// Per-level key at which the next compaction at that level should start. // Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey. // Either an empty string, or a valid InternalKey.
@ -521,9 +548,8 @@ class VersionSet {
// Queue of writers to the manifest file // Queue of writers to the manifest file
std::deque<ManifestWriter*> manifest_writers_; std::deque<ManifestWriter*> manifest_writers_;
// Store the manifest file size when it is checked. // size of manifest file
// Save us the cost of checking file size twice in LogAndApply uint64_t manifest_file_size_;
uint64_t last_observed_manifest_size_;
std::vector<FileMetaData*> obsolete_files_; std::vector<FileMetaData*> obsolete_files_;
@ -616,9 +642,14 @@ class Compaction {
friend class Version; friend class Version;
friend class VersionSet; friend class VersionSet;
explicit Compaction(int level, int out_level, uint64_t target_file_size, Compaction(int level,
uint64_t max_grandparent_overlap_bytes, int number_levels, int out_level,
bool seek_compaction = false, bool enable_compression = true); uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes,
int number_levels,
Version* input_version,
bool seek_compaction = false,
bool enable_compression = true);
int level_; int level_;
int out_level_; // levels to which output files are stored int out_level_; // levels to which output files are stored

@ -24,7 +24,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
"Number of levels needs to be bigger than 1"); "Number of levels needs to be bigger than 1");
} }
Version* current_version = current_; // TODO this only works for default column family now
Version* current_version = column_family_data_.find(0)->second->current;
int current_levels = NumberLevels(); int current_levels = NumberLevels();
if (current_levels <= new_levels) { if (current_levels <= new_levels) {

Loading…
Cancel
Save