diff --git a/HISTORY.md b/HISTORY.md index 8ba8face6..a98e9c2aa 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it. * The old API that accepts std::string, although discouraged, is still supported. * Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details. +* Added CompactionEventListener and EventListener::OnFlushBegin interfaces. ### New Features * Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user. diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index acbf1003c..f94ab344c 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -10,12 +10,34 @@ namespace rocksdb { +CompactionEventListener::CompactionListenerValueType fromInternalValueType( + ValueType vt) { + switch (vt) { + case kTypeDeletion: + return CompactionEventListener::CompactionListenerValueType::kDelete; + case kTypeValue: + return CompactionEventListener::CompactionListenerValueType::kValue; + case kTypeMerge: + return CompactionEventListener::CompactionListenerValueType:: + kMergeOperand; + case kTypeSingleDeletion: + return CompactionEventListener::CompactionListenerValueType:: + kSingleDelete; + case kTypeRangeDeletion: + return CompactionEventListener::CompactionListenerValueType::kRangeDelete; + default: + assert(false); + return CompactionEventListener::CompactionListenerValueType::kInvalid; + } +} + CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, SequenceNumber last_sequence, std::vector* snapshots, SequenceNumber earliest_write_conflict_snapshot, Env* env, bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, const Compaction* compaction, const CompactionFilter* compaction_filter, + CompactionEventListener* compaction_listener, const std::atomic* shutting_down) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, @@ -23,7 +45,7 @@ CompactionIterator::CompactionIterator( range_del_agg, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), - compaction_filter, shutting_down) {} + compaction_filter, compaction_listener, shutting_down) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -32,6 +54,7 @@ CompactionIterator::CompactionIterator( bool expect_valid_internal_key, RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter, + CompactionEventListener* compaction_listener, const std::atomic* shutting_down) : input_(input), cmp_(cmp), @@ -43,7 +66,9 @@ CompactionIterator::CompactionIterator( range_del_agg_(range_del_agg), compaction_(std::move(compaction)), compaction_filter_(compaction_filter), + compaction_listener_(compaction_listener), shutting_down_(shutting_down), + ignore_snapshots_(false), merge_out_iter_(merge_helper_) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); bottommost_level_ = @@ -62,8 +87,8 @@ CompactionIterator::CompactionIterator( earliest_snapshot_ = snapshots_->at(0); latest_snapshot_ = snapshots_->back(); } - if (compaction_filter_ != nullptr && compaction_filter_->IgnoreSnapshots()) { - ignore_snapshots_ = true; + if (compaction_filter_ != nullptr) { + if (compaction_filter_->IgnoreSnapshots()) ignore_snapshots_ = true; } else { ignore_snapshots_ = false; } @@ -188,6 +213,12 @@ void CompactionIterator::NextFromInput() { current_user_key_sequence_ = kMaxSequenceNumber; current_user_key_snapshot_ = 0; + if (compaction_listener_) { + compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key, + fromInternalValueType(ikey_.type), + value_, ikey_.sequence, true); + } + // apply the compaction filter to the first occurrence of the user key if (compaction_filter_ != nullptr && ikey_.type == kTypeValue && (visible_at_tip_ || ikey_.sequence > latest_snapshot_ || @@ -235,6 +266,12 @@ void CompactionIterator::NextFromInput() { } } } else { + if (compaction_listener_) { + compaction_listener_->OnCompaction(compaction_->level(), ikey_.user_key, + fromInternalValueType(ikey_.type), + value_, ikey_.sequence, false); + } + // Update the current key to reflect the new sequence number/type without // copying the user key. // TODO(rven): Compaction filter does not process keys in this path @@ -394,7 +431,7 @@ void CompactionIterator::NextFromInput() { // is the same as the visibility of a previous instance of the // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key - // TODO: why not > ? + // TODO(noetzli): why not > ? // // Note: Dropping this key will not affect TransactionDB write-conflict // checking since there has already been a record returned for this key diff --git a/db/compaction_iterator.h b/db/compaction_iterator.h index 16cb2fb10..efe50811c 100644 --- a/db/compaction_iterator.h +++ b/db/compaction_iterator.h @@ -20,6 +20,8 @@ namespace rocksdb { +class CompactionEventListener; + class CompactionIterator { public: // A wrapper around Compaction. Has a much smaller interface, only what @@ -60,6 +62,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, + CompactionEventListener* compaction_listener = nullptr, const std::atomic* shutting_down = nullptr); // Constructor with custom CompactionProxy, used for tests. @@ -71,6 +74,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, + CompactionEventListener* compaction_listener = nullptr, const std::atomic* shutting_down = nullptr); ~CompactionIterator(); @@ -124,6 +128,7 @@ class CompactionIterator { RangeDelAggregator* range_del_agg_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; + CompactionEventListener* compaction_listener_; const std::atomic* shutting_down_; bool bottommost_level_; bool valid_ = false; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index 65601e183..37a031d15 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -189,7 +189,7 @@ class CompactionIteratorTest : public testing::Test { c_iter_.reset(new CompactionIterator( iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(), - std::move(compaction), filter, &shutting_down_)); + std::move(compaction), filter, nullptr, &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); } diff --git a/db/compaction_job.cc b/db/compaction_job.cc index bcf1155d5..fbb87d80f 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -739,12 +739,21 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { input->SeekToFirst(); } + // we allow only 1 compaction event listener. Used by blob storage + CompactionEventListener* comp_event_listener = nullptr; + for (auto& celitr : cfd->ioptions()->listeners) { + comp_event_listener = celitr->GetCompactionEventListener(); + if (comp_event_listener != nullptr) { + break; + } + } + Status status; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false, range_del_agg.get(), sub_compact->compaction, compaction_filter, - shutting_down_)); + comp_event_listener, shutting_down_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && diff --git a/db/db_impl.h b/db/db_impl.h index 0a97fd170..2b75d53bb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -557,6 +557,10 @@ class DBImpl : public DB { Status RenameTempFileToOptionsFile(const std::string& file_name); Status DeleteObsoleteOptionsFiles(); + void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, + const MutableCFOptions& mutable_cf_options, + int job_id, TableProperties prop); + void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id, TableProperties prop); @@ -1095,13 +1099,6 @@ class DBImpl : public DB { DBImpl(const DBImpl&); void operator=(const DBImpl&); - // Return the earliest snapshot where seqno is visible. - // Store the snapshot right before that, if any, in prev_snapshot - inline SequenceNumber findEarliestVisibleSnapshot( - SequenceNumber in, - std::vector& snapshots, - SequenceNumber* prev_snapshot); - // Background threads call this function, which is just a wrapper around // the InstallSuperVersion() function. Background threads carry // job_context which can have new_superversion already diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 245e5727d..dd3e4a7d4 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -90,6 +90,12 @@ Status DBImpl::FlushMemTableToOutputFile( flush_job.PickMemTable(); +#ifndef ROCKSDB_LITE + // may temporarily unlock and lock the mutex. + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, + flush_job.GetTableProperties()); +#endif // ROCKSDB_LITE + Status s; if (logfile_number_ > 0 && versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) { @@ -156,6 +162,49 @@ Status DBImpl::FlushMemTableToOutputFile( return s; } +void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, + const MutableCFOptions& mutable_cf_options, + int job_id, TableProperties prop) { +#ifndef ROCKSDB_LITE + if (immutable_db_options_.listeners.size() == 0U) { + return; + } + mutex_.AssertHeld(); + if (shutting_down_.load(std::memory_order_acquire)) { + return; + } + bool triggered_writes_slowdown = + (cfd->current()->storage_info()->NumLevelFiles(0) >= + mutable_cf_options.level0_slowdown_writes_trigger); + bool triggered_writes_stop = + (cfd->current()->storage_info()->NumLevelFiles(0) >= + mutable_cf_options.level0_stop_writes_trigger); + // release lock while notifying events + mutex_.Unlock(); + { + FlushJobInfo info; + info.cf_name = cfd->GetName(); + // TODO(yhchiang): make db_paths dynamic in case flush does not + // go to L0 in the future. + info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path, + file_meta->fd.GetNumber()); + info.thread_id = env_->GetThreadID(); + info.job_id = job_id; + info.triggered_writes_slowdown = triggered_writes_slowdown; + info.triggered_writes_stop = triggered_writes_stop; + info.smallest_seqno = file_meta->smallest_seqno; + info.largest_seqno = file_meta->largest_seqno; + info.table_properties = prop; + for (auto listener : immutable_db_options_.listeners) { + listener->OnFlushBegin(this, info); + } + } + mutex_.Lock(); +// no need to signal bg_cv_ as it will be signaled at the end of the +// flush process. +#endif // ROCKSDB_LITE +} + void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index f24dd97c4..ea6095fcd 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -444,6 +444,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { case kIdentityFile: case kMetaDatabase: case kOptionsFile: + case kBlobFile: keep = true; break; } diff --git a/db/log_format.h b/db/log_format.h index cf48a202f..09d5fead4 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -34,8 +34,8 @@ static const int kMaxRecordType = kRecyclableLastType; static const unsigned int kBlockSize = 32768; -// Header is checksum (4 bytes), type (1 byte), length (2 bytes). -static const int kHeaderSize = 4 + 1 + 2; +// Header is checksum (4 bytes), length (2 bytes), type (1 byte) +static const int kHeaderSize = 4 + 2 + 1; // Recyclable header is checksum (4 bytes), type (1 byte), log number // (4 bytes), length (2 bytes). diff --git a/db/memtable.cc b/db/memtable.cc index 206372951..c44558861 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -724,9 +724,9 @@ void MemTable::Update(SequenceNumber seq, uint32_t new_size = static_cast(value.size()); // Update value, if new value size <= previous value size - if (new_size <= prev_size ) { - char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - new_size); + if (new_size <= prev_size) { + char* p = + EncodeVarint32(const_cast(key_ptr) + key_length, new_size); WriteLock wl(GetLock(lkey.user_key())); memcpy(p, value.data(), value.size()); assert((unsigned)((p + value.size()) - entry) == diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 6370e33d3..f1d6e666b 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -163,6 +163,20 @@ class Env { unique_ptr* result, const EnvOptions& options) = 0; + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + virtual Status ReopenWritableFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) { + Status s; + return s; + } + // Reuse an existing file by renaming it and opening it as writable. virtual Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, @@ -454,6 +468,8 @@ class SequentialFile { // aligned buffer for Direct I/O virtual size_t GetRequiredBufferAlignment() const { return kDefaultPageSize; } + virtual void Rewind() {} + // Remove any kind of caching of data from the offset to offset+length // of this file. If the length is 0, then it refers to the end of file. // If the system is not caching the file contents, then this is a noop. @@ -918,6 +934,11 @@ class EnvWrapper : public Env { const EnvOptions& options) override { return target_->NewWritableFile(f, r, options); } + Status ReopenWritableFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) override { + return target_->ReopenWritableFile(fname, result, options); + } Status ReuseWritableFile(const std::string& fname, const std::string& old_fname, unique_ptr* r, diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 37a3961bb..d6a70c156 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -183,6 +183,28 @@ struct ExternalFileIngestionInfo { TableProperties table_properties; }; +// A call-back function to RocksDB which will be called when the compaction +// iterator is compacting values. It is mean to be returned from +// EventListner::GetCompactionEventListner() at the beginning of compaction +// job. +class CompactionEventListener { + public: + enum CompactionListenerValueType { + kValue, + kMergeOperand, + kDelete, + kSingleDelete, + kRangeDelete, + kInvalid, + }; + + virtual void OnCompaction(int level, const Slice& key, + CompactionListenerValueType value_type, + const Slice& existing_value, + const SequenceNumber& sn, bool is_new) = 0; + + virtual ~CompactionEventListener() = default; +}; // EventListener class contains a set of call-back functions that will // be called when specific RocksDB event happens such as flush. It can @@ -225,6 +247,16 @@ class EventListener { virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*flush_job_info*/) {} + // A call-back function to RocksDB which will be called before a + // RocksDB starts to flush memtables. The default implementation is + // no-op. + // + // Note that the this function must be implemented in a way such that + // it should not run for an extended period of time before the function + // returns. Otherwise, RocksDB may be blocked. + virtual void OnFlushBegin(DB* /*db*/, + const FlushJobInfo& /*flush_job_info*/) {} + // A call-back function for RocksDB which will be called whenever // a SST file is deleted. Different from OnCompactionCompleted and // OnFlushCompleted, this call-back is designed for external logging @@ -314,6 +346,12 @@ class EventListener { virtual void OnExternalFileIngested( DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {} + // Factory method to return CompactionEventListener. If multiple listeners + // provides CompactionEventListner, only the first one will be used. + virtual CompactionEventListener* GetCompactionEventListener() { + return nullptr; + } + virtual ~EventListener() {} }; diff --git a/util/coding.h b/util/coding.h index 336b7880b..91f8bbebc 100644 --- a/util/coding.h +++ b/util/coding.h @@ -48,6 +48,7 @@ extern void PutLengthPrefixedSliceParts(std::string* dst, // Standard Get... routines parse a value from the beginning of a Slice // and advance the slice past the parsed value. extern bool GetFixed64(Slice* input, uint64_t* value); +extern bool GetFixed32(Slice* input, uint32_t* value); extern bool GetVarint32(Slice* input, uint32_t* value); extern bool GetVarint64(Slice* input, uint64_t* value); extern bool GetLengthPrefixedSlice(Slice* input, Slice* result); @@ -271,6 +272,15 @@ inline bool GetFixed64(Slice* input, uint64_t* value) { return true; } +inline bool GetFixed32(Slice* input, uint32_t* value) { + if (input->size() < sizeof(uint32_t)) { + return false; + } + *value = DecodeFixed32(input->data()); + input->remove_prefix(sizeof(uint32_t)); + return true; +} + inline bool GetVarint32(Slice* input, uint32_t* value) { const char* p = input->data(); const char* limit = p + input->size(); diff --git a/util/filename.cc b/util/filename.cc index 101767e20..c4d4a7651 100644 --- a/util/filename.cc +++ b/util/filename.cc @@ -27,6 +27,7 @@ namespace rocksdb { static const std::string kRocksDbTFileExt = "sst"; static const std::string kLevelDbTFileExt = "ldb"; +static const std::string kRocksDBBlobFileExt = "blob"; // Given a path, flatten the path name by replacing all chars not in // {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end. @@ -74,6 +75,11 @@ std::string LogFileName(const std::string& name, uint64_t number) { return MakeFileName(name, number, "log"); } +std::string BlobFileName(const std::string& blobdirname, uint64_t number) { + assert(number > 0); + return MakeFileName(blobdirname, number, kRocksDBBlobFileExt.c_str()); +} + std::string ArchivalDirectory(const std::string& dir) { return dir + "/" + ARCHIVAL_DIR; } @@ -220,7 +226,7 @@ std::string IdentityFileName(const std::string& dbname) { // dbname/ // dbname/.old.[0-9]+ // dbname/MANIFEST-[0-9]+ -// dbname/[0-9]+.(log|sst) +// dbname/[0-9]+.(log|sst|blob) // dbname/METADB-[0-9]+ // dbname/OPTIONS-[0-9]+ // dbname/OPTIONS-[0-9]+.dbtmp @@ -335,6 +341,8 @@ bool ParseFileName(const std::string& fname, uint64_t* number, } else if (suffix == Slice(kRocksDbTFileExt) || suffix == Slice(kLevelDbTFileExt)) { *type = kTableFile; + } else if (suffix == Slice(kRocksDBBlobFileExt)) { + *type = kBlobFile; } else if (suffix == Slice(kTempFileNameSuffix)) { *type = kTempFile; } else { diff --git a/util/filename.h b/util/filename.h index e48893266..67a0bfaa6 100644 --- a/util/filename.h +++ b/util/filename.h @@ -38,7 +38,8 @@ enum FileType { kInfoLogFile, // Either the current one, or an old one kMetaDatabase, kIdentityFile, - kOptionsFile + kOptionsFile, + kBlobFile }; // Return the name of the log file with the specified number @@ -46,6 +47,8 @@ enum FileType { // "dbname". extern std::string LogFileName(const std::string& dbname, uint64_t number); +extern std::string BlobFileName(const std::string& bdirname, uint64_t number); + static const std::string ARCHIVAL_DIR = "archive"; extern std::string ArchivalDirectory(const std::string& dbname);