From f383641a1d772bcde6dc42f26d798c0d93311443 Mon Sep 17 00:00:00 2001
From: Maysam Yabandeh
Date: Mon, 13 May 2019 17:43:47 -0700
Subject: [PATCH] Unordered Writes (#5218)

Summary:
This patch performs unordered writes in RocksDB when the unordered_write option is set to true. When enabled, writes to the memtable are done without joining any write thread. This offers much higher write throughput since upcoming writes do not have to wait for the slowest memtable write to finish. The tradeoff is that the set of writes visible to a snapshot might change over time. If the application cannot tolerate that, it should implement its own mechanism to work around it. Using TransactionDB with the WRITE_PREPARED write policy is one way to achieve that: it increases the max throughput by 2.2x without compromising the snapshot guarantees.

The patch is prepared based on an original by siying.
Existing unit tests are extended to include the unordered_write option.

Benchmark Results:
```
TEST_TMPDIR=/dev/shm/ ./db_bench_unordered --benchmarks=fillrandom --threads=32 --num=10000000 -max_write_buffer_number=16 --max_background_jobs=64 --batch_size=8 --writes=3000000 -level0_file_num_compaction_trigger=99999 --level0_slowdown_writes_trigger=99999 --level0_stop_writes_trigger=99999 -enable_pipelined_write=false -disable_auto_compactions --unordered_write=1
```
With WAL
- Vanilla RocksDB: 78.6 MB/s
- WRITE_PREPARED with unordered_write: 177.8 MB/s (2.2x)
- unordered_write: 368.9 MB/s (4.7x with relaxed snapshot guarantees)

Without WAL
- Vanilla RocksDB: 111.3 MB/s
- WRITE_PREPARED with unordered_write: 259.3 MB/s (2.3x)
- unordered_write: 645.6 MB/s (5.8x with relaxed snapshot guarantees)
- WRITE_PREPARED with unordered_write, concurrency control disabled: 185.3 MB/s (2.35x)

Limitations:
- The feature is not yet extended to `max_successive_merges` > 0. The feature is also incompatible with `enable_pipelined_write` = true as well as with `allow_concurrent_memtable_write` = false.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5218

Differential Revision: D15219029

Pulled By: maysamyabandeh

fbshipit-source-id: 38f2abc4af8780148c6128acdba2b3227bc81759
---
 HISTORY.md                                  |   1 +
 db/c.cc                                     |   5 +
 db/db_bloom_filter_test.cc                  |   2 +
 db/db_impl.h                                |  58 ++++-
 db/db_impl_open.cc                          |  11 +
 db/db_impl_write.cc                         | 157 ++++++++++--
 db/db_memtable_test.cc                      |  69 +++++
 db/db_test_util.cc                          |   8 +
 db/db_test_util.h                           |   6 +
 db/flush_scheduler.h                        |   3 +
 db/plain_table_db_test.cc                   |   1 +
 db/write_batch.cc                           |   9 +-
 db/write_callback_test.cc                   |  17 +-
 include/rocksdb/c.h                         |   2 +
 include/rocksdb/options.h                   |  25 ++
 options/db_options.cc                       |   3 +
 options/db_options.h                        |   1 +
 options/options_helper.cc                   |   4 +
 options/options_settable_test.cc            |   1 +
 table/block_based_table_factory.cc          |   8 +-
 tools/db_bench_tool.cc                      |   4 +
 .../pessimistic_transaction_db.cc           |  16 +-
 utilities/transactions/transaction_test.cc  |  61 +++--
 utilities/transactions/transaction_test.h   |  27 +-
 .../write_prepared_transaction_test.cc      | 237 ++++++++++++------
 .../write_unprepared_transaction_test.cc    |   3 +-
 26 files changed, 585 insertions(+), 154 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 23d8717f3..919dea211 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,7 @@
 ### New Features
 * Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.
+* Add an option `unordered_write` which trades snapshot guarantees with higher write throughput.
When used with WRITE_PREPARED transactions, it offers higher throughput with however no compromise on guarantees. ### Performance Improvements * Reduce binary search when iterator reseek into the same data block. diff --git a/db/c.cc b/db/c.cc index 58b51e252..8f96366fb 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2473,6 +2473,11 @@ void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt, opt->rep.enable_pipelined_write = v; } +void rocksdb_options_set_unordered_write(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.unordered_write = v; +} + void rocksdb_options_set_max_subcompactions(rocksdb_options_t* opt, uint32_t n) { opt->rep.max_subcompactions = n; diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index a2a01d6b4..beed590ae 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -1095,6 +1095,8 @@ TEST_F(DBBloomFilterTest, PrefixScan) { options.max_background_compactions = 2; options.create_if_missing = true; options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + assert(!options.unordered_write); + // It is incompatible with allow_concurrent_memtable_write=false options.allow_concurrent_memtable_write = false; BlockBasedTableOptions table_options; diff --git a/db/db_impl.h b/db/db_impl.h index 623f69ba6..0ee5d82b5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -897,14 +897,32 @@ class DBImpl : public DB { bool disable_memtable = false, uint64_t* seq_used = nullptr); - // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates - // the number of sub-patches. A sub-patch is a subset of the write batch that - // does not have duplicate keys. - Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, - WriteCallback* callback = nullptr, - uint64_t* log_used = nullptr, uint64_t log_ref = 0, - uint64_t* seq_used = nullptr, size_t batch_cnt = 0, - PreReleaseCallback* pre_release_callback = nullptr); + // Write only to memtables without joining any write queue + Status UnorderedWriteMemtable(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t log_ref, SequenceNumber seq, + const size_t sub_batch_cnt); + + // Whether the batch requires to be assigned with an order + enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder }; + // Whether it requires publishing last sequence or not + enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq }; + + // Join the write_thread to write the batch only to the WAL. It is the + // responsibility of the caller to also write the write batch to the memtable + // if it required. + // + // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder + // indicating the number of sub-batches in my_batch. A sub-patch is a subset + // of the write batch that does not have duplicate keys. When seq_per_batch is + // not set, each key is a separate sub_batch. Otherwise each duplicate key + // marks start of a new sub-batch. 
+ Status WriteImplWALOnly( + WriteThread* write_thread, const WriteOptions& options, + WriteBatch* updates, WriteCallback* callback, uint64_t* log_used, + const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, + PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, + const PublishLastSeq publish_last_seq, const bool disable_memtable); // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ @@ -1121,6 +1139,20 @@ class DBImpl : public DB { const autovector& flush_memtable_ids, bool resuming_from_bg_err); + inline void WaitForPendingWrites() { + if (!immutable_db_options_.unordered_write) { + // Then the writes are finished before the next write group starts + return; + } + // Wait for the ones who already wrote to the WAL to finish their + // memtable write. + if (pending_memtable_writes_.load() != 0) { + std::unique_lock guard(switch_mutex_); + switch_cv_.wait(guard, + [&] { return pending_memtable_writes_.load() == 0; }); + } + } + // REQUIRES: mutex locked and in write thread. void AssignAtomicFlushSeq(const autovector& cfds); @@ -1571,13 +1603,21 @@ class DBImpl : public DB { // corresponding call to PurgeObsoleteFiles has not yet finished. int pending_purge_obsolete_files_; - // last time when DeleteObsoleteFiles with full scan was executed. Originaly + // last time when DeleteObsoleteFiles with full scan was executed. Originally // initialized with startup time. uint64_t delete_obsolete_files_last_run_; // last time stats were dumped to LOG std::atomic last_stats_dump_time_microsec_; + // The thread that wants to switch memtable, can wait on this cv until the + // pending writes to memtable finishes. + std::condition_variable switch_cv_; + // The mutex used by switch_cv_. mutex_ should be acquired beforehand. + std::mutex switch_mutex_; + // Number of threads intending to write to memtable + std::atomic pending_memtable_writes_ = {}; + // Each flush or compaction gets its own job id. 
this counter makes sure // they're unique std::atomic next_job_id_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 1bc69b491..66104d0ba 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -228,6 +228,17 @@ static Status ValidateOptions( return Status::InvalidArgument("keep_log_file_num must be greater than 0"); } + if (db_options.unordered_write && + !db_options.allow_concurrent_memtable_write) { + return Status::InvalidArgument( + "unordered_write is incompatible with !allow_concurrent_memtable_write"); + } + + if (db_options.unordered_write && db_options.enable_pipelined_write) { + return Status::InvalidArgument( + "unordered_write is incompatible with enable_pipelined_write"); + } + return Status::OK(); } } // namespace diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 3edec9ac5..733eb408a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -94,6 +94,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with seq_per_batch"); } + if (immutable_db_options_.unordered_write && + immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with unordered_write"); + } // Otherwise IsLatestPersistentState optimization does not make sense assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || disable_memtable); @@ -107,8 +112,39 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (two_write_queues_ && disable_memtable) { - return WriteImplWALOnly(write_options, my_batch, callback, log_used, - log_ref, seq_used, batch_cnt, pre_release_callback); + AssignOrder assign_order = + seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder; + // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and + // they don't consume sequence. + return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch, + callback, log_used, log_ref, seq_used, batch_cnt, + pre_release_callback, assign_order, + kDontPublishLastSeq, disable_memtable); + } + + if (immutable_db_options_.unordered_write) { + const size_t sub_batch_cnt = batch_cnt != 0 + ? 
batch_cnt + // every key is a sub-batch consuming a seq + : WriteBatchInternal::Count(my_batch); + uint64_t seq; + // Use a write thread to i) optimize for WAL write, ii) publish last + // sequence in in increasing order, iii) call pre_release_callback serially + status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback, + log_used, log_ref, &seq, sub_batch_cnt, + pre_release_callback, kDoAssignOrder, + kDoPublishLastSeq, disable_memtable); + if (!status.ok()) { + return status; + } + if (seq_used) { + *seq_used = seq; + } + if (!disable_memtable) { + status = UnorderedWriteMemtable(write_options, my_batch, callback, + log_ref, seq, sub_batch_cnt); + } + return status; } if (immutable_db_options_.enable_pipelined_write) { @@ -534,23 +570,65 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, return w.FinalStatus(); } +Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, + WriteBatch* my_batch, + WriteCallback* callback, uint64_t log_ref, + SequenceNumber seq, + const size_t sub_batch_cnt) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + false /*disable_memtable*/); + + if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) { + w.sequence = seq; + size_t total_count = WriteBatchInternal::Count(my_batch); + InternalStats* stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt); + + WriteStatusCheck(w.status); + if (write_options.disableWAL) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + } + + size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1; + if (pending_cnt == 0) { + switch_cv_.notify_all(); + } + + if (!w.FinalStatus().ok()) { + return w.FinalStatus(); + } + return Status::OK(); +} + // The 2nd write queue. If enabled it will be used only for WAL-only writes. // This is the only queue that updates LastPublishedSequence which is only // applicable in a two-queue setting. 
-Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, - WriteBatch* my_batch, WriteCallback* callback, - uint64_t* log_used, uint64_t log_ref, - uint64_t* seq_used, size_t batch_cnt, - PreReleaseCallback* pre_release_callback) { +Status DBImpl::WriteImplWALOnly( + WriteThread* write_thread, const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, + const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, + PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, + const PublishLastSeq publish_last_seq, const bool disable_memtable) { Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, - true /* disable_memtable */, batch_cnt, - pre_release_callback); + disable_memtable, sub_batch_cnt, pre_release_callback); RecordTick(stats_, WRITE_WITH_WAL); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - nonmem_write_thread_.JoinBatchGroup(&w); + write_thread->JoinBatchGroup(&w); assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); if (w.state == WriteThread::STATE_COMPLETED) { if (log_used != nullptr) { @@ -563,9 +641,33 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); + + if (publish_last_seq == kDoPublishLastSeq) { + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + WriteContext write_context; + if (error_handler_.IsDBStopped()) { + status = error_handler_.GetBGError(); + } + // TODO(myabandeh): Make preliminary checks thread-safe so we could do them + // without paying the cost of obtaining the mutex. 
+ if (status.ok()) { + InstrumentedMutexLock l(&mutex_); + bool need_log_sync = false; + status = PreprocessWrite(write_options, &need_log_sync, &write_context); + WriteStatusCheck(status); + } + if (!status.ok()) { + WriteThread::WriteGroup write_group; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } + } + WriteThread::WriteGroup write_group; uint64_t last_sequence; - nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + write_thread->EnterAsBatchGroupLeader(&w, &write_group); // Note: no need to update last_batch_group_size_ here since the batch writes // to WAL only @@ -602,11 +704,13 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL size_t seq_inc = 0 /* total_count */; - if (seq_per_batch_) { + if (assign_order == kDoAssignOrder) { size_t total_batch_cnt = 0; for (auto* writer : write_group) { - assert(writer->batch_cnt); - total_batch_cnt += writer->batch_cnt; + assert(writer->batch_cnt || !seq_per_batch_); + if (!writer->CallbackFailed()) { + total_batch_cnt += writer->batch_cnt; + } } seq_inc = total_batch_cnt; } @@ -617,16 +721,21 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, // Otherwise we inc seq number to do solely the seq allocation last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } + + size_t memtable_write_cnt = 0; auto curr_seq = last_sequence + 1; for (auto* writer : write_group) { if (writer->CallbackFailed()) { continue; } writer->sequence = curr_seq; - if (seq_per_batch_) { - assert(writer->batch_cnt); + if (assign_order == kDoAssignOrder) { + assert(writer->batch_cnt || !seq_per_batch_); curr_seq += writer->batch_cnt; } + if (!writer->disable_memtable) { + memtable_write_cnt++; + } // else seq advances only by memtable writes } if (status.ok() && write_options.sync) { @@ -648,9 +757,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - const bool DISABLE_MEMTABLE = true; Status ws = writer->pre_release_callback->Callback( - writer->sequence, DISABLE_MEMTABLE, writer->log_used); + writer->sequence, disable_memtable, writer->log_used); if (!ws.ok()) { status = ws; break; @@ -658,7 +766,15 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } } } - nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); + if (publish_last_seq == kDoPublishLastSeq) { + versions_->SetLastSequence(last_sequence + seq_inc); + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + } + if (immutable_db_options_.unordered_write && status.ok()) { + pending_memtable_writes_ += memtable_write_cnt; + } + write_thread->ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); } @@ -710,6 +826,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); if (UNLIKELY(status.ok() && !single_column_family_mode_ && total_log_size_ > GetMaxTotalWalSize())) { + WaitForPendingWrites(); status = SwitchWAL(write_context); } @@ -719,10 +836,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // thread is writing to another DB with the same write buffer, they 
may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. + WaitForPendingWrites(); status = HandleWriteBufferFull(write_context); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + WaitForPendingWrites(); status = ScheduleFlushes(write_context); } diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 294d0f581..a212c9812 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -204,6 +204,75 @@ TEST_F(DBMemTableTest, DuplicateSeq) { delete mem; } +// A simple test to verify that the concurrent merge writes is functional +TEST_F(DBMemTableTest, ConcurrentMergeWrite) { + int num_ops = 1000; + std::string value; + Status s; + MergeContext merge_context; + Options options; + // A merge operator that is not sensitive to concurrent writes since in this + // test we don't order the writes. + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared(); + options.memtable_factory = factory; + options.allow_concurrent_memtable_write = true; + ImmutableCFOptions ioptions(options); + WriteBufferManager wb(options.db_write_buffer_size); + MemTablePostProcessInfo post_process_info; + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + + // Put 0 as the base + PutFixed64(&value, static_cast(0)); + bool res = mem->Add(0, kTypeValue, "key", value); + ASSERT_TRUE(res); + value.clear(); + + // Write Merge concurrently + rocksdb::port::Thread write_thread1([&]() { + std::string v1; + for (int seq = 1; seq < num_ops / 2; seq++) { + PutFixed64(&v1, seq); + bool res1 = + mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info); + ASSERT_TRUE(res1); + v1.clear(); + } + }); + rocksdb::port::Thread write_thread2([&]() { + std::string v2; + for (int seq = num_ops / 2; seq < num_ops; seq++) { + PutFixed64(&v2, seq); + bool res2 = + mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info); + ASSERT_TRUE(res2); + v2.clear(); + } + }); + write_thread1.join(); + write_thread2.join(); + + Status status; + ReadOptions roptions; + SequenceNumber max_covering_tombstone_seq = 0; + LookupKey lkey("key", kMaxSequenceNumber); + res = mem->Get(lkey, &value, &status, &merge_context, + &max_covering_tombstone_seq, roptions); + ASSERT_TRUE(res); + uint64_t ivalue = DecodeFixed64(Slice(value).data()); + uint64_t sum = 0; + for (int seq = 0; seq < num_ops; seq++) { + sum += seq; + } + ASSERT_EQ(ivalue, sum); + + delete mem; +} + TEST_F(DBMemTableTest, InsertWithHint) { Options options; options.allow_concurrent_memtable_write = false; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index bee6b81d5..ebfc7a9ca 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -341,6 +341,7 @@ Options DBTestBase::GetOptions( options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.memtable_factory.reset(NewHashSkipListRepFactory(16)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kPlainTableFirstBytePrefix: options.table_factory.reset(new PlainTableFactory()); @@ -373,12 +374,14 @@ Options DBTestBase::GetOptions( case kVectorRep: options.memtable_factory.reset(new VectorRepFactory(100)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kHashLinkList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); 
options.memtable_factory.reset( NewHashLinkListRepFactory(4, 0, 3, true, 4)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kDirectIO: { options.use_direct_reads = true; @@ -540,6 +543,11 @@ Options DBTestBase::GetOptions( options.manual_wal_flush = true; break; } + case kUnorderedWrite: { + options.allow_concurrent_memtable_write = false; + options.unordered_write = false; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 50109e0a4..f5d7fd1a7 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -140,6 +140,11 @@ class SpecialMemTableRep : public MemTableRep { memtable_->Insert(handle); } + void InsertConcurrently(KeyHandle handle) override { + num_entries_++; + memtable_->Insert(handle); + } + // Returns true iff an entry that compares equal to key is in the list. virtual bool Contains(const char* key) const override { return memtable_->Contains(key); @@ -688,6 +693,7 @@ class DBTestBase : public testing::Test { kPartitionedFilterWithNewTableReaderForCompactions, kUniversalSubcompactions, kxxHash64Checksum, + kUnorderedWrite, // This must be the last line kEnd, }; diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index cd3575861..b5abec405 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -28,6 +28,9 @@ class FlushScheduler { // Filters column families that have been dropped. ColumnFamilyData* TakeNextColumnFamily(); + // This can be called concurrently with ScheduleFlush but it would miss all + // the scheduled flushes after the last synchronization. This would result + // into less precise enforcement of memtable sizes but should not matter much. bool Empty(); void Clear(); diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 2dd0cff0b..8a08cf9fe 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -142,6 +142,7 @@ class PlainTableDBTest : public testing::Test, options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.allow_mmap_reads = mmap_mode_; options.allow_concurrent_memtable_write = false; + options.unordered_write = false; return options; } diff --git a/db/write_batch.cc b/db/write_batch.cc index 939b59530..830fbeab1 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1471,7 +1471,6 @@ class MemTableInserter : public WriteBatch::Handler { Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { - assert(!concurrent_memtable_writes_); // optimize for non-recovery mode if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); @@ -1498,6 +1497,8 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetImmutableMemTableOptions(); bool perform_merge = false; + assert(!concurrent_memtable_writes_ || + moptions->max_successive_merges == 0); // If we pass DB through and options.max_successive_merges is hit // during recovery, Get() will be issued which will try to acquire @@ -1505,6 +1506,7 @@ class MemTableInserter : public WriteBatch::Handler { // So we disable merge in recovery if (moptions->max_successive_merges > 0 && db_ != nullptr && recovering_log_number_ == 0) { + assert(!concurrent_memtable_writes_); LookupKey lkey(key, sequence_); // Count the number of successive merges at the head @@ -1550,6 +1552,7 @@ class MemTableInserter : public WriteBatch::Handler { perform_merge = false; } else { // 3) Add value to memtable + 
assert(!concurrent_memtable_writes_); bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); @@ -1562,7 +1565,9 @@ class MemTableInserter : public WriteBatch::Handler { if (!perform_merge) { // Add merge operator to memtable - bool mem_res = mem->Add(sequence_, kTypeMerge, key, value); + bool mem_res = + mem->Add(sequence_, kTypeMerge, key, value, + concurrent_memtable_writes_, get_post_process_info(mem)); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index cb880560e..7f2b20d89 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -124,6 +124,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { {false, false, true, false, true}, }; + for (auto& unordered_write : {true, false}) { for (auto& seq_per_batch : {true, false}) { for (auto& two_queues : {true, false}) { for (auto& allow_parallel : {true, false}) { @@ -133,15 +134,22 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { for (auto& write_group : write_scenarios) { Options options; options.create_if_missing = true; + options.unordered_write = unordered_write; options.allow_concurrent_memtable_write = allow_parallel; options.enable_pipelined_write = enable_pipelined_write; options.two_write_queues = two_queues; + // Skip unsupported combinations if (options.enable_pipelined_write && seq_per_batch) { - // This combination is not supported continue; } if (options.enable_pipelined_write && options.two_write_queues) { - // This combination is not supported + continue; + } + if (options.unordered_write && + !options.allow_concurrent_memtable_write) { + continue; + } + if (options.unordered_write && options.enable_pipelined_write) { continue; } @@ -358,8 +366,9 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { } } } -} -} + } + } + } } TEST_F(WriteCallbackTest, WriteCallBackTest) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index ed0709d22..5e75dd709 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -845,6 +845,8 @@ rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_pipelined_write( rocksdb_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_unordered_write( + rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_subcompactions( rocksdb_options_t*, uint32_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_background_jobs( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a1071f62e..c8b4cc538 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -893,6 +893,31 @@ struct DBOptions { // Default: false bool enable_pipelined_write = false; + // Setting unordered_write to true trades higher write throughput with + // relaxing the immutability guarantee of snapshots. This violates the + // repeatability one expects from ::Get from a snapshot, as well as + // ::MultiGet and Iterator's consistent-point-in-time view property. + // If the application cannot tolerate the relaxed guarantees, it can implement + // its own mechanisms to work around that and yet benefit from the higher + // throughput. Using TransactionDB with WRITE_PREPARED write policy is one way + // to achieve immutable snapshots despite unordered_write. 
+ // + // By default, i.e., when it is false, rocksdb does not advance the sequence + // number for new snapshots unless all the writes with lower sequence numbers + // are already finished. This provides the immutability that we except from + // snapshots. Moreover, since Iterator and MultiGet internally depend on + // snapshots, the snapshot immutability results into Iterator and MultiGet + // offering consistent-point-in-time view. If set to true, although + // Read-Your-Own-Write property is still provided, the snapshot immutability + // property is relaxed: the writes issued after the snapshot is obtained (with + // larger sequence numbers) will be still not visible to the reads from that + // snapshot, however, there still might be pending writes (with lower sequence + // number) that will change the state visible to the snapshot after they are + // landed to the memtable. + // + // Default: false + bool unordered_write = false; + // If true, allow multi-writers to update mem tables in parallel. // Only some memtable_factory-s support concurrent writes; currently it // is implemented only for SkipListFactory. Concurrent memtable writes diff --git a/options/db_options.cc b/options/db_options.cc index 83f1a18b0..e180238f4 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -67,6 +67,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), enable_pipelined_write(options.enable_pipelined_write), + unordered_write(options.unordered_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), @@ -185,6 +186,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { enable_thread_tracking); ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d", enable_pipelined_write); + ROCKS_LOG_HEADER(log, " Options.unordered_write: %d", + unordered_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", allow_concurrent_memtable_write); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", diff --git a/options/db_options.h b/options/db_options.h index 8d0200362..67b26786f 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -60,6 +60,7 @@ struct ImmutableDBOptions { std::vector> listeners; bool enable_thread_tracking; bool enable_pipelined_write; + bool unordered_write; bool allow_concurrent_memtable_write; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; diff --git a/options/options_helper.cc b/options/options_helper.cc index a973bbfde..c33c2be6f 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -103,6 +103,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; options.enable_pipelined_write = immutable_db_options.enable_pipelined_write; + options.unordered_write = immutable_db_options.unordered_write; options.allow_concurrent_memtable_write = immutable_db_options.allow_concurrent_memtable_write; options.enable_write_thread_adaptive_yield = @@ -1583,6 +1584,9 @@ std::unordered_map {"enable_pipelined_write", {offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, + {"unordered_write", + {offsetof(struct DBOptions, unordered_write), OptionType::kBoolean, + 
OptionVerificationType::kNormal, false, 0}}, {"allow_concurrent_memtable_write", {offsetof(struct DBOptions, allow_concurrent_memtable_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 2d6cc11c0..79a4fa814 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -279,6 +279,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "advise_random_on_open=true;" "fail_if_options_file_error=false;" "enable_pipelined_write=false;" + "unordered_write=false;" "allow_concurrent_memtable_write=true;" "wal_recovery_mode=kPointInTimeRecovery;" "enable_write_thread_adaptive_yield=true;" diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 47fe8e1b0..790a2c99e 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -227,7 +227,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( } Status BlockBasedTableFactory::SanitizeOptions( - const DBOptions& /*db_opts*/, const ColumnFamilyOptions& cf_opts) const { + const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { if (table_options_.index_type == BlockBasedTableOptions::kHashSearch && cf_opts.prefix_extractor == nullptr) { return Status::InvalidArgument( @@ -268,6 +268,12 @@ Status BlockBasedTableFactory::SanitizeOptions( "data_block_hash_table_util_ratio should be greater than 0 when " "data_block_index_type is set to kDataBlockBinaryAndHash"); } + if (db_opts.unordered_write && cf_opts.max_successive_merges > 0) { + // TODO(myabandeh): support it + return Status::InvalidArgument( + "max_successive_merges larger than 0 is currently inconsistent with " + "unordered_write"); + } return Status::OK(); } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index b2562f4e5..b806fff89 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -891,6 +891,9 @@ DEFINE_uint64(delayed_write_rate, 8388608u, DEFINE_bool(enable_pipelined_write, true, "Allow WAL and memtable writes to be pipelined"); +DEFINE_bool(unordered_write, false, + "Allow WAL and memtable writes to be pipelined"); + DEFINE_bool(allow_concurrent_memtable_write, true, "Allow multi-writers to update mem tables in parallel."); @@ -3552,6 +3555,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; options.enable_pipelined_write = FLAGS_enable_pipelined_write; + options.unordered_write = FLAGS_unordered_write; options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec; options.rate_limit_delay_max_milliseconds = diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 8eb21777a..05973e83a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -221,9 +221,18 @@ Status TransactionDB::Open( std::vector* handles, TransactionDB** dbptr) { Status s; DB* db = nullptr; + if (txn_db_options.write_policy == WRITE_COMMITTED && + db_options.unordered_write) { + return Status::NotSupported( + "WRITE_COMMITTED is incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_UNPREPARED && + db_options.unordered_write) { + // TODO(lth): support it + return Status::NotSupported( + "WRITE_UNPREPARED is currently incompatible with 
unordered_writes"); + } - ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, - static_cast(txn_db_options.write_policy)); std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; @@ -238,6 +247,9 @@ Status TransactionDB::Open( s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, use_seq_per_batch, use_batch_per_txn); if (s.ok()) { + ROCKS_LOG_WARN(db->GetDBOptions().info_log, + "Transaction write_policy is %" PRId32, + static_cast(txn_db_options.write_policy)); s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 1a5bf2d66..997a5abe2 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -42,40 +42,48 @@ namespace rocksdb { INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED), - std::make_tuple(false, false, WRITE_UNPREPARED), - std::make_tuple(false, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite))); INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionStressTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED), - std::make_tuple(false, false, WRITE_UNPREPARED), - std::make_tuple(false, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED), - std::make_tuple(true, true, WRITE_PREPARED), - std::make_tuple(true, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite))); // MySQLStyleTransactionTest takes far too long for valgrind to run. 
#ifndef ROCKSDB_VALGRIND_RUN INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false), - std::make_tuple(false, true, WRITE_COMMITTED, false), - std::make_tuple(false, false, WRITE_PREPARED, false), - std::make_tuple(false, false, WRITE_PREPARED, true), - std::make_tuple(false, true, WRITE_PREPARED, false), - std::make_tuple(false, true, WRITE_PREPARED, true), - std::make_tuple(false, false, WRITE_UNPREPARED, false), - std::make_tuple(false, false, WRITE_UNPREPARED, true), - std::make_tuple(false, true, WRITE_UNPREPARED, false), - std::make_tuple(false, true, WRITE_UNPREPARED, true))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true))); #endif // ROCKSDB_VALGRIND_RUN TEST_P(TransactionTest, DoubleEmptyWrite) { @@ -5646,7 +5654,7 @@ TEST_P(TransactionTest, DuplicateKeys) { } // do_rollback } // do_prepare - { + if (!options.unordered_write) { // Also test with max_successive_merges > 0. max_successive_merges will not // affect our algorithm for duplicate key insertion but we add the test to // verify that. 
@@ -5697,6 +5705,7 @@ TEST_P(TransactionTest, DuplicateKeys) { std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); delete cf_handle; std::vector cfds{ diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 33b2c51ea..b42548709 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -39,6 +39,8 @@ namespace rocksdb { // Return true if the ith bit is set in combination represented by comb bool IsInCombination(size_t i, size_t comb) { return comb & (size_t(1) << i); } +enum WriteOrdering : bool { kOrderedWrite, kUnorderedWrite }; + class TransactionTestBase : public ::testing::Test { public: TransactionDB* db; @@ -50,11 +52,13 @@ class TransactionTestBase : public ::testing::Test { bool use_stackable_db_; TransactionTestBase(bool use_stackable_db, bool two_write_queue, - TxnDBWritePolicy write_policy) + TxnDBWritePolicy write_policy, + WriteOrdering write_ordering) : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) { options.create_if_missing = true; options.max_write_buffer_number = 2; options.write_buffer_size = 4 * 1024; + options.unordered_write = write_ordering == kUnorderedWrite; options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); env = new FaultInjectionTestEnv(Env::Default()); @@ -352,6 +356,9 @@ class TransactionTestBase : public ::testing::Test { Transaction* txn; txn_db_options.write_policy = from_policy; + if (txn_db_options.write_policy == WRITE_COMMITTED) { + options.unordered_write = false; + } ReOpen(); for (int i = 0; i < 1024; i++) { @@ -400,6 +407,9 @@ class TransactionTestBase : public ::testing::Test { } // for i txn_db_options.write_policy = to_policy; + if (txn_db_options.write_policy == WRITE_COMMITTED) { + options.unordered_write = false; + } auto db_impl = reinterpret_cast(db->GetRootDB()); // Before upgrade/downgrade the WAL must be emptied if (empty_wal) { @@ -437,13 +447,14 @@ class TransactionTestBase : public ::testing::Test { } }; -class TransactionTest : public TransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { +class TransactionTest + : public TransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { public: TransactionTest() : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), - std::get<2>(GetParam())){}; + std::get<2>(GetParam()), std::get<3>(GetParam())){}; }; class TransactionStressTest : public TransactionTest {}; @@ -451,12 +462,12 @@ class TransactionStressTest : public TransactionTest {}; class MySQLStyleTransactionTest : public TransactionTestBase, virtual public ::testing::WithParamInterface< - std::tuple> { + std::tuple> { public: MySQLStyleTransactionTest() : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), - std::get<2>(GetParam())), - with_slow_threads_(std::get<3>(GetParam())) { + std::get<2>(GetParam()), std::get<3>(GetParam())), + with_slow_threads_(std::get<4>(GetParam())) { if (with_slow_threads_ && (txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED)) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index f2f3f30e2..d5a03cd04 100644 --- 
a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -342,8 +342,10 @@ class WritePreparedTxnDBMock : public WritePreparedTxnDB { class WritePreparedTransactionTestBase : public TransactionTestBase { public: WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, - TxnDBWritePolicy write_policy) - : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; + TxnDBWritePolicy write_policy, + WriteOrdering write_ordering) + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, + write_ordering){}; protected: void UpdateTransactionDBOptions(size_t snapshot_cache_bits, @@ -518,26 +520,26 @@ class WritePreparedTransactionTestBase : public TransactionTestBase { class WritePreparedTransactionTest : public WritePreparedTransactionTestBase, virtual public ::testing::WithParamInterface< - std::tuple> { + std::tuple> { public: WritePreparedTransactionTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())){}; }; #ifndef ROCKSDB_VALGRIND_RUN class SnapshotConcurrentAccessTest : public WritePreparedTransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { + virtual public ::testing::WithParamInterface> { public: SnapshotConcurrentAccessTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())), - split_id_(std::get<3>(GetParam())), - split_cnt_(std::get<4>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())), + split_id_(std::get<4>(GetParam())), + split_cnt_(std::get<5>(GetParam())){}; protected: // A test is split into split_cnt_ tests, each identified with split_id_ where @@ -549,15 +551,15 @@ class SnapshotConcurrentAccessTest class SeqAdvanceConcurrentTest : public WritePreparedTransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { + virtual public ::testing::WithParamInterface> { public: SeqAdvanceConcurrentTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())), - split_id_(std::get<3>(GetParam())), - split_cnt_(std::get<4>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())), + split_id_(std::get<4>(GetParam())), + split_cnt_(std::get<5>(GetParam())){}; protected: // A test is split into split_cnt_ tests, each identified with split_id_ where @@ -568,81 +570,152 @@ class SeqAdvanceConcurrentTest INSTANTIATE_TEST_CASE_P( WritePreparedTransactionTest, WritePreparedTransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite))); #ifndef ROCKSDB_VALGRIND_RUN INSTANTIATE_TEST_CASE_P( TwoWriteQueues, SnapshotConcurrentAccessTest, - ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20), - std::make_tuple(false, true, 
WRITE_PREPARED, 1, 20), - std::make_tuple(false, true, WRITE_PREPARED, 2, 20), - std::make_tuple(false, true, WRITE_PREPARED, 3, 20), - std::make_tuple(false, true, WRITE_PREPARED, 4, 20), - std::make_tuple(false, true, WRITE_PREPARED, 5, 20), - std::make_tuple(false, true, WRITE_PREPARED, 6, 20), - std::make_tuple(false, true, WRITE_PREPARED, 7, 20), - std::make_tuple(false, true, WRITE_PREPARED, 8, 20), - std::make_tuple(false, true, WRITE_PREPARED, 9, 20), - std::make_tuple(false, true, WRITE_PREPARED, 10, 20), - std::make_tuple(false, true, WRITE_PREPARED, 11, 20), - std::make_tuple(false, true, WRITE_PREPARED, 12, 20), - std::make_tuple(false, true, WRITE_PREPARED, 13, 20), - std::make_tuple(false, true, WRITE_PREPARED, 14, 20), - std::make_tuple(false, true, WRITE_PREPARED, 15, 20), - std::make_tuple(false, true, WRITE_PREPARED, 16, 20), - std::make_tuple(false, true, WRITE_PREPARED, 17, 20), - std::make_tuple(false, true, WRITE_PREPARED, 18, 20), - std::make_tuple(false, true, WRITE_PREPARED, 19, 20))); + ::testing::Values( + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20), + + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20), + 
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20))); INSTANTIATE_TEST_CASE_P( OneWriteQueue, SnapshotConcurrentAccessTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20), - std::make_tuple(false, false, WRITE_PREPARED, 1, 20), - std::make_tuple(false, false, WRITE_PREPARED, 2, 20), - std::make_tuple(false, false, WRITE_PREPARED, 3, 20), - std::make_tuple(false, false, WRITE_PREPARED, 4, 20), - std::make_tuple(false, false, WRITE_PREPARED, 5, 20), - std::make_tuple(false, false, WRITE_PREPARED, 6, 20), - std::make_tuple(false, false, WRITE_PREPARED, 7, 20), - std::make_tuple(false, false, WRITE_PREPARED, 8, 20), - std::make_tuple(false, false, WRITE_PREPARED, 9, 20), - std::make_tuple(false, false, WRITE_PREPARED, 10, 20), - std::make_tuple(false, false, WRITE_PREPARED, 11, 20), - std::make_tuple(false, false, WRITE_PREPARED, 12, 20), - std::make_tuple(false, false, WRITE_PREPARED, 13, 20), - std::make_tuple(false, false, WRITE_PREPARED, 14, 20), - std::make_tuple(false, false, WRITE_PREPARED, 15, 20), - std::make_tuple(false, false, WRITE_PREPARED, 16, 20), - std::make_tuple(false, false, WRITE_PREPARED, 17, 20), - std::make_tuple(false, false, WRITE_PREPARED, 18, 20), - std::make_tuple(false, false, WRITE_PREPARED, 19, 20))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20), + + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 0, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 1, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 2, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 3, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 4, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 5, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 6, 20), + 
std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 7, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 8, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 9, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 10, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 11, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 12, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 13, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 14, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 15, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 16, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 17, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 18, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 19, + 20))); INSTANTIATE_TEST_CASE_P( TwoWriteQueues, SeqAdvanceConcurrentTest, - ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10), - std::make_tuple(false, true, WRITE_PREPARED, 1, 10), - std::make_tuple(false, true, WRITE_PREPARED, 2, 10), - std::make_tuple(false, true, WRITE_PREPARED, 3, 10), - std::make_tuple(false, true, WRITE_PREPARED, 4, 10), - std::make_tuple(false, true, WRITE_PREPARED, 5, 10), - std::make_tuple(false, true, WRITE_PREPARED, 6, 10), - std::make_tuple(false, true, WRITE_PREPARED, 7, 10), - std::make_tuple(false, true, WRITE_PREPARED, 8, 10), - std::make_tuple(false, true, WRITE_PREPARED, 9, 10))); + ::testing::Values( + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10))); INSTANTIATE_TEST_CASE_P( OneWriteQueue, SeqAdvanceConcurrentTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10), - std::make_tuple(false, false, WRITE_PREPARED, 1, 10), - std::make_tuple(false, false, WRITE_PREPARED, 2, 10), - std::make_tuple(false, false, WRITE_PREPARED, 3, 10), - std::make_tuple(false, false, WRITE_PREPARED, 4, 10), - std::make_tuple(false, false, WRITE_PREPARED, 5, 10), - std::make_tuple(false, false, WRITE_PREPARED, 6, 10), - std::make_tuple(false, false, 
WRITE_PREPARED, 7, 10), - std::make_tuple(false, false, WRITE_PREPARED, 8, 10), - std::make_tuple(false, false, WRITE_PREPARED, 9, 10))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10), + + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 0, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 1, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 2, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 3, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 4, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 5, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 6, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 7, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 8, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 9, 10))); #endif // ROCKSDB_VALGRIND_RUN TEST_P(WritePreparedTransactionTest, CommitMapTest) { diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 9aee33b07..914f3f581 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -20,7 +20,8 @@ class WriteUnpreparedTransactionTestBase : public TransactionTestBase { WriteUnpreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, TxnDBWritePolicy write_policy) - : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, + kOrderedWrite) {} }; class WriteUnpreparedTransactionTest