Fix OnFlushCompleted fired before flush result write to MANIFEST (#5908)

Summary:
When there are concurrent flush job on the same CF, `OnFlushCompleted` can be called before the flush result being install to LSM. Fixing the issue by passing `FlushJobInfo` through `MemTable`, and the thread who commit the flush result can fetch the `FlushJobInfo` and fire `OnFlushCompleted` on behave of the thread actually writing the SST.

Fix https://github.com/facebook/rocksdb/issues/5892
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5908

Test Plan: Add new test. The test will fail without the fix.

Differential Revision: D17916144

Pulled By: riversand963

fbshipit-source-id: e18df67d9533b5baee52ae3605026cdeb05cbe10
main
Yi Wu 5 years ago committed by Facebook Github Bot
parent 2c9e9f2a59
commit 1f9d7c0f54
  1. 1
      HISTORY.md
  2. 96
      db/db_flush_test.cc
  3. 8
      db/db_impl/db_impl.h
  4. 77
      db/db_impl/db_impl_compaction_flush.cc
  5. 26
      db/flush_job.cc
  6. 18
      db/flush_job.h
  7. 12
      db/flush_job_test.cc
  8. 16
      db/memtable.h
  9. 11
      db/memtable_list.cc
  10. 5
      db/memtable_list.h
  11. 6
      db/memtable_list_test.cc
  12. 1
      include/rocksdb/listener.h

@ -9,6 +9,7 @@
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound. * Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound.
* Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found). * Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found).
* Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand. * Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8.
### New Features ### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit. * Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting. * VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.

@ -7,10 +7,16 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "test_util/fault_injection_test_env.h" #include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/mutexlock.h"
namespace rocksdb { namespace rocksdb {
@ -323,6 +329,96 @@ TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();
} }
#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
class TestListener : public EventListener {
public:
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
// There's only one key in each flush.
ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
ASSERT_NE(0, info.smallest_seqno);
if (info.smallest_seqno == seq1) {
// First flush completed
ASSERT_FALSE(completed1);
completed1 = true;
CheckFlushResultCommitted(db, seq1);
} else {
// Second flush completed
ASSERT_FALSE(completed2);
completed2 = true;
ASSERT_EQ(info.smallest_seqno, seq2);
CheckFlushResultCommitted(db, seq2);
}
}
void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
mutex->Lock();
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}
std::atomic<SequenceNumber> seq1{0};
std::atomic<SequenceNumber> seq2{0};
std::atomic<bool> completed1{false};
std::atomic<bool> completed2{false};
};
std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
{"DBImpl::FlushMemTableToOutputFile:Finish",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&listener](void* arg) {
// Wait for the second flush finished, out of mutex.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
"WaitSecond");
}
});
Options options = CurrentOptions();
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
listener->seq1 = db_->GetLatestSequenceNumber();
// t1 will wait for the second flush complete before committing flush result.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Wait for first flush scheduled.
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
// The second flush will exit early without commit its result. The work
// is delegated to the first flush.
ASSERT_OK(Put("bar", "v"));
listener->seq2 = db_->GetLatestSequenceNumber();
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
t1.join();
ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // !ROCKSDB_LITE
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;

@ -1005,11 +1005,11 @@ class DBImpl : public DB {
void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop); int job_id);
void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, void NotifyOnFlushCompleted(
const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop); std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);
void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
const Status& st, const Status& st,

