Fixed a bug where CompactFiles won't delete obsolete files until flush.

Summary: Fixed a bug where CompactFiles won't delete obsolete files until flush.

Test Plan:
./compact_files_test
export ROCKSDB_TESTS=CompactFiles
./db_test

Reviewers: rven, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D34671
main
Yueh-Hsuan Chiang 10 years ago
parent 6f55798683
commit 2b785d76b8
  1. 6
      Makefile
  2. 104
      db/compact_files_test.cc
  3. 97
      db/db_impl.cc
  4. 3
      db/db_impl.h

@ -236,7 +236,8 @@ TESTS = \
listener_test \ listener_test \
compaction_job_test \ compaction_job_test \
thread_list_test \ thread_list_test \
sst_dump_test sst_dump_test \
compact_files_test
SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/) SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/)
@ -616,6 +617,9 @@ thread_list_test: util/thread_list_test.o $(LIBOBJECTS) $(TESTHARNESS)
compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
compact_files_test: db/compact_files_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

@ -0,0 +1,104 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <mutex>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "util/testharness.h"
namespace rocksdb {
class CompactFilesTest {
public:
CompactFilesTest() {
env_ = Env::Default();
db_name_ = test::TmpDir(env_) + "/compact_files_test";
}
std::string db_name_;
Env* env_;
};
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
~FlushedFileCollector() {}
virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(file_path);
}
std::vector<std::string> GetFlushedFiles() {
std::lock_guard<std::mutex> lock(mutex_);
std::vector<std::string> result;
for (auto fname : flushed_files_) {
result.push_back(fname);
}
return result;
}
private:
std::vector<std::string> flushed_files_;
std::mutex mutex_;
};
TEST(CompactFilesTest, ObsoleteFiles) {
Options options;
// to trigger compaction more easily
const int kWriteBufferSize = 10000;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
for (int i = 1000; i < 2000; ++i) {
db->Put(WriteOptions(),
std::to_string(i),
std::string(kWriteBufferSize / 10, 'a' + (i % 26)));
}
auto l0_files = collector->GetFlushedFiles();
CompactionOptions compact_opt;
compact_opt.compression = kNoCompression;
compact_opt.output_file_size_limit = kWriteBufferSize * 5;
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, 1));
// verify all compaction input files are deleted
for (auto fname : l0_files) {
ASSERT_TRUE(!env_->FileExists(fname));
}
delete db;
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}

@ -1285,24 +1285,53 @@ Status DBImpl::CompactFiles(
// not supported in lite version // not supported in lite version
return Status::NotSupported("Not supported in ROCKSDB LITE"); return Status::NotSupported("Not supported in ROCKSDB LITE");
#else #else
InstrumentedMutexLock l(&mutex_);
if (column_family == nullptr) { if (column_family == nullptr) {
return Status::InvalidArgument("ColumnFamilyHandle must be non-null."); return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
} }
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
assert(cfd); assert(cfd);
// TODO(yhchiang): use superversion
cfd->Ref(); Status s;
auto version = cfd->current(); JobContext job_context(0, true);
version->Ref(); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
auto s = CompactFilesImpl(compact_options, cfd, version, db_options_.info_log.get());
input_file_names, output_level, output_path_id);
// TODO(yhchiang): unref could move into CompactFilesImpl(). Otherwise, // Perform CompactFiles
// FindObsoleteFiles might never able to find any file to delete. SuperVersion* sv = GetAndRefSuperVersion(cfd);
version->Unref(); {
// TODO(yhchiang): cfd should be deleted after its last reference. InstrumentedMutexLock l(&mutex_);
cfd->Unref();
s = CompactFilesImpl(compact_options, cfd, sv->current,
input_file_names, output_level,
output_path_id, &job_context, &log_buffer);
}
ReturnAndCleanupSuperVersion(cfd, sv);
// Find and delete obsolete files
{
InstrumentedMutexLock l(&mutex_);
// If !s.ok(), this means that Compaction failed. In that case, we want
// to delete all obsolete files we might have created and we force
// FindObsoleteFiles(). This is because job_context does not
// catch all created files if compaction failed.
FindObsoleteFiles(&job_context, !s.ok());
}
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
}
return s; return s;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
@ -1311,11 +1340,10 @@ Status DBImpl::CompactFiles(
Status DBImpl::CompactFilesImpl( Status DBImpl::CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd, const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names, Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id) { const int output_level, int output_path_id, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
if (shutting_down_.load(std::memory_order_acquire)) { if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress(); return Status::ShutdownInProgress();
} }
@ -1376,15 +1404,14 @@ Status DBImpl::CompactFilesImpl(
// deletion compaction currently not allowed in CompactFiles. // deletion compaction currently not allowed in CompactFiles.
assert(!c->IsDeletionCompaction()); assert(!c->IsDeletionCompaction());
JobContext job_context(0, true);
auto yield_callback = [&]() { auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(), return CallFlushDuringCompaction(
*c->mutable_cf_options(), &job_context, c->column_family_data(), *c->mutable_cf_options(),
&log_buffer); job_context, log_buffer);
}; };
CompactionJob compaction_job( CompactionJob compaction_job(
job_context.job_id, c.get(), db_options_, *c->mutable_cf_options(), job_context->job_id, c.get(), db_options_, *c->mutable_cf_options(),
env_options_, versions_.get(), &shutting_down_, &log_buffer, env_options_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()),
stats_, &snapshots_, is_snapshot_supported_, table_cache_, stats_, &snapshots_, is_snapshot_supported_, table_cache_,
std::move(yield_callback)); std::move(yield_callback));
@ -1395,7 +1422,7 @@ Status DBImpl::CompactFilesImpl(
mutex_.Lock(); mutex_.Lock();
compaction_job.Install(&status, &mutex_); compaction_job.Install(&status, &mutex_);
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), &job_context, InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options()); *c->mutable_cf_options());
} }
c->ReleaseCompactionFiles(s); c->ReleaseCompactionFiles(s);
@ -1408,37 +1435,13 @@ Status DBImpl::CompactFilesImpl(
} else { } else {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Compaction error: %s", "[%s] [JOB %d] Compaction error: %s",
c->column_family_data()->GetName().c_str(), job_context.job_id, c->column_family_data()->GetName().c_str(), job_context->job_id,
status.ToString().c_str()); status.ToString().c_str());
if (db_options_.paranoid_checks && bg_error_.ok()) { if (db_options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status; bg_error_ = status;
} }
} }
// If !s.ok(), this means that Compaction failed. In that case, we want
// to delete all obsolete files we might have created and we force
// FindObsoleteFiles(). This is because job_context does not
// catch all created files if compaction failed.
// TODO(yhchiang): write an unit-test to make sure files are actually
// deleted after CompactFiles.
FindObsoleteFiles(&job_context, !s.ok());
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
mutex_.Lock();
}
bg_compaction_scheduled_--; bg_compaction_scheduled_--;
return status; return status;

@ -372,7 +372,8 @@ class DBImpl : public DB {
Status CompactFilesImpl( Status CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd, const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names, Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id); const int output_level, int output_path_id, JobContext* job_context,
LogBuffer* log_buffer);
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);

Loading…
Cancel
Save