Ankit Gupta 11 years ago
commit abf9920386
  1. 16
      HISTORY.md
  2. 2
      README.md
  3. 2
      build_tools/mac-install-gflags.sh
  4. 38
      db/column_family.cc
  5. 40
      db/column_family.h
  6. 122
      db/compaction_picker.cc
  7. 58
      db/db_bench.cc
  8. 71
      db/db_impl.cc
  9. 113
      db/db_test.cc
  10. 8
      db/memtable.cc
  11. 45
      db/repair.cc
  12. 15
      db/version_edit.h
  13. 123
      db/version_set.cc
  14. 38
      db/version_set.h
  15. 21
      include/rocksdb/ldb_tool.h
  16. 7
      include/rocksdb/memtablerep.h
  17. 52
      java/org/rocksdb/benchmark/DbBenchmark.java
  18. 2
      third-party/rapidjson/reader.h
  19. 3
      tools/reduce_levels_test.cc
  20. 2
      util/hash_cuckoo_rep.cc
  21. 10
      util/hash_linklist_rep.cc
  22. 10
      util/hash_skiplist_rep.cc
  23. 15
      util/ldb_cmd.cc
  24. 16
      util/ldb_cmd.h
  25. 22
      util/ldb_tool.cc
  26. 4
      util/options.cc
  27. 3
      util/skiplistrep.cc
  28. 3
      util/vectorrep.cc

@ -1,6 +1,9 @@
# Rocksdb Change Log
## Unreleased (will be released with 3.2.0)
## Unreleased
## 3.2.0 (06/20/2014)
### Public API changes
* We removed seek compaction as a concept from RocksDB because:
@ -8,17 +11,24 @@
2) It added some complexity to the important code-paths,
3) None of our internal customers were really using it.
Because of that, Options::disable_seek_compaction is now obsolete. It is still a parameter in Options, so it does not break the build, but it does not have any effect. We plan to completely remove it at some point, so we ask users to please remove this option from your code base.
* Add two paramters to NewHashLinkListRepFactory() for logging on too many entries in a hash bucket when flushing.
* Added new option BlockBasedTableOptions::hash_index_allow_collision. When enabled, prefix hash index for block-based table will not store prefix and allow hash collision, reducing memory consumption.
### New Features
* PlainTable now supports a new key encoding: for keys of the same prefix, the prefix is only written once. It can be enabled through encoding_type paramter of NewPlainTableFactory()
* Add AdaptiveTableFactory, which is used to convert from a DB of PlainTable to BlockBasedTabe, or vise versa. It can be created using NewAdaptiveTableFactory()
### Performance Improvements
* Tailing Iterator re-implemeted with ForwardIterator + Cascading Search Hint , see ~20% throughput improvement.
## 3.1.0 (05/21/2014)
### Public API changes
* Replaced ColumnFamilyOptions::table_properties_collectors with ColumnFamilyOptions::table_properties_collector_factories
* Add two paramters to NewHashLinkListRepFactory() for logging on too many entries in a hash bucket when flushing.
### New Features
* Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open.
* FIFO compaction style
* Add AdaptiveTableFactory, which is used to convert from a DB of PlainTable to BlockBasedTabe, or vise versa. It can be created using NewAdaptiveTableFactory()
## 3.0.0 (05/05/2014)

@ -8,7 +8,7 @@ and Jeff Dean (jeff@google.com)
This code is a library that forms the core building block for a fast
key value server, especially suited for storing data on flash drives.
It has an Log-Structured-Merge-Database (LSM) design with flexible tradeoffs
It has a Log-Structured-Merge-Database (LSM) design with flexible tradeoffs
between Write-Amplification-Factor (WAF), Read-Amplification-Factor (RAF)
and Space-Amplification-Factor (SAF). It has multi-threaded compactions,
making it specially suitable for storing multiple terabytes of data in a

@ -22,4 +22,4 @@ echo ""
echo "-----------------------------------------------------------------------------"
echo "| Installation Completed |"
echo "-----------------------------------------------------------------------------"
echo "Please run `. ~/bash_profile` to be able to compile with gflags"
echo "Please run \`. ~/.bash_profile\` to be able to compile with gflags"

@ -243,6 +243,8 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
}
RecalculateWriteStallConditions();
}
// DB mutex held
@ -295,6 +297,35 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}
void ColumnFamilyData::RecalculateWriteStallConditions() {
need_wait_for_num_memtables_ =
(imm()->size() == options()->max_write_buffer_number - 1);
if (current_ != nullptr) {
need_wait_for_num_level0_files_ =
(current_->NumLevelFiles(0) >= options()->level0_stop_writes_trigger);
} else {
need_wait_for_num_level0_files_ = false;
}
RecalculateWriteStallRateLimitsConditions();
}
void ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() {
if (current_ != nullptr) {
exceeds_hard_rate_limit_ =
(options()->hard_rate_limit > 1.0 &&
current_->MaxCompactionScore() > options()->hard_rate_limit);
exceeds_soft_rate_limit_ =
(options()->soft_rate_limit > 0.0 &&
current_->MaxCompactionScore() > options()->soft_rate_limit);
} else {
exceeds_hard_rate_limit_ = false;
exceeds_soft_rate_limit_ = false;
}
}
const EnvOptions* ColumnFamilyData::soptions() const {
return &(column_family_set_->storage_options_);
}
@ -316,7 +347,9 @@ void ColumnFamilyData::CreateNewMemtable() {
}
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
return compaction_picker_->PickCompaction(current_, log_buffer);
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
RecalculateWriteStallRateLimitsConditions();
return result;
}
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
@ -420,6 +453,9 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
if (column_family_set_->db_options_->allow_thread_local) {
ResetThreadLocalSuperVersions();
}
RecalculateWriteStallConditions();
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex

