Add largest sequence to FlushJobInfo

Summary:
Adding largest sequence number to FlushJobInfo
and passing flushed file metadata to NotifyOnFlushCompleted which include alot of other values that we may want to expose in FlushJobInfo

Test Plan: make check

Reviewers: igor, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D39927
main
Islam AbdelRahman 10 years ago
parent ab455ce495
commit d6ce0f7c61
  1. 12
      db/db_impl.cc
  2. 2
      db/db_impl.h
  3. 61
      db/flush_job.cc
  4. 4
      db/flush_job.h
  5. 4
      include/rocksdb/listener.h

@ -1236,7 +1236,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions()), stats_, GetCompressionFlush(*cfd->ioptions()), stats_,
&event_logger_); &event_logger_);
uint64_t file_number; FileMetaData file_meta;
// Within flush_job.Run, rocksdb may call event listener to notify // Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion. // file creation and deletion.
@ -1244,7 +1244,7 @@ Status DBImpl::FlushMemTableToOutputFile(
// Note that flush_job.Run will unlock and lock the db_mutex, // Note that flush_job.Run will unlock and lock the db_mutex,
// and EventListener callback will be called when the db_mutex // and EventListener callback will be called when the db_mutex
// is unlocked by the current thread. // is unlocked by the current thread.
Status s = flush_job.Run(&file_number); Status s = flush_job.Run(&file_meta);
if (s.ok()) { if (s.ok()) {
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
@ -1277,7 +1277,7 @@ Status DBImpl::FlushMemTableToOutputFile(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (s.ok()) { if (s.ok()) {
// may temporarily unlock and lock the mutex. // may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, file_number, mutable_cf_options, NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id); job_context->job_id);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
@ -1285,7 +1285,7 @@ Status DBImpl::FlushMemTableToOutputFile(
} }
void DBImpl::NotifyOnFlushCompleted( void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, uint64_t file_number, ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options, int job_id) { const MutableCFOptions& mutable_cf_options, int job_id) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (db_options_.listeners.size() == 0U) { if (db_options_.listeners.size() == 0U) {
@ -1309,11 +1309,13 @@ void DBImpl::NotifyOnFlushCompleted(
// TODO(yhchiang): make db_paths dynamic in case flush does not // TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future. // go to L0 in the future.
info.file_path = MakeTableFileName(db_options_.db_paths[0].path, info.file_path = MakeTableFileName(db_options_.db_paths[0].path,
file_number); file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID(); info.thread_id = env_->GetThreadID();
info.job_id = job_id; info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown; info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop; info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
for (auto listener : db_options_.listeners) { for (auto listener : db_options_.listeners) {
listener->OnFlushCompleted(this, info); listener->OnFlushCompleted(this, info);
} }

@ -349,7 +349,7 @@ class DBImpl : public DB {
Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd, Iterator* NewInternalIterator(const ReadOptions&, ColumnFamilyData* cfd,
SuperVersion* super_version, Arena* arena); SuperVersion* super_version, Arena* arena);
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, uint64_t file_number, void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
int job_id); int job_id);

@ -116,11 +116,11 @@ void FlushJob::RecordFlushIOStats() {
IOSTATS_RESET(bytes_written); IOSTATS_RESET(bytes_written);
} }
Status FlushJob::Run(uint64_t* file_number) { Status FlushJob::Run(FileMetaData* file_meta) {
AutoThreadOperationStageUpdater stage_run( AutoThreadOperationStageUpdater stage_run(
ThreadStatus::STAGE_FLUSH_RUN); ThreadStatus::STAGE_FLUSH_RUN);
// Save the contents of the earliest memtable as a new Table // Save the contents of the earliest memtable as a new Table
uint64_t fn; FileMetaData meta;
autovector<MemTable*> mems; autovector<MemTable*> mems;
cfd_->imm()->PickMemtablesToFlush(&mems); cfd_->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) { if (mems.empty()) {
@ -143,7 +143,7 @@ Status FlushJob::Run(uint64_t* file_number) {
edit->SetColumnFamily(cfd_->GetID()); edit->SetColumnFamily(cfd_->GetID());
// This will release and re-acquire the mutex. // This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &fn); Status s = WriteLevel0Table(mems, edit, &meta);
if (s.ok() && if (s.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) { (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
@ -152,16 +152,17 @@ Status FlushJob::Run(uint64_t* file_number) {
} }
if (!s.ok()) { if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems, fn); cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
} else { } else {
// Replace immutable memtable with the generated Table // Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults( s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems, versions_, db_mutex_, fn, cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
&job_context_->memtables_to_free, db_directory_, log_buffer_); meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
} }
if (s.ok() && file_number != nullptr) { if (s.ok() && file_meta != nullptr) {
*file_number = fn; *file_meta = meta;
} }
RecordFlushIOStats(); RecordFlushIOStats();
@ -180,15 +181,13 @@ Status FlushJob::Run(uint64_t* file_number) {
} }
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems, Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber) { VersionEdit* edit, FileMetaData* meta) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_FLUSH_WRITE_L0); ThreadStatus::STAGE_FLUSH_WRITE_L0);
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros(); const uint64_t start_micros = db_options_.env->NowMicros();
FileMetaData meta;
// path 0 for level 0 file. // path 0 for level 0 file.
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
*filenumber = meta.fd.GetNumber();
const SequenceNumber earliest_seqno_in_memtable = const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber(); mems[0]->GetFirstSequenceNumber();
@ -229,12 +228,12 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
static_cast<int>(memtables.size()), &arena)); static_cast<int>(memtables.size()), &arena));
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber()); cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_); &output_compression_);
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), &meta, cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(), cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), newest_snapshot_, cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
earliest_seqno_in_memtable, output_compression_, earliest_seqno_in_memtable, output_compression_,
@ -247,22 +246,22 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
" bytes %s" " bytes %s"
"%s", "%s",
cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(), cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str(), meta->fd.GetFileSize(), s.ToString().c_str(),
meta.marked_for_compaction ? " (needs compaction)" : ""); meta->marked_for_compaction ? " (needs compaction)" : "");
// output to event logger // output to event logger
if (s.ok()) { if (s.ok()) {
info.db_name = dbname_; info.db_name = dbname_;
info.cf_name = cfd_->GetName(); info.cf_name = cfd_->GetName();
info.file_path = TableFileName(db_options_.db_paths, info.file_path = TableFileName(db_options_.db_paths,
meta.fd.GetNumber(), meta->fd.GetNumber(),
meta.fd.GetPathId()); meta->fd.GetPathId());
info.file_size = meta.fd.GetFileSize(); info.file_size = meta->fd.GetFileSize();
info.job_id = job_context_->job_id; info.job_id = job_context_->job_id;
EventHelpers::LogAndNotifyTableFileCreation( EventHelpers::LogAndNotifyTableFileCreation(
event_logger_, db_options_.listeners, event_logger_, db_options_.listeners,
meta.fd, info); meta->fd, info);
} }
if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { if (!db_options_.disableDataSync && output_file_directory_ != nullptr) {
@ -278,9 +277,9 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
// Note that if file_size is zero, the file has been deleted and // Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest. // should not be added to the manifest.
int level = 0; int level = 0;
if (s.ok() && meta.fd.GetFileSize() > 0) { if (s.ok() && meta->fd.GetFileSize() > 0) {
const Slice min_user_key = meta.smallest.user_key(); const Slice min_user_key = meta->smallest.user_key();
const Slice max_user_key = meta.largest.user_key(); const Slice max_user_key = meta->largest.user_key();
// if we have more than 1 background thread, then we cannot // if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other // insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for // threads could be concurrently producing compacted files for
@ -297,19 +296,19 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
level = 0; level = 0;
} }
} }
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), edit->AddFile(level, meta->fd.GetNumber(), meta->fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest, meta->fd.GetFileSize(), meta->smallest, meta->largest,
meta.smallest_seqno, meta.largest_seqno, meta->smallest_seqno, meta->largest_seqno,
meta.marked_for_compaction); meta->marked_for_compaction);
} }
InternalStats::CompactionStats stats(1); InternalStats::CompactionStats stats(1);
stats.micros = db_options_.env->NowMicros() - start_micros; stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize(); stats.bytes_written = meta->fd.GetFileSize();
cfd_->internal_stats()->AddCompactionStats(level, stats); cfd_->internal_stats()->AddCompactionStats(level, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta.fd.GetFileSize()); meta->fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta->fd.GetFileSize());
return s; return s;
} }

@ -64,14 +64,14 @@ class FlushJob {
~FlushJob(); ~FlushJob();
Status Run(uint64_t* file_number = nullptr); Status Run(FileMetaData* file_meta = nullptr);
private: private:
void ReportStartedFlush(); void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems); void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats(); void RecordFlushIOStats();
Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit, Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber); FileMetaData* meta);
const std::string& dbname_; const std::string& dbname_;
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
const DBOptions& db_options_; const DBOptions& db_options_;

@ -68,6 +68,10 @@ struct FlushJobInfo {
// files in level 0. Compactions should try to compact L0 files down // files in level 0. Compactions should try to compact L0 files down
// to lower levels as soon as possible. // to lower levels as soon as possible.
bool triggered_writes_stop; bool triggered_writes_stop;
// The smallest sequence number in the newly created file
SequenceNumber smallest_seqno;
// The largest sequence number in the newly created file
SequenceNumber largest_seqno;
}; };
struct CompactionJobInfo { struct CompactionJobInfo {

Loading…
Cancel
Save