ColumnFamilySet

Summary:
I created a separate class ColumnFamilySet to keep track of column families. Before we did this in VersionSet and I believe this approach is cleaner.

Let me know if you have any comments. I will commit tomorrow.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15357
main
Igor Canadi 11 years ago
parent f9a25dda9f
commit 7c5e583a27
  1. 86
      db/column_family.cc
  2. 87
      db/column_family.h
  3. 2
      db/column_family_test.cc
  4. 64
      db/db_impl.cc
  5. 4
      db/db_test.cc
  6. 276
      db/version_set.cc
  7. 70
      db/version_set.h
  8. 2
      db/version_set_reduce_num_levels.cc
  9. 2
      include/rocksdb/db.h
  10. 9
      util/ldb_cmd.cc

@ -0,0 +1,86 @@
#include "db/column_family.h"
#include "db/version_set.h"
namespace rocksdb {
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions,
const ColumnFamilyOptions& options)
: id(id),
name(name),
dummy_versions(dummy_versions),
current(nullptr),
options(options) {}
ColumnFamilyData::~ColumnFamilyData() {
// List must be empty
assert(dummy_versions->next_ == dummy_versions);
delete dummy_versions;
}
ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {}
ColumnFamilySet::~ColumnFamilySet() {
for (auto& cfd : column_family_data_) {
delete cfd.second;
}
for (auto& cfd : droppped_column_families_) {
delete cfd;
}
}
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
auto ret = GetColumnFamily(0);
assert(ret != nullptr); // default column family should always exist
return ret;
}
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
auto cfd_iter = column_family_data_.find(id);
if (cfd_iter != column_family_data_.end()) {
return cfd_iter->second;
} else {
return nullptr;
}
}
bool ColumnFamilySet::Exists(uint32_t id) {
return column_family_data_.find(id) != column_family_data_.end();
}
bool ColumnFamilySet::Exists(const std::string& name) {
return column_families_.find(name) != column_families_.end();
}
uint32_t ColumnFamilySet::GetID(const std::string& name) {
auto cfd_iter = column_families_.find(name);
assert(cfd_iter != column_families_.end());
return cfd_iter->second;
}
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
return ++max_column_family_;
}
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
column_families_.insert({name, id});
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, options);
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
return new_cfd;
}
void ColumnFamilySet::DropColumnFamily(uint32_t id) {
auto cfd = column_family_data_.find(id);
assert(cfd != column_family_data_.end());
column_families_.erase(cfd->second->name);
cfd->second->current->Unref();
droppped_column_families_.push_back(cfd->second);
column_family_data_.erase(cfd);
}
} // namespace rocksdb

@ -0,0 +1,87 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/options.h"
#include <map>
#include <string>
#include <vector>
namespace rocksdb {
class Version;
class VersionSet;
// column family metadata
struct ColumnFamilyData {
uint32_t id;
std::string name;
Version* dummy_versions; // Head of circular doubly-linked list of versions.
Version* current; // == dummy_versions->prev_
ColumnFamilyOptions options;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, const ColumnFamilyOptions& options);
~ColumnFamilyData();
};
class ColumnFamilySet {
public:
class iterator {
public:
explicit iterator(
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr)
: itr_(itr) {}
iterator& operator++() {
++itr_;
return *this;
}
bool operator!=(const iterator& other) { return this->itr_ != other.itr_; }
ColumnFamilyData* operator*() { return itr_->second; }
private:
std::unordered_map<uint32_t, ColumnFamilyData*>::iterator itr_;
};
ColumnFamilySet();
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
// GetColumnFamily() calls return nullptr if column family is not found
ColumnFamilyData* GetColumnFamily(uint32_t id) const;
bool Exists(uint32_t id);
bool Exists(const std::string& name);
uint32_t GetID(const std::string& name);
// this call will return the next available column family ID. it guarantees
// that there is no column family with id greater than or equal to the
// returned value in the current running instance. It does not, however,
// guarantee that the returned ID is unique accross RocksDB restarts.
// For example, if a client adds a column family 6 and then drops it,
// after a restart, we might reuse column family 6 ID.
uint32_t GetNextColumnFamilyID();
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
void DropColumnFamily(uint32_t id);
iterator begin() { return iterator(column_family_data_.begin()); }
iterator end() { return iterator(column_family_data_.end()); }
private:
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
// we need to keep them alive because we still can't control the lifetime of
// all of column family data members (options for example)
std::vector<ColumnFamilyData*> droppped_column_families_;
uint32_t max_column_family_;
};
} // namespace rocksdb

