[column families] Implement DB::OpenWithColumnFamilies()

Summary:
In addition to implementing OpenWithColumnFamilies, this diff also includes some minor changes:
* Changed all column family names from Slice() to std::string. The performance of column family name handling is not critical, and it's more convenient and cleaner to have names as std::strings
* Implemented ColumnFamilyOptions(const Options&) and DBOptions(const Options&)
* Added ColumnFamilyOptions to VersionSet::ColumnFamilyData. ColumnFamilyOptions are specified on OpenWithColumnFamilies() and CreateColumnFamily()

I will keep the diff in the Phabricator for a day or two and will push to the branch then. Feel free to comment even after the diff has been pushed.

Test Plan: Added a simple unit test

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15033
main
Igor Canadi 11 years ago
parent d3a2ba9c64
commit 72918efffe
  1. 34
      db/column_family_test.cc
  2. 95
      db/db_impl.cc
  3. 8
      db/db_impl.h
  4. 7
      db/db_impl_readonly.cc
  5. 10
      db/version_set.cc
  6. 4
      db/version_set.h
  7. 2
      include/rocksdb/column_family.h
  8. 8
      include/rocksdb/db.h
  9. 6
      include/rocksdb/options.h
  10. 95
      util/options.cc

@ -33,8 +33,15 @@ class ColumnFamilyTest {
db_ = nullptr;
}
void Open() {
ASSERT_OK(DB::Open(options_, dbname_, &db_));
Status Open(vector<string> cf) {
vector<ColumnFamilyDescriptor> column_families;
for (auto x : cf) {
column_families.push_back(
ColumnFamilyDescriptor(x, ColumnFamilyOptions()));
}
vector <ColumnFamilyHandle> handles;
return DB::OpenWithColumnFamilies(db_options_, dbname_, column_families,
&handles, &db_);
}
Options options_;
@ -45,25 +52,26 @@ class ColumnFamilyTest {
};
TEST(ColumnFamilyTest, AddDrop) {
Open();
ASSERT_OK(Open({"default"}));
ColumnFamilyHandle handles[4];
ASSERT_OK(db_->CreateColumnFamily(column_family_options_, Slice("one"),
&handles[0]));
ASSERT_OK(db_->CreateColumnFamily(column_family_options_, Slice("two"),
&handles[1]));
ASSERT_OK(db_->CreateColumnFamily(column_family_options_, Slice("three"),
&handles[2]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "one", &handles[0]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "two", &handles[1]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "three", &handles[2]));
ASSERT_OK(db_->DropColumnFamily(handles[1]));
ASSERT_OK(db_->CreateColumnFamily(column_family_options_, Slice("four"),
&handles[3]));
ASSERT_OK(
db_->CreateColumnFamily(column_family_options_, "four", &handles[3]));
Close();
Open(); // this will roll the manifest, column families should stay consistent
ASSERT_TRUE(Open({"default"}).IsInvalidArgument());
ASSERT_OK(Open({"default", "one", "three", "four"}));
Close();
vector<string> families;
DB::ListColumnFamilies(db_options_, dbname_, &families);
sort(families.begin(), families.end());
ASSERT_TRUE(families == vector<string>({"four", "one", "three"}));
ASSERT_TRUE(families == vector<string>({"default", "four", "one", "three"}));
}
} // namespace rocksdb

