CompactFiles, EventListener and GetDatabaseMetaData

Summary:
This diff adds three sets of APIs to RocksDB.

= GetColumnFamilyMetaData =
* This APIs allow users to obtain the current state of a RocksDB instance on one column family.
* See GetColumnFamilyMetaData in include/rocksdb/db.h

= EventListener =
* A virtual class that allows users to implement a set of
  call-back functions which will be called when specific
  events of a RocksDB instance happens.
* To register EventListener, simply insert an EventListener to ColumnFamilyOptions::listeners

= CompactFiles =
* CompactFiles API inputs a set of file numbers and an output level, and RocksDB
  will try to compact those files into the specified level.

= Example =
* Example code can be found in example/compact_files_example.cc, which implements
  a simple external compactor using EventListener, GetColumnFamilyMetaData, and
  CompactFiles API.

Test Plan:
listener_test
compactor_test
example/compact_files_example
export ROCKSDB_TESTS=CompactFiles
db_test
export ROCKSDB_TESTS=MetaData
db_test

Reviewers: ljin, igor, rven, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D24705
main
Yueh-Hsuan Chiang 10 years ago
parent 5c93090530
commit 28c82ff1b3
  1. 10
      Makefile
  2. 34
      db/column_family.cc
  3. 6
      db/column_family.h
  4. 32
      db/compaction.cc
  5. 29
      db/compaction.h
  6. 315
      db/compaction_picker.cc
  7. 98
      db/compaction_picker.h
  8. 225
      db/db_impl.cc
  9. 34
      db/db_impl.h
  10. 10
      db/db_impl_readonly.h
  11. 259
      db/db_test.cc
  12. 11
      db/filename.cc
  13. 4
      db/filename.h
  14. 9
      db/flush_job.cc
  15. 2
      db/flush_job.h
  16. 344
      db/listener_test.cc
  17. 75
      db/version_set.cc
  18. 5
      db/version_set.h
  19. 9
      examples/Makefile
  20. 175
      examples/compact_files_example.cc
  21. 71
      include/rocksdb/db.h
  22. 4
      include/rocksdb/immutable_options.h
  23. 65
      include/rocksdb/listener.h
  24. 90
      include/rocksdb/metadata.h
  25. 22
      include/rocksdb/options.h
  26. 14
      include/rocksdb/status.h
  27. 21
      include/rocksdb/utilities/stackable_db.h
  28. 9
      util/options.cc
  29. 3
      util/status.cc

