merge master to resolve merge conflicts

main
Chris Riccomini 10 years ago
commit 378f321da2
  1. 2
      INSTALL.md
  2. 18
      Makefile
  3. 7
      build_tools/build_detect_platform
  4. 14
      build_tools/version.sh
  5. 75
      db/column_family.cc
  6. 16
      db/column_family.h
  7. 7
      db/compaction.cc
  8. 8
      db/compaction.h
  9. 240
      db/compaction_picker.cc
  10. 104
      db/compaction_picker.h
  11. 2
      db/corruption_test.cc
  12. 1
      db/cuckoo_table_db_test.cc
  13. 164
      db/db_impl.cc
  14. 30
      db/db_impl.h
  15. 1
      db/db_iter.cc
  16. 115
      db/db_test.cc
  17. 1
      db/deletefile_test.cc
  18. 45
      db/forward_iterator.cc
  19. 1
      db/forward_iterator.h
  20. 6
      db/log_and_apply_bench.cc
  21. 1
      db/memtable.cc
  22. 5
      db/memtable_list.cc
  23. 3
      db/memtable_list.h
  24. 2
      db/repair.cc
  25. 54
      db/version_set.cc
  26. 17
      db/version_set.h
  27. 5
      db/write_batch_test.cc
  28. 3
      include/rocksdb/immutable_options.h
  29. 3
      include/rocksdb/version.h
  30. 1
      java/Makefile
  31. 3
      java/RocksDBSample.java
  32. 52
      java/org/rocksdb/BlockBasedTableConfig.java
  33. 27
      java/org/rocksdb/test/FilterTest.java
  34. 9
      java/rocksjni/table.cc
  35. 3
      java/rocksjni/write_batch.cc
  36. 1
      table/block_based_table_builder.cc
  37. 2
      table/bloom_block.cc
  38. 2
      table/bloom_block.h
  39. 4
      table/cuckoo_table_reader.cc
  40. 16
      table/format.cc
  41. 4
      table/plain_table_factory.cc
  42. 17
      table/table_test.cc
  43. 2
      util/cache_test.cc
  44. 6
      util/ldb_cmd.cc
  45. 11
      util/ldb_cmd_execute_result.h
  46. 72
      util/mutable_cf_options.cc
  47. 67
      util/mutable_cf_options.h
  48. 4
      util/options.cc
  49. 90
      util/options_helper.cc
  50. 1
      util/signal_test.cc
  51. 2
      utilities/backupable/backupable_db_test.cc
  52. 5
      utilities/document/document_db.cc
  53. 2
      utilities/document/document_db_test.cc
  54. 2
      utilities/spatialdb/spatial_db.cc
  55. 2
      utilities/ttl/db_ttl_impl.h
  56. 12
      utilities/ttl/ttl_test.cc

@ -85,4 +85,4 @@ SSE4.2 is used to speed up CRC32 when calculating data checksum.
We did not run any production workloads on it.
* **iOS**:
* Run: `TARGET_OS=IOS make static_lib`
* Run: `TARGET_OS=IOS make static_lib`. When building the project which uses rocksdb iOS library, make sure to define two important pre-processing macros: `ROCKSDB_LITE` and `IOS_CROSS_COMPILE`.

@ -179,22 +179,26 @@ ifneq ($(PLATFORM_SHARED_VERSIONED),true)
SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1)
SHARED3 = $(SHARED1)
SHARED4 = $(SHARED1)
SHARED = $(SHARED1)
else
# Update db.h if you change these.
SHARED_MAJOR = $(ROCKSDB_MAJOR)
SHARED_MINOR = $(ROCKSDB_MINOR)
SHARED_PATCH = $(ROCKSDB_PATCH)
SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT)
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
SHARED = $(SHARED1) $(SHARED2) $(SHARED3)
$(SHARED1): $(SHARED3)
ln -fs $(SHARED3) $(SHARED1)
$(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
SHARED4 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR).$(SHARED_PATCH)
SHARED = $(SHARED1) $(SHARED2) $(SHARED3) $(SHARED4)
$(SHARED1): $(SHARED4)
ln -fs $(SHARED4) $(SHARED1)
$(SHARED2): $(SHARED4)
ln -fs $(SHARED4) $(SHARED2)
$(SHARED3): $(SHARED4)
ln -fs $(SHARED4) $(SHARED3)
endif
$(SHARED3):
$(SHARED4):
$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) $(LDFLAGS) -o $@
endif # PLATFORM_SHARED_EXT

@ -326,6 +326,10 @@ PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"
VALGRIND_VER="$VALGRIND_VER"
ROCKSDB_MAJOR=`build_tools/version.sh major`
ROCKSDB_MINOR=`build_tools/version.sh minor`
ROCKSDB_PATCH=`build_tools/version.sh patch`
echo "CC=$CC" >> "$OUTPUT"
echo "CXX=$CXX" >> "$OUTPUT"
echo "PLATFORM=$PLATFORM" >> "$OUTPUT"
@ -341,3 +345,6 @@ echo "PLATFORM_SHARED_VERSIONED=$PLATFORM_SHARED_VERSIONED" >> "$OUTPUT"
echo "EXEC_LDFLAGS=$EXEC_LDFLAGS" >> "$OUTPUT"
echo "JEMALLOC_INCLUDE=$JEMALLOC_INCLUDE" >> "$OUTPUT"
echo "JEMALLOC_LIB=$JEMALLOC_LIB" >> "$OUTPUT"
echo "ROCKSDB_MAJOR=$ROCKSDB_MAJOR" >> "$OUTPUT"
echo "ROCKSDB_MINOR=$ROCKSDB_MINOR" >> "$OUTPUT"
echo "ROCKSDB_PATCH=$ROCKSDB_PATCH" >> "$OUTPUT"

@ -0,0 +1,14 @@
#!/bin/sh
if [ $# == 0 ]; then
echo "Usage: $0 major|minor|patch"
exit 1
fi
if [ $1 = "major" ]; then
cat include/rocksdb/version.h | grep MAJOR | head -n1 | awk '{print $3}'
fi
if [ $1 = "minor" ]; then
cat include/rocksdb/version.h | grep MINOR | head -n1 | awk '{print $3}'
fi
if [ $1 = "patch" ]; then
cat include/rocksdb/version.h | grep PATCH | head -n1 | awk '{print $3}'
fi

@ -230,7 +230,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
internal_comparator_(cf_options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_),
mutable_cf_options_(options_, ioptions_),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
@ -245,27 +245,27 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(options_.num_levels, db_options->env, this));
new InternalStats(ioptions_.num_levels, db_options->env, this));
table_cache_.reset(new TableCache(ioptions_, env_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else if (options_.compaction_style == kCompactionStyleLevel) {
new UniversalCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
new LevelCompactionPicker(ioptions_, &internal_comparator_));
} else {
assert(options_.compaction_style == kCompactionStyleFIFO);
assert(ioptions_.compaction_style == kCompactionStyleFIFO);
compaction_picker_.reset(
new FIFOCompactionPicker(&options_, &internal_comparator_));
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
}
Log(options_.info_log, "Options for column family \"%s\":\n",
Log(ioptions_.info_log, "Options for column family \"%s\":\n",
name.c_str());
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
cf_options->Dump(ioptions_.info_log);
}
RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options_);
}
// DB mutex held
@ -318,7 +318,8 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}
void ColumnFamilyData::RecalculateWriteStallConditions() {
void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) {
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();
@ -328,26 +329,27 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
if (imm()->size() == options_.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
} else if (current_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
} else if (options_.level0_slowdown_writes_trigger >= 0 &&
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
mutable_cf_options.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
current_->NumLevelFiles(0),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
@ -358,7 +360,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
write_controller->GetDelayToken(kHardLimitSlowdown);
internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
false);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
@ -368,7 +370,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
options_.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
"us)",
name_.c_str(), max_level, slowdown);
@ -393,19 +395,21 @@ void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
mem_->Ref();
}
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(
mutable_options, current_, log_buffer);
return result;
}
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
output_path_id, begin, end,
compaction_end);
Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(
mutable_cf_options, current_, input_level, output_level,
output_path_id, begin, end, compaction_end);
}
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
@ -443,11 +447,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
@ -502,7 +506,7 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
// Reset SuperVersions cached in thread local storage
ResetThreadLocalSuperVersions();
RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options);
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
@ -533,6 +537,7 @@ bool ColumnFamilyData::SetOptions(
if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
&new_mutable_cf_options)) {
mutable_cf_options_ = new_mutable_cf_options;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
return true;
}
return false;

