diff --git a/Makefile b/Makefile
index c2d206e91..b8913108f 100644
--- a/Makefile
+++ b/Makefile
@@ -149,7 +149,9 @@ TESTS = \
 	cuckoo_table_db_test \
 	write_batch_with_index_test \
 	flush_job_test \
-	wal_manager_test
+	wal_manager_test \
+	listener_test \
+	compactor_test

 TOOLS = \
 	sst_dump \
@@ -502,6 +504,12 @@ cuckoo_table_reader_test: table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTH
 cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

+listener_test: db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) db/listener_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
+compactor_test: utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) utilities/compaction/compactor_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
diff --git a/db/column_family.cc b/db/column_family.cc
index eba3c74dd..08ff09866 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -87,6 +87,10 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {

 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

+const std::string& ColumnFamilyHandleImpl::GetName() const {
+  return cfd()->GetName();
+}
+
 const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
   return cfd()->user_comparator();
 }
@@ -255,10 +259,23 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
   } else if (ioptions_.compaction_style == kCompactionStyleLevel) {
     compaction_picker_.reset(
         new LevelCompactionPicker(ioptions_, &internal_comparator_));
-  } else {
-    assert(ioptions_.compaction_style == kCompactionStyleFIFO);
+  } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
     compaction_picker_.reset(
         new FIFOCompactionPicker(ioptions_, &internal_comparator_));
+  } else if (ioptions_.compaction_style == kCompactionStyleNone) {
+    compaction_picker_.reset(new NullCompactionPicker(
+        ioptions_, &internal_comparator_));
+    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
+        "Column family %s does not use any background compaction. "
+        "Compactions can only be done via CompactFiles\n",
+        GetName().c_str());
+  } else {
+    Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
+        "Unable to recognize the specified compaction style %d. "
+        "Column family %s will use kCompactionStyleLevel.\n",
+        ioptions_.compaction_style, GetName().c_str());
+    compaction_picker_.reset(
+        new LevelCompactionPicker(ioptions_, &internal_comparator_));
   }

   Log(InfoLogLevel::INFO_LEVEL,
@@ -503,6 +520,19 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
   return false;
 }

+void ColumnFamilyData::NotifyOnFlushCompleted(
+    DB* db, const std::string& file_path,
+    bool triggered_flush_slowdown,
+    bool triggered_flush_stop) {
+  auto listeners = ioptions()->listeners;
+  for (auto listener : listeners) {
+    listener->OnFlushCompleted(
+        db, GetName(), file_path,
+        // Use path 0 as full memtables are first flushed into path 0.
+        triggered_flush_slowdown, triggered_flush_stop);
+  }
+}
+
 SuperVersion* ColumnFamilyData::InstallSuperVersion(
     SuperVersion* new_superversion, port::Mutex* db_mutex) {
   db_mutex->AssertHeld();
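NotifyOnFlushCompleted() above fans the flush event out to every listener registered in the column family's options. A minimal listener consuming this callback could look like the sketch below; the class and counter names are illustrative, and the "rocksdb/listener.h" header is an assumption (only the OnFlushCompleted() signature comes from this patch):

    #include <atomic>
    #include <string>
    #include "rocksdb/listener.h"  // assumed location of EventListener

    // Illustrative listener: counts flushes that left L0 at or past its
    // slowdown/stop triggers. Atomics because flush notifications arrive
    // from background threads.
    class FlushPressureListener : public rocksdb::EventListener {
     public:
      void OnFlushCompleted(rocksdb::DB* db, const std::string& cf_name,
                            const std::string& file_path,
                            bool triggered_writes_slowdown,
                            bool triggered_writes_stop) override {
        if (triggered_writes_slowdown) num_slowdowns_++;
        if (triggered_writes_stop) num_stops_++;
      }

      std::atomic<int> num_slowdowns_{0};
      std::atomic<int> num_stops_{0};
    };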
diff --git a/db/column_family.h b/db/column_family.h
index 0be47ee84..eef7e93b5 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -52,6 +52,7 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
   virtual const Comparator* user_comparator() const;

   virtual uint32_t GetID() const;
+  virtual const std::string& GetName() const override;

  private:
   ColumnFamilyData* cfd_;
@@ -250,6 +251,11 @@ class ColumnFamilyData {

   void ResetThreadLocalSuperVersions();

+  void NotifyOnFlushCompleted(
+      DB* db, const std::string& file_path,
+      bool triggered_flush_slowdown,
+      bool triggered_flush_stop);
+
  private:
   friend class ColumnFamilySet;
   ColumnFamilyData(uint32_t id, const std::string& name,
diff --git a/db/compaction.cc b/db/compaction.cc
index 3f9da1d82..00513f533 100644
--- a/db/compaction.cc
+++ b/db/compaction.cc
@@ -78,6 +78,38 @@ Compaction::Compaction(int number_levels, int start_level, int out_level,
   }
 }

+Compaction::Compaction(VersionStorageInfo* vstorage,
+                       const autovector<CompactionInputFiles>& inputs,
+                       int start_level, int output_level,
+                       uint64_t max_grandparent_overlap_bytes,
+                       const CompactionOptions& options,
+                       bool deletion_compaction)
+    : start_level_(start_level),
+      output_level_(output_level),
+      max_output_file_size_(options.output_file_size_limit),
+      max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
+      input_version_(nullptr),  // TODO(yhchiang): set it later
+      number_levels_(vstorage->NumberLevels()),
+      cfd_(nullptr),
+      output_compression_(options.compression),
+      seek_compaction_(false),
+      deletion_compaction_(deletion_compaction),
+      inputs_(inputs),
+      grandparent_index_(0),
+      seen_key_(false),
+      overlapped_bytes_(0),
+      base_index_(-1),
+      parent_index_(-1),
+      score_(0),
+      bottommost_level_(false),
+      is_full_compaction_(false),
+      is_manual_compaction_(false),
+      level_ptrs_(std::vector<size_t>(number_levels_)) {
+  for (int i = 0; i < number_levels_; i++) {
+    level_ptrs_[i] = 0;
+  }
+}
+
 Compaction::~Compaction() {
   delete edit_;
   if (input_version_ != nullptr) {
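For context, the CompactionOptions consumed by this constructor is the small struct callers hand to DB::CompactFiles(); per the member initializers above it carries at least output_file_size_limit and compression. A hedged construction sketch (the function name and the 64 MB value are illustrative):

    #include "rocksdb/options.h"

    // Build CompactionOptions for a manual CompactFiles() call: cap each
    // output file at 64 MB and skip compression. These two fields are the
    // ones read by the new Compaction constructor above.
    rocksdb::CompactionOptions MakeManualCompactionOptions() {
      rocksdb::CompactionOptions copts;
      copts.output_file_size_limit = 64 * 1024 * 1024;
      copts.compression = rocksdb::kNoCompression;
      return copts;
    }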
diff --git a/db/compaction.h b/db/compaction.h
index d8014545b..3a012fb60 100644
--- a/db/compaction.h
+++ b/db/compaction.h
@@ -33,6 +33,13 @@ class VersionStorageInfo;
 // A Compaction encapsulates information about a compaction.
 class Compaction {
  public:
+  Compaction(VersionStorageInfo* input_version,
+             const autovector<CompactionInputFiles>& inputs,
+             int start_level, int output_level,
+             uint64_t max_grandparent_overlap_bytes,
+             const CompactionOptions& options,
+             bool deletion_compaction);
+
   // No copying allowed
   Compaction(const Compaction&) = delete;
   void operator=(const Compaction&) = delete;
@@ -153,6 +160,8 @@ class Compaction {
   // Was this compaction triggered manually by the client?
   bool IsManualCompaction() { return is_manual_compaction_; }

+  void SetOutputPathId(uint32_t path_id) { output_path_id_ = path_id; }
+
   // Return the MutableCFOptions that should be used throughout the compaction
   // procedure
   const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
@@ -164,6 +173,16 @@ class Compaction {

   void SetInputVersion(Version* input_version);

+  // mark (or clear) all files that are being compacted
+  void MarkFilesBeingCompacted(bool mark_as_compacted);
+
+  // Initialize whether the compaction is producing files at the
+  // bottommost level.
+  //
+  // @see BottomMostLevel()
+  void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
+                            bool level0_only);
+
  private:
   friend class CompactionPicker;
   friend class UniversalCompactionPicker;
@@ -226,16 +245,6 @@ class Compaction {
   // records indices for all levels beyond "output_level_".
   std::vector<size_t> level_ptrs_;

-  // mark (or clear) all files that are being compacted
-  void MarkFilesBeingCompacted(bool mark_as_compacted);
-
-  // Initialize whether the compaction is producing files at the
-  // bottommost level.
-  //
-  // @see BottomMostLevel()
-  void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual,
-                            bool level0_only);
-
   // In case of compaction error, reset the nextIndex that is used
   // to pick up the next file to be compacted from files_by_size_
   void ResetNextCompactionIndex();
diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index e2694bcd0..f5207748b 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -185,7 +185,8 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name,
 }

 // Returns true if any one of specified files are being compacted
-bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
+bool CompactionPicker::FilesInCompaction(
+    const std::vector<FileMetaData*>& files) {
   for (unsigned int i = 0; i < files.size(); i++) {
     if (files[i]->being_compacted) {
       return true;
@@ -194,6 +195,89 @@ bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
   return false;
 }

+Compaction* CompactionPicker::FormCompaction(
+    const CompactionOptions& compact_options,
+    const autovector<CompactionInputFiles>& input_files,
+    int output_level, VersionStorageInfo* vstorage,
+    const MutableCFOptions& mutable_cf_options) const {
+  uint64_t max_grandparent_overlap_bytes =
+      output_level + 1 < vstorage->NumberLevels() ?
+          mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
+          std::numeric_limits<uint64_t>::max();
+  assert(input_files.size());
+  auto c = new Compaction(vstorage, input_files,
+                          input_files[0].level, output_level,
+                          max_grandparent_overlap_bytes,
+                          compact_options, false);
+  c->mutable_cf_options_ = mutable_cf_options;
+  c->MarkFilesBeingCompacted(true);
+
+  // TODO(yhchiang): complete the SetupBottomMostLevel as follows.
+  // If no key in the DB outside the compacted range is older than the
+  // range being compacted, the compaction is bottommost. For leveled
+  // compaction, if level N-1 is empty and the output goes to level N-2,
+  // it is also bottommost. On the other hand, if there is only one level
+  // (as in universal compaction), the compaction is only "bottom-most"
+  // when the oldest file is involved.
+  c->SetupBottomMostLevel(
+      vstorage,
+      (output_level == vstorage->NumberLevels() - 1),
+      (output_level == 0));
+  return c;
+}
+
+Status CompactionPicker::GetCompactionInputsFromFileNumbers(
+    autovector<CompactionInputFiles>* input_files,
+    std::unordered_set<uint64_t>* input_set,
+    const VersionStorageInfo* vstorage,
+    const CompactionOptions& compact_options) const {
+  if (input_set->size() == 0U) {
+    return Status::InvalidArgument(
+        "Compaction must include at least one file.");
+  }
+  assert(input_files);
+
+  autovector<CompactionInputFiles> matched_input_files;
+  matched_input_files.resize(vstorage->NumberLevels());
+  int first_non_empty_level = -1;
+  int last_non_empty_level = -1;
+  // TODO(yhchiang): use a lazy-initialized mapping from
+  // file_number to FileMetaData in Version.
+  for (int level = 0; level < vstorage->NumberLevels(); ++level) {
+    for (auto file : vstorage->LevelFiles(level)) {
+      auto iter = input_set->find(file->fd.GetNumber());
+      if (iter != input_set->end()) {
+        matched_input_files[level].files.push_back(file);
+        input_set->erase(iter);
+        last_non_empty_level = level;
+        if (first_non_empty_level == -1) {
+          first_non_empty_level = level;
+        }
+      }
+    }
+  }
+
+  if (!input_set->empty()) {
+    std::string message(
+        "Cannot find matched SST files for the following file numbers:");
+    for (auto fn : *input_set) {
+      message += " ";
+      message += std::to_string(fn);
+    }
+    return Status::InvalidArgument(message);
+  }
+
+  for (int level = first_non_empty_level;
+       level <= last_non_empty_level; ++level) {
+    matched_input_files[level].level = level;
+    input_files->emplace_back(std::move(matched_input_files[level]));
+  }
+
+  return Status::OK();
+}
+
 // Returns true if any one of the parent files are being compacted
 bool CompactionPicker::ParentRangeInCompaction(VersionStorageInfo* vstorage,
                                                const InternalKey* smallest,
@@ -362,6 +446,235 @@ Compaction* CompactionPicker::CompactRange(
   return c;
 }

+namespace {
+// Test whether two files have overlapping key-ranges.
+bool HaveOverlappingKeyRanges(
+    const Comparator* c,
+    const SstFileMetaData& a, const SstFileMetaData& b) {
+  if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
+    if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
+      // b.smallestkey <= a.smallestkey <= b.largestkey
+      return true;
+    }
+  } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
+    // a.smallestkey < b.smallestkey <= a.largestkey
+    return true;
+  }
+  if (c->Compare(a.largestkey, b.largestkey) <= 0) {
+    if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
+      // b.smallestkey <= a.largestkey <= b.largestkey
+      return true;
+    }
+  } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
+    // a.smallestkey <= b.largestkey < a.largestkey
+    return true;
+  }
+  return false;
+}
+}  // namespace
+
+Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
+    std::unordered_set<uint64_t>* input_files,
+    const ColumnFamilyMetaData& cf_meta,
+    const int output_level) const {
+  auto& levels = cf_meta.levels;
+  auto comparator = icmp_->user_comparator();
+
+  // TODO(yhchiang): If the compaction includes any file from L1 or above
+  // together with at least one L0 file, then all L0 files older than that
+  // L0 file must also be included; otherwise the input set is invalid.
+
+  // TODO(yhchiang): add is_adjustable to CompactionOptions
+
+  // the smallest and largest key of the current compaction input
+  std::string smallestkey;
+  std::string largestkey;
+  // a flag for initializing smallest and largest key
+  bool is_first = false;
+  const int kNotFound = -1;
+
+  // For each level, it does the following things:
+  // 1. Find the first and the last compaction input files
+  //    in the current level.
+  // 2. Include all files between the first and the last
+  //    compaction input files.
+  // 3. Update the compaction key-range.
+  // 4. For all remaining levels, include files that have
+  //    overlapping key-range with the compaction key-range.
+  for (int l = 0; l <= output_level; ++l) {
+    auto& current_files = levels[l].files;
+    int first_included = static_cast<int>(current_files.size());
+    int last_included = kNotFound;
+
+    // identify the first and the last compaction input files
+    // in the current level.
+    for (size_t f = 0; f < current_files.size(); ++f) {
+      if (input_files->find(TableFileNameToNumber(current_files[f].name)) !=
+          input_files->end()) {
+        first_included = std::min(first_included, static_cast<int>(f));
+        last_included = std::max(last_included, static_cast<int>(f));
+        if (is_first == false) {
+          smallestkey = current_files[f].smallestkey;
+          largestkey = current_files[f].largestkey;
+          is_first = true;
+        }
+      }
+    }
+    if (last_included == kNotFound) {
+      continue;
+    }
+
+    if (l != 0) {
+      // expand the compaction input of the current level if it
+      // has overlapping key-range with other non-compaction input
+      // files in the same level.
+      while (first_included > 0) {
+        if (comparator->Compare(
+                current_files[first_included - 1].largestkey,
+                current_files[first_included].smallestkey) < 0) {
+          break;
+        }
+        first_included--;
+      }
+
+      while (last_included < static_cast<int>(current_files.size()) - 1) {
+        if (comparator->Compare(
+                current_files[last_included + 1].smallestkey,
+                current_files[last_included].largestkey) > 0) {
+          break;
+        }
+        last_included++;
+      }
+    }
+
+    // include all files between the first and the last compaction input files.
+    for (int f = first_included; f <= last_included; ++f) {
+      if (current_files[f].being_compacted) {
+        return Status::Aborted(
+            "Necessary compaction input file " + current_files[f].name +
+            " is currently being compacted.");
+      }
+      input_files->insert(
+          TableFileNameToNumber(current_files[f].name));
+    }
+
+    // update smallest and largest key
+    if (l == 0) {
+      for (int f = first_included; f <= last_included; ++f) {
+        if (comparator->Compare(
+                smallestkey, current_files[f].smallestkey) > 0) {
+          smallestkey = current_files[f].smallestkey;
+        }
+        if (comparator->Compare(
+                largestkey, current_files[f].largestkey) < 0) {
+          largestkey = current_files[f].largestkey;
+        }
+      }
+    } else {
+      if (comparator->Compare(
+              smallestkey, current_files[first_included].smallestkey) > 0) {
+        smallestkey = current_files[first_included].smallestkey;
+      }
+      if (comparator->Compare(
+              largestkey, current_files[last_included].largestkey) < 0) {
+        largestkey = current_files[last_included].largestkey;
+      }
+    }
+
+    SstFileMetaData aggregated_file_meta;
+    aggregated_file_meta.smallestkey = smallestkey;
+    aggregated_file_meta.largestkey = largestkey;
+
+    // For all lower levels, include all overlapping files.
+    for (int m = l + 1; m <= output_level; ++m) {
+      for (auto& next_lv_file : levels[m].files) {
+        if (HaveOverlappingKeyRanges(
+                comparator, aggregated_file_meta, next_lv_file)) {
+          if (next_lv_file.being_compacted) {
+            return Status::Aborted(
+                "File " + next_lv_file.name +
+                ", which has an overlapping key range with one of the "
+                "compaction input files, is currently being compacted.");
+          }
+          input_files->insert(
+              TableFileNameToNumber(next_lv_file.name));
+        }
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status CompactionPicker::SanitizeCompactionInputFiles(
+    std::unordered_set<uint64_t>* input_files,
+    const ColumnFamilyMetaData& cf_meta,
+    const int output_level) const {
+  assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
+         cf_meta.levels[cf_meta.levels.size() - 1].level);
+  if (output_level >= static_cast<int>(cf_meta.levels.size())) {
+    return Status::InvalidArgument(
+        "Output level for column family " + cf_meta.name +
+        " must be within the range [0, " +
+        std::to_string(cf_meta.levels[cf_meta.levels.size() - 1].level) +
+        "].");
+  }
+
+  if (output_level > MaxOutputLevel()) {
+    return Status::InvalidArgument(
+        "Output level exceeds the maximum output level defined by "
+        "the current compaction algorithm --- " +
+        std::to_string(MaxOutputLevel()));
+  }
+
+  if (output_level < 0) {
+    return Status::InvalidArgument(
+        "Output level cannot be negative.");
+  }
+
+  if (input_files->size() == 0) {
+    return Status::InvalidArgument(
+        "A compaction must contain at least one file.");
+  }
+
+  Status s = SanitizeCompactionInputFilesForAllLevels(
+      input_files, cf_meta, output_level);
+
+  if (!s.ok()) {
+    return s;
+  }
+
+  // for all input files, check whether the file number matches
+  // any currently-existing files.
+  for (auto file_num : *input_files) {
+    bool found = false;
+    for (auto level_meta : cf_meta.levels) {
+      for (auto file_meta : level_meta.files) {
+        if (file_num == TableFileNameToNumber(file_meta.name)) {
+          if (file_meta.being_compacted) {
+            return Status::Aborted(
+                "Specified compaction input file " +
+                MakeTableFileName("", file_num) +
+                " is already being compacted.");
+          }
+          found = true;
+          break;
+        }
+      }
+      if (found) {
+        break;
+      }
+    }
+    if (!found) {
+      return Status::InvalidArgument(
+          "Specified compaction input file " +
+          MakeTableFileName("", file_num) +
+          " does not exist in column family " + cf_meta.name + ".");
+    }
+  }
+
+  return Status::OK();
+}
+
 Compaction* LevelCompactionPicker::PickCompaction(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
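A note on the overlap test the sanitizer relies on: both endpoints are treated as inclusive, so files that merely touch at a boundary key are still pulled into the compaction. A standalone illustration with the bytewise comparator (HaveOverlappingKeyRanges() is file-local above, so this sketch assumes a copy of it is visible, and that SstFileMetaData is default-constructible):

    #include <cassert>
    #include "rocksdb/comparator.h"
    #include "rocksdb/metadata.h"

    void OverlapExamples() {
      const rocksdb::Comparator* cmp = rocksdb::BytewiseComparator();
      rocksdb::SstFileMetaData f1, f2;
      f1.smallestkey = "a"; f1.largestkey = "c";
      f2.smallestkey = "b"; f2.largestkey = "d";
      assert(HaveOverlappingKeyRanges(cmp, f1, f2));   // "b" lies inside [a, c]
      f2.smallestkey = "c";
      assert(HaveOverlappingKeyRanges(cmp, f1, f2));   // touching endpoints overlap
      f1.largestkey = "b";                             // now [a, b] vs [c, d]
      assert(!HaveOverlappingKeyRanges(cmp, f1, f2));  // disjoint ranges
    }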
diff --git a/db/compaction_picker.h b/db/compaction_picker.h
index d691a765a..d8daed115 100644
--- a/db/compaction_picker.h
+++ b/db/compaction_picker.h
@@ -8,6 +8,11 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 #pragma once

+#include <set>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
 #include "db/version_set.h"
 #include "db/compaction.h"
 #include "rocksdb/status.h"
@@ -25,6 +30,7 @@ namespace rocksdb {
 class LogBuffer;
 class Compaction;
 class VersionStorageInfo;
+struct CompactionInputFiles;

 class CompactionPicker {
  public:
@@ -62,6 +68,22 @@ class CompactionPicker {
   // for compaction input.
   virtual int MaxInputLevel(int current_num_levels) const = 0;

+  // The maximum allowed output level. Default value is NumberLevels() - 1.
+  virtual int MaxOutputLevel() const {
+    return NumberLevels() - 1;
+  }
+
+  // Sanitize the input set of compaction input files.
+  // When the input parameters do not describe a valid compaction, the
+  // function will try to fix the input_files by adding necessary
+  // files. If it's not possible to convert an invalid input_files
+  // into a valid one by adding more files, the function will return a
+  // non-ok status with a specific reason.
+  Status SanitizeCompactionInputFiles(
+      std::unordered_set<uint64_t>* input_files,
+      const ColumnFamilyMetaData& cf_meta,
+      const int output_level) const;
+
   // Free up the files that participated in a compaction
   void ReleaseCompactionFiles(Compaction* c, Status status);

@@ -69,6 +91,25 @@ class CompactionPicker {
   // compactions per level
   void SizeBeingCompacted(std::vector<uint64_t>& sizes);

+  // Returns true if any one of the specified files are being compacted
+  bool FilesInCompaction(const std::vector<FileMetaData*>& files);
+
+  // Takes a list of CompactionInputFiles and returns a Compaction object.
+  Compaction* FormCompaction(
+      const CompactionOptions& compact_options,
+      const autovector<CompactionInputFiles>& input_files,
+      int output_level, VersionStorageInfo* vstorage,
+      const MutableCFOptions& mutable_cf_options) const;
+
+  // Converts a set of compaction input file numbers into
+  // a list of CompactionInputFiles.
+  Status GetCompactionInputsFromFileNumbers(
+      autovector<CompactionInputFiles>* input_files,
+      std::unordered_set<uint64_t>* input_set,
+      const VersionStorageInfo* vstorage,
+      const CompactionOptions& compact_options) const;
+
  protected:
   int NumberLevels() const { return ioptions_.num_levels; }

@@ -98,9 +139,6 @@ class CompactionPicker {
   bool ExpandWhileOverlapping(const std::string& cf_name,
                               VersionStorageInfo* vstorage, Compaction* c);

-  // Returns true if any one of the specified files are being compacted
-  bool FilesInCompaction(std::vector<FileMetaData*>& files);
-
   // Returns true if any one of the parent files are being compacted
   bool ParentRangeInCompaction(VersionStorageInfo* vstorage,
                                const InternalKey* smallest,
@@ -113,11 +151,16 @@ class CompactionPicker {

   const ImmutableCFOptions& ioptions_;

+  // A helper function to SanitizeCompactionInputFiles() that
+  // sanitizes "input_files" by adding necessary files.
+  virtual Status SanitizeCompactionInputFilesForAllLevels(
+      std::unordered_set<uint64_t>* input_files,
+      const ColumnFamilyMetaData& cf_meta,
+      const int output_level) const;
+
   // record all the ongoing compactions for all levels
   std::vector<std::set<Compaction*>> compactions_in_progress_;

-
- private:
   const InternalKeyComparator* const icmp_;
 };

@@ -131,11 +174,16 @@ class UniversalCompactionPicker : public CompactionPicker {
                                      VersionStorageInfo* vstorage,
                                      LogBuffer* log_buffer) override;

-  // The maxinum allowed input level. Always return 0.
+  // The maximum allowed input level. Always returns 0.
   virtual int MaxInputLevel(int current_num_levels) const override {
     return 0;
   }

+  // The maximum allowed output level. Always returns 0.
+  virtual int MaxOutputLevel() const override {
+    return 0;
+  }
+
  private:
   // Pick Universal compaction to limit read amplification
   Compaction* PickCompactionUniversalReadAmp(
@@ -197,10 +245,46 @@ class FIFOCompactionPicker : public CompactionPicker {
       uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
       InternalKey** compaction_end) override;

-  // The maxinum allowed input level. Always return 0.
+  // The maximum allowed input level. Always returns 0.
   virtual int MaxInputLevel(int current_num_levels) const override {
     return 0;
   }
+
+  // The maximum allowed output level. Always returns 0.
+  virtual int MaxOutputLevel() const override {
+    return 0;
+  }
+};
+
+class NullCompactionPicker : public CompactionPicker {
+ public:
+  NullCompactionPicker(const ImmutableCFOptions& ioptions,
+                       const InternalKeyComparator* icmp) :
+      CompactionPicker(ioptions, icmp) {}
+  virtual ~NullCompactionPicker() {}
+
+  // Always returns "nullptr"
+  Compaction* PickCompaction(const std::string& cf_name,
+                             const MutableCFOptions& mutable_cf_options,
+                             VersionStorageInfo* vstorage,
+                             LogBuffer* log_buffer) override {
+    return nullptr;
+  }
+
+  // Always returns "nullptr"
+  Compaction* CompactRange(
+      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+      VersionStorageInfo* vstorage, int input_level, int output_level,
+      uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
+      InternalKey** compaction_end) override {
+    return nullptr;
+  }
+
+  // Given the current number of levels, returns the highest allowed level
+  // for compaction input.
+  virtual int MaxInputLevel(int current_num_levels) const override {
+    return current_num_levels - 2;
+  }
 };

 // Utility function
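NullCompactionPicker backs the new kCompactionStyleNone wired up in the column_family.cc hunk above: PickCompaction() and CompactRange() never return work, so data only moves through explicit CompactFiles() calls. A minimal configuration sketch, mirroring what the DisableBGCompaction test later in this patch does (the path and function name here are illustrative):

    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    // Open a DB that never schedules background compactions; L0 grows until
    // the application compacts files itself via DB::CompactFiles().
    rocksdb::Status OpenWithoutAutoCompaction(const std::string& path,
                                              rocksdb::DB** db) {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.compaction_style = rocksdb::kCompactionStyleNone;
      return rocksdb::DB::Open(options, path, db);
    }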
diff --git a/db/db_impl.cc b/db/db_impl.cc
index da0603303..0c218fb03 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -213,7 +213,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
 #endif  // ROCKSDB_LITE
       bg_work_gate_closed_(false),
       refitting_level_(false),
-      opened_successfully_(false) {
+      opened_successfully_(false),
+      notifying_events_(0) {
   env_->GetAbsolutePath(dbname, &db_absolute_path_);

   // Reserve ten files or so for other uses and give the rest to TableCache.
@@ -239,6 +240,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)

 DBImpl::~DBImpl() {
   mutex_.Lock();
+
   if (flush_on_destroy_) {
     for (auto cfd : *versions_->GetColumnFamilySet()) {
       if (!cfd->mem()->IsEmpty()) {
@@ -254,10 +256,10 @@ DBImpl::~DBImpl() {
   // Wait for background work to finish
   shutting_down_.store(true, std::memory_order_release);
-  while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
+  while (bg_compaction_scheduled_ || bg_flush_scheduled_ || notifying_events_) {
     bg_cv_.Wait();
   }
-
+  listeners_.clear();
   flush_scheduler_.Clear();

   if (default_cf_handle_ != nullptr) {
@@ -1055,7 +1057,8 @@ Status DBImpl::FlushMemTableToOutputFile(
       db_directory_.get(), GetCompressionFlush(*cfd->ioptions()), stats_);

-  Status s = flush_job.Run();
+  uint64_t file_number;
+  Status s = flush_job.Run(&file_number);

   if (s.ok()) {
     InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
@@ -1085,9 +1088,42 @@ Status DBImpl::FlushMemTableToOutputFile(
       bg_error_ = s;
     }
   RecordFlushIOStats();
+#ifndef ROCKSDB_LITE
+  if (s.ok()) {
+    // may temporarily unlock and lock the mutex.
+    NotifyOnFlushCompleted(cfd, file_number);
+  }
+#endif  // ROCKSDB_LITE
   return s;
 }

+void DBImpl::NotifyOnFlushCompleted(
+    ColumnFamilyData* cfd, uint64_t file_number) {
+  mutex_.AssertHeld();
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  bool triggered_flush_slowdown =
+      (cfd->current()->storage_info()->NumLevelFiles(0) >=
+       cfd->options()->level0_slowdown_writes_trigger);
+  bool triggered_flush_stop =
+      (cfd->current()->storage_info()->NumLevelFiles(0) >=
+       cfd->options()->level0_stop_writes_trigger);
+  notifying_events_++;
+  // release lock while notifying events
+  mutex_.Unlock();
+  // TODO(yhchiang): make db_paths dynamic.
+  cfd->NotifyOnFlushCompleted(
+      this, MakeTableFileName(db_options_.db_paths[0].path, file_number),
+      triggered_flush_slowdown,
+      triggered_flush_stop);
+  mutex_.Lock();
+  notifying_events_--;
+  assert(notifying_events_ >= 0);
+  // no need to signal bg_cv_ as it will be signaled at the end of the
+  // flush process.
+}
+
 Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
                             const Slice* begin, const Slice* end,
                             bool reduce_level, int target_level,
@@ -1149,6 +1185,167 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
   return s;
 }

+Status DBImpl::CompactFiles(
+    const CompactionOptions& compact_options,
+    ColumnFamilyHandle* column_family,
+    const std::vector<std::string>& input_file_names,
+    const int output_level, const int output_path_id) {
+  MutexLock l(&mutex_);
+  if (column_family == nullptr) {
+    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
+  }
+
+  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  assert(cfd);
+  // TODO(yhchiang): use superversion
+  cfd->Ref();
+  auto version = cfd->current();
+  version->Ref();
+  auto s = CompactFilesImpl(compact_options, cfd, version,
+                            input_file_names, output_level, output_path_id);
+  // TODO(yhchiang): unref could move into CompactFilesImpl(). Otherwise,
+  // FindObsoleteFiles might never be able to find any file to delete.
+  version->Unref();
+  // TODO(yhchiang): cfd should be deleted after its last reference.
+  cfd->Unref();
+  return s;
+}
+
+Status DBImpl::CompactFilesImpl(
+    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
+    Version* version, const std::vector<std::string>& input_file_names,
+    const int output_level, int output_path_id) {
+  mutex_.AssertHeld();
+
+  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
+
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return Status::ShutdownInProgress();
+  }
+
+  std::unordered_set<uint64_t> input_set;
+  for (auto file_name : input_file_names) {
+    input_set.insert(TableFileNameToNumber(file_name));
+  }
+
+  ColumnFamilyMetaData cf_meta;
+  // TODO(yhchiang): can directly use version here if none of the
+  // following function calls is pluggable to external developers.
+  version->GetColumnFamilyMetaData(&cf_meta);
+
+  if (output_path_id < 0) {
+    if (db_options_.db_paths.size() == 1U) {
+      output_path_id = 0;
+    } else {
+      return Status::NotSupported(
+          "Automatic output path selection is not "
+          "yet supported in CompactFiles()");
+    }
+  }
+
+  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
+      &input_set, cf_meta, output_level);
+  if (!s.ok()) {
+    return s;
+  }
+
+  autovector<CompactionInputFiles> input_files;
+  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
+      &input_files, &input_set, version->storage_info(), compact_options);
+  if (!s.ok()) {
+    return s;
+  }
+
+  for (auto inputs : input_files) {
+    if (cfd->compaction_picker()->FilesInCompaction(inputs.files)) {
+      return Status::Aborted(
+          "Some of the necessary compaction input "
+          "files are already being compacted");
+    }
+  }
+
+  // At this point, CompactFiles will be run.
+  bg_compaction_scheduled_++;
+
+  unique_ptr<Compaction> c;
+  assert(cfd->compaction_picker());
+  c.reset(cfd->compaction_picker()->FormCompaction(
+      compact_options, input_files,
+      output_level, version->storage_info(),
+      *cfd->GetLatestMutableCFOptions()));
+  assert(c);
+  c->SetInputVersion(version);
+  c->SetOutputPathId(static_cast<uint32_t>(output_path_id));
+  // deletion compaction is currently not allowed in CompactFiles.
+  assert(!c->IsDeletionCompaction());
+
+  JobContext job_context(true);
+  auto yield_callback = [&]() {
+    return CallFlushDuringCompaction(c->column_family_data(),
+                                     *c->mutable_cf_options(), &job_context,
+                                     &log_buffer);
+  };
+  CompactionJob compaction_job(
+      c.get(), db_options_, *c->mutable_cf_options(), env_options_,
+      versions_.get(), &mutex_, &shutting_down_, &pending_outputs_,
+      &log_buffer, db_directory_.get(), stats_, &snapshots_,
+      IsSnapshotSupported(), table_cache_, std::move(yield_callback));
+  compaction_job.Prepare();
+
+  mutex_.Unlock();
+  Status status = compaction_job.Run();
+  mutex_.Lock();
+  if (status.ok()) {
+    status = compaction_job.Install(status);
+    if (status.ok()) {
+      InstallSuperVersionBackground(c->column_family_data(), &job_context,
+                                    *c->mutable_cf_options());
+    }
+  }
+  c->ReleaseCompactionFiles(s);
+  c->ReleaseInputs();
+  c.reset();
+
+  if (status.ok()) {
+    // Done
+  } else if (status.IsShutdownInProgress()) {
+    // Ignore compaction errors found during shutting down
+  } else {
+    Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
+        status.ToString().c_str());
+    if (db_options_.paranoid_checks && bg_error_.ok()) {
+      bg_error_ = status;
+    }
+  }
+
+  // If !s.ok(), this means that Compaction failed. In that case, we want
+  // to delete all obsolete files we might have created and we force
+  // FindObsoleteFiles(). This is because job_context does not
+  // catch all created files if compaction failed.
+  // TODO(yhchiang): write a unit test to make sure files are actually
+  // deleted after CompactFiles.
+  FindObsoleteFiles(&job_context, !s.ok());
+
+  // delete unnecessary files if any, this is done outside the mutex
+  if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+    mutex_.Unlock();
+    // Have to flush the info logs before bg_compaction_scheduled_--
+    // because if bg_compaction_scheduled_ becomes 0 and the lock is
+    // released, the destructor of DB can kick in and destroy all the
+    // state of DB, so info_log might not be available after that point.
+    // It also applies to access of other state that DB owns.
+    log_buffer.FlushBufferToLog();
+    if (job_context.HaveSomethingToDelete()) {
+      PurgeObsoleteFiles(job_context);
+    }
+    mutex_.Lock();
+  }
+
+  bg_compaction_scheduled_--;
+
+  return status;
+}
+
 Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     const std::unordered_map<std::string, std::string>& options_map) {
   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
@@ -3112,6 +3309,17 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
   MutexLock l(&mutex_);
   versions_->GetLiveFilesMetaData(metadata);
 }
+
+void DBImpl::GetColumnFamilyMetaData(
+    ColumnFamilyHandle* column_family,
+    ColumnFamilyMetaData* cf_meta) {
+  assert(column_family);
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
+  auto* sv = GetAndRefSuperVersion(cfd);
+  sv->current->GetColumnFamilyMetaData(cf_meta);
+  ReturnAndCleanupSuperVersion(cfd, sv);
+}
+
 #endif  // ROCKSDB_LITE

 Status DBImpl::CheckConsistency() {
@@ -3362,6 +3570,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
   if (s.ok()) {
     impl->opened_successfully_ = true;
     *dbptr = impl;
+    // TODO(yhchiang): Add NotifyOnDatabaseOpen() here.
+    // Since the column-family handles are only available after DB::Open(),
+    // typically developers will need to pass the returned ColumnFamilyHandles
+    // to their EventListeners in order to maintain the mapping between
+    // column-family names and ColumnFamilyHandles. However, some database
+    // events might happen before the user passes those ColumnFamilyHandles
+    // to their listeners. To address this, we should have
+    // NotifyOnDatabaseOpen() here, which passes the created
+    // ColumnFamilyHandles to the listeners as the first event after DB::Open().
   } else {
     for (auto h : *handles) {
       delete h;
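End to end, a client drives a manual compaction by reading the LSM shape with GetColumnFamilyMetaData() and feeding chosen file names back into CompactFiles(); the sanitizers above then pull in any additional files the compaction needs. A usage sketch built only from the APIs in this patch (error handling trimmed; the function name and target level are illustrative):

    #include <string>
    #include <vector>
    #include "rocksdb/db.h"

    // Compact every idle L0 file of the default column family into level 1.
    // Assumes the DB was opened with a level-style (or none-style)
    // compaction, so output level 1 is legal.
    rocksdb::Status CompactAllL0(rocksdb::DB* db) {
      rocksdb::ColumnFamilyMetaData meta;
      db->GetColumnFamilyMetaData(db->DefaultColumnFamily(), &meta);
      std::vector<std::string> input_files;
      for (const auto& file : meta.levels[0].files) {
        if (!file.being_compacted) {
          input_files.push_back(file.name);
        }
      }
      if (input_files.empty()) {
        return rocksdb::Status::OK();
      }
      return db->CompactFiles(rocksdb::CompactionOptions(),
                              db->DefaultColumnFamily(), input_files,
                              1 /* output_level */);
    }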
diff --git a/db/db_impl.h b/db/db_impl.h
index a25a82a9a..eda00ab9b 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -14,6 +14,7 @@
 #include <atomic>
 #include <deque>
 #include <limits>
+#include <list>
 #include <set>
 #include <string>

@@ -115,6 +116,13 @@ class DBImpl : public DB {
                               bool reduce_level = false, int target_level = -1,
                               uint32_t target_path_id = 0);

+  using DB::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1);
+
   using DB::SetOptions;
   Status SetOptions(ColumnFamilyHandle* column_family,
       const std::unordered_map<std::string, std::string>& options_map);
@@ -152,6 +160,15 @@ class DBImpl : public DB {

   virtual Status DeleteFile(std::string name);

   virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
+
+  // Obtains the meta data of the specified column family of the DB.
+  // Status::NotFound() will be returned if the current DB does not have
+  // any column family that matches the specified name.
+  // TODO(yhchiang): output parameters are placed at the end in this codebase.
+  virtual void GetColumnFamilyMetaData(
+      ColumnFamilyHandle* column_family,
+      ColumnFamilyMetaData* metadata) override;
+
 #endif  // ROCKSDB_LITE

   // checks if all live files exist on file system and that their file sizes
@@ -211,7 +228,7 @@ class DBImpl : public DB {
   // REQUIRES: mutex locked
   // pass the pointer that you got from TEST_BeginWrite()
   void TEST_EndWrite(void* w);
-#endif  // NDEBUG
+#endif  // ROCKSDB_LITE

   // Returns the list of live files in 'live' and the list
   // of all files in the filesystem in 'candidate_files'.
@@ -239,6 +256,8 @@ class DBImpl : public DB {
   Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
                                 SuperVersion* super_version, Arena* arena);

+  void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number);
+
  private:
   friend class DB;
   friend class InternalStats;
@@ -318,6 +337,13 @@ class DBImpl : public DB {
   void RecordFlushIOStats();
   void RecordCompactionIOStats();

+  Status CompactFilesImpl(
+      const CompactionOptions& compact_options, ColumnFamilyData* cfd,
+      Version* version, const std::vector<std::string>& input_file_names,
+      const int output_level, int output_path_id);
+
+  ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
+
   void MaybeScheduleFlushOrCompaction();
   static void BGWorkCompaction(void* db);
   static void BGWorkFlush(void* db);
@@ -488,6 +514,12 @@ class DBImpl : public DB {
   // Indicate DB was opened successfully
   bool opened_successfully_;

+  // The list of registered event listeners.
+  std::list<std::shared_ptr<EventListener>> listeners_;
+
+  // count of how many events are currently being notified.
+  int notifying_events_;
+
   // No copying allowed
   DBImpl(const DBImpl&);
   void operator=(const DBImpl&);
diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h
index 9b10b83fb..d84b23f18 100644
--- a/db/db_impl_readonly.h
+++ b/db/db_impl_readonly.h
@@ -62,10 +62,20 @@ class DBImplReadOnly : public DBImpl {
     return Status::NotSupported("Not supported operation in read only mode.");
   }

+  using DBImpl::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) override {
+    return Status::NotSupported("Not supported operation in read only mode.");
+  }
+
 #ifndef ROCKSDB_LITE
   virtual Status DisableFileDeletions() override {
     return Status::NotSupported("Not supported operation in read only mode.");
   }
+
   virtual Status EnableFileDeletions(bool force) override {
     return Status::NotSupported("Not supported operation in read only mode.");
   }
diff --git a/db/db_test.cc b/db/db_test.cc
index 3a7891559..8b018715c 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -15,6 +15,7 @@
 #include
 #include

+#include "db/filename.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
 #include "db/filename.h"
@@ -4060,8 +4061,43 @@ TEST(DBTest, UniversalCompactionFourPaths) {
   Destroy(options);
 }
+
 #endif

+void CheckColumnFamilyMeta(const ColumnFamilyMetaData& cf_meta) {
+  uint64_t cf_size = 0;
+  uint64_t cf_csize = 0;
+  size_t file_count = 0;
+  for (auto level_meta : cf_meta.levels) {
+    uint64_t level_size = 0;
+    uint64_t level_csize = 0;
+    file_count += level_meta.files.size();
+    for (auto file_meta : level_meta.files) {
+      level_size += file_meta.size;
+    }
+    ASSERT_EQ(level_meta.size, level_size);
+    cf_size += level_size;
+    cf_csize += level_csize;
+  }
+  ASSERT_EQ(cf_meta.file_count, file_count);
+  ASSERT_EQ(cf_meta.size, cf_size);
+}
+
+TEST(DBTest, ColumnFamilyMetaDataTest) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  int key_index = 0;
+  ColumnFamilyMetaData cf_meta;
+  for (int i = 0; i < 100; ++i) {
+    GenerateNewFile(&rnd, &key_index);
+    db_->GetColumnFamilyMetaData(&cf_meta);
+    CheckColumnFamilyMeta(cf_meta);
+  }
+}
+
 TEST(DBTest, ConvertCompactionStyle) {
   Random rnd(301);
   int max_key_level_insert = 200;
@@ -4238,7 +4274,7 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,

 TEST(DBTest, MinLevelToCompress1) {
   Options options = CurrentOptions();
-  CompressionType type;
+  CompressionType type = kSnappyCompression;
   if (!MinLevelToCompress(type, options, -14, -1, 0)) {
     return;
   }
@@ -4258,7 +4294,7 @@ TEST(DBTest, MinLevelToCompress1) {

 TEST(DBTest, MinLevelToCompress2) {
   Options options = CurrentOptions();
-  CompressionType type;
+  CompressionType type = kSnappyCompression;
   if (!MinLevelToCompress(type, options, 15, -1, 0)) {
     return;
   }
@@ -7246,6 +7282,15 @@ class ModelDB: public DB {
     return Status::NotSupported("Not supported operation.");
   }

+  using DB::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) override {
+    return Status::NotSupported("Not supported operation.");
+  }
+
   using DB::NumberLevels;
   virtual int NumberLevels(ColumnFamilyHandle* column_family) { return 1; }

@@ -7314,6 +7359,10 @@ class ModelDB: public DB {
   virtual ColumnFamilyHandle* DefaultColumnFamily() const {
     return nullptr;
   }

+  virtual void GetColumnFamilyMetaData(
+      ColumnFamilyHandle* column_family,
+      ColumnFamilyMetaData* metadata) {}
+
  private:
   class ModelIter: public Iterator {
    public:
@@ -8202,6 +8251,211 @@ TEST(DBTest, RateLimitingTest) {
   ASSERT_TRUE(ratio < 0.6);
 }

+namespace {
+  bool HaveOverlappingKeyRanges(
+      const Comparator* c,
+      const SstFileMetaData& a, const SstFileMetaData& b) {
+    if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
+      if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
+        // b.smallestkey <= a.smallestkey <= b.largestkey
+        return true;
+      }
+    } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
+      // a.smallestkey < b.smallestkey <= a.largestkey
+      return true;
+    }
+    if (c->Compare(a.largestkey, b.largestkey) <= 0) {
+      if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
+        // b.smallestkey <= a.largestkey <= b.largestkey
+        return true;
+      }
+    } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
+      // a.smallestkey <= b.largestkey < a.largestkey
+      return true;
+    }
+    return false;
+  }
+
+  // Identifies all files between level "min_level" and "max_level"
+  // which have overlapping key ranges with "input_file_meta".
+  void GetOverlappingFileNumbersForLevelCompaction(
+      const ColumnFamilyMetaData& cf_meta,
+      const Comparator* comparator,
+      int min_level, int max_level,
+      const SstFileMetaData* input_file_meta,
+      std::set<std::string>* overlapping_file_names) {
+    std::set<const SstFileMetaData*> overlapping_files;
+    overlapping_files.insert(input_file_meta);
+    for (int m = min_level; m <= max_level; ++m) {
+      for (auto& file : cf_meta.levels[m].files) {
+        for (auto* included_file : overlapping_files) {
+          if (HaveOverlappingKeyRanges(
+                  comparator, *included_file, file)) {
+            overlapping_files.insert(&file);
+            overlapping_file_names->insert(file.name);
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  void VerifyCompactionResult(
+      const ColumnFamilyMetaData& cf_meta,
+      const std::set<std::string>& overlapping_file_numbers) {
+    for (auto& level : cf_meta.levels) {
+      for (auto& file : level.files) {
+        assert(overlapping_file_numbers.find(file.name) ==
+               overlapping_file_numbers.end());
+      }
+    }
+  }
+
+  const SstFileMetaData* PickFileRandomly(
+      const ColumnFamilyMetaData& cf_meta,
+      Random* rand,
+      int* level = nullptr) {
+    auto file_id = rand->Uniform(static_cast<int>(
+        cf_meta.file_count)) + 1;
+    for (auto& level_meta : cf_meta.levels) {
+      if (file_id <= level_meta.files.size()) {
+        if (level != nullptr) {
+          *level = level_meta.level;
+        }
+        auto result = rand->Uniform(file_id);
+        return &(level_meta.files[result]);
+      }
+      file_id -= level_meta.files.size();
+    }
+    assert(false);
+    return nullptr;
+  }
+}  // namespace
+
+TEST(DBTest, CompactFilesOnLevelCompaction) {
+  const int kKeySize = 16;
+  const int kValueSize = 984;
+  const int kEntrySize = kKeySize + kValueSize;
+  const int kEntriesPerBuffer = 100;
+  Options options;
+  options.create_if_missing = true;
+  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
+  options.compaction_style = kCompactionStyleLevel;
+  options.target_file_size_base = options.write_buffer_size;
+  options.max_bytes_for_level_base = options.target_file_size_base * 2;
+  options.level0_stop_writes_trigger = 2;
+  options.max_bytes_for_level_multiplier = 2;
+  options.compression = kNoCompression;
+  options = CurrentOptions(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  Random rnd(301);
+  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
+    ASSERT_OK(Put(1, std::to_string(key), RandomString(&rnd, kValueSize)));
+  }
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+  dbfull()->TEST_WaitForCompact();
+
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
+  for (int file_picked = 5; file_picked > 0; --file_picked) {
+    std::set<std::string> overlapping_file_names;
+    std::vector<std::string> compaction_input_file_names;
+    for (int f = 0; f < file_picked; ++f) {
+      int level;
+      auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
+      compaction_input_file_names.push_back(file_meta->name);
+      GetOverlappingFileNumbersForLevelCompaction(
+          cf_meta, options.comparator, level, output_level,
+          file_meta, &overlapping_file_names);
+    }
+
+    ASSERT_OK(dbfull()->CompactFiles(
+        CompactionOptions(), handles_[1],
+        compaction_input_file_names,
+        output_level));
+
+    // Make sure all overlapping files do not exist after compaction
+    dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+    VerifyCompactionResult(cf_meta, overlapping_file_names);
+  }
+
+  // make sure all key-values are still there.
+  for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
+    ASSERT_NE(Get(1, std::to_string(key)), "NOT_FOUND");
+  }
+}
+
+TEST(DBTest, CompactFilesOnUniversalCompaction) {
+  const int kKeySize = 16;
+  const int kValueSize = 984;
+  const int kEntrySize = kKeySize + kValueSize;
+  const int kEntriesPerBuffer = 10;
+
+  ChangeCompactOptions();
+  Options options;
+  options.create_if_missing = true;
+  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
+  options.compaction_style = kCompactionStyleLevel;
+  options.target_file_size_base = options.write_buffer_size;
+  options.compression = kNoCompression;
+  options = CurrentOptions(options);
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_EQ(options.compaction_style, kCompactionStyleUniversal);
+  Random rnd(301);
+  for (int key = 1024 * kEntriesPerBuffer; key >= 0; --key) {
+    ASSERT_OK(Put(1, std::to_string(key), RandomString(&rnd, kValueSize)));
+  }
+  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+  dbfull()->TEST_WaitForCompact();
+  ColumnFamilyMetaData cf_meta;
+  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  std::vector<std::string> compaction_input_file_names;
+  for (auto file : cf_meta.levels[0].files) {
+    if (rnd.OneIn(2)) {
+      compaction_input_file_names.push_back(file.name);
+    }
+  }
+
+  if (compaction_input_file_names.size() == 0) {
+    compaction_input_file_names.push_back(
+        cf_meta.levels[0].files[0].name);
+  }
+
+  // expect failure since universal compaction only allows L0 output
+  ASSERT_TRUE(!dbfull()->CompactFiles(
+      CompactionOptions(), handles_[1],
+      compaction_input_file_names, 1).ok());
+
+  // expect ok and verify the compacted files no longer exist.
+  ASSERT_OK(dbfull()->CompactFiles(
+      CompactionOptions(), handles_[1],
+      compaction_input_file_names, 0));
+
+  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  VerifyCompactionResult(
+      cf_meta,
+      std::set<std::string>(compaction_input_file_names.begin(),
+                            compaction_input_file_names.end()));
+
+  compaction_input_file_names.clear();
+
+  // Pick the first and the last file, expecting everything to be
+  // compacted into one single file.
+  compaction_input_file_names.push_back(
+      cf_meta.levels[0].files[0].name);
+  compaction_input_file_names.push_back(
+      cf_meta.levels[0].files[
+          cf_meta.levels[0].files.size() - 1].name);
+  ASSERT_OK(dbfull()->CompactFiles(
+      CompactionOptions(), handles_[1],
+      compaction_input_file_names, 0));
+
+  dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  ASSERT_EQ(cf_meta.levels[0].files.size(), 1U);
+}
+
 TEST(DBTest, TableOptionsSanitizeTest) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -9079,7 +9333,6 @@ TEST(DBTest, DontDeletePendingOutputs) {
   Compact("a", "b");
 }

-
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/filename.cc b/db/filename.cc
index a8f685296..e5d97bdf2 100644
--- a/db/filename.cc
+++ b/db/filename.cc
@@ -79,6 +79,17 @@ std::string MakeTableFileName(const std::string& path, uint64_t number) {
   return MakeFileName(path, number, "sst");
 }

+uint64_t TableFileNameToNumber(const std::string& name) {
+  uint64_t number = 0;
+  uint64_t base = 1;
+  int pos = static_cast<int>(name.find_last_of('.'));
+  while (--pos >= 0 && name[pos] >= '0' && name[pos] <= '9') {
+    number += (name[pos] - '0') * base;
+    base *= 10;
+  }
+  return number;
+}
+
 std::string TableFileName(const std::vector<DbPath>& db_paths, uint64_t number,
                           uint32_t path_id) {
   assert(number > 0);
diff --git a/db/filename.h b/db/filename.h
index 87963ea21..4136ff12e 100644
--- a/db/filename.h
+++ b/db/filename.h
@@ -52,6 +52,10 @@ extern std::string ArchivedLogFileName(const std::string& dbname,

 extern std::string MakeTableFileName(const std::string& name, uint64_t number);

+// The inverse of MakeTableFileName
+// TODO(yhchiang): could merge this function with ParseFileName()
+extern uint64_t TableFileNameToNumber(const std::string& name);
+
 // Return the name of the sstable with the specified number
 // in the db named by "dbname". The result will be prefixed with
 // "dbname".
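TableFileNameToNumber() inverts MakeTableFileName() for the number portion by parsing the digits sitting directly before the ".sst" suffix; the metadata code later in this patch stores names produced by MakeTableFileName("", number), so the round trip below holds. A quick check as a sketch:

    #include <cassert>
    #include <string>
    #include "db/filename.h"

    void TableFileNameRoundTrip() {
      // MakeTableFileName("", 123) yields "/000123.sst"; walking backwards
      // from the '.' over the digits recovers 123.
      const std::string name = rocksdb::MakeTableFileName("", 123);
      assert(rocksdb::TableFileNameToNumber(name) == 123);
    }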
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 973d86033..a3079d2df 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -73,9 +73,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       output_compression_(output_compression),
       stats_(stats) {}

-Status FlushJob::Run() {
+Status FlushJob::Run(uint64_t* file_number) {
   // Save the contents of the earliest memtable as a new Table
-  uint64_t file_number;
+  uint64_t fn;
   autovector<MemTable*> mems;
   cfd_->imm()->PickMemtablesToFlush(&mems);
   if (mems.empty()) {
@@ -96,7 +96,7 @@ Status FlushJob::Run() {
   edit->SetColumnFamily(cfd_->GetID());

   // This will release and re-acquire the mutex.
-  Status s = WriteLevel0Table(mems, edit, &file_number);
+  Status s = WriteLevel0Table(mems, edit, &fn);

   if (s.ok() &&
       (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
@@ -113,6 +113,9 @@ Status FlushJob::Run() {
         &job_context_->memtables_to_free, db_directory_, log_buffer_);
   }

+  if (s.ok() && file_number != nullptr) {
+    *file_number = fn;
+  }
   return s;
 }
diff --git a/db/flush_job.h b/db/flush_job.h
index 86d4aa073..394a7a45e 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -60,7 +60,7 @@ class FlushJob {
            CompressionType output_compression, Statistics* stats);
   ~FlushJob() {}

-  Status Run();
+  Status Run(uint64_t* file_number = nullptr);

  private:
   Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
diff --git a/db/listener_test.cc b/db/listener_test.cc
new file mode 100644
index 000000000..f39ac93eb
--- /dev/null
+++ b/db/listener_test.cc
@@ -0,0 +1,344 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "db/dbformat.h"
+#include "db/db_impl.h"
+#include "db/filename.h"
+#include "db/version_set.h"
+#include "db/write_batch_internal.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/perf_context.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "rocksdb/table.h"
+#include "rocksdb/options.h"
+#include "rocksdb/table_properties.h"
+#include "table/block_based_table_factory.h"
+#include "table/plain_table_factory.h"
+#include "util/hash.h"
+#include "util/hash_linklist_rep.h"
+#include "utilities/merge_operators.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+#include "util/rate_limiter.h"
+#include "util/statistics.h"
+#include "util/testharness.h"
+#include "util/sync_point.h"
+#include "util/testutil.h"
+
+#ifndef ROCKSDB_LITE
+
+namespace rocksdb {
+
+class EventListenerTest {
+ public:
+  EventListenerTest() {
+    dbname_ = test::TmpDir() + "/listener_test";
+    ASSERT_OK(DestroyDB(dbname_, Options()));
+    db_ = nullptr;
+    Reopen();
+  }
+
+  ~EventListenerTest() {
+    Close();
+    Options options;
+    options.db_paths.emplace_back(dbname_, 0);
+    options.db_paths.emplace_back(dbname_ + "_2", 0);
+    options.db_paths.emplace_back(dbname_ + "_3", 0);
+    options.db_paths.emplace_back(dbname_ + "_4", 0);
+    ASSERT_OK(DestroyDB(dbname_, options));
+  }
+
+  void CreateColumnFamilies(const std::vector<std::string>& cfs,
+                            const ColumnFamilyOptions* options = nullptr) {
+    ColumnFamilyOptions cf_opts;
+    cf_opts = ColumnFamilyOptions(Options());
+    int cfi = handles_.size();
+    handles_.resize(cfi + cfs.size());
+    for (auto cf : cfs) {
+      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
+    }
+  }
+
+  void Close() {
+    for (auto h : handles_) {
+      delete h;
+    }
+    handles_.clear();
+    delete db_;
+    db_ = nullptr;
+  }
+
+  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                const Options* options = nullptr) {
+    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  }
+
+  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                     const Options* options = nullptr) {
+    Close();
+    Options opts = (options == nullptr) ? Options() : *options;
+    std::vector<const Options*> v_opts(cfs.size(), &opts);
+    return TryReopenWithColumnFamilies(cfs, v_opts);
+  }
+
+  Status TryReopenWithColumnFamilies(
+      const std::vector<std::string>& cfs,
+      const std::vector<const Options*>& options) {
+    Close();
+    ASSERT_EQ(cfs.size(), options.size());
+    std::vector<ColumnFamilyDescriptor> column_families;
+    for (size_t i = 0; i < cfs.size(); ++i) {
+      column_families.push_back(ColumnFamilyDescriptor(cfs[i], *options[i]));
+    }
+    DBOptions db_opts = DBOptions(*options[0]);
+    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
+  }
+
+  Status TryReopen(Options* options = nullptr) {
+    Close();
+    Options opts;
+    if (options != nullptr) {
+      opts = *options;
+    } else {
+      opts.create_if_missing = true;
+    }
+
+    return DB::Open(opts, dbname_, &db_);
+  }
+
+  void Reopen(Options* options = nullptr) {
+    ASSERT_OK(TryReopen(options));
+  }
+
+  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
+                             const Options* options = nullptr) {
+    CreateColumnFamilies(cfs, options);
+    std::vector<std::string> cfs_plus_default = cfs;
+    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+    ReopenWithColumnFamilies(cfs_plus_default, options);
+  }
+
+  DBImpl* dbfull() {
+    return reinterpret_cast<DBImpl*>(db_);
+  }
+
+  Status Put(int cf, const Slice& k, const Slice& v,
+             WriteOptions wo = WriteOptions()) {
+    return db_->Put(wo, handles_[cf], k, v);
+  }
+
+  Status Flush(int cf = 0) {
+    if (cf == 0) {
+      return db_->Flush(FlushOptions());
+    } else {
+      return db_->Flush(FlushOptions(), handles_[cf]);
+    }
+  }
+
+  DB* db_;
+  std::string dbname_;
+  std::vector<ColumnFamilyHandle*> handles_;
+};
+
+class TestFlushListener : public EventListener {
+ public:
+  TestFlushListener() : slowdown_count(0), stop_count(0) {}
+
+  void OnFlushCompleted(
+      DB* db, const std::string& name,
+      const std::string& file_path,
+      bool triggered_writes_slowdown,
+      bool triggered_writes_stop) override {
+    flushed_dbs_.push_back(db);
+    flushed_column_family_names_.push_back(name);
+    if (triggered_writes_slowdown) {
+      slowdown_count++;
+    }
+    if (triggered_writes_stop) {
+      stop_count++;
+    }
+  }
+
+  std::vector<std::string> flushed_column_family_names_;
+  std::vector<DB*> flushed_dbs_;
+  int slowdown_count;
+  int stop_count;
+};
+
+TEST(EventListenerTest, OnSingleDBFlushTest) {
+  Options options;
+  TestFlushListener* listener = new TestFlushListener();
+  options.listeners.emplace_back(listener);
+  std::vector<std::string> cf_names = {
+      "pikachu", "ilya", "muromec", "dobrynia",
+      "nikitich", "alyosha", "popovich"};
+  CreateAndReopenWithCF(cf_names, &options);
+
+  ASSERT_OK(Put(1, "pikachu", "pikachu"));
+  ASSERT_OK(Put(2, "ilya", "ilya"));
+  ASSERT_OK(Put(3, "muromec", "muromec"));
+  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
+  ASSERT_OK(Put(5, "nikitich", "nikitich"));
+  ASSERT_OK(Put(6, "alyosha", "alyosha"));
+  ASSERT_OK(Put(7, "popovich", "popovich"));
+  for (size_t i = 1; i < 8; ++i) {
+    Flush(i);
+    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_EQ(listener->flushed_dbs_.size(), i);
+    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+  }
+
+  // make sure the call-back functions are called in the right order
+  for (size_t i = 0; i < cf_names.size(); ++i) {
+    ASSERT_EQ(listener->flushed_dbs_[i], db_);
+    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+  }
+}
+
+TEST(EventListenerTest, MultiCF) {
+  Options options;
+  TestFlushListener* listener = new TestFlushListener();
+  options.listeners.emplace_back(listener);
+  std::vector<std::string> cf_names = {
+      "pikachu", "ilya", "muromec", "dobrynia",
+      "nikitich", "alyosha", "popovich"};
+  CreateAndReopenWithCF(cf_names, &options);
+
+  ASSERT_OK(Put(1, "pikachu", "pikachu"));
+  ASSERT_OK(Put(2, "ilya", "ilya"));
+  ASSERT_OK(Put(3, "muromec", "muromec"));
+  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
+  ASSERT_OK(Put(5, "nikitich", "nikitich"));
+  ASSERT_OK(Put(6, "alyosha", "alyosha"));
+  ASSERT_OK(Put(7, "popovich", "popovich"));
+  for (size_t i = 1; i < 8; ++i) {
+    Flush(i);
+    ASSERT_EQ(listener->flushed_dbs_.size(), i);
+    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
+  }
+
+  // make sure the call-back functions are called in the right order
+  for (size_t i = 0; i < cf_names.size(); i++) {
+    ASSERT_EQ(listener->flushed_dbs_[i], db_);
+    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
+  }
+}
"ilya", "ilya")); + ASSERT_OK(Put(3, "muromec", "muromec")); + ASSERT_OK(Put(4, "dobrynia", "dobrynia")); + ASSERT_OK(Put(5, "nikitich", "nikitich")); + ASSERT_OK(Put(6, "alyosha", "alyosha")); + ASSERT_OK(Put(7, "popovich", "popovich")); + for (size_t i = 1; i < 8; ++i) { + Flush(i); + ASSERT_EQ(listener->flushed_dbs_.size(), i); + ASSERT_EQ(listener->flushed_column_family_names_.size(), i); + } + + // make sure call-back functions are called in the right order + for (size_t i = 0; i < cf_names.size(); i++) { + ASSERT_EQ(listener->flushed_dbs_[i], db_); + ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); + } +} + +TEST(EventListenerTest, MultiDBMultiListeners) { + std::vector listeners; + const int kNumDBs = 5; + const int kNumListeners = 10; + for (int i = 0; i < kNumListeners; ++i) { + listeners.emplace_back(new TestFlushListener()); + } + + std::vector cf_names = { + "pikachu", "ilya", "muromec", "dobrynia", + "nikitich", "alyosha", "popovich"}; + + Options options; + options.create_if_missing = true; + for (int i = 0; i < kNumListeners; ++i) { + options.listeners.emplace_back(listeners[i]); + } + DBOptions db_opts(options); + ColumnFamilyOptions cf_opts(options); + + std::vector dbs; + std::vector> vec_handles; + + for (int d = 0; d < kNumDBs; ++d) { + ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options)); + DB* db; + std::vector handles; + ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db)); + for (size_t c = 0; c < cf_names.size(); ++c) { + ColumnFamilyHandle* handle; + db->CreateColumnFamily(cf_opts, cf_names[c], &handle); + handles.push_back(handle); + } + + vec_handles.push_back(std::move(handles)); + dbs.push_back(db); + } + + for (int d = 0; d < kNumDBs; ++d) { + for (size_t c = 0; c < cf_names.size(); ++c) { + ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], + cf_names[c], cf_names[c])); + } + } + + for (size_t c = 0; c < cf_names.size(); ++c) { + for (int d = 0; d < kNumDBs; ++d) { + ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c])); + reinterpret_cast(dbs[d])->TEST_WaitForFlushMemTable(); + } + } + + for (auto* listener : listeners) { + int pos = 0; + for (size_t c = 0; c < cf_names.size(); ++c) { + for (int d = 0; d < kNumDBs; ++d) { + ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]); + ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]); + pos++; + } + } + } + + for (auto handles : vec_handles) { + for (auto h : handles) { + delete h; + } + handles.clear(); + } + vec_handles.clear(); + + for (auto db : dbs) { + delete db; + } +} + +TEST(EventListenerTest, DisableBGCompaction) { + Options options; + TestFlushListener* listener = new TestFlushListener(); + const int kSlowdownTrigger = 5; + const int kStopTrigger = 10; + options.level0_slowdown_writes_trigger = kSlowdownTrigger; + options.level0_stop_writes_trigger = kStopTrigger; + options.listeners.emplace_back(listener); + // BG compaction is disabled. Number of L0 files will simply keeps + // increasing in this test. + options.compaction_style = kCompactionStyleNone; + options.compression = kNoCompression; + options.write_buffer_size = 100000; // Small write buffer + + CreateAndReopenWithCF({"pikachu"}, &options); + WriteOptions wopts; + wopts.timeout_hint_us = 100000; + ColumnFamilyMetaData cf_meta; + db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); + // keep writing until writes are forced to stop. 
+TEST(EventListenerTest, DisableBGCompaction) {
+  Options options;
+  TestFlushListener* listener = new TestFlushListener();
+  const int kSlowdownTrigger = 5;
+  const int kStopTrigger = 10;
+  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
+  options.level0_stop_writes_trigger = kStopTrigger;
+  options.listeners.emplace_back(listener);
+  // BG compaction is disabled.  The number of L0 files will simply keep
+  // increasing in this test.
+  options.compaction_style = kCompactionStyleNone;
+  options.compression = kNoCompression;
+  options.write_buffer_size = 100000;  // Small write buffer
+
+  CreateAndReopenWithCF({"pikachu"}, &options);
+  WriteOptions wopts;
+  wopts.timeout_hint_us = 100000;
+  ColumnFamilyMetaData cf_meta;
+  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  // keep writing until writes are forced to stop
+  for (int i = 0; static_cast<int>(cf_meta.file_count) < kStopTrigger; ++i) {
+    Put(1, std::to_string(i), std::string(100000, 'x'), wopts);
+    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
+  }
+  ASSERT_GE(listener->slowdown_count, kStopTrigger - kSlowdownTrigger);
+  ASSERT_GE(listener->stop_count, 1);
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
+
+int main(int argc, char** argv) {
+  return rocksdb::test::RunAllTests();
+}
diff --git a/db/version_set.cc b/db/version_set.cc
index cdca14177..1c34b56a5 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -14,13 +14,14 @@
 #endif

 #include
+#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
+#include

 #include "db/filename.h"
 #include "db/log_reader.h"
@@ -599,6 +600,49 @@ size_t Version::GetMemoryUsageByTableReaders() {
   return total_usage;
 }

+void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
+  assert(cf_meta);
+  assert(cfd_);
+
+  cf_meta->name = cfd_->GetName();
+  cf_meta->size = 0;
+  cf_meta->file_count = 0;
+  cf_meta->levels.clear();
+
+  auto* ioptions = cfd_->ioptions();
+  auto* vstorage = storage_info();
+
+  for (int level = 0; level < cfd_->NumberLevels(); level++) {
+    uint64_t level_size = 0;
+    cf_meta->file_count += vstorage->LevelFiles(level).size();
+    std::vector<SstFileMetaData> files;
+    for (const auto& file : vstorage->LevelFiles(level)) {
+      uint32_t path_id = file->fd.GetPathId();
+      std::string file_path;
+      if (path_id < ioptions->db_paths.size()) {
+        file_path = ioptions->db_paths[path_id].path;
+      } else {
+        assert(!ioptions->db_paths.empty());
+        file_path = ioptions->db_paths.back().path;
+      }
+      files.emplace_back(
+          MakeTableFileName("", file->fd.GetNumber()),
+          file_path,
+          file->fd.GetFileSize(),
+          file->smallest_seqno,
+          file->largest_seqno,
+          file->smallest.user_key().ToString(),
+          file->largest.user_key().ToString(),
+          file->being_compacted);
+      level_size += file->fd.GetFileSize();
+    }
+    cf_meta->levels.emplace_back(
+        level, level_size, std::move(files));
+    cf_meta->size += level_size;
+  }
+}
+
 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
   // Estimation will be not accurate when:
   // (1) there is merge keys
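Version::GetColumnFamilyMetaData() above produces one LevelMetaData per level and one SstFileMetaData per table file; note that MakeTableFileName("", number) yields a name of the form "/<number>.sst", so db_path + name forms a full path. A caller-side sketch using the public DB accessor declared later in this patch (the helper name is invented):

    #include <cinttypes>
    #include <cstdio>
    #include "rocksdb/db.h"
    #include "rocksdb/metadata.h"

    // Illustrative helper: dump the level/file layout of the default CF.
    void DumpColumnFamilyLayout(rocksdb::DB* db) {
      rocksdb::ColumnFamilyMetaData cf_meta;
      db->GetColumnFamilyMetaData(&cf_meta);
      printf("CF %s: %" PRIu64 " bytes in %zu files\n",
             cf_meta.name.c_str(), cf_meta.size, cf_meta.file_count);
      for (const auto& level : cf_meta.levels) {
        for (const auto& file : level.files) {
          printf("  L%d %s%s (%" PRIu64 " bytes)%s\n",
                 level.level, file.db_path.c_str(), file.name.c_str(),
                 file.size, file.being_compacted ? " [compacting]" : "");
        }
      }
    }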
@@ -2645,41 +2689,22 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
         c->column_family_data()->GetName().c_str());
   }

-  // verify files in level
-  int level = c->level();
-  for (int i = 0; i < c->num_input_files(0); i++) {
-    uint64_t number = c->input(0, i)->fd.GetNumber();
-
-    // look for this file in the current version
-    bool found = false;
-    for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) {
-      FileMetaData* f = vstorage->files_[level][j];
-      if (f->fd.GetNumber() == number) {
-        found = true;
-        break;
+  for (int input = 0; input < c->num_input_levels(); ++input) {
+    int level = c->level(input);
+    for (int i = 0; i < c->num_input_files(input); ++i) {
+      uint64_t number = c->input(input, i)->fd.GetNumber();
+      bool found = false;
+      for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) {
+        FileMetaData* f = vstorage->files_[level][j];
+        if (f->fd.GetNumber() == number) {
+          found = true;
+          break;
+        }
       }
-    }
-    if (!found) {
-      return false;  // input files non existant in current version
-    }
-  }
-  // verify level+1 files
-  level++;
-  for (int i = 0; i < c->num_input_files(1); i++) {
-    uint64_t number = c->input(1, i)->fd.GetNumber();
-
-    // look for this file in the current version
-    bool found = false;
-    for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) {
-      FileMetaData* f = vstorage->files_[level][j];
-      if (f->fd.GetNumber() == number) {
-        found = true;
-        break;
+      if (!found) {
+        return false;  // input files non existent in current version
       }
     }
-    if (!found) {
-      return false;  // input files non existant in current version
-    }
   }
 #endif
   return true;  // everything good
diff --git a/db/version_set.h b/db/version_set.h
index f9801c7c7..e0d166818 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -420,6 +420,8 @@ class Version {

   VersionSet* version_set() { return vset_; }

+  void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);
+
  private:
   friend class VersionSet;

@@ -598,7 +600,7 @@ class VersionSet {
   Status GetMetadataForFile(uint64_t number, int* filelevel,
                             FileMetaData** metadata, ColumnFamilyData** cfd);

-  void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
+  void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);

   void GetObsoleteFiles(std::vector<FileMetaData*>* files);

@@ -609,6 +611,7 @@ class VersionSet {

   struct ManifestWriter;
   friend class Version;
+  friend class DBImpl;

   struct LogReporter : public log::Reader::Reporter {
     Status* status;
diff --git a/examples/Makefile b/examples/Makefile
index 97a4b2850..7807289ae 100644
--- a/examples/Makefile
+++ b/examples/Makefile
@@ -2,7 +2,7 @@ include ../build_config.mk

 .PHONY: main clean

-all: simple_example column_families_example
+all: simple_example column_families_example compact_files_example

 simple_example: simple_example.cc
 	$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
@@ -10,5 +10,8 @@ simple_example: simple_example.cc
 column_families_example: column_families_example.cc
 	$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)

-clean: simple_example column_families_example
-	rm -rf ./simple_example ./column_families_example
+compact_files_example: compact_files_example.cc
+	$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
+
+clean:
+	rm -rf ./simple_example ./column_families_example ./compact_files_example
diff --git a/examples/compact_files_example.cc b/examples/compact_files_example.cc
new file mode 100644
index 000000000..3e7638b7e
--- /dev/null
+++ b/examples/compact_files_example.cc
@@ -0,0 +1,175 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// An example demonstrating how to use the CompactFiles, EventListener,
+// and GetColumnFamilyMetaData APIs to implement a custom compaction
+// algorithm.
+
+#include <cassert>
+#include <cstdio>
+#include <string>
+#include <vector>
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+
+using namespace rocksdb;
+std::string kDBPath = "/tmp/rocksdb_compact_files_example";
+class CompactionTask;
+
+// An example interface for an external compaction algorithm.
+// A compaction algorithm can be implemented outside the core RocksDB
+// code by using the pluggable compaction APIs that RocksDB provides.
+class Compactor : public EventListener {
+ public:
+  // Picks and returns a compaction task given the specified DB
+  // and column family.  It is the caller's responsibility to
+  // destroy the returned CompactionTask.  Returns "nullptr"
+  // if it cannot find a proper compaction task.
+  virtual CompactionTask* PickCompaction(
+      DB* db, const std::string& cf_name) = 0;
+
+  // Schedule and run the specified compaction task in background.
+  virtual void ScheduleCompaction(CompactionTask* task) = 0;
+};
+// Example structure that describes a compaction task.
+struct CompactionTask {
+  CompactionTask(
+      DB* db, Compactor* compactor,
+      const std::string& column_family_name,
+      const std::vector<std::string>& input_file_names,
+      const int output_level,
+      const CompactionOptions& compact_options,
+      bool retry_on_fail)
+      : db(db),
+        compactor(compactor),
+        column_family_name(column_family_name),
+        input_file_names(input_file_names),
+        output_level(output_level),
+        compact_options(compact_options),
+        retry_on_fail(retry_on_fail) {}  // use the argument, not a constant
+  DB* db;
+  Compactor* compactor;
+  // Stored by value: a reference member would dangle once the caller's
+  // string goes away, and the task outlives the callback that created it.
+  std::string column_family_name;
+  std::vector<std::string> input_file_names;
+  int output_level;
+  CompactionOptions compact_options;
+  bool retry_on_fail;
+};
+
+// A simple compaction algorithm that always compacts everything
+// to the highest level whenever possible.
+class FullCompactor : public Compactor {
+ public:
+  explicit FullCompactor(const Options& options) : options_(options) {
+    compact_options_.compression = options_.compression;
+    compact_options_.output_file_size_limit =
+        options_.target_file_size_base;
+  }
+
+  // When a flush happens, determine whether to trigger a compaction.
+  // If triggered_writes_stop is true, also set the retry flag of the
+  // compaction task to true.
+  void OnFlushCompleted(
+      DB* db, const std::string& cf_name,
+      const std::string& file_path,
+      bool triggered_writes_slowdown,
+      bool triggered_writes_stop) override {
+    CompactionTask* task = PickCompaction(db, cf_name);
+    if (task != nullptr) {
+      if (triggered_writes_stop) {
+        task->retry_on_fail = true;
+      }
+      // Schedule compaction in a different thread.
+      ScheduleCompaction(task);
+    }
+  }
+
+  // Always pick a compaction which includes all files whenever possible.
+  CompactionTask* PickCompaction(
+      DB* db, const std::string& cf_name) override {
+    ColumnFamilyMetaData cf_meta;
+    db->GetColumnFamilyMetaData(&cf_meta);
+
+    std::vector<std::string> input_file_names;
+    for (const auto& level : cf_meta.levels) {
+      for (const auto& file : level.files) {
+        if (file.being_compacted) {
+          return nullptr;
+        }
+        input_file_names.push_back(file.name);
+      }
+    }
+    return new CompactionTask(
+        db, this, cf_name, input_file_names,
+        options_.num_levels - 1, compact_options_, false);
+  }
+
+  // Schedule the specified compaction task in background.
+  void ScheduleCompaction(CompactionTask* task) override {
+    options_.env->Schedule(&FullCompactor::CompactFiles, task);
+  }
+
+  static void CompactFiles(void* arg) {
+    CompactionTask* task = reinterpret_cast<CompactionTask*>(arg);
+    assert(task);
+    assert(task->db);
+    Status s = task->db->CompactFiles(
+        task->compact_options,
+        task->input_file_names,
+        task->output_level);
+    printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
+    if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
+      // If a compaction task with retry_on_fail == true failed for a
+      // reason other than an IO error, try to schedule another compaction.
+      CompactionTask* new_task = task->compactor->PickCompaction(
+          task->db, task->column_family_name);
+      if (new_task != nullptr) {  // PickCompaction may return nullptr
+        task->compactor->ScheduleCompaction(new_task);
+      }
+    }
+    // release the task
+    delete task;
+  }
+
+ private:
+  Options options_;
+  CompactionOptions compact_options_;
+};
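One caveat the example glosses over: ScheduleCompaction() hands the task to Env::Schedule(), so a compaction may still be queued or running when main() (below) deletes the DB. A production Compactor would likely track in-flight tasks and drain them before shutdown. A sketch of such a tracker, under the assumption that ScheduleCompaction() calls TaskStarted() and CompactFiles() calls TaskDone(); the TaskTracker name is invented:

    #include <condition_variable>
    #include <mutex>

    class TaskTracker {
     public:
      void TaskStarted() {
        std::lock_guard<std::mutex> lock(mu_);
        ++pending_;
      }
      void TaskDone() {
        std::lock_guard<std::mutex> lock(mu_);
        if (--pending_ == 0) cv_.notify_all();
      }
      // Block until every scheduled task has finished.
      void WaitForIdle() {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return pending_ == 0; });
      }
     private:
      std::mutex mu_;
      std::condition_variable cv_;
      int pending_ = 0;
    };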
+int main() {
+  Options options;
+  options.create_if_missing = true;
+  // Disable RocksDB background compaction.
+  options.compaction_style = kCompactionStyleNone;
+  // Small slowdown and stop triggers for experimental purposes.
+  options.level0_slowdown_writes_trigger = 3;
+  options.level0_stop_writes_trigger = 5;
+  options.IncreaseParallelism(5);
+  options.listeners.emplace_back(new FullCompactor(options));
+
+  DB* db = nullptr;
+  DestroyDB(kDBPath, options);
+  Status s = DB::Open(options, kDBPath, &db);
+  assert(s.ok());
+  assert(db);
+
+  // If background compaction were not working, writes would stall
+  // because of options.level0_stop_writes_trigger.
+  for (int i = 1000; i < 99999; ++i) {
+    db->Put(WriteOptions(), std::to_string(i),
+            std::string(500, 'a' + (i % 26)));
+  }
+
+  // verify the values are still there
+  std::string value;
+  for (int i = 1000; i < 99999; ++i) {
+    db->Get(ReadOptions(), std::to_string(i), &value);
+    assert(value == std::string(500, 'a' + (i % 26)));
+  }
+
+  // close the db
+  delete db;
+
+  return 0;
+}
diff --git a/include/rocksdb/comparator.h b/include/rocksdb/comparator.h
index 8e7366752..5b7dc1021 100644
--- a/include/rocksdb/comparator.h
+++ b/include/rocksdb/comparator.h
@@ -63,7 +63,7 @@ class Comparator {
 extern const Comparator* BytewiseComparator();

 // Return a builtin comparator that uses reverse lexicographic byte-wise
-// ordering.
+// ordering.
 extern const Comparator* ReverseBytewiseComparator();

 }  // namespace rocksdb
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 65b517f54..3025d7ebc 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -15,19 +15,34 @@
 #include
 #include
 #include
+#include "rocksdb/metadata.h"
 #include "rocksdb/version.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/options.h"
 #include "rocksdb/types.h"
 #include "rocksdb/transaction_log.h"
+#include "rocksdb/listener.h"

 namespace rocksdb {

+struct Options;
+struct DBOptions;
+struct ColumnFamilyOptions;
+struct ReadOptions;
+struct WriteOptions;
+struct FlushOptions;
+struct CompactionOptions;
+struct TableProperties;
+class WriteBatch;
+class Env;
+class EventListener;
+
 using std::unique_ptr;

 class ColumnFamilyHandle {
  public:
   virtual ~ColumnFamilyHandle() {}
+  virtual const std::string& GetName() const = 0;
 };

 extern const std::string kDefaultColumnFamilyName;
@@ -44,27 +59,6 @@ struct ColumnFamilyDescriptor {
 static const int kMajorVersion = __ROCKSDB_MAJOR__;
 static const int kMinorVersion = __ROCKSDB_MINOR__;

-struct Options;
-struct ReadOptions;
-struct WriteOptions;
-struct FlushOptions;
-struct TableProperties;
-class WriteBatch;
-class Env;
-
-// Metadata associated with each SST file.
-struct LiveFileMetaData {
-  std::string column_family_name;  // Name of the column family
-  std::string db_path;
-  std::string name;                // Name of the file
-  int level;                       // Level at which this file resides.
-  size_t size;                     // File size in bytes.
-  std::string smallestkey;         // Smallest user defined key in the file.
-  std::string largestkey;          // Largest user defined key in the file.
-  SequenceNumber smallest_seqno;   // smallest seqno in file
-  SequenceNumber largest_seqno;    // largest seqno in file
-};
-
 // Abstract handle to particular state of a DB.
 // A Snapshot is an immutable object and can therefore be safely
 // accessed from multiple threads without any external synchronization.
@@ -370,6 +364,26 @@ class DB {
     return SetOptions(DefaultColumnFamily(), new_options);
   }

+  // CompactFiles() takes a list of files specified by file names and
+  // compacts them to the specified level.  Note that the behavior is
+  // different from CompactRange() in that CompactFiles() performs the
+  // compaction job using the CURRENT thread.
+  //
+  // @see GetDataBaseMetaData
+  // @see GetColumnFamilyMetaData
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) = 0;
+
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) {
+    return CompactFiles(compact_options, DefaultColumnFamily(),
+                        input_file_names, output_level, output_path_id);
+  }
+
   // Number of levels used for this DB.
   virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0;
   virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); }
@@ -476,6 +490,21 @@ class DB {
   // and end key
   virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {}

+  // Obtains the metadata of the specified column family of the DB.
+  // Status::NotFound() will be returned if the current DB does not have
+  // any column family matching the specified name.
+  //
+  // If cf_name is not specified, the metadata of the default
+  // column family will be returned.
+  virtual void GetColumnFamilyMetaData(
+      ColumnFamilyHandle* column_family,
+      ColumnFamilyMetaData* metadata) {}
+
+  // Get the metadata of the default column family.
+  virtual void GetColumnFamilyMetaData(
+      ColumnFamilyMetaData* metadata) {
+    GetColumnFamilyMetaData(DefaultColumnFamily(), metadata);
+  }
 #endif  // ROCKSDB_LITE

   // Sets the globally unique ID created at database creation time by invoking
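Typical client code pairs the two APIs declared above: inspect the column family with GetColumnFamilyMetaData(), then feed selected file names to CompactFiles(). A sketch under the assumption that all current L0 files of the default column family should move to target_level (the helper name is invented; only APIs from this patch are used):

    #include <string>
    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/metadata.h"
    #include "rocksdb/options.h"

    // Illustrative helper, not part of the patch: compact all current
    // L0 files of the default column family down to `target_level`.
    rocksdb::Status CompactAllL0Files(rocksdb::DB* db, int target_level) {
      rocksdb::ColumnFamilyMetaData cf_meta;
      db->GetColumnFamilyMetaData(&cf_meta);
      if (cf_meta.levels.empty()) {
        return rocksdb::Status::OK();
      }

      std::vector<std::string> input_file_names;
      for (const auto& file : cf_meta.levels[0].files) {
        if (file.being_compacted) {
          // Another compaction already owns this file; skip this round.
          return rocksdb::Status::Aborted("L0 file busy");
        }
        input_file_names.push_back(file.name);
      }
      if (input_file_names.empty()) {
        return rocksdb::Status::OK();
      }
      // Runs on the calling thread, unlike CompactRange().
      return db->CompactFiles(rocksdb::CompactionOptions(),
                              input_file_names, target_level);
    }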
diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h
index 49a136c07..02bd006f3 100644
--- a/include/rocksdb/immutable_options.h
+++ b/include/rocksdb/immutable_options.h
@@ -90,6 +90,10 @@ struct ImmutableCFOptions {
   Options::AccessHint access_hint_on_compaction_start;

   int num_levels;
+
+  // A vector of EventListeners whose call-back functions will be called
+  // when a specific RocksDB event happens.
+  std::vector<std::shared_ptr<EventListener>> listeners;
 };

 }  // namespace rocksdb
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
new file mode 100644
index 000000000..33e5fc51f
--- /dev/null
+++ b/include/rocksdb/listener.h
@@ -0,0 +1,65 @@
+// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+class DB;
+class Status;
+
+// The EventListener class contains a set of call-back functions that will
+// be called when a specific RocksDB event happens, such as a flush.  It
+// can be used as a building block for developing custom features such as
+// stats collectors or external compaction algorithms.
+//
+// Note that call-back functions should not run for an extended period of
+// time before returning; otherwise, RocksDB may be blocked.  For example,
+// it is not advisable to call DB::CompactFiles() (as it may run for a
+// long while) or issue many DB::Put() calls (as Put may be blocked in
+// certain cases) in the same thread in an EventListener call-back.
+// However, doing DB::CompactFiles() and DB::Put() in a different thread
+// is considered safe.
+//
+// [Threading] All EventListener call-backs will be called from the
+// actual thread that is involved in that specific event.  For example,
+// it is the RocksDB background flush thread that performs the actual
+// flush and calls EventListener::OnFlushCompleted().
+class EventListener {
+ public:
+  // A call-back function to RocksDB which will be called whenever a
+  // registered RocksDB flushes a file.  The default implementation is
+  // a no-op.
+  //
+  // Note that this function must be implemented in a way such that
+  // it does not run for an extended period of time before returning.
+  // Otherwise, RocksDB may be blocked.
+  //
+  // @param db a pointer to the rocksdb instance which just flushed
+  //     a memtable to disk.
+  // @param column_family_name the name of the flushed column family.
+  // @param file_path the path to the newly created file.
+  // @param triggered_writes_slowdown true when rocksdb is currently
+  //     slowing down all writes to prevent creating too many Level 0
+  //     files, as compaction seems unable to keep up with the write
+  //     request speed.  This indicates that there are too many files in
+  //     Level 0.
+  // @param triggered_writes_stop true when rocksdb is currently blocking
+  //     any writes to prevent creating more L0 files.  This indicates that
+  //     there are too many files in Level 0.  Compactions should try to
+  //     compact L0 files down to lower levels as soon as possible.
+  virtual void OnFlushCompleted(
+      DB* db, const std::string& column_family_name,
+      const std::string& file_path,
+      bool triggered_writes_slowdown,
+      bool triggered_writes_stop) {}
+
+  virtual ~EventListener() {}  // allow safe deletion through the base class
+};
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
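Given the header's warning that call-backs run on, and block, the event's own thread, a common pattern is to hand the event off to another thread, as compact_files_example.cc does with Env::Schedule(). A plain std::thread sketch of the same idea (the class name is invented):

    #include <cstdio>
    #include <string>
    #include <thread>
    #include "rocksdb/listener.h"

    class AsyncFlushListener : public rocksdb::EventListener {
     public:
      void OnFlushCompleted(
          rocksdb::DB* /*db*/, const std::string& cf_name,
          const std::string& /*file_path*/,
          bool /*triggered_writes_slowdown*/,
          bool triggered_writes_stop) override {
        // Copy what we need; the flush thread returns immediately.
        std::thread([cf_name, triggered_writes_stop] {
          // Heavy work belongs here, off the flush thread, e.g. picking
          // and running a CompactFiles() job as in the example above.
          printf("flush of %s done%s\n", cf_name.c_str(),
                 triggered_writes_stop ? " (writes stopped)" : "");
        }).detach();
      }
    };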
diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h
new file mode 100644
index 000000000..96f70ed85
--- /dev/null
+++ b/include/rocksdb/metadata.h
@@ -0,0 +1,90 @@
+// Copyright (c) 2014, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+struct ColumnFamilyMetaData;
+struct LevelMetaData;
+struct SstFileMetaData;
+
+// The metadata that describes a column family.
+struct ColumnFamilyMetaData {
+  ColumnFamilyMetaData() : size(0), file_count(0), name("") {}
+  ColumnFamilyMetaData(const std::string& name, uint64_t size,
+                       std::vector<LevelMetaData>&& levels)
+      : size(size), name(name), levels(std::move(levels)) {}
+
+  // The size of this column family in bytes, which is equal to the sum of
+  // the file sizes of its "levels".
+  uint64_t size;
+  // The number of files in this column family.
+  size_t file_count;
+  // The name of the column family.
+  std::string name;
+  // The metadata of all levels in this column family.
+  std::vector<LevelMetaData> levels;
+};
+
+// The metadata that describes a level.
+struct LevelMetaData {
+  LevelMetaData(int level, uint64_t size,
+                std::vector<SstFileMetaData>&& files)
+      : level(level), size(size), files(std::move(files)) {}
+
+  // The level which this metadata describes.
+  const int level;
+  // The size of this level in bytes, which is equal to the sum of
+  // the file sizes of its "files".
+  const uint64_t size;
+  // The metadata of all sst files in this level.
+  const std::vector<SstFileMetaData> files;
+};
+
+// The metadata that describes an SST file.
+struct SstFileMetaData {
+  SstFileMetaData()
+      : size(0), smallest_seqno(0), largest_seqno(0), being_compacted(false) {}
+  SstFileMetaData(const std::string& file_name,
+                  const std::string& path, uint64_t size,
+                  SequenceNumber smallest_seqno,
+                  SequenceNumber largest_seqno,
+                  const std::string& smallestkey,
+                  const std::string& largestkey,
+                  bool being_compacted)
+      : size(size), name(file_name), db_path(path),
+        smallest_seqno(smallest_seqno), largest_seqno(largest_seqno),
+        smallestkey(smallestkey), largestkey(largestkey),
+        being_compacted(being_compacted) {}
+
+  // File size in bytes.
+  uint64_t size;
+  // The name of the file.
+  std::string name;
+  // The full path where the file is located.
+  std::string db_path;
+
+  SequenceNumber smallest_seqno;  // Smallest sequence number in file.
+  SequenceNumber largest_seqno;   // Largest sequence number in file.
+  std::string smallestkey;        // Smallest user defined key in the file.
+  std::string largestkey;         // Largest user defined key in the file.
+  bool being_compacted;  // true if the file is currently being compacted.
+};
+
+// The full set of metadata associated with each SST file.
+struct LiveFileMetaData : SstFileMetaData {
+  std::string column_family_name;  // Name of the column family
+  int level;                       // Level at which this file resides.
+};
+
+}  // namespace rocksdb
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index b3ce77255..1656c5c41 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -10,13 +10,16 @@
 #define STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_

 #include
+#include <limits>
 #include
 #include
 #include
+#include <memory>
 #include
 #include

 #include "rocksdb/version.h"
+#include "rocksdb/listener.h"
 #include "rocksdb/universal_compaction.h"

 namespace rocksdb {
@@ -55,7 +58,9 @@ enum CompressionType : char {
 enum CompactionStyle : char {
   kCompactionStyleLevel = 0x0,      // level based compaction style
   kCompactionStyleUniversal = 0x1,  // Universal compaction style
-  kCompactionStyleFIFO = 0x2,       // FIFO compaction style
+  kCompactionStyleFIFO = 0x2,       // FIFO compaction style
+  kCompactionStyleNone = 0x3,       // Disable background compaction. Compaction
+                                    // jobs are submitted via CompactFiles()
 };
@@ -586,6 +591,10 @@ struct ColumnFamilyOptions {
   // Default: 2
   uint32_t min_partial_merge_operands;

+  // A vector of EventListeners whose call-back functions will be called
+  // when a specific RocksDB event happens.
+  std::vector<std::shared_ptr<EventListener>> listeners;
+
   // Create ColumnFamilyOptions with default values for all fields
   ColumnFamilyOptions();
   // Create ColumnFamilyOptions from Options
@@ -1067,6 +1076,19 @@ extern Options GetOptions(size_t total_write_buffer_limit,
                           int write_amplification_threshold = 32,
                           uint64_t target_db_size = 68719476736 /* 64GB */);

+// CompactionOptions are used in the CompactFiles() call.
+struct CompactionOptions {
+  // Compaction output compression type
+  // Default: snappy
+  CompressionType compression;
+  // Compaction will create files of size `output_file_size_limit`.
+  // Default: MAX, which means that compaction will create a single file
+  uint64_t output_file_size_limit;
+
+  CompactionOptions()
+      : compression(kSnappyCompression),
+        output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
+};

 }  // namespace rocksdb

 #endif  // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_
diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h
index 4be30c1f4..177d705f3 100644
--- a/include/rocksdb/status.h
+++ b/include/rocksdb/status.h
@@ -61,6 +61,9 @@ class Status {
   static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) {
     return Status(kIncomplete, msg, msg2);
   }
+  static Status ShutdownInProgress() {
+    return Status(kShutdownInProgress);
+  }
   static Status ShutdownInProgress(const Slice& msg,
                                    const Slice& msg2 = Slice()) {
     return Status(kShutdownInProgress, msg, msg2);
@@ -71,6 +74,12 @@ class Status {
   static Status TimedOut(const Slice& msg, const Slice& msg2 = Slice()) {
     return Status(kTimedOut, msg, msg2);
   }
+  static Status Aborted() {
+    return Status(kAborted);
+  }
+  static Status Aborted(const Slice& msg, const Slice& msg2 = Slice()) {
+    return Status(kAborted, msg, msg2);
+  }

   // Returns true iff the status indicates success.
   bool ok() const { return code() == kOk; }
@@ -101,6 +110,8 @@ class Status {

   bool IsTimedOut() const { return code() == kTimedOut; }

+  bool IsAborted() const { return code() == kAborted; }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   std::string ToString() const;
@@ -115,7 +126,8 @@ class Status {
     kMergeInProgress = 6,
     kIncomplete = 7,
     kShutdownInProgress = 8,
-    kTimedOut = 9
+    kTimedOut = 9,
+    kAborted = 10
   };

   Code code() const {
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h
index 50c6a6484..7bdf9928e 100644
--- a/include/rocksdb/utilities/stackable_db.h
+++ b/include/rocksdb/utilities/stackable_db.h
@@ -133,6 +133,17 @@ class StackableDB : public DB {
                               target_level, target_path_id);
   }

+  using DB::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& compact_options,
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& input_file_names,
+      const int output_level, const int output_path_id = -1) override {
+    return db_->CompactFiles(
+        compact_options, column_family, input_file_names,
+        output_level, output_path_id);
+  }
+
   using DB::NumberLevels;
   virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
     return db_->NumberLevels(column_family);
@@ -170,6 +181,8 @@ class StackableDB : public DB {
     return db_->Flush(fopts, column_family);
   }

+#ifndef ROCKSDB_LITE
+
   virtual Status DisableFileDeletions() override {
     return db_->DisableFileDeletions();
   }
@@ -183,6 +196,14 @@ class StackableDB : public DB {
     db_->GetLiveFilesMetaData(metadata);
   }

+  virtual void GetColumnFamilyMetaData(
+      ColumnFamilyHandle* column_family,
+      ColumnFamilyMetaData* cf_meta) override {
+    db_->GetColumnFamilyMetaData(column_family, cf_meta);
+  }
+
+#endif  // ROCKSDB_LITE
+
   virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
                               bool flush_memtable = true) override {
     return db_->GetLiveFiles(vec, mfs, flush_memtable);
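The new Aborted code rounds out the Status surface; which call paths actually return kAborted is not visible in this diff, so the following only sketches how a caller might branch on the additions above (the function name is invented):

    #include <cstdio>
    #include "rocksdb/status.h"

    void Report(const rocksdb::Status& s) {
      if (s.IsAborted()) {
        // ToString() now renders this as "Operation aborted: ..."
        printf("aborted: %s\n", s.ToString().c_str());
      } else if (s.IsTimedOut()) {
        printf("timed out, consider retrying\n");
      } else if (!s.ok()) {
        printf("error: %s\n", s.ToString().c_str());
      }
    }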
diff --git a/util/options.cc b/util/options.cc
index 03ffb0a6d..bdcdcdf2b 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -64,7 +64,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
       compression_per_level(options.compression_per_level),
       compression_opts(options.compression_opts),
       access_hint_on_compaction_start(options.access_hint_on_compaction_start),
-      num_levels(options.num_levels) {}
+      num_levels(options.num_levels),
+      listeners(options.listeners) {}

 ColumnFamilyOptions::ColumnFamilyOptions()
     : comparator(BytewiseComparator()),
@@ -112,7 +113,8 @@ ColumnFamilyOptions::ColumnFamilyOptions()
       memtable_prefix_bloom_huge_page_tlb_size(0),
       bloom_locality(0),
       max_successive_merges(0),
-      min_partial_merge_operands(2) {
+      min_partial_merge_operands(2),
+      listeners() {
   assert(memtable_factory.get() != nullptr);
 }

@@ -172,7 +174,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
           options.memtable_prefix_bloom_huge_page_tlb_size),
       bloom_locality(options.bloom_locality),
       max_successive_merges(options.max_successive_merges),
-      min_partial_merge_operands(options.min_partial_merge_operands) {
+      min_partial_merge_operands(options.min_partial_merge_operands),
+      listeners(options.listeners) {
   assert(memtable_factory.get() != nullptr);
   if (max_bytes_for_level_multiplier_additional.size() <
       static_cast<size_t>(num_levels)) {
diff --git a/util/status.cc b/util/status.cc
index 8eca3a5a8..fa8e18acf 100644
--- a/util/status.cc
+++ b/util/status.cc
@@ -70,6 +70,9 @@ std::string Status::ToString() const {
     case kTimedOut:
       type = "Operation timed out: ";
       break;
+    case kAborted:
+      type = "Operation aborted: ";
+      break;
     default:
       snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
                static_cast<int>(code()));