@ -149,7 +149,9 @@ TESTS = \
cuckoo_table_db_test \
write_batch_with_index_test \
flush_job_test \
wal_manager_test
wal_manager_test \
listener_test \
write_batch_with_index_test
TOOLS = \
sst_dump \
@ -502,6 +504,12 @@ cuckoo_table_reader_test: table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTH
cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
listener_test: db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -87,6 +87,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
const std::string& ColumnFamilyHandleImpl::GetName() const {
return cfd()->GetName();
}
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
return cfd()->user_comparator();
}
@ -255,10 +259,23 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
} else if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
} else {
assert(ioptions_.compaction_style == kCompactionStyleFIFO);
} else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
compaction_picker_.reset(
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleNone) {
compaction_picker_.reset(new NullCompactionPicker(
ioptions_, &internal_comparator_));
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"Column family %s does not use any background compaction. "
"Compactions can only be done via CompactFiles\n",
GetName().c_str());
} else {
Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
"Unable to recognize the specified compaction style %d. "
"Column family %s will use kCompactionStyleLevel.\n",
ioptions_.compaction_style, GetName().c_str());
compaction_picker_.reset(
new LevelCompactionPicker(ioptions_, &internal_comparator_));
}
Log(InfoLogLevel::INFO_LEVEL,
@ -503,6 +520,19 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
return false;
}
void ColumnFamilyData::NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop) {
auto listeners = ioptions()->listeners;
for (auto listener : listeners) {
listener->OnFlushCompleted(
db, GetName(), file_path,
// Use path 0 as fulled memtables are first flushed into path 0.
triggered_flush_slowdown, triggered_flush_stop);
}
}
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) {
db_mutex->AssertHeld();

@ -52,6 +52,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
virtual const Comparator* user_comparator() const;
virtual uint32_t GetID() const;
virtual const std::string& GetName() const override;
private:
ColumnFamilyData* cfd_;
@ -250,6 +251,11 @@ class ColumnFamilyData {
void ResetThreadLocalSuperVersions();
void NotifyOnFlushCompleted(
DB* db, const std::string& file_path,
bool triggered_flush_slowdown,
bool triggered_flush_stop);
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,

@ -78,6 +78,38 @@ Compaction::Compaction(int number_levels, int start_level, int out_level,
}
}
Compaction::Compaction(VersionStorageInfo* vstorage,
const autovector<CompactionInputFiles>& inputs,
int start_level, int output_level,
uint64_t max_grandparent_overlap_bytes,
const CompactionOptions& options,
bool deletion_compaction)
: start_level_(start_level),
output_level_(output_level),
max_output_file_size_(options.output_file_size_limit),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(nullptr), // TODO(yhchiang): set it later
number_levels_(vstorage->NumberLevels()),
cfd_(nullptr),
output_compression_(options.compression),
seek_compaction_(false),
deletion_compaction_(deletion_compaction),
inputs_(inputs),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
base_index_(-1),
parent_index_(-1),
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::~Compaction() {
delete edit_;
if (input_version_ != nullptr) {

@ -33,6 +33,13 @@ class VersionStorageInfo;
// A Compaction encapsulates information about a compaction.
class Compaction {
public:
Compaction(VersionStorageInfo* input_version,
const autovector<CompactionInputFiles>& inputs,
int start_level, int output_level,
uint64_t max_grandparent_overlap_bytes,
const CompactionOptions& options,
bool deletion_compaction);
// No copying allowed
Compaction(const Compaction&) = delete;
void operator=(const Compaction&) = delete;
@ -153,6 +160,8 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }
void SetOutputPathId(uint32_t path_id) { output_path_id_ = path_id; }
// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
@ -164,6 +173,16 @@ class Compaction {
void SetInputVersion(Version* input_version);
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// Initialize whether the compaction is producing files at the
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);
private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
@ -226,16 +245,6 @@ class Compaction {
// records indices for all levels beyond "output_level_".
std::vector<size_t> level_ptrs_;
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
// Initialize whether the compaction is producing files at the
// bottommost level.
//
// @see BottomMostLevel()
void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
bool level0_only);
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();

@ -185,7 +185,8 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
}
// Returns true if any one of specified files are being compacted
bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
bool CompactionPicker::FilesInCompaction(
const std::vector<FileMetaData*>& files) {
for (unsigned int i = 0; i < files.size(); i++) {
if (files[i]->being_compacted) {
return true;
@ -194,6 +195,89 @@ bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
return false;
}
Compaction* CompactionPicker::FormCompaction(
const CompactionOptions& compact_options,
const autovector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options) const {
uint64_t max_grandparent_overlap_bytes =
output_level + 1 < vstorage->NumberLevels() ?
mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
std::numeric_limits<uint64_t>::max();
assert(input_files.size());
auto c = new Compaction(vstorage, input_files,
input_files[0].level, output_level,
max_grandparent_overlap_bytes,
compact_options, false);
c->mutable_cf_options_ = mutable_cf_options;
c->MarkFilesBeingCompacted(true);
// TODO(yhchiang): complete the SetBottomMostLevel as follows
// If there is no any key of the range in DB that is older than the
// range to compact, it is bottom most. For leveled compaction,
// if number-of_level-1 is empty, and output is going to number-of_level-2,
// it is also bottom-most. On the other hand, if number of level=1 (
// something like universal), the compaction is only "bottom-most" if
// the oldest file is involved.
c->SetupBottomMostLevel(
vstorage,
(output_level == vstorage->NumberLevels() - 1),
(output_level == 0));
return c;
}
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
autovector<CompactionInputFiles>* input_files,
std::unordered_set<uint64_t>* input_set,
const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const {
if (input_set->size() == 0U) {
return Status::InvalidArgument(
"Compaction must include at least one file.");
}
assert(input_files);
autovector<CompactionInputFiles> matched_input_files;
matched_input_files.resize(vstorage->NumberLevels());
int first_non_empty_level = -1;
int last_non_empty_level = -1;
// TODO(yhchiang): use a lazy-initialized mapping from
// file_number to FileMetaData in Version.
for (int level = 0; level < vstorage->NumberLevels(); ++level) {
for (auto file : vstorage->LevelFiles(level)) {
auto iter = input_set->find(file->fd.GetNumber());
if (iter != input_set->end()) {
matched_input_files[level].files.push_back(file);
input_set->erase(iter);
last_non_empty_level = level;
if (first_non_empty_level == -1) {
first_non_empty_level = level;
}
}
}
}
if (!input_set->empty()) {
std::string message(
"Cannot find matched SST files for the following file numbers:");
for (auto fn : *input_set) {
message += " ";
message += std::to_string(fn);
}
return Status::InvalidArgument(message);
}
for (int level = first_non_empty_level;
level <= last_non_empty_level; ++level) {
matched_input_files[level].level = level;
input_files->emplace_back(std::move(matched_input_files[level]));
}
return Status::OK();
}
// Returns true if any one of the parent files are being compacted
bool CompactionPicker::ParentRangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
@ -362,6 +446,235 @@ Compaction* CompactionPicker::CompactRange(
return c;
}
namespace {
// Test whether two files have overlapping key-ranges.
bool HaveOverlappingKeyRanges(
const Comparator* c,
const SstFileMetaData& a, const SstFileMetaData& b) {
if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
// a.smallestkey < b.smallestkey <= a.largestkey
return true;
}
if (c->Compare(a.largestkey, b.largestkey) <= 0) {
if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
// b.smallestkey <= a.largestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
// a.smallestkey <= b.largestkey < a.largestkey
return true;
}
return false;
}
} // namespace
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const {
auto& levels = cf_meta.levels;
auto comparator = icmp_->user_comparator();
// TODO(yhchiang): If there is any input files of L1 or up and there
// is at least one L0 files. All L0 files older than the L0 file needs
// to be included. Otherwise, it is a false conditoin
// TODO(yhchiang): add is_adjustable to CompactionOptions
// the smallest and largest key of the current compaction input
std::string smallestkey;
std::string largestkey;
// a flag for initializing smallest and largest key
bool is_first = false;
const int kNotFound = -1;
// For each level, it does the following things:
// 1. Find the first and the last compaction input files
// in the current level.
// 2. Include all files between the first and the last
// compaction input files.
// 3. Update the compaction key-range.
// 4. For all remaining levels, include files that have
// overlapping key-range with the compaction key-range.
for (int l = 0; l <= output_level; ++l) {
auto& current_files = levels[l].files;
int first_included = static_cast<int>(current_files.size());
int last_included = kNotFound;
// identify the first and the last compaction input files
// in the current level.
for (size_t f = 0; f < current_files.size(); ++f) {
if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
input_files->end()) {
first_included = std::min(first_included, static_cast<int>(f));
last_included = std::max(last_included, static_cast<int>(f));
if (is_first == false) {
smallestkey = current_files[f].smallestkey;
largestkey = current_files[f].largestkey;
is_first = true;
}
}
}
if (last_included == kNotFound) {
continue;
}
if (l != 0) {
// expend the compaction input of the current level if it
// has overlapping key-range with other non-compaction input
// files in the same level.
while (first_included > 0) {
if (comparator->Compare(
current_files[first_included - 1].largestkey,
current_files[first_included].smallestkey) < 0) {
break;
}
first_included--;
}
while (last_included < static_cast<int>(current_files.size()) - 1) {
if (comparator->Compare(
current_files[last_included + 1].smallestkey,
current_files[last_included].largestkey) > 0) {
break;
}
last_included++;
}
}
// include all files between the first and the last compaction input files.
for (int f = first_included; f <= last_included; ++f) {
if (current_files[f].being_compacted) {
return Status::Aborted(
"Necessary compaction input file " + current_files[f].name +
" is currently being compacted.");
}
input_files->insert(
TableFileNameToNumber(current_files[f].name));
}
// update smallest and largest key
if (l == 0) {
for (int f = first_included; f <= last_included; ++f) {
if (comparator->Compare(
smallestkey, current_files[f].smallestkey) > 0) {
smallestkey = current_files[f].smallestkey;
}
if (comparator->Compare(
largestkey, current_files[f].largestkey) < 0) {
largestkey = current_files[f].largestkey;
}
}
} else {
if (comparator->Compare(
smallestkey, current_files[first_included].smallestkey) > 0) {
smallestkey = current_files[first_included].smallestkey;
}
if (comparator->Compare(
largestkey, current_files[last_included].largestkey) < 0) {
largestkey = current_files[last_included].largestkey;
}
}
SstFileMetaData aggregated_file_meta;
aggregated_file_meta.smallestkey = smallestkey;
aggregated_file_meta.largestkey = largestkey;
// For all lower levels, include all overlapping files.
for (int m = l + 1; m <= output_level; ++m) {
for (auto& next_lv_file : levels[m].files) {
if (HaveOverlappingKeyRanges(
comparator, aggregated_file_meta, next_lv_file)) {
if (next_lv_file.being_compacted) {
return Status::Aborted(
"File " + next_lv_file.name +
" that has overlapping key range with one of the compaction "
" input file is currently being compacted.");
}
input_files->insert(
TableFileNameToNumber(next_lv_file.name));
}
}
}
}
return Status::OK();
}
Status CompactionPicker::SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const {
assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
cf_meta.levels[cf_meta.levels.size() - 1].level);
if (output_level >= static_cast<int>(cf_meta.levels.size())) {
return Status::InvalidArgument(
"Output level for column family " + cf_meta.name +
" must between [0, " +
std::to_string(cf_meta.levels[cf_meta.levels.size() - 1].level) +
"].");
}
if (output_level > MaxOutputLevel()) {
return Status::InvalidArgument(
"Exceed the maximum output level defined by "
"the current compaction algorithm --- " +
std::to_string(MaxOutputLevel()));
}
if (output_level < 0) {
return Status::InvalidArgument(
"Output level cannot be negative.");
}
if (input_files->size() == 0) {
return Status::InvalidArgument(
"A compaction must contain at least one file.");
}
Status s = SanitizeCompactionInputFilesForAllLevels(
input_files, cf_meta, output_level);
if (!s.ok()) {
return s;
}
// for all input files, check whether the file number matches
// any currently-existing files.
for (auto file_num : *input_files) {
bool found = false;
for (auto level_meta : cf_meta.levels) {
for (auto file_meta : level_meta.files) {
if (file_num == TableFileNameToNumber(file_meta.name)) {
if (file_meta.being_compacted) {
return Status::Aborted(
"Specified compaction input file " +
MakeTableFileName("", file_num) +
" is already being compacted.");
}
found = true;
break;
}
}
if (found) {
break;
}
}
if (!found) {
return Status::InvalidArgument(
"Specified compaction input file " +
MakeTableFileName("", file_num) +
" does not exist in column family " + cf_meta.name + ".");
}
}
return Status::OK();
}
Compaction* LevelCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {

@ -8,6 +8,11 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <vector>
#include <memory>
#include <set>
#include <unordered_set>
#include "db/version_set.h"
#include "db/compaction.h"
#include "rocksdb/status.h"
@ -25,6 +30,7 @@ namespace rocksdb {
class LogBuffer;
class Compaction;
class VersionStorageInfo;
struct CompactionInputFiles;
class CompactionPicker {
public:
@ -62,6 +68,22 @@ class CompactionPicker {
// for compaction input.
virtual int MaxInputLevel(int current_num_levels) const = 0;
// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const {
return NumberLevels() - 1;
}
// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
// files. If it's not possible to conver an invalid input_files
// into a valid one by adding more files, the function will return a
// non-ok status with specific reason.
Status SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
// Free up the files that participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status);
@ -69,6 +91,25 @@ class CompactionPicker {
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(const std::vector<FileMetaData*>& files);
// Takes a list of CompactionInputFiles and returns a Compaction object.
Compaction* FormCompaction(
const CompactionOptions& compact_options,
const autovector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options) const;
// Converts a set of compaction input file numbers into
// a list of CompactionInputFiles.
Status GetCompactionInputsFromFileNumbers(
autovector<CompactionInputFiles>* input_files,
std::unordered_set<uint64_t>* input_set,
const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const;
protected:
int NumberLevels() const { return ioptions_.num_levels; }
@ -98,9 +139,6 @@ class CompactionPicker {
bool ExpandWhileOverlapping(const std::string& cf_name,
VersionStorageInfo* vstorage, Compaction* c);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
// Returns true if any one of the parent files are being compacted
bool ParentRangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
@ -113,11 +151,16 @@ class CompactionPicker {
const ImmutableCFOptions& ioptions_;
// A helper function to SanitizeCompactionInputFiles() that
// sanitizes "input_files" by adding necessary files.
virtual Status SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
// record all the ongoing compactions for all levels
std::vector<std::set<Compaction*>> compactions_in_progress_;
private:
const InternalKeyComparator* const icmp_;
};
@ -131,11 +174,16 @@ class UniversalCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
// The maxinum allowed input level. Always return 0.
// The maxinum allowed input level. Always returns 0.
virtual int MaxInputLevel(int current_num_levels) const override {
return 0;
}
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override {
return 0;
}
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(
@ -197,10 +245,46 @@ class FIFOCompactionPicker : public CompactionPicker {
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) override;
// The maxinum allowed input level. Always return 0.
// The maxinum allowed input level. Always returns 0.
virtual int MaxInputLevel(int current_num_levels) const override {
return 0;
}
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override {
return 0;
}
};
class NullCompactionPicker : public CompactionPicker {
public:
NullCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp) :
CompactionPicker(ioptions, icmp) {}
virtual ~NullCompactionPicker() {}
// Always return "nullptr"
Compaction* PickCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override {
return nullptr;
}
// Always return "nullptr"
Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) override {
return nullptr;
}
// Given the current number of levels, returns the highest allowed level
// for compaction input.
virtual int MaxInputLevel(int current_num_levels) const {
return current_num_levels - 2;
}
};
// Utility function

@ -213,7 +213,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
#endif // ROCKSDB_LITE
bg_work_gate_closed_(false),
refitting_level_(false),
opened_successfully_(false) {
opened_successfully_(false),
notifying_events_(0) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
@ -239,6 +240,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
DBImpl::~DBImpl() {
mutex_.Lock();
if (flush_on_destroy_) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsEmpty()) {
@ -254,10 +256,10 @@ DBImpl::~DBImpl() {
// Wait for background work to finish
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
bg_cv_.Wait();
}
listeners_.clear();
flush_scheduler_.Clear();
if (default_cf_handle_ != nullptr) {
@ -1055,7 +1057,8 @@ Status DBImpl::FlushMemTableToOutputFile(
db_directory_.get(), GetCompressionFlush(*cfd->ioptions()),
stats_);
Status s = flush_job.Run();
uint64_t file_number;
Status s = flush_job.Run(&file_number);
if (s.ok()) {
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
@ -1085,9 +1088,42 @@ Status DBImpl::FlushMemTableToOutputFile(
bg_error_ = s;
}
RecordFlushIOStats();
#ifndef ROCKSDB_LITE
if (s.ok()) {
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, file_number);
}
#endif // ROCKSDB_LITE
return s;
}
void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, uint64_t file_number) {
mutex_.AssertHeld();
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
bool triggered_flush_slowdown =
(cfd->current()->storage_info()->NumLevelFiles(0) >=
cfd->options()->level0_slowdown_writes_trigger);
bool triggered_flush_stop =
(cfd->current()->storage_info()->NumLevelFiles(0) >=
cfd->options()->level0_stop_writes_trigger);
notifying_events_++;
// release lock while notifying events
mutex_.Unlock();
// TODO(yhchiang): make db_paths dynamic.
cfd->NotifyOnFlushCompleted(
this, MakeTableFileName(db_options_.db_paths[0].path, file_number),
triggered_flush_slowdown,
triggered_flush_stop);
mutex_.Lock();
notifying_events_--;
assert(notifying_events_ >= 0);
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
}
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level, int target_level,
@ -1149,6 +1185,167 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s;
}
Status DBImpl::CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id) {
MutexLock l(&mutex_);
if (column_family == nullptr) {
return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
}
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
assert(cfd);
// TODO(yhchiang): use superversion
cfd->Ref();
auto version = cfd->current();
version->Ref();
auto s = CompactFilesImpl(compact_options, cfd, version,
input_file_names, output_level, output_path_id);
// TODO(yhchiang): unref could move into CompactFilesImpl(). Otherwise,
// FindObsoleteFiles might never able to find any file to delete.
version->Unref();
// TODO(yhchiang): cfd should be deleted after its last reference.
cfd->Unref();
return s;
}
Status DBImpl::CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id) {
mutex_.AssertHeld();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
std::unordered_set<uint64_t> input_set;
for (auto file_name : input_file_names) {
input_set.insert(TableFileNameToNumber(file_name));
}
ColumnFamilyMetaData cf_meta;
// TODO(yhchiang): can directly use version here if none of the
// following functions call is pluggable to external developers.
version->GetColumnFamilyMetaData(&cf_meta);
if (output_path_id < 0) {
if (db_options_.db_paths.size() == 1U) {
output_path_id = 0;
} else {
return Status::NotSupported(
"Automatic output path selection is not "
"yet supported in CompactFiles()");
}
}
Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
&input_set, cf_meta, output_level);
if (!s.ok()) {
return s;
}
autovector<CompactionInputFiles> input_files;
s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
&input_files, &input_set, version->storage_info(), compact_options);
if (!s.ok()) {
return s;
}
for (auto inputs : input_files) {
if (cfd->compaction_picker()->FilesInCompaction(inputs.files)) {
return Status::Aborted(
"Some of the necessary compaction input "
"files are already being compacted");
}
}
// At this point, CompactFiles will be run.
bg_compaction_scheduled_++;
unique_ptr<Compaction> c;
assert(cfd->compaction_picker());
c.reset(cfd->compaction_picker()->FormCompaction(
compact_options, input_files,
output_level, version->storage_info(),
*cfd->GetLatestMutableCFOptions()));
assert(c);
c->SetInputVersion(version);
c->SetOutputPathId(static_cast<uint32_t>(output_path_id));
// deletion compaction currently not allowed in CompactFiles.
assert(!c->IsDeletionCompaction());
JobContext job_context(true);
auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(),
*c->mutable_cf_options(), &job_context,
&log_buffer);
};
CompactionJob compaction_job(
c.get(), db_options_, *c->mutable_cf_options(), env_options_,
versions_.get(), &mutex_, &shutting_down_, &pending_outputs_,
&log_buffer, db_directory_.get(), stats_, &snapshots_,
IsSnapshotSupported(), table_cache_, std::move(yield_callback));
compaction_job.Prepare();
mutex_.Unlock();
Status status = compaction_job.Run();
mutex_.Lock();
if (status.ok()) {
status = compaction_job.Install(status);
if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), &job_context,
*c->mutable_cf_options());
}
}
c->ReleaseCompactionFiles(s);
c->ReleaseInputs();
c.reset();
if (status.ok()) {
// Done
} else if (status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
} else {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (db_options_.paranoid_checks && bg_error_.ok()) {
bg_error_ = status;
}
}
// If !s.ok(), this means that Compaction failed. In that case, we want
// to delete all obsolete files we might have created and we force
// FindObsoleteFiles(). This is because job_context does not
// catch all created files if compaction failed.
// TODO(yhchiang): write an unit-test to make sure files are actually
// deleted after CompactFiles.
FindObsoleteFiles(&job_context, !s.ok());
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
mutex_.Lock();
}
bg_compaction_scheduled_--;
return status;
}
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) {
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
@ -3112,6 +3309,17 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
MutexLock l(&mutex_);
versions_->GetLiveFilesMetaData(metadata);
}
void DBImpl::GetColumnFamilyMetaData(
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* cf_meta) {
assert(column_family);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
auto* sv = GetAndRefSuperVersion(cfd);
sv->current->GetColumnFamilyMetaData(cf_meta);
ReturnAndCleanupSuperVersion(cfd, sv);
}
#endif // ROCKSDB_LITE
Status DBImpl::CheckConsistency() {
@ -3362,6 +3570,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) {
impl->opened_successfully_ = true;
*dbptr = impl;
// TODO(yhchiang): Add NotifyOnDatabaseOpen() here.
// Since the column-family handles are only available after DB::Open(),
// typically developers will need to pass the returned ColumnFamilyHandles
// to their EventListeners in order to maintain the mapping between
// column-family-name to ColumnFamilyHandle. However, some database
// events might happen before the user passing those ColumnFamilyHandle to
// their Listeners. To address this, we should have NotifyOnDatabaseOpen()
// here which passes the created ColumnFamilyHandle to the Listeners
// as the first event after DB::Open().
} else {
for (auto h : *handles) {
delete h;

@ -14,6 +14,7 @@
#include <set>
#include <list>
#include <utility>
#include <list>
#include <vector>
#include <string>
@ -115,6 +116,13 @@ class DBImpl : public DB {
bool reduce_level = false, int target_level = -1,
uint32_t target_path_id = 0);
using DB::CompactFiles;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1);
using DB::SetOptions;
Status SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map);
@ -152,6 +160,15 @@ class DBImpl : public DB {
virtual Status DeleteFile(std::string name);
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
// Obtains the meta data of the specified column family of the DB.
// Status::NotFound() will be returned if the current DB does not have
// any column family match the specified name.
// TODO(yhchiang): output parameter is placed in the end in this codebase.
virtual void GetColumnFamilyMetaData(
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) override;
#endif // ROCKSDB_LITE
// checks if all live files exist on file system and that their file sizes
@ -211,7 +228,7 @@ class DBImpl : public DB {
// REQUIRES: mutex locked
// pass the pointer that you got from TEST_BeginWrite()
void TEST_EndWrite(void* w);
#endif // NDEBUG
#endif // ROCKSDB_LITE
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
@ -239,6 +256,8 @@ class DBImpl : public DB {
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena);
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number);
private:
friend class DB;
friend class InternalStats;
@ -318,6 +337,13 @@ class DBImpl : public DB {
void RecordFlushIOStats();
void RecordCompactionIOStats();
Status CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id);
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
void MaybeScheduleFlushOrCompaction();
static void BGWorkCompaction(void* db);
static void BGWorkFlush(void* db);
@ -488,6 +514,12 @@ class DBImpl : public DB {
// Indicate DB was opened successfully
bool opened_successfully_;
// The list of registered event listeners.
std::list<EventListener*> listeners_;
// count how many events are currently being notified.
int notifying_events_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

@ -62,10 +62,20 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactFiles;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
#ifndef ROCKSDB_LITE
virtual Status DisableFileDeletions() override {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status EnableFileDeletions(bool force) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

@ -15,6 +15,7 @@
#include <unordered_set>
#include <utility>
#include "db/filename.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/filename.h"
@ -4060,8 +4061,43 @@ TEST(DBTest, UniversalCompactionFourPaths) {
Destroy(options);
}
#endif
void CheckColumnFamilyMeta(const ColumnFamilyMetaData& cf_meta) {
uint64_t cf_size = 0;
uint64_t cf_csize = 0;
size_t file_count = 0;
for (auto level_meta : cf_meta.levels) {
uint64_t level_size = 0;
uint64_t level_csize = 0;
file_count += level_meta.files.size();
for (auto file_meta : level_meta.files) {
level_size += file_meta.size;
}
ASSERT_EQ(level_meta.size, level_size);
cf_size += level_size;
cf_csize += level_csize;
}
ASSERT_EQ(cf_meta.file_count, file_count);
ASSERT_EQ(cf_meta.size, cf_size);
}
TEST(DBTest, ColumnFamilyMetaDataTest) {
Options options = CurrentOptions();
options.create_if_missing = true;
DestroyAndReopen(options);
Random rnd(301);
int key_index = 0;
ColumnFamilyMetaData cf_meta;
for (int i = 0; i < 100; ++i) {
GenerateNewFile(&rnd, &key_index);
db_->GetColumnFamilyMetaData(&cf_meta);
CheckColumnFamilyMeta(cf_meta);
}
}
TEST(DBTest, ConvertCompactionStyle) {
Random rnd(301);
int max_key_level_insert = 200;
@ -4238,7 +4274,7 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
TEST(DBTest, MinLevelToCompress1) {
Options options = CurrentOptions();
CompressionType type;
CompressionType type = kSnappyCompression;
if (!MinLevelToCompress(type, options, -14, -1, 0)) {
return;
}
@ -4258,7 +4294,7 @@ TEST(DBTest, MinLevelToCompress1) {
TEST(DBTest, MinLevelToCompress2) {
Options options = CurrentOptions();
CompressionType type;
CompressionType type = kSnappyCompression;
if (!MinLevelToCompress(type, options, 15, -1, 0)) {
return;
}
@ -7246,6 +7282,15 @@ class ModelDB: public DB {
return Status::NotSupported("Not supported operation.");
}
using DB::CompactFiles;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1) override {
return Status::NotSupported("Not supported operation.");
}
using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) { return 1; }
@ -7314,6 +7359,10 @@ class ModelDB: public DB {
virtual ColumnFamilyHandle* DefaultColumnFamily() const { return nullptr; }
virtual void GetColumnFamilyMetaData(
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) {}
private:
class ModelIter: public Iterator {
public:
@ -8202,6 +8251,211 @@ TEST(DBTest, RateLimitingTest) {
ASSERT_TRUE(ratio < 0.6);
}
namespace {
bool HaveOverlappingKeyRanges(
const Comparator* c,
const SstFileMetaData& a, const SstFileMetaData& b) {
if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
// a.smallestkey < b.smallestkey <= a.largestkey
return true;
}
if (c->Compare(a.largestkey, b.largestkey) <= 0) {
if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
// b.smallestkey <= a.largestkey <= b.largestkey
return true;
}
} else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
// a.smallestkey <= b.largestkey < a.largestkey
return true;
}
return false;
}
// Identifies all files between level "min_level" and "max_level"
// which has overlapping key range with "input_file_meta".
void GetOverlappingFileNumbersForLevelCompaction(
const ColumnFamilyMetaData& cf_meta,
const Comparator* comparator,
int min_level, int max_level,
const SstFileMetaData* input_file_meta,
std::set<std::string>* overlapping_file_names) {
std::set<const SstFileMetaData*> overlapping_files;
overlapping_files.insert(input_file_meta);
for (int m = min_level; m <= max_level; ++m) {
for (auto& file : cf_meta.levels[m].files) {
for (auto* included_file : overlapping_files) {
if (HaveOverlappingKeyRanges(
comparator, *included_file, file)) {
overlapping_files.insert(&file);
overlapping_file_names->insert(file.name);
break;
}
}
}
}
}
void VerifyCompactionResult(
const ColumnFamilyMetaData& cf_meta,
const std::set<std::string>& overlapping_file_numbers) {
for (auto& level : cf_meta.levels) {
for (auto& file : level.files) {
assert(overlapping_file_numbers.find(file.name) ==
overlapping_file_numbers.end());
}
}
}
const SstFileMetaData* PickFileRandomly(
const ColumnFamilyMetaData& cf_meta,
Random* rand,
int* level = nullptr) {
auto file_id = rand->Uniform(static_cast<int>(
cf_meta.file_count)) + 1;
for (auto& level_meta : cf_meta.levels) {
if (file_id <= level_meta.files.size()) {
if (level != nullptr) {
*level = level_meta.level;
}
auto result = rand->Uniform(file_id);
return &(level_meta.files[result]);
}
file_id -= level_meta.files.size();
}
assert(false);
return nullptr;
}
} // namespace
TEST(DBTest, CompactFilesOnLevelCompaction) {
const int kKeySize = 16;
const int kValueSize = 984;
const int kEntrySize = kKeySize + kValueSize;
const int kEntriesPerBuffer = 100;
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.max_bytes_for_level_base = options.target_file_size_base * 2;
options.level0_stop_writes_trigger = 2;
options.max_bytes_for_level_multiplier = 2;
options.compression = kNoCompression;
options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
ASSERT_OK(Put(1, std::to_string(key), RandomString(&rnd, kValueSize)));
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForCompact();
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
int output_level = cf_meta.levels.size() - 1;
for (int file_picked = 5; file_picked > 0; --file_picked) {
std::set<std::string> overlapping_file_names;
std::vector<std::string> compaction_input_file_names;
for (int f = 0; f < file_picked; ++f) {
int level;
auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
compaction_input_file_names.push_back(file_meta->name);
GetOverlappingFileNumbersForLevelCompaction(
cf_meta, options.comparator, level, output_level,
file_meta, &overlapping_file_names);
}
ASSERT_OK(dbfull()->CompactFiles(
CompactionOptions(), handles_[1],
compaction_input_file_names,
output_level));
// Make sure all overlapping files do not exist after compaction
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
VerifyCompactionResult(cf_meta, overlapping_file_names);
}
// make sure all key-values are still there.
for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
ASSERT_NE(Get(1, std::to_string(key)), "NOT_FOUND");
}
}
TEST(DBTest, CompactFilesOnUniversalCompaction) {
const int kKeySize = 16;
const int kValueSize = 984;
const int kEntrySize = kKeySize + kValueSize;
const int kEntriesPerBuffer = 10;
ChangeCompactOptions();
Options options;
options.create_if_missing = true;
options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
options.compaction_style = kCompactionStyleLevel;
options.target_file_size_base = options.write_buffer_size;
options.compression = kNoCompression;
options = CurrentOptions(options);
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
Random rnd(301);
for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
ASSERT_OK(Put(1, std::to_string(key), RandomString(&rnd, kValueSize)));
}
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForCompact();
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
std::vector<std::string> compaction_input_file_names;
for (auto file : cf_meta.levels[0].files) {
if (rnd.OneIn(2)) {
compaction_input_file_names.push_back(file.name);
}
}
if (compaction_input_file_names.size() == 0) {
compaction_input_file_names.push_back(
cf_meta.levels[0].files[0].name);
}
// expect fail since universal compaction only allow L0 output
ASSERT_TRUE(!dbfull()->CompactFiles(
CompactionOptions(), handles_[1],
compaction_input_file_names, 1).ok());
// expect ok and verify the compacted files no longer exist.
ASSERT_OK(dbfull()->CompactFiles(
CompactionOptions(), handles_[1],
compaction_input_file_names, 0));
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
VerifyCompactionResult(
cf_meta,
std::set<std::string>(compaction_input_file_names.begin(),
compaction_input_file_names.end()));
compaction_input_file_names.clear();
// Pick the first and the last file, expect everything is
// compacted into one single file.
compaction_input_file_names.push_back(
cf_meta.levels[0].files[0].name);
compaction_input_file_names.push_back(
cf_meta.levels[0].files[
cf_meta.levels[0].files.size() - 1].name);
ASSERT_OK(dbfull()->CompactFiles(
CompactionOptions(), handles_[1],
compaction_input_file_names, 0));
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
}
TEST(DBTest, TableOptionsSanitizeTest) {
Options options = CurrentOptions();
options.create_if_missing = true;
@ -9079,7 +9333,6 @@ TEST(DBTest, DontDeletePendingOutputs) {
Compact("a", "b");
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -79,6 +79,17 @@ std::string MakeTableFileName(const std::string& path, uint64_t number) {
return MakeFileName(path, number, "sst");
}
uint64_t TableFileNameToNumber(const std::string& name) {
uint64_t number = 0;
uint64_t base = 1;
int pos = static_cast<int>(name.find_last_of('.'));
while (--pos >= 0 && name[pos] >= '0' && name[pos] <= '9') {
number += (name[pos] - '0') * base;
base *= 10;
}
return number;
}
std::string TableFileName(const std::vector<DbPath>& db_paths, uint64_t number,
uint32_t path_id) {
assert(number > 0);

@ -52,6 +52,10 @@ extern std::string ArchivedLogFileName(const std::string& dbname,
extern std::string MakeTableFileName(const std::string& name, uint64_t number);
// the reverse function of MakeTableFileName
// TODO(yhchiang): could merge this function with ParseFileName()
extern uint64_t TableFileNameToNumber(const std::string& name);
// Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".

@ -73,9 +73,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
output_compression_(output_compression),
stats_(stats) {}
Status FlushJob::Run() {
Status FlushJob::Run(uint64_t* file_number) {
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
uint64_t fn;
autovector<MemTable*> mems;
cfd_->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
@ -96,7 +96,7 @@ Status FlushJob::Run() {
edit->SetColumnFamily(cfd_->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &file_number);
Status s = WriteLevel0Table(mems, edit, &fn);
if (s.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
@ -113,6 +113,9 @@ Status FlushJob::Run() {
&job_context_->memtables_to_free, db_directory_, log_buffer_);
}
if (s.ok() && file_number != nullptr) {
*file_number = fn;
}
return s;
}

@ -60,7 +60,7 @@ class FlushJob {
CompressionType output_compression, Statistics* stats);
~FlushJob() {}
Status Run();
Status Run(uint64_t* file_number = nullptr);
private:
Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,

@ -0,0 +1,344 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/sync_point.h"
#include "util/testutil.h"
#ifndef ROCKSDB_LITE
namespace rocksdb {
class EventListenerTest {
public:
EventListenerTest() {
dbname_ = test::TmpDir() + "/listener_test";
ASSERT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
~EventListenerTest() {
Close();
Options options;
options.db_paths.emplace_back(dbname_, 0);
options.db_paths.emplace_back(dbname_ + "_2", 0);
options.db_paths.emplace_back(dbname_ + "_3", 0);
options.db_paths.emplace_back(dbname_ + "_4", 0);
ASSERT_OK(DestroyDB(dbname_, options));
}
void CreateColumnFamilies(const std::vector<std::string>& cfs,
const ColumnFamilyOptions* options = nullptr) {
ColumnFamilyOptions cf_opts;
cf_opts = ColumnFamilyOptions(Options());
int cfi = handles_.size();
handles_.resize(cfi + cfs.size());
for (auto cf : cfs) {
ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
}
}
void Close() {
for (auto h : handles_) {
delete h;
}
handles_.clear();
delete db_;
db_ = nullptr;
}
void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options* options = nullptr) {
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
const Options* options = nullptr) {
Close();
Options opts = (options == nullptr) ? Options() : *options;
std::vector<const Options*> v_opts(cfs.size(), &opts);
return TryReopenWithColumnFamilies(cfs, v_opts);
}
Status TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs,
const std::vector<const Options*>& options) {
Close();
ASSERT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], *options[i]));
}
DBOptions db_opts = DBOptions(*options[0]);
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
}
Status TryReopen(Options* options = nullptr) {
Close();
Options opts;
if (options != nullptr) {
opts = *options;
} else {
opts.create_if_missing = true;
}
return DB::Open(opts, dbname_, &db_);
}
void Reopen(Options* options = nullptr) {
ASSERT_OK(TryReopen(options));
}
void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
const Options* options = nullptr) {
CreateColumnFamilies(cfs, options);
std::vector<std::string> cfs_plus_default = cfs;
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
ReopenWithColumnFamilies(cfs_plus_default, options);
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
Status Put(int cf, const Slice& k, const Slice& v,
WriteOptions wo = WriteOptions()) {
return db_->Put(wo, handles_[cf], k, v);
}
Status Flush(int cf = 0) {
if (cf == 0) {
return db_->Flush(FlushOptions());
} else {
return db_->Flush(FlushOptions(), handles_[cf]);
}
}
DB* db_;
std::string dbname_;
std::vector<ColumnFamilyHandle*> handles_;
};
class TestFlushListener : public EventListener {
public:
void OnFlushCompleted(
DB* db, const std::string& name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(name);
if (triggered_writes_slowdown) {
slowdown_count++;
}
if (triggered_writes_stop) {
stop_count++;
}
}
std::vector<std::string> flushed_column_family_names_;
std::vector<DB*> flushed_dbs_;
int slowdown_count;
int stop_count;
};
TEST(EventListenerTest, OnSingleDBFlushTest) {
Options options;
TestFlushListener* listener = new TestFlushListener();
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, &options);
ASSERT_OK(Put(1, "pikachu", "pikachu"));
ASSERT_OK(Put(2, "ilya", "ilya"));
ASSERT_OK(Put(3, "muromec", "muromec"));
ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
ASSERT_OK(Put(5, "nikitich", "nikitich"));
ASSERT_OK(Put(6, "alyosha", "alyosha"));
ASSERT_OK(Put(7, "popovich", "popovich"));
for (size_t i = 1; i < 8; ++i) {
Flush(i);
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}
// make sure call-back functions are called in the right order
for (size_t i = 0; i < cf_names.size(); ++i) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
}
}
TEST(EventListenerTest, MultiCF) {
Options options;
TestFlushListener* listener = new TestFlushListener();
options.listeners.emplace_back(listener);
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, &options);
ASSERT_OK(Put(1, "pikachu", "pikachu"));
ASSERT_OK(Put(2, "ilya", "ilya"));
ASSERT_OK(Put(3, "muromec", "muromec"));
ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
ASSERT_OK(Put(5, "nikitich", "nikitich"));
ASSERT_OK(Put(6, "alyosha", "alyosha"));
ASSERT_OK(Put(7, "popovich", "popovich"));
for (size_t i = 1; i < 8; ++i) {
Flush(i);
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}
// make sure call-back functions are called in the right order
for (size_t i = 0; i < cf_names.size(); i++) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
}
}
TEST(EventListenerTest, MultiDBMultiListeners) {
std::vector<TestFlushListener*> listeners;
const int kNumDBs = 5;
const int kNumListeners = 10;
for (int i = 0; i < kNumListeners; ++i) {
listeners.emplace_back(new TestFlushListener());
}
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
Options options;
options.create_if_missing = true;
for (int i = 0; i < kNumListeners; ++i) {
options.listeners.emplace_back(listeners[i]);
}
DBOptions db_opts(options);
ColumnFamilyOptions cf_opts(options);
std::vector<DB*> dbs;
std::vector<std::vector<ColumnFamilyHandle *>> vec_handles;
for (int d = 0; d < kNumDBs; ++d) {
ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options));
DB* db;
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db));
for (size_t c = 0; c < cf_names.size(); ++c) {
ColumnFamilyHandle* handle;
db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
handles.push_back(handle);
}
vec_handles.push_back(std::move(handles));
dbs.push_back(db);
}
for (int d = 0; d < kNumDBs; ++d) {
for (size_t c = 0; c < cf_names.size(); ++c) {
ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
cf_names[c], cf_names[c]));
}
}
for (size_t c = 0; c < cf_names.size(); ++c) {
for (int d = 0; d < kNumDBs; ++d) {
ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable();
}
}
for (auto* listener : listeners) {
int pos = 0;
for (size_t c = 0; c < cf_names.size(); ++c) {
for (int d = 0; d < kNumDBs; ++d) {
ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
pos++;
}
}
}
for (auto handles : vec_handles) {
for (auto h : handles) {
delete h;
}
handles.clear();
}
vec_handles.clear();
for (auto db : dbs) {
delete db;
}
}
TEST(EventListenerTest, DisableBGCompaction) {
Options options;
TestFlushListener* listener = new TestFlushListener();
const int kSlowdownTrigger = 5;
const int kStopTrigger = 10;
options.level0_slowdown_writes_trigger = kSlowdownTrigger;
options.level0_stop_writes_trigger = kStopTrigger;
options.listeners.emplace_back(listener);
// BG compaction is disabled. Number of L0 files will simply keeps
// increasing in this test.
options.compaction_style = kCompactionStyleNone;
options.compression = kNoCompression;
options.write_buffer_size = 100000; // Small write buffer
CreateAndReopenWithCF({"pikachu"}, &options);
WriteOptions wopts;
wopts.timeout_hint_us = 100000;
ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
// keep writing until writes are forced to stop.
for (int i = 0; static_cast<int>(cf_meta.file_count) < kStopTrigger; ++i) {
Put(1, std::to_string(i), std::string(100000, 'x'), wopts);
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
}
ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger);
ASSERT_GE(listener->stop_count, 1);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}

