From 499ebb3ab5ea4207950fc95acf102b8f58add1c5 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Sat, 24 Jun 2017 14:06:43 -0700 Subject: [PATCH] Optimize for serial commits in 2PC Summary: Throughput: 46k tps in our sysbench settings (filling the details later) The idea is to have the simplest change that gives us a reasonable boost in 2PC throughput. Major design changes: 1. The WAL file internal buffer is not flushed after each write. Instead it is flushed before critical operations (WAL copy via fs) or when FlushWAL is called by MySQL. Flushing the WAL buffer is also protected via mutex_. 2. Use two sequence numbers: last seq, and last seq for write. Last seq is the last visible sequence number for reads. Last seq for write is the next sequence number that should be used to write to WAL/memtable. This allows a memtable write to proceed in parallel with WAL writes. 3. BatchGroup is not used for writes. This means that we can have parallel writers, which changes a major assumption in the code base. To accommodate that: i) allow only 1 WriteImpl that intends to write to memtable via mem_mutex_--which is fine since in 2PC almost all of the memtable writes come via group commit phase which is serial anyway, ii) make all the parts in the code base that assumed they were the only writer (via EnterUnbatched) also acquire mem_mutex_, iii) stat updates are protected via a stat_mutex_. Note: the first commit has the approach figured out but is not clean. Submitting the PR anyway to get early feedback on the approach. If we are ok with the approach I will go ahead with these updates: 0) Rebase with Yi's pipelining changes 1) Currently batching is disabled by default to make sure that it will be consistent with all unit tests. Will make this optional via a config. 2) A couple of unit tests are disabled. They need to be updated with the serial commit of 2PC taken into account. 
3) Replacing BatchGroup with mem_mutex_ got a bit ugly as it requires releasing mutex_ beforehand (the same way EnterUnbatched does). This needs to be cleaned up. Closes https://github.com/facebook/rocksdb/pull/2345 Differential Revision: D5210732 Pulled By: maysamyabandeh fbshipit-source-id: 78653bd95a35cd1e831e555e0e57bdfd695355a4 --- db/column_family_test.cc | 3 + db/compaction_job_test.cc | 1 + db/db_impl.cc | 36 ++- db/db_impl.h | 45 ++- db/db_impl_files.cc | 5 +- db/db_impl_open.cc | 1 + db/db_impl_readonly.cc | 1 - db/db_impl_write.cc | 261 +++++++++++++++--- db/db_log_iter_test.cc | 4 + db/db_test_util.cc | 6 + db/db_test_util.h | 15 +- db/db_wal_test.cc | 9 +- db/db_write_test.cc | 1 + db/external_sst_file_ingestion_job.cc | 5 + db/fault_injection_test.cc | 3 +- db/log_writer.cc | 16 +- db/log_writer.h | 10 +- db/repair.cc | 1 + db/version_set.cc | 3 + db/version_set.h | 22 ++ db/wal_manager_test.cc | 1 + db/write_callback_test.cc | 304 +++++++++++---------- include/rocksdb/db.h | 5 + include/rocksdb/options.h | 12 + include/rocksdb/utilities/stackable_db.h | 2 + options/db_options.cc | 8 +- options/db_options.h | 2 + options/options_helper.h | 10 +- options/options_settable_test.cc | 15 +- table/block_based_table_factory.cc | 1 - util/murmurhash.cc | 4 +- util/murmurhash.h | 4 +- util/transaction_test_util.cc | 8 + util/transaction_test_util.h | 2 + utilities/backupable/backupable_db_test.cc | 3 + utilities/checkpoint/checkpoint_impl.cc | 1 + utilities/transactions/transaction_impl.cc | 2 + utilities/transactions/transaction_test.cc | 30 +- 38 files changed, 620 insertions(+), 242 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index dc2f99499..7621f3974 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -303,6 +303,7 @@ class ColumnFamilyTest : public testing::Test { ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); } } + db_->FlushWAL(false); } #ifndef ROCKSDB_LITE // TEST functions in 
DB are not supported in lite @@ -580,6 +581,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) { Flush(0); ASSERT_OK(Put(1, "bar", "v3")); // seqID 4 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5 + db_->FlushWAL(false); // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); @@ -642,6 +644,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) { // Write to log file D ASSERT_OK(Put(1, "bar", "v4")); // seqID 7 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8 + db_->FlushWAL(false); // Preserve file system state up to here to simulate a crash condition. fault_env->SetFilesystemActive(false); std::vector names; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 05ad80df6..967ac72b5 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -144,6 +144,7 @@ class CompactionJobTest : public testing::Test { } void SetLastSequence(const SequenceNumber sequence_number) { + versions_->SetLastToBeWrittenSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1); } diff --git a/db/db_impl.cc b/db/db_impl.cc index afec15935..06b5e09ef 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -160,6 +160,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) is_snapshot_supported_(true), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_thread_(immutable_db_options_), + nonmem_write_thread_(immutable_db_options_), write_controller_(mutable_db_options_.delayed_write_rate), // Use delayed_write_rate as a base line to determine the initial // low pri write rate limit. It may be adjusted later. 
@@ -189,7 +190,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) bg_work_paused_(0), bg_compaction_paused_(0), refitting_level_(false), - opened_successfully_(false) { + opened_successfully_(false), + concurrent_prepare_(options.concurrent_prepare), + manual_wal_flush_(options.manual_wal_flush) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -612,6 +615,26 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, return minimum_level; } +Status DBImpl::FlushWAL(bool sync) { + { + // We need to lock log_write_mutex_ since logs_ might change concurrently + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + auto s = cur_log_writer->WriteBuffer(); + if (!s.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", + s.ToString().c_str()); + } + if (!sync) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); + return s; + } + } + // sync = true + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true"); + return SyncWAL(); +} + Status DBImpl::SyncWAL() { autovector logs_to_sync; bool need_log_dir_sync; @@ -650,6 +673,7 @@ Status DBImpl::SyncWAL() { need_log_dir_sync = !log_dir_synced_; } + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); RecordTick(stats_, WAL_FILE_SYNCED); Status status; for (log::Writer* log : logs_to_sync) { @@ -661,6 +685,7 @@ Status DBImpl::SyncWAL() { if (status.ok() && need_log_dir_sync) { status = directories_.GetWalDir()->Fsync(); } + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); { @@ -2634,9 +2659,13 @@ Status DBImpl::IngestExternalFile( InstrumentedMutexLock l(&mutex_); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); - // Stop writes to the DB + // Stop writes to the DB by entering both write threads WriteThread::Writer w; write_thread_.EnterUnbatched(&w, 
&mutex_); + WriteThread::Writer nonmem_w; + if (concurrent_prepare_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } num_running_ingest_file_++; @@ -2677,6 +2706,9 @@ Status DBImpl::IngestExternalFile( } // Resume writes to the DB + if (concurrent_prepare_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } write_thread_.ExitUnbatched(&w); // Update stats diff --git a/db/db_impl.h b/db/db_impl.h index 03aac360f..285a7c861 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -199,6 +199,7 @@ class DBImpl : public DB { using DB::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; + virtual Status FlushWAL(bool sync) override; virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; @@ -621,6 +622,10 @@ class DBImpl : public DB { uint64_t* log_used = nullptr, uint64_t log_ref = 0, bool disable_memtable = false); + Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, + WriteCallback* callback = nullptr, + uint64_t* log_used = nullptr, uint64_t log_ref = 0); + uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinPrepLogReferencedByMemTable(); @@ -746,9 +751,20 @@ class DBImpl : public DB { Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, WriteContext* write_context); + WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group, + WriteBatch* tmp_batch, size_t* write_with_wal); + + Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, + uint64_t* log_used, uint64_t* log_size); + Status WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, bool need_log_sync, - bool need_log_dir_sync, SequenceNumber sequence); + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence); + + Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, + uint64_t* log_used, SequenceNumber* 
last_sequence, + int total_count); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. void WriteCallbackStatusCheck(const Status& status); @@ -827,10 +843,12 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-nullptr iff successfully acquired. FileLock* db_lock_; - // The mutex for options file related operations. - // NOTE: should never acquire options_file_mutex_ and mutex_ at the - // same time. - InstrumentedMutex options_files_mutex_; + // It is used to concurrently update stats in the write threads + InstrumentedMutex stat_mutex_; + // It protects the back() of logs_ and alive_log_files_. Any push_back to + // these must be under log_write_mutex_ and any access that requires the + // back() to remain the same must also lock log_write_mutex_. + InstrumentedMutex log_write_mutex_; // State below is protected by mutex_ mutable InstrumentedMutex mutex_; @@ -891,6 +909,10 @@ class DBImpl : public DB { // - back() and items with getting_synced=true are not popped, // - it follows that write thread with unlocked mutex_ can safely access // back() and items with getting_synced=true. + // -- Update: apparently this was a mistake. back() should be called under + // mute_: https://github.com/facebook/rocksdb/pull/1774 + // - When concurrent write threads is enabled, back(), push_back(), and + // pop_front() must be called within log_write_mutex_ std::deque logs_; // Signaled when getting_synced becomes false for some of the logs_. InstrumentedCondVar log_sync_cv_; @@ -939,8 +961,10 @@ class DBImpl : public DB { WriteBufferManager* write_buffer_manager_; WriteThread write_thread_; - WriteBatch tmp_batch_; + // The write thread when the writers have no memtable write. This will be used + // in 2PC to batch the prepares separately from the serial commit. + WriteThread nonmem_write_thread_; WriteController write_controller_; @@ -948,6 +972,8 @@ class DBImpl : public DB { // Size of the last batch group. 
In slowdown mode, next write needs to // sleep if it uses up the quota. + // Note: This is to protect memtable and compaction. If the batch only writes + // to the WAL its size need not to be included in this. uint64_t last_batch_group_size_; FlushScheduler flush_scheduler_; @@ -1190,6 +1216,11 @@ class DBImpl : public DB { bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; + + // When set, we use a seprate queue for writes that dont write to memtable. In + // 2PC these are the writes at Prepare phase. + const bool concurrent_prepare_; + const bool manual_wal_flush_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 607b8ff08..d0a22321a 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -265,7 +265,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, continue; } logs_to_free_.push_back(log.ReleaseWriter()); - logs_.pop_front(); + { + InstrumentedMutexLock wl(&log_write_mutex_); + logs_.pop_front(); + } } // Current log cannot be obsolete. assert(!logs_.empty()); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 995b329bf..2baa7e070 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -725,6 +725,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, auto last_sequence = *next_sequence - 1; if ((*next_sequence != kMaxSequenceNumber) && (versions_->LastSequence() <= last_sequence)) { + versions_->SetLastToBeWrittenSequence(last_sequence); versions_->SetLastSequence(last_sequence); } } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 7039ba29b..cc209a912 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -5,7 +5,6 @@ // This source code is also licensed under the GPLv2 license found in the // COPYING file in the root directory of this source tree. 
- #include "db/db_impl_readonly.h" #include "db/compacted_db_impl.h" diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 058f8ef28..fea326ca0 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -66,6 +66,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } + if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with concurrent prepares"); + } Status status; if (write_options.low_pri) { @@ -75,6 +79,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } + if (concurrent_prepare_ && disable_memtable) { + return WriteImplWALOnly(write_options, my_batch, callback, log_used, + log_ref); + } + if (immutable_db_options_.enable_pipelined_write) { return PipelinedWriteImpl(write_options, my_batch, callback, log_used, log_ref, disable_memtable); @@ -133,14 +142,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteContext write_context; WriteThread::WriteGroup write_group; bool in_parallel_group = false; - uint64_t last_sequence = versions_->LastSequence(); + uint64_t last_sequence; + if (!concurrent_prepare_) { + last_sequence = versions_->LastSequence(); + } mutex_.Lock(); bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; - status = PreprocessWrite(write_options, &need_log_sync, &write_context); - log::Writer* cur_log_writer = logs_.back().writer; + if (!concurrent_prepare_ || !disable_memtable) { + // With concurrent writes we do preprocess only in the write thread that + // also does write to memtable to avoid sync issue on shared data structure + // with the other thread + status = PreprocessWrite(write_options, &need_log_sync, &write_context); + } + log::Writer* log_writer = logs_.back().writer; mutex_.Unlock(); @@ -180,9 +197,9 @@ Status DBImpl::WriteImpl(const 
WriteOptions& write_options, } } - const SequenceNumber current_sequence = last_sequence + 1; - last_sequence += total_count; - + if (concurrent_prepare_) { + stat_mutex_.Lock(); + } // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully @@ -201,6 +218,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); } MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); + if (concurrent_prepare_) { + stat_mutex_.Unlock(); + } if (write_options.disableWAL) { has_unpersisted_data_.store(true, std::memory_order_relaxed); @@ -208,14 +228,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); - if (status.ok() && !write_options.disableWAL) { - PERF_TIMER_GUARD(write_wal_time); - status = WriteToWAL(write_group, cur_log_writer, need_log_sync, - need_log_dir_sync, current_sequence); - if (log_used != nullptr) { - *log_used = logfile_number_; + if (!concurrent_prepare_) { + if (status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, last_sequence + 1); + } + } else { + assert(!need_log_sync && !need_log_dir_sync); + if (status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + // LastToBeWrittenSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + total_count); + } else { + // Otherwise we inc seq number for memtable writes + last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); } } + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += total_count; if (status.ok()) { PERF_TIMER_GUARD(write_memtable_time); @@ -263,6 
+296,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Lock(); MarkLogsSynced(logfile_number_, need_log_dir_sync, status); mutex_.Unlock(); + // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // hance provide a simple implementation that is not necessarily efficient. + if (concurrent_prepare_) { + if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } } bool should_exit_batch_group = true; @@ -272,7 +314,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } if (should_exit_batch_group) { - versions_->SetLastSequence(last_sequence); + if (status.ok()) { + versions_->SetLastSequence(last_sequence); + } MemTableInsertStatusCheck(w.status); write_thread_.ExitAsBatchGroupLeader(write_group, w.status); } @@ -304,7 +348,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); - log::Writer* cur_log_writer = logs_.back().writer; + log::Writer* log_writer = logs_.back().writer; mutex_.Unlock(); // This can set non-OK status if callback fail. 
@@ -352,8 +396,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync, - need_log_dir_sync, current_sequence); + w.status = WriteToWAL(wal_write_group, log_writer, log_used, + need_log_sync, need_log_dir_sync, current_sequence); } if (!w.CallbackFailed()) { @@ -403,11 +447,91 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } assert(w.state == WriteThread::STATE_COMPLETED); - if (log_used != nullptr) { - *log_used = w.log_used; + return w.FinalStatus(); +} + +Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref) { + Status status; + PERF_TIMER_GUARD(write_pre_and_post_process_time); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + true /* disable_memtable */); + if (write_options.disableWAL) { + return status; } + RecordTick(stats_, WRITE_WITH_WAL); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - return w.FinalStatus(); + nonmem_write_thread_.JoinBatchGroup(&w); + assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); + if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + return w.FinalStatus(); + } + // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); + WriteContext write_context; + WriteThread::WriteGroup write_group; + uint64_t last_sequence; + nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + // Note: no need to update last_batch_group_size_ here since the batch writes + // to WAL only + + uint64_t total_byte_size = 0; + for (auto* writer : write_group) { + if (writer->CheckCallback(this)) { + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, 
WriteBatchInternal::ByteSize(writer->batch)); + } + } + + stat_mutex_.Lock(); + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF); + auto write_done_by_other = write_group.size - 1; + if (write_done_by_other > 0) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other); + RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); + } + MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); + stat_mutex_.Unlock(); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + PERF_TIMER_GUARD(write_wal_time); + // LastToBeWrittenSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + 0 /*total_count*/); + if (status.ok() && write_options.sync) { + // Requesting sync with concurrent_prepare_ is expected to be very rare. We + // hance provide a simple implementation that is not necessarily efficient. 
+ if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } + PERF_TIMER_START(write_pre_and_post_process_time); + + if (!w.CallbackFailed()) { + ParanoidCheck(status); + } + nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status); + if (status.ok()) { + status = w.FinalStatus(); + } + return status; } void DBImpl::WriteCallbackStatusCheck(const Status& status) { @@ -519,13 +643,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, return status; } -Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, bool need_log_sync, - bool need_log_dir_sync, SequenceNumber sequence) { - Status status; - +WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, + WriteBatch* tmp_batch, size_t* write_with_wal) { + assert(write_with_wal != nullptr); + assert(tmp_batch != nullptr); WriteBatch* merged_batch = nullptr; - size_t write_with_wal = 0; + *write_with_wal = 0; auto* leader = write_group.leader; if (write_group.size == 1 && leader->ShouldWriteToWAL() && leader->batch->GetWalTerminationPoint().is_cleared()) { @@ -534,30 +657,54 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, // and the batch is not wanting to be truncated merged_batch = leader->batch; leader->log_used = logfile_number_; - write_with_wal = 1; + *write_with_wal = 1; } else { // WAL needs all of the batches flattened into a single batch. 
// We could avoid copying here with an iov-like AddRecord // interface - merged_batch = &tmp_batch_; + merged_batch = tmp_batch; for (auto writer : write_group) { if (writer->ShouldWriteToWAL()) { WriteBatchInternal::Append(merged_batch, writer->batch, /*WAL_only*/ true); - write_with_wal++; + (*write_with_wal)++; } writer->log_used = logfile_number_; } } + return merged_batch; +} - WriteBatchInternal::SetSequence(merged_batch, sequence); - - Slice log_entry = WriteBatchInternal::Contents(merged_batch); - status = log_writer->AddRecord(log_entry); +Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, + log::Writer* log_writer, uint64_t* log_used, + uint64_t* log_size) { + assert(log_size != nullptr); + Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + *log_size = log_entry.size(); + Status status = log_writer->AddRecord(log_entry); + if (log_used != nullptr) { + *log_used = logfile_number_; + } total_log_size_ += log_entry.size(); alive_log_files_.back().AddSize(log_entry.size()); log_empty_ = false; - uint64_t log_size = log_entry.size(); + return status; +} + +Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence) { + Status status; + + size_t write_with_wal = 0; + WriteBatch* merged_batch = + MergeBatch(write_group, &tmp_batch_, &write_with_wal); + + WriteBatchInternal::SetSequence(merged_batch, sequence); + + uint64_t log_size; + status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); if (status.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); @@ -599,6 +746,41 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, return status; } +Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, + uint64_t* log_used, + SequenceNumber* last_sequence, + int total_count) { + Status status; + + WriteBatch tmp_batch; + size_t write_with_wal = 
0; + WriteBatch* merged_batch = + MergeBatch(write_group, &tmp_batch, &write_with_wal); + + // We need to lock log_write_mutex_ since logs_ and alive_log_files might be + // pushed back concurrently + log_write_mutex_.Lock(); + *last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count); + auto sequence = *last_sequence + 1; + WriteBatchInternal::SetSequence(merged_batch, sequence); + + log::Writer* log_writer = logs_.back().writer; + uint64_t log_size; + status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + log_write_mutex_.Unlock(); + + if (status.ok()) { + stat_mutex_.Lock(); + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal); + RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); + stat_mutex_.Unlock(); + } + return status; +} + Status DBImpl::HandleWALFull(WriteContext* write_context) { mutex_.AssertHeld(); assert(write_context != nullptr); @@ -895,9 +1077,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { lfile->SetPreallocationBlockSize(preallocate_block_size); unique_ptr file_writer( new WritableFileWriter(std::move(lfile), opt_env_opt)); - new_log = - new log::Writer(std::move(file_writer), new_log_number, - immutable_db_options_.recycle_log_file_num > 0); + new_log = new log::Writer( + std::move(file_writer), new_log_number, + immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_); } } @@ -931,8 +1113,15 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { assert(new_log != nullptr); log_empty_ = true; log_dir_synced_ = false; + log_write_mutex_.Lock(); + if (!logs_.empty()) { + // Alway flush the buffer of the last log before switching to a new one + log::Writer* cur_log_writer = logs_.back().writer; + cur_log_writer->WriteBuffer(); + } logs_.emplace_back(logfile_number_, new_log); 
alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + log_write_mutex_.Unlock(); } for (auto loop_cfd : *versions_->GetColumnFamilySet()) { // all this is just optimization to delete logs that diff --git a/db/db_log_iter_test.cc b/db/db_log_iter_test.cc index 2f56348d2..84c8776a7 100644 --- a/db/db_log_iter_test.cc +++ b/db/db_log_iter_test.cc @@ -118,6 +118,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { dbfull()->Flush(FlushOptions()); Put("key4", DummyString(1024)); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + dbfull()->FlushWAL(false); { auto iter = OpenTransactionLogIter(0); @@ -134,6 +135,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) { // "key5" would be written in a new memtable and log Put("key5", DummyString(1024)); + dbfull()->FlushWAL(false); { // this iter would miss "key4" if not fixed auto iter = OpenTransactionLogIter(0); @@ -183,6 +185,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { Put("key"+ToString(i), DummyString(10)); } dbfull()->Flush(FlushOptions()); + dbfull()->FlushWAL(false); // Corrupt this log to create a gap rocksdb::VectorLogPtr wal_files; ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); @@ -196,6 +199,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) { // Insert a new entry to a new log file Put("key1025", DummyString(10)); + dbfull()->FlushWAL(false); // Try to read from the beginning. 
Should stop before the gap and read less // than 1025 entries auto iter = OpenTransactionLogIter(0); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index d932b5542..e199ac526 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -469,6 +469,12 @@ Options DBTestBase::GetOptions( options.enable_pipelined_write = true; break; } + case kConcurrentWALWrites: { + // This options optimize 2PC commit path + options.concurrent_prepare = true; + options.manual_wal_flush = true; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 372baed57..755b6929f 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -641,13 +641,14 @@ class DBTestBase : public testing::Test { kRecycleLogFiles = 28, kConcurrentSkipList = 29, kPipelinedWrite = 30, - kEnd = 31, - kDirectIO = 32, - kLevelSubcompactions = 33, - kUniversalSubcompactions = 34, - kBlockBasedTableWithIndexRestartInterval = 35, - kBlockBasedTableWithPartitionedIndex = 36, - kPartitionedFilterWithNewTableReaderForCompactions = 37, + kConcurrentWALWrites = 31, + kEnd = 32, + kDirectIO = 33, + kLevelSubcompactions = 34, + kUniversalSubcompactions = 35, + kBlockBasedTableWithIndexRestartInterval = 36, + kBlockBasedTableWithPartitionedIndex = 37, + kPartitionedFilterWithNewTableReaderForCompactions = 38, }; public: diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 443c99453..bb211cb34 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -122,9 +122,11 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); + // Moving this to SyncWAL before the actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); ASSERT_OK(db_->SyncWAL()); - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); + // Moving this to SyncWAL after actual fsync + // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); 
thread.join(); @@ -660,6 +662,7 @@ TEST_F(DBWALTest, PartOfWritesWithWALDisabled) { ASSERT_OK(Flush(0)); ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 ASSERT_EQ("v5", Get(0, "key")); + dbfull()->FlushWAL(false); // Simulate a crash. fault_env->SetFilesystemActive(false); Close(); @@ -729,6 +732,7 @@ class RecoveryTestHelper { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); + versions->SetLastToBeWrittenSequence(seq); versions->SetLastSequence(seq); } } @@ -1113,6 +1117,7 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) { ASSERT_EQ(3, countWalFiles()); Flush(1); ASSERT_OK(Put(2, "key7", kLargeValue)); + dbfull()->FlushWAL(false); ASSERT_EQ(4, countWalFiles()); // Reopen twice and validate. diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 66ce4668d..424709900 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -71,6 +71,7 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) { INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, + DBTestBase::kConcurrentWALWrites, DBTestBase::kPipelinedWrite)); } // namespace rocksdb diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 081eacc00..2c94d07d3 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -146,6 +146,8 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) { return status; } +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ Status ExternalSstFileIngestionJob::Run() { Status status; #ifndef NDEBUG @@ -164,6 +166,8 @@ Status ExternalSstFileIngestionJob::Run() { // if the dont overlap with any ranges since we have snapshots force_global_seqno = true; } + // It is safe to use this instead of LastToBeWrittenSequence since we are + // the only active writer, and hence they are equal const 
SequenceNumber last_seqno = versions_->LastSequence(); SuperVersion* super_version = cfd_->GetSuperVersion(); edit_.SetColumnFamily(cfd_->GetID()); @@ -197,6 +201,7 @@ Status ExternalSstFileIngestionJob::Run() { } if (consumed_seqno) { + versions_->SetLastToBeWrittenSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1); } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 98b9b9b97..5aa1bfdc2 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -412,6 +412,7 @@ TEST_P(FaultInjectionTest, WriteOptionSyncTest) { write_options.sync = true; ASSERT_OK( db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); + db_->FlushWAL(false); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); @@ -496,7 +497,7 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) { ASSERT_OK(db_->Flush(flush_options)); ASSERT_OK( db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); - ASSERT_OK(db_->SyncWAL()); + ASSERT_OK(db_->FlushWAL(true)); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); diff --git a/db/log_writer.cc b/db/log_writer.cc index 4081a3b13..36480c403 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -20,20 +20,22 @@ namespace rocksdb { namespace log { -Writer::Writer(unique_ptr&& dest, - uint64_t log_number, bool recycle_log_files) +Writer::Writer(unique_ptr&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), - recycle_log_files_(recycle_log_files) { + recycle_log_files_(recycle_log_files), + manual_flush_(manual_flush) { for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); } } -Writer::~Writer() { -} +Writer::~Writer() { WriteBuffer(); } + +Status Writer::WriteBuffer() { return dest_->Flush(); } Status Writer::AddRecord(const Slice& slice) { const char* ptr = 
slice.data(); @@ -129,7 +131,9 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { if (s.ok()) { s = dest_->Append(Slice(ptr, n)); if (s.ok()) { - s = dest_->Flush(); + if (!manual_flush_) { + s = dest_->Flush(); + } } } block_offset_ += header_size + n; diff --git a/db/log_writer.h b/db/log_writer.h index c88234402..c6cb12233 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -74,8 +74,8 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. - explicit Writer(unique_ptr&& dest, - uint64_t log_number, bool recycle_log_files); + explicit Writer(unique_ptr&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush = false); ~Writer(); Status AddRecord(const Slice& slice); @@ -85,6 +85,8 @@ class Writer { uint64_t get_log_number() const { return log_number_; } + Status WriteBuffer(); + private: unique_ptr dest_; size_t block_offset_; // Current offset in block @@ -98,6 +100,10 @@ class Writer { Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + // If true, it does not flush after each write. 
Instead it relies on the upper + // layer to manually do the flush by calling ::WriteBuffer() + bool manual_flush_; + // No copying allowed Writer(const Writer&); void operator=(const Writer&); diff --git a/db/repair.cc b/db/repair.cc index 1f9e344e1..615ad454f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -524,6 +524,7 @@ class Repairer { max_sequence = tables_[i].max_sequence; } } + vset_.SetLastToBeWrittenSequence(max_sequence); vset_.SetLastSequence(max_sequence); for (const auto& cf_id_and_tables : cf_id_to_tables) { diff --git a/db/version_set.cc b/db/version_set.cc index 6c220b5ef..6387e2df2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2286,6 +2286,7 @@ VersionSet::VersionSet(const std::string& dbname, manifest_file_number_(0), // Filled by Recover() pending_manifest_file_number_(0), last_sequence_(0), + last_to_be_written_sequence_(0), prev_log_number_(0), current_version_number_(0), manifest_file_size_(0), @@ -2922,6 +2923,7 @@ Status VersionSet::Recover( manifest_file_size_ = current_manifest_file_size; next_file_number_.store(next_file + 1); + last_to_be_written_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; @@ -3291,6 +3293,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } next_file_number_.store(next_file + 1); + last_to_be_written_sequence_ = last_sequence; last_sequence_ = last_sequence; prev_log_number_ = previous_log_number; diff --git a/db/version_set.h b/db/version_set.h index f1f0dcb64..17627a4b1 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -699,12 +699,31 @@ class VersionSet { return last_sequence_.load(std::memory_order_acquire); } + // Note: memory_order_acquire must be sufficient. + uint64_t LastToBeWrittenSequence() const { + return last_to_be_written_sequence_.load(std::memory_order_seq_cst); + } + // Set the last sequence number to s.
void SetLastSequence(uint64_t s) { assert(s >= last_sequence_); + // Last visible sequence must never exceed the last written seq + assert(!db_options_->concurrent_prepare || + s <= last_to_be_written_sequence_); last_sequence_.store(s, std::memory_order_release); } + // Note: memory_order_release must be sufficient + void SetLastToBeWrittenSequence(uint64_t s) { + assert(s >= last_to_be_written_sequence_); + last_to_be_written_sequence_.store(s, std::memory_order_seq_cst); + } + + // Note: memory_order_release must be sufficient + uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) { + return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst); + } + // Mark the specified file number as used. // REQUIRED: this is only called during single-threaded recovery void MarkFileNumberUsedDuringRecovery(uint64_t number); @@ -804,7 +823,10 @@ class VersionSet { uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_; + // The last seq visible to reads std::atomic last_sequence_; + // The last seq with which a writer has written/will write.
+ std::atomic last_to_be_written_sequence_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted // Opened lazily diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index fb79601e3..f38131106 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -69,6 +69,7 @@ class WalManagerTest : public testing::Test { batch.Put(key, value); WriteBatchInternal::SetSequence(&batch, seq); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); + versions_->SetLastToBeWrittenSequence(seq); versions_->SetLastSequence(seq); } diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index 03a82174a..18727ee6a 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -125,176 +125,180 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { {false, false, true, false, true}, }; - for (auto& allow_parallel : {true, false}) { - for (auto& allow_batching : {true, false}) { - for (auto& enable_WAL : {true, false}) { - for (auto& enable_pipelined_write : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.allow_concurrent_memtable_write = allow_parallel; - options.enable_pipelined_write = enable_pipelined_write; - - ReadOptions read_options; - DB* db; - DBImpl* db_impl; - - DestroyDB(dbname, options); - ASSERT_OK(DB::Open(options, dbname, &db)); - - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); - - std::atomic threads_waiting(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_waiting = 0; - bool is_leader = false; - bool is_last = false; - - // who am i - do { - cur_threads_waiting = threads_waiting.load(); - is_leader = (cur_threads_waiting == 0); - is_last = (cur_threads_waiting == write_group.size() - 1); - } while 
(!threads_waiting.compare_exchange_strong( - cur_threads_waiting, cur_threads_waiting + 1)); - - // check my state - auto* writer = reinterpret_cast(arg); - - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_INIT); - } - - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } + for (auto& two_queues : {true, false}) { + for (auto& allow_parallel : {true, false}) { + for (auto& allow_batching : {true, false}) { + for (auto& enable_WAL : {true, false}) { + for (auto& enable_pipelined_write : {true, false}) { + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.allow_concurrent_memtable_write = allow_parallel; + options.enable_pipelined_write = enable_pipelined_write; + options.concurrent_prepare = two_queues; + + ReadOptions read_options; + DB* db; + DBImpl* db_impl; + + DestroyDB(dbname, options); + ASSERT_OK(DB::Open(options, dbname, &db)); + + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); + + std::atomic threads_waiting(0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_waiting = 0; + bool is_leader = false; + bool is_last = false; + + // who am i + do { + cur_threads_waiting = threads_waiting.load(); + is_leader = (cur_threads_waiting == 0); + is_last = (cur_threads_waiting == write_group.size() - 1); + } while (!threads_waiting.compare_exchange_strong( + 
cur_threads_waiting, cur_threads_waiting + 1)); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + // wait for friends + while (threads_waiting.load() < write_group.size()) { + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_COMPLETED || + (enable_pipelined_write && + writer->state == + WriteThread::State:: + STATE_MEMTABLE_WRITER_LEADER)); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_waiting.load() < 1) { + } - // wait for friends - while (threads_waiting.load() < write_group.size()) { - } - }); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if 
(!allow_parallel) { - ASSERT_TRUE( - writer->state == WriteThread::State::STATE_COMPLETED || - (enable_pipelined_write && - writer->state == - WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); - } - }); + // loser has to lose + while (i == write_group.size() - 1 && + threads_waiting.load() < write_group.size() - 1) { + } - std::atomic thread_num(0); - std::atomic dummy_key(0); - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching; - // leaders gotta lead - while (i > 0 && threads_waiting.load() < 1) { - } + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = 0; + do { + my_key = dummy_key.load(); + } while ( + !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - // loser has to lose - while (i == write_group.size() - 1 && - threads_waiting.load() < write_group.size() - 1) { - } + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = 0; - do { - my_key = dummy_key.load(); - } while ( - !dummy_key.compare_exchange_strong(my_key, my_key + 1)); - - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); - - if (!write_op.callback_.should_fail_) { - seq.fetch_add(1); + if (!write_op.callback_.should_fail_) { + seq.fetch_add(1); + } } - } - WriteOptions woptions; - woptions.disableWAL = !enable_WAL; - woptions.sync = enable_WAL; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + WriteOptions woptions; + woptions.disableWAL = !enable_WAL; + woptions.sync = enable_WAL; + Status s = db_impl->WriteWithCallback( + woptions, 
&write_op.write_batch_, &write_op.callback_); - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); - } - }; + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); + } + }; - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); + } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_.load()); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); - } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_.load()); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE( + db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); + } } } - } - ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); - delete db; - DestroyDB(dbname, options); + delete db; + DestroyDB(dbname, options); + } } } } } - } +} } TEST_F(WriteCallbackTest, WriteCallBackTest) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index c25bcf771..67d4aac43 100644 --- a/include/rocksdb/db.h +++ 
b/include/rocksdb/db.h @@ -853,6 +853,11 @@ class DB { return Flush(options, DefaultColumnFamily()); } + // Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL + // afterwards. + virtual Status FlushWAL(bool sync) { + return Status::NotSupported("FlushWAL not implemented"); + } // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the // same as Write() with sync=true: in the latter case the changes won't be // visible until the sync is done. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 283085a53..1c90a68be 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -887,6 +887,18 @@ struct DBOptions { // DEFAULT: false // Immutable. bool allow_ingest_behind = false; + + // If enabled it uses two queues for writes, one for the ones with + // disable_memtable and one for the ones that also write to memtable. This + // allows the memtable writes not to lag behind other writes. It can be used + // to optimize MySQL 2PC in which only the commits, which are serial, write to + // memtable. + bool concurrent_prepare = false; + + // If true WAL is not flushed automatically after each write. Instead it + // relies on manual invocation of FlushWAL to write the WAL buffer to its + // file. 
+ bool manual_wal_flush = false; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 7ae8c9e4a..db5068b1d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -268,6 +268,8 @@ class StackableDB : public DB { return db_->SyncWAL(); } + virtual Status FlushWAL(bool sync) override { return db_->FlushWAL(sync); } + #ifndef ROCKSDB_LITE virtual Status DisableFileDeletions() override { diff --git a/options/db_options.cc b/options/db_options.cc index d990ca81a..030a70b9c 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -86,7 +86,9 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) fail_if_options_file_error(options.fail_if_options_file_error), dump_malloc_stats(options.dump_malloc_stats), avoid_flush_during_recovery(options.avoid_flush_during_recovery), - allow_ingest_behind(options.allow_ingest_behind) { + allow_ingest_behind(options.allow_ingest_behind), + concurrent_prepare(options.concurrent_prepare), + manual_wal_flush(options.manual_wal_flush) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -219,6 +221,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { avoid_flush_during_recovery); ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d", allow_ingest_behind); + ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d", + concurrent_prepare); + ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d", + manual_wal_flush); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index c174aeb08..f8c291b3b 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -79,6 +79,8 @@ struct ImmutableDBOptions { bool dump_malloc_stats; bool avoid_flush_during_recovery; bool allow_ingest_behind; + bool concurrent_prepare; + bool manual_wal_flush; }; struct MutableDBOptions { diff --git a/options/options_helper.h 
b/options/options_helper.h index a82cc9e47..399164514 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -349,7 +349,15 @@ static std::unordered_map db_options_type_info = { {"allow_ingest_behind", {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, OptionVerificationType::kNormal, false, - offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}}; + offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}, + {"concurrent_prepare", + {offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, concurrent_prepare)}}, + {"manual_wal_flush", + {offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean, + OptionVerificationType::kNormal, false, + offsetof(struct ImmutableDBOptions, manual_wal_flush)}}}; // offset_of is used to get the offset of a class data member // ex: offset_of(&ColumnFamilyOptions::num_levels) diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index bfe080828..8345ec182 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -13,22 +13,11 @@ #define __STDC_FORMAT_MACROS #endif -#include -#include #include -#include -#include "options/options_helper.h" #include "options/options_parser.h" -#include "options/options_sanity_check.h" -#include "rocksdb/cache.h" #include "rocksdb/convenience.h" -#include "rocksdb/memtablerep.h" -#include "rocksdb/utilities/leveldb_options.h" -#include "util/random.h" -#include "util/stderr_logger.h" #include "util/testharness.h" -#include "util/testutil.h" #ifndef GFLAGS bool FLAGS_enable_print = false; @@ -294,7 +283,9 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_2pc=false;" "avoid_flush_during_recovery=false;" "avoid_flush_during_shutdown=false;" - "allow_ingest_behind=false;", + "allow_ingest_behind=false;" + "concurrent_prepare=false;" + "manual_wal_flush=false;", new_options)); 
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 4cd0ba51c..1db900e35 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -9,7 +9,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. - #include "table/block_based_table_factory.h" #include diff --git a/util/murmurhash.cc b/util/murmurhash.cc index 309166bce..376b644b4 100644 --- a/util/murmurhash.cc +++ b/util/murmurhash.cc @@ -8,8 +8,8 @@ /* Murmurhash from http://sites.google.com/site/murmurhash/ - All code is released to the public domain. For business purposes, Murmurhash is - under the MIT license. + All code is released to the public domain. For business purposes, Murmurhash + is under the MIT license. */ #include "murmurhash.h" diff --git a/util/murmurhash.h b/util/murmurhash.h index b368313fa..403b67c45 100644 --- a/util/murmurhash.h +++ b/util/murmurhash.h @@ -8,8 +8,8 @@ /* Murmurhash from http://sites.google.com/site/murmurhash/ - All code is released to the public domain. For business purposes, Murmurhash is - under the MIT license. + All code is released to the public domain. For business purposes, Murmurhash + is under the MIT license. 
*/ #pragma once #include diff --git a/util/transaction_test_util.cc b/util/transaction_test_util.cc index 3d1764b90..223395991 100644 --- a/util/transaction_test_util.cc +++ b/util/transaction_test_util.cc @@ -12,6 +12,7 @@ #include #include +#include #include "rocksdb/db.h" #include "rocksdb/utilities/optimistic_transaction_db.h" @@ -135,6 +136,13 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, if (s.ok()) { if (txn != nullptr) { + std::hash hasher; + char name[64]; + snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()), + txn_id_++); + assert(strlen(name) < 64 - 1); + txn->SetName(name); + s = txn->Prepare(); s = txn->Commit(); if (!s.ok()) { diff --git a/util/transaction_test_util.h b/util/transaction_test_util.h index 97c62841f..4e192bac8 100644 --- a/util/transaction_test_util.h +++ b/util/transaction_test_util.h @@ -104,6 +104,8 @@ class RandomTransactionInserter { Transaction* txn_ = nullptr; Transaction* optimistic_txn_ = nullptr; + std::atomic txn_id_; + bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); }; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index f2a97d1d8..3a761c3f0 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -136,6 +136,9 @@ class DummyDB : public StackableDB { return Status::OK(); } + // To avoid FlushWAL called on stacked db which is nullptr + virtual Status FlushWAL(bool sync) override { return Status::OK(); } + std::vector live_files_; // pair std::vector> wal_files_; diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index 19543aad4..3a9606af4 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -209,6 +209,7 @@ Status CheckpointImpl::CreateCustomCheckpoint( TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); 
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"); + db_->FlushWAL(false /* sync */); } // if we have more than one column family, we need to also get WAL files if (s.ok()) { diff --git a/utilities/transactions/transaction_impl.cc b/utilities/transactions/transaction_impl.cc index 731e138c8..b848b1be2 100644 --- a/utilities/transactions/transaction_impl.cc +++ b/utilities/transactions/transaction_impl.cc @@ -274,6 +274,8 @@ Status TransactionImpl::Commit() { s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, log_number_); if (!s.ok()) { + ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log, + "Commit write failed"); return s; } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index c93daddcc..5abe23cdb 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -35,7 +35,8 @@ using std::string; namespace rocksdb { -class TransactionTest : public ::testing::TestWithParam { +class TransactionTest + : public ::testing::TestWithParam> { public: TransactionDB* db; FaultInjectionTestEnv* env; @@ -52,13 +53,14 @@ class TransactionTest : public ::testing::TestWithParam { options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); env = new FaultInjectionTestEnv(Env::Default()); options.env = env; + options.concurrent_prepare = std::get<1>(GetParam()); dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); txn_db_options.transaction_lock_timeout = 0; txn_db_options.default_lock_timeout = 0; Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -79,7 +81,7 @@ class TransactionTest : public ::testing::TestWithParam { env->DropUnsyncedFileData(); env->ResetState(); Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = 
TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -91,7 +93,7 @@ class TransactionTest : public ::testing::TestWithParam { delete db; DestroyDB(dbname, options); Status s; - if (GetParam() == false) { + if (std::get<0>(GetParam()) == false) { s = TransactionDB::Open(options, txn_db_options, dbname, &db); } else { s = OpenWithStackableDB(); @@ -122,9 +124,17 @@ class TransactionTest : public ::testing::TestWithParam { } }; -INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, ::testing::Values(false)); +class MySQLStyleTransactionTest : public TransactionTest {}; + +INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, + ::testing::Values(std::make_tuple(false, false))); INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, - ::testing::Values(true)); + ::testing::Values(std::make_tuple(true, false))); +INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest, + ::testing::Values(std::make_tuple(false, false), + std::make_tuple(false, true), + std::make_tuple(true, false), + std::make_tuple(true, true))); TEST_P(TransactionTest, DoubleEmptyWrite) { WriteOptions write_options; @@ -957,6 +967,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { s = db->Get(read_options, Slice("foo"), &value); ASSERT_TRUE(s.IsNotFound()); + db->FlushWAL(false); delete txn; // kill and reopen s = ReOpenNoDelete(); @@ -1021,7 +1032,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) { ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); } -TEST_P(TransactionTest, TwoPhaseMultiThreadTest) { +// TODO this test needs to be updated with serial commits +TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) { // mix transaction writes and regular writes const uint32_t NUM_TXN_THREADS = 50; std::atomic txn_thread_num(0); @@ -1546,6 +1558,8 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) { s = db->Put(wal_on, "cats", "dogs4"); ASSERT_OK(s); + db->FlushWAL(false); + // kill and reopen 
env->SetFilesystemActive(false); ReOpenNoDelete(); @@ -4487,7 +4501,7 @@ Status TransactionStressTestInserter(TransactionDB* db, } } // namespace -TEST_P(TransactionTest, TransactionStressTest) { +TEST_P(MySQLStyleTransactionTest, TransactionStressTest) { const size_t num_threads = 4; const size_t num_transactions_per_thread = 10000; const size_t num_sets = 3;