@ -229,6 +229,22 @@ class ColumnFamilyData {
return need_slowdown_for_num_level0_files_;
}
bool NeedWaitForNumLevel0Files() const {
return need_wait_for_num_level0_files_;
}
bool NeedWaitForNumMemtables() const {
return need_wait_for_num_memtables_;
}
bool ExceedsSoftRateLimit() const {
return exceeds_soft_rate_limit_;
}
bool ExceedsHardRateLimit() const {
return exceeds_hard_rate_limit_;
}
private:
friend class ColumnFamilySet;
ColumnFamilyData(const std::string& dbname, uint32_t id,
@ -238,6 +254,14 @@ class ColumnFamilyData {
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set);
// Recalculate some small conditions, which are changed only during
// compaction, adding new memtable and/or
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallRateLimitsConditions();
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
@ -282,6 +306,22 @@ class ColumnFamilyData {
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;
// These 4 variables are updated only after compaction,
// adding new memtable, flushing memtables to files
// and/or add recalculation of compaction score.
// That's why theirs values are cached in ColumnFamilyData.
// Recalculation is made by RecalculateWriteStallConditions and
// RecalculateWriteStallRateLimitsConditions function. They are used
// in DBImpl::MakeRoomForWrite function to decide, if it need
// to sleep during write operation
bool need_wait_for_num_memtables_;
bool need_wait_for_num_level0_files_;
bool exceeds_hard_rate_limit_;
bool exceeds_soft_rate_limit_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;

@ -19,10 +19,10 @@ namespace rocksdb {
namespace {
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->fd.GetFileSize();
sum += files[i]->compensated_file_size;
}
return sum;
}
@ -80,7 +80,7 @@ void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
for (auto c : compactions_in_progress_[level]) {
assert(c->level() == level);
for (int i = 0; i < c->num_input_files(0); i++) {
total += c->input(0, i)->fd.GetFileSize();
total += c->input(0, i)->compensated_file_size;
}
}
sizes[level] = total;
@ -261,9 +261,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded0;
c->input_version_->GetOverlappingInputs(
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalFileSize(expanded0);
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
@ -278,14 +278,12 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
"[%s] Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu "
"bytes)\n",
c->column_family_data()->GetName().c_str(), (unsigned long)level,
(unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size,
(unsigned long)inputs1_size, (unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()), (unsigned long)expanded0_size,
(unsigned long)inputs1_size);
"[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64
" bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n",
c->column_family_data()->GetName().c_str(), level,
c->inputs_[0].size(), c->inputs_[1].size(), inputs0_size,
inputs1_size, expanded0.size(), expanded1.size(), expanded0_size,
inputs1_size);
smallest = new_start;
largest = new_limit;
c->inputs_[0] = expanded0;
@ -335,7 +333,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->fd.GetFileSize();
uint64_t s = inputs[i]->compensated_file_size;
total += s;
if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest;
@ -483,11 +481,11 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
FileMetaData* f = c->input_version_->files_[level][index];
// check to verify files are arranged in descending size
assert(
(i == file_size.size() - 1) ||
assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(f->fd.GetFileSize() >=
c->input_version_->files_[level][file_size[i + 1]]->fd.GetFileSize()));
(f->compensated_file_size >=
c->input_version_->files_[level][file_size[i + 1]]->
compensated_file_size));
// do not pick a file to compact if it is being compacted
// from n-1 level.
@ -656,21 +654,19 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
candidate_count = 1;
break;
}
LogToBuffer(log_buffer,
"[%s] Universal: file %lu[%d] being compacted, skipping",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), loop);
LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64
"[%d] being compacted, skipping",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop);
f = nullptr;
}
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr ? f->fd.GetFileSize() : 0;
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
if (f != nullptr) {
LogToBuffer(log_buffer,
"[%s] Universal: Possible candidate file %lu[%d].",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), loop);
"[%s] Universal: Possible candidate file %" PRIu64 "[%d].",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop);
}
// Check if the suceeding files need compaction.
@ -703,9 +699,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// by the last-resort read amp strategy which disregards size ratios.
break;
}
candidate_size = f->fd.GetFileSize();
candidate_size = f->compensated_file_size;
} else { // default kCompactionStopStyleTotalSize
candidate_size += f->fd.GetFileSize();
candidate_size += f->compensated_file_size;
}
candidate_count++;
}
@ -721,10 +717,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
LogToBuffer(log_buffer,
"[%s] Universal: Skipping file %lu[%d] with size %lu %d\n",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), i,
(unsigned long)f->fd.GetFileSize(), f->being_compacted);
"[%s] Universal: Skipping file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n",
version->cfd_->GetName().c_str(), f->fd.GetNumber(),
i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted);
}
}
}
@ -759,10 +755,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
LogToBuffer(
log_buffer, "[%s] Universal: Picking file %lu[%d] with size %lu\n",
version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), i,
(unsigned long)f->fd.GetFileSize());
LogToBuffer(log_buffer,
"[%s] Universal: Picking file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
version->cfd_->GetName().c_str(),
f->fd.GetNumber(), i,
f->fd.GetFileSize(), f->compensated_file_size);
}
return c;
}
@ -798,19 +796,19 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
start_index = loop; // Consider this as the first candidate.
break;
}
LogToBuffer(
log_buffer, "[%s] Universal: skipping file %lu[%d] compacted %s",
version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(),
loop, " cannot be a candidate to reduce size amp.\n");
LogToBuffer(log_buffer,
"[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
if (f == nullptr) {
return nullptr; // no candidate files
}
LogToBuffer(log_buffer, "[%s] Universal: First candidate file %lu[%d] %s",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), start_index,
LogToBuffer(log_buffer,
"[%s] Universal: First candidate file %" PRIu64 "[%d] %s",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index,
" to reduce size amp.\n");
// keep adding up all the remaining files
@ -820,13 +818,13 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
f = version->files_[level][index];
if (f->being_compacted) {
LogToBuffer(
log_buffer, "[%s] Universal: Possible candidate file %lu[%d] %s.",
version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(),
loop,
log_buffer,
"[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += f->fd.GetFileSize();
candidate_size += f->compensated_file_size;
candidate_count++;
}
if (candidate_count == 0) {
@ -841,17 +839,16 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
if (candidate_size * 100 < ratio * earliest_file_size) {
LogToBuffer(
log_buffer,
"[%s] Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
version->cfd_->GetName().c_str(), (unsigned long)candidate_size,
(unsigned long)earliest_file_size);
"[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
"earliest-file-size %" PRIu64,
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
return nullptr;
} else {
LogToBuffer(log_buffer,
"[%s] Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
version->cfd_->GetName().c_str(), (unsigned long)candidate_size,
(unsigned long)earliest_file_size);
LogToBuffer(
log_buffer,
"[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
"earliest-file-size %" PRIu64,
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
}
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
@ -866,10 +863,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
LogToBuffer(log_buffer,
"[%s] Universal: size amp picking file %lu[%d] with size %lu",
"[%s] Universal: size amp picking file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), index,
(unsigned long)f->fd.GetFileSize());
f->fd.GetNumber(), index,
f->fd.GetFileSize(), f->compensated_file_size);
}
return c;
}
@ -879,7 +877,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
assert(version->NumberLevels() == 1);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
total_size += file->fd.GetFileSize();
total_size += file->compensated_file_size;
}
if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
@ -907,7 +905,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
for (auto ritr = version->files_[0].rbegin();
ritr != version->files_[0].rend(); ++ritr) {
auto f = *ritr;
total_size -= f->fd.GetFileSize();
total_size -= f->compensated_file_size;
c->inputs_[0].push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));