@ -164,8 +164,7 @@ Status DBImpl::FlushMemTableToOutputFile(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex. // may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
flush_job.GetTableProperties());
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
Status s; Status s;
@ -213,8 +212,8 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) { if (s.ok()) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex. // may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, NotifyOnFlushCompleted(cfd, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties()); flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>( auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get()); immutable_db_options_.sst_file_manager.get());
if (sfm) { if (sfm) {
@ -233,6 +232,7 @@ Status DBImpl::FlushMemTableToOutputFile(
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
return s; return s;
} }
@ -303,7 +303,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<Directory*> distinct_output_dirs; autovector<Directory*> distinct_output_dirs;
autovector<std::string> distinct_output_dir_paths; autovector<std::string> distinct_output_dir_paths;
std::vector<FlushJob> jobs; std::vector<std::unique_ptr<FlushJob>> jobs;
std::vector<MutableCFOptions> all_mutable_cf_options; std::vector<MutableCFOptions> all_mutable_cf_options;
int num_cfs = static_cast<int>(cfds.size()); int num_cfs = static_cast<int>(cfds.size());
all_mutable_cf_options.reserve(num_cfs); all_mutable_cf_options.reserve(num_cfs);
@ -330,7 +330,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
jobs.emplace_back( jobs.emplace_back(new FlushJob(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options,
max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
@ -338,8 +338,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */, false /* sync_output_directory */, false /* write_manifest */,
thread_pri); thread_pri));
jobs.back().PickMemTable(); jobs.back()->PickMemTable();
} }
std::vector<FileMetaData> file_meta(num_cfs); std::vector<FileMetaData> file_meta(num_cfs);
@ -351,7 +351,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex. // may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties()); job_context->job_id);
} }
#endif /* !ROCKSDB_LITE */ #endif /* !ROCKSDB_LITE */
@ -373,7 +373,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// TODO (yanqin): parallelize jobs with threads. // TODO (yanqin): parallelize jobs with threads.
for (int i = 1; i != num_cfs; ++i) { for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second = exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true; exec_status[i].first = true;
} }
if (num_cfs > 1) { if (num_cfs > 1) {
@ -382,8 +382,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
TEST_SYNC_POINT( TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
} }
assert(exec_status.size() > 0);
assert(!file_meta.empty());
exec_status[0].second = exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true; exec_status[0].first = true;
Status error_status; Status error_status;
@ -424,7 +426,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
auto wait_to_install_func = [&]() { auto wait_to_install_func = [&]() {
bool ready = true; bool ready = true;
for (size_t i = 0; i != cfds.size(); ++i) { for (size_t i = 0; i != cfds.size(); ++i) {
const auto& mems = jobs[i].GetMemTables(); const auto& mems = jobs[i]->GetMemTables();
if (cfds[i]->IsDropped()) { if (cfds[i]->IsDropped()) {
// If the column family is dropped, then do not wait. // If the column family is dropped, then do not wait.
continue; continue;
@ -465,7 +467,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta; autovector<FileMetaData*> tmp_file_meta;
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables(); const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) { if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]); tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems); mems_list.emplace_back(&mems);
@ -501,12 +503,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>( auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get()); immutable_db_options_.sst_file_manager.get());
assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) { if (cfds[i]->IsDropped()) {
continue; continue;
} }
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties()); jobs[i]->GetCommittedFlushJobsInfo());
if (sfm) { if (sfm) {
std::string file_path = MakeTableFileName( std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
@ -530,12 +533,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// unref the versions. // unref the versions.
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
if (!exec_status[i].first) { if (!exec_status[i].first) {
jobs[i].Cancel(); jobs[i]->Cancel();
} }
} }
for (int i = 0; i != num_cfs; ++i) { for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].first && exec_status[i].second.ok()) { if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables(); auto& mems = jobs[i]->GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems, cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber()); file_meta[i].fd.GetNumber());
} }
@ -549,7 +552,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) { int job_id) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) { if (immutable_db_options_.listeners.size() == 0U) {
return; return;
@ -580,7 +583,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.triggered_writes_stop = triggered_writes_stop; info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno; info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno; info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason(); info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) { for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info); listener->OnFlushBegin(this, info);
@ -594,15 +596,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
(void)file_meta; (void)file_meta;
(void)mutable_cf_options; (void)mutable_cf_options;
(void)job_id; (void)job_id;
(void)prop;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, void DBImpl::NotifyOnFlushCompleted(
FileMetaData* file_meta, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const MutableCFOptions& mutable_cf_options, std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
assert(flush_jobs_info != nullptr);
if (immutable_db_options_.listeners.size() == 0U) { if (immutable_db_options_.listeners.size() == 0U) {
return; return;
} }
@ -619,34 +620,22 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
// release lock while notifying events // release lock while notifying events
mutex_.Unlock(); mutex_.Unlock();
{ {
FlushJobInfo info; for (auto& info : *flush_jobs_info) {
info.cf_id = cfd->GetID(); info->triggered_writes_slowdown = triggered_writes_slowdown;
info.cf_name = cfd->GetName(); info->triggered_writes_stop = triggered_writes_stop;
// TODO(yhchiang): make db_paths dynamic in case flush does not for (auto listener : immutable_db_options_.listeners) {
// go to L0 in the future. listener->OnFlushCompleted(this, *info);
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, }
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, info);
} }
flush_jobs_info->clear();
} }
mutex_.Lock(); mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the // no need to signal bg_cv_ as it will be signaled at the end of the
// flush process. // flush process.
#else #else
(void)cfd; (void)cfd;
(void)file_meta;
(void)mutable_cf_options; (void)mutable_cf_options;
(void)job_id; (void)flush_jobs_info;
(void)prop;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }

@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
s = cfd_->imm()->TryInstallMemtableFlushResults( s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_); log_buffer_, &committed_flush_jobs_info_);
} }
if (s.ok() && file_meta != nullptr) { if (s.ok() && file_meta != nullptr) {
@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() {
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync(); s = output_file_directory_->Fsync();
} }
TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
db_mutex_->Lock(); db_mutex_->Lock();
} }
base_->Unref(); base_->Unref();
@ -410,6 +410,10 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.oldest_blob_file_number); meta_.marked_for_compaction, meta_.oldest_blob_file_number);
} }
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first first flushed memtable.
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif // !ROCKSDB_LITE
// Note that here we treat flush as level 0 compaction in internal stats // Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
@ -424,4 +428,22 @@ Status FlushJob::WriteLevel0Table() {
return s; return s;
} }
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
db_mutex_->AssertHeld();
std::unique_ptr<FlushJobInfo> info(new FlushJobInfo);
info->cf_id = cfd_->GetID();
info->cf_name = cfd_->GetName();
info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path,
meta_.fd.GetNumber());
info->thread_id = db_options_.env->GetThreadID();
info->job_id = job_context_->job_id;
info->smallest_seqno = meta_.fd.smallest_seqno;
info->largest_seqno = meta_.fd.largest_seqno;
info->table_properties = table_properties_;
info->flush_reason = cfd_->GetFlushReason();
return info;
}
#endif // !ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -11,10 +11,11 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <limits> #include <limits>
#include <list>
#include <set> #include <set>
#include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <string>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -34,6 +35,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
@ -79,14 +81,22 @@ class FlushJob {
Status Run(LogsWithPrepTracker* prep_tracker = nullptr, Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
FileMetaData* file_meta = nullptr); FileMetaData* file_meta = nullptr);
void Cancel(); void Cancel();
TableProperties GetTableProperties() const { return table_properties_; }
const autovector<MemTable*>& GetMemTables() const { return mems_; } const autovector<MemTable*>& GetMemTables() const { return mems_; }
#ifndef ROCKSDB_LITE
std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
return &committed_flush_jobs_info_;
}
#endif // !ROCKSDB_LITE
private: private:
void ReportStartedFlush(); void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems); void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats(); void RecordFlushIOStats();
Status WriteLevel0Table(); Status WriteLevel0Table();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE
const std::string& dbname_; const std::string& dbname_;
ColumnFamilyData* cfd_; ColumnFamilyData* cfd_;
@ -131,6 +141,10 @@ class FlushJob {
// In this case, only after all flush jobs succeed in flush can RocksDB // In this case, only after all flush jobs succeed in flush can RocksDB
// commit to the MANIFEST. // commit to the MANIFEST.
const bool write_manifest_; const bool write_manifest_;
// The current flush job can commit flush result of a concurrent flush job.
// We collect FlushJobInfo of all jobs committed by current job and fire
// OnFlushCompleted for them.
std::list<std::unique_ptr<FlushJobInfo>> committed_flush_jobs_info_;
// Variables below are set by PickMemTable(): // Variables below are set by PickMemTable():
FileMetaData meta_; FileMetaData meta_;

@ -347,18 +347,18 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relevant SnapshotChecker* snapshot_checker = nullptr; // not relevant
std::vector<FlushJob> flush_jobs; std::vector<std::unique_ptr<FlushJob>> flush_jobs;
k = 0; k = 0;
for (auto cfd : all_cfds) { for (auto cfd : all_cfds) {
std::vector<SequenceNumber> snapshot_seqs; std::vector<SequenceNumber> snapshot_seqs;
flush_jobs.emplace_back( flush_jobs.emplace_back(new FlushJob(
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
&memtable_ids[k], env_options_, versions_.get(), &mutex_, &memtable_ids[k], env_options_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression, &job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true, db_options_.statistics.get(), &event_logger, true,
false /* sync_output_directory */, false /* write_manifest */, false /* sync_output_directory */, false /* write_manifest */,
Env::Priority::USER); Env::Priority::USER));
k++; k++;
} }
HistogramData hist; HistogramData hist;
@ -367,12 +367,12 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
file_metas.reserve(flush_jobs.size()); file_metas.reserve(flush_jobs.size());
mutex_.Lock(); mutex_.Lock();
for (auto& job : flush_jobs) { for (auto& job : flush_jobs) {
job.PickMemTable(); job->PickMemTable();
} }
for (auto& job : flush_jobs) { for (auto& job : flush_jobs) {
FileMetaData meta; FileMetaData meta;
// Run will release and re-acquire mutex // Run will release and re-acquire mutex
ASSERT_OK(job.Run(nullptr /**/, &meta)); ASSERT_OK(job->Run(nullptr /**/, &meta));
file_metas.emplace_back(meta); file_metas.emplace_back(meta);
} }
autovector<FileMetaData*> file_meta_ptrs; autovector<FileMetaData*> file_meta_ptrs;
@ -381,7 +381,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
} }
autovector<const autovector<MemTable*>*> mems_list; autovector<const autovector<MemTable*>*> mems_list;
for (size_t i = 0; i != all_cfds.size(); ++i) { for (size_t i = 0; i != all_cfds.size(); ++i) {
const auto& mems = flush_jobs[i].GetMemTables(); const auto& mems = flush_jobs[i]->GetMemTables();
mems_list.push_back(&mems); mems_list.push_back(&mems);
} }
autovector<const MutableCFOptions*> mutable_cf_options_list; autovector<const MutableCFOptions*> mutable_cf_options_list;

