[CF] Split SanitizeOptions into two

Summary:
There are now three SanitizeOptions functions: one for DBOptions, one for ColumnFamilyOptions, and one for Options (which just calls the other two).

I have also reshuffled some options -- table_cache options and info_log should live in DBOptions, for example.

Test Plan: make check doesn't complain

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15873
Branch: main
Author: Igor Canadi (11 years ago)
Parent: 5e2c4fe766
Commit: 73f62255c1
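
For orientation, here is a minimal standalone sketch (not part of the diff) of the delegation structure the summary describes. The structs below are simplified stand-ins, not the real rocksdb headers, and the real sanitizers also take the db name, internal key comparator and internal filter policy, which are omitted here:

#include <algorithm>
#include <cstddef>

struct DBOptions { int max_open_files = 1000; };
struct ColumnFamilyOptions { std::size_t write_buffer_size = 4 << 20; };
struct Options : DBOptions, ColumnFamilyOptions {
  Options() = default;
  Options(const DBOptions& db, const ColumnFamilyOptions& cf)
      : DBOptions(db), ColumnFamilyOptions(cf) {}
};

// DB-wide sanitizer: clamps max_open_files, as the new db_impl.cc overload does.
DBOptions SanitizeOptions(const DBOptions& src) {
  DBOptions result = src;
  result.max_open_files = std::min(std::max(result.max_open_files, 20), 1000000);
  return result;
}

// Per-column-family sanitizer: clamps write_buffer_size, as column_family.cc does.
ColumnFamilyOptions SanitizeOptions(const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.write_buffer_size =
      std::min(std::max(result.write_buffer_size, std::size_t{64} << 10),
               std::size_t{64} << 30);
  return result;
}

// The Options overload "just calls the other two" and recombines the result.
Options SanitizeOptions(const Options& src) {
  return Options(SanitizeOptions(DBOptions(src)),
                 SanitizeOptions(ColumnFamilyOptions(src)));
}
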
Changed files:
  1. db/column_family.cc (93)
  2. db/column_family.h (25)
  3. db/compaction_picker.cc (86)
  4. db/compaction_picker.h (13)
  5. db/db_impl.cc (75)
  6. db/db_impl.h (2)
  7. db/version_set.cc (2)
  8. include/rocksdb/options.h (46)
  9. util/auto_roll_logger.cc (2)
  10. util/auto_roll_logger.h (2)
  11. util/auto_roll_logger_test.cc (2)
  12. util/options.cc (18)
  13. utilities/backupable/backupable_db_test.cc (2)

db/column_family.cc

@@ -15,9 +15,77 @@
#include "db/version_set.h"
#include "db/compaction_picker.h"
+#include "db/table_properties_collector.h"
+#include "util/hash_skiplist_rep.h"
namespace rocksdb {
+namespace {
+// Fix user-supplied options to be reasonable
+template <class T, class V>
+static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
+if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
+if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
+}
+} // anonymous namespace
+ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
+const InternalFilterPolicy* ipolicy,
+const ColumnFamilyOptions& src) {
+ColumnFamilyOptions result = src;
+result.comparator = icmp;
+result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
+ClipToRange(&result.write_buffer_size,
+((size_t)64) << 10, ((size_t)64) << 30);
+// if user sets arena_block_size, we trust user to use this value. Otherwise,
+// calculate a proper value from writer_buffer_size;
+if (result.arena_block_size <= 0) {
+result.arena_block_size = result.write_buffer_size / 10;
+}
+result.min_write_buffer_number_to_merge =
+std::min(result.min_write_buffer_number_to_merge,
+result.max_write_buffer_number - 1);
+if (result.block_cache == nullptr && !result.no_block_cache) {
+result.block_cache = NewLRUCache(8 << 20);
+}
+result.compression_per_level = src.compression_per_level;
+if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
+result.block_size_deviation = 0;
+}
+if (result.max_mem_compaction_level >= result.num_levels) {
+result.max_mem_compaction_level = result.num_levels - 1;
+}
+if (result.soft_rate_limit > result.hard_rate_limit) {
+result.soft_rate_limit = result.hard_rate_limit;
+}
+if (result.prefix_extractor) {
+// If a prefix extractor has been supplied and a HashSkipListRepFactory is
+// being used, make sure that the latter uses the former as its transform
+// function.
+auto factory =
+dynamic_cast<HashSkipListRepFactory*>(result.memtable_factory.get());
+if (factory && factory->GetTransform() != result.prefix_extractor) {
+result.memtable_factory = std::make_shared<SkipListFactory>();
+}
+}
+// -- Sanitize the table properties collector
+// All user defined properties collectors will be wrapped by
+// UserKeyTablePropertiesCollector since for them they only have the
+// knowledge of the user keys; internal keys are invisible to them.
+auto& collectors = result.table_properties_collectors;
+for (size_t i = 0; i < result.table_properties_collectors.size(); ++i) {
+assert(collectors[i]);
+collectors[i] =
+std::make_shared<UserKeyTablePropertiesCollector>(collectors[i]);
+}
+// Add collector to collect internal key statistics
+collectors.push_back(std::make_shared<InternalKeyPropertiesCollector>());
+return result;
+}
SuperVersion::SuperVersion() {}
SuperVersion::~SuperVersion() {
@@ -61,13 +129,16 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions,
-const ColumnFamilyOptions& options)
+const ColumnFamilyOptions& options,
+Logger* logger)
: id_(id),
name_(name),
dummy_versions_(dummy_versions),
current_(nullptr),
-options_(options),
-icmp_(options_.comparator),
+internal_comparator_(options.comparator),
+internal_filter_policy_(options.filter_policy),
+options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
+options)),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
@@ -77,9 +148,11 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
log_number_(0),
need_slowdown_for_num_level0_files_(false) {
if (options_.compaction_style == kCompactionStyleUniversal) {
-compaction_picker_.reset(new UniversalCompactionPicker(&options_, &icmp_));
+compaction_picker_.reset(new UniversalCompactionPicker(
+&options_, &internal_comparator_, logger));
} else {
-compaction_picker_.reset(new LevelCompactionPicker(&options_, &icmp_));
+compaction_picker_.reset(
+new LevelCompactionPicker(&options_, &internal_comparator_, logger));
}
}
@@ -119,7 +192,7 @@ void ColumnFamilyData::CreateNewMemtable() {
if (mem_ != nullptr) {
delete mem_->Unref();
}
-mem_ = new MemTable(icmp_, options_);
+mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
}
@@ -148,9 +221,11 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
return nullptr;
}
-ColumnFamilySet::ColumnFamilySet()
+ColumnFamilySet::ColumnFamilySet(Logger* logger)
: max_column_family_(0),
-dummy_cfd_(new ColumnFamilyData(0, "", nullptr, ColumnFamilyOptions())) {
+dummy_cfd_(new ColumnFamilyData(0, "", nullptr, ColumnFamilyOptions(),
+nullptr)),
+logger_(logger) {
// initialize linked list
dummy_cfd_->prev_.store(dummy_cfd_);
dummy_cfd_->next_.store(dummy_cfd_);
@@ -206,7 +281,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
assert(column_families_.find(name) == column_families_.end());
column_families_.insert({name, id});
ColumnFamilyData* new_cfd =
-new ColumnFamilyData(id, name, dummy_versions, options);
+new ColumnFamilyData(id, name, dummy_versions, options, logger_);
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
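
The ClipToRange helper and the write_buffer_size bounds above come straight from this hunk; a self-contained sketch of the clamping behavior (standalone, no RocksDB headers needed):

#include <cstdint>
#include <cstdio>

// Same shape as the helper copied into column_family.cc: clamp *ptr into
// [minvalue, maxvalue].
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}

int main() {
  size_t write_buffer_size = 4 << 10;  // 4KB, below the 64KB floor
  ClipToRange(&write_buffer_size, ((size_t)64) << 10, ((size_t)64) << 30);
  std::printf("%zu\n", write_buffer_size);  // prints 65536
  return 0;
}
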

db/column_family.h

@@ -14,6 +14,7 @@
#include <vector>
#include "rocksdb/options.h"
+#include "rocksdb/env.h"
#include "db/memtablelist.h"
#include "db/write_batch_internal.h"
@@ -55,11 +56,16 @@ struct SuperVersion {
Version* new_current);
};
+extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
+const InternalFilterPolicy* ipolicy,
+const ColumnFamilyOptions& src);
// column family metadata. not thread-safe. should be protected by db_mutex
class ColumnFamilyData {
public:
ColumnFamilyData(uint32_t id, const std::string& name,
-Version* dummy_versions, const ColumnFamilyOptions& options);
+Version* dummy_versions, const ColumnFamilyOptions& options,
+Logger* logger);
~ColumnFamilyData();
uint32_t GetID() const { return id_; }
@@ -89,8 +95,12 @@ class ColumnFamilyData {
CompactionPicker* compaction_picker() const {
return compaction_picker_.get();
}
-const Comparator* user_comparator() const { return icmp_.user_comparator(); }
-const InternalKeyComparator& internal_comparator() const { return icmp_; }
+const Comparator* user_comparator() const {
+return internal_comparator_.user_comparator();
+}
+const InternalKeyComparator& internal_comparator() const {
+return internal_comparator_;
+}
SuperVersion* GetSuperVersion() const { return super_version_; }
uint64_t GetSuperVersionNumber() const {
@@ -117,9 +127,11 @@ class ColumnFamilyData {
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
-ColumnFamilyOptions options_;
-const InternalKeyComparator icmp_;
+const InternalKeyComparator internal_comparator_;
+const InternalFilterPolicy internal_filter_policy_;
+ColumnFamilyOptions options_;
MemTable* mem_;
MemTableList imm_;
@@ -170,7 +182,7 @@ class ColumnFamilySet {
ColumnFamilyData* current_;
};
-ColumnFamilySet();
+explicit ColumnFamilySet(Logger* logger);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@@ -203,6 +215,7 @@ class ColumnFamilySet {
std::vector<ColumnFamilyData*> droppped_column_families_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
+Logger* logger_;
};
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {

db/compaction_picker.cc

@@ -10,7 +10,6 @@
#include "db/compaction_picker.h"
#include <limits>
-#include "util/statistics.h"
namespace rocksdb {
@@ -42,8 +41,10 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
} // anonymous namespace
CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options,
-const InternalKeyComparator* icmp)
+const InternalKeyComparator* icmp,
+Logger* logger)
: compactions_in_progress_(options->num_levels),
+logger_(logger),
options_(options),
num_levels_(options->num_levels),
icmp_(icmp) {
@@ -269,17 +270,13 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
-Log(options_->info_log,
+Log(logger_,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
"\n",
-(unsigned long)level,
-(unsigned long)(c->inputs_[0].size()),
-(unsigned long)(c->inputs_[1].size()),
-(unsigned long)inputs0_size,
-(unsigned long)inputs1_size,
-(unsigned long)(expanded0.size()),
-(unsigned long)(expanded1.size()),
-(unsigned long)expanded0_size,
+(unsigned long)level, (unsigned long)(c->inputs_[0].size()),
+(unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size,
+(unsigned long)inputs1_size, (unsigned long)(expanded0.size()),
+(unsigned long)(expanded1.size()), (unsigned long)expanded0_size,
(unsigned long)inputs1_size);
smallest = new_start;
largest = new_limit;
@@ -344,7 +341,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
c->inputs_[0] = inputs;
if (ExpandWhileOverlapping(c) == false) {
delete c;
-Log(options_->info_log, "Could not compact due to expansion failure.\n");
+Log(logger_, "Could not compact due to expansion failure.\n");
return nullptr;
}
@@ -511,7 +508,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
}
//if (i > Version::number_of_files_to_sort_) {
-// Log(options_->info_log, "XXX Looking at index %d", i);
+// Log(logger_, "XXX Looking at index %d", i);
//}
// Do not pick this file if its parents at level+1 are being compacted.
@@ -547,13 +544,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
-Log(options_->info_log, "Universal: nothing to do\n");
+Log(logger_, "Universal: nothing to do\n");
return nullptr;
}
Version::FileSummaryStorage tmp;
-Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
-version->files_[level].size(),
-version->LevelFileSummary(&tmp, 0));
+Log(logger_, "Universal: candidate files(%zu): %s\n",
+version->files_[level].size(), version->LevelFileSummary(&tmp, 0));
// Check for size amplification first.
Compaction* c = PickCompactionUniversalSizeAmp(version, score);
@@ -599,10 +595,6 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
c->bottommost_level_ = true;
}
-// update statistics
-MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
-c->inputs_[0].size());
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
@@ -658,8 +650,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
candidate_count = 1;
break;
}
-Log(options_->info_log,
-"Universal: file %lu[%d] being compacted, skipping",
+Log(logger_, "Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
f = nullptr;
}
@@ -668,7 +659,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) {
-Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
+Log(logger_, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
}
@@ -701,11 +692,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
i < loop + candidate_count && i < file_by_time.size(); i++) {
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
-Log(options_->info_log,
-"Universal: Skipping file %lu[%d] with size %lu %d\n",
-(unsigned long)f->number,
-i,
-(unsigned long)f->file_size,
+Log(logger_, "Universal: Skipping file %lu[%d] with size %lu %d\n",
+(unsigned long)f->number, i, (unsigned long)f->file_size,
f->being_compacted);
}
}
@@ -740,10 +728,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
-Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
-(unsigned long)f->number,
-i,
-(unsigned long)f->file_size);
+Log(logger_, "Universal: Picking file %lu[%d] with size %lu\n",
+(unsigned long)f->number, i, (unsigned long)f->file_size);
}
return c;
}
@@ -779,9 +765,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
start_index = loop; // Consider this as the first candidate.
break;
}
-Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
-(unsigned long)f->number,
-loop,
+Log(logger_, "Universal: skipping file %lu[%d] compacted %s",
+(unsigned long)f->number, loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
@@ -789,10 +774,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return nullptr; // no candidate files
}
-Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
-(unsigned long)f->number,
-start_index,
-" to reduce size amp.\n");
+Log(logger_, "Universal: First candidate file %lu[%d] %s",
+(unsigned long)f->number, start_index, " to reduce size amp.\n");
// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
@@ -800,10 +783,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
int index = file_by_time[loop];
f = version->files_[level][index];
if (f->being_compacted) {
-Log(options_->info_log,
-"Universal: Possible candidate file %lu[%d] %s.",
-(unsigned long)f->number,
-loop,
+Log(logger_, "Universal: Possible candidate file %lu[%d] %s.",
+(unsigned long)f->number, loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
@@ -820,18 +801,16 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
-Log(options_->info_log,
+Log(logger_,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
-(unsigned long)candidate_size,
-(unsigned long)earliest_file_size);
+(unsigned long)candidate_size, (unsigned long)earliest_file_size);
return nullptr;
} else {
-Log(options_->info_log,
+Log(logger_,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
-(unsigned long)candidate_size,
-(unsigned long)earliest_file_size);
+(unsigned long)candidate_size, (unsigned long)earliest_file_size);
}
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
@@ -845,11 +824,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
int index = file_by_time[loop];
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
-Log(options_->info_log,
-"Universal: size amp picking file %lu[%d] with size %lu",
-(unsigned long)f->number,
-index,
-(unsigned long)f->file_size);
+Log(logger_, "Universal: size amp picking file %lu[%d] with size %lu",
+(unsigned long)f->number, index, (unsigned long)f->file_size);
}
return c;
}

db/compaction_picker.h

@@ -12,6 +12,7 @@
#include "db/compaction.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
+#include "rocksdb/env.h"
#include <vector>
#include <memory>
@@ -25,7 +26,7 @@ class Version;
class CompactionPicker {
public:
CompactionPicker(const ColumnFamilyOptions* options,
-const InternalKeyComparator* icmp);
+const InternalKeyComparator* icmp, Logger* logger);
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
@@ -116,7 +117,9 @@ class CompactionPicker {
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
+Logger* logger_;
const ColumnFamilyOptions* const options_;
private:
int num_levels_;
@@ -126,8 +129,8 @@ class CompactionPicker {
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const ColumnFamilyOptions* options,
-const InternalKeyComparator* icmp)
-: CompactionPicker(options, icmp) {}
+const InternalKeyComparator* icmp, Logger* logger)
+: CompactionPicker(options, icmp, logger) {}
virtual Compaction* PickCompaction(Version* version) override;
private:
@@ -143,8 +146,8 @@ class UniversalCompactionPicker : public CompactionPicker {
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const ColumnFamilyOptions* options,
-const InternalKeyComparator* icmp)
-: CompactionPicker(options, icmp) {}
+const InternalKeyComparator* icmp, Logger* logger)
+: CompactionPicker(options, icmp, logger) {}
virtual Compaction* PickCompaction(Version* version) override;
private:
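
The change in both picker headers is plain constructor injection: info_log is no longer a ColumnFamilyOptions member, so the compaction pickers receive the DB-wide logger explicitly and call Log(logger_, ...) instead of Log(options_->info_log, ...). A standalone sketch of the same pattern with stand-in types (Picker and ConsoleLogger are illustrative names, not RocksDB classes):

#include <cstdio>

// Stand-in for rocksdb::Logger, just to show the shape of the dependency.
class ConsoleLogger {
 public:
  void Logv(const char* msg) { std::fprintf(stderr, "%s\n", msg); }
};

// Hypothetical picker: the logger is handed in once at construction time and
// stored, instead of being looked up through per-column-family options.
class Picker {
 public:
  explicit Picker(ConsoleLogger* logger) : logger_(logger) {}
  void PickCompaction() { logger_->Logv("Universal: nothing to do"); }

 private:
  ConsoleLogger* logger_;  // owned by the caller (the DB), not by the picker
};

int main() {
  ConsoleLogger logger;
  Picker picker(&logger);  // mirrors new LevelCompactionPicker(..., logger)
  picker.PickCompaction();
  return 0;
}
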

db/db_impl.cc

@@ -120,32 +120,28 @@ struct DBImpl::CompactionState {
}
};
+namespace {
// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
+} // anonymous namespace
Options SanitizeOptions(const std::string& dbname,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src) {
-Options result = src;
-result.comparator = icmp;
-result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
-ClipToRange(&result.max_open_files, 20, 1000000);
-ClipToRange(&result.write_buffer_size, ((size_t)64)<<10,
-((size_t)64)<<30);
-ClipToRange(&result.block_size, 1<<10, 4<<20);
-// if user sets arena_block_size, we trust user to use this value. Otherwise,
-// calculate a proper value from writer_buffer_size;
-if (result.arena_block_size <= 0) {
-result.arena_block_size = result.write_buffer_size / 10;
-}
-result.min_write_buffer_number_to_merge = std::min(
-result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1);
+auto db_options = SanitizeOptions(dbname, DBOptions(src));
+auto cf_options = SanitizeOptions(icmp, ipolicy, ColumnFamilyOptions(src));
+return Options(db_options, cf_options);
+}
+DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
+DBOptions result = src;
+ClipToRange(&result.max_open_files, 20, 1000000);
if (result.info_log == nullptr) {
Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
result, &result.info_log);
@@ -154,60 +150,12 @@ Options SanitizeOptions(const std::string& dbname,
result.info_log = nullptr;
}
}
-if (result.block_cache == nullptr && !result.no_block_cache) {
-result.block_cache = NewLRUCache(8 << 20);
-}
-result.compression_per_level = src.compression_per_level;
-if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
-result.block_size_deviation = 0;
-}
-if (result.max_mem_compaction_level >= result.num_levels) {
-result.max_mem_compaction_level = result.num_levels - 1;
-}
-if (result.soft_rate_limit > result.hard_rate_limit) {
-result.soft_rate_limit = result.hard_rate_limit;
-}
-if (result.compaction_filter) {
-Log(result.info_log, "Compaction filter specified, ignore factory");
-}
-if (result.prefix_extractor) {
-// If a prefix extractor has been supplied and a HashSkipListRepFactory is
-// being used, make sure that the latter uses the former as its transform
-// function.
-auto factory = dynamic_cast<HashSkipListRepFactory*>(
-result.memtable_factory.get());
-if (factory &&
-factory->GetTransform() != result.prefix_extractor) {
-Log(result.info_log, "A prefix hash representation factory was supplied "
-"whose prefix extractor does not match options.prefix_extractor. "
-"Falling back to skip list representation factory");
-result.memtable_factory = std::make_shared<SkipListFactory>();
-} else if (factory) {
-Log(result.info_log, "Prefix hash memtable rep is in use.");
-}
-}
if (result.wal_dir.empty()) {
// Use dbname as default
result.wal_dir = dbname;
}
-// -- Sanitize the table properties collector
-// All user defined properties collectors will be wrapped by
-// UserKeyTablePropertiesCollector since for them they only have the
-// knowledge of the user keys; internal keys are invisible to them.
-auto& collectors = result.table_properties_collectors;
-for (size_t i = 0; i < result.table_properties_collectors.size(); ++i) {
-assert(collectors[i]);
-collectors[i] =
-std::make_shared<UserKeyTablePropertiesCollector>(collectors[i]);
-}
-// Add collector to collect internal key statistics
-collectors.push_back(
-std::make_shared<InternalKeyPropertiesCollector>()
-);
return result;
}
@@ -1979,6 +1927,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
for (auto cfd : *versions_->GetColumnFamilySet()) {
c.reset(cfd->PickCompaction());
if (c != nullptr) {
+// update statistics
+MeasureTime(options_.statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
+c->inputs(0)->size());
break;
}
}
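
Since statistics is now a DBOptions field, histograms such as NUM_FILES_IN_SINGLE_COMPACTION are recorded in DBImpl (as in the hunk above) against the one DB-wide Statistics object. A hedged sketch of enabling it from user code (the path and the omitted error handling are placeholders):

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"

void OpenWithStats() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();  // DB-wide stats object

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/statsdb", &db);
  if (s.ok()) {
    // Compaction histograms are now recorded by the DB itself, since the
    // compaction picker no longer sees a statistics object.
    delete db;
  }
}
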

db/db_impl.h

@@ -510,7 +510,7 @@ extern Options SanitizeOptions(const std::string& db,
const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const Options& src);
+extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
// Determine compression type, based on user options, level of the output
// file and whether compression is disabled.

db/version_set.cc

@@ -1374,7 +1374,7 @@ class VersionSet::Builder {
VersionSet::VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options,
TableCache* table_cache)
-: column_family_set_(new ColumnFamilySet()),
+: column_family_set_(new ColumnFamilySet(options->info_log.get())),
env_(options->env),
dbname_(dbname),
options_(options),

include/rocksdb/options.h

@@ -118,12 +118,6 @@ struct ColumnFamilyOptions {
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
-// Any internal progress/error information generated by the db will
-// be written to info_log if it is non-nullptr, or to a file stored
-// in the same directory as the DB contents if info_log is nullptr.
-// Default: nullptr
-shared_ptr<Logger> info_log;
// -------------------
// Parameters that affect performance
@@ -322,11 +316,6 @@ struct ColumnFamilyOptions {
// stop building a single file in a level->level+1 compaction.
int max_grandparent_overlap_factor;
-// If non-null, then we should collect metrics about database operations
-// Statistics objects should not be shared between DB instances as
-// it does not use any locks to prevent concurrent updates.
-shared_ptr<Statistics> statistics;
// Disable compaction triggered by seek.
// With bloomfilter and fast storage, a miss on one level
// is very cheap if the file handle is cached in table cache
@@ -356,18 +345,6 @@ struct ColumnFamilyOptions {
// Default: false
bool no_block_cache;
-// Number of shards used for table cache.
-int table_cache_numshardbits;
-// During data eviction of table's LRU cache, it would be inefficient
-// to strictly follow LRU because this piece of memory will not really
-// be released unless its refcount falls to zero. Instead, make two
-// passes: the first pass will release items with refcount = 1,
-// and if not enough space releases after scanning the number of
-// elements specified by this parameter, we will remove items in LRU
-// order.
-int table_cache_remove_scan_count_limit;
// size of one block in arena memory allocation.
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// writer_buffer_size).
@@ -489,6 +466,12 @@ struct DBOptions {
// Default: Env::Default()
Env* env;
+// Any internal progress/error information generated by the db will
+// be written to info_log if it is non-nullptr, or to a file stored
+// in the same directory as the DB contents if info_log is nullptr.
+// Default: nullptr
+shared_ptr<Logger> info_log;
// Number of open files that can be used by the DB. You may need to
// increase this if your database has a large working set (budget
// one open file per 2MB of working set).
@@ -496,6 +479,11 @@ struct DBOptions {
// Default: 1000
int max_open_files;
+// If non-null, then we should collect metrics about database operations
+// Statistics objects should not be shared between DB instances as
+// it does not use any locks to prevent concurrent updates.
+shared_ptr<Statistics> statistics;
// If true, then the contents of data files are not synced
// to stable storage. Their contents remain in the OS buffers till the
// OS decides to flush them. This option is good for bulk-loading
@@ -577,6 +565,18 @@ struct DBOptions {
// The default value is MAX_INT so that roll-over does not take place.
uint64_t max_manifest_file_size;
+// Number of shards used for table cache.
+int table_cache_numshardbits;
+// During data eviction of table's LRU cache, it would be inefficient
+// to strictly follow LRU because this piece of memory will not really
+// be released unless its refcount falls to zero. Instead, make two
+// passes: the first pass will release items with refcount = 1,
+// and if not enough space releases after scanning the number of
+// elements specified by this parameter, we will remove items in LRU
+// order.
+int table_cache_remove_scan_count_limit;
// The following two fields affect how archived logs will be deleted.
// 1. If both set to 0, logs will be deleted asap and will not get into
// the archive.
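
Taken together, the header changes mean these fields are now set on DBOptions rather than ColumnFamilyOptions; a short sketch (the values are arbitrary examples, only fields visible in this diff are used):

#include <memory>
#include "rocksdb/options.h"

rocksdb::Options BuildOptions(std::shared_ptr<rocksdb::Logger> logger) {
  rocksdb::DBOptions db_options;
  db_options.info_log = logger;                          // moved from ColumnFamilyOptions
  db_options.statistics = nullptr;                       // moved from ColumnFamilyOptions
  db_options.max_open_files = 5000;
  db_options.table_cache_numshardbits = 4;               // moved from ColumnFamilyOptions
  db_options.table_cache_remove_scan_count_limit = 16;   // moved from ColumnFamilyOptions

  rocksdb::ColumnFamilyOptions cf_options;
  cf_options.write_buffer_size = 64 << 20;               // still a per-CF option

  return rocksdb::Options(db_options, cf_options);
}
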

util/auto_roll_logger.cc

@@ -77,7 +77,7 @@ Status CreateLoggerFromOptions(
const std::string& dbname,
const std::string& db_log_dir,
Env* env,
-const Options& options,
+const DBOptions& options,
std::shared_ptr<Logger>* logger) {
std::string db_absolute_path;
env->GetAbsolutePath(dbname, &db_absolute_path);

util/auto_roll_logger.h

@@ -84,7 +84,7 @@ Status CreateLoggerFromOptions(
const std::string& dbname,
const std::string& db_log_dir,
Env* env,
-const Options& options,
+const DBOptions& options,
std::shared_ptr<Logger>* logger);
} // namespace rocksdb
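
CreateLoggerFromOptions is an internal util/ helper rather than public API; with this change a caller passes a DBOptions instead of a full Options. A hedged sketch of a call matching the signature above (the dbname and empty db_log_dir are placeholders, and the returned Status is ignored here):

#include <memory>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/auto_roll_logger.h"  // internal header, shown for illustration only

std::shared_ptr<rocksdb::Logger> MakeInfoLog(rocksdb::Env* env) {
  std::shared_ptr<rocksdb::Logger> logger;
  rocksdb::CreateLoggerFromOptions("/tmp/example_db", /*db_log_dir=*/"", env,
                                   rocksdb::DBOptions(), &logger);
  return logger;
}
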

util/auto_roll_logger_test.cc

@@ -191,7 +191,7 @@ TEST(AutoRollLoggerTest, CompositeRollByTimeAndSizeLogger) {
}
TEST(AutoRollLoggerTest, CreateLoggerFromOptions) {
-Options options;
+DBOptions options;
shared_ptr<Logger> logger;
// Normal logger

util/options.cc

@@ -32,7 +32,6 @@ ColumnFamilyOptions::ColumnFamilyOptions()
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
-info_log(nullptr),
write_buffer_size(4<<20),
max_write_buffer_number(2),
min_write_buffer_number_to_merge(1),
@@ -57,14 +56,11 @@ ColumnFamilyOptions::ColumnFamilyOptions()
expanded_compaction_factor(25),
source_compaction_factor(1),
max_grandparent_overlap_factor(10),
-statistics(nullptr),
disable_seek_compaction(false),
soft_rate_limit(0.0),
hard_rate_limit(0.0),
rate_limit_delay_max_milliseconds(1000),
no_block_cache(false),
-table_cache_numshardbits(4),
-table_cache_remove_scan_count_limit(16),
arena_block_size(0),
disable_auto_compactions(false),
purge_redundant_kvs_while_flush(true),
@@ -86,7 +82,6 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
merge_operator(options.merge_operator),
compaction_filter(options.compaction_filter),
compaction_filter_factory(options.compaction_filter_factory),
-info_log(options.info_log),
write_buffer_size(options.write_buffer_size),
max_write_buffer_number(options.max_write_buffer_number),
min_write_buffer_number_to_merge(
@@ -116,16 +111,12 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
-statistics(options.statistics),
disable_seek_compaction(options.disable_seek_compaction),
soft_rate_limit(options.soft_rate_limit),
hard_rate_limit(options.hard_rate_limit),
rate_limit_delay_max_milliseconds(
options.rate_limit_delay_max_milliseconds),
no_block_cache(options.no_block_cache),
-table_cache_numshardbits(options.table_cache_numshardbits),
-table_cache_remove_scan_count_limit(
-options.table_cache_remove_scan_count_limit),
arena_block_size(options.arena_block_size),
disable_auto_compactions(options.disable_auto_compactions),
purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
@@ -149,7 +140,9 @@ DBOptions::DBOptions()
error_if_exists(false),
paranoid_checks(false),
env(Env::Default()),
+info_log(nullptr),
max_open_files(1000),
+statistics(nullptr),
disableDataSync(false),
use_fsync(false),
db_stats_log_interval(1800),
@@ -162,6 +155,8 @@ DBOptions::DBOptions()
log_file_time_to_roll(0),
keep_log_file_num(1000),
max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
+table_cache_numshardbits(4),
+table_cache_remove_scan_count_limit(16),
WAL_ttl_seconds(0),
WAL_size_limit_MB(0),
manifest_preallocation_size(4 * 1024 * 1024),
@@ -181,7 +176,9 @@ DBOptions::DBOptions(const Options& options)
error_if_exists(options.error_if_exists),
paranoid_checks(options.paranoid_checks),
env(options.env),
+info_log(options.info_log),
max_open_files(options.max_open_files),
+statistics(options.statistics),
disableDataSync(options.disableDataSync),
use_fsync(options.use_fsync),
db_stats_log_interval(options.db_stats_log_interval),
@@ -195,6 +192,9 @@ DBOptions::DBOptions(const Options& options)
log_file_time_to_roll(options.log_file_time_to_roll),
keep_log_file_num(options.keep_log_file_num),
max_manifest_file_size(options.max_manifest_file_size),
+table_cache_numshardbits(options.table_cache_numshardbits),
+table_cache_remove_scan_count_limit(
+options.table_cache_remove_scan_count_limit),
WAL_ttl_seconds(options.WAL_ttl_seconds),
WAL_size_limit_MB(options.WAL_size_limit_MB),
manifest_preallocation_size(options.manifest_preallocation_size),
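
A quick way to see the effect of the constructor changes above: a default-constructed DBOptions now carries the defaults that used to be initialized on ColumnFamilyOptions. Sketch only, with the values taken from the initializer lists in this diff:

#include <cassert>
#include "rocksdb/options.h"

void CheckDefaults() {
  rocksdb::DBOptions db_options;
  // Defaults taken from the DBOptions() initializer list in this diff.
  assert(db_options.max_open_files == 1000);
  assert(db_options.table_cache_numshardbits == 4);
  assert(db_options.table_cache_remove_scan_count_limit == 16);
  assert(db_options.info_log == nullptr);
  assert(db_options.statistics == nullptr);
}
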

utilities/backupable/backupable_db_test.cc

@@ -344,7 +344,7 @@ class BackupableDBTest {
options_.wal_dir = dbname_;
// set up backup db options
CreateLoggerFromOptions(dbname_, backupdir_, env_,
-Options(), &logger_);
+DBOptions(), &logger_);
backupable_options_.reset(new BackupableDBOptions(
backupdir_, test_backup_env_.get(), true, logger_.get(), true));
