skip CompactRange flush based on memtable contents

Summary:
CompactRange has a call to Flush because we guarantee that, at the time it's called, all existing keys in the range will be pushed through the user's compaction filter. However, previously the flush was done blindly, so it'd happen even if the memtable does not contain keys in the range specified by the user. This caused unnecessarily many L0 files to be created, leading to write stalls in some cases. This PR checks the memtable's contents, and decides to flush only if it overlaps with `CompactRange`'s range.

- Move the memtable overlap check logic from `ExternalSstFileIngestionJob` to `ColumnFamilyData::RangesOverlapWithMemtables`
- Reuse the above logic in `CompactRange` and skip flushing if no overlap
Closes https://github.com/facebook/rocksdb/pull/3520

Differential Revision: D7018897

Pulled By: ajkr

fbshipit-source-id: a3c6b1cfae56687b49dd89ccac7c948e53545934
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent c287c098a4
commit 3ae0047278
  1. 3
      HISTORY.md
  2. 63
      db/column_family.cc
  3. 10
      db/column_family.h
  4. 58
      db/db_compaction_test.cc
  5. 2
      db/db_impl.cc
  6. 12
      db/db_impl_compaction_flush.cc
  7. 64
      db/external_sst_file_ingestion_job.cc
  8. 12
      db/external_sst_file_ingestion_job.h
  9. 4
      include/rocksdb/db.h

@ -3,6 +3,9 @@
### Public API Change
* RocksDBOptionsParser::Parse()'s `ignore_unknown_options` argument will only be effective if the option file shows it is generated using a higher version of RocksDB than the current version.
### New Features
* Avoid unnecessarily flushing in `CompactRange()` when the range specified by the user does not overlap unflushed memtables.
## 5.12.0 (2/14/2018)
### Public API Change
* Iterator::SeekForPrev is now a pure virtual method. This is to prevent user who implement the Iterator interface fail to implement SeekForPrev by mistake.

@ -31,6 +31,7 @@
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"
@ -881,6 +882,68 @@ bool ColumnFamilyData::RangeOverlapWithCompaction(
smallest_user_key, largest_user_key, level);
}
Status ColumnFamilyData::RangesOverlapWithMemtables(
const autovector<Range>& ranges, SuperVersion* super_version,
bool* overlap) {
assert(overlap != nullptr);
*overlap = false;
// Create an InternalIterator over all unflushed memtables
Arena arena;
ReadOptions read_opts;
read_opts.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_opts, &arena));
super_version->imm->AddIterators(read_opts, &merge_iter_builder);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
std::vector<InternalIterator*> memtable_range_del_iters;
auto* active_range_del_iter =
super_version->mem->NewRangeTombstoneIterator(read_opts);
if (active_range_del_iter != nullptr) {
memtable_range_del_iters.push_back(active_range_del_iter);
}
super_version->imm->AddRangeTombstoneIterators(read_opts,
&memtable_range_del_iters);
RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */,
false /* collapse_deletions */);
Status status;
{
std::unique_ptr<InternalIterator> memtable_range_del_iter(
NewMergingIterator(&internal_comparator_,
memtable_range_del_iters.empty()
? nullptr
: &memtable_range_del_iters[0],
static_cast<int>(memtable_range_del_iters.size())));
status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
}
for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
auto* vstorage = super_version->current->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator();
InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
kValueTypeForSeek);
memtable_iter->Seek(range_start.Encode());
status = memtable_iter->status();
ParsedInternalKey seek_result;
if (status.ok()) {
if (memtable_iter->Valid() &&
!ParseInternalKey(memtable_iter->key(), &seek_result)) {
status = Status::Corruption("DB have corrupted keys");
}
}
if (status.ok()) {
if (memtable_iter->Valid() &&
ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
*overlap = true;
} else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
ranges[i].limit)) {
*overlap = true;
}
}
}
return status;
}
const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

@ -274,6 +274,16 @@ class ColumnFamilyData {
const Slice& largest_user_key,
int level) const;
// Check if the passed ranges overlap with any unflushed memtables
// (immutable or mutable).
//
// @param super_version A referenced SuperVersion that will be held for the
// duration of this function.
//
// Thread-safe
Status RangesOverlapWithMemtables(const autovector<Range>& ranges,
SuperVersion* super_version, bool* overlap);
// A flag to tell a manual compaction is to compact all levels together
// instead of a specific level.
static const int kCompactAllLevels;

@ -3317,6 +3317,64 @@ TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
// Verify memtable only gets flushed if it contains data overlapping the range
// provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
const int kNumEndpointKeys = 5;
std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);
// One extra iteration for nullptr, which means left side of interval is
// unbounded.
for (int i = 0; i <= kNumEndpointKeys; ++i) {
Slice begin;
Slice* begin_ptr;
if (i == 0) {
begin_ptr = nullptr;
} else {
begin = keys[i - 1];
begin_ptr = &begin;
}
// Start at `i` so right endpoint comes after left endpoint. One extra
// iteration for nullptr, which means right side of interval is unbounded.
for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
Slice end;
Slice* end_ptr;
if (j == kNumEndpointKeys) {
end_ptr = nullptr;
} else {
end = keys[j];
end_ptr = &end;
}
ASSERT_OK(Put("b", "val"));
ASSERT_OK(Put("d", "val"));
CompactRangeOptions compact_range_opts;
ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));
uint64_t get_prop_tmp, num_memtable_entries = 0;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
&get_prop_tmp));
num_memtable_entries += get_prop_tmp;
ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
&get_prop_tmp));
num_memtable_entries += get_prop_tmp;
if (begin_ptr == nullptr || end_ptr == nullptr ||
(i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
// In this case `CompactRange`'s range overlapped in some way with the
// memtable's range, so flush should've happened. Then "b" and "d" won't
// be in the memtable.
ASSERT_EQ(0, num_memtable_entries);
} else {
ASSERT_EQ(2, num_memtable_entries);
// flush anyways to prepare for next iteration
db_->Flush(FlushOptions());
}
}
}
}
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false),

