Allow compaction to reclaim storage more effectively.

Summary:
This diff allows compaction to reclaim storage more effectively.
In the current design, compactions are mainly triggered based on
the file sizes.  However, since deletion entries does not have
value, files which have many deletion entries are less likely
to be compacted.  As a result, it may took a while to make
deletion entries to be compacted.

This diff address issue by compensating the size of deletion
entries during compaction process: the size of each deletion
entry in the compaction process is augmented by 2x average
value size.  The diff applies to both leveled and universal
compacitons.

Test Plan:
develop CompactionDeletionTrigger
make db_test
./db_test

Reviewers: haobo, igor, ljin, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D19029
main
Yueh-Hsuan Chiang 11 years ago
parent faa8d21922
commit e813f5b6d9
  1. 63
      db/compaction_picker.cc
  2. 113
      db/db_test.cc
  3. 11
      db/version_edit.h
  4. 167
      db/version_set.cc
  5. 38
      db/version_set.h

@ -19,10 +19,10 @@ namespace rocksdb {
namespace {
uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->fd.GetFileSize();
sum += files[i]->compensated_file_size;
}
return sum;
}
@ -80,7 +80,7 @@ void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
for (auto c : compactions_in_progress_[level]) {
assert(c->level() == level);
for (int i = 0; i < c->num_input_files(0); i++) {
total += c->input(0, i)->fd.GetFileSize();
total += c->input(0, i)->compensated_file_size;
}
}
sizes[level] = total;
@ -261,9 +261,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded0;
c->input_version_->GetOverlappingInputs(
level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr);
const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalFileSize(expanded0);
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0]);
const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1]);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
@ -335,7 +335,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->fd.GetFileSize();
uint64_t s = inputs[i]->compensated_file_size;
total += s;
if (total >= limit) {
**compaction_end = inputs[i + 1]->smallest;
@ -483,11 +483,11 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
FileMetaData* f = c->input_version_->files_[level][index];
// check to verify files are arranged in descending size
assert(
(i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(f->fd.GetFileSize() >=
c->input_version_->files_[level][file_size[i + 1]]->fd.GetFileSize()));
assert((i == file_size.size() - 1) ||
(i >= Version::number_of_files_to_sort_ - 1) ||
(f->compensated_file_size >=
c->input_version_->files_[level][file_size[i + 1]]->
compensated_file_size));
// do not pick a file to compact if it is being compacted
// from n-1 level.
@ -665,7 +665,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr ? f->fd.GetFileSize() : 0;
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
if (f != nullptr) {
LogToBuffer(log_buffer,
"[%s] Universal: Possible candidate file %lu[%d].",
@ -703,9 +703,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// by the last-resort read amp strategy which disregards size ratios.
break;
}
candidate_size = f->fd.GetFileSize();
candidate_size = f->compensated_file_size;
} else { // default kCompactionStopStyleTotalSize
candidate_size += f->fd.GetFileSize();
candidate_size += f->compensated_file_size;
}
candidate_count++;
}
@ -721,10 +721,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
LogToBuffer(log_buffer,
"[%s] Universal: Skipping file %lu[%d] with size %lu %d\n",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), i,
(unsigned long)f->fd.GetFileSize(), f->being_compacted);
"[%s] Universal: Skipping file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n",
version->cfd_->GetName().c_str(), f->fd.GetNumber(),
i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted);
}
}
}
@ -759,10 +759,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
LogToBuffer(
log_buffer, "[%s] Universal: Picking file %lu[%d] with size %lu\n",
version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), i,
(unsigned long)f->fd.GetFileSize());
LogToBuffer(log_buffer,
"[%s] Universal: Picking file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
version->cfd_->GetName().c_str(),
f->fd.GetNumber(), i,
f->fd.GetFileSize(), f->compensated_file_size);
}
return c;
}
@ -826,7 +828,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
candidate_size += f->fd.GetFileSize();
candidate_size += f->compensated_file_size;
candidate_count++;
}
if (candidate_count == 0) {
@ -866,10 +868,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
LogToBuffer(log_buffer,
"[%s] Universal: size amp picking file %lu[%d] with size %lu",
version->cfd_->GetName().c_str(),
(unsigned long)f->fd.GetNumber(), index,
(unsigned long)f->fd.GetFileSize());
"[%s] Universal: size amp picking file %" PRIu64 "[%d] "
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
version->cfd_->GetName().c_str(),
f->fd.GetNumber(), index,
f->fd.GetFileSize(), f->compensated_file_size);
}
return c;
}
@ -879,7 +882,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
assert(version->NumberLevels() == 1);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
total_size += file->fd.GetFileSize();
total_size += file->compensated_file_size;
}
if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
@ -907,7 +910,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
for (auto ritr = version->files_[0].rbegin();
ritr != version->files_[0].rend(); ++ritr) {
auto f = *ritr;
total_size -= f->fd.GetFileSize();
total_size -= f->compensated_file_size;
c->inputs_[0].push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));