@ -203,11 +203,14 @@ class ColumnFamilyData {
TableCache* table_cache() const { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
Compaction* CompactRange(int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
@ -260,7 +263,8 @@ class ColumnFamilyData {
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
uint32_t id_;
const std::string name_;

@ -56,7 +56,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
@ -267,12 +266,12 @@ void Compaction::Summary(char* output, int len) {
snprintf(output + write, len - write, "]");
}
uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t Compaction::OutputFilePreallocationSize(
const MutableCFOptions& mutable_options) {
uint64_t preallocation_size = 0;
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
preallocation_size =
cfd_->compaction_picker()->MaxFileSizeForLevel(output_level());
preallocation_size = mutable_options.MaxFileSizeForLevel(output_level());
} else {
for (int level = 0; level < num_input_levels(); ++level) {
for (const auto& f : inputs_[level].files) {

@ -10,6 +10,7 @@
#pragma once
#include "util/arena.h"
#include "util/autovector.h"
#include "util/mutable_cf_options.h"
#include "db/version_set.h"
namespace rocksdb {
@ -151,10 +152,14 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }
// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
// Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize();
uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);
private:
friend class CompactionPicker;
@ -171,6 +176,7 @@ class Compaction {
const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_;
uint64_t max_grandparent_overlap_bytes_;
MutableCFOptions mutable_cf_options_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;

@ -35,70 +35,36 @@ namespace {
// If enable_compression is false, then compression is always disabled no
// matter what the values of the other two parameters are.
// Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const Options& options, int level,
const bool enable_compression = true) {
CompressionType GetCompressionType(
const ImmutableCFOptions& ioptions, int level,
const bool enable_compression = true) {
if (!enable_compression) {
// disable compression
return kNoCompression;
}
// If the use has specified a different compression level for each level,
// then pick the compression for that level.
if (!options.compression_per_level.empty()) {
const int n = options.compression_per_level.size() - 1;
if (!ioptions.compression_per_level.empty()) {
const int n = ioptions.compression_per_level.size() - 1;
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
// belongs to. Likewise, if level is beyond the end of the
// specified compression levels, use the last value.
return options.compression_per_level[std::max(0, std::min(level, n))];
return ioptions.compression_per_level[std::max(0, std::min(level, n))];
} else {
return options.compression;
return ioptions.compression;
}
}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) {
return 0;
}
if (op2 <= 0) {
return op1;
}
uint64_t casted_op2 = (uint64_t) op2;
if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
return op1;
}
return op1 * casted_op2;
}
} // anonymous namespace
CompactionPicker::CompactionPicker(const Options* options,
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: compactions_in_progress_(options->num_levels),
options_(options),
num_levels_(options->num_levels),
: ioptions_(ioptions),
compactions_in_progress_(ioptions_.num_levels),
icmp_(icmp) {
max_file_size_.reset(new uint64_t[NumberLevels()]);
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < NumberLevels(); i++) {
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
max_file_size_[i] = ULLONG_MAX;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
} else if (i > 1) {
max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1],
target_file_size_multiplier);
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier),
options_->max_bytes_for_level_multiplier_additional[i - 1]);
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
}
CompactionPicker::~CompactionPicker() {}
@ -126,26 +92,6 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
}
}
uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return max_file_size_[level];
}
uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->max_grandparent_overlap_factor;
return result;
}
double CompactionPicker::MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < NumberLevels());
return level_max_bytes_[level];
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest, InternalKey* largest) {
assert(!inputs.empty());
@ -214,7 +160,7 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
// compaction, then we must drop/cancel this compaction.
int parent_index = -1;
if (c->inputs_[0].empty()) {
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] ExpandWhileOverlapping() failure because zero input files",
c->column_family_data()->GetName().c_str());
}
@ -229,12 +175,6 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
return true;
}
uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->expanded_compaction_factor;
return result;
}
// Returns true if any one of specified files are being compacted
bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
for (unsigned int i = 0; i < files.size(); i++) {
@ -262,7 +202,8 @@ bool CompactionPicker::ParentRangeInCompaction(Version* version,
// Will also attempt to expand "level" if that doesn't expand "level+1"
// or cause "level" to include a file for compaction that has an overlapping
// user-key with another file.
void CompactionPicker::SetupOtherInputs(Compaction* c) {
void CompactionPicker::SetupOtherInputs(
const MutableCFOptions& mutable_cf_options, Compaction* c) {
// If inputs are empty, then there is nothing to expand.
// If both input and output levels are the same, no need to consider
// files at level "level+1"
@ -298,7 +239,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files);
const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) &&
@ -311,7 +252,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64
" bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n",
c->column_family_data()->GetName().c_str(), level,
@ -336,21 +277,20 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
}
}
Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
int output_level,
uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
Compaction* CompactionPicker::CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
// CompactionPickerFIFO has its own implementation of compact range
assert(options_->compaction_style != kCompactionStyleFIFO);
assert(ioptions_.compaction_style != kCompactionStyleFIFO);
std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;
// All files are 'overlapping' in universal style compaction.
// We have to compact the entire range in one shot.
if (options_->compaction_style == kCompactionStyleUniversal) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
begin = nullptr;
end = nullptr;
}
@ -364,8 +304,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
// and we must not pick one file and drop another older file if the
// two files overlap.
if (input_level > 0) {
const uint64_t limit =
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) *
mutable_cf_options.source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->compensated_file_size;
@ -378,22 +318,24 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
}
}
}
assert(output_path_id < static_cast<uint32_t>(options_->db_paths.size()));
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
Compaction* c = new Compaction(
version, input_level, output_level, MaxFileSizeForLevel(output_level),
MaxGrandParentOverlapBytes(input_level), output_path_id,
GetCompressionType(*options_, output_level));
version, input_level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id,
GetCompressionType(ioptions_, output_level));
c->inputs_[0].files = inputs;
if (ExpandWhileOverlapping(c) == false) {
delete c;
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] Could not compact due to expansion failure.\n",
version->cfd_->GetName().c_str());
return nullptr;
}
SetupOtherInputs(c);
SetupOtherInputs(mutable_cf_options, c);
if (covering_the_whole_range) {
*compaction_end = nullptr;
@ -408,12 +350,14 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
c->SetupBottomMostLevel(true);
c->is_manual_compaction_ = true;
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* LevelCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* LevelCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
Compaction* c = nullptr;
int level = -1;
@ -421,7 +365,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
version->ComputeCompactionScore(size_being_compacted);
version->ComputeCompactionScore(mutable_cf_options, size_being_compacted);
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
@ -432,7 +376,8 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
version->compaction_score_[i] <= version->compaction_score_[i - 1]);
level = version->compaction_level_[i];
if ((version->compaction_score_[i] >= 1)) {
c = PickCompactionBySize(version, level, version->compaction_score_[i]);
c = PickCompactionBySize(mutable_cf_options, version, level,
version->compaction_score_[i]);
if (c == nullptr || ExpandWhileOverlapping(c) == false) {
delete c;
c = nullptr;
@ -472,7 +417,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs(c);
SetupOtherInputs(mutable_cf_options, c);
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
@ -483,12 +428,13 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
int level,
double score) {
Compaction* LevelCompactionPicker::PickCompactionBySize(
const MutableCFOptions& mutable_cf_options,
Version* version, int level, double score) {
Compaction* c = nullptr;
// level 0 files are overlapping. So we cannot pick more
@ -501,9 +447,10 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
assert(level >= 0);
assert(level + 1 < NumberLevels());
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level), 0,
GetCompressionType(*options_, level + 1));
c = new Compaction(version, level, level + 1,
mutable_cf_options.MaxFileSizeForLevel(level + 1),
mutable_cf_options.MaxGrandParentOverlapBytes(level), 0,
GetCompressionType(ioptions_, level + 1));
c->score_ = score;
// Pick the largest file in this level that is not already
@ -563,13 +510,14 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* UniversalCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
int level = 0;
double score = version->compaction_score_[0];
if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) {
LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n",
version->cfd_->GetName().c_str());
return nullptr;
@ -581,17 +529,18 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
nullptr) {
if ((c = PickCompactionUniversalSizeAmp(
mutable_cf_options, version, score, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n",
version->cfd_->GetName().c_str());
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
log_buffer)) != nullptr) {
if ((c = PickCompactionUniversalReadAmp(
mutable_cf_options, version, score, ratio,
UINT_MAX, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n",
version->cfd_->GetName().c_str());
} else {
@ -600,9 +549,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
mutable_cf_options.level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
mutable_cf_options, version, score, UINT_MAX,
num_files, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for file num\n",
version->cfd_->GetName().c_str());
}
@ -628,7 +578,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->bottommost_level_ = c->inputs_[0].files.back() == last_file;
// update statistics
MeasureTime(options_->statistics.get(),
MeasureTime(ioptions_.statistics,
NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size());
// mark all the files that are being compacted
@ -642,11 +592,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->is_full_compaction_ =
(c->inputs_[0].size() == c->input_version_->files_[0].size());
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
uint64_t file_size) {
uint32_t UniversalCompactionPicker::GetPathId(
const ImmutableCFOptions& ioptions, uint64_t file_size) {
// Two conditions need to be satisfied:
// (1) the target path needs to be able to hold the file's size
// (2) Total size left in this and previous paths need to be not
@ -662,11 +613,11 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
// considered in this algorithm. So the target size can be violated in
// that case. We need to improve it.
uint64_t accumulated_size = 0;
uint64_t future_size =
file_size * (100 - options.compaction_options_universal.size_ratio) / 100;
uint64_t future_size = file_size *
(100 - ioptions.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
for (; p < options.db_paths.size() - 1; p++) {
uint64_t target_size = options.db_paths[p].target_size;
for (; p < ioptions.db_paths.size() - 1; p++) {
uint64_t target_size = ioptions.db_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;
@ -681,14 +632,15 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Version* version, double score, unsigned int ratio,
const MutableCFOptions& mutable_cf_options, Version* version,
double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
int level = 0;
unsigned int min_merge_width =
options_->compaction_options_universal.min_merge_width;
ioptions_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
options_->compaction_options_universal.max_merge_width;
ioptions_.compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
const auto& files = version->files_[level];
@ -750,7 +702,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
if (sz < static_cast<double>(f->fd.GetFileSize())) {
break;
}
if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
if (ioptions_.compaction_options_universal.stop_style ==
kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (f->fd.GetFileSize() * (100.0 + ratio)) / 100.0;
@ -794,7 +747,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// size ratio of compression.
bool enable_compression = true;
int ratio_to_compress =
options_->compaction_options_universal.compression_size_percent;
ioptions_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = version->NumLevelBytes(level);
uint64_t older_file_size = 0;
@ -812,11 +765,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += files[i]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
Compaction* c = new Compaction(
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, path_id,
GetCompressionType(*options_, level, enable_compression));
version, level, level, mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level,
enable_compression));
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
@ -841,11 +795,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Version* version, double score, LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, Version* version,
double score, LogBuffer* log_buffer) {
int level = 0;
// percentage flexibilty while reducing size amplification
uint64_t ratio = options_->compaction_options_universal.
uint64_t ratio = ioptions_.compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
@ -920,20 +875,21 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
"earliest-file-size %" PRIu64,
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
}
assert(start_index >= 0 && start_index < files.size() - 1);
assert(start_index < files.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
estimated_total_size += files[loop]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(version, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(*options_, level));
new Compaction(version, level, level,
mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = c->input_version_->files_[level][loop];
@ -948,22 +904,23 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return c;
}
Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* FIFOCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
assert(version->NumberLevels() == 1);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
total_size += file->compensated_file_size;
}
if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
version->files_[0].size() == 0) {
// total size not exceeded
LogToBuffer(log_buffer,
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
", max size %" PRIu64 "\n",
version->cfd_->GetName().c_str(), total_size,
options_->compaction_options_fifo.max_table_files_size);
ioptions_.compaction_options_fifo.max_table_files_size);
return nullptr;
}
@ -988,28 +945,29 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <= options_->compaction_options_fifo.max_table_files_size) {
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
break;
}
}
c->MarkFilesBeingCompacted(true);
compactions_in_progress_[0].insert(c);
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* FIFOCompactionPicker::CompactRange(
const MutableCFOptions& mutable_cf_options,
Version* version, int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
assert(input_level == 0);
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
Compaction* c = PickCompaction(version, &log_buffer);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
Compaction* c = PickCompaction(mutable_cf_options, version, &log_buffer);
if (c != nullptr) {
assert(output_path_id < static_cast<uint32_t>(options_->db_paths.size()));
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
c->output_path_id_ = output_path_id;
}
log_buffer.FlushBufferToLog();

@ -13,6 +13,7 @@
#include "rocksdb/status.h"
#include "rocksdb/options.h"
#include "rocksdb/env.h"
#include "util/mutable_cf_options.h"
#include <vector>
#include <memory>
@ -26,15 +27,17 @@ class Version;
class CompactionPicker {
public:
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) = 0;
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
@ -47,11 +50,11 @@ class CompactionPicker {
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
// Given the current number of levels, returns the lowest allowed level
// for compaction input.
@ -64,19 +67,8 @@ class CompactionPicker {
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
// Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level);
// Returns maximum total bytes of data on a given level.
double MaxBytesForLevel(int level);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const;
protected:
int NumberLevels() const { return num_levels_; }
int NumberLevels() const { return ioptions_.num_levels; }
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
@ -103,8 +95,6 @@ class CompactionPicker {
// Will return false if it is impossible to apply this compaction.
bool ExpandWhileOverlapping(Compaction* c);
uint64_t ExpandedCompactionByteSizeLimit(int level);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
@ -113,32 +103,27 @@ class CompactionPicker {
const InternalKey* largest, int level,
int* index);
void SetupOtherInputs(Compaction* c);
void SetupOtherInputs(const MutableCFOptions& mutable_cf_options,
Compaction* c);
const ImmutableCFOptions& ioptions_;
// record all the ongoing compactions for all levels
std::vector<std::set<Compaction*>> compactions_in_progress_;
// Per-level target file size.
std::unique_ptr<uint64_t[]> max_file_size_;
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
const Options* const options_;
private:
int num_levels_;
const InternalKeyComparator* const icmp_;
};
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const Options* options,
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
// The maxinum allowed input level. Always return 0.
virtual int MaxInputLevel(int current_num_levels) const override {
@ -147,27 +132,30 @@ class UniversalCompactionPicker : public CompactionPicker {
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
unsigned int ratio,
unsigned int num_files,
LogBuffer* log_buffer);
Compaction* PickCompactionUniversalReadAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, unsigned int ratio,
unsigned int num_files, LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
LogBuffer* log_buffer);
Compaction* PickCompactionUniversalSizeAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, LogBuffer* log_buffer);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId(const Options& options, uint64_t file_size);
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
uint64_t file_size);
};
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const Options* options,
LevelCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
// Returns current_num_levels - 2, meaning the last level cannot be
// compaction input level.
@ -180,23 +168,25 @@ class LevelCompactionPicker : public CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// If level is 0 and there is already a compaction on that level, this
// function will return nullptr.
Compaction* PickCompactionBySize(Version* version, int level, double score);
Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options,
Version* version, int level, double score);
};
class FIFOCompactionPicker : public CompactionPicker {
public:
FIFOCompactionPicker(const Options* options,
FIFOCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) override;
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) override;
// The maxinum allowed input level. Always return 0.
virtual int MaxInputLevel(int current_num_levels) const override {

@ -131,7 +131,7 @@ class CorruptionTest {
ASSERT_GE(max_expected, correct);
}
void CorruptFile(const std::string fname, int offset, int bytes_to_corrupt) {
void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
const char* msg = strerror(errno);

@ -218,6 +218,7 @@ TEST(CuckooTableDBTest, Uint64Comparator) {
// Add more keys.
ASSERT_OK(Delete(Uint64Key(2))); // Delete.
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put(Uint64Key(3), "v0")); // Update.
ASSERT_OK(Put(Uint64Key(4), "v4"));
dbfull()->TEST_FlushMemTable();

@ -1410,7 +1410,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_log_number + 1);
status = versions_->LogAndApply(cfd, edit, &mutex_);
status = versions_->LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
if (!status.ok()) {
// Recovery failed
break;
@ -1479,8 +1480,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
}
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber, LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
@ -1560,7 +1562,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
level = base->PickLevelForMemTableOutput(
mutable_cf_options, min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
@ -1577,10 +1580,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
return s;
}
Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
bool* madeProgress,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->size() != 0);
assert(cfd->imm()->IsFlushPending());
@ -1607,8 +1609,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer);
Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit,
&file_number, log_buffer);
if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) {
s = Status::ShutdownInProgress(
@ -1620,14 +1624,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
} else {
// Replace immutable memtable with the generated Table
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mems, versions_.get(), &mutex_, db_options_.info_log.get(),
file_number, &pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get(), log_buffer);
cfd, mutable_cf_options, mems, versions_.get(), &mutex_,
db_options_.info_log.get(), file_number, &pending_outputs_,
&deletion_state.memtables_to_free, db_directory_.get(), log_buffer);
}
if (s.ok()) {
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
if (madeProgress) {
*madeProgress = 1;
}
@ -1726,7 +1729,8 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
}
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level) {
mutex_.AssertHeld();
Version* current = cfd->current();
int minimum_level = level;
@ -1734,7 +1738,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
// stop if level i is not empty
if (current->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (cfd->compaction_picker()->MaxBytesForLevel(i) <
if (mutable_cf_options.MaxBytesForLevel(i) <
current->NumLevelBytes(level)) {
break;
}
@ -1770,10 +1774,12 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
bg_cv_.Wait();
}
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
// move to a smaller level
int to_level = target_level;
if (target_level < 0) {
to_level = FindMinimumEmptyLevelFitting(cfd, level);
to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
}
assert(to_level <= level);
@ -1794,9 +1800,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
Log(db_options_.info_log, "[%s] Apply version edit:\n%s",
cfd->GetName().c_str(), edit.DebugString().data());
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
// Use latest MutableCFOptions
superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
status = versions_->LogAndApply(cfd,
mutable_cf_options, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(
new_superversion, &mutex_, mutable_cf_options);
new_superversion = nullptr;
Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
@ -2058,6 +2065,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
Status flush_status;
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
@ -2065,8 +2074,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
"family [%s], flush slots available %d",
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_);
flush_status = FlushMemTableToOutputFile(cfd, madeProgress,
deletion_state, log_buffer);
flush_status = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
}
if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status;
@ -2259,6 +2268,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// FLUSH preempts compaction
Status flush_stat;
for (auto cfd : *versions_->GetColumnFamilySet()) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
@ -2266,8 +2277,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
"compaction slots available %d",
db_options_.max_background_compactions - bg_compaction_scheduled_);
cfd->Ref();
flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state,
log_buffer);
flush_stat = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
cfd->Unref();
if (!flush_stat.ok()) {
if (is_manual) {
@ -2281,15 +2292,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
}
}
// Compaction makes a copy of the latest MutableCFOptions. It should be used
// throughout the compaction procedure to make sure consistency. It will
// eventually be installed into SuperVersion
unique_ptr<Compaction> c;
InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
assert(m->in_progress);
c.reset(m->cfd->CompactRange(m->input_level, m->output_level,
m->output_path_id, m->begin, m->end,
&manual_end));
c.reset(m->cfd->CompactRange(
*m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level,
m->output_path_id, m->begin, m->end, &manual_end));
if (!c) {
m->done = true;
}
@ -2306,7 +2320,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->options()->disable_auto_compactions) {
c.reset(cfd->PickCompaction(log_buffer));
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
c.reset(cfd->PickCompaction(
*cfd->GetLatestMutableCFOptions(), log_buffer));
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
@ -2331,10 +2349,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);
status = versions_->LogAndApply(
c->column_family_data(), *c->mutable_cf_options(), c->edit(),
&mutex_, db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state,
*c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
@ -2348,10 +2367,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(),
c->edit(), &mutex_, db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);
InstallSuperVersion(c->column_family_data(), deletion_state,
*c->mutable_cf_options());
Version::LevelSummaryStorage tmp;
LogToBuffer(
@ -2366,7 +2387,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact, deletion_state, log_buffer);
status = DoCompactionWork(compact, *c->mutable_cf_options(),
deletion_state, log_buffer);
CleanupCompaction(compact, status);
c->ReleaseCompactionFiles(status);
c->ReleaseInputs();
@ -2468,7 +2490,8 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
}
}
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
Status DBImpl::OpenCompactionOutputFile(
CompactionState* compact, const MutableCFOptions& mutable_cf_options) {
assert(compact != nullptr);
assert(compact->builder == nullptr);
uint64_t file_number;
@ -2500,7 +2523,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) {
compact->outfile->SetIOPriority(Env::IO_LOW);
compact->outfile->SetPreallocationBlockSize(
compact->compaction->OutputFilePreallocationSize());
compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
ColumnFamilyData* cfd = compact->compaction->column_family_data();
compact->builder.reset(NewTableBuilder(
@ -2570,7 +2593,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Status DBImpl::InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer) {
mutex_.AssertHeld();
// paranoia: verify that the files that we started with
@ -2604,6 +2627,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->column_family_data(),
mutable_cf_options,
compact->compaction->edit(), &mutex_,
db_directory_.get());
}
@ -2635,8 +2659,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
}
uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
LogBuffer* log_buffer) {
if (db_options_.max_background_flushes > 0) {
// flush thread will take care of this
return 0;
@ -2646,7 +2670,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
cfd->Ref();
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr,
deletion_state, log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
@ -2658,6 +2683,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
}
Status DBImpl::ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
@ -2721,7 +2747,8 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
imm_micros += CallFlushDuringCompaction(
cfd, mutable_cf_options, deletion_state, log_buffer);
Slice key;
Slice value;
@ -2922,7 +2949,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// Open output file if necessary
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
status = OpenCompactionOutputFile(compact, mutable_cf_options);
if (!status.ok()) {
break;
}
@ -3059,12 +3086,12 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
}
Status DBImpl::DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
assert(compact);
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
bool prefix_initialized = false;
// Generate file_levels_ for compaction berfore making Iterator
compact->compaction->GenerateFileLevels();
@ -3130,6 +3157,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3149,6 +3177,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// 2) send value_buffer to compaction filter and alternate the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
bool prefix_initialized = false;
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
@ -3158,7 +3187,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options,
deletion_state, log_buffer);
Slice key = backup_input->key();
Slice value = backup_input->value();
@ -3208,6 +3238,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3244,6 +3275,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3266,6 +3298,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3333,9 +3366,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
ReleaseCompactionUnusedFileNumbers(compact);
if (status.ok()) {
status = InstallCompactionResults(compact, log_buffer);
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
status = InstallCompactionResults(compact, mutable_cf_options, log_buffer);
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
}
Version::LevelSummaryStorage tmp;
LogToBuffer(
@ -3434,16 +3466,16 @@ Status DBImpl::Get(const ReadOptions& options,
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state) {
void DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
// Use latest MutableCFOptions
SuperVersion* old_superversion =
cfd->InstallSuperVersion(new_superversion, &mutex_);
cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options);
deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion);
}
@ -3627,15 +3659,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Status s = versions_->LogAndApply(nullptr, &edit, &mutex_,
db_directory_.get(), false, &options);
Options opt(db_options_, options);
Status s = versions_->LogAndApply(nullptr,
MutableCFOptions(opt, ImmutableCFOptions(opt)),
&edit, &mutex_, db_directory_.get(), false, &options);
if (s.ok()) {
single_column_family_mode_ = false;
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_,
*cfd->GetLatestMutableCFOptions());
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(db_options_.info_log, "Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
@ -3671,7 +3705,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job
s = versions_->LogAndApply(cfd, &edit, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_);
write_thread_.ExitWriteThread(&w, &w, s);
}
}
@ -4037,11 +4072,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && options.sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
if (db_options_.use_fsync) {
StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS);
status = log_->file()->Fsync();
} else {
StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS);
status = log_->file()->Sync();
}
}
@ -4451,9 +4485,11 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(cfd, deletion_state);
InstallSuperVersion(cfd, deletion_state,
*cfd->GetLatestMutableCFOptions());
}
FindObsoleteFiles(deletion_state, false);
} // lock released here
@ -4682,8 +4718,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_,
*cfd->GetLatestMutableCFOptions());
}
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));