@ -584,9 +584,9 @@ class RandomGenerator {
}
Slice Generate(unsigned int len) {
assert(len <= data_.size());
if (pos_ + len > data_.size()) {
pos_ = 0;
assert(len < data_.size());
}
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
@ -664,7 +664,7 @@ class Stats {
void SetId(int id) { id_ = id; }
void SetExcludeFromMerge() { exclude_from_merge_ = true; }
void FinishedSingleOp(DB* db) {
void FinishedOps(DB* db, int64_t num_ops) {
if (FLAGS_histogram) {
double now = FLAGS_env->NowMicros();
double micros = now - last_op_finish_;
@ -676,7 +676,7 @@ class Stats {
last_op_finish_ = now;
}
done_++;
done_ += num_ops;
if (done_ >= next_report_) {
if (!FLAGS_stats_interval) {
if (next_report_ < 1000) next_report_ += 100;
@ -722,7 +722,7 @@ class Stats {
void Report(const Slice& name) {
// Pretend at least one op was done in case we are running a benchmark
// that does not call FinishedSingleOp().
// that does not call FinishedOps().
if (done_ < 1) done_ = 1;
std::string extra;
@ -837,6 +837,15 @@ class Benchmark {
int64_t writes_;
int64_t readwrites_;
int64_t merge_keys_;
bool SanityCheck() {
if (FLAGS_compression_ratio > 1) {
fprintf(stderr, "compression_ratio should be between 0 and 1\n");
return false;
}
return true;
}
void PrintHeader() {
PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
@ -1116,6 +1125,9 @@ class Benchmark {
}
void Run() {
if (!SanityCheck()) {
exit(1);
}
PrintHeader();
Open();
const char* benchmarks = FLAGS_benchmarks.c_str();
@ -1379,7 +1391,7 @@ class Benchmark {
uint32_t crc = 0;
while (bytes < 500 * 1048576) {
crc = crc32c::Value(data.data(), size);
thread->stats.FinishedSingleOp(nullptr);
thread->stats.FinishedOps(nullptr, 1);
bytes += size;
}
// Print so result is not dead
@ -1398,7 +1410,7 @@ class Benchmark {
unsigned int xxh32 = 0;
while (bytes < 500 * 1048576) {
xxh32 = XXH32(data.data(), size, 0);
thread->stats.FinishedSingleOp(nullptr);
thread->stats.FinishedOps(nullptr, 1);
bytes += size;
}
// Print so result is not dead
@ -1419,7 +1431,7 @@ class Benchmark {
ptr = ap.Acquire_Load();
}
count++;
thread->stats.FinishedSingleOp(nullptr);
thread->stats.FinishedOps(nullptr, 1);
}
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}
@ -1460,7 +1472,7 @@ class Benchmark {
}
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
thread->stats.FinishedOps(nullptr, 1);
}
if (!ok) {
@ -1541,7 +1553,7 @@ class Benchmark {
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
thread->stats.FinishedOps(nullptr, 1);
}
if (!ok) {
@ -1850,9 +1862,9 @@ class Benchmark {
GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + key_size_;
thread->stats.FinishedSingleOp(db_to_write);
}
s = db_to_write->Write(write_options_, &batch);
thread->stats.FinishedOps(db_to_write, entries_per_batch_);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -1877,7 +1889,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp(db);
thread->stats.FinishedOps(db, 1);
++i;
}
delete iter;
@ -1900,7 +1912,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
++i;
}
delete iter;
@ -1923,7 +1935,7 @@ class Benchmark {
if (db->Get(options, key, &value).ok()) {
found++;
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
char msg[100];
@ -1983,7 +1995,7 @@ class Benchmark {
DB* db = SelectDB(thread);
Iterator* iter = db->NewIterator(options);
delete iter;
thread->stats.FinishedSingleOp(db);
thread->stats.FinishedOps(db, 1);
}
}
@ -2047,7 +2059,7 @@ class Benchmark {
if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
found++;
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
delete single_iter;
for (auto iter : multi_iters) {
@ -2085,9 +2097,9 @@ class Benchmark {
const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
GenerateKeyFromInt(k, FLAGS_num, &key);
batch.Delete(key);
thread->stats.FinishedSingleOp(db);
}
auto s = db->Write(write_options_, &batch);
thread->stats.FinishedOps(db, entries_per_batch_);
if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
exit(1);
@ -2147,7 +2159,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
++num_writes;
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
@ -2307,7 +2319,7 @@ class Benchmark {
deletes_done++;
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2365,7 +2377,7 @@ class Benchmark {
put_weight--;
writes_done++;
}
thread->stats.FinishedSingleOp(db);
thread->stats.FinishedOps(db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
@ -2399,7 +2411,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db);
thread->stats.FinishedOps(db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
@ -2446,7 +2458,7 @@ class Benchmark {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
char msg[100];
@ -2482,7 +2494,7 @@ class Benchmark {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
// Print some statistics
@ -2543,7 +2555,7 @@ class Benchmark {
}
thread->stats.FinishedSingleOp(db_);
thread->stats.FinishedOps(db_, 1);
}
char msg[100];

@ -731,9 +731,8 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
} else {
Status s = env_->DeleteFile(fname);
Log(options_.info_log, "Delete %s type=%d #%lu -- %s\n",
fname.c_str(), type, (unsigned long)number,
s.ToString().c_str());
Log(options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n",
fname.c_str(), type, number, s.ToString().c_str());
}
}
@ -1257,8 +1256,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// large sequence numbers).
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
0/*initial_offset*/);
Log(options_.info_log, "Recovering log #%lu",
(unsigned long) log_number);
Log(options_.info_log, "Recovering log #%" PRIu64 "", log_number);
// Read all the records and add to a memtable
std::string scratch;
@ -1375,8 +1373,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mem->GetFirstSequenceNumber();
Log(options_.info_log, "[%s] Level-0 table #%lu: started",
cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber());
Log(options_.info_log, "[%s] Level-0 table #%" PRIu64 ": started",
cfd->GetName().c_str(), meta.fd.GetNumber());
Status s;
{
@ -1389,9 +1387,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
mutex_.Lock();
}
Log(options_.info_log, "[%s] Level-0 table #%lu: %lu bytes %s",
cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber(),
(unsigned long)meta.fd.GetFileSize(), s.ToString().c_str());
Log(options_.info_log,
"[%s] Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
delete iter;
pending_outputs_.erase(meta.fd.GetNumber());
@ -1436,14 +1435,15 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
log_buffer->FlushBufferToLog();
std::vector<Iterator*> memtables;
for (MemTable* m : mems) {
Log(options_.info_log, "[%s] Flushing memtable with next log file: %lu\n",
cfd->GetName().c_str(), (unsigned long)m->GetNextLogNumber());
Log(options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd->GetName().c_str(), m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ReadOptions(), true));
}
Iterator* iter = NewMergingIterator(&cfd->internal_comparator(),
&memtables[0], memtables.size());
Log(options_.info_log, "[%s] Level-0 flush table #%lu: started",
cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber());
Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": started",
cfd->GetName().c_str(), meta.fd.GetNumber());
s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
@ -1451,9 +1451,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->options()));
LogFlush(options_.info_log);
delete iter;
Log(options_.info_log, "[%s] Level-0 flush table #%lu: %lu bytes %s",
cfd->GetName().c_str(), (unsigned long)meta.fd.GetFileSize(),
(unsigned long)meta.fd.GetFileSize(), s.ToString().c_str());
Log(options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd->GetName().c_str(), meta.fd.GetFileSize(), meta.fd.GetFileSize(),
s.ToString().c_str());
if (!options_.disableDataSync) {
db_directory_->Fsync();
@ -2402,9 +2403,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
s = iter->status();
delete iter;
if (s.ok()) {
Log(options_.info_log, "[%s] Generated table #%lu: %lu keys, %lu bytes",
cfd->GetName().c_str(), (unsigned long)output_number,
(unsigned long)current_entries, (unsigned long)current_bytes);
Log(options_.info_log, "[%s] Generated table #%" PRIu64 ": %" PRIu64
" keys, %" PRIu64 " bytes",
cfd->GetName().c_str(), output_number, current_entries,
current_bytes);
}
}
return s;
@ -2469,9 +2471,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
assert(prev);
}
Log(options_.info_log,
"Looking for seqid %lu but maxseqid is %lu",
(unsigned long)in,
(unsigned long)snapshots[snapshots.size()-1]);
"Looking for seqid %" PRIu64 " but maxseqid is %" PRIu64 "", in,
snapshots[snapshots.size() - 1]);
assert(0);
return 0;
}
@ -4026,8 +4027,7 @@ Status DBImpl::MakeRoomForWrite(
DelayLoggingAndReset();
}
break;
} else if (cfd->imm()->size() ==
cfd->options()->max_write_buffer_number - 1) {
} else if (cfd->NeedWaitForNumMemtables()) {
// We have filled up the current memtable, but the previous
// ones are still being flushed, so we wait.
DelayLoggingAndReset();
@ -4048,9 +4048,7 @@ Status DBImpl::MakeRoomForWrite(
STALL_MEMTABLE_COMPACTION_MICROS, stall);
cfd->internal_stats()->RecordWriteStall(
InternalStats::MEMTABLE_COMPACTION, stall);
} else if (cfd->current()->NumLevelFiles(0) >=
cfd->options()->level0_stop_writes_trigger) {
// There are too many level-0 files.
} else if (cfd->NeedWaitForNumLevel0Files()) {
DelayLoggingAndReset();
Log(options_.info_log, "[%s] wait for fewer level0 files...\n",
cfd->GetName().c_str());
@ -4064,12 +4062,10 @@ Status DBImpl::MakeRoomForWrite(
RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES,
stall);
} else if (allow_hard_rate_limit_delay &&
cfd->options()->hard_rate_limit > 1.0 &&
(score = cfd->current()->MaxCompactionScore()) >
cfd->options()->hard_rate_limit) {
} else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) {
// Delay a write when the compaction score for any level is too large.
int max_level = cfd->current()->MaxCompactionScoreLevel();
score = cfd->current()->MaxCompactionScore();
mutex_.Unlock();
uint64_t delayed;
{
@ -4090,10 +4086,8 @@ Status DBImpl::MakeRoomForWrite(
allow_hard_rate_limit_delay = false;
}
mutex_.Lock();
} else if (allow_soft_rate_limit_delay &&
cfd->options()->soft_rate_limit > 0.0 &&
(score = cfd->current()->MaxCompactionScore()) >
cfd->options()->soft_rate_limit) {
} else if (allow_soft_rate_limit_delay && cfd->ExceedsSoftRateLimit()) {
score = cfd->current()->MaxCompactionScore();
// Delay a write when the compaction score for any level is too large.
// TODO: add statistics
mutex_.Unlock();
@ -4176,8 +4170,9 @@ Status DBImpl::MakeRoomForWrite(
}
new_mem->Ref();
cfd->SetMemtable(new_mem);
Log(options_.info_log, "[%s] New memtable created with log file: #%lu\n",
cfd->GetName().c_str(), (unsigned long)logfile_number_);
Log(options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "\n",
cfd->GetName().c_str(), logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
superversions_to_free->push_back(

@ -2726,6 +2726,119 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
namespace {
static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8;
Options DeletionTriggerOptions() {
Options options;
options.compression = kNoCompression;
options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
options.min_write_buffer_number_to_merge = 1;
options.num_levels = kCDTNumLevels;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 1;
options.target_file_size_base = options.write_buffer_size * 2;
options.target_file_size_multiplier = 2;
options.max_bytes_for_level_base =
options.target_file_size_base * options.target_file_size_multiplier;
options.max_bytes_for_level_multiplier = 2;
options.disable_auto_compactions = false;
return options;
}
} // anonymous namespace
TEST(DBTest, CompactionDeletionTrigger) {
Options options = DeletionTriggerOptions();
options.create_if_missing = true;
for (int tid = 0; tid < 2; ++tid) {
uint64_t db_size[2];
DestroyAndReopen(&options);
Random rnd(301);
const int kTestSize = kCDTKeysPerBuffer * 512;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[0] = Size(Key(0), Key(kTestSize - 1));
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[1] = Size(Key(0), Key(kTestSize - 1));
// must have much smaller db size.
ASSERT_GT(db_size[0] / 3, db_size[1]);
// repeat the test with universal compaction
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
}
}
TEST(DBTest, CompactionDeletionTriggerReopen) {
for (int tid = 0; tid < 2; ++tid) {
uint64_t db_size[3];
Options options = DeletionTriggerOptions();
options.create_if_missing = true;
DestroyAndReopen(&options);
Random rnd(301);
// round 1 --- insert key/value pairs.
const int kTestSize = kCDTKeysPerBuffer * 512;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[0] = Size(Key(0), Key(kTestSize - 1));
Close();
// round 2 --- disable auto-compactions and issue deletions.
options.create_if_missing = false;
options.disable_auto_compactions = true;
Reopen(&options);
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
db_size[1] = Size(Key(0), Key(kTestSize - 1));
Close();
// as auto_compaction is off, we shouldn't see too much reduce
// in db size.
ASSERT_LT(db_size[0] / 3, db_size[1]);
// round 3 --- reopen db with auto_compaction on and see if
// deletion compensation still work.
options.disable_auto_compactions = false;
Reopen(&options);
// insert relatively small amount of data to trigger auto compaction.
for (int k = 0; k < kTestSize / 10; ++k) {
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[2] = Size(Key(0), Key(kTestSize - 1));
// this time we're expecting significant drop in size.
ASSERT_GT(db_size[0] / 3, db_size[2]);
// repeat the test with universal compaction
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
}
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;

@ -449,7 +449,7 @@ void MemTable::Update(SequenceNumber seq,
Slice mem_key = lkey.memtable_key();
std::unique_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(lkey.user_key()));
table_->GetDynamicPrefixIterator());
iter->Seek(lkey.internal_key(), mem_key.data());
if (iter->Valid()) {
@ -508,7 +508,7 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
Slice memkey = lkey.memtable_key();
std::unique_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(lkey.user_key()));
table_->GetDynamicPrefixIterator());
iter->Seek(lkey.internal_key(), memkey.data());
if (iter->Valid()) {
@ -583,7 +583,7 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
// reps). By passing in the user key, we allow efficient iterator creation.
// The iterator only needs to be ordered within the same user key.
std::unique_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
table_->GetDynamicPrefixIterator());
iter->Seek(key.internal_key(), memkey.data());
size_t num_successive_merges = 0;
@ -610,7 +610,7 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto iter = GetIterator(k.user_key());
auto iter = GetDynamicPrefixIterator();
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {

@ -31,6 +31,8 @@
#ifndef ROCKSDB_LITE
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "db/builder.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
@ -82,18 +84,17 @@ class Repairer {
status = WriteDescriptor();
}
if (status.ok()) {
unsigned long long bytes = 0;
uint64_t bytes = 0;
for (size_t i = 0; i < tables_.size(); i++) {
bytes += tables_[i].meta.fd.GetFileSize();
}
Log(options_.info_log,
"**** Repaired rocksdb %s; "
"recovered %d files; %llu bytes. "
"recovered %zu files; %" PRIu64
"bytes. "
"Some data may have been lost. "
"****",
dbname_.c_str(),
static_cast<int>(tables_.size()),
bytes);
dbname_.c_str(), tables_.size(), bytes);
}
return status;
}
@ -159,8 +160,8 @@ class Repairer {
std::string logname = LogFileName(dbname_, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
Log(options_.info_log, "Log #%llu: ignoring conversion error: %s",
(unsigned long long) logs_[i],
Log(options_.info_log,
"Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i],
status.ToString().c_str());
}
ArchiveFile(logname);
@ -174,10 +175,8 @@ class Repairer {
uint64_t lognum;
virtual void Corruption(size_t bytes, const Status& s) {
// We print error messages for corruption, but continue repairing.
Log(info_log, "Log #%llu: dropping %d bytes; %s",
(unsigned long long) lognum,
static_cast<int>(bytes),
s.ToString().c_str());
Log(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s", lognum,
static_cast<int>(bytes), s.ToString().c_str());
}
};
@ -220,8 +219,7 @@ class Repairer {
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
Log(options_.info_log, "Log #%llu: ignoring %s",
(unsigned long long) log,
Log(options_.info_log, "Log #%" PRIu64 ": ignoring %s", log,
status.ToString().c_str());
status = Status::OK(); // Keep going with rest of file
}
@ -244,9 +242,9 @@ class Repairer {
table_numbers_.push_back(meta.fd.GetNumber());
}
}
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long)log, counter,
(unsigned long long)meta.fd.GetNumber(), status.ToString().c_str());
Log(options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter,
meta.fd.GetNumber(), status.ToString().c_str());
return status;
}
@ -257,9 +255,8 @@ class Repairer {
Status status = ScanTable(&t);
if (!status.ok()) {
std::string fname = TableFileName(dbname_, table_numbers_[i]);
Log(options_.info_log, "Table #%llu: ignoring %s",
(unsigned long long) table_numbers_[i],
status.ToString().c_str());
Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s",
table_numbers_[i], status.ToString().c_str());
ArchiveFile(fname);
} else {
tables_.push_back(t);
@ -281,9 +278,8 @@ class Repairer {
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(options_.info_log, "Table #%llu: unparsable key %s",
(unsigned long long)t->meta.fd.GetNumber(),
EscapeString(key).c_str());
Log(options_.info_log, "Table #%" PRIu64 ": unparsable key %s",
t->meta.fd.GetNumber(), EscapeString(key).c_str());
continue;
}
@ -305,9 +301,8 @@ class Repairer {
}
delete iter;
}
Log(options_.info_log, "Table #%llu: %d entries %s",
(unsigned long long)t->meta.fd.GetNumber(), counter,
status.ToString().c_str());
Log(options_.info_log, "Table #%" PRIu64 ": %d entries %s",
t->meta.fd.GetNumber(), counter, status.ToString().c_str());
return status;
}

@ -49,11 +49,23 @@ struct FileMetaData {
// Needs to be disposed when refs becomes 0.
Cache::Handle* table_reader_handle;
// stats for compensating deletion entries during compaction
uint64_t compensated_file_size; // File size compensated by deletion entry.
uint64_t num_entries; // the number of entries.
uint64_t num_deletions; // the number of deletion entries.
uint64_t raw_key_size; // total uncompressed key size.
uint64_t raw_value_size; // total uncompressed value size.
FileMetaData()
: refs(0),
fd(0, 0),
being_compacted(false),
table_reader_handle(nullptr) {}
table_reader_handle(nullptr),
compensated_file_size(0),
num_entries(0),
num_deletions(0),
raw_key_size(0),
raw_value_size(0) {}
};
class VersionEdit {
@ -149,6 +161,7 @@ class VersionEdit {
private:
friend class VersionSet;
friend class Version;
typedef std::set< std::pair<int, uint64_t>> DeletedFileSet;

@ -47,6 +47,15 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}
static uint64_t TotalCompensatedFileSize(
const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->compensated_file_size;
}
return sum;
}
Version::~Version() {
assert(refs_ == 0);
@ -241,21 +250,16 @@ class Version::LevelFileIteratorState : public TwoLevelIteratorState {
bool for_compaction_;
};
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname) {
auto table_cache = cfd_->table_cache();
auto options = cfd_->options();
for (int level = 0; level < num_levels_; level++) {
for (const auto& file_meta : files_[level]) {
auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
// 1. If the table is already present in table cache, load table
// properties from there.
std::shared_ptr<const TableProperties> table_properties;
Status s = table_cache->GetTableProperties(
vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd,
&table_properties, true /* no io */);
tp, true /* no io */);
if (s.ok()) {
props->insert({fname, table_properties});
continue;
return s;
}
// We only ignore error type `Incomplete` since it's by design that we
@ -267,8 +271,14 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
// 2. Table is not present in table cache, we'll read the table properties
// directly from the properties block in the file.
std::unique_ptr<RandomAccessFile> file;
s = options->env->NewRandomAccessFile(fname, &file,
vset_->storage_options_);
if (fname != nullptr) {
s = options->env->NewRandomAccessFile(
*fname, &file, vset_->storage_options_);
} else {
s = options->env->NewRandomAccessFile(
TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
&file, vset_->storage_options_);
}
if (!s.ok()) {
return s;
}
@ -286,8 +296,23 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
RecordTick(options->statistics.get(),
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
props->insert({fname, std::shared_ptr<const TableProperties>(
raw_table_properties)});
*tp = std::shared_ptr<const TableProperties>(raw_table_properties);
return s;
}
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
for (int level = 0; level < num_levels_; level++) {
for (const auto& file_meta : files_[level]) {
auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
// 1. If the table is already present in table cache, load table
// properties from there.
std::shared_ptr<const TableProperties> table_properties;
Status s = GetTableProperties(&table_properties, file_meta, &fname);
if (s.ok()) {
props->insert({fname, table_properties});
} else {
return s;
}
}
}
@ -492,7 +517,11 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
compaction_level_(num_levels_),
version_number_(version_number),
file_indexer_(num_levels_, cfd == nullptr ? nullptr
: cfd->internal_comparator().user_comparator()) {
: cfd->internal_comparator().user_comparator()),
total_file_size_(0),
total_raw_key_size_(0),
total_raw_value_size_(0),
num_non_deletions_(0) {
}
void Version::Get(const ReadOptions& options,
@ -699,6 +728,58 @@ void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
UpdateNumNonEmptyLevels();
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
if (file_meta->num_entries > 0) {
return false;
}
std::shared_ptr<const TableProperties> tp;
Status s = GetTableProperties(&tp, file_meta);
if (!s.ok()) {
return false;
}
if (tp.get() == nullptr) return false;
file_meta->num_entries = tp->num_entries;
file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
file_meta->raw_value_size = tp->raw_value_size;
file_meta->raw_key_size = tp->raw_key_size;
return true;
}
void Version::UpdateTemporaryStats(const VersionEdit* edit) {
static const int kDeletionWeightOnCompaction = 2;
// incrementally update the average value size by
// including newly added files into the global stats
int init_count = 0;
int total_count = 0;
for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) {
if (MaybeInitializeFileMetaData(file_meta)) {
// each FileMeta will be initialized only once.
total_file_size_ += file_meta->fd.GetFileSize();
total_raw_key_size_ += file_meta->raw_key_size;
total_raw_value_size_ += file_meta->raw_value_size;
num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
init_count++;
}
total_count++;
}
}
uint64_t average_value_size = GetAverageValueSize();
// compute the compensated size
for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) {
file_meta->compensated_file_size = file_meta->fd.GetFileSize() +
file_meta->num_deletions * average_value_size *
kDeletionWeightOnCompaction;
}
}
}
void Version::ComputeCompactionScore(
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0;
@ -728,7 +809,7 @@ void Version::ComputeCompactionScore(
uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) {
total_size += files_[level][i]->fd.GetFileSize();
total_size += files_[level][i]->compensated_file_size;
numfiles++;
}
}
@ -747,7 +828,7 @@ void Version::ComputeCompactionScore(
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes =
TotalFileSize(files_[level]) - size_being_compacted[level];
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
if (max_score < score) {
@ -783,9 +864,10 @@ namespace {
// Compator that is used to sort files based on their size
// In normal mode: descending size
bool CompareSizeDescending(const Version::Fsize& first,
bool CompareCompensatedSizeDescending(const Version::Fsize& first,
const Version::Fsize& second) {
return (first.file->fd.GetFileSize() > second.file->fd.GetFileSize());
return (first.file->compensated_file_size >
second.file->compensated_file_size);
}
// A static compator used to sort files based on their seqno
// In universal style : descending seqno
@ -846,7 +928,7 @@ void Version::UpdateFilesBySize() {
num = temp.size();
}
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareSizeDescending);
CompareCompensatedSizeDescending);
}
assert(temp.size() == files.size());
@ -1674,6 +1756,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex.
v->UpdateTemporaryStats(edit);
v->PrepareApply(size_being_compacted);
}

@ -196,6 +196,25 @@ class Version {
// Returns the version nuber of this version
uint64_t GetVersionNumber() const { return version_number_; }
uint64_t GetAverageValueSize() const {
if (num_non_deletions_ == 0) {
return 0;
}
assert(total_raw_key_size_ + total_raw_value_size_ > 0);
assert(total_file_size_ > 0);
return total_raw_value_size_ / num_non_deletions_ * total_file_size_ /
(total_raw_key_size_ + total_raw_value_size_);
}
// REQUIRES: lock is held
// On success, "tp" will contains the table properties of the file
// specified in "file_meta". If the file name of "file_meta" is
// known ahread, passing it by a non-null "fname" can save a
// file-name conversion.
Status GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname = nullptr);
// REQUIRES: lock is held
// On success, *props will be populated with all SSTables' table properties.
// The keys of `props` are the sst file name, the values of `props` are the
@ -228,6 +247,15 @@ class Version {
// Update num_non_empty_levels_.
void UpdateNumNonEmptyLevels();
// The helper function of UpdateTemporaryStats, which may fill the missing
// fields of file_mata from its associated TableProperties.
// Returns true if it does initialize FileMetaData.
bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
// Update the temporary stats associated with the current version.
// This temporary stats will be used in compaction.
void UpdateTemporaryStats(const VersionEdit* edit);
// Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize();
@ -285,6 +313,16 @@ class Version {
Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
FileIndexer file_indexer_;
// total file size
uint64_t total_file_size_;
// the total size of all raw keys.
uint64_t total_raw_key_size_;
// the total size of all raw values.
uint64_t total_raw_value_size_;
// total number of non-deletion entries
uint64_t num_non_deletions_;
~Version();
// re-initializes the index that is used to offset into files_by_size_

@ -4,13 +4,32 @@
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#pragma once
#include <string>
#include "rocksdb/options.h"
namespace rocksdb {
// An interface for converting a slice to a readable string
class SliceFormatter {
public:
virtual ~SliceFormatter() {}
virtual std::string Format(const Slice& s) const = 0;
};
// Options for customizing ldb tool (beyond the DB Options)
struct LDBOptions {
// Create LDBOptions with default values for all fields
LDBOptions();
// Key formatter that converts a slice to a readable string.
// Default: Slice::ToString()
std::shared_ptr<SliceFormatter> key_formatter;
};
class LDBTool {
public:
void Run(int argc, char** argv, Options = Options());
void Run(int argc, char** argv, Options db_options= Options(),
const LDBOptions& ldb_options = LDBOptions());
};
} // namespace rocksdb

@ -148,13 +148,6 @@ class MemTableRep {
// all the states but those allocated in arena.
virtual Iterator* GetIterator(Arena* arena = nullptr) = 0;
// Return an iterator over at least the keys with the specified user key. The
// iterator may also allow access to other keys, but doesn't have to. Default:
// GetIterator().
virtual Iterator* GetIterator(const Slice& user_key) {
return GetIterator(nullptr);
}
// Return an iterator that has a special Seek semantics. The result of
// a Seek might only include keys with the same prefix as the target key.
// arena: If not null, the arena needs to be used to allocate the Iterator.

@ -22,6 +22,7 @@
package org.rocksdb.benchmark;
import java.lang.Runnable;
import java.lang.Math;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collection;
@ -240,7 +241,8 @@ public class DbBenchmark {
if (entriesPerBatch_ == 1) {
for (long i = 0; i < numEntries_; ++i) {
getKey(key, i, keyRange_);
db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_));
DbBenchmark.this.gen_.generate(value);
db_.put(writeOpt_, key, value);
stats_.finishedSingleOp(keySize_ + valueSize_);
writeRateControl(i);
if (isFinished()) {
@ -252,7 +254,8 @@ public class DbBenchmark {
WriteBatch batch = new WriteBatch();
for (long j = 0; j < entriesPerBatch_; j++) {
getKey(key, i + j, keyRange_);
batch.put(key, DbBenchmark.this.gen_.generate(valueSize_));
DbBenchmark.this.gen_.generate(value);
db_.put(writeOpt_, key, value);
stats_.finishedSingleOp(keySize_ + valueSize_);
}
db_.write(writeOpt_, batch);
@ -473,7 +476,6 @@ public class DbBenchmark {
"No compression is used.%n",
compressionType_, e.toString());
compressionType_ = "none";
compressionRatio_ = 1.0;
}
gen_ = new RandomGenerator(randSeed_, compressionRatio_);
}
@ -1522,24 +1524,54 @@ public class DbBenchmark {
private final byte[] data_;
private int dataLength_;
private int position_;
private double compressionRatio_;
Random rand_;
private RandomGenerator(long seed, double compressionRatio) {
// We use a limited amount of data over and over again and ensure
// that it is larger than the compression window (32KB), and also
byte[] value = new byte[100];
// large enough to serve all typical value sizes we want to write.
rand_ = new Random(seed);
dataLength_ = 1048576 + 100;
dataLength_ = value.length * 10000;
data_ = new byte[dataLength_];
// TODO(yhchiang): mimic test::CompressibleString?
for (int i = 0; i < dataLength_; ++i) {
data_[i] = (byte) (' ' + rand_.nextInt(95));
compressionRatio_ = compressionRatio;
int pos = 0;
while (pos < dataLength_) {
compressibleBytes(value);
System.arraycopy(value, 0, data_, pos,
Math.min(value.length, dataLength_ - pos));
pos += value.length;
}
}
private byte[] generate(int length) {
position_ = rand_.nextInt(data_.length - length);
return Arrays.copyOfRange(data_, position_, position_ + length);
private void compressibleBytes(byte[] value) {
int baseLength = value.length;
if (compressionRatio_ < 1.0d) {
baseLength = (int) (compressionRatio_ * value.length + 0.5);
}
if (baseLength <= 0) {
baseLength = 1;
}
int pos;
for (pos = 0; pos < baseLength; ++pos) {
value[pos] = (byte) (' ' + rand_.nextInt(95)); // ' ' .. '~'
}
while (pos < value.length) {
System.arraycopy(value, 0, value, pos,
Math.min(baseLength, value.length - pos));
pos += baseLength;
}
}
private void generate(byte[] value) {
if (position_ + value.length > data_.length) {
position_ = 0;
assert(value.length <= data_.length);
}
position_ += value.length;
System.arraycopy(data_, position_ - value.length,
value, 0, value.length);
}
}

@ -419,7 +419,7 @@ private:
Ch c = s.Take();
if (c == '\\') { // Escape
Ch e = s.Take();
if ((sizeof(Ch) == 1 || e < 256) && escape[(unsigned char)e])
if ((sizeof(Ch) == 1 || (int)e < 256) && escape[(unsigned char)e])
RAPIDJSON_PUT(escape[(unsigned char)e]);
else if (e == 'u') { // Unicode
unsigned codepoint = ParseHex4(s);

@ -86,7 +86,8 @@ Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels,
bool ReduceLevelTest::ReduceLevels(int target_level) {
std::vector<std::string> args = rocksdb::ReduceDBLevelsCommand::PrepareArgs(
dbname_, target_level, false);
LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs(args);
LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs(
args, Options(), LDBOptions());
level_reducer->Run();
bool is_succeed = level_reducer->GetExecuteState().IsSucceed();
delete level_reducer;

@ -244,8 +244,6 @@ class HashCuckooRep : public MemTableRep {
bool QuickInsert(const char* internal_key, const Slice& user_key,
int bucket_ids[], const int initial_hash_id);
// Unhide default implementations of GetIterator
using MemTableRep::GetIterator;
// Returns the pointer to the internal iterator to the buckets where buckets
// are sorted according to the user specified KeyComparator. Note that
// any insert after this function call may affect the sorted nature of

@ -75,8 +75,6 @@ class HashLinkListRep : public MemTableRep {
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
@ -466,14 +464,6 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
auto bucket = GetBucket(transform_->Transform(slice));
if (bucket == nullptr) {
return new EmptyIterator();
}
return new Iterator(this, bucket);
}
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
Arena* alloc_arena) {
if (alloc_arena == nullptr) {

@ -40,8 +40,6 @@ class HashSkipListRep : public MemTableRep {
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator(
Arena* arena = nullptr) override;
@ -310,14 +308,6 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) {
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
auto bucket = GetBucket(transform_->Transform(slice));
if (bucket == nullptr) {
return new EmptyIterator();
}
return new Iterator(bucket, false);
}
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
if (arena == nullptr) {
return new DynamicIterator(*this);

@ -50,13 +50,14 @@ const char* LDBCommand::DELIM = " ==> ";
LDBCommand* LDBCommand::InitFromCmdLineArgs(
int argc,
char** argv,
const Options& options
const Options& options,
const LDBOptions& ldb_options
) {
vector<string> args;
for (int i = 1; i < argc; i++) {
args.push_back(argv[i]);
}
return InitFromCmdLineArgs(args, options);
return InitFromCmdLineArgs(args, options, ldb_options);
}
/**
@ -71,7 +72,8 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs(
*/
LDBCommand* LDBCommand::InitFromCmdLineArgs(
const vector<string>& args,
const Options& options
const Options& options,
const LDBOptions& ldb_options
) {
// --x=y command line arguments are added as x->y map entries.
map<string, string> option_map;
@ -115,7 +117,8 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs(
);
if (command) {
command->SetOptions(options);
command->SetDBOptions(options);
command->SetLDBOptions(ldb_options);
}
return command;
}
@ -1619,7 +1622,7 @@ void ScanCommand::DoCommand() {
for ( ;
it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_);
it->Next()) {
string key = it->key().ToString();
string key = ldb_options_.key_formatter->Format(it->key());
if (is_db_ttl_) {
TtlIterator* it_ttl = dynamic_cast<TtlIterator*>(it);
assert(it_ttl);
@ -1633,7 +1636,7 @@ void ScanCommand::DoCommand() {
}
string value = it->value().ToString();
fprintf(stdout, "%s : %s\n",
(is_key_hex_ ? StringToHex(key) : key).c_str(),
(is_key_hex_ ? "0x" + it->key().ToString(true) : key).c_str(),
(is_value_hex_ ? StringToHex(value) : value).c_str()
);
num_keys_scanned++;

@ -13,8 +13,9 @@
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "rocksdb/ldb_tool.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "util/logging.h"
#include "util/ldb_cmd_execute_result.h"
@ -54,23 +55,29 @@ public:
static LDBCommand* InitFromCmdLineArgs(
const vector<string>& args,
const Options& options = Options()
const Options& options,
const LDBOptions& ldb_options
);
static LDBCommand* InitFromCmdLineArgs(
int argc,
char** argv,
const Options& options = Options()
const Options& options,
const LDBOptions& ldb_options
);
bool ValidateCmdLineOptions();
virtual Options PrepareOptionsForOpenDB();
virtual void SetOptions(Options options) {
virtual void SetDBOptions(Options options) {
options_ = options;
}
void SetLDBOptions(const LDBOptions& ldb_options) {
ldb_options_ = ldb_options;
}
virtual bool NoDBOpen() {
return false;
}
@ -291,6 +298,7 @@ protected:
const string& option, string* value);
Options options_;
LDBOptions ldb_options_;
private:

@ -9,6 +9,17 @@
namespace rocksdb {
class DefaultSliceFormatter : public SliceFormatter {
public:
virtual std::string Format(const Slice& s) const override {
return s.ToString();
}
};
LDBOptions::LDBOptions()
: key_formatter(new DefaultSliceFormatter()) {
}
class LDBCommandRunner {
public:
@ -71,13 +82,15 @@ public:
fprintf(stderr, "%s\n", ret.c_str());
}
static void RunCommand(int argc, char** argv, Options options) {
static void RunCommand(int argc, char** argv, Options options,
const LDBOptions& ldb_options) {
if (argc <= 2) {
PrintHelp(argv[0]);
exit(1);
}
LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(argc, argv, options);
LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(argc, argv, options,
ldb_options);
if (cmdObj == nullptr) {
fprintf(stderr, "Unknown command\n");
PrintHelp(argv[0]);
@ -99,8 +112,9 @@ public:
};
void LDBTool::Run(int argc, char** argv, Options options) {
LDBCommandRunner::RunCommand(argc, argv, options);
void LDBTool::Run(int argc, char** argv, Options options,
const LDBOptions& ldb_options) {
LDBCommandRunner::RunCommand(argc, argv, options, ldb_options);
}
} // namespace rocksdb

@ -154,6 +154,10 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands) {
assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) {
max_bytes_for_level_multiplier_additional.resize(num_levels, 1);
}
}
DBOptions::DBOptions()

@ -106,9 +106,6 @@ public:
std::string tmp_; // For passing to EncodeKey
};
// Unhide default implementations of GetIterator
using MemTableRep::GetIterator;
virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
if (arena == nullptr) {
return new SkipListRep::Iterator(&skip_list_);

@ -91,9 +91,6 @@ class VectorRep : public MemTableRep {
virtual void SeekToLast() override;
};
// Unhide default implementations of GetIterator()
using MemTableRep::GetIterator;
// Return an iterator over the keys in this representation.
virtual MemTableRep::Iterator* GetIterator(Arena* arena) override;

Loading…
Cancel
Save