@ -32,6 +32,7 @@
namespace rocksdb { namespace rocksdb {
struct FlushJobInfo;
class Mutex; class Mutex;
class MemTableIterator; class MemTableIterator;
class MergeContext; class MergeContext;
@ -423,6 +424,16 @@ class MemTable {
flush_in_progress_ = in_progress; flush_in_progress_ = in_progress;
} }
#ifndef ROCKSDB_LITE
void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
flush_job_info_ = std::move(info);
}
std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
return std::move(flush_job_info_);
}
#endif // !ROCKSDB_LITE
private: private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
@ -505,6 +516,11 @@ class MemTable {
// Gets refrshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow` // Gets refrshed inside `ApproximateMemoryUsage()` or `ShouldFlushNow`
std::atomic<uint64_t> approximate_memory_usage_; std::atomic<uint64_t> approximate_memory_usage_;
#ifndef ROCKSDB_LITE
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
#endif // !ROCKSDB_LITE
// Returns a heuristic flush decision // Returns a heuristic flush decision
bool ShouldFlushNow(); bool ShouldFlushNow();

@ -385,7 +385,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker, const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) { LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld(); mu->AssertHeld();
@ -443,6 +444,14 @@ Status MemTableList::TryInstallMemtableFlushResults(
cfd->GetName().c_str(), m->file_number_); cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_); edit_list.push_back(&m->edit_);
memtables_to_flush.push_back(m); memtables_to_flush.push_back(m);
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> info = m->ReleaseFlushJobInfo();
if (info != nullptr) {
committed_flush_jobs_info->push_back(std::move(info));
}
#else
(void)committed_flush_jobs_info;
#endif // !ROCKSDB_LITE
} }
batch_count++; batch_count++;
} }

@ -33,6 +33,8 @@ class InstrumentedMutex;
class MergeIteratorBuilder; class MergeIteratorBuilder;
class MemTableList; class MemTableList;
struct FlushJobInfo;
// keeps a list of immutable memtables in a vector. the list is immutable // keeps a list of immutable memtables in a vector. the list is immutable
// if refcount is bigger than one. It is used as a state for Get() and // if refcount is bigger than one. It is used as a state for Get() and
// Iterator code paths // Iterator code paths
@ -254,7 +256,8 @@ class MemTableList {
const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker, const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, Directory* db_directory, autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer); LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info);
// New memtables are inserted at the front of the list. // New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add(). // Takes ownership of the referenced held on *m by the caller of Add().

@ -117,9 +117,11 @@ class MemTableListTest : public testing::Test {
// Create dummy mutex. // Create dummy mutex.
InstrumentedMutex mutex; InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex); InstrumentedMutexLock l(&mutex);
return list->TryInstallMemtableFlushResults( std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
Status s = list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
file_num, to_delete, nullptr, &log_buffer); file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
return s;
} }
// Calls MemTableList::InstallMemtableFlushResults() and sets up all // Calls MemTableList::InstallMemtableFlushResults() and sets up all

@ -459,6 +459,7 @@ class EventListener {
#else #else
class EventListener {}; class EventListener {};
struct FlushJobInfo {};
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

Loading…
Cancel
Save