@ -347,9 +347,9 @@ class DBImpl : public DB {
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress,
DeletionState& deletion_state,
LogBuffer* log_buffer);
Status FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
@ -362,9 +362,10 @@ class DBImpl : public DB {
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
VersionEdit* edit);
Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber,
LogBuffer* log_buffer);
Status WriteLevel0Table(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer);
void DelayWrite(uint64_t expiration_time);
@ -393,6 +394,7 @@ class DBImpl : public DB {
LogBuffer* log_buffer);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
LogBuffer* log_buffer);
@ -400,12 +402,13 @@ class DBImpl : public DB {
// preempt compaction, since it's higher prioirty
// Returns: micros spent executing
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer);
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
@ -422,10 +425,11 @@ class DBImpl : public DB {
void CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2);
Status OpenCompactionOutputFile(CompactionState* compact);
Status OpenCompactionOutputFile(CompactionState* compact,
const MutableCFOptions& mutable_cf_options);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer);
const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer);
void AllocateCompactionOutputFileNumbers(CompactionState* compact);
void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
@ -467,7 +471,8 @@ class DBImpl : public DB {
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level);
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level);
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
@ -621,7 +626,8 @@ class DBImpl : public DB {
// the cfd->InstallSuperVersion() function. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state);
DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.

@ -287,7 +287,6 @@ void DBIter::MergeValuesNewToOld() {
std::deque<std::string> operands;
operands.push_front(iter_->value().ToString());
std::string merge_result; // Temporary string to hold merge result later
ParsedInternalKey ikey;
for (iter_->Next(); iter_->Valid(); iter_->Next()) {
if (!ParseKey(&ikey)) {

@ -874,6 +874,18 @@ class DBTest {
return atoi(property.c_str());
}
uint64_t SizeAtLevel(int level) {
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
uint64_t sum = 0;
for (const auto& m : metadata) {
if (m.level == level) {
sum += m.size;
}
}
return sum;
}
int TotalTableFiles(int cf = 0, int levels = -1) {
if (levels == -1) {
levels = CurrentOptions().num_levels;
@ -4672,9 +4684,9 @@ TEST(DBTest, CompactionFilterContextManual) {
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
// Verify total number of keys is correct after manual compaction.
int count = 0;
int total = 0;
{
int count = 0;
int total = 0;
Arena arena;
ScopedArenaIterator iter(dbfull()->TEST_NewInternalIterator(&arena));
iter->SeekToFirst();
@ -6138,7 +6150,7 @@ class WrappedBloom : public FilterPolicy {
const FilterPolicy* filter_;
mutable uint32_t counter_;
rocksdb::Slice convertKey(const rocksdb::Slice key) const {
rocksdb::Slice convertKey(const rocksdb::Slice& key) const {
return key;
}
};
@ -8193,7 +8205,6 @@ static void RandomTimeoutWriter(void* arg) {
if (write_opt.timeout_hint_us == 0 ||
put_duration + kTimerBias < write_opt.timeout_hint_us) {
ASSERT_OK(s);
std::string result;
}
if (s.IsTimedOut()) {
timeout_count++;
@ -8527,6 +8538,102 @@ TEST(DBTest, DisableDataSyncTest) {
}
}
TEST(DBTest, DynamicCompactionOptions) {
const uint64_t k64KB = 1 << 16;
const uint64_t k128KB = 1 << 17;
const uint64_t k256KB = 1 << 18;
const uint64_t k5KB = 5 * 1024;
Options options;
options.env = env_;
options.create_if_missing = true;
options.compression = kNoCompression;
options.max_background_compactions = 4;
options.hard_rate_limit = 1.1;
options.write_buffer_size = k128KB;
options.max_write_buffer_number = 2;
// Compaction related options
options.level0_file_num_compaction_trigger = 3;
options.level0_slowdown_writes_trigger = 10;
options.level0_stop_writes_trigger = 20;
options.max_grandparent_overlap_factor = 10;
options.expanded_compaction_factor = 25;
options.source_compaction_factor = 1;
options.target_file_size_base = k128KB;
options.target_file_size_multiplier = 1;
options.max_bytes_for_level_base = k256KB;
options.max_bytes_for_level_multiplier = 4;
DestroyAndReopen(&options);
auto gen_l0_kb = [this](int start, int size, int stride = 1) {
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < size; i++) {
values.push_back(RandomString(&rnd, 1024));
ASSERT_OK(Put(Key(start + stride * i), values[i]));
}
dbfull()->TEST_WaitForFlushMemTable();
};
// Write 3 files that have the same key range, trigger compaction and
// result in one L1 file
gen_l0_kb(0, 128);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
gen_l0_kb(0, 128);
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
gen_l0_kb(0, 128);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,1", FilesPerLevel());
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(1U, metadata.size());
ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB
ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB
// Make compaction trigger and file size smaller
ASSERT_TRUE(dbfull()->SetOptions({
{"level0_file_num_compaction_trigger", "2"},
{"target_file_size_base", "65536"}
}));
gen_l0_kb(0, 128);
ASSERT_EQ("1,1", FilesPerLevel());
gen_l0_kb(0, 128);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,2", FilesPerLevel());
metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(2U, metadata.size());
ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB
ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB
// Change base level size to 1MB
ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} }));
// writing 56 x 128KB => 7MB
// (L1 + L2) = (1 + 4) * 1MB = 5MB
for (int i = 0; i < 56; ++i) {
gen_l0_kb(i, 128, 56);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1);
// Change multiplier to 2 with smaller base
ASSERT_TRUE(dbfull()->SetOptions({
{"max_bytes_for_level_multiplier", "2"},
{"max_bytes_for_level_base", "262144"}
}));
// writing 16 x 128KB
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
for (int i = 0; i < 16; ++i) {
gen_l0_kb(i, 128, 50);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
}
} // namespace rocksdb

@ -148,7 +148,6 @@ class DeleteFileTest {
TEST(DeleteFileTest, AddKeysAndQueryLevels) {
CreateTwoLevels();
std::vector<LiveFileMetaData> metadata;
std::vector<int> keysinlevel;
db_->GetLiveFilesMetaData(&metadata);
std::string level1file = "";

@ -125,7 +125,8 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
mutable_iter_(nullptr),
current_(nullptr),
valid_(false),
is_prev_set_(false) {}
is_prev_set_(false),
is_prev_inclusive_(false) {}
ForwardIterator::~ForwardIterator() {
Cleanup();
@ -314,11 +315,12 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
}
}
if (seek_to_first || immutable_min_heap_.empty()) {
if (seek_to_first) {
is_prev_set_ = false;
} else {
prev_key_.SetKey(internal_key);
is_prev_set_ = true;
is_prev_inclusive_ = true;
}
} else if (current_ && current_ != mutable_iter_) {
// current_ is one of immutable iterators, push it back to the heap
@ -343,8 +345,20 @@ void ForwardIterator::Next() {
}
} else if (current_ != mutable_iter_) {
// It is going to advance immutable iterator
prev_key_.SetKey(current_->key());
is_prev_set_ = true;
bool update_prev_key = true;
if (is_prev_set_ && prefix_extractor_) {
// advance prev_key_ to current_ only if they share the same prefix
update_prev_key =
prefix_extractor_->Transform(prev_key_.GetKey()).compare(
prefix_extractor_->Transform(current_->key())) == 0;
}
if (update_prev_key) {
prev_key_.SetKey(current_->key());
is_prev_set_ = true;
is_prev_inclusive_ = false;
}
}
current_->Next();
@ -476,7 +490,14 @@ void ForwardIterator::UpdateCurrent() {
}
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
if (!valid_ || !is_prev_set_) {
// We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
// such that there are no records with keys within that range in
// immutable_min_heap_. Since immutable structures (SST files and immutable
// memtables) can't change in this version, we don't need to do a seek if
// 'target' belongs to that interval (immutable_min_heap_.top() is already
// at the correct position).
if (!valid_ || !current_ || !is_prev_set_) {
return true;
}
Slice prev_key = prev_key_.GetKey();
@ -485,13 +506,17 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
return true;
}
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key, target) >= 0) {
prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
return true;
}
if (immutable_min_heap_.empty() ||
cfd_->internal_comparator().InternalKeyComparator::Compare(
target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
: current_->key()) > 0) {
if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
// Nothing to seek on.
return false;
}
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
: current_->key()) > 0) {
return true;
}
return false;

@ -101,6 +101,7 @@ class ForwardIterator : public Iterator {
IterKey prev_key_;
bool is_prev_set_;
bool is_prev_inclusive_;
Arena arena_;
};

@ -60,7 +60,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
}
ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
ASSERT_OK(vset->LogAndApply(default_cfd,
*default_cfd->GetLatestMutableCFOptions(), &vbase, &mu));
}
for (int i = 0; i < iters; i++) {
@ -69,7 +70,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
vset->LogAndApply(default_cfd, &vedit, &mu);
vset->LogAndApply(default_cfd, *default_cfd->GetLatestMutableCFOptions(),
&vedit, &mu);
}
delete vset;
}