@ -14,13 +14,14 @@
#endif
#include <inttypes.h>
#include <stdio.h>
#include <algorithm>
#include <map>
#include <set>
#include <climits>
#include <unordered_map>
#include <vector>
#include <stdio.h>
#include <string>
#include "db/filename.h"
#include "db/log_reader.h"
@ -599,6 +600,49 @@ size_t Version::GetMemoryUsageByTableReaders() {
return total_usage;
}
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
assert(cf_meta);
assert(cfd_);
cf_meta->name = cfd_->GetName();
cf_meta->size = 0;
cf_meta->file_count = 0;
cf_meta->levels.clear();
auto* ioptions = cfd_->ioptions();
auto* vstorage = storage_info();
for (int level = 0; level < cfd_->NumberLevels(); level++) {
uint64_t level_size = 0;
cf_meta->file_count += vstorage->LevelFiles(level).size();
std::vector<SstFileMetaData> files;
for (const auto& file : vstorage->LevelFiles(level)) {
uint32_t path_id = file->fd.GetPathId();
std::string file_path;
if (path_id < ioptions->db_paths.size()) {
file_path = ioptions->db_paths[path_id].path;
} else {
assert(!ioptions->db_paths.empty());
file_path = ioptions->db_paths.back().path;
}
files.emplace_back(
MakeTableFileName("", file->fd.GetNumber()),
file_path,
file->fd.GetFileSize(),
file->smallest_seqno,
file->largest_seqno,
file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->being_compacted);
level_size += file->fd.GetFileSize();
}
cf_meta->levels.emplace_back(
level, level_size, std::move(files));
cf_meta->size += level_size;
}
}
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
// Estimation will be not accurate when:
// (1) there is merge keys
@ -2645,12 +2689,10 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
c->column_family_data()->GetName().c_str());
}
// verify files in level
int level = c->level();
for (int i = 0; i < c->num_input_files(0); i++) {
uint64_t number = c->input(0, i)->fd.GetNumber();
// look for this file in the current version
for (int input = 0; input < c->num_input_levels(); ++input) {
int level = c->level(input);
for (int i = 0; i < c->num_input_files(input); ++i) {
uint64_t number = c->input(input, i)->fd.GetNumber();
bool found = false;
for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) {
FileMetaData* f = vstorage->files_[level][j];
@ -2660,26 +2702,9 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
}
}
if (!found) {
return false; // input files non existant in current version
return false; // input files non existent in current version
}
}
// verify level+1 files
level++;
for (int i = 0; i < c->num_input_files(1); i++) {
uint64_t number = c->input(1, i)->fd.GetNumber();
// look for this file in the current version
bool found = false;
for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) {
FileMetaData* f = vstorage->files_[level][j];
if (f->fd.GetNumber() == number) {
found = true;
break;
}
}
if (!found) {
return false; // input files non existant in current version
}
}
#endif
return true; // everything good