@ -60,7 +60,7 @@
namespace rocksdb {
const Slice& default_column_family_name("default");
const std::string default_column_family_name("default");
void dumpLeveldbBuildVersion(Logger * log);
@ -843,8 +843,10 @@ void DBImpl::PurgeObsoleteWALFiles() {
// If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode.
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
bool error_if_log_file_exist) {
Status DBImpl::Recover(
VersionEdit* edit,
const std::vector<ColumnFamilyDescriptor>& column_families,
MemTable* external_table, bool error_if_log_file_exist) {
mutex_.AssertHeld();
assert(db_lock_ == nullptr);
@ -894,6 +896,19 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
Status s = versions_->Recover();
if (s.ok()) {
if (column_families.size() != versions_->column_families_.size()) {
return Status::InvalidArgument("Column family specifications mismatch");
}
for (auto cf : column_families) {
auto cf_iter = versions_->column_families_.find(cf.name);
if (cf_iter == versions_->column_families_.end()) {
return Status::InvalidArgument("Column family specifications mismatch");
}
auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second);
assert(cf_data_iter != versions_->column_family_data_.end());
cf_data_iter->second.options = cf.options;
}
SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the
@ -2862,15 +2877,23 @@ std::vector<Status> DBImpl::MultiGet(
}
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
const Slice& column_family,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {
VersionEdit edit(0);
edit.AddColumnFamily(column_family.ToString());
edit.AddColumnFamily(column_family_name);
MutexLock l(&mutex_);
++versions_->max_column_family_;
handle->id = versions_->max_column_family_;
edit.SetColumnFamily(handle->id);
return versions_->LogAndApply(&edit, &mutex_);
Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) {
// add to internal data structures
versions_->column_families_[column_family_name] = handle->id;
versions_->column_family_data_.insert(
{handle->id,
VersionSet::ColumnFamilyData(column_family_name, options)});
}
return s;
}
Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
@ -2878,7 +2901,19 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
edit.DropColumnFamily();
edit.SetColumnFamily(column_family.id);
MutexLock l(&mutex_);
return versions_->LogAndApply(&edit, &mutex_);
auto data_iter = versions_->column_family_data_.find(column_family.id);
if (data_iter == versions_->column_family_data_.end()) {
return Status::NotFound("Column family not found");
}
Status s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) {
// remove from internal data structures
auto cf_iter = versions_->column_families_.find(data_iter->second.name);
assert(cf_iter != versions_->column_families_.end());
versions_->column_families_.erase(cf_iter);
versions_->column_family_data_.erase(data_iter);
}
return s;
}
bool DBImpl::KeyMayExist(const ReadOptions& options,
@ -3803,7 +3838,7 @@ Status DB::Merge(const WriteOptions& opt,
// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& options,
const Slice& column_family,
const std::string& column_family_name,
ColumnFamilyHandle* handle) {
return Status::NotSupported("");
}
@ -3814,8 +3849,33 @@ Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) {
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
std::vector<ColumnFamilyHandle> handles;
return DB::OpenWithColumnFamilies(db_options, dbname, column_families,
&handles, dbptr);
}
Status DB::OpenWithColumnFamilies(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle>* handles, DB** dbptr) {
*dbptr = nullptr;
EnvOptions soptions;
// TODO temporary until we change DBImpl to accept
// DBOptions instead of Options
ColumnFamilyOptions default_column_family_options;
for (auto cfd : column_families) {
if (cfd.name == default_column_family_name) {
default_column_family_options = cfd.options;
break;
}
}
// default options
Options options(db_options, default_column_family_options);
if (options.block_cache != nullptr && options.no_block_cache) {
return Status::InvalidArgument(
@ -3836,7 +3896,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
}
impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels());
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
// Handles create_if_missing, error_if_exists
s = impl->Recover(&edit, column_families);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
@ -3854,6 +3915,13 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
// set column family handles
handles->clear();
for (auto cf : column_families) {
auto cf_iter = impl->versions_->column_families_.find(cf.name);
assert(cf_iter != impl->versions_->column_families_.end());
handles->push_back(ColumnFamilyHandle(cf_iter->second));
}
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
impl->mem_->SetLogNumber(impl->logfile_number_);
impl->DeleteObsoleteFiles();
@ -3878,19 +3946,12 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
if (s.ok()) {
*dbptr = impl;
} else {
handles->clear();
delete impl;
}
return s;
}
Status DB::OpenWithColumnFamilies(
const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle>* handles, DB** dbptr) {
// TODO
return Status::NotSupported("Working on it");
}
Status DB::ListColumnFamilies(const DBOptions& db_options,
const std::string& name,
std::vector<std::string>* column_families) {

@ -62,7 +62,7 @@ class DBImpl : public DB {
const std::vector<Slice>& keys, std::vector<std::string>* values);
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const Slice& column_family,
const std::string& column_family,
ColumnFamilyHandle* handle);
virtual Status DropColumnFamily(const ColumnFamilyHandle& column_family);
@ -293,8 +293,10 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit, MemTable* external_table = nullptr,
bool error_if_log_file_exist = false);
Status Recover(VersionEdit* edit,
const std::vector<ColumnFamilyDescriptor>& column_families,
MemTable* external_table = nullptr,
bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const;

@ -86,7 +86,12 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels());
Status s = impl->Recover(&edit, impl->GetMemTable(),
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
Status s = impl->Recover(&edit, column_families, impl->GetMemTable(),
error_if_log_file_exist);
impl->mutex_.Unlock();
if (s.ok()) {

@ -1443,6 +1443,11 @@ Status VersionSet::Recover() {
uint64_t prev_log_number = 0;
Builder builder(this, current_);
// add default column family
column_families_.insert({default_column_family_name, 0});
column_family_data_.insert(
{0, ColumnFamilyData(default_column_family_name)});
{
LogReporter reporter;
reporter.status = &s;
@ -1847,6 +1852,11 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// Save column families
for (auto cf : column_families_) {
VersionEdit edit(0);
if (cf.second == 0) {
// default column family is always there,
// no need to explicitly write it
continue;
}
edit.AddColumnFamily(cf.first);
edit.SetColumnFamily(cf.second);
std::string record;

@ -439,7 +439,11 @@ class VersionSet {
// column family metadata
struct ColumnFamilyData {
std::string name;
ColumnFamilyOptions options;
explicit ColumnFamilyData(const std::string& name) : name(name) {}
ColumnFamilyData(const std::string& name,
const ColumnFamilyOptions& options)
: name(name), options(options) {}
};
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData> column_family_data_;

@ -26,6 +26,6 @@ struct ColumnFamilyHandle {
};
const ColumnFamilyHandle default_column_family = ColumnFamilyHandle();
extern const Slice& default_column_family_name;
extern const std::string default_column_family_name;
}

@ -13,6 +13,7 @@
#include <stdio.h>
#include <memory>
#include <vector>
#include <string>
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
@ -26,8 +27,11 @@ struct ColumnFamilyHandle;
extern const ColumnFamilyHandle default_column_family;
struct ColumnFamilyDescriptor {
Slice name;
std::string name;
ColumnFamilyOptions options;
ColumnFamilyDescriptor(const std::string& name,
const ColumnFamilyOptions& options)
: name(name), options(options) {}
};
// Update Makefile if you change these
@ -117,7 +121,7 @@ class DB {
// Create a column_family and return the handle of column family
// through the argument handle.
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options,
const Slice& column_family,
const std::string& column_family_name,
ColumnFamilyHandle* handle);
// Drop a column family specified by column_family handle.

@ -68,6 +68,8 @@ struct CompressionOptions {
strategy(strategy){}
};
struct Options;
struct ColumnFamilyOptions {
// -------------------
// Parameters that affect behavior
@ -426,6 +428,8 @@ struct ColumnFamilyOptions {
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
explicit ColumnFamilyOptions(const Options& options);
};
struct DBOptions {
@ -627,6 +631,8 @@ struct DBOptions {
// Create DBOptions with default values for all fields
DBOptions();
// Create DBOptions from Options
explicit DBOptions(const Options& options);
};
// Options to control the behavior of a database (passed to DB::Open)

@ -73,6 +73,65 @@ ColumnFamilyOptions::ColumnFamilyOptions()
assert(memtable_factory.get() != nullptr);
}
ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
: comparator(options.comparator),
merge_operator(options.merge_operator),
compaction_filter(options.compaction_filter),
compaction_filter_factory(options.compaction_filter_factory),
write_buffer_size(options.write_buffer_size),
max_write_buffer_number(options.max_write_buffer_number),
min_write_buffer_number_to_merge(
options.min_write_buffer_number_to_merge),
block_cache(options.block_cache),
block_cache_compressed(options.block_cache_compressed),
block_size(options.block_size),
block_restart_interval(options.block_restart_interval),
compression(options.compression),
compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts),
filter_policy(options.filter_policy),
prefix_extractor(options.prefix_extractor),
whole_key_filtering(options.whole_key_filtering),
num_levels(options.num_levels),
level0_file_num_compaction_trigger(
options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
level0_stop_writes_trigger(options.level0_stop_writes_trigger),
max_mem_compaction_level(options.max_mem_compaction_level),
target_file_size_base(options.target_file_size_base),
target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional),
expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
disable_seek_compaction(options.disable_seek_compaction),
soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit),
rate_limit_delay_max_milliseconds(
options.rate_limit_delay_max_milliseconds),
no_block_cache(options.no_block_cache),
table_cache_numshardbits(options.table_cache_numshardbits),
table_cache_remove_scan_count_limit(
options.table_cache_remove_scan_count_limit),
disable_auto_compactions(options.disable_auto_compactions),
purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
block_size_deviation(options.block_size_deviation),
compaction_style(options.compaction_style),
compaction_options_universal(options.compaction_options_universal),
filter_deletes(options.filter_deletes),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
memtable_factory(options.memtable_factory),
table_factory(options.table_factory),
table_properties_collectors(options.table_properties_collectors),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks) {
assert(memtable_factory.get() != nullptr);
}
DBOptions::DBOptions()
: create_if_missing(false),
error_if_exists(false),
@ -107,6 +166,42 @@ DBOptions::DBOptions()
use_adaptive_mutex(false),
bytes_per_sync(0) { }
DBOptions::DBOptions(const Options& options)
: create_if_missing(options.create_if_missing),
error_if_exists(options.error_if_exists),
paranoid_checks(options.paranoid_checks),
env(options.env),
info_log(options.info_log),
max_open_files(options.max_open_files),
statistics(options.statistics),
disableDataSync(options.disableDataSync),
use_fsync(options.use_fsync),
db_stats_log_interval(options.db_stats_log_interval),
db_log_dir(options.db_log_dir),
wal_dir(options.wal_dir),
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
max_background_compactions(options.max_background_compactions),
max_background_flushes(options.max_background_flushes),
max_log_file_size(options.max_log_file_size),
log_file_time_to_roll(options.log_file_time_to_roll),
keep_log_file_num(options.keep_log_file_num),
max_manifest_file_size(options.max_manifest_file_size),
arena_block_size(options.arena_block_size),
WAL_ttl_seconds(options.WAL_ttl_seconds),
WAL_size_limit_MB(options.WAL_size_limit_MB),
manifest_preallocation_size(options.manifest_preallocation_size),
allow_os_buffer(options.allow_os_buffer),
allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes),
is_fd_close_on_exec(options.is_fd_close_on_exec),
skip_log_error_on_recovery(options.skip_log_error_on_recovery),
stats_dump_period_sec(options.stats_dump_period_sec),
advise_random_on_open(options.advise_random_on_open),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
use_adaptive_mutex(options.use_adaptive_mutex),
bytes_per_sync(options.bytes_per_sync) {}
static const char* const access_hints[] = {
"NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
};

Loading…
Cancel
Save