From 3a276b0cbee3ca9d27f603775ec742ae2fec59a9 Mon Sep 17 00:00:00 2001
From: PraveenSinghRao
Date: Thu, 2 Jun 2016 11:57:31 -0700
Subject: [PATCH] Add a callback for when memtable is moved to immutable (#1137)

* Create a callback for memtable becoming immutable

Create a callback for memtable becoming immutable

Create a callback for memtable becoming immutable

moved notification outside the lock

Move sealed notification to unlocked portion of SwitchMemtable

* fix lite build
---
 db/db_impl.cc              | 34 ++++++++++++++++++++++++++++++++++
 db/db_impl.h               |  3 +++
 db/listener_test.cc        | 34 +++++++++++++++++++++++++++++++++-
 include/rocksdb/listener.h | 31 +++++++++++++++++++++++++++++++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index a6690e9e9..a5dacfb38 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4862,6 +4862,22 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
   return Status::OK();
 }
 
+#ifndef ROCKSDB_LITE
+void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
+                                    const MemTableInfo& mem_table_info) {
+  if (db_options_.listeners.size() == 0U) {
+    return;
+  }
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+
+  for (const auto& listener : db_options_.listeners) {
+    listener->OnMemTableSealed(mem_table_info);
+  }
+}
+#endif  // ROCKSDB_LITE
+
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
 Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
@@ -4884,6 +4900,17 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       creating_new_log ? versions_->NewFileNumber() : logfile_number_;
   SuperVersion* new_superversion = nullptr;
   const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+
+  // Capture memtable_info for the memtable sealed callback while mutex_ is held
+#ifndef ROCKSDB_LITE
+  MemTableInfo memtable_info;
+  memtable_info.cf_name = cfd->GetName();
+  memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
+  memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
+  memtable_info.num_entries = cfd->mem()->num_entries();
+  memtable_info.num_deletes = cfd->mem()->num_deletes();
+#endif  // ROCKSDB_LITE
+
   mutex_.Unlock();
   Status s;
   {
@@ -4920,6 +4947,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
       new_superversion = new SuperVersion();
     }
+
+#ifndef ROCKSDB_LITE
+    // PLEASE NOTE: We assume that there are no failable operations
+    // after lock is acquired below since we are already notifying
+    // client about mem table becoming immutable.
+    NotifyOnMemTableSealed(cfd, memtable_info);
+#endif  // ROCKSDB_LITE
   }
   Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
       "[%s] New memtable created with log file: #%" PRIu64
diff --git a/db/db_impl.h b/db/db_impl.h
index a587a5b27..70ab97c9e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -57,6 +57,7 @@ class Arena;
 class WriteCallback;
 struct JobContext;
 struct ExternalSstFileInfo;
+struct MemTableInfo;
 
 class DBImpl : public DB {
  public:
@@ -520,6 +521,8 @@ class DBImpl : public DB {
                                    Compaction *c, const Status &st,
                                    const CompactionJobStats& job_stats,
                                    int job_id);
+  void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
+                              const MemTableInfo& mem_table_info);
 
   void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
 
diff --git a/db/listener_test.cc b/db/listener_test.cc
index c9e1589a4..000fba683 100644
--- a/db/listener_test.cc
+++ b/db/listener_test.cc
@@ -723,7 +723,39 @@ TEST_F(EventListenerTest, TableFileCreationListenersTest) {
   dbfull()->TEST_WaitForCompact();
   listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
 }
-}  // namespace rocksdb
+
+class MemTableSealedListener : public EventListener {
+private:
+  SequenceNumber latest_seq_number_ = 0;
+public:
+  MemTableSealedListener() {}
+  void OnMemTableSealed(const MemTableInfo& info) override {
+    latest_seq_number_ = info.first_seqno;
+  }
+
+  void OnFlushCompleted(DB* /*db*/,
+    const FlushJobInfo& flush_job_info) override {
+    ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
+  }
+};
+
+TEST_F(EventListenerTest, MemTableSealedListenerTest) {
+  auto listener = std::make_shared<MemTableSealedListener>();
+  Options options;
+  options.create_if_missing = true;
+  options.listeners.push_back(listener);
+  DestroyAndReopen(options);
+
+  for (unsigned int i = 0; i < 10; i++) {
+    std::string tag = std::to_string(i);
+    ASSERT_OK(Put("foo"+tag, "aaa"));
+    ASSERT_OK(Put("bar"+tag, "bbb"));
+
+    ASSERT_OK(Flush());
+  }
+}
+} // namespace rocksdb
+
 
 #endif  // ROCKSDB_LITE
 
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index 9c1fa5449..32f012b77 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -151,6 +151,24 @@ struct CompactionJobInfo {
   CompactionJobStats stats;
 };
 
+struct MemTableInfo {
+  // the name of the column family to which memtable belongs
+  std::string cf_name;
+  // Sequence number of the first element that was inserted
+  // into the memtable.
+  SequenceNumber first_seqno;
+  // Sequence number that is guaranteed to be smaller than or equal
+  // to the sequence number of any key that could be inserted into this
+  // memtable. It can then be assumed that any write with a larger(or equal)
+  // sequence number will be present in this memtable or a later memtable.
+  SequenceNumber earliest_seqno;
+  // Total number of entries in memtable
+  uint64_t num_entries;
+  // Total number of deletes in memtable
+  uint64_t num_deletes;
+
+};
+
 // EventListener class contains a set of call-back functions that will
 // be called when specific RocksDB event happens such as flush.  It can
 // be used as a building block for developing custom features such as
@@ -247,6 +265,19 @@ class EventListener {
   // returned value.
   virtual void OnTableFileCreationStarted(
       const TableFileCreationBriefInfo& /*info*/) {}
+
+  // A call-back function for RocksDB which will be called before
+  // a memtable is made immutable.
+  //
+  // Note that this function must be implemented in a way such that
+  // it should not run for an extended period of time before the function
+  // returns.  Otherwise, RocksDB may be blocked.
+  //
+  // Note that if applications would like to use the passed reference
+  // outside this function call, they should make copies from these
+  // returned value.
+  virtual void OnMemTableSealed(
+      const MemTableInfo& /*info*/) {}
 
   virtual ~EventListener() {}
 };