@ -420,6 +420,8 @@ class Version {
VersionSet* version_set() { return vset_; }
void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
private:
friend class VersionSet;
@ -598,7 +600,7 @@ class VersionSet {
Status GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData** metadata, ColumnFamilyData** cfd);
void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
void GetObsoleteFiles(std::vector<FileMetaData*>* files);
@ -609,6 +611,7 @@ class VersionSet {
struct ManifestWriter;
friend class Version;
friend class DBImpl;
struct LogReporter : public log::Reader::Reporter {
Status* status;

@ -2,7 +2,7 @@ include ../build_config.mk
.PHONY: main clean
all: simple_example column_families_example
all: simple_example column_families_example compact_files_example
simple_example: simple_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
@ -10,5 +10,8 @@ simple_example: simple_example.cc
column_families_example: column_families_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
clean: simple_example column_families_example
rm -rf ./simple_example ./column_families_example
compact_files_example: compact_files_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
clean: simple_example column_families_example compact_files_example
rm -rf ./simple_example ./column_families_example ./compact_files_example

@ -0,0 +1,175 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// An example code demonstrating how to use CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.
#include <mutex>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
using namespace rocksdb;
std::string kDBPath = "/tmp/rocksdb_compact_files_example";
class CompactionTask;
// This is an example interface of external-compaction algorithm.
// Compaction algorithm can be implemented outside the core-RocksDB
// code by using the pluggable compaction APIs that RocksDb provides.
class Compactor : public EventListener {
public:
// Picks and returns a compaction task given the specified DB
// and column family. It is the caller's responsibility to
// destroy the returned CompactionTask. Returns "nullptr"
// if it cannot find a proper compaction task.
virtual CompactionTask* PickCompaction(
DB* db, const std::string& cf_name) = 0;
// Schedule and run the specified compaction task in background.
virtual void ScheduleCompaction(CompactionTask *task) = 0;
};
// Example structure that describes a compaction task.
struct CompactionTask {
CompactionTask(
DB* db, Compactor* compactor,
const std::string& column_family_name,
const std::vector<std::string>& input_file_names,
const int output_level,
const CompactionOptions& compact_options,
bool retry_on_fail)
: db(db),
compactor(compactor),
column_family_name(column_family_name),
input_file_names(input_file_names),
output_level(output_level),
compact_options(compact_options),
retry_on_fail(false) {}
DB* db;
Compactor* compactor;
const std::string& column_family_name;
std::vector<std::string> input_file_names;
int output_level;
CompactionOptions compact_options;
bool retry_on_fail;
};
// A simple compaction algorithm that always compacts everything
// to the highest level whenever possible.
class FullCompactor : public Compactor {
public:
explicit FullCompactor(const Options options) : options_(options) {
compact_options_.compression = options_.compression;
compact_options_.output_file_size_limit =
options_.target_file_size_base;
}
// When flush happens, it determins whether to trigger compaction.
// If triggered_writes_stop is true, it will also set the retry
// flag of compaction-task to true.
void OnFlushCompleted(
DB* db, const std::string& cf_name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) override {
CompactionTask* task = PickCompaction(db, cf_name);
if (task != nullptr) {
if (triggered_writes_stop) {
task->retry_on_fail = true;
}
// Schedule compaction in a different thread.
ScheduleCompaction(task);
}
}
// Always pick a compaction which includes all files whenever possible.
CompactionTask* PickCompaction(
DB* db, const std::string& cf_name) override {
ColumnFamilyMetaData cf_meta;
db->GetColumnFamilyMetaData(&cf_meta);
std::vector<std::string> input_file_names;
for (auto level : cf_meta.levels) {
for (auto file : level.files) {
if (file.being_compacted) {
return nullptr;
}
input_file_names.push_back(file.name);
}
}
return new CompactionTask(
db, this, cf_name, input_file_names,
options_.num_levels - 1, compact_options_, false);
}
// Schedule the specified compaction task in background.
void ScheduleCompaction(CompactionTask* task) override {
options_.env->Schedule(&FullCompactor::CompactFiles, task);
}
static void CompactFiles(void* arg) {
CompactionTask* task = reinterpret_cast<CompactionTask*>(arg);
assert(task);
assert(task->db);
Status s = task->db->CompactFiles(
task->compact_options,
task->input_file_names,
task->output_level);
printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
// If a compaction task with its retry_on_fail=true failed,
// try to schedule another compaction in case the reason
// is not an IO error.
CompactionTask* new_task = task->compactor->PickCompaction(
task->db, task->column_family_name);
task->compactor->ScheduleCompaction(new_task);
}
// release the task
delete task;
}
private:
Options options_;
CompactionOptions compact_options_;
};
int main() {
Options options;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 3;
options.level0_stop_writes_trigger = 5;
options.IncreaseParallelism(5);
options.listeners.emplace_back(new FullCompactor(options));
DB* db = nullptr;
DestroyDB(kDBPath, options);
Status s = DB::Open(options, kDBPath, &db);
assert(s.ok());
assert(db);
// if background compaction is not working, write will stall
// because of options.level0_stop_writes_trigger
for (int i = 1000; i < 99999; ++i) {
db->Put(WriteOptions(), std::to_string(i),
std::string(500, 'a' + (i % 26)));
}
// verify the values are still there
std::string value;
for (int i = 1000; i < 99999; ++i) {
db->Get(ReadOptions(), std::to_string(i),
&value);
assert(value == std::string(500, 'a' + (i % 26)));
}
// close the db.
delete db;
return 0;
}

@ -15,19 +15,34 @@
#include <vector>
#include <string>
#include <unordered_map>
#include "rocksdb/metadata.h"
#include "rocksdb/version.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/listener.h"
namespace rocksdb {
struct Options;
struct DBOptions;
struct ColumnFamilyOptions;
struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
struct CompactionOptions;
struct TableProperties;
class WriteBatch;
class Env;
class EventListener;
using std::unique_ptr;
class ColumnFamilyHandle {
public:
virtual ~ColumnFamilyHandle() {}
virtual const std::string& GetName() const = 0;
};
extern const std::string kDefaultColumnFamilyName;
@ -44,27 +59,6 @@ struct ColumnFamilyDescriptor {
static const int kMajorVersion = __ROCKSDB_MAJOR__;
static const int kMinorVersion = __ROCKSDB_MINOR__;
struct Options;
struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
struct TableProperties;
class WriteBatch;
class Env;
// Metadata associated with each SST file.
struct LiveFileMetaData {
std::string column_family_name; // Name of the column family
std::string db_path;
std::string name; // Name of the file
int level; // Level at which this file resides.
size_t size; // File size in bytes.
std::string smallestkey; // Smallest user defined key in the file.
std::string largestkey; // Largest user defined key in the file.
SequenceNumber smallest_seqno; // smallest seqno in file
SequenceNumber largest_seqno; // largest seqno in file
};
// Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely
// accessed from multiple threads without any external synchronization.
@ -370,6 +364,26 @@ class DB {
return SetOptions(DefaultColumnFamily(), new_options);
}
// CompactFiles() inputs a list of files specified by file numbers
// and compacts them to the specified level. Note that the behavior
// is different from CompactRange in that CompactFiles() will
// perform the compaction job using the CURRENT thread.
//
// @see GetDataBaseMetaData
// @see GetColumnFamilyMetaData
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1) = 0;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1) {
return CompactFiles(compact_options, DefaultColumnFamily(),
input_file_names, output_level, output_path_id);
}
// Number of levels used for this DB.
virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }
@ -476,6 +490,21 @@ class DB {
// and end key
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {}
// Obtains the meta data of the specified column family of the DB.
// Status::NotFound() will be returned if the current DB does not have
// any column family match the specified name.
//
// If cf_name is not specified, then the metadata of the default
// column family will be returned.
virtual void GetColumnFamilyMetaData(
ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* metadata) {}
// Get the metadata of the default column family.
virtual void GetColumnFamilyMetaData(
ColumnFamilyMetaData* metadata) {
GetColumnFamilyMetaData(DefaultColumnFamily(), metadata);
}
#endif // ROCKSDB_LITE
// Sets the globally unique ID created at database creation time by invoking

@ -90,6 +90,10 @@ struct ImmutableCFOptions {
Options::AccessHint access_hint_on_compaction_start;
int num_levels;
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
};
} // namespace rocksdb

@ -0,0 +1,65 @@
// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include "rocksdb/status.h"
namespace rocksdb {
class DB;
class Status;
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
// stats-collector or external compaction algorithm.
//
// Note that call-back functions should not run for an extended period of
// time before the function returns, otherwise RocksDB may be blocked.
// For example, it is not suggested to do DB::CompactFiles() (as it may
// run for a long while) or issue many of DB::Put() (as Put may be blocked
// in certain cases) in the same thread in the EventListener callback.
// However, doing DB::CompactFiles() and DB::Put() in another thread is
// considered safe.
//
// [Threading] All EventListener callback will be called using the
// actual thread that involves in that specific event. For example, it
// is the RocksDB background flush thread that does the actual flush to
// call EventListener::OnFlushCompleted().
class EventListener {
public:
// A call-back function to RocksDB which will be called whenever a
// registered RocksDB flushes a file. The default implementation is
// no-op.
//
// Note that the this function must be implemented in a way such that
// it should not run for an extended period of time before the function
// returns. Otherwise, RocksDB may be blocked.
//
// @param db a pointer to the rocksdb instance which just flushed
// a memtable to disk.
// @param column_family_id the id of the flushed column family.
// @param file_path the path to the newly created file.
// @param triggered_writes_slowdown true when rocksdb is currently
// slowing-down all writes to prevent creating too many Level 0
// files as compaction seems not able to catch up the write request
// speed. This indicates that there're too many files in Level 0.
// @param triggered_writes_stop true when rocksdb is currently blocking
// any writes to prevent creating more L0 files. This indicates that
// there're too many files in level 0. Compactions should try to
// compact L0 files down to lower levels as soon as possible.
virtual void OnFlushCompleted(
DB* db, const std::string& column_family_name,
const std::string& file_path,
bool triggered_writes_slowdown,
bool triggered_writes_stop) {}
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,90 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <limits>
#include <string>
#include <vector>
#include "rocksdb/types.h"
#pragma once
namespace rocksdb {
struct ColumnFamilyMetaData;
struct LevelMetaData;
struct SstFileMetaData;
// The metadata that describes a column family.
struct ColumnFamilyMetaData {
ColumnFamilyMetaData() : size(0), name("") {}
ColumnFamilyMetaData(const std::string& name, uint64_t size,
const std::vector<LevelMetaData>&& levels) :
size(size), name(name), levels(levels) {}
// The size of this column family in bytes, which is equal to the sum of
// the file size of its "levels".
uint64_t size;
// The number of files in this column family.
size_t file_count;
// The name of the column family.
std::string name;
// The metadata of all levels in this column family.
std::vector<LevelMetaData> levels;
};
// The metadata that describes a level.
struct LevelMetaData {
LevelMetaData(int level, uint64_t size,
const std::vector<SstFileMetaData>&& files) :
level(level), size(size),
files(files) {}
// The level which this meta data describes.
const int level;
// The size of this level in bytes, which is equal to the sum of
// the file size of its "files".
const uint64_t size;
// The metadata of all sst files in this level.
const std::vector<SstFileMetaData> files;
};
// The metadata that describes a SST file.
struct SstFileMetaData {
SstFileMetaData() {}
SstFileMetaData(const std::string& file_name,
const std::string& path, uint64_t size,
SequenceNumber smallest_seqno,
SequenceNumber largest_seqno,
const std::string& smallestkey,
const std::string& largestkey,
bool being_compacted) :
size(size), name(file_name),
db_path(path), smallest_seqno(smallest_seqno), largest_seqno(largest_seqno),
smallestkey(smallestkey), largestkey(largestkey),
being_compacted(being_compacted) {}
// File size in bytes.
uint64_t size;
// The name of the file.
std::string name;
// The full path where the file locates.
std::string db_path;
SequenceNumber smallest_seqno; // Smallest sequence number in file.
SequenceNumber largest_seqno; // Largest sequence number in file.
std::string smallestkey; // Smallest user defined key in the file.
std::string largestkey; // Largest user defined key in the file.
bool being_compacted; // true if the file is currently being compacted.
};
// The full set of metadata associated with each SST file.
struct LiveFileMetaData : SstFileMetaData {
std::string column_family_name; // Name of the column family
int level; // Level at which this file resides.
};
} // namespace rocksdb

@ -10,13 +10,16 @@
#define STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_
#include <stddef.h>
#include <stdint.h>
#include <string>
#include <memory>
#include <vector>
#include <limits>
#include <stdint.h>
#include <unordered_map>
#include "rocksdb/version.h"
#include "rocksdb/listener.h"
#include "rocksdb/universal_compaction.h"
namespace rocksdb {
@ -56,6 +59,8 @@ enum CompactionStyle : char {
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1, // Universal compaction style
kCompactionStyleFIFO = 0x2, // FIFO compaction style
kCompactionStyleNone = 0x3, // Disable background compaction. Compaction
// jobs are submitted via CompactFiles()
};
@ -586,6 +591,10 @@ struct ColumnFamilyOptions {
// Default: 2
uint32_t min_partial_merge_operands;
// A vector of EventListeners which call-back functions will be called
// when specific RocksDB event happens.
std::vector<std::shared_ptr<EventListener>> listeners;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
@ -1067,6 +1076,19 @@ extern Options GetOptions(size_t total_write_buffer_limit,
int write_amplification_threshold = 32,
uint64_t target_db_size = 68719476736 /* 64GB */);
// CompactionOptions are used in CompactFiles() call.
struct CompactionOptions {
// Compaction output compression type
// Default: snappy
CompressionType compression;
// Compaction will create files of size `output_file_size_limit`.
// Default: MAX, which means that compaction will create a single file
uint64_t output_file_size_limit;
CompactionOptions()
: compression(kSnappyCompression),
output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
};
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_

@ -61,6 +61,9 @@ class Status {
static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIncomplete, msg, msg2);
}
static Status ShutdownInProgress() {
return Status(kShutdownInProgress);
}
static Status ShutdownInProgress(const Slice& msg,
const Slice& msg2 = Slice()) {
return Status(kShutdownInProgress, msg, msg2);
@ -71,6 +74,12 @@ class Status {
static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimedOut, msg, msg2);
}
static Status Aborted() {
return Status(kAborted);
}
static Status Aborted(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kAborted, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@ -101,6 +110,8 @@ class Status {
bool IsTimedOut() const { return code() == kTimedOut; }
bool IsAborted() const { return code() == kAborted; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -115,7 +126,8 @@ class Status {
kMergeInProgress = 6,
kIncomplete = 7,
kShutdownInProgress = 8,
kTimedOut = 9
kTimedOut = 9,
kAborted = 10
};
Code code() const {

@ -133,6 +133,17 @@ class StackableDB : public DB {
target_level, target_path_id);
}
using DB::CompactFiles;
virtual Status CompactFiles(
const CompactionOptions& compact_options,
ColumnFamilyHandle* column_family,
const std::vector<std::string>& input_file_names,
const int output_level, const int output_path_id = -1) override {
return db_->CompactFiles(
compact_options, column_family, input_file_names,
output_level, output_path_id);
}
using DB::NumberLevels;
virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
return db_->NumberLevels(column_family);
@ -170,6 +181,8 @@ class StackableDB : public DB {
return db_->Flush(fopts, column_family);
}
#ifndef ROCKSDB_LITE
virtual Status DisableFileDeletions() override {
return db_->DisableFileDeletions();
}
@ -183,6 +196,14 @@ class StackableDB : public DB {
db_->GetLiveFilesMetaData(metadata);
}
virtual void GetColumnFamilyMetaData(
ColumnFamilyHandle *column_family,
ColumnFamilyMetaData* cf_meta) override {
db_->GetColumnFamilyMetaData(column_family, cf_meta);
}
#endif // ROCKSDB_LITE
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
bool flush_memtable = true) override {
return db_->GetLiveFiles(vec, mfs, flush_memtable);

@ -64,7 +64,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels) {}
num_levels(options.num_levels),
listeners(options.listeners) {}
ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),
@ -112,7 +113,8 @@ ColumnFamilyOptions::ColumnFamilyOptions()
memtable_prefix_bloom_huge_page_tlb_size(0),
bloom_locality(0),
max_successive_merges(0),
min_partial_merge_operands(2) {
min_partial_merge_operands(2),
listeners() {
assert(memtable_factory.get() != nullptr);
}
@ -172,7 +174,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
options.memtable_prefix_bloom_huge_page_tlb_size),
bloom_locality(options.bloom_locality),
max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands) {
min_partial_merge_operands(options.min_partial_merge_operands),
listeners(options.listeners) {
assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) {

@ -70,6 +70,9 @@ std::string Status::ToString() const {
case kTimedOut:
type = "Operation timed out: ";
break;
case kAborted:
type = "Operation aborted: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));

Loading…
Cancel
Save