Enable cancelling manual compactions if they hit the sfm size limit

Summary:
Manual compactions should be cancelled, just like scheduled compactions are cancelled, if sfm->EnoughRoomForCompaction is not true.
Closes https://github.com/facebook/rocksdb/pull/3670

Differential Revision: D7457683

Pulled By: amytai

fbshipit-source-id: 669b02fdb707f75db576d03d2c818fb98d1876f5
main
Amy Tai 7 years ago committed by Facebook Github Bot
parent 44653c7b7a
commit 1579626d0d
  1. 3
      db/db_impl.h
  2. 88
      db/db_impl_compaction_flush.cc
  3. 104
      db/db_sst_test.cc
  4. 9
      util/sst_file_manager_impl.cc
  5. 2
      util/sst_file_manager_impl.h

@ -928,6 +928,9 @@ class DBImpl : public DB {
Status BackgroundFlush(bool* madeProgress, JobContext* job_context, Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer);
bool EnoughRoomForCompaction(const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);
void PrintStatistics(); void PrintStatistics();
// dump rocksdb.stats to LOG // dump rocksdb.stats to LOG

@ -24,6 +24,32 @@
namespace rocksdb { namespace rocksdb {
bool DBImpl::EnoughRoomForCompaction(
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
// Check if we have enough room to do the compaction
bool enough_room = true;
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
enough_room = sfm->EnoughRoomForCompaction(inputs);
if (enough_room) {
*sfm_reserved_compact_space = true;
}
}
#endif // ROCKSDB_LITE
if (!enough_room) {
// Just in case tests want to change the value of enough_room
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
ROCKS_LOG_BUFFER(log_buffer,
"Cancelled compaction because not enough room");
RecordTick(stats_, COMPACTION_CANCELLED, 1);
}
return enough_room;
}
Status DBImpl::SyncClosedLogs(JobContext* job_context) { Status DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld(); mutex_.AssertHeld();
@ -582,6 +608,16 @@ Status DBImpl::CompactFilesImpl(
"files are already being compacted"); "files are already being compacted");
} }
} }
bool sfm_reserved_compact_space = false;
// First check if we have enough room to do the compaction
bool enough_room = EnoughRoomForCompaction(
input_files, &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
// m's vars will get set properly at the end of this function,
// as long as status == CompactionTooLarge
return Status::CompactionTooLarge();
}
// At this point, CompactFiles will be run. // At this point, CompactFiles will be run.
bg_compaction_scheduled_++; bg_compaction_scheduled_++;
@ -658,6 +694,14 @@ Status DBImpl::CompactFilesImpl(
*c->mutable_cf_options(), FlushReason::kManualCompaction); *c->mutable_cf_options(), FlushReason::kManualCompaction);
} }
c->ReleaseCompactionFiles(s); c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
// Need to make sure SstFileManager does its bookkeeping
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm && sfm_reserved_compact_space) {
sfm->OnCompactionCompletion(c.get());
}
#endif // ROCKSDB_LITE
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
@ -696,6 +740,7 @@ Status DBImpl::CompactFilesImpl(
if (bg_compaction_scheduled_ == 0) { if (bg_compaction_scheduled_ == 0) {
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
TEST_SYNC_POINT("CompactFilesImpl:End");
return status; return status;
} }
@ -1578,9 +1623,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// InternalKey manual_end_storage; // InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage; // InternalKey* manual_end = &manual_end_storage;
#ifndef ROCKSDB_LITE bool sfm_reserved_compact_space = false;
bool sfm_bookkeeping = false;
#endif // ROCKSDB_LITE
if (is_manual) { if (is_manual) {
ManualCompactionState* m = manual_compaction; ManualCompactionState* m = manual_compaction;
assert(m->in_progress); assert(m->in_progress);
@ -1593,6 +1636,18 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
m->cfd->GetName().c_str(), m->input_level, m->cfd->GetName().c_str(), m->input_level,
(m->begin ? m->begin->DebugString().c_str() : "(begin)"), (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)")); (m->end ? m->end->DebugString().c_str() : "(end)"));
} else {
// First check if we have enough room to do the compaction
bool enough_room = EnoughRoomForCompaction(
*(c->inputs()), &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
// Then don't do the compaction
c->ReleaseCompactionFiles(status);
c.reset();
// m's vars will get set properly at the end of this function,
// as long as status == CompactionTooLarge
status = Status::CompactionTooLarge();
} else { } else {
ROCKS_LOG_BUFFER( ROCKS_LOG_BUFFER(
log_buffer, log_buffer,
@ -1605,6 +1660,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
? "(end)" ? "(end)"
: m->manual_end->DebugString().c_str())); : m->manual_end->DebugString().c_str()));
} }
}
} else if (!is_prepicked && !compaction_queue_.empty()) { } else if (!is_prepicked && !compaction_queue_.empty()) {
if (HasExclusiveManualCompaction()) { if (HasExclusiveManualCompaction()) {
// Can't compact right now, but try again later // Can't compact right now, but try again later
@ -1644,24 +1700,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
bool enough_room = true;
if (c != nullptr) { if (c != nullptr) {
#ifndef ROCKSDB_LITE bool enough_room = EnoughRoomForCompaction(
auto sfm = static_cast<SstFileManagerImpl*>( *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
immutable_db_options_.sst_file_manager.get());
if (sfm) {
enough_room = sfm->EnoughRoomForCompaction(c.get());
if (enough_room) {
sfm_bookkeeping = true;
}
}
#endif // ROCKSDB_LITE
if (!enough_room) {
// Just in case tests want to change the value of enough_room
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction():CancelledCompaction",
&enough_room);
}
if (!enough_room) { if (!enough_room) {
// Then don't do the compaction // Then don't do the compaction
c->ReleaseCompactionFiles(status); c->ReleaseCompactionFiles(status);
@ -1670,9 +1712,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
->storage_info() ->storage_info()
->ComputeCompactionScore(*(c->immutable_cf_options()), ->ComputeCompactionScore(*(c->immutable_cf_options()),
*(c->mutable_cf_options())); *(c->mutable_cf_options()));
ROCKS_LOG_BUFFER(log_buffer,
"Cancelled compaction because not enough room");
AddToCompactionQueue(cfd); AddToCompactionQueue(cfd);
++unscheduled_compactions_; ++unscheduled_compactions_;
@ -1680,7 +1719,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Don't need to sleep here, because BackgroundCallCompaction // Don't need to sleep here, because BackgroundCallCompaction
// will sleep if !s.ok() // will sleep if !s.ok()
status = Status::CompactionTooLarge(); status = Status::CompactionTooLarge();
RecordTick(stats_, COMPACTION_CANCELLED, 1);
} else { } else {
// update statistics // update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
@ -1867,7 +1905,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Need to make sure SstFileManager does its bookkeeping // Need to make sure SstFileManager does its bookkeeping
auto sfm = static_cast<SstFileManagerImpl*>( auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get()); immutable_db_options_.sst_file_manager.get());
if (sfm && sfm_bookkeeping) { if (sfm && sfm_reserved_compact_space) {
sfm->OnCompactionCompletion(c.get()); sfm->OnCompactionCompletion(c.get());
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -20,6 +20,35 @@ class DBSSTTest : public DBTestBase {
DBSSTTest() : DBTestBase("/db_sst_test") {} DBSSTTest() : DBTestBase("/db_sst_test") {}
}; };
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
~FlushedFileCollector() {}
virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
std::vector<std::string> GetFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> result;
for (auto fname : flushed_files_) {
result.push_back(fname);
}
return result;
}
void ClearFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.clear();
}
private:
std::vector<std::string> flushed_files_;
std::mutex mutex_;
};
TEST_F(DBSSTTest, DontDeletePendingOutputs) { TEST_F(DBSSTTest, DontDeletePendingOutputs) {
Options options; Options options;
options.env = env_; options.env = env_;
@ -559,6 +588,7 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) { "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
sfm->SetMaxAllowedSpaceUsage(0); sfm->SetMaxAllowedSpaceUsage(0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
}); });
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
@ -584,6 +614,8 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
ASSERT_OK(Flush()); ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact(true); dbfull()->TEST_WaitForCompact(true);
// Because we set a callback in CancelledCompaction, we actually
// let the compaction run
ASSERT_GT(completed_compactions, 0); ASSERT_GT(completed_compactions, 0);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0); ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped // Make sure the stat is bumped
@ -591,6 +623,78 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.statistics = CreateDBStatistics();
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DestroyAndReopen(options);
Random rnd(301);
// Generate a file containing 10 keys.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
}
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
auto files_in_db = GetAllSSTFiles(&total_file_size);
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
// Generate another file to trigger compaction.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
}
ASSERT_OK(Flush());
// OK, now trigger a manual compaction
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Wait for manual compaction to get scheduled and finish
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Make sure the stat is bumped
ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
1);
// Now make sure CompactFiles also gets cancelled
auto l0_files = collector->GetFlushedFiles();
dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0);
// Wait for manual compaction to get scheduled and finish
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
COMPACTION_CANCELLED),
2);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
// Now let the flush through and make sure GetCompactionsReservedSize
// returns to normal
sfm->SetMaxAllowedSpaceUsage(0);
int completed_compactions = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactFilesImpl:End", [&](void* arg) { completed_compactions++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
dbfull()->CompactFiles(rocksdb::CompactionOptions(), l0_files, 0);
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
ASSERT_GT(completed_compactions, 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
// This test will set a maximum allowed space for the DB, then it will // This test will set a maximum allowed space for the DB, then it will
// keep filling the DB until the limit is reached and bg_error_ is set. // keep filling the DB until the limit is reached and bg_error_ is set.

@ -106,13 +106,14 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
max_allowed_space_; max_allowed_space_;
} }
bool SstFileManagerImpl::EnoughRoomForCompaction(Compaction* c) { bool SstFileManagerImpl::EnoughRoomForCompaction(
const std::vector<CompactionInputFiles>& inputs) {
MutexLock l(&mu_); MutexLock l(&mu_);
uint64_t size_added_by_compaction = 0; uint64_t size_added_by_compaction = 0;
// First check if we even have the space to do the compaction // First check if we even have the space to do the compaction
for (size_t i = 0; i < c->num_input_levels(); i++) { for (size_t i = 0; i < inputs.size(); i++) {
for (size_t j = 0; j < c->num_input_files(i); j++) { for (size_t j = 0; j < inputs[i].size(); j++) {
FileMetaData* filemeta = c->input(i, j); FileMetaData* filemeta = inputs[i][j];
size_added_by_compaction += filemeta->fd.GetFileSize(); size_added_by_compaction += filemeta->fd.GetFileSize();
} }
} }

@ -67,7 +67,7 @@ class SstFileManagerImpl : public SstFileManager {
// estimates how much space is currently being used by compactions (i.e. // estimates how much space is currently being used by compactions (i.e.
// if a compaction has started, this function bumps the used space by // if a compaction has started, this function bumps the used space by
// the full compaction size). // the full compaction size).
bool EnoughRoomForCompaction(Compaction* c); bool EnoughRoomForCompaction(const std::vector<CompactionInputFiles>& inputs);
// Bookkeeping so total_file_sizes_ goes back to normal after compaction // Bookkeeping so total_file_sizes_ goes back to normal after compaction
// finishes // finishes

Loading…
Cancel
Save