@ -69,7 +69,7 @@ TEST(ColumnFamilyTest, AddDrop) {
Close(); Close();
vector<string> families; vector<string> families;
DB::ListColumnFamilies(db_options_, dbname_, &families); ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
sort(families.begin(), families.end()); sort(families.begin(), families.end());
ASSERT_TRUE(families == vector<string>({"default", "four", "one", "three"})); ASSERT_TRUE(families == vector<string>({"default", "four", "one", "three"}));
} }

@ -913,21 +913,8 @@ Status DBImpl::Recover(
} }
} }
Status s = versions_->Recover(); Status s = versions_->Recover(column_families);
if (s.ok()) { if (s.ok()) {
if (column_families.size() != versions_->column_families_.size()) {
return Status::InvalidArgument("Column family specifications mismatch");
}
for (auto cf : column_families) {
auto cf_iter = versions_->column_families_.find(cf.name);
if (cf_iter == versions_->column_families_.end()) {
return Status::InvalidArgument("Column family specifications mismatch");
}
auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second);
assert(cf_data_iter != versions_->column_family_data_.end());
cf_data_iter->second->options = cf.options;
}
SequenceNumber max_sequence(0); SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the // Recover from all newer log files than the ones named in the
@ -2933,11 +2920,13 @@ std::vector<Status> DBImpl::MultiGet(
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name, const std::string& column_family_name,
ColumnFamilyHandle* handle) { ColumnFamilyHandle* handle) {
if (!versions_->GetColumnFamilySet()->Exists(column_family_name)) {
return Status::InvalidArgument("Column family already exists");
}
VersionEdit edit; VersionEdit edit;
edit.AddColumnFamily(column_family_name); edit.AddColumnFamily(column_family_name);
MutexLock l(&mutex_); MutexLock l(&mutex_);
++versions_->max_column_family_; handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
handle->id = versions_->max_column_family_;
edit.SetColumnFamily(handle->id); edit.SetColumnFamily(handle->id);
Status s = versions_->LogAndApply(&edit, &mutex_); Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) { if (s.ok()) {
@ -2948,21 +2937,16 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
} }
Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
// TODO this is not good. implement some sort of refcounting
// column family data and only delete when refcount goes to 0
// We don't want to delete column family if there is a compaction going on,
// or if there are some outstanding iterators
if (column_family.id == 0) { if (column_family.id == 0) {
return Status::InvalidArgument("Can't drop default column family"); return Status::InvalidArgument("Can't drop default column family");
} }
VersionEdit edit;
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
MutexLock l(&mutex_); MutexLock l(&mutex_);
auto data_iter = versions_->column_family_data_.find(column_family.id); if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) {
if (data_iter == versions_->column_family_data_.end()) {
return Status::NotFound("Column family not found"); return Status::NotFound("Column family not found");
} }
VersionEdit edit;
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
Status s = versions_->LogAndApply(&edit, &mutex_); Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) { if (s.ok()) {
// remove from internal data structures // remove from internal data structures
@ -3968,10 +3952,16 @@ Status DB::OpenWithColumnFamilies(
// set column family handles // set column family handles
handles->clear(); handles->clear();
for (auto cf : column_families) { for (auto cf : column_families) {
auto cf_iter = impl->versions_->column_families_.find(cf.name); if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) {
assert(cf_iter != impl->versions_->column_families_.end()); s = Status::InvalidArgument("Column family not found: ", cf.name);
handles->push_back(ColumnFamilyHandle(cf_iter->second)); handles->clear();
break;
}
uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name);
handles->push_back(ColumnFamilyHandle(id));
} }
}
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
impl->mem_->SetLogNumber(impl->logfile_number_); impl->mem_->SetLogNumber(impl->logfile_number_);
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
@ -4006,23 +3996,7 @@ Status DB::OpenWithColumnFamilies(
Status DB::ListColumnFamilies(const DBOptions& db_options, Status DB::ListColumnFamilies(const DBOptions& db_options,
const std::string& name, const std::string& name,
std::vector<std::string>* column_families) { std::vector<std::string>* column_families) {
Options options(db_options, ColumnFamilyOptions()); return VersionSet::ListColumnFamilies(column_families, name, db_options.env);
InternalKeyComparator* icmp = new InternalKeyComparator(options.comparator);
TableCache* table_cache = new TableCache(name, &options, EnvOptions(options),
db_options.max_open_files - 10);
VersionSet* version_set =
new VersionSet(name, &options, EnvOptions(options), table_cache, icmp);
version_set->Recover();
column_families->clear();
for (auto cf : version_set->column_families_) {
column_families->push_back(cf.first);
}
delete version_set;
delete table_cache;
delete icmp;
return Status::NotSupported("Working on it");
} }
Snapshot::~Snapshot() { Snapshot::~Snapshot() {

@ -5035,7 +5035,9 @@ void BM_LogAndApply(int iters, int num_base_files) {
Options options; Options options;
EnvOptions sopt; EnvOptions sopt;
VersionSet vset(dbname, &options, sopt, nullptr, &cmp); VersionSet vset(dbname, &options, sopt, nullptr, &cmp);
ASSERT_OK(vset.Recover()); std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
ASSERT_OK(vset.Recover(dummy));
VersionEdit vbase; VersionEdit vbase;
uint64_t fnum = 1; uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) { for (int i = 0; i < num_base_files; i++) {

@ -10,6 +10,7 @@
#include "db/version_set.h" #include "db/version_set.h"
#include <algorithm> #include <algorithm>
#include <map>
#include <climits> #include <climits>
#include <stdio.h> #include <stdio.h>
#include "db/filename.h" #include "db/filename.h"
@ -751,9 +752,6 @@ void Version::Ref() {
} }
void Version::Unref() { void Version::Unref() {
for (auto cfd : vset_->column_family_data_) {
assert(this != &cfd.second->dummy_versions);
}
assert(refs_ >= 1); assert(refs_ >= 1);
--refs_; --refs_;
if (refs_ == 0) { if (refs_ == 0) {
@ -1344,7 +1342,8 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, const EnvOptions& storage_options,
TableCache* table_cache, TableCache* table_cache,
const InternalKeyComparator* cmp) const InternalKeyComparator* cmp)
: env_(options->env), : column_family_set_(new ColumnFamilySet()),
env_(options->env),
dbname_(dbname), dbname_(dbname),
options_(options), options_(options),
table_cache_(table_cache), table_cache_(table_cache),
@ -1368,11 +1367,8 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
} }
VersionSet::~VersionSet() { VersionSet::~VersionSet() {
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
cfd.second->current->Unref(); cfd->current->Unref();
// List must be empty
assert(cfd.second->dummy_versions.next_ == &cfd.second->dummy_versions);
cfd.second->Unref();
} }
for (auto file : obsolete_files_) { for (auto file : obsolete_files_) {
delete file; delete file;
@ -1396,8 +1392,8 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
v->Ref(); v->Ref();
// Append to linked list // Append to linked list
v->prev_ = column_family_data->dummy_versions.prev_; v->prev_ = column_family_data->dummy_versions->prev_;
v->next_ = &column_family_data->dummy_versions; v->next_ = column_family_data->dummy_versions;
v->prev_->next_ = v; v->prev_->next_ = v;
v->next_->prev_ = v; v->next_->prev_ = v;
} }
@ -1592,13 +1588,16 @@ void VersionSet::LogAndApplyHelper(Builder* builder, Version* v,
builder->Apply(edit); builder->Apply(edit);
} }
Status VersionSet::Recover() { Status VersionSet::Recover(
struct LogReporter : public log::Reader::Reporter { const std::vector<ColumnFamilyDescriptor>& column_families) {
Status* status; std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
virtual void Corruption(size_t bytes, const Status& s) { for (auto cf : column_families) {
if (this->status->ok()) *this->status = s; cf_name_to_options.insert({cf.name, cf.options});
} }
}; // keeps track of column families in manifest that were not found in
// column families parameters. if those column families are not dropped
// by subsequent manifest records, Recover() will return failure status
std::set<int> column_families_not_found;
// Read "CURRENT" file, which contains a pointer to the current manifest file // Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current; std::string current;
@ -1640,12 +1639,17 @@ Status VersionSet::Recover() {
VersionEdit default_cf_edit; VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(default_column_family_name); default_cf_edit.AddColumnFamily(default_column_family_name);
default_cf_edit.SetColumnFamily(0); default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd = auto default_cf_iter = cf_name_to_options.find(default_column_family_name);
CreateColumnFamily(ColumnFamilyOptions(*options_), &default_cf_edit); if (default_cf_iter == cf_name_to_options.end()) {
builders.insert({0, new Builder(this, default_cfd->current)}); column_families_not_found.insert(0);
} else {
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
builders.insert({0, new Builder(this, default_cfd->current)});
}
{ {
LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file), &reporter, true/*checksum*/, log::Reader reader(std::move(file), &reporter, true/*checksum*/,
0/*initial_offset*/); 0/*initial_offset*/);
@ -1665,27 +1669,61 @@ Status VersionSet::Recover() {
break; break;
} }
bool cf_in_not_found =
column_families_not_found.find(edit.column_family_) !=
column_families_not_found.end();
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
// they can't both be true
assert(!(cf_in_not_found && cf_in_builders));
if (edit.is_column_family_add_) { if (edit.is_column_family_add_) {
ColumnFamilyData* new_cfd = if (cf_in_builders || cf_in_not_found) {
CreateColumnFamily(ColumnFamilyOptions(), &edit); s = Status::Corruption(
builders.insert( "Manifest adding the same column family twice");
{edit.column_family_, new Builder(this, new_cfd->current)}); break;
}
auto cf_options = cf_name_to_options.find(edit.column_family_name_);
if (cf_options == cf_name_to_options.end()) {
column_families_not_found.insert(edit.column_family_);
} else {
ColumnFamilyData* new_cfd =
CreateColumnFamily(cf_options->second, &edit);
builders.insert(
{edit.column_family_, new Builder(this, new_cfd->current)});
}
} else if (edit.is_column_family_drop_) { } else if (edit.is_column_family_drop_) {
auto builder = builders.find(edit.column_family_); if (cf_in_builders) {
assert(builder != builders.end()); auto builder = builders.find(edit.column_family_);
delete builder->second; assert(builder != builders.end());
builders.erase(builder); delete builder->second;
DropColumnFamily(&edit); builders.erase(builder);
} else { DropColumnFamily(&edit);
auto cfd = column_family_data_.find(edit.column_family_); } else if (cf_in_not_found) {
assert(cfd != column_family_data_.end()); column_families_not_found.erase(edit.column_family_);
if (edit.max_level_ >= cfd->second->current->NumberLevels()) { } else {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
} else if (!cf_in_not_found) {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest record referencing unknown column family");
break;
}
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
if (edit.max_level_ >= cfd->current->NumberLevels()) {
s = Status::InvalidArgument( s = Status::InvalidArgument(
"db has more levels than options.num_levels"); "db has more levels than options.num_levels");
break; break;
} }
// if it isn't column family add or column family drop, // if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded // then it's a file add/delete, which should be forwarded
// to builder // to builder
auto builder = builders.find(edit.column_family_); auto builder = builders.find(edit.column_family_);
@ -1733,16 +1771,24 @@ Status VersionSet::Recover() {
MarkFileNumberUsed(log_number); MarkFileNumberUsed(log_number);
} }
// there were some column families in the MANIFEST that weren't specified
// in the argument
if (column_families_not_found.size() > 0) {
s = Status::InvalidArgument(
"Found unexpected column families. You have to specify all column "
"families when opening the DB");
}
if (s.ok()) { if (s.ok()) {
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
Version* v = new Version(this, current_version_number_++); Version* v = new Version(this, current_version_number_++);
builders[cfd.first]->SaveTo(v); builders[cfd->id]->SaveTo(v);
// Install recovered version // Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1); std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
compaction_picker_->SizeBeingCompacted(size_being_compacted); compaction_picker_->SizeBeingCompacted(size_being_compacted);
v->Finalize(size_being_compacted); v->Finalize(size_being_compacted);
AppendVersion(cfd.second, v); AppendVersion(cfd, v);
} }
manifest_file_size_ = manifest_file_size; manifest_file_size_ = manifest_file_size;
@ -1771,15 +1817,65 @@ Status VersionSet::Recover() {
return s; return s;
} }
Status VersionSet::DumpManifest(Options& options, std::string& dscname, Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
bool verbose, bool hex) { const std::string& dbname, Env* env) {
struct LogReporter : public log::Reader::Reporter {
Status* status; // these are just for performance reasons, not correcntes,
virtual void Corruption(size_t bytes, const Status& s) { // so we're fine using the defaults
if (this->status->ok()) *this->status = s; EnvOptions soptions;
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string current;
Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
if (!s.ok()) {
return s;
}
if (current.empty() || current[current.size()-1] != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);
std::string dscname = dbname + "/" + current;
unique_ptr<SequentialFile> file;
s = env->NewSequentialFile(dscname, &file, soptions);
if (!s.ok()) {
return s;
}
std::map<uint32_t, std::string> column_family_names;
// default column family is always implicitly there
column_family_names.insert({0, default_column_family_name});
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(std::move(file), &reporter, true /*checksum*/,
0 /*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
if (edit.is_column_family_add_) {
column_family_names.insert(
{edit.column_family_, edit.column_family_name_});
} else if (edit.is_column_family_drop_) {
column_family_names.erase(edit.column_family_);
}
}
column_families->clear();
if (s.ok()) {
for (const auto& iter : column_family_names) {
column_families->push_back(iter.second);
} }
}; }
return s;
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
bool verbose, bool hex) {
// Open the specified manifest file. // Open the specified manifest file.
unique_ptr<SequentialFile> file; unique_ptr<SequentialFile> file;
Status s = options.env->NewSequentialFile(dscname, &file, storage_options_); Status s = options.env->NewSequentialFile(dscname, &file, storage_options_);
@ -1797,11 +1893,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t prev_log_number = 0; uint64_t prev_log_number = 0;
int count = 0; int count = 0;
// TODO works only for default column family currently // TODO works only for default column family currently
VersionSet::Builder builder(this, VersionSet::Builder builder(this, column_family_set_->GetDefault()->current);
column_family_data_.find(0)->second->current);
{ {
LogReporter reporter; VersionSet::LogReporter reporter;
reporter.status = &s; reporter.status = &s;
log::Reader reader(std::move(file), &reporter, true/*checksum*/, log::Reader reader(std::move(file), &reporter, true/*checksum*/,
0/*initial_offset*/); 0/*initial_offset*/);
@ -1905,15 +2000,15 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
Status VersionSet::WriteSnapshot(log::Writer* log) { Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery? // TODO: Break up into multiple records to reduce memory usage on recovery?
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
{ {
// Store column family info // Store column family info
VersionEdit edit; VersionEdit edit;
if (cfd.first != 0) { if (cfd->id != 0) {
// default column family is always there, // default column family is always there,
// no need to explicitly write it // no need to explicitly write it
edit.AddColumnFamily(cfd.second->name); edit.AddColumnFamily(cfd->name);
edit.SetColumnFamily(cfd.first); edit.SetColumnFamily(cfd->id);
std::string record; std::string record;
edit.EncodeTo(&record); edit.EncodeTo(&record);
Status s = log->AddRecord(record); Status s = log->AddRecord(record);
@ -1926,13 +2021,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{ {
// Save files // Save files
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamily(cfd.first); edit.SetColumnFamily(cfd->id);
for (int level = 0; level < NumberLevels(); level++) { for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = for (const auto& f : cfd->current->files_[level]) {
cfd.second->current->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, edit.AddFile(level,
f->number, f->number,
f->file_size, f->file_size,
@ -2025,9 +2117,9 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) { void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
// pre-calculate space requirement // pre-calculate space requirement
int64_t total_files = 0; int64_t total_files = 0;
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
for (Version* v = cfd.second->dummy_versions.next_; for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions;
v != &cfd.second->dummy_versions; v = v->next_) { v = v->next_) {
for (int level = 0; level < v->NumberLevels(); level++) { for (int level = 0; level < v->NumberLevels(); level++) {
total_files += v->files_[level].size(); total_files += v->files_[level].size();
} }
@ -2037,9 +2129,9 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
// just one time extension to the right size // just one time extension to the right size
live_list->reserve(live_list->size() + total_files); live_list->reserve(live_list->size() + total_files);
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
for (Version* v = cfd.second->dummy_versions.next_; for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions;
v != &cfd.second->dummy_versions; v = v->next_) { v = v->next_) {
for (int level = 0; level < v->NumberLevels(); level++) { for (int level = 0; level < v->NumberLevels(); level++) {
for (const auto& f : v->files_[level]) { for (const auto& f : v->files_[level]) {
live_list->push_back(f->number); live_list->push_back(f->number);
@ -2051,7 +2143,7 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
Compaction* VersionSet::PickCompaction() { Compaction* VersionSet::PickCompaction() {
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
return compaction_picker_->PickCompaction(version); return compaction_picker_->PickCompaction(version);
} }
@ -2060,7 +2152,7 @@ Compaction* VersionSet::CompactRange(int input_level, int output_level,
const InternalKey* end, const InternalKey* end,
InternalKey** compaction_end) { InternalKey** compaction_end) {
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
return compaction_picker_->CompactRange(version, input_level, output_level, return compaction_picker_->CompactRange(version, input_level, output_level,
begin, end, compaction_end); begin, end, compaction_end);
} }
@ -2112,7 +2204,7 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) {
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG #ifndef NDEBUG
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
if (c->input_version() != version) { if (c->input_version() != version) {
Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
} }
@ -2163,13 +2255,12 @@ void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData* meta) { FileMetaData* meta) {
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
Version* version = cfd.second->current; Version* version = cfd->current;
for (int level = 0; level < version->NumberLevels(); level++) { for (int level = 0; level < version->NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = version->files_[level]; for (const auto& file : version->files_[level]) {
for (size_t i = 0; i < files.size(); i++) { if (file->number == number) {
if (files[i]->number == number) { *meta = *file;
*meta = *files[i];
*filelevel = level; *filelevel = level;
return Status::OK(); return Status::OK();
} }
@ -2180,19 +2271,17 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
} }
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : column_family_data_) { for (auto cfd : *column_family_set_) {
for (int level = 0; level < NumberLevels(); level++) { for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = for (const auto& file : cfd->current->files_[level]) {
cfd.second->current->files_[level];
for (size_t i = 0; i < files.size(); i++) {
LiveFileMetaData filemetadata; LiveFileMetaData filemetadata;
filemetadata.name = TableFileName("", files[i]->number); filemetadata.name = TableFileName("", file->number);
filemetadata.level = level; filemetadata.level = level;
filemetadata.size = files[i]->file_size; filemetadata.size = file->file_size;
filemetadata.smallestkey = files[i]->smallest.user_key().ToString(); filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = files[i]->largest.user_key().ToString(); filemetadata.largestkey = file->largest.user_key().ToString();
filemetadata.smallest_seqno = files[i]->smallest_seqno; filemetadata.smallest_seqno = file->smallest_seqno;
filemetadata.largest_seqno = files[i]->largest_seqno; filemetadata.largest_seqno = file->largest_seqno;
metadata->push_back(filemetadata); metadata->push_back(filemetadata);
} }
} }
@ -2206,29 +2295,18 @@ void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
ColumnFamilyData* VersionSet::CreateColumnFamily( ColumnFamilyData* VersionSet::CreateColumnFamily(
const ColumnFamilyOptions& options, VersionEdit* edit) { const ColumnFamilyOptions& options, VersionEdit* edit) {
assert(column_families_.find(edit->column_family_name_) ==
column_families_.end());
assert(edit->is_column_family_add_); assert(edit->is_column_family_add_);
column_families_.insert({edit->column_family_name_, edit->column_family_}); Version* dummy_versions = new Version(this);
ColumnFamilyData* new_cfd = auto new_cfd = column_family_set_->CreateColumnFamily(
new ColumnFamilyData(edit->column_family_name_, this, options); edit->column_family_name_, edit->column_family_, dummy_versions, options);
column_family_data_.insert({edit->column_family_, new_cfd});
max_column_family_ = std::max(max_column_family_, edit->column_family_);
AppendVersion(new_cfd, new Version(this, current_version_number_++)); AppendVersion(new_cfd, new Version(this, current_version_number_++));
return new_cfd; return new_cfd;
} }
void VersionSet::DropColumnFamily(VersionEdit* edit) { void VersionSet::DropColumnFamily(VersionEdit* edit) {
auto cfd = column_family_data_.find(edit->column_family_); column_family_set_->DropColumnFamily(edit->column_family_);
assert(cfd != column_family_data_.end());
column_families_.erase(cfd->second->name);
cfd->second->current->Unref();
// List must be empty
assert(cfd->second->dummy_versions.next_ == &cfd->second->dummy_versions);
// might delete itself
cfd->second->Unref();
column_family_data_.erase(cfd);
} }
} // namespace rocksdb } // namespace rocksdb

@ -29,6 +29,8 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/compaction.h" #include "db/compaction.h"
#include "db/compaction_picker.h" #include "db/compaction_picker.h"
#include "db/column_family.h"
#include "db/log_reader.h"
namespace rocksdb { namespace rocksdb {
@ -42,6 +44,8 @@ class TableCache;
class Version; class Version;
class VersionSet; class VersionSet;
class MergeContext; class MergeContext;
struct ColumnFamilyData;
class ColumnFamilySet;
// Return the smallest index i such that files[i]->largest >= key. // Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file. // Return files.size() if there is no such file.
@ -263,38 +267,6 @@ class Version {
void operator=(const Version&); void operator=(const Version&);
}; };
// column family metadata
struct ColumnFamilyData {
std::string name;
Version dummy_versions; // Head of circular doubly-linked list of versions.
Version* current; // == dummy_versions.prev_
ColumnFamilyOptions options;
int refs;
void Ref() {
++refs;
}
void Unref() {
assert(refs > 0);
if (refs == 1) {
delete this;
} else {
--refs;
}
}
ColumnFamilyData(const std::string& name,
VersionSet* vset,
const ColumnFamilyOptions& options)
: name(name),
dummy_versions(vset),
current(nullptr),
options(options),
refs(1) {}
~ColumnFamilyData() {}
};
class VersionSet { class VersionSet {
public: public:
VersionSet(const std::string& dbname, const Options* options, VersionSet(const std::string& dbname, const Options* options,
@ -315,12 +287,17 @@ class VersionSet {
Status LogAndApply(VersionEdit* edit, Status LogAndApply(VersionEdit* edit,
port::Mutex* mu, port::Mutex* mu,
bool new_descriptor_log = false) { bool new_descriptor_log = false) {
return LogAndApply( return LogAndApply(column_family_set_->GetDefault(), edit, mu,
column_family_data_.find(0)->second, edit, mu, new_descriptor_log); new_descriptor_log);
} }
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.
Status Recover(); Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families);
// Reads a manifest file and returns a list of column families in
// column_families.
static Status ListColumnFamilies(std::vector<std::string>* column_families,
const std::string& dbname, Env* env);
// Try to reduce the number of levels. This call is valid when // Try to reduce the number of levels. This call is valid when
// only one level from the new max level to the old // only one level from the new max level to the old
@ -333,7 +310,7 @@ class VersionSet {
// Return the current version. // Return the current version.
Version* current() const { Version* current() const {
// TODO this only works for default column family now // TODO this only works for default column family now
return column_family_data_.find(0)->second->current; return column_family_set_->GetDefault()->current;
} }
// A Flag indicating whether write needs to slowdown because of there are // A Flag indicating whether write needs to slowdown because of there are
@ -418,7 +395,7 @@ class VersionSet {
// TODO: improve this function to be accurate for universal // TODO: improve this function to be accurate for universal
// compactions. // compactions.
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
int num_levels_to_check = int num_levels_to_check =
(options_->compaction_style != kCompactionStyleUniversal) ? (options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1; NumberLevels() - 1 : 1;
@ -432,21 +409,21 @@ class VersionSet {
// Returns true iff some level needs a compaction. // Returns true iff some level needs a compaction.
bool NeedsCompaction() const { bool NeedsCompaction() const {
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
return ((version->file_to_compact_ != nullptr) || NeedsSizeCompaction()); return ((version->file_to_compact_ != nullptr) || NeedsSizeCompaction());
} }
// Returns the maxmimum compaction score for levels 1 to max // Returns the maxmimum compaction score for levels 1 to max
double MaxCompactionScore() const { double MaxCompactionScore() const {
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
return version->max_compaction_score_; return version->max_compaction_score_;
} }
// See field declaration // See field declaration
int MaxCompactionScoreLevel() const { int MaxCompactionScoreLevel() const {
// TODO this only works for default column family now // TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current; Version* version = column_family_set_->GetDefault()->current;
return version->max_compaction_score_level_; return version->max_compaction_score_level_;
} }
@ -490,9 +467,7 @@ class VersionSet {
void DropColumnFamily(VersionEdit* edit); void DropColumnFamily(VersionEdit* edit);
std::unordered_map<std::string, uint32_t> column_families_; ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_;
private: private:
class Builder; class Builder;
@ -501,6 +476,13 @@ class VersionSet {
friend class Compaction; friend class Compaction;
friend class Version; friend class Version;
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t bytes, const Status& s) {
if (this->status->ok()) *this->status = s;
}
};
// Save current contents to *log // Save current contents to *log
Status WriteSnapshot(log::Writer* log); Status WriteSnapshot(log::Writer* log);
@ -508,6 +490,8 @@ class VersionSet {
bool ManifestContains(const std::string& record) const; bool ManifestContains(const std::string& record) const;
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
const Options* const options_; const Options* const options_;

@ -25,7 +25,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
} }
// TODO this only works for default column family now // TODO this only works for default column family now
Version* current_version = column_family_data_.find(0)->second->current; Version* current_version = column_family_set_->GetDefault()->current;
int current_levels = current_version->NumberLevels(); int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) { if (current_levels <= new_levels) {

@ -29,6 +29,8 @@ extern const ColumnFamilyHandle default_column_family;
struct ColumnFamilyDescriptor { struct ColumnFamilyDescriptor {
std::string name; std::string name;
ColumnFamilyOptions options; ColumnFamilyOptions options;
ColumnFamilyDescriptor()
: name(default_column_family_name), options(ColumnFamilyOptions()) {}
ColumnFamilyDescriptor(const std::string& name, ColumnFamilyDescriptor(const std::string& name,
const ColumnFamilyOptions& options) const ColumnFamilyOptions& options)
: name(name), options(options) {} : name(name), options(options) {}

@ -11,6 +11,7 @@
#include "db/filename.h" #include "db/filename.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "rocksdb/column_family.h"
#include "util/coding.h" #include "util/coding.h"
#include <ctime> #include <ctime>
@ -1015,10 +1016,12 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
TableCache tc(db_path_, &opt, soptions, 10); TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator); const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
// We rely the VersionSet::Recover to tell us the internal data structures // We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change // in the db. And the Recover() should never do any change
// (like LogAndApply) to the manifest file. // (like LogAndApply) to the manifest file.
Status st = versions.Recover(); Status st = versions.Recover(dummy);
if (!st.ok()) { if (!st.ok()) {
return st; return st;
} }
@ -1072,10 +1075,12 @@ void ReduceDBLevelsCommand::DoCommand() {
TableCache tc(db_path_, &opt, soptions, 10); TableCache tc(db_path_, &opt, soptions, 10);
const InternalKeyComparator cmp(opt.comparator); const InternalKeyComparator cmp(opt.comparator);
VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); VersionSet versions(db_path_, &opt, soptions, &tc, &cmp);
std::vector<ColumnFamilyDescriptor> dummy;
dummy.push_back(ColumnFamilyDescriptor());
// We rely the VersionSet::Recover to tell us the internal data structures // We rely the VersionSet::Recover to tell us the internal data structures
// in the db. And the Recover() should never do any change (like LogAndApply) // in the db. And the Recover() should never do any change (like LogAndApply)
// to the manifest file. // to the manifest file.
st = versions.Recover(); st = versions.Recover(dummy);
if (!st.ok()) { if (!st.ok()) {
exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
return; return;

Loading…
Cancel
Save