@ -413,7 +413,6 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->found_final_value) = true;
return false;
}
std::string merge_result; // temporary area for merge results later
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true;
merge_context->PushOperand(v);

@ -160,7 +160,8 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
@ -197,7 +198,7 @@ Status MemTableList::InstallMemtableFlushResults(
cfd->GetName().c_str(), (unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);
s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

@ -113,7 +113,8 @@ class MemTableList {
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset,
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer);

@ -220,7 +220,7 @@ class Repairer {
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, ioptions_,
MemTableOptions(MutableCFOptions(options_), options_));
MemTableOptions(MutableCFOptions(options_, ioptions_), options_));
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;

@ -672,7 +672,7 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
}
}
void Version::Get(const ReadOptions& options,
void Version::Get(const ReadOptions& read_options,
const LookupKey& k,
std::string* value,
Status* status,
@ -691,8 +691,8 @@ void Version::Get(const ReadOptions& options,
&file_indexer_, user_comparator_, internal_comparator_);
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
*status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey,
&get_context);
*status = table_cache_->Get(read_options, *internal_comparator_, f->fd,
ikey, &get_context);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -746,9 +746,10 @@ void Version::GenerateFileLevels() {
}
}
void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
UpdateTemporaryStats();
ComputeCompactionScore(size_being_compacted);
ComputeCompactionScore(mutable_cf_options, size_being_compacted);
UpdateFilesBySize();
UpdateNumNonEmptyLevels();
file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_);
@ -817,13 +818,13 @@ void Version::UpdateTemporaryStats() {
}
void Version::ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0;
int max_score_level = 0;
int max_input_level =
cfd_->compaction_picker()->MaxInputLevel(NumberLevels());
for (int level = 0; level <= max_input_level; level++) {
double score;
if (level == 0) {
@ -849,21 +850,22 @@ void Version::ComputeCompactionScore(
if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) {
score = static_cast<double>(total_size) /
cfd_->options()->compaction_options_fifo.max_table_files_size;
} else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
} else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) {
// If we are slowing down writes, then we better compact that first
score = 1000000;
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
} else if (numfiles >=
mutable_cf_options.level0_slowdown_writes_trigger) {
score = 10000;
} else {
score = static_cast<double>(numfiles) /
cfd_->options()->level0_file_num_compaction_trigger;
mutable_cf_options.level0_file_num_compaction_trigger;
}
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes =
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
mutable_cf_options.MaxBytesForLevel(level);
if (max_score < score) {
max_score = score;
max_score_level = level;
@ -993,6 +995,7 @@ bool Version::OverlapInLevel(int level,
}
int Version::PickLevelForMemTableOutput(
const MutableCFOptions& mutable_cf_options,
const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
@ -1013,7 +1016,7 @@ int Version::PickLevelForMemTableOutput(
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) {
if (sum > mutable_cf_options.MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
@ -1216,7 +1219,7 @@ bool Version::HasOverlappingUserKey(
// Check the last file in inputs against the file after it
size_t last_file = FindFile(cfd_->internal_comparator(), file_level,
inputs->back()->largest.Encode());
assert(0 <= last_file && last_file < kNumFiles); // File should exist!
assert(last_file < kNumFiles); // File should exist!
if (last_file < kNumFiles-1) { // If not the last file
const Slice last_key_in_input = ExtractUserKey(
files[last_file].largest_key);
@ -1231,7 +1234,7 @@ bool Version::HasOverlappingUserKey(
// Check the first file in inputs against the file just before it
size_t first_file = FindFile(cfd_->internal_comparator(), file_level,
inputs->front()->smallest.Encode());
assert(0 <= first_file && first_file <= last_file); // File should exist!
assert(first_file <= last_file); // File should exist!
if (first_file > 0) { // If not first file
const Slice& first_key_in_input = ExtractUserKey(
files[first_file].smallest_key);
@ -1246,7 +1249,7 @@ bool Version::HasOverlappingUserKey(
return false;
}
int64_t Version::NumLevelBytes(int level) const {
uint64_t Version::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return TotalFileSize(files_[level]);
@ -1653,16 +1656,17 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
}
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
VersionEdit* edit, port::Mutex* mu,
Directory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* options) {
const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld();
// column_family_data can be nullptr only if this is column_family_add.
// in that case, we also need to specify ColumnFamilyOptions
if (column_family_data == nullptr) {
assert(edit->is_column_family_add_);
assert(options != nullptr);
assert(new_cf_options != nullptr);
}
// queue our request
@ -1777,7 +1781,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(size_being_compacted);
v->PrepareApply(mutable_cf_options, size_being_compacted);
}
// Write new record to MANIFEST log
@ -1853,8 +1857,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (edit->is_column_family_add_) {
// no group commit on column family add
assert(batch_edits.size() == 1);
assert(options != nullptr);
CreateColumnFamily(*options, edit);
assert(new_cf_options != nullptr);
CreateColumnFamily(*new_cf_options, edit);
} else if (edit->is_column_family_drop_) {
assert(batch_edits.size() == 1);
column_family_data->SetDropped();
@ -2169,7 +2173,7 @@ Status VersionSet::Recover(
// there were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode
if (read_only == false && column_families_not_found.size() > 0) {
if (read_only == false && !column_families_not_found.empty()) {
std::string list_of_not_found;
for (const auto& cf : column_families_not_found) {
list_of_not_found += ", " + cf.second;
@ -2198,7 +2202,7 @@ Status VersionSet::Recover(
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
AppendVersion(cfd, v);
}
@ -2374,11 +2378,13 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options));
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve,
&dummy_mutex, nullptr, true);
return versions.LogAndApply(
versions.GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
@ -2530,7 +2536,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
builder->SaveTo(v);
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
delete builder;
printf("--------------- Column family \"%s\" (ID %u) --------------\n",

@ -103,14 +103,18 @@ class Version {
// We use compaction scores to figure out which compaction to do next
// REQUIRES: If Version is not yet saved to current_, it can be called without
// a lock. Once a version is saved to current_, call only with mutex held
void ComputeCompactionScore(std::vector<uint64_t>& size_being_compacted);
void ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted);
// Generate file_levels_ from files_
void GenerateFileLevels();
// Update scores, pre-calculated variables. It needs to be called before
// applying the version to the version set.
void PrepareApply(std::vector<uint64_t>& size_being_compacted);
void PrepareApply(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted);
// Reference count management (so Versions do not disappear out from
// under live iterators)
@ -169,7 +173,8 @@ class Version {
// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options,
const Slice& smallest_user_key,
const Slice& largest_user_key);
int NumberLevels() const { return num_levels_; }
@ -178,7 +183,7 @@ class Version {
int NumLevelFiles(int level) const { return files_[level].size(); }
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
uint64_t NumLevelBytes(int level) const;
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
@ -369,7 +374,9 @@ class VersionSet {
// column_family_options has to be set if edit is column family add
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit,
Status LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
VersionEdit* edit,
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options =

@ -27,8 +27,9 @@ static std::string PrintContents(WriteBatch* b) {
auto factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
MemTable* mem = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);

@ -22,6 +22,7 @@ struct ImmutableCFOptions {
CompactionStyle compaction_style;
CompactionOptionsUniversal compaction_options_universal;
CompactionOptionsFIFO compaction_options_fifo;
const SliceTransform* prefix_extractor;
@ -79,6 +80,8 @@ struct ImmutableCFOptions {
CompressionOptions compression_opts;
Options::AccessHint access_hint_on_compaction_start;
int num_levels;
};
} // namespace rocksdb

@ -4,9 +4,8 @@
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
// Also update Makefile if you change these
#define ROCKSDB_MAJOR 3
#define ROCKSDB_MINOR 5
#define ROCKSDB_MINOR 6
#define ROCKSDB_PATCH 0
// Do not use these. We made the mistake of declaring macros starting with

@ -38,6 +38,7 @@ test: java
javac org/rocksdb/test/*.java
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.WriteBatchTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.BackupableDBTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FilterTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest
java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest

@ -80,9 +80,10 @@ public class RocksDBSample {
10000, 10));
options.setRateLimiterConfig(new GenericRateLimiterConfig(10000000));
Filter bloomFilter = new BloomFilter(10);
BlockBasedTableConfig table_options = new BlockBasedTableConfig();
table_options.setBlockCacheSize(64 * SizeUnit.KB)
.setFilterBitsPerKey(10)
.setFilter(bloomFilter)
.setCacheNumShardBits(6)
.setBlockSizeDeviation(5)
.setBlockRestartInterval(10)

@ -18,7 +18,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
blockSizeDeviation_ = 10;
blockRestartInterval_ = 16;
wholeKeyFiltering_ = true;
bitsPerKey_ = 10;
filter_ = null;
cacheIndexAndFilterBlocks_ = false;
hashIndexAllowCollision_ = true;
blockCacheCompressedSize_ = 0;
@ -182,30 +182,30 @@ public class BlockBasedTableConfig extends TableFormatConfig {
*
* Filter instance can be re-used in multiple options instances.
*
* @param Filter policy java instance.
* @param Filter Filter Policy java instance.
* @return the reference to the current config.
*/
public BlockBasedTableConfig setFilterBitsPerKey(int bitsPerKey) {
bitsPerKey_ = bitsPerKey;
public BlockBasedTableConfig setFilter(Filter filter) {
filter_ = filter;
return this;
}
/**
* Indicating if we'd put index/filter blocks to the block cache.
If not specified, each "table reader" object will pre-load index/filter
block during table initialization.
*
*
* @return if index and filter blocks should be put in block cache.
*/
public boolean cacheIndexAndFilterBlocks() {
return cacheIndexAndFilterBlocks_;
}
/**
* Indicating if we'd put index/filter blocks to the block cache.
If not specified, each "table reader" object will pre-load index/filter
block during table initialization.
*
*
* @param index and filter blocks should be put in block cache.
* @return the reference to the current config.
*/
@ -214,25 +214,25 @@ public class BlockBasedTableConfig extends TableFormatConfig {
cacheIndexAndFilterBlocks_ = cacheIndexAndFilterBlocks;
return this;
}
/**
* Influence the behavior when kHashSearch is used.
if false, stores a precise prefix to block range mapping
if true, does not store prefix and allows prefix hash collision
(less memory consumption)
*
*
* @return if hash collisions should be allowed.
*/
public boolean hashIndexAllowCollision() {
return hashIndexAllowCollision_;
}
/**
* Influence the behavior when kHashSearch is used.
if false, stores a precise prefix to block range mapping
if true, does not store prefix and allows prefix hash collision
(less memory consumption)
*
*
* @param if hash collisions should be allowed.
* @return the reference to the current config.
*/
@ -241,21 +241,21 @@ public class BlockBasedTableConfig extends TableFormatConfig {
hashIndexAllowCollision_ = hashIndexAllowCollision;
return this;
}
/**
* Size of compressed block cache. If 0, then block_cache_compressed is set
* to null.
*
*
* @return size of compressed block cache.
*/
public long blockCacheCompressedSize() {
return blockCacheCompressedSize_;
}
/**
* Size of compressed block cache. If 0, then block_cache_compressed is set
* to null.
*
*
* @param size of compressed block cache.
* @return the reference to the current config.
*/
@ -264,7 +264,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
blockCacheCompressedSize_ = blockCacheCompressedSize;
return this;
}
/**
* Controls the number of shards for the block compressed cache.
* This is applied only if blockCompressedCacheSize is set to non-negative.
@ -276,7 +276,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
public int blockCacheCompressedNumShardBits() {
return blockCacheCompressedNumShardBits_;
}
/**
* Controls the number of shards for the block compressed cache.
* This is applied only if blockCompressedCacheSize is set to non-negative.
@ -293,17 +293,23 @@ public class BlockBasedTableConfig extends TableFormatConfig {
}
@Override protected long newTableFactoryHandle() {
long filterHandle = 0;
if (filter_ != null) {
filterHandle = filter_.nativeHandle_;
}
return newTableFactoryHandle(noBlockCache_, blockCacheSize_,
blockCacheNumShardBits_, blockSize_, blockSizeDeviation_,
blockRestartInterval_, wholeKeyFiltering_, bitsPerKey_,
cacheIndexAndFilterBlocks_, hashIndexAllowCollision_,
blockCacheCompressedSize_, blockCacheCompressedNumShardBits_);
blockRestartInterval_, wholeKeyFiltering_,
filterHandle, cacheIndexAndFilterBlocks_,
hashIndexAllowCollision_, blockCacheCompressedSize_,
blockCacheCompressedNumShardBits_);
}
private native long newTableFactoryHandle(
boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits,
long blockSize, int blockSizeDeviation, int blockRestartInterval,
boolean wholeKeyFiltering, int bitsPerKey,
boolean wholeKeyFiltering, long filterPolicyHandle,
boolean cacheIndexAndFilterBlocks, boolean hashIndexAllowCollision,
long blockCacheCompressedSize, int blockCacheCompressedNumShardBits);
@ -315,7 +321,7 @@ public class BlockBasedTableConfig extends TableFormatConfig {
private int blockSizeDeviation_;
private int blockRestartInterval_;
private boolean wholeKeyFiltering_;
private int bitsPerKey_;
private Filter filter_;
private boolean cacheIndexAndFilterBlocks_;
private boolean hashIndexAllowCollision_;
private long blockCacheCompressedSize_;

@ -0,0 +1,27 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
package org.rocksdb.test;
import org.rocksdb.*;
public class FilterTest {
static {
RocksDB.loadLibrary();
}
public static void main(String[] args) {
Options options = new Options();
// test table config without filter
BlockBasedTableConfig blockConfig = new BlockBasedTableConfig();
options.setTableFormatConfig(blockConfig);
options.dispose();
// new Bloom filter
options = new Options();
blockConfig = new BlockBasedTableConfig();
blockConfig.setFilter(new BloomFilter());
options.setTableFormatConfig(blockConfig);
System.out.println("Filter test passed");
}
}

@ -37,7 +37,7 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size,
jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation,
jint block_restart_interval, jboolean whole_key_filtering,
jint bits_per_key, jboolean cache_index_and_filter_blocks,
jlong jfilterPolicy, jboolean cache_index_and_filter_blocks,
jboolean hash_index_allow_collision, jlong block_cache_compressed_size,
jint block_cache_compressd_num_shard_bits) {
rocksdb::BlockBasedTableOptions options;
@ -55,8 +55,9 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
options.block_size_deviation = block_size_deviation;
options.block_restart_interval = block_restart_interval;
options.whole_key_filtering = whole_key_filtering;
if (bits_per_key > 0) {
options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bits_per_key));
if (jfilterPolicy > 0) {
options.filter_policy.reset(
reinterpret_cast<rocksdb::FilterPolicy*>(jfilterPolicy));
}
options.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
options.hash_index_allow_collision = hash_index_allow_collision;
@ -69,6 +70,6 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle(
options.block_cache = rocksdb::NewLRUCache(block_cache_compressed_size);
}
}
return reinterpret_cast<jlong>(rocksdb::NewBlockBasedTableFactory(options));
}

@ -206,7 +206,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
options.memtable_factory = factory;
rocksdb::MemTable* mem = new rocksdb::MemTable(
cmp, rocksdb::ImmutableCFOptions(options),
rocksdb::MemTableOptions(rocksdb::MutableCFOptions(options), options));
rocksdb::MemTableOptions(rocksdb::MutableCFOptions(options,
rocksdb::ImmutableCFOptions(options)), options));
mem->Ref();
std::string state;
rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);

@ -721,7 +721,6 @@ Status BlockBasedTableBuilder::Finish() {
// Write properties block.
{
PropertyBlockBuilder property_block_builder;
std::vector<std::string> failed_user_prop_collectors;
r->props.filter_policy_name = r->table_options.filter_policy != nullptr ?
r->table_options.filter_policy->Name() : "";
r->props.index_size =

@ -11,7 +11,7 @@
namespace rocksdb {
void BloomBlockBuilder::AddKeysHashes(const std::vector<uint32_t> keys_hashes) {
void BloomBlockBuilder::AddKeysHashes(const std::vector<uint32_t>& keys_hashes) {
for (auto hash : keys_hashes) {
bloom_.AddHash(hash);
}

@ -26,7 +26,7 @@ class BloomBlockBuilder {
uint32_t GetNumBlocks() const { return bloom_.GetNumBlocks(); }
void AddKeysHashes(const std::vector<uint32_t> keys_hashes);
void AddKeysHashes(const std::vector<uint32_t>& keys_hashes);
Slice Finish();

@ -191,9 +191,9 @@ class CuckooTableIterator : public Iterator {
private:
struct BucketComparator {
BucketComparator(const Slice file_data, const Comparator* ucomp,
BucketComparator(const Slice& file_data, const Comparator* ucomp,
uint32_t bucket_len, uint32_t user_key_len,
const Slice target = Slice())
const Slice& target = Slice())
: file_data_(file_data),
ucomp_(ucomp),
bucket_len_(bucket_len),

@ -334,9 +334,9 @@ Status UncompressBlockContents(const char* data, size_t n,
case kZlibCompression:
ubuf = std::unique_ptr<char[]>(
port::Zlib_Uncompress(data, n, &decompress_size));
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
return Status::Corruption(zlib_corrupt_msg);
}
*contents =
@ -345,9 +345,9 @@ Status UncompressBlockContents(const char* data, size_t n,
case kBZip2Compression:
ubuf = std::unique_ptr<char[]>(
port::BZip2_Uncompress(data, n, &decompress_size));
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
return Status::Corruption(bzip2_corrupt_msg);
}
*contents =
@ -356,9 +356,9 @@ Status UncompressBlockContents(const char* data, size_t n,
case kLZ4Compression:
ubuf = std::unique_ptr<char[]>(
port::LZ4_Uncompress(data, n, &decompress_size));
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
return Status::Corruption(lz4_corrupt_msg);
}
*contents =
@ -367,9 +367,9 @@ Status UncompressBlockContents(const char* data, size_t n,
case kLZ4HCCompression:
ubuf = std::unique_ptr<char[]>(
port::LZ4_Uncompress(data, n, &decompress_size));
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
return Status::Corruption(lz4hc_corrupt_msg);
}
*contents =

@ -52,10 +52,10 @@ std::string PlainTableFactory::GetPrintableTableOptions() const {
snprintf(buffer, kBufferSize, " hash_table_ratio: %lf\n",
hash_table_ratio_);
ret.append(buffer);
snprintf(buffer, kBufferSize, " index_sparseness: %zd\n",
snprintf(buffer, kBufferSize, " index_sparseness: %zu\n",
index_sparseness_);
ret.append(buffer);
snprintf(buffer, kBufferSize, " huge_page_tlb_size: %zd\n",
snprintf(buffer, kBufferSize, " huge_page_tlb_size: %zu\n",
huge_page_tlb_size_);
ret.append(buffer);
snprintf(buffer, kBufferSize, " encoding_type: %d\n",

@ -437,8 +437,9 @@ class MemTableConstructor: public Constructor {
table_factory_(new SkipListFactory) {
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
memtable_ = new MemTable(internal_comparator_, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
memtable_->Ref();
}
~MemTableConstructor() {
@ -452,8 +453,9 @@ class MemTableConstructor: public Constructor {
delete memtable_->Unref();
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions mem_ioptions(options);
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
MemTableOptions(MutableCFOptions(options, mem_ioptions), options));
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
@ -1216,7 +1218,7 @@ static std::string RandomString(Random* rnd, int len) {
return r;
}
void AddInternalKey(TableConstructor* c, const std::string prefix,
void AddInternalKey(TableConstructor* c, const std::string& prefix,
int suffix_len = 800) {
static Random rnd(1023);
InternalKey k(prefix + RandomString(&rnd, 800), 0, kTypeValue);
@ -1864,8 +1866,9 @@ TEST(MemTableTest, Simple) {
auto table_factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = table_factory;
MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
MemTable* memtable = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);

@ -386,7 +386,7 @@ class Value {
namespace {
void deleter(const Slice& key, void* value) {
delete (Value *)value;
delete static_cast<Value *>(value);
}
} // namespace

@ -325,7 +325,7 @@ bool LDBCommand::ParseKeyValue(const string& line, string* key, string* value,
bool LDBCommand::ValidateCmdLineOptions() {
for (map<string, string>::const_iterator itr = option_map_.begin();
itr != option_map_.end(); itr++) {
itr != option_map_.end(); ++itr) {
if (find(valid_cmd_line_options_.begin(),
valid_cmd_line_options_.end(), itr->first) ==
valid_cmd_line_options_.end()) {
@ -335,7 +335,7 @@ bool LDBCommand::ValidateCmdLineOptions() {
}
for (vector<string>::const_iterator itr = flags_.begin();
itr != flags_.end(); itr++) {
itr != flags_.end(); ++itr) {
if (find(valid_cmd_line_options_.begin(),
valid_cmd_line_options_.end(), *itr) ==
valid_cmd_line_options_.end()) {
@ -1538,7 +1538,7 @@ void BatchPutCommand::DoCommand() {
WriteBatch batch;
for (vector<pair<string, string>>::const_iterator itr
= key_values_.begin(); itr != key_values_.end(); itr++) {
= key_values_.begin(); itr != key_values_.end(); ++itr) {
batch.Put(itr->first, itr->second);
}
Status st = db_->Write(WriteOptions(), &batch);

@ -13,15 +13,10 @@ public:
EXEC_NOT_STARTED = 0, EXEC_SUCCEED = 1, EXEC_FAILED = 2,
};
LDBCommandExecuteResult() {
state_ = EXEC_NOT_STARTED;
message_ = "";
}
LDBCommandExecuteResult() : state_(EXEC_NOT_STARTED), message_("") {}
LDBCommandExecuteResult(State state, std::string& msg) {
state_ = state;
message_ = msg;
}
LDBCommandExecuteResult(State state, std::string& msg) :
state_(state), message_(msg) {}
std::string ToString() {
std::string ret;

@ -0,0 +1,72 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <limits>
#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
namespace {
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) {
return 0;
}
if (op2 <= 0) {
return op1;
}
uint64_t casted_op2 = (uint64_t) op2;
if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
return op1;
}
return op1 * casted_op2;
}
} // anonymous namespace
void MutableCFOptions::RefreshDerivedOptions(
const ImmutableCFOptions& ioptions) {
max_file_size.resize(ioptions.num_levels);
level_max_bytes.resize(ioptions.num_levels);
for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
max_file_size[i] = ULLONG_MAX;
level_max_bytes[i] = max_bytes_for_level_base;
} else if (i > 1) {
max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1],
target_file_size_multiplier);
level_max_bytes[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes[i - 1],
max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional[i - 1]);
} else {
max_file_size[i] = target_file_size_base;
level_max_bytes[i] = max_bytes_for_level_base;
}
}
}
uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const {
assert(level >= 0);
assert(level < (int)max_file_size.size());
return max_file_size[level];
}
uint64_t MutableCFOptions::MaxBytesForLevel(int level) const {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < (int)level_max_bytes.size());
return level_max_bytes[level];
}
uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const {
return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor;
}
uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const {
return MaxFileSizeForLevel(level) * expanded_compaction_factor;
}
} // namespace rocksdb

@ -5,12 +5,14 @@
#pragma once
#include <vector>
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
namespace rocksdb {
struct MutableCFOptions {
explicit MutableCFOptions(const Options& options)
MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions)
: write_buffer_size(options.write_buffer_size),
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
@ -18,7 +20,22 @@ struct MutableCFOptions {
memtable_prefix_bloom_huge_page_tlb_size(
options.memtable_prefix_bloom_huge_page_tlb_size),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes) {
filter_deletes(options.filter_deletes),
level0_file_num_compaction_trigger(
options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
level0_stop_writes_trigger(options.level0_stop_writes_trigger),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor),
target_file_size_base(options.target_file_size_base),
target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional)
{
RefreshDerivedOptions(ioptions);
}
MutableCFOptions()
: write_buffer_size(0),
@ -27,8 +44,33 @@ struct MutableCFOptions {
memtable_prefix_bloom_probes(0),
memtable_prefix_bloom_huge_page_tlb_size(0),
max_successive_merges(0),
filter_deletes(false) {}
filter_deletes(false),
level0_file_num_compaction_trigger(0),
level0_slowdown_writes_trigger(0),
level0_stop_writes_trigger(0),
max_grandparent_overlap_factor(0),
expanded_compaction_factor(0),
source_compaction_factor(0),
target_file_size_base(0),
target_file_size_multiplier(0),
max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0)
{}
// Must be called after any change to MutableCFOptions
void RefreshDerivedOptions(const ImmutableCFOptions& ioptions);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const;
// Returns maximum total bytes of data on a given level.
uint64_t MaxBytesForLevel(int level) const;
// Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level) const;
uint64_t ExpandedCompactionByteSizeLimit(int level) const;
// Memtable related options
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
@ -36,6 +78,25 @@ struct MutableCFOptions {
size_t memtable_prefix_bloom_huge_page_tlb_size;
size_t max_successive_merges;
bool filter_deletes;
// Compaction related options
int level0_file_num_compaction_trigger;
int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger;
int max_grandparent_overlap_factor;
int expanded_compaction_factor;
int source_compaction_factor;
int target_file_size_base;
int target_file_size_multiplier;
uint64_t max_bytes_for_level_base;
int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional;
// Derived options
// Per-level target file size.
std::vector<uint64_t> max_file_size;
// Per-level max bytes
std::vector<uint64_t> level_max_bytes;
};
} // namespace rocksdb

@ -35,6 +35,7 @@ namespace rocksdb {
ImmutableCFOptions::ImmutableCFOptions(const Options& options)
: compaction_style(options.compaction_style),
compaction_options_universal(options.compaction_options_universal),
compaction_options_fifo(options.compaction_options_fifo),
prefix_extractor(options.prefix_extractor.get()),
comparator(options.comparator),
merge_operator(options.merge_operator.get()),
@ -60,7 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
compression(options.compression),
compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts),
access_hint_on_compaction_start(options.access_hint_on_compaction_start) {}
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels) {}
ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include <cassert>
#include <unordered_set>
#include "rocksdb/options.h"
#include "util/options_helper.h"
@ -73,8 +74,8 @@ CompactionStyle ParseCompactionStyle(const std::string& type) {
} // anonymouse namespace
template<typename OptionsType>
bool ParseMemtableOption(const std::string& name, const std::string& value,
OptionsType* new_options) {
bool ParseMemtableOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "write_buffer_size") {
new_options->write_buffer_size = ParseInt64(value);
} else if (name == "arena_block_size") {
@ -96,6 +97,50 @@ bool ParseMemtableOption(const std::string& name, const std::string& value,
return true;
}
template<typename OptionsType>
bool ParseCompactionOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(value);
} else if (name == "level0_slowdown_writes_trigger") {
new_options->level0_slowdown_writes_trigger = ParseInt(value);
} else if (name == "level0_stop_writes_trigger") {
new_options->level0_stop_writes_trigger = ParseInt(value);
} else if (name == "max_grandparent_overlap_factor") {
new_options->max_grandparent_overlap_factor = ParseInt(value);
} else if (name == "expanded_compaction_factor") {
new_options->expanded_compaction_factor = ParseInt(value);
} else if (name == "source_compaction_factor") {
new_options->source_compaction_factor = ParseInt(value);
} else if (name == "target_file_size_base") {
new_options->target_file_size_base = ParseInt(value);
} else if (name == "target_file_size_multiplier") {
new_options->target_file_size_multiplier = ParseInt(value);
} else if (name == "max_bytes_for_level_base") {
new_options->max_bytes_for_level_base = ParseUint64(value);
} else if (name == "max_bytes_for_level_multiplier") {
new_options->max_bytes_for_level_multiplier = ParseInt(value);
} else if (name == "max_bytes_for_level_multiplier_additional") {
new_options->max_bytes_for_level_multiplier_additional.clear();
size_t start = 0;
while (true) {
size_t end = value.find_first_of(':', start);
if (end == std::string::npos) {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(value.substr(start)));
break;
} else {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(value.substr(start, end - start)));
start = end + 1;
}
}
} else {
return false;
}
return true;
}
bool GetMutableOptionsFromStrings(
const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,
@ -104,7 +149,8 @@ bool GetMutableOptionsFromStrings(
*new_options = base_options;
try {
for (const auto& o : options_map) {
if (ParseMemtableOption(o.first, o.second, new_options)) {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else {
return false;
}
@ -123,7 +169,8 @@ bool GetOptionsFromStrings(
*new_options = base_options;
for (const auto& o : options_map) {
try {
if (ParseMemtableOption(o.first, o.second, new_options)) {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (o.first == "max_write_buffer_number") {
new_options->max_write_buffer_number = ParseInt(o.second);
} else if (o.first == "min_write_buffer_number_to_merge") {
@ -168,43 +215,8 @@ bool GetOptionsFromStrings(
ParseInt(o.second.substr(start, o.second.size() - start));
} else if (o.first == "num_levels") {
new_options->num_levels = ParseInt(o.second);
} else if (o.first == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(o.second);
} else if (o.first == "level0_slowdown_writes_trigger") {
new_options->level0_slowdown_writes_trigger = ParseInt(o.second);
} else if (o.first == "level0_stop_writes_trigger") {
new_options->level0_stop_writes_trigger = ParseInt(o.second);
} else if (o.first == "max_mem_compaction_level") {
new_options->max_mem_compaction_level = ParseInt(o.second);
} else if (o.first == "target_file_size_base") {
new_options->target_file_size_base = ParseUint64(o.second);
} else if (o.first == "target_file_size_multiplier") {
new_options->target_file_size_multiplier = ParseInt(o.second);
} else if (o.first == "max_bytes_for_level_base") {
new_options->max_bytes_for_level_base = ParseUint64(o.second);
} else if (o.first == "max_bytes_for_level_multiplier") {
new_options->max_bytes_for_level_multiplier = ParseInt(o.second);
} else if (o.first == "max_bytes_for_level_multiplier_additional") {
new_options->max_bytes_for_level_multiplier_additional.clear();
size_t start = 0;
while (true) {
size_t end = o.second.find_first_of(':', start);
if (end == std::string::npos) {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(o.second.substr(start)));
break;
} else {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(o.second.substr(start, end - start)));
start = end + 1;
}
}
} else if (o.first == "expanded_compaction_factor") {
new_options->expanded_compaction_factor = ParseInt(o.second);
} else if (o.first == "source_compaction_factor") {
new_options->source_compaction_factor = ParseInt(o.second);
} else if (o.first == "max_grandparent_overlap_factor") {
new_options->max_grandparent_overlap_factor = ParseInt(o.second);
} else if (o.first == "soft_rate_limit") {
new_options->soft_rate_limit = ParseDouble(o.second);
} else if (o.first == "hard_rate_limit") {

@ -9,6 +9,7 @@
namespace {
void f0() {
char *p = nullptr;
// cppcheck-suppress nullPointer
*p = 10; /* SIGSEGV here!! */
}

@ -228,7 +228,7 @@ class FileManager : public EnvWrapper {
public:
explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {}
Status DeleteRandomFileInDir(const std::string dir) {
Status DeleteRandomFileInDir(const std::string& dir) {
std::vector<std::string> children;
GetChildren(dir, &children);
if (children.size() <= 2) { // . and ..

@ -33,7 +33,7 @@ namespace {
// > 0 <=> lhs == rhs
// TODO(icanadi) move this to JSONDocument?
int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) {
assert(rhs.IsObject() == false && rhs.IsObject() == false &&
assert(lhs.IsObject() == false && rhs.IsObject() == false &&
lhs.type() == rhs.type());
switch (lhs.type()) {
@ -376,7 +376,7 @@ class IndexKey {
class SimpleSortedIndex : public Index {
public:
SimpleSortedIndex(const std::string field, const std::string& name)
SimpleSortedIndex(const std::string& field, const std::string& name)
: field_(field), name_(name) {}
virtual const char* Name() const override { return name_.c_str(); }
@ -407,7 +407,6 @@ class SimpleSortedIndex : public Index {
assert(interval != nullptr); // because index is useful
Direction direction;
std::string op;
const JSONDocument* limit;
if (interval->lower_bound != nullptr) {
limit = interval->lower_bound;

@ -56,7 +56,7 @@ class DocumentDBTest {
}
}
JSONDocument* Parse(const std::string doc) {
JSONDocument* Parse(const std::string& doc) {
return JSONDocument::ParseJSON(ConvertQuotes(doc).c_str());
}

@ -369,7 +369,7 @@ class SpatialIndexCursor : public Cursor {
}
delete spatial_iterator;
valid_ = valid_ && primary_key_ids_.size() > 0;
valid_ = valid_ && !primary_key_ids_.empty();
if (valid_) {
primary_keys_iterator_ = primary_key_ids_.begin();

@ -206,7 +206,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator> merge_op,
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
Env* env)
: user_merge_op_(merge_op), env_(env) {
assert(merge_op);

@ -120,7 +120,7 @@ class TtlTest {
static FlushOptions flush_opts;
WriteBatch batch;
kv_it_ = kvmap_.begin();
for (int i = 0; i < num_ops && kv_it_ != kvmap_.end(); i++, kv_it_++) {
for (int i = 0; i < num_ops && kv_it_ != kvmap_.end(); i++, ++kv_it_) {
switch (batch_ops[i]) {
case PUT:
batch.Put(kv_it_->first, kv_it_->second);
@ -145,7 +145,7 @@ class TtlTest {
static FlushOptions flush_opts;
kv_it_ = kvmap_.begin();
advance(kv_it_, start_pos_map);
for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, kv_it_++) {
for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, ++kv_it_) {
ASSERT_OK(cf == nullptr
? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second)
: db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second));
@ -207,7 +207,7 @@ class TtlTest {
kv_it_ = kvmap_.begin();
advance(kv_it_, st_pos);
std::string v;
for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, kv_it_++) {
for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, ++kv_it_) {
Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v)
: db_ttl_->Get(ropts, cf, kv_it_->first, &v);
if (s.ok() != check) {
@ -252,7 +252,7 @@ class TtlTest {
} else { // dbiter should have found out kvmap_[st_pos]
for (int i = st_pos;
kv_it_ != kvmap_.end() && i < st_pos + span;
i++, kv_it_++) {
i++, ++kv_it_) {
ASSERT_TRUE(dbiter->Valid());
ASSERT_EQ(dbiter->value().compare(kv_it_->second), 0);
dbiter->Next();
@ -263,7 +263,7 @@ class TtlTest {
class TestFilter : public CompactionFilter {
public:
TestFilter(const int64_t kSampleSize, const std::string kNewValue)
TestFilter(const int64_t kSampleSize, const std::string& kNewValue)
: kSampleSize_(kSampleSize),
kNewValue_(kNewValue) {
}
@ -311,7 +311,7 @@ class TtlTest {
class TestFilterFactory : public CompactionFilterFactory {
public:
TestFilterFactory(const int64_t kSampleSize, const std::string kNewValue)
TestFilterFactory(const int64_t kSampleSize, const std::string& kNewValue)
: kSampleSize_(kSampleSize),
kNewValue_(kNewValue) {
}

Loading…
Cancel
Save