@ -2726,6 +2726,119 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
namespace {
static const int kCDTValueSize = 1000;
static const int kCDTKeysPerBuffer = 4;
static const int kCDTNumLevels = 8;
Options DeletionTriggerOptions() {
Options options;
options.compression = kNoCompression;
options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
options.min_write_buffer_number_to_merge = 1;
options.num_levels = kCDTNumLevels;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 1;
options.target_file_size_base = options.write_buffer_size * 2;
options.target_file_size_multiplier = 2;
options.max_bytes_for_level_base =
options.target_file_size_base * options.target_file_size_multiplier;
options.max_bytes_for_level_multiplier = 2;
options.disable_auto_compactions = false;
return options;
}
} // anonymous namespace
TEST(DBTest, CompactionDeletionTrigger) {
Options options = DeletionTriggerOptions();
options.create_if_missing = true;
for (int tid = 0; tid < 2; ++tid) {
uint64_t db_size[2];
DestroyAndReopen(&options);
Random rnd(301);
const int kTestSize = kCDTKeysPerBuffer * 512;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[0] = Size(Key(0), Key(kTestSize - 1));
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[1] = Size(Key(0), Key(kTestSize - 1));
// must have much smaller db size.
ASSERT_GT(db_size[0] / 3, db_size[1]);
// repeat the test with universal compaction
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
}
}
TEST(DBTest, CompactionDeletionTriggerReopen) {
for (int tid = 0; tid < 2; ++tid) {
uint64_t db_size[3];
Options options = DeletionTriggerOptions();
options.create_if_missing = true;
DestroyAndReopen(&options);
Random rnd(301);
// round 1 --- insert key/value pairs.
const int kTestSize = kCDTKeysPerBuffer * 512;
std::vector<std::string> values;
for (int k = 0; k < kTestSize; ++k) {
values.push_back(RandomString(&rnd, kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[0] = Size(Key(0), Key(kTestSize - 1));
Close();
// round 2 --- disable auto-compactions and issue deletions.
options.create_if_missing = false;
options.disable_auto_compactions = true;
Reopen(&options);
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
db_size[1] = Size(Key(0), Key(kTestSize - 1));
Close();
// as auto_compaction is off, we shouldn't see too much reduce
// in db size.
ASSERT_LT(db_size[0] / 3, db_size[1]);
// round 3 --- reopen db with auto_compaction on and see if
// deletion compensation still work.
options.disable_auto_compactions = false;
Reopen(&options);
// insert relatively small amount of data to trigger auto compaction.
for (int k = 0; k < kTestSize / 10; ++k) {
ASSERT_OK(Put(Key(k), values[k]));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
db_size[2] = Size(Key(0), Key(kTestSize - 1));
// this time we're expecting significant drop in size.
ASSERT_GT(db_size[0] / 3, db_size[2]);
// repeat the test with universal compaction
options.compaction_style = kCompactionStyleUniversal;
options.num_levels = 1;
}
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;

@ -40,6 +40,11 @@ struct FileDescriptor {
struct FileMetaData {
int refs;
FileDescriptor fd;
uint64_t compensated_file_size; // File size compensated by deletion entry.
uint64_t num_entries; // the number of entries.
uint64_t num_deletions; // the number of deletion entries.
uint64_t raw_key_size; // total uncompressed key size.
uint64_t raw_value_size; // total uncompressed value size.
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
bool being_compacted; // Is this file undergoing compaction?
@ -52,6 +57,11 @@ struct FileMetaData {
FileMetaData()
: refs(0),
fd(0, 0),
compensated_file_size(0),
num_entries(0),
num_deletions(0),
raw_key_size(0),
raw_value_size(0),
being_compacted(false),
table_reader_handle(nullptr) {}
};
@ -149,6 +159,7 @@ class VersionEdit {
private:
friend class VersionSet;
friend class Version;
typedef std::set< std::pair<int, uint64_t>> DeletedFileSet;

@ -47,6 +47,15 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
return sum;
}
static uint64_t TotalCompensatedFileSize(
const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->compensated_file_size;
}
return sum;
}
Version::~Version() {
assert(refs_ == 0);
@ -241,53 +250,69 @@ class Version::LevelFileIteratorState : public TwoLevelIteratorState {
bool for_compaction_;
};
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname) {
auto table_cache = cfd_->table_cache();
auto options = cfd_->options();
Status s = table_cache->GetTableProperties(
vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd,
tp, true /* no io */);
if (s.ok()) {
return s;
}
// We only ignore error type `Incomplete` since it's by design that we
// disallow table when it's not in table cache.
if (!s.IsIncomplete()) {
return s;
}
// 2. Table is not present in table cache, we'll read the table properties
// directly from the properties block in the file.
std::unique_ptr<RandomAccessFile> file;
if (fname != nullptr) {
s = options->env->NewRandomAccessFile(
*fname, &file, vset_->storage_options_);
} else {
s = options->env->NewRandomAccessFile(
TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
&file, vset_->storage_options_);
}
if (!s.ok()) {
return s;
}
TableProperties* raw_table_properties;
// By setting the magic number to kInvalidTableMagicNumber, we can by
// pass the magic number check in the footer.
s = ReadTableProperties(
file.get(), file_meta->fd.GetFileSize(),
Footer::kInvalidTableMagicNumber /* table's magic number */,
vset_->env_, options->info_log.get(), &raw_table_properties);
if (!s.ok()) {
return s;
}
RecordTick(options->statistics.get(),
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
*tp = std::shared_ptr<const TableProperties>(raw_table_properties);
return s;
}
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
for (int level = 0; level < num_levels_; level++) {
for (const auto& file_meta : files_[level]) {
auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
// 1. If the table is already present in table cache, load table
// properties from there.
std::shared_ptr<const TableProperties> table_properties;
Status s = table_cache->GetTableProperties(
vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd,
&table_properties, true /* no io */);
Status s = GetTableProperties(&table_properties, file_meta, &fname);
if (s.ok()) {
props->insert({fname, table_properties});
continue;
}
// We only ignore error type `Incomplete` since it's by design that we
// disallow table when it's not in table cache.
if (!s.IsIncomplete()) {
return s;
}
// 2. Table is not present in table cache, we'll read the table properties
// directly from the properties block in the file.
std::unique_ptr<RandomAccessFile> file;
s = options->env->NewRandomAccessFile(fname, &file,
vset_->storage_options_);
if (!s.ok()) {
return s;
}
TableProperties* raw_table_properties;
// By setting the magic number to kInvalidTableMagicNumber, we can by
// pass the magic number check in the footer.
s = ReadTableProperties(
file.get(), file_meta->fd.GetFileSize(),
Footer::kInvalidTableMagicNumber /* table's magic number */,
vset_->env_, options->info_log.get(), &raw_table_properties);
if (!s.ok()) {
} else {
return s;
}
RecordTick(options->statistics.get(),
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
props->insert({fname, std::shared_ptr<const TableProperties>(
raw_table_properties)});
}
}
@ -492,7 +517,11 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
compaction_level_(num_levels_),
version_number_(version_number),
file_indexer_(num_levels_, cfd == nullptr ? nullptr
: cfd->internal_comparator().user_comparator()) {
: cfd->internal_comparator().user_comparator()),
total_file_size_(0),
total_raw_key_size_(0),
total_raw_value_size_(0),
num_non_deletions_(0) {
}
void Version::Get(const ReadOptions& options,
@ -699,6 +728,58 @@ void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
UpdateNumNonEmptyLevels();
}
bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
if (file_meta->num_entries > 0) {
return false;
}
std::shared_ptr<const TableProperties> tp;
Status s = GetTableProperties(&tp, file_meta);
if (!s.ok()) {
return false;
}
if (tp.get() == nullptr) return false;
file_meta->num_entries = tp->num_entries;
file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties);
file_meta->raw_value_size = tp->raw_value_size;
file_meta->raw_key_size = tp->raw_key_size;
return true;
}
void Version::UpdateTemporaryStats(const VersionEdit* edit) {
static const int kDeletionWeightOnCompaction = 2;
// incrementally update the average value size by
// including newly added files into the global stats
int init_count = 0;
int total_count = 0;
for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) {
if (MaybeInitializeFileMetaData(file_meta)) {
// each FileMeta will be initialized only once.
total_file_size_ += file_meta->fd.GetFileSize();
total_raw_key_size_ += file_meta->raw_key_size;
total_raw_value_size_ += file_meta->raw_value_size;
num_non_deletions_ +=
file_meta->num_entries - file_meta->num_deletions;
init_count++;
}
total_count++;
}
}
uint64_t average_value_size = GetAverageValueSize();
// compute the compensated size
for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) {
file_meta->compensated_file_size = file_meta->fd.GetFileSize() +
file_meta->num_deletions * average_value_size *
kDeletionWeightOnCompaction;
}
}
}
void Version::ComputeCompactionScore(
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0;
@ -728,7 +809,7 @@ void Version::ComputeCompactionScore(
uint64_t total_size = 0;
for (unsigned int i = 0; i < files_[level].size(); i++) {
if (!files_[level][i]->being_compacted) {
total_size += files_[level][i]->fd.GetFileSize();
total_size += files_[level][i]->compensated_file_size;
numfiles++;
}
}
@ -747,7 +828,7 @@ void Version::ComputeCompactionScore(
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes =
TotalFileSize(files_[level]) - size_being_compacted[level];
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
if (max_score < score) {
@ -783,9 +864,10 @@ namespace {
// Compator that is used to sort files based on their size
// In normal mode: descending size
bool CompareSizeDescending(const Version::Fsize& first,
const Version::Fsize& second) {
return (first.file->fd.GetFileSize() > second.file->fd.GetFileSize());
bool CompareCompensatedSizeDescending(const Version::Fsize& first,
const Version::Fsize& second) {
return (first.file->compensated_file_size >
second.file->compensated_file_size);
}
// A static compator used to sort files based on their seqno
// In universal style : descending seqno
@ -846,7 +928,7 @@ void Version::UpdateFilesBySize() {
num = temp.size();
}
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
CompareSizeDescending);
CompareCompensatedSizeDescending);
}
assert(temp.size() == files.size());
@ -1674,6 +1756,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex.
v->UpdateTemporaryStats(edit);
v->PrepareApply(size_being_compacted);
}

@ -196,6 +196,25 @@ class Version {
// Returns the version nuber of this version
uint64_t GetVersionNumber() const { return version_number_; }
uint64_t GetAverageValueSize() const {
if (num_non_deletions_ == 0) {
return 0;
}
assert(total_raw_key_size_ + total_raw_value_size_ > 0);
assert(total_file_size_ > 0);
return total_raw_value_size_ / num_non_deletions_ * total_file_size_ /
(total_raw_key_size_ + total_raw_value_size_);
}
// REQUIRES: lock is held
// On success, "tp" will contains the table properties of the file
// specified in "file_meta". If the file name of "file_meta" is
// known ahread, passing it by a non-null "fname" can save a
// file-name conversion.
Status GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
const std::string* fname = nullptr);
// REQUIRES: lock is held
// On success, *props will be populated with all SSTables' table properties.
// The keys of `props` are the sst file name, the values of `props` are the
@ -228,6 +247,15 @@ class Version {
// Update num_non_empty_levels_.
void UpdateNumNonEmptyLevels();
// The helper function of UpdateTemporaryStats, which may fill the missing
// fields of file_mata from its associated TableProperties.
// Returns true if it does initialize FileMetaData.
bool MaybeInitializeFileMetaData(FileMetaData* file_meta);
// Update the temporary stats associated with the current version.
// This temporary stats will be used in compaction.
void UpdateTemporaryStats(const VersionEdit* edit);
// Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize();
@ -285,6 +313,16 @@ class Version {
Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);
FileIndexer file_indexer_;
// total file size
uint64_t total_file_size_;
// the total size of all raw keys.
uint64_t total_raw_key_size_;
// the total size of all raw values.
uint64_t total_raw_value_size_;
// total number of non-deletion entries
uint64_t num_non_deletions_;
~Version();
// re-initializes the index that is used to offset into files_by_size_

Loading…
Cancel
Save