diff --git a/db/db_impl.cc b/db/db_impl.cc index 10aa6599e..30cf8e496 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1236,7 +1236,7 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions()), stats_, &event_logger_); - uint64_t file_number; + FileMetaData file_meta; // Within flush_job.Run, rocksdb may call event listener to notify // file creation and deletion. @@ -1244,7 +1244,7 @@ Status DBImpl::FlushMemTableToOutputFile( // Note that flush_job.Run will unlock and lock the db_mutex, // and EventListener callback will be called when the db_mutex // is unlocked by the current thread. - Status s = flush_job.Run(&file_number); + Status s = flush_job.Run(&file_meta); if (s.ok()) { InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); @@ -1277,7 +1277,7 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE if (s.ok()) { // may temporarily unlock and lock the mutex. - NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options, + NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, job_context->job_id); } #endif // ROCKSDB_LITE @@ -1285,7 +1285,7 @@ Status DBImpl::FlushMemTableToOutputFile( } void DBImpl::NotifyOnFlushCompleted( - ColumnFamilyData* cfd, uint64_t file_number, + ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id) { #ifndef ROCKSDB_LITE if (db_options_.listeners.size() == 0U) { @@ -1309,11 +1309,13 @@ void DBImpl::NotifyOnFlushCompleted( // TODO(yhchiang): make db_paths dynamic in case flush does not // go to L0 in the future. info.file_path = MakeTableFileName(db_options_.db_paths[0].path, - file_number); + file_meta->fd.GetNumber()); info.thread_id = env_->GetThreadID(); info.job_id = job_id; info.triggered_writes_slowdown = triggered_writes_slowdown; info.triggered_writes_stop = triggered_writes_stop; + info.smallest_seqno = file_meta->smallest_seqno; + info.largest_seqno = file_meta->largest_seqno; for (auto listener : db_options_.listeners) { listener->OnFlushCompleted(this, info); } diff --git a/db/db_impl.h b/db/db_impl.h index 5ff349926..50e470e21 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -349,7 +349,7 @@ class DBImpl : public DB { Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena); - void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number, + void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id); diff --git a/db/flush_job.cc b/db/flush_job.cc index d6a5f7867..e41f6b2a9 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -116,11 +116,11 @@ void FlushJob::RecordFlushIOStats() { IOSTATS_RESET(bytes_written); } -Status FlushJob::Run(uint64_t* file_number) { +Status FlushJob::Run(FileMetaData* file_meta) { AutoThreadOperationStageUpdater stage_run( ThreadStatus::STAGE_FLUSH_RUN); // Save the contents of the earliest memtable as a new Table - uint64_t fn; + FileMetaData meta; autovector mems; cfd_->imm()->PickMemtablesToFlush(&mems); if (mems.empty()) { @@ -143,7 +143,7 @@ Status FlushJob::Run(uint64_t* file_number) { edit->SetColumnFamily(cfd_->GetID()); // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(mems, edit, &fn); + Status s = WriteLevel0Table(mems, edit, &meta); if (s.ok() && (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { @@ -152,16 +152,17 @@ Status FlushJob::Run(uint64_t* file_number) { } if (!s.ok()) { - cfd_->imm()->RollbackMemtableFlush(mems, fn); + cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); } else { // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( - cfd_, mutable_cf_options_, mems, versions_, db_mutex_, fn, - &job_context_->memtables_to_free, db_directory_, log_buffer_); + cfd_, mutable_cf_options_, mems, versions_, db_mutex_, + meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, + log_buffer_); } - if (s.ok() && file_number != nullptr) { - *file_number = fn; + if (s.ok() && file_meta != nullptr) { + *file_meta = meta; } RecordFlushIOStats(); @@ -180,15 +181,13 @@ Status FlushJob::Run(uint64_t* file_number) { } Status FlushJob::WriteLevel0Table(const autovector& mems, - VersionEdit* edit, uint64_t* filenumber) { + VersionEdit* edit, FileMetaData* meta) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); - FileMetaData meta; // path 0 for level 0 file. - meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); - *filenumber = meta.fd.GetNumber(); + meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); const SequenceNumber earliest_seqno_in_memtable = mems[0]->GetFirstSequenceNumber(); @@ -229,12 +228,12 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, static_cast(memtables.size()), &arena)); Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", - cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber()); + cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber()); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), &meta, + cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, earliest_seqno_in_memtable, output_compression_, @@ -247,22 +246,22 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s" "%s", - cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(), - meta.fd.GetFileSize(), s.ToString().c_str(), - meta.marked_for_compaction ? " (needs compaction)" : ""); + cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(), + meta->fd.GetFileSize(), s.ToString().c_str(), + meta->marked_for_compaction ? " (needs compaction)" : ""); // output to event logger if (s.ok()) { info.db_name = dbname_; info.cf_name = cfd_->GetName(); info.file_path = TableFileName(db_options_.db_paths, - meta.fd.GetNumber(), - meta.fd.GetPathId()); - info.file_size = meta.fd.GetFileSize(); + meta->fd.GetNumber(), + meta->fd.GetPathId()); + info.file_size = meta->fd.GetFileSize(); info.job_id = job_context_->job_id; EventHelpers::LogAndNotifyTableFileCreation( event_logger_, db_options_.listeners, - meta.fd, info); + meta->fd, info); } if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { @@ -278,9 +277,9 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. int level = 0; - if (s.ok() && meta.fd.GetFileSize() > 0) { - const Slice min_user_key = meta.smallest.user_key(); - const Slice max_user_key = meta.largest.user_key(); + if (s.ok() && meta->fd.GetFileSize() > 0) { + const Slice min_user_key = meta->smallest.user_key(); + const Slice max_user_key = meta->largest.user_key(); // if we have more than 1 background thread, then we cannot // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for @@ -297,19 +296,19 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, level = 0; } } - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), - meta.fd.GetFileSize(), meta.smallest, meta.largest, - meta.smallest_seqno, meta.largest_seqno, - meta.marked_for_compaction); + edit->AddFile(level, meta->fd.GetNumber(), meta->fd.GetPathId(), + meta->fd.GetFileSize(), meta->smallest, meta->largest, + meta->smallest_seqno, meta->largest_seqno, + meta->marked_for_compaction); } InternalStats::CompactionStats stats(1); stats.micros = db_options_.env->NowMicros() - start_micros; - stats.bytes_written = meta.fd.GetFileSize(); + stats.bytes_written = meta->fd.GetFileSize(); cfd_->internal_stats()->AddCompactionStats(level, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - meta.fd.GetFileSize()); - RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); + meta->fd.GetFileSize()); + RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetFileSize()); return s; } diff --git a/db/flush_job.h b/db/flush_job.h index c504b14fd..db2a49cda 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -64,14 +64,14 @@ class FlushJob { ~FlushJob(); - Status Run(uint64_t* file_number = nullptr); + Status Run(FileMetaData* file_meta = nullptr); private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status WriteLevel0Table(const autovector& mems, VersionEdit* edit, - uint64_t* filenumber); + FileMetaData* meta); const std::string& dbname_; ColumnFamilyData* cfd_; const DBOptions& db_options_; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 1cd0e46d1..f693d5c9b 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -68,6 +68,10 @@ struct FlushJobInfo { // files in level 0. Compactions should try to compact L0 files down // to lower levels as soon as possible. bool triggered_writes_stop; + // The smallest sequence number in the newly created file + SequenceNumber smallest_seqno; + // The largest sequence number in the newly created file + SequenceNumber largest_seqno; }; struct CompactionJobInfo {