From e633983cf1e4f72476520ca27b4d7bd5ed138843 Mon Sep 17 00:00:00 2001
From: Yanqin Jin
Date: Mon, 15 Oct 2018 19:59:20 -0700
Subject: [PATCH] Add support to flush multiple CFs atomically (#4262)

Summary:
Leverage the existing `FlushJob` to implement atomic flush of multiple
column families. This PR depends on other PRs and is a subset of #3752.
This PR by itself is not sufficient to fulfill atomic flush.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4262

Differential Revision: D9283109

Pulled By: riversand963

fbshipit-source-id: 65401f913e4160b0a61c0be6cd02adc15dad28ed
---
 db/db_impl.cc                  |   3 +-
 db/db_impl.h                   |  22 +-
 db/db_impl_compaction_flush.cc | 201 ++++++++++++-
 db/flush_job.cc                |  16 +-
 db/flush_job.h                 |  33 ++-
 db/flush_job_test.cc           | 232 +++++++++++++--
 db/memtable.cc                 |   5 +-
 db/memtable.h                  |  14 +
 db/memtable_list.cc            | 231 ++++++++++++++-
 db/memtable_list.h             |  18 +-
 db/memtable_list_test.cc       | 505 ++++++++++++++++++++++++++++++---
 db/version_set.cc              |  10 +-
 db/version_set.h               |  35 ++-
 db/version_set_test.cc         |  18 +-
 14 files changed, 1242 insertions(+), 101 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index ab8b248f3..1c09e22d0 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -219,7 +219,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
       own_sfm_(options.sst_file_manager == nullptr),
       preserve_deletes_(options.preserve_deletes),
       closed_(false),
-      error_handler_(this, immutable_db_options_, &mutex_) {
+      error_handler_(this, immutable_db_options_, &mutex_),
+      atomic_flush_commit_in_progress_(false) {
   // !batch_per_trx_ implies seq_per_batch_ because it is only unset for
   // WriteUnprepared, which should use seq_per_batch_.
   assert(batch_per_txn_ || seq_per_batch_);
diff --git a/db/db_impl.h b/db/db_impl.h
index de87d61de..6ada36d53 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -908,18 +908,18 @@ class DBImpl : public DB {
   // Argument required by background flush thread.
   struct BGFlushArg {
     BGFlushArg()
-        : cfd_(nullptr), memtable_id_(0), superversion_context_(nullptr) {}
-    BGFlushArg(ColumnFamilyData* cfd, uint64_t memtable_id,
+        : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {}
+    BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
                SuperVersionContext* superversion_context)
         : cfd_(cfd),
-          memtable_id_(memtable_id),
+          max_memtable_id_(max_memtable_id),
           superversion_context_(superversion_context) {}

     // Column family to flush.
     ColumnFamilyData* cfd_;
     // Maximum ID of memtable to flush. In this column family, memtables with
     // IDs smaller than this value must be flushed before this flush completes.
-    uint64_t memtable_id_;
+    uint64_t max_memtable_id_;
     // Pointer to a SuperVersionContext object. After flush completes, RocksDB
     // installs a new superversion for the column family. This operation
     // requires a SuperVersionContext object (currently embedded in JobContext).
@@ -932,6 +932,10 @@ class DBImpl : public DB {
       const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
       JobContext* job_context, LogBuffer* log_buffer);

+  Status AtomicFlushMemTablesToOutputFiles(
+      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+      JobContext* job_context, LogBuffer* log_buffer);
+
   // REQUIRES: log_numbers are sorted in ascending order
   Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                          SequenceNumber* next_sequence, bool read_only);
@@ -1579,6 +1583,16 @@ class DBImpl : public DB {
   bool closed_;

   ErrorHandler error_handler_;
+
+  // True if the DB is committing an atomic flush.
+  // TODO (yanqin) the current impl assumes that the entire DB belongs to
+  // a single atomic flush group. In the future we need to add a new class
+  // (struct) similar to the following to make it more general.
+  // struct AtomicFlushGroup {
+  //   bool commit_in_progress_;
+  //   std::vector<MemTableList*> imm_lists;
+  // };
+  bool atomic_flush_commit_in_progress_;
 };

 extern Options SanitizeOptions(const std::string& db,
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 6297c7bed..b436987e9 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -125,11 +125,13 @@ Status DBImpl::FlushMemTableToOutputFile(
   }
   FlushJob flush_job(
       dbname_, cfd, immutable_db_options_, mutable_cf_options,
-      env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
-      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
-      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
+      nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
+      &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
+      snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
+      GetDataDir(cfd, 0U),
       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
-      &event_logger_, mutable_cf_options.report_bg_io_stats);
+      &event_logger_, mutable_cf_options.report_bg_io_stats,
+      true /* sync_output_directory */, true /* write_manifest */);

   FileMetaData file_meta;

@@ -169,7 +171,7 @@ Status DBImpl::FlushMemTableToOutputFile(
     InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                        mutable_cf_options);
     if (made_progress) {
-      *made_progress = 1;
+      *made_progress = true;
     }
     VersionStorageInfo::LevelSummaryStorage tmp;
     ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
@@ -225,6 +227,194 @@ Status DBImpl::FlushMemTablesToOutputFiles(
   return s;
 }

+/*
+ * Atomically flushes multiple column families.
+ *
+ * For each column family, all memtables with ID smaller than or equal to the
+ * ID specified in bg_flush_args will be flushed. Only after all column
+ * families finish flush will this function commit to MANIFEST. If any of the
+ * column families are not flushed successfully, this function does not have
+ * any side-effect on the state of the database.
+ */
+Status DBImpl::AtomicFlushMemTablesToOutputFiles(
+    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+    JobContext* job_context, LogBuffer* log_buffer) {
+  mutex_.AssertHeld();
+
+  autovector<ColumnFamilyData*> cfds;
+  for (const auto& arg : bg_flush_args) {
+    cfds.emplace_back(arg.cfd_);
+  }
+
+#ifndef NDEBUG
+  for (const auto cfd : cfds) {
+    assert(cfd->imm()->NumNotFlushed() != 0);
+    assert(cfd->imm()->IsFlushPending());
+  }
+#endif /* !NDEBUG */
+
+  SequenceNumber earliest_write_conflict_snapshot;
+  std::vector<SequenceNumber> snapshot_seqs =
+      snapshots_.GetAll(&earliest_write_conflict_snapshot);
+
+  auto snapshot_checker = snapshot_checker_.get();
+  if (use_custom_gc_ && snapshot_checker == nullptr) {
+    snapshot_checker = DisableGCSnapshotChecker::Instance();
+  }
+  autovector<Directory*> distinct_output_dirs;
+  std::vector<FlushJob> jobs;
+  int num_cfs = static_cast<int>(cfds.size());
+  for (int i = 0; i < num_cfs; ++i) {
+    auto cfd = cfds[i];
+    Directory* data_dir = GetDataDir(cfd, 0U);
+
+    // Add to distinct output directories if eligible. Use linear search;
+    // since the number of elements in the vector is small, performance
+    // should be tolerable.
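Note: the linear dedup that follows could equivalently be written with
std::find; a minimal sketch, assuming <algorithm> is included:

    // Equivalent membership test for the distinct-output-directory dedup.
    if (std::find(distinct_output_dirs.begin(), distinct_output_dirs.end(),
                  data_dir) == distinct_output_dirs.end()) {
      distinct_output_dirs.emplace_back(data_dir);
    }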
+ bool found = false; + for (const auto dir : distinct_output_dirs) { + if (dir == data_dir) { + found = true; + break; + } + } + if (!found) { + distinct_output_dirs.emplace_back(data_dir); + } + + const MutableCFOptions& mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); + jobs.emplace_back( + dbname_, cfds[i], immutable_db_options_, mutable_cf_options, + max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, + &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, + snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), + data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), + stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, + false /* sync_output_directory */, false /* write_manifest */); + jobs.back().PickMemTable(); + } + + autovector file_meta; + Status s; + assert(num_cfs == static_cast(jobs.size())); + + for (int i = 0; i != num_cfs; ++i) { + file_meta.emplace_back(FileMetaData()); + +#ifndef ROCKSDB_LITE + const MutableCFOptions& mutable_cf_options = + *cfds[i]->GetLatestMutableCFOptions(); + // may temporarily unlock and lock the mutex. + NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, + job_context->job_id, jobs[i].GetTableProperties()); +#endif /* !ROCKSDB_LITE */ + } + + if (logfile_number_ > 0) { + // TODO (yanqin) investigate whether we should sync the closed logs for + // single column family case. + s = SyncClosedLogs(job_context); + } + + if (s.ok()) { + // TODO (yanqin): parallelize jobs with threads. + for (int i = 0; i != num_cfs; ++i) { + s = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); + if (!s.ok()) { + break; + } + } + } + + if (s.ok()) { + // Sync on all distinct output directories. 
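Note: each FlushJob above is constructed with sync_output_directory = false,
so no individual job has synced its own output directory yet. The loop below
therefore issues exactly one Fsync per distinct directory, and only after
that does the code attempt the single MANIFEST commit for the whole group.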
+ for (auto dir : distinct_output_dirs) { + if (dir != nullptr) { + s = dir->Fsync(); + if (!s.ok()) { + break; + } + } + } + + if (s.ok()) { + autovector*> mems_list; + for (int i = 0; i != num_cfs; ++i) { + const auto& mems = jobs[i].GetMemTables(); + mems_list.emplace_back(&mems); + } + autovector all_cfds; + autovector imm_lists; + autovector mutable_cf_options_list; + for (auto cfd : *versions_->GetColumnFamilySet()) { + all_cfds.emplace_back(cfd); + imm_lists.emplace_back(cfd->imm()); + mutable_cf_options_list.emplace_back(cfd->GetLatestMutableCFOptions()); + } + + s = MemTableList::TryInstallMemtableFlushResults( + imm_lists, all_cfds, mutable_cf_options_list, mems_list, + &atomic_flush_commit_in_progress_, &logs_with_prep_tracker_, + versions_.get(), &mutex_, file_meta, &job_context->memtables_to_free, + directories_.GetDbDir(), log_buffer); + } + } + + if (s.ok()) { + assert(num_cfs == + static_cast(job_context->superversion_contexts.size())); + for (int i = 0; i != num_cfs; ++i) { + InstallSuperVersionAndScheduleWork(cfds[i], + &job_context->superversion_contexts[i], + *cfds[i]->GetLatestMutableCFOptions()); + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", + cfds[i]->GetName().c_str(), + cfds[i]->current()->storage_info()->LevelSummary(&tmp)); + } + if (made_progress) { + *made_progress = true; + } +#ifndef ROCKSDB_LITE + auto sfm = static_cast( + immutable_db_options_.sst_file_manager.get()); + for (int i = 0; i != num_cfs; ++i) { + NotifyOnFlushCompleted(cfds[i], &file_meta[i], + *cfds[i]->GetLatestMutableCFOptions(), + job_context->job_id, jobs[i].GetTableProperties()); + if (sfm) { + std::string file_path = MakeTableFileName( + cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); + sfm->OnAddFile(file_path); + if (sfm->IsMaxAllowedSpaceReached() && + error_handler_.GetBGError().ok()) { + Status new_bg_error = + Status::SpaceLimit("Max allowed space was reached"); + error_handler_.SetBGError(new_bg_error, + BackgroundErrorReason::kFlush); + } + } + } +#endif // ROCKSDB_LITE + } + + if (!s.ok()) { + for (int i = 0; i != num_cfs; ++i) { + auto& mems = jobs[i].GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber()); + jobs[i].Cancel(); + } + if (!s.IsShutdownInProgress()) { + Status new_bg_error = s; + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); + } + } + + return s; +} + void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, int job_id, TableProperties prop) { @@ -983,7 +1173,6 @@ Status DBImpl::Flush(const FlushOptions& flush_options, return s; } - Status DBImpl::FlushAllCFs(FlushReason flush_reason) { Status s; WriteContext context; diff --git a/db/flush_job.cc b/db/flush_job.cc index ca7f7c911..e34d3161c 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -85,11 +85,11 @@ const char* GetFlushReasonString (FlushReason flush_reason) { } } - FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, - const EnvOptions env_options, VersionSet* versions, + const uint64_t* max_memtable_id, + const EnvOptions& env_options, VersionSet* versions, InstrumentedMutex* db_mutex, std::atomic* shutting_down, std::vector existing_snapshots, @@ -98,11 +98,13 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, LogBuffer* log_buffer, Directory* db_directory, Directory* 
output_file_directory, CompressionType output_compression, Statistics* stats, - EventLogger* event_logger, bool measure_io_stats) + EventLogger* event_logger, bool measure_io_stats, + const bool sync_output_directory, const bool write_manifest) : dbname_(dbname), cfd_(cfd), db_options_(db_options), mutable_cf_options_(mutable_cf_options), + max_memtable_id_(max_memtable_id), env_options_(env_options), versions_(versions), db_mutex_(db_mutex), @@ -118,6 +120,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, stats_(stats), event_logger_(event_logger), measure_io_stats_(measure_io_stats), + sync_output_directory_(sync_output_directory), + write_manifest_(write_manifest), edit_(nullptr), base_(nullptr), pick_memtable_called(false) { @@ -162,7 +166,7 @@ void FlushJob::PickMemTable() { assert(!pick_memtable_called); pick_memtable_called = true; // Save the contents of the earliest memtable as a new Table - cfd_->imm()->PickMemtablesToFlush(&mems_); + cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_); if (mems_.empty()) { return; } @@ -226,7 +230,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, if (!s.ok()) { cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber()); - } else { + } else if (write_manifest_) { TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->TryInstallMemtableFlushResults( @@ -373,7 +377,7 @@ Status FlushJob::WriteLevel0Table() { s.ToString().c_str(), meta_.marked_for_compaction ? " (needs compaction)" : ""); - if (s.ok() && output_file_directory_ != nullptr) { + if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { s = output_file_directory_->Fsync(); } TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); diff --git a/db/flush_job.h b/db/flush_job.h index c3115c4a6..d993e410d 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -59,14 +59,16 @@ class FlushJob { FlushJob(const std::string& dbname, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const MutableCFOptions& mutable_cf_options, - const EnvOptions env_options, VersionSet* versions, - InstrumentedMutex* db_mutex, std::atomic* shutting_down, + const uint64_t* max_memtable_id, const EnvOptions& env_options, + VersionSet* versions, InstrumentedMutex* db_mutex, + std::atomic* shutting_down, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, - Statistics* stats, EventLogger* event_logger, bool measure_io_stats); + Statistics* stats, EventLogger* event_logger, bool measure_io_stats, + const bool sync_output_directory, const bool write_manifest); ~FlushJob(); @@ -77,16 +79,24 @@ class FlushJob { FileMetaData* file_meta = nullptr); void Cancel(); TableProperties GetTableProperties() const { return table_properties_; } + const autovector& GetMemTables() const { return mems_; } private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status WriteLevel0Table(); + const std::string& dbname_; ColumnFamilyData* cfd_; const ImmutableDBOptions& db_options_; const MutableCFOptions& mutable_cf_options_; + // Pointer to a variable storing the largest memtable id to flush in this + // flush job. RocksDB uses this variable to select the memtables to flush in + // this job. 
All memtables in this column family with an ID smaller than or + // equal to *max_memtable_id_ will be selected for flush. If null, then all + // memtables in the column family will be selected. + const uint64_t* max_memtable_id_; const EnvOptions env_options_; VersionSet* versions_; InstrumentedMutex* db_mutex_; @@ -103,6 +113,23 @@ class FlushJob { EventLogger* event_logger_; TableProperties table_properties_; bool measure_io_stats_; + // True if this flush job should call fsync on the output directory. False + // otherwise. + // Usually sync_output_directory_ is true. A flush job needs to call sync on + // the output directory before committing to the MANIFEST. + // However, an individual flush job does not have to call sync on the output + // directory if it is part of an atomic flush. After all flush jobs in the + // atomic flush succeed, call sync once on each distinct output directory. + const bool sync_output_directory_; + // True if this flush job should write to MANIFEST after successfully + // flushing memtables. False otherwise. + // Usually write_manifest_ is true. A flush job commits to the MANIFEST after + // flushing the memtables. + // However, an individual flush job cannot rashly write to the MANIFEST + // immediately after it finishes the flush if it is part of an atomic flush. + // In this case, only after all flush jobs succeed in flush can RocksDB + // commit to the MANIFEST. + const bool write_manifest_; // Variables below are set by PickMemTable(): FileMetaData meta_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 041edeaa4..79f90c6d9 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -30,6 +30,7 @@ class FlushJobTest : public testing::Test { dbname_(test::PerThreadDBPath("flush_job_test")), options_(), db_options_(options_), + column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), versions_(new VersionSet(dbname_, &db_options_, env_options_, @@ -45,7 +46,9 @@ class FlushJobTest : public testing::Test { NewDB(); std::vector column_families; cf_options_.table_factory = mock_table_factory_; - column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); + for (const auto& cf_name : column_family_names_) { + column_families.emplace_back(cf_name, cf_options_); + } EXPECT_OK(versions_->Recover(column_families, false)); } @@ -56,6 +59,19 @@ class FlushJobTest : public testing::Test { new_db.SetNextFile(2); new_db.SetLastSequence(0); + autovector new_cfs; + SequenceNumber last_seq = 1; + uint32_t cf_id = 1; + for (size_t i = 1; i != column_family_names_.size(); ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(column_family_names_[i]); + new_cf.SetColumnFamily(cf_id++); + new_cf.SetLogNumber(0); + new_cf.SetNextFile(2); + new_cf.SetLastSequence(last_seq++); + new_cfs.emplace_back(new_cf); + } + const std::string manifest = DescriptorFileName(dbname_, 1); unique_ptr file; Status s = env_->NewWritableFile( @@ -68,6 +84,13 @@ class FlushJobTest : public testing::Test { std::string record; new_db.EncodeTo(&record); s = log.AddRecord(record); + + for (const auto& e : new_cfs) { + record.clear(); + e.EncodeTo(&record); + s = log.AddRecord(record); + ASSERT_OK(s); + } } ASSERT_OK(s); // Make "CURRENT" file that points to the new manifest file. 
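Note: the max_memtable_id_ contract documented in flush_job.h above reduces
to a bounded pick from the immutable memtable list; a hedged sketch of the
new call shape (cfd and the bound are assumed bindings):

    // Pick only immutable memtables with ID <= 1; passing nullptr instead
    // of &max_id selects every immutable memtable (pre-existing behavior).
    uint64_t max_id = 1;
    autovector<MemTable*> picked;
    cfd->imm()->PickMemtablesToFlush(&max_id, &picked);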
@@ -79,6 +102,7 @@ class FlushJobTest : public testing::Test { EnvOptions env_options_; Options options_; ImmutableDBOptions db_options_; + const std::vector column_family_names_; std::shared_ptr table_cache_; WriteController write_controller_; WriteBufferManager write_buffer_manager_; @@ -96,9 +120,11 @@ TEST_F(FlushJobTest, Empty) { SnapshotChecker* snapshot_checker = nullptr; // not relavant FlushJob flush_job( dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, - &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context, - nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false); + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, {}, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, nullptr, &event_logger, false, + true /* sync_output_directory */, true /* write_manifest */); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -139,12 +165,13 @@ TEST_F(FlushJobTest, NonEmpty) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &mutex_, &shutting_down_, - {}, kMaxSequenceNumber, snapshot_checker, &job_context, - nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, {}, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */); HistogramData hist; FileMetaData file_meta; @@ -165,6 +192,178 @@ TEST_F(FlushJobTest, NonEmpty) { job_context.Clean(); } +TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { + const size_t num_mems = 2; + const size_t num_mems_to_flush = 1; + const size_t num_keys_per_table = 100; + JobContext job_context(0); + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + std::vector memtable_ids; + std::vector new_mems; + for (size_t i = 0; i != num_mems; ++i) { + MemTable* mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + new_mems.emplace_back(mem); + memtable_ids.push_back(mem->GetID()); + + for (size_t j = 0; j < num_keys_per_table; ++j) { + std::string key(ToString(j + i * num_keys_per_table)); + std::string value("value" + key); + mem->Add(SequenceNumber(j + i * num_keys_per_table), kTypeValue, key, + value); + } + } + + autovector to_delete; + for (auto mem : new_mems) { + cfd->imm()->Add(mem, &to_delete); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relavant + + assert(memtable_ids.size() == num_mems); + uint64_t smallest_memtable_id = memtable_ids.front(); + uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; + + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_, + versions_.get(), 
&mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */); + HistogramData hist; + FileMetaData file_meta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(nullptr /* prep_tracker */, &file_meta)); + mutex_.Unlock(); + db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + + ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ("99", file_meta.largest.user_key().ToString()); + ASSERT_EQ(0, file_meta.fd.smallest_seqno); + ASSERT_EQ(SequenceNumber(num_mems_to_flush * num_keys_per_table - 1), + file_meta.fd.largest_seqno); + + for (auto m : to_delete) { + delete m; + } + to_delete.clear(); + job_context.Clean(); +} + +TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { + autovector all_cfds; + for (auto cfd : *versions_->GetColumnFamilySet()) { + all_cfds.push_back(cfd); + } + const std::vector num_memtables = {2, 1, 3}; + assert(num_memtables.size() == column_family_names_.size()); + const size_t num_keys_per_memtable = 1000; + JobContext job_context(0); + std::vector memtable_ids; + std::vector smallest_seqs; + std::vector largest_seqs; + autovector to_delete; + SequenceNumber curr_seqno = 0; + size_t k = 0; + for (auto cfd : all_cfds) { + smallest_seqs.push_back(curr_seqno); + for (size_t i = 0; i != num_memtables[k]; ++i) { + MemTable* mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + mem->SetID(i); + mem->Ref(); + mem->TEST_AtomicFlushSequenceNumber() = 123; + + for (size_t j = 0; j != num_keys_per_memtable; ++j) { + std::string key(ToString(j + i * num_keys_per_memtable)); + std::string value("value" + key); + mem->Add(curr_seqno++, kTypeValue, key, value); + } + + cfd->imm()->Add(mem, &to_delete); + } + largest_seqs.push_back(curr_seqno - 1); + memtable_ids.push_back(num_memtables[k++] - 1); + } + + EventLogger event_logger(db_options_.info_log.get()); + SnapshotChecker* snapshot_checker = nullptr; // not relevant + std::vector flush_jobs; + k = 0; + for (auto cfd : all_cfds) { + std::vector snapshot_seqs; + flush_jobs.emplace_back( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + &memtable_ids[k], env_options_, versions_.get(), &mutex_, + &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + false /* sync_output_directory */, false /* write_manifest */); + k++; + } + HistogramData hist; + autovector file_metas; + mutex_.Lock(); + for (auto& job : flush_jobs) { + job.PickMemTable(); + } + for (auto& job : flush_jobs) { + FileMetaData meta; + // Run will release and re-acquire mutex + ASSERT_OK(job.Run(nullptr /**/, &meta)); + file_metas.emplace_back(meta); + } + autovector*> mems_list; + for (size_t i = 0; i != all_cfds.size(); ++i) { + const auto& mems = flush_jobs[i].GetMemTables(); + mems_list.push_back(&mems); + } + autovector imm_lists; + autovector mutable_cf_options_list; + for (auto cfd : all_cfds) { + imm_lists.push_back(cfd->imm()); + mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions()); + } + + bool atomic_flush_commit_in_progress = false; + Status s = MemTableList::TryInstallMemtableFlushResults( + imm_lists, all_cfds, mutable_cf_options_list, mems_list, + &atomic_flush_commit_in_progress, nullptr /* 
logs_prep_tracker */, + versions_.get(), &mutex_, file_metas, &job_context.memtables_to_free, + nullptr /* db_directory */, nullptr /* log_buffer */); + ASSERT_OK(s); + + mutex_.Unlock(); + db_options_.statistics->histogramData(FLUSH_TIME, &hist); + ASSERT_GT(hist.average, 0.0); + k = 0; + for (const auto& file_meta : file_metas) { + ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString()); + ASSERT_EQ("999", file_meta.largest.user_key() + .ToString()); // max key by bytewise comparator + ASSERT_EQ(smallest_seqs[k], file_meta.fd.smallest_seqno); + ASSERT_EQ(largest_seqs[k], file_meta.fd.largest_seqno); + // Verify that imm is empty + ASSERT_EQ(std::numeric_limits::max(), + all_cfds[k]->imm()->GetEarliestMemTableID()); + ASSERT_EQ(0, all_cfds[k]->imm()->GetLatestMemTableID()); + ++k; + } + + for (auto m : to_delete) { + delete m; + } + to_delete.clear(); + job_context.Clean(); +} + TEST_F(FlushJobTest, Snapshots) { JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); @@ -213,12 +412,13 @@ TEST_F(FlushJobTest, Snapshots) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), - db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &mutex_, &shutting_down_, - snapshots, kMaxSequenceNumber, snapshot_checker, - &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true); + FlushJob flush_job( + dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, + *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, + kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, + nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, + true, true /* sync_output_directory */, true /* write_manifest */); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); diff --git a/db/memtable.cc b/db/memtable.cc index 70e6d9da5..e2400414d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -101,7 +101,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, env_(ioptions.env), insert_with_hint_prefix_extractor_( ioptions.memtable_insert_with_hint_prefix_extractor), - oldest_key_time_(std::numeric_limits::max()) { + oldest_key_time_(std::numeric_limits::max()), + atomic_flush_seqno_(kMaxSequenceNumber) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -640,7 +641,7 @@ static bool SaveValue(void* arg, const char* entry) { *(s->found_final_value) = true; return false; } - FALLTHROUGH_INTENDED; + FALLTHROUGH_INTENDED; case kTypeValue: { if (s->inplace_update_support) { s->mem->GetLock(s->key->user_key())->ReadLock(); diff --git a/db/memtable.h b/db/memtable.h index e45941007..57751c922 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -383,6 +383,14 @@ class MemTable { uint64_t GetID() const { return id_; } + SequenceNumber& TEST_AtomicFlushSequenceNumber() { + return atomic_flush_seqno_; + } + + void TEST_SetFlushCompleted(bool completed) { flush_completed_ = completed; } + + void TEST_SetFileNumber(uint64_t file_num) { file_number_ = file_num; } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -455,6 +463,12 @@ class MemTable { // Memtable id to track flush. 
   uint64_t id_ = 0;

+  // Sequence number of the atomic flush that is responsible for this
+  // memtable. The sequence number of an atomic flush is a seq such that no
+  // writes with sequence numbers greater than or equal to seq are flushed,
+  // while all writes with sequence numbers smaller than seq are flushed.
+  SequenceNumber atomic_flush_seqno_;
+
   // Returns a heuristic flush decision
   bool ShouldFlushNow() const;

diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 089f4a2b6..79e87c03f 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -11,6 +11,7 @@
 #include <inttypes.h>
 #include <limits>
+#include <queue>
 #include <string>
 #include "db/db_impl.h"
 #include "db/memtable.h"
@@ -269,10 +270,232 @@ void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) {
   }
 }

+// Try to record multiple successful flushes to the MANIFEST as an atomic
+// unit. This function may just return Status::OK if there is already a
+// concurrent thread performing the actual recording.
+Status MemTableList::TryInstallMemtableFlushResults(
+    autovector<MemTableList*>& imm_lists,
+    const autovector<ColumnFamilyData*>& cfds,
+    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+    const autovector<const autovector<MemTable*>*>& mems_list,
+    bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker,
+    VersionSet* vset, InstrumentedMutex* mu,
+    const autovector<FileMetaData>& file_metas,
+    autovector<MemTable*>* to_delete, Directory* db_directory,
+    LogBuffer* log_buffer) {
+  AutoThreadOperationStageUpdater stage_updater(
+      ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
+  mu->AssertHeld();
+
+  for (size_t k = 0; k != mems_list.size(); ++k) {
+    for (size_t i = 0; i != mems_list[k]->size(); ++i) {
+      assert(i == 0 || (*mems_list[k])[i]->GetEdits()->NumEntries() == 0);
+      (*mems_list[k])[i]->flush_completed_ = true;
+      (*mems_list[k])[i]->file_number_ = file_metas[k].fd.GetNumber();
+    }
+  }
+
+  assert(atomic_flush_commit_in_progress != nullptr);
+  Status s;
+  if (*atomic_flush_commit_in_progress) {
+    // If the function reaches here, there must be a concurrent thread that
+    // has already started recording to MANIFEST. Therefore we should just
+    // return Status::OK and let the other thread finish writing to MANIFEST
+    // on our behalf.
+    return s;
+  }
+
+  // If the function reaches here, the current thread will start writing to
+  // MANIFEST. It may record to MANIFEST the flush results of other flushes.
+  *atomic_flush_commit_in_progress = true;
+
+  auto comp = [&imm_lists](size_t lh, size_t rh) {
+    const auto& memlist1 = imm_lists[lh]->current_->memlist_;
+    const auto& memlist2 = imm_lists[rh]->current_->memlist_;
+    auto it1 = memlist1.rbegin();
+    auto it2 = memlist2.rbegin();
+    return (*it1)->atomic_flush_seqno_ > (*it2)->atomic_flush_seqno_;
+  };
+  // The top of the heap is the memtable with the smallest
+  // atomic_flush_seqno_.
+  std::priority_queue<size_t, std::vector<size_t>, decltype(comp)> heap(comp);
+  // Sequence number of the oldest unfinished atomic flush.
+  SequenceNumber min_unfinished_seqno = kMaxSequenceNumber;
+  // Populate the heap with the first memtable of each imm list iff it has
+  // been flushed to storage, i.e. flush_completed_ is true.
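Note: std::priority_queue is a max-heap with respect to its comparator, so
the greater-than comparison in comp above yields a min-heap keyed on
atomic_flush_seqno_. A self-contained illustration of that trick:

    #include <cassert>
    #include <cstddef>
    #include <queue>
    #include <vector>

    int main() {
      std::vector<int> seqno = {5, 1, 3};  // head seqno of each imm list
      auto gt = [&seqno](std::size_t a, std::size_t b) {
        return seqno[a] > seqno[b];
      };
      std::priority_queue<std::size_t, std::vector<std::size_t>, decltype(gt)>
          heap(gt);
      for (std::size_t i = 0; i < seqno.size(); ++i) {
        heap.push(i);
      }
      assert(seqno[heap.top()] == 1);  // top is the list with smallest seqno
      return 0;
    }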
+  size_t num = imm_lists.size();
+  assert(num == cfds.size());
+  for (size_t i = 0; i != num; ++i) {
+    std::list<MemTable*>& memlist = imm_lists[i]->current_->memlist_;
+    if (memlist.empty()) {
+      continue;
+    }
+    auto it = memlist.rbegin();
+    if ((*it)->flush_completed_) {
+      heap.emplace(i);
+    } else if (min_unfinished_seqno > (*it)->atomic_flush_seqno_) {
+      min_unfinished_seqno = (*it)->atomic_flush_seqno_;
+    }
+  }
+
+  while (s.ok() && !heap.empty()) {
+    autovector<size_t> batch;
+    SequenceNumber seqno = kMaxSequenceNumber;
+    // Pop from the heap the memtables that belong to the same atomic flush,
+    // namely those whose atomic_flush_seqno_ are equal.
+    do {
+      size_t pos = heap.top();
+      const auto& memlist = imm_lists[pos]->current_->memlist_;
+      MemTable* mem = *(memlist.rbegin());
+      if (seqno == kMaxSequenceNumber) {
+        // First mem in this batch.
+        seqno = mem->atomic_flush_seqno_;
+        batch.emplace_back(pos);
+        heap.pop();
+      } else if (mem->atomic_flush_seqno_ == seqno) {
+        // mem has the same atomic_flush_seqno_, thus is in the same atomic
+        // flush.
+        batch.emplace_back(pos);
+        heap.pop();
+      } else if (mem->atomic_flush_seqno_ > seqno) {
+        // mem belongs to another atomic flush with a higher seqno; break the
+        // loop.
+        break;
+      }
+    } while (!heap.empty());
+    if (seqno >= min_unfinished_seqno) {
+      // If there is an older, unfinished atomic flush, then we should not
+      // proceed.
+      TEST_SYNC_POINT_CALLBACK(
+          "MemTableList::TryInstallMemtableFlushResults:"
+          "HasOlderUnfinishedAtomicFlush:0",
+          nullptr);
+      break;
+    }
+
+    // Found the earliest, complete atomic flush. No earlier atomic flush is
+    // pending. Therefore we are ready to record it to the MANIFEST.
+    uint32_t num_entries = 0;
+    autovector<ColumnFamilyData*> tmp_cfds;
+    autovector<const MutableCFOptions*> tmp_mutable_cf_options_list;
+    std::vector<autovector<MemTable*>> memtables_to_flush;
+    autovector<autovector<VersionEdit*>> edit_lists;
+    for (auto pos : batch) {
+      tmp_cfds.emplace_back(cfds[pos]);
+      tmp_mutable_cf_options_list.emplace_back(mutable_cf_options_list[pos]);
+      const auto& memlist = imm_lists[pos]->current_->memlist_;
+      uint64_t batch_file_number = 0;
+      autovector<MemTable*> tmp_mems;
+      autovector<VersionEdit*> edits;
+      for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
+        MemTable* m = *it;
+        if (!m->flush_completed_ ||
+            (it != memlist.rbegin() && m->file_number_ != batch_file_number)) {
+          break;
+        }
+        if (it == memlist.rbegin()) {
+          batch_file_number = m->file_number_;
+          edits.push_back(m->GetEdits());
+          ++num_entries;
+        }
+        tmp_mems.push_back(m);
+      }
+      edit_lists.push_back(edits);
+      memtables_to_flush.push_back(tmp_mems);
+    }
+    TEST_SYNC_POINT_CALLBACK(
+        "MemTableList::TryInstallMemtableFlushResults:FoundBatchToCommit:0",
+        &num_entries);
+
+    // Mark the version edits as an atomic group
+    uint32_t remaining = num_entries;
+    for (auto& edit_list : edit_lists) {
+      assert(edit_list.size() == 1);
+      edit_list[0]->MarkAtomicGroup(--remaining);
+    }
+    assert(remaining == 0);
+
+    size_t batch_sz = batch.size();
+    assert(batch_sz > 0);
+    assert(batch_sz == memtables_to_flush.size());
+    assert(batch_sz == tmp_cfds.size());
+    assert(batch_sz == edit_lists.size());
+
+    if (vset->db_options()->allow_2pc) {
+      for (size_t i = 0; i != batch_sz; ++i) {
+        auto& edit_list = edit_lists[i];
+        assert(!edit_list.empty());
+        edit_list.back()->SetMinLogNumberToKeep(
+            PrecomputeMinLogNumberToKeep(vset, *tmp_cfds[i], edit_list,
+                                         memtables_to_flush[i], prep_tracker));
+      }
+    }
+    // this can release and reacquire the mutex.
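Note: MarkAtomicGroup tags each version edit with the number of edits still
remaining in its group, so a reader of the MANIFEST can tell when it has
collected a complete group; e.g. with num_entries == 3 the countdown above
produces:

    // Illustration of the tagging performed by the loop above.
    edit_lists[0][0]->MarkAtomicGroup(2);
    edit_lists[1][0]->MarkAtomicGroup(1);
    edit_lists[2][0]->MarkAtomicGroup(0);  // last edit closes the group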
+ s = vset->LogAndApply(tmp_cfds, tmp_mutable_cf_options_list, edit_lists, mu, + db_directory); + + for (const auto pos : batch) { + imm_lists[pos]->InstallNewVersion(); + } + + if (s.ok()) { + for (size_t i = 0; i != batch_sz; ++i) { + if (tmp_cfds[i]->IsDropped()) { + continue; + } + size_t pos = batch[i]; + for (auto m : memtables_to_flush[i]) { + assert(m->file_number_ > 0); + uint64_t mem_id = m->GetID(); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + tmp_cfds[i]->GetName().c_str(), m->file_number_, + mem_id); + imm_lists[pos]->current_->Remove(m, to_delete); + } + } + } else { + for (size_t i = 0; i != batch_sz; ++i) { + size_t pos = batch[i]; + for (auto m : memtables_to_flush[i]) { + uint64_t mem_id = m->GetID(); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + tmp_cfds[i]->GetName().c_str(), m->file_number_, + mem_id); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + m->file_number_ = 0; + imm_lists[pos]->num_flush_not_started_++; + } + imm_lists[pos]->imm_flush_needed.store(true, std::memory_order_release); + } + } + // Adjust the heap AFTER installing new MemTableListVersions because the + // compare function 'comp' needs to capture the most up-to-date state of + // imm_lists. + for (auto pos : batch) { + const auto& memlist = imm_lists[pos]->current_->memlist_; + if (!memlist.empty()) { + MemTable* mem = *(memlist.rbegin()); + if (mem->flush_completed_) { + heap.emplace(pos); + } else if (min_unfinished_seqno > mem->atomic_flush_seqno_) { + min_unfinished_seqno = mem->atomic_flush_seqno_; + } + } + } + } + + *atomic_flush_commit_in_progress = false; + return s; +} + // Returns true if there is at least one memtable on which flush has // not yet started. bool MemTableList::IsFlushPending() const { - if ((flush_requested_ && num_flush_not_started_ >= 1) || + if ((flush_requested_ && num_flush_not_started_ > 0) || (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { assert(imm_flush_needed.load(std::memory_order_relaxed)); return true; @@ -281,12 +504,16 @@ bool MemTableList::IsFlushPending() const { } // Returns the memtables that need to be flushed. -void MemTableList::PickMemtablesToFlush(autovector* ret) { +void MemTableList::PickMemtablesToFlush(const uint64_t* max_memtable_id, + autovector* ret) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); const auto& memlist = current_->memlist_; for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; + if (max_memtable_id != nullptr && m->GetID() > *max_memtable_id) { + break; + } if (!m->flush_in_progress_) { assert(!m->flush_completed_); num_flush_not_started_--; diff --git a/db/memtable_list.h b/db/memtable_list.h index cf524a00f..3f6f399a1 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -163,6 +163,18 @@ class MemTableListVersion { // write thread.) class MemTableList { public: + // Commit a successful atomic flush in the manifest file + static Status TryInstallMemtableFlushResults( + autovector& imm_lists, + const autovector& cfds, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, + bool* atomic_flush_commit_in_progress, LogsWithPrepTracker* prep_tracker, + VersionSet* vset, InstrumentedMutex* mu, + const autovector& file_meta, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer); + // A list of memtables. 
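Note: a hedged sketch of driving the static overload declared above; the
argument bindings (imm_lists, cfds, file_metas, etc.) are assumed to have
been collected by the caller, as AtomicFlushMemTablesToOutputFiles does in
db_impl_compaction_flush.cc:

    bool commit_in_progress = false;
    Status s = MemTableList::TryInstallMemtableFlushResults(
        imm_lists, cfds, mutable_cf_options_list, mems_list,
        &commit_in_progress, &prep_tracker, versions, &mutex, file_metas,
        &to_delete, db_dir, &log_buffer);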
explicit MemTableList(int min_write_buffer_number_to_merge, int max_write_buffer_number_to_maintain) @@ -201,7 +213,8 @@ class MemTableList { // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. - void PickMemtablesToFlush(autovector* mems); + void PickMemtablesToFlush(const uint64_t* max_memtable_id, + autovector* mems); // Reset status of the given memtable list back to pending state so that // they can get picked up again on the next round of flush. @@ -281,7 +294,8 @@ class MemTableList { // committing in progress bool commit_in_progress_; - // Requested a flush of all memtables to storage + // Requested a flush of memtables to storage. It's possible to request that + // a subset of memtables be flushed. bool flush_requested_; // The current memory usage. diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 2672e982e..c6c5486d7 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -25,9 +25,13 @@ class MemTableListTest : public testing::Test { std::string dbname; DB* db; Options options; + std::vector handles; + std::atomic file_number; - MemTableListTest() : db(nullptr) { + MemTableListTest() : db(nullptr), file_number(1) { dbname = test::PerThreadDBPath("memtable_list_test"); + options.create_if_missing = true; + DestroyDB(dbname, options); } // Create a test db if not yet created @@ -35,15 +39,45 @@ class MemTableListTest : public testing::Test { if (db == nullptr) { options.create_if_missing = true; DestroyDB(dbname, options); - Status s = DB::Open(options, dbname, &db); + // Open DB only with default column family + ColumnFamilyOptions cf_options; + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options); + Status s = DB::Open(options, dbname, cf_descs, &handles, &db); EXPECT_OK(s); + + ColumnFamilyOptions cf_opt1, cf_opt2; + cf_opt1.cf_paths.emplace_back(dbname + "_one_1", + std::numeric_limits::max()); + cf_opt2.cf_paths.emplace_back(dbname + "_two_1", + std::numeric_limits::max()); + int sz = static_cast(handles.size()); + handles.resize(sz + 2); + s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]); + EXPECT_OK(s); + s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]); + EXPECT_OK(s); + + cf_descs.emplace_back("one", cf_options); + cf_descs.emplace_back("two", cf_options); } } ~MemTableListTest() { if (db) { + std::vector cf_descs(handles.size()); + for (int i = 0; i != static_cast(handles.size()); ++i) { + handles[i]->GetDescriptor(&cf_descs[i]); + } + for (auto h : handles) { + if (h) { + db->DestroyColumnFamilyHandle(h); + } + } + handles.clear(); delete db; - DestroyDB(dbname, options); + db = nullptr; + DestroyDB(dbname, options, cf_descs); } } @@ -52,10 +86,26 @@ class MemTableListTest : public testing::Test { Status Mock_InstallMemtableFlushResults( MemTableList* list, const MutableCFOptions& mutable_cf_options, const autovector& m, autovector* to_delete) { + autovector lists; + lists.emplace_back(list); + autovector*> mems_list; + mems_list.emplace_back(&m); + return Mock_InstallMemtableFlushResults( + lists, {0} /* cf_ids */, {&mutable_cf_options}, mems_list, to_delete); + } + + // Calls MemTableList::InstallMemtableFlushResults() and sets up all + // structures needed to call this function. 
+ Status Mock_InstallMemtableFlushResults( + autovector& lists, const autovector& cf_ids, + const autovector& mutable_cf_options_list, + const autovector*>& mems_list, + autovector* to_delete) { // Create a mock Logger test::NullLogger logger; LogBuffer log_buffer(DEBUG_LEVEL, &logger); + CreateDB(); // Create a mock VersionSet DBOptions db_options; ImmutableDBOptions immutable_db_options(db_options); @@ -64,28 +114,58 @@ class MemTableListTest : public testing::Test { WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); WriteController write_controller(10000000u); - CreateDB(); VersionSet versions(dbname, &immutable_db_options, env_options, table_cache.get(), &write_buffer_manager, &write_controller); + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); + cf_descs.emplace_back("one", ColumnFamilyOptions()); + cf_descs.emplace_back("two", ColumnFamilyOptions()); + EXPECT_OK(versions.Recover(cf_descs, false)); // Create mock default ColumnFamilyData - ColumnFamilyOptions cf_options; - std::vector column_families; - column_families.emplace_back(kDefaultColumnFamilyName, cf_options); - EXPECT_OK(versions.Recover(column_families, false)); auto column_family_set = versions.GetColumnFamilySet(); - auto cfd = column_family_set->GetColumnFamily(0); - EXPECT_TRUE(cfd != nullptr); - // Create dummy mutex. + LogsWithPrepTracker dummy_prep_tracker; + if (1 == cf_ids.size()) { + auto cfd = column_family_set->GetColumnFamily(cf_ids[0]); + EXPECT_TRUE(nullptr != cfd); + EXPECT_EQ(1, lists.size()); + MemTableList* list = lists[0]; + EXPECT_EQ(1, mutable_cf_options_list.size()); + const MutableCFOptions& mutable_cf_options = + *(mutable_cf_options_list.at(0)); + const autovector* mems = mems_list.at(0); + EXPECT_TRUE(nullptr != mems); + + uint64_t file_num = file_number.fetch_add(1); + // Create dummy mutex. + InstrumentedMutex mutex; + InstrumentedMutexLock l(&mutex); + return list->TryInstallMemtableFlushResults( + cfd, mutable_cf_options, *mems, &dummy_prep_tracker, &versions, + &mutex, file_num, to_delete, nullptr, &log_buffer); + } + autovector cfds; + for (int i = 0; i != static_cast(cf_ids.size()); ++i) { + cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i])); + EXPECT_NE(nullptr, cfds[i]); + } + autovector file_metas; + for (size_t i = 0; i != cf_ids.size(); ++i) { + FileMetaData meta; + uint64_t file_num = file_number.fetch_add(1); + meta.fd = FileDescriptor(file_num, 0, 0); + file_metas.emplace_back(meta); + } + bool atomic_flush_commit_in_progress = false; InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - LogsWithPrepTracker dummy_prep_tracker; - return list->TryInstallMemtableFlushResults( - cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1, - to_delete, nullptr, &log_buffer); + return MemTableList::TryInstallMemtableFlushResults( + lists, cfds, mutable_cf_options_list, mems_list, + &atomic_flush_commit_in_progress, &dummy_prep_tracker, &versions, + &mutex, file_metas, to_delete, nullptr, &log_buffer); } }; @@ -98,7 +178,7 @@ TEST_F(MemTableListTest, Empty) { ASSERT_FALSE(list.IsFlushPending()); autovector mems; - list.PickMemtablesToFlush(&mems); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems); ASSERT_EQ(0, mems.size()); autovector to_delete; @@ -281,11 +361,12 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { // Flush this memtable from the list. // (It will then be a part of the memtable history). 
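Note: the "memtable history" referenced above is the list of already-flushed
memtables that MemTableList keeps around so reads can still find recently
flushed data; its length is bounded by max_write_buffer_number_to_maintain,
which these tests pass to the MemTableList constructor.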
autovector to_flush; - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(1, to_flush.size()); - s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options), - to_flush, &to_delete); + MutableCFOptions mutable_cf_options(options); + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); ASSERT_OK(s); ASSERT_EQ(0, list.NumNotFlushed()); ASSERT_EQ(1, list.NumFlushed()); @@ -330,12 +411,12 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { ASSERT_EQ(0, to_delete.size()); to_flush.clear(); - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(1, to_flush.size()); // Flush second memtable - s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options), - to_flush, &to_delete); + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); ASSERT_OK(s); ASSERT_EQ(0, list.NumNotFlushed()); ASSERT_EQ(2, list.NumFlushed()); @@ -396,7 +477,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) { } TEST_F(MemTableListTest, FlushPendingTest) { - const int num_tables = 5; + const int num_tables = 6; SequenceNumber seq = 1; Status s; @@ -414,11 +495,13 @@ TEST_F(MemTableListTest, FlushPendingTest) { max_write_buffer_number_to_maintain); // Create some MemTables + uint64_t memtable_id = 0; std::vector tables; MutableCFOptions mutable_cf_options(options); for (int i = 0; i < num_tables; i++) { MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb, kMaxSequenceNumber, 0 /* column_family_id */); + mem->SetID(memtable_id++); mem->Ref(); std::string value; @@ -437,7 +520,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.IsFlushPending()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); autovector to_flush; - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(0, to_flush.size()); // Request a flush even though there is nothing to flush @@ -446,7 +529,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); // Attempt to 'flush' to clear request for flush - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(0, to_flush.size()); ASSERT_FALSE(list.IsFlushPending()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); @@ -470,7 +553,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); // Pick tables to flush - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(2, to_flush.size()); ASSERT_EQ(2, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -491,7 +574,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_EQ(0, to_delete.size()); // Pick tables to flush - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); ASSERT_EQ(3, to_flush.size()); ASSERT_EQ(3, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -499,7 +582,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Pick tables to flush again autovector to_flush2; - list.PickMemtablesToFlush(&to_flush2); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); ASSERT_EQ(0, to_flush2.size()); ASSERT_EQ(3, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -517,7 +600,7 @@ 
TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); // Pick tables to flush again - list.PickMemtablesToFlush(&to_flush2); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2); ASSERT_EQ(1, to_flush2.size()); ASSERT_EQ(4, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); @@ -538,7 +621,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_EQ(0, to_delete.size()); // Pick tables to flush - list.PickMemtablesToFlush(&to_flush); + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush); // Should pick 4 of 5 since 1 table has been picked in to_flush2 ASSERT_EQ(4, to_flush.size()); ASSERT_EQ(5, list.NumNotFlushed()); @@ -547,14 +630,15 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Pick tables to flush again autovector to_flush3; + list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3); ASSERT_EQ(0, to_flush3.size()); // nothing not in progress of being flushed ASSERT_EQ(5, list.NumNotFlushed()); ASSERT_FALSE(list.IsFlushPending()); ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); // Flush the 4 memtables that were picked in to_flush - s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options), - to_flush, &to_delete); + s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush, + &to_delete); ASSERT_OK(s); // Note: now to_flush contains tables[0,1,2,4]. to_flush2 contains @@ -574,7 +658,7 @@ TEST_F(MemTableListTest, FlushPendingTest) { // Flush the 1 memtable that was picked in to_flush2 s = MemTableListTest::Mock_InstallMemtableFlushResults( - &list, MutableCFOptions(options), to_flush2, &to_delete); + &list, mutable_cf_options, to_flush2, &to_delete); ASSERT_OK(s); // This will actually install 2 tables. The 1 we told it to flush, and also @@ -593,8 +677,37 @@ TEST_F(MemTableListTest, FlushPendingTest) { } to_delete.clear(); + // Add another table + list.Add(tables[5], &to_delete); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_EQ(5, list.GetLatestMemTableID()); + memtable_id = 4; + // Pick tables to flush. The tables to pick must have ID smaller than or + // equal to 4. Therefore, no table will be selected in this case. + autovector to_flush4; + list.FlushRequested(); + ASSERT_TRUE(list.HasFlushRequested()); + list.PickMemtablesToFlush(&memtable_id, &to_flush4); + ASSERT_TRUE(to_flush4.empty()); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.HasFlushRequested()); + + // Pick tables to flush. The tables to pick must have ID smaller than or + // equal to 5. Therefore, only tables[5] will be selected. 
+ memtable_id = 5; + list.FlushRequested(); + list.PickMemtablesToFlush(&memtable_id, &to_flush4); + ASSERT_EQ(1, static_cast(to_flush4.size())); + ASSERT_EQ(1, list.NumNotFlushed()); + ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire)); + ASSERT_FALSE(list.IsFlushPending()); + to_delete.clear(); + list.current()->Unref(&to_delete); - int to_delete_size = std::min(5, max_write_buffer_number_to_maintain); + int to_delete_size = + std::min(num_tables, max_write_buffer_number_to_maintain); ASSERT_EQ(to_delete_size, to_delete.size()); for (const auto& m : to_delete) { @@ -607,6 +720,330 @@ TEST_F(MemTableListTest, FlushPendingTest) { to_delete.clear(); } +TEST_F(MemTableListTest, FlushMultipleCFsTest) { + const int num_cfs = 3; + const int num_tables_per_cf = 5; + SequenceNumber seq = 1; + Status s; + + auto factory = std::make_shared(); + options.memtable_factory = factory; + ImmutableCFOptions ioptions(options); + InternalKeyComparator cmp(BytewiseComparator()); + WriteBufferManager wb(options.db_write_buffer_size); + autovector to_delete; + + // Create MemTableLists + int min_write_buffer_number_to_merge = 3; + int max_write_buffer_number_to_maintain = 7; + autovector lists; + for (int i = 0; i != num_cfs; ++i) { + lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge, + max_write_buffer_number_to_maintain)); + } + + autovector cf_ids; + std::vector> tables(num_cfs); + autovector mutable_cf_options_list; + uint32_t cf_id = 0; + for (auto& elem : tables) { + mutable_cf_options_list.emplace_back(new MutableCFOptions(options)); + uint64_t memtable_id = 0; + for (int i = 0; i != num_tables_per_cf; ++i) { + MemTable* mem = + new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb, + kMaxSequenceNumber, cf_id); + mem->SetID(memtable_id++); + mem->Ref(); + + std::string value; + + mem->Add(++seq, kTypeValue, "key1", ToString(i)); + mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN"); + mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value"); + mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM"); + mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), ""); + + elem.push_back(mem); + } + cf_ids.push_back(cf_id++); + } + + std::vector> flush_candidates(num_cfs); + + // Nothing to flush + for (int i = 0; i != num_cfs; ++i) { + auto list = lists[i]; + ASSERT_FALSE(list->IsFlushPending()); + ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); + list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]); + ASSERT_EQ(0, static_cast(flush_candidates[i].size())); + } + + // Request flush even though there is nothing to flush + for (int i = 0; i != num_cfs; ++i) { + auto list = lists[i]; + list->FlushRequested(); + ASSERT_FALSE(list->IsFlushPending()); + ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire)); + } + + // Add tables to column families + for (int i = 0; i != num_cfs; ++i) { + for (int j = 0; j != num_tables_per_cf; ++j) { + lists[i]->Add(tables[i][j], &to_delete); + } + ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed()); + ASSERT_TRUE(lists[i]->IsFlushPending()); + ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); + } + + autovector*> to_flush; + std::vector prev_memtable_ids; + // For each column family, determine the memtables to flush + for (int k = 0; k != 4; ++k) { + std::vector flush_memtable_ids; + if (0 == k) { + // +----+ + // list[0]: |0 1| 2 3 4 + // list[1]: |0 1| 2 3 4 + // | +--+ + // list[2]: |0| 1 2 3 4 + // +-+ + flush_memtable_ids = {1, 1, 
0}; + } else if (1 == k) { + // +----+ +---+ + // list[0]: |0 1| |2 3| 4 + // list[1]: |0 1| |2 3| 4 + // | +--+ +---+ + // list[2]: |0| 1 2 3 4 + // +-+ + flush_memtable_ids = {3, 3, 0}; + } else if (2 == k) { + // +-----+ +---+ + // list[0]: |0 1| |2 3| 4 + // list[1]: |0 1| |2 3| 4 + // | +---+ +---+ + // | | +-------+ + // list[2]: |0| |1 2 3| 4 + // +-+ +-------+ + flush_memtable_ids = {3, 3, 3}; + } else { + // +-----+ +---+ +-+ + // list[0]: |0 1| |2 3| |4| + // list[1]: |0 1| |2 3| |4| + // | +---+ +---+ | | + // | | +-------+ | | + // list[2]: |0| |1 2 3| |4| + // +-+ +-------+ +-+ + flush_memtable_ids = {4, 4, 4}; + } + assert(num_cfs == static_cast(flush_memtable_ids.size())); + + // Pick memtables to flush + for (int i = 0; i != num_cfs; ++i) { + flush_candidates[i].clear(); + lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i], + &flush_candidates[i]); + for (auto mem : flush_candidates[i]) { + mem->TEST_AtomicFlushSequenceNumber() = SequenceNumber(k); + } + if (prev_memtable_ids.empty()) { + ASSERT_EQ(flush_memtable_ids[i] - 0 + 1, flush_candidates[i].size()); + } else { + ASSERT_EQ(flush_memtable_ids[i] - prev_memtable_ids[i], + flush_candidates[i].size()); + } + ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed()); + ASSERT_FALSE(lists[i]->HasFlushRequested()); + if (flush_memtable_ids[i] == num_tables_per_cf - 1) { + ASSERT_FALSE( + lists[i]->imm_flush_needed.load(std::memory_order_acquire)); + } else { + ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire)); + } + } + prev_memtable_ids = flush_memtable_ids; + + if (k < 3) { + for (const auto& mems : flush_candidates) { + uint64_t file_num = file_number.fetch_add(1); + for (auto m : mems) { + m->TEST_SetFlushCompleted(true); + m->TEST_SetFileNumber(file_num); + } + } + } + + if (k == 0) { + // Rollback first pick of tables + for (int i = 0; i != num_cfs; ++i) { + auto list = lists[i]; + const auto& mems = flush_candidates[i]; + for (auto m : mems) { + m->TEST_SetFileNumber(0); + } + list->RollbackMemtableFlush(flush_candidates[i], 0); + ASSERT_TRUE(list->IsFlushPending()); + ASSERT_TRUE(list->imm_flush_needed.load(std::memory_order_acquire)); + } + prev_memtable_ids.clear(); + } + + if (k == 3) { + for (int i = 0; i != num_cfs; ++i) { + to_flush.emplace_back(&flush_candidates[i]); + } + } + } + + s = Mock_InstallMemtableFlushResults(lists, cf_ids, mutable_cf_options_list, + to_flush, &to_delete); + ASSERT_OK(s); + + to_delete.clear(); + for (auto list : lists) { + list->current()->Unref(&to_delete); + delete list; + } + for (auto& mutable_cf_options : mutable_cf_options_list) { + if (mutable_cf_options != nullptr) { + delete mutable_cf_options; + mutable_cf_options = nullptr; + } + } + // All memtables in tables array must have been flushed, thus ready to be + // deleted. + ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size()); + for (const auto& m : to_delete) { + // Refcount should be 0 after calling InstallMemtableFlushResults. + // Verify this by Ref'ing and then Unref'ing. 
diff --git a/db/version_set.cc b/db/version_set.cc
index bbcbc3af7..146d69d62 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3064,9 +3064,9 @@ Status VersionSet::ProcessManifestWrites(
// 'datas' is grammatically incorrect. We still use this notation to indicate
// that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
- const std::vector<ColumnFamilyData*>& column_family_datas,
- const std::vector<MutableCFOptions>& mutable_cf_options_list,
- const std::vector<autovector<VersionEdit*>>& edit_lists,
+ const autovector<ColumnFamilyData*>& column_family_datas,
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<autovector<VersionEdit*>>& edit_lists,
InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld();
@@ -3098,8 +3098,8 @@ Status VersionSet::LogAndApply(
assert(static_cast<size_t>(num_cfds) == edit_lists.size());
}
for (int i = 0; i < num_cfds; ++i) {
- writers.emplace_back(mu, column_family_datas[i], mutable_cf_options_list[i],
- edit_lists[i]);
+ writers.emplace_back(mu, column_family_datas[i],
+ *mutable_cf_options_list[i], edit_lists[i]);
manifest_writers_.push_back(&writers[i]);
}
assert(!writers.empty());
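The signature change above swaps std::vector for rocksdb::autovector, which stores its first few elements inline in the object itself; a flush or manifest write usually touches only a handful of column families, so the common case performs no heap allocation. A toy version conveying the idea (simplified and assumed; the real container lives in util/autovector.h and mimics the std::vector interface):

#include <array>
#include <cstddef>
#include <vector>

template <typename T, std::size_t kInlineSize = 8>
class ToyAutoVector {
 public:
  void push_back(const T& v) {
    if (inline_count_ < kInlineSize) {
      inline_[inline_count_++] = v;  // common case: no heap allocation
    } else {
      overflow_.push_back(v);  // spill to the heap only once inline part is full
    }
  }
  std::size_t size() const { return inline_count_ + overflow_.size(); }
  const T& operator[](std::size_t i) const {
    return i < inline_count_ ? inline_[i] : overflow_[i - inline_count_];
  }

 private:
  std::array<T, kInlineSize> inline_{};
  std::size_t inline_count_ = 0;
  std::vector<T> overflow_;
};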
diff --git a/db/version_set.h b/db/version_set.h
index 4e3b2bec2..4c9405d27 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -752,10 +752,14 @@ class VersionSet {
InstrumentedMutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
- std::vector<ColumnFamilyData*> cfds(1, column_family_data);
- std::vector<MutableCFOptions> mutable_cf_options_list(1,
- mutable_cf_options);
- std::vector<autovector<VersionEdit*>> edit_lists(1, {edit});
+ autovector<ColumnFamilyData*> cfds;
+ cfds.emplace_back(column_family_data);
+ autovector<const MutableCFOptions*> mutable_cf_options_list;
+ mutable_cf_options_list.emplace_back(&mutable_cf_options);
+ autovector<autovector<VersionEdit*>> edit_lists;
+ autovector<VersionEdit*> edit_list;
+ edit_list.emplace_back(edit);
+ edit_lists.emplace_back(edit_list);
return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
db_directory, new_descriptor_log, column_family_options);
}
@@ -767,10 +771,12 @@ class VersionSet {
const autovector<VersionEdit*>& edit_list, InstrumentedMutex* mu,
Directory* db_directory = nullptr, bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options = nullptr) {
- std::vector<ColumnFamilyData*> cfds(1, column_family_data);
- std::vector<MutableCFOptions> mutable_cf_options_list(1,
- mutable_cf_options);
- std::vector<autovector<VersionEdit*>> edit_lists(1, edit_list);
+ autovector<ColumnFamilyData*> cfds;
+ cfds.emplace_back(column_family_data);
+ autovector<const MutableCFOptions*> mutable_cf_options_list;
+ mutable_cf_options_list.emplace_back(&mutable_cf_options);
+ autovector<autovector<VersionEdit*>> edit_lists;
+ edit_lists.emplace_back(edit_list);
return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu,
db_directory, new_descriptor_log, column_family_options);
}
@@ -778,12 +784,13 @@ class VersionSet {
// The across-multi-cf batch version. If edit_lists contains more than one
// version edit, the caller must ensure that no edit in the list is a column
// family manipulation.
- Status LogAndApply(const std::vector<ColumnFamilyData*>& cfds,
- const std::vector<MutableCFOptions>& mutable_cf_options,
- const std::vector<autovector<VersionEdit*>>& edit_lists,
- InstrumentedMutex* mu, Directory* db_directory = nullptr,
- bool new_descriptor_log = false,
- const ColumnFamilyOptions* new_cf_options = nullptr);
+ Status LogAndApply(
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+ const autovector<autovector<VersionEdit*>>& edit_lists,
+ InstrumentedMutex* mu, Directory* db_directory = nullptr,
+ bool new_descriptor_log = false,
+ const ColumnFamilyOptions* new_cf_options = nullptr);
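All of the LogAndApply overloads above share one shape: the single-column-family versions package their arguments into one-element autovectors and forward to the batched core, so a single code path performs the MANIFEST write. Shown schematically below (FakeCF, FakeEdit, ApplyBatch, and Apply are hypothetical stand-ins, not the real API):

#include <cassert>
#include <vector>

struct FakeCF {};
struct FakeEdit {};

// Batched core: one edit list per column family, committed as a group.
bool ApplyBatch(const std::vector<FakeCF*>& cfds,
                const std::vector<std::vector<FakeEdit*>>& edit_lists) {
  assert(cfds.size() == edit_lists.size());
  return !cfds.empty();  // stand-in for the real group commit
}

// Single-CF convenience wrapper mirroring the overloads above: wrap the
// arguments in one-element containers and delegate to the batched core.
bool Apply(FakeCF* cfd, FakeEdit* edit) {
  std::vector<FakeCF*> cfds{cfd};
  std::vector<std::vector<FakeEdit*>> edit_lists{{edit}};
  return ApplyBatch(cfds, edit_lists);
}

Funneling every caller through one batched entry point is what lets atomic flush commit several column families' edits in a single MANIFEST write.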
// Recover the last saved descriptor from persistent storage.
// If read_only == true, Recover() will not complain if some column families
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index 26fb18dd0..59ab521bf 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -615,13 +615,19 @@ class ManifestWriterTest : public testing::Test {
TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) {
NewDB();
const int kGroupSize = 5;
- std::vector<VersionEdit> edits(kGroupSize);
- std::vector<ColumnFamilyData*> cfds(kGroupSize, cfds_[0]);
- std::vector<MutableCFOptions> all_mutable_cf_options(kGroupSize,
- mutable_cf_options_);
- std::vector<autovector<VersionEdit*>> edit_lists(kGroupSize);
+ autovector<VersionEdit> edits;
for (int i = 0; i != kGroupSize; ++i) {
- edit_lists[i].emplace_back(&edits[i]);
+ edits.emplace_back(VersionEdit());
+ }
+ autovector<ColumnFamilyData*> cfds;
+ autovector<const MutableCFOptions*> all_mutable_cf_options;
+ autovector<autovector<VersionEdit*>> edit_lists;
+ for (int i = 0; i != kGroupSize; ++i) {
+ cfds.emplace_back(cfds_[0]);
+ all_mutable_cf_options.emplace_back(&mutable_cf_options_);
+ autovector<VersionEdit*> edit_list;
+ edit_list.emplace_back(&edits[i]);
+ edit_lists.emplace_back(edit_list);
}
int count = 0;