@ -2832,7 +2832,7 @@ Status DBImpl::IngestExternalFile(
// Figure out if we need to flush the memtable first
if (status.ok()) {
bool need_flush = false;
status = ingestion_job.NeedsFlush(&need_flush);
status = ingestion_job.NeedsFlush(&need_flush, cfd->GetSuperVersion());
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
&need_flush);
if (status.ok() && need_flush) {

@ -291,7 +291,17 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
bool exclusive = options.exclusive_manual_compaction;
bool flush_needed = true;
if (!options.allow_write_stall) {
if (begin != nullptr && end != nullptr) {
// TODO(ajkr): We could also optimize away the flush in certain cases where
// one/both sides of the interval are unbounded. But it requires more
// changes to RangesOverlapWithMemtables.
Range range(*begin, *end);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
CleanupSuperVersion(super_version);
}
if (!options.allow_write_stall && flush_needed) {
InstrumentedMutexLock l(&mutex_);
uint64_t orig_active_memtable_id = cfd->mem()->GetID();
WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;

@ -132,11 +132,15 @@ Status ExternalSstFileIngestionJob::Prepare(
return status;
}
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
SuperVersion* super_version = cfd_->GetSuperVersion();
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
SuperVersion* super_version) {
autovector<Range> ranges;
for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
ranges.emplace_back(file_to_ingest.smallest_user_key,
file_to_ingest.largest_user_key);
}
Status status =
IngestedFilesOverlapWithMemtables(super_version, flush_needed);
cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
if (status.ok() && *flush_needed &&
!ingestion_options_.allow_blocking_flush) {
status = Status::InvalidArgument("External file requires flush");
@ -148,11 +152,12 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
// nonmem_write_thread_
Status ExternalSstFileIngestionJob::Run() {
Status status;
SuperVersion* super_version = cfd_->GetSuperVersion();
#ifndef NDEBUG
// We should never run the job with a memtable that is overlapping
// with the files we are ingesting
bool need_flush = false;
status = NeedsFlush(&need_flush);
status = NeedsFlush(&need_flush, super_version);
assert(status.ok() && need_flush == false);
#endif
@ -167,7 +172,6 @@ Status ExternalSstFileIngestionJob::Run() {
// It is safe to use this instead of LastAllocatedSequence since we are
// the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence();
SuperVersion* super_version = cfd_->GetSuperVersion();
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into
@ -375,54 +379,6 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
return status;
}
Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
SuperVersion* sv, bool* overlap) {
*overlap = false;
// Create an InternalIterator over all memtables
Arena arena;
ReadOptions ro;
ro.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), &arena);
merge_iter_builder.AddIterator(sv->mem->NewIterator(ro, &arena));
sv->imm->AddIterators(ro, &merge_iter_builder);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
std::vector<InternalIterator*> memtable_range_del_iters;
auto* active_range_del_iter = sv->mem->NewRangeTombstoneIterator(ro);
if (active_range_del_iter != nullptr) {
memtable_range_del_iters.push_back(active_range_del_iter);
}
sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters);
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */,
false /* collapse_deletions */);
Status status;
{
std::unique_ptr<InternalIterator> memtable_range_del_iter(
NewMergingIterator(&cfd_->internal_comparator(),
memtable_range_del_iters.empty()
? nullptr
: &memtable_range_del_iters[0],
static_cast<int>(memtable_range_del_iters.size())));
status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
}
if (status.ok()) {
for (IngestedFileInfo& f : files_to_ingest_) {
status = IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(),
overlap);
if (!status.ok() || *overlap == true) {
break;
}
if (range_del_agg.IsRangeOverlapped(f.smallest_user_key,
f.largest_user_key)) {
*overlap = true;
break;
}
}
}
return status;
}
Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {

@ -84,8 +84,12 @@ class ExternalSstFileIngestionJob {
// Check if we need to flush the memtable before running the ingestion job
// This will be true if the files we are ingesting are overlapping with any
// key range in the memtable.
// REQUIRES: Mutex held
Status NeedsFlush(bool* flush_needed);
//
// @param super_version A referenced SuperVersion that will be held for the
// duration of this function.
//
// Thread-safe
Status NeedsFlush(bool* flush_needed, SuperVersion* super_version);
// Will execute the ingestion job and prepare edit() to be applied.
// REQUIRES: Mutex held
@ -110,10 +114,6 @@ class ExternalSstFileIngestionJob {
Status GetIngestedFileInfo(const std::string& external_file,
IngestedFileInfo* file_to_ingest);
// Check if the files we are ingesting overlap with any memtable.
// REQUIRES: Mutex held
Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap);
// Assign `file_to_ingest` the appropriate sequence number and the lowest
// possible level that it can be ingested to according to compaction_style.
// REQUIRES: Mutex held

@ -93,8 +93,8 @@ static const int kMinorVersion = __ROCKSDB_MINOR__;
// A range of keys
struct Range {
Slice start; // Included in the range
Slice limit; // Not included in the range
Slice start;
Slice limit;
Range() { }
Range(const Slice& s, const Slice& l) : start(s), limit(l) { }

Loading…
Cancel
Save