diff --git a/HISTORY.md b/HISTORY.md index 23d8717f3..919dea211 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ ### New Features * Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature. +* Add an option `unordered_write` which trades relaxed snapshot guarantees for higher write throughput. When used with WRITE_PREPARED transactions, it offers the higher throughput without compromising the snapshot guarantees. ### Performance Improvements * Reduce binary search when iterator reseek into the same data block. diff --git a/db/c.cc b/db/c.cc index 58b51e252..8f96366fb 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2473,6 +2473,11 @@ void rocksdb_options_set_enable_pipelined_write(rocksdb_options_t* opt, opt->rep.enable_pipelined_write = v; } +void rocksdb_options_set_unordered_write(rocksdb_options_t* opt, + unsigned char v) { + opt->rep.unordered_write = v; +} + void rocksdb_options_set_max_subcompactions(rocksdb_options_t* opt, uint32_t n) { opt->rep.max_subcompactions = n; diff --git a/db/db_bloom_filter_test.cc b/db/db_bloom_filter_test.cc index a2a01d6b4..beed590ae 100644 --- a/db/db_bloom_filter_test.cc +++ b/db/db_bloom_filter_test.cc @@ -1095,6 +1095,8 @@ TEST_F(DBBloomFilterTest, PrefixScan) { options.max_background_compactions = 2; options.create_if_missing = true; options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + assert(!options.unordered_write); + // unordered_write is incompatible with allow_concurrent_memtable_write=false options.allow_concurrent_memtable_write = false; BlockBasedTableOptions table_options; diff --git a/db/db_impl.h b/db/db_impl.h index 623f69ba6..0ee5d82b5 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -897,14 +897,32 @@ class DBImpl : public DB { bool disable_memtable = false, uint64_t* seq_used = nullptr); - // batch_cnt is expected to be non-zero in seq_per_batch mode and indicates - // the number of sub-patches. A sub-patch is a subset of the write batch that - // does not have duplicate keys. - Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates, - WriteCallback* callback = nullptr, - uint64_t* log_used = nullptr, uint64_t log_ref = 0, - uint64_t* seq_used = nullptr, size_t batch_cnt = 0, - PreReleaseCallback* pre_release_callback = nullptr); + // Write only to memtables without joining any write queue + Status UnorderedWriteMemtable(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t log_ref, SequenceNumber seq, + const size_t sub_batch_cnt); + + // Whether the batch needs to be assigned an order + enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder }; + // Whether the last sequence number needs to be published + enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq }; + + // Join the write_thread to write the batch only to the WAL. It is the + // responsibility of the caller to also write the write batch to the memtable + // if it is required. + // + // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder + // indicating the number of sub-batches in my_batch. A sub-batch is a subset + // of the write batch that does not have duplicate keys. When seq_per_batch is + // not set, each key is a separate sub-batch. Otherwise each duplicate key + // marks the start of a new sub-batch.
+ Status WriteImplWALOnly( + WriteThread* write_thread, const WriteOptions& options, + WriteBatch* updates, WriteCallback* callback, uint64_t* log_used, + const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, + PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, + const PublishLastSeq publish_last_seq, const bool disable_memtable); // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ @@ -1121,6 +1139,20 @@ class DBImpl : public DB { const autovector<const uint64_t*>& flush_memtable_ids, bool resuming_from_bg_err); + inline void WaitForPendingWrites() { + if (!immutable_db_options_.unordered_write) { + // Then the writes are finished before the next write group starts + return; + } + // Wait for the writers that have already written to the WAL to finish + // their memtable writes. + if (pending_memtable_writes_.load() != 0) { + std::unique_lock<std::mutex> guard(switch_mutex_); + switch_cv_.wait(guard, + [&] { return pending_memtable_writes_.load() == 0; }); + } + } + // REQUIRES: mutex locked and in write thread. void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds); @@ -1571,13 +1603,21 @@ class DBImpl : public DB { // corresponding call to PurgeObsoleteFiles has not yet finished. int pending_purge_obsolete_files_; - // last time when DeleteObsoleteFiles with full scan was executed. Originaly + // last time when DeleteObsoleteFiles with full scan was executed. Originally // initialized with startup time. uint64_t delete_obsolete_files_last_run_; // last time stats were dumped to LOG std::atomic<uint64_t> last_stats_dump_time_microsec_; + // A thread that wants to switch the memtable can wait on this cv until + // the pending writes to the memtable finish. + std::condition_variable switch_cv_; + // The mutex used by switch_cv_. mutex_ should be acquired beforehand. + std::mutex switch_mutex_; + // Number of threads intending to write to memtable + std::atomic<size_t> pending_memtable_writes_ = {}; + // Each flush or compaction gets its own job id. 
this counter makes sure // they're unique std::atomic next_job_id_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 1bc69b491..66104d0ba 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -228,6 +228,17 @@ static Status ValidateOptions( return Status::InvalidArgument("keep_log_file_num must be greater than 0"); } + if (db_options.unordered_write && + !db_options.allow_concurrent_memtable_write) { + return Status::InvalidArgument( + "unordered_write is incompatible with !allow_concurrent_memtable_write"); + } + + if (db_options.unordered_write && db_options.enable_pipelined_write) { + return Status::InvalidArgument( + "unordered_write is incompatible with enable_pipelined_write"); + } + return Status::OK(); } } // namespace diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 3edec9ac5..733eb408a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -94,6 +94,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, return Status::NotSupported( "pipelined_writes is not compatible with seq_per_batch"); } + if (immutable_db_options_.unordered_write && + immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with unordered_write"); + } // Otherwise IsLatestPersistentState optimization does not make sense assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || disable_memtable); @@ -107,8 +112,39 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (two_write_queues_ && disable_memtable) { - return WriteImplWALOnly(write_options, my_batch, callback, log_used, - log_ref, seq_used, batch_cnt, pre_release_callback); + AssignOrder assign_order = + seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder; + // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and + // they don't consume sequence. + return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch, + callback, log_used, log_ref, seq_used, batch_cnt, + pre_release_callback, assign_order, + kDontPublishLastSeq, disable_memtable); + } + + if (immutable_db_options_.unordered_write) { + const size_t sub_batch_cnt = batch_cnt != 0 + ? 
batch_cnt + // every key is a sub-batch consuming a seq + : WriteBatchInternal::Count(my_batch); + uint64_t seq; + // Use a write thread to i) optimize for WAL write, ii) publish last + // sequence in increasing order, iii) call pre_release_callback serially + status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback, + log_used, log_ref, &seq, sub_batch_cnt, + pre_release_callback, kDoAssignOrder, + kDoPublishLastSeq, disable_memtable); + if (!status.ok()) { + return status; + } + if (seq_used) { + *seq_used = seq; + } + if (!disable_memtable) { + status = UnorderedWriteMemtable(write_options, my_batch, callback, + log_ref, seq, sub_batch_cnt); + } + return status; } if (immutable_db_options_.enable_pipelined_write) { @@ -534,23 +570,65 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, return w.FinalStatus(); } +Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, + WriteBatch* my_batch, + WriteCallback* callback, uint64_t log_ref, + SequenceNumber seq, + const size_t sub_batch_cnt) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + false /*disable_memtable*/); + + if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) { + w.sequence = seq; + size_t total_count = WriteBatchInternal::Count(my_batch); + InternalStats* stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/, seq_per_batch_, sub_batch_cnt); + + WriteStatusCheck(w.status); + if (write_options.disableWAL) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + } + + size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1; + if (pending_cnt == 0) { + switch_cv_.notify_all(); + } + + if (!w.FinalStatus().ok()) { + return w.FinalStatus(); + } + return Status::OK(); +} + // The 2nd write queue. If enabled it will be used only for WAL-only writes. // This is the only queue that updates LastPublishedSequence which is only // applicable in a two-queue setting. 
-Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, - WriteBatch* my_batch, WriteCallback* callback, - uint64_t* log_used, uint64_t log_ref, - uint64_t* seq_used, size_t batch_cnt, - PreReleaseCallback* pre_release_callback) { +Status DBImpl::WriteImplWALOnly( + WriteThread* write_thread, const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, + const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, + PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, + const PublishLastSeq publish_last_seq, const bool disable_memtable) { Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, - true /* disable_memtable */, batch_cnt, - pre_release_callback); + disable_memtable, sub_batch_cnt, pre_release_callback); RecordTick(stats_, WRITE_WITH_WAL); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - nonmem_write_thread_.JoinBatchGroup(&w); + write_thread->JoinBatchGroup(&w); assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); if (w.state == WriteThread::STATE_COMPLETED) { if (log_used != nullptr) { @@ -563,9 +641,33 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } // else we are the leader of the write batch group assert(w.state == WriteThread::STATE_GROUP_LEADER); + + if (publish_last_seq == kDoPublishLastSeq) { + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + WriteContext write_context; + if (error_handler_.IsDBStopped()) { + status = error_handler_.GetBGError(); + } + // TODO(myabandeh): Make preliminary checks thread-safe so we could do them + // without paying the cost of obtaining the mutex. 
+ if (status.ok()) { + InstrumentedMutexLock l(&mutex_); + bool need_log_sync = false; + status = PreprocessWrite(write_options, &need_log_sync, &write_context); + WriteStatusCheck(status); + } + if (!status.ok()) { + WriteThread::WriteGroup write_group; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } + } + WriteThread::WriteGroup write_group; uint64_t last_sequence; - nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + write_thread->EnterAsBatchGroupLeader(&w, &write_group); // Note: no need to update last_batch_group_size_ here since the batch writes // to WAL only @@ -602,11 +704,13 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL size_t seq_inc = 0 /* total_count */; - if (seq_per_batch_) { + if (assign_order == kDoAssignOrder) { size_t total_batch_cnt = 0; for (auto* writer : write_group) { - assert(writer->batch_cnt); - total_batch_cnt += writer->batch_cnt; + assert(writer->batch_cnt || !seq_per_batch_); + if (!writer->CallbackFailed()) { + total_batch_cnt += writer->batch_cnt; + } } seq_inc = total_batch_cnt; } @@ -617,16 +721,21 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, // Otherwise we inc seq number to do solely the seq allocation last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } + + size_t memtable_write_cnt = 0; auto curr_seq = last_sequence + 1; for (auto* writer : write_group) { if (writer->CallbackFailed()) { continue; } writer->sequence = curr_seq; - if (seq_per_batch_) { - assert(writer->batch_cnt); + if (assign_order == kDoAssignOrder) { + assert(writer->batch_cnt || !seq_per_batch_); curr_seq += writer->batch_cnt; } + if (!writer->disable_memtable) { + memtable_write_cnt++; + } // else seq advances only by memtable writes } if (status.ok() && write_options.sync) { @@ -648,9 +757,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, for (auto* writer : write_group) { if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); - const bool DISABLE_MEMTABLE = true; Status ws = writer->pre_release_callback->Callback( - writer->sequence, DISABLE_MEMTABLE, writer->log_used); + writer->sequence, disable_memtable, writer->log_used); if (!ws.ok()) { status = ws; break; @@ -658,7 +766,15 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, } } } - nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status); + if (publish_last_seq == kDoPublishLastSeq) { + versions_->SetLastSequence(last_sequence + seq_inc); + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + } + if (immutable_db_options_.unordered_write && status.ok()) { + pending_memtable_writes_ += memtable_write_cnt; + } + write_thread->ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { status = w.FinalStatus(); } @@ -710,6 +826,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); if (UNLIKELY(status.ok() && !single_column_family_mode_ && total_log_size_ > GetMaxTotalWalSize())) { + WaitForPendingWrites(); status = SwitchWAL(write_context); } @@ -719,10 +836,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, // thread is writing to another DB with the same write buffer, they 
may also // be flushed. We may end up with flushing much more DBs than needed. It's // suboptimal but still correct. + WaitForPendingWrites(); status = HandleWriteBufferFull(write_context); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + WaitForPendingWrites(); status = ScheduleFlushes(write_context); } diff --git a/db/db_memtable_test.cc b/db/db_memtable_test.cc index 294d0f581..a212c9812 100644 --- a/db/db_memtable_test.cc +++ b/db/db_memtable_test.cc @@ -204,6 +204,75 @@ TEST_F(DBMemTableTest, DuplicateSeq) { delete mem; } +// A simple test to verify that the concurrent merge writes is functional +TEST_F(DBMemTableTest, ConcurrentMergeWrite) { + int num_ops = 1000; + std::string value; + Status s; + MergeContext merge_context; + Options options; + // A merge operator that is not sensitive to concurrent writes since in this + // test we don't order the writes. + options.merge_operator = MergeOperators::CreateUInt64AddOperator(); + + // Create a MemTable + InternalKeyComparator cmp(BytewiseComparator()); + auto factory = std::make_shared(); + options.memtable_factory = factory; + options.allow_concurrent_memtable_write = true; + ImmutableCFOptions ioptions(options); + WriteBufferManager wb(options.db_write_buffer_size); + MemTablePostProcessInfo post_process_info; + MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + + // Put 0 as the base + PutFixed64(&value, static_cast(0)); + bool res = mem->Add(0, kTypeValue, "key", value); + ASSERT_TRUE(res); + value.clear(); + + // Write Merge concurrently + rocksdb::port::Thread write_thread1([&]() { + std::string v1; + for (int seq = 1; seq < num_ops / 2; seq++) { + PutFixed64(&v1, seq); + bool res1 = + mem->Add(seq, kTypeMerge, "key", v1, true, &post_process_info); + ASSERT_TRUE(res1); + v1.clear(); + } + }); + rocksdb::port::Thread write_thread2([&]() { + std::string v2; + for (int seq = num_ops / 2; seq < num_ops; seq++) { + PutFixed64(&v2, seq); + bool res2 = + mem->Add(seq, kTypeMerge, "key", v2, true, &post_process_info); + ASSERT_TRUE(res2); + v2.clear(); + } + }); + write_thread1.join(); + write_thread2.join(); + + Status status; + ReadOptions roptions; + SequenceNumber max_covering_tombstone_seq = 0; + LookupKey lkey("key", kMaxSequenceNumber); + res = mem->Get(lkey, &value, &status, &merge_context, + &max_covering_tombstone_seq, roptions); + ASSERT_TRUE(res); + uint64_t ivalue = DecodeFixed64(Slice(value).data()); + uint64_t sum = 0; + for (int seq = 0; seq < num_ops; seq++) { + sum += seq; + } + ASSERT_EQ(ivalue, sum); + + delete mem; +} + TEST_F(DBMemTableTest, InsertWithHint) { Options options; options.allow_concurrent_memtable_write = false; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index bee6b81d5..ebfc7a9ca 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -341,6 +341,7 @@ Options DBTestBase::GetOptions( options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.memtable_factory.reset(NewHashSkipListRepFactory(16)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kPlainTableFirstBytePrefix: options.table_factory.reset(new PlainTableFactory()); @@ -373,12 +374,14 @@ Options DBTestBase::GetOptions( case kVectorRep: options.memtable_factory.reset(new VectorRepFactory(100)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kHashLinkList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); 
options.memtable_factory.reset( NewHashLinkListRepFactory(4, 0, 3, true, 4)); options.allow_concurrent_memtable_write = false; + options.unordered_write = false; break; case kDirectIO: { options.use_direct_reads = true; @@ -540,6 +543,11 @@ Options DBTestBase::GetOptions( options.manual_wal_flush = true; break; } + case kUnorderedWrite: { + options.allow_concurrent_memtable_write = false; + options.unordered_write = false; + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 50109e0a4..f5d7fd1a7 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -140,6 +140,11 @@ class SpecialMemTableRep : public MemTableRep { memtable_->Insert(handle); } + void InsertConcurrently(KeyHandle handle) override { + num_entries_++; + memtable_->Insert(handle); + } + // Returns true iff an entry that compares equal to key is in the list. virtual bool Contains(const char* key) const override { return memtable_->Contains(key); @@ -688,6 +693,7 @@ class DBTestBase : public testing::Test { kPartitionedFilterWithNewTableReaderForCompactions, kUniversalSubcompactions, kxxHash64Checksum, + kUnorderedWrite, // This must be the last line kEnd, }; diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h index cd3575861..b5abec405 100644 --- a/db/flush_scheduler.h +++ b/db/flush_scheduler.h @@ -28,6 +28,9 @@ class FlushScheduler { // Filters column families that have been dropped. ColumnFamilyData* TakeNextColumnFamily(); + // This can be called concurrently with ScheduleFlush but it would miss all + // the scheduled flushes after the last synchronization. This would result + // into less precise enforcement of memtable sizes but should not matter much. bool Empty(); void Clear(); diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index 2dd0cff0b..8a08cf9fe 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -142,6 +142,7 @@ class PlainTableDBTest : public testing::Test, options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.allow_mmap_reads = mmap_mode_; options.allow_concurrent_memtable_write = false; + options.unordered_write = false; return options; } diff --git a/db/write_batch.cc b/db/write_batch.cc index 939b59530..830fbeab1 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1471,7 +1471,6 @@ class MemTableInserter : public WriteBatch::Handler { Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { - assert(!concurrent_memtable_writes_); // optimize for non-recovery mode if (UNLIKELY(write_after_commit_ && rebuilding_trx_ != nullptr)) { WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value); @@ -1498,6 +1497,8 @@ class MemTableInserter : public WriteBatch::Handler { MemTable* mem = cf_mems_->GetMemTable(); auto* moptions = mem->GetImmutableMemTableOptions(); bool perform_merge = false; + assert(!concurrent_memtable_writes_ || + moptions->max_successive_merges == 0); // If we pass DB through and options.max_successive_merges is hit // during recovery, Get() will be issued which will try to acquire @@ -1505,6 +1506,7 @@ class MemTableInserter : public WriteBatch::Handler { // So we disable merge in recovery if (moptions->max_successive_merges > 0 && db_ != nullptr && recovering_log_number_ == 0) { + assert(!concurrent_memtable_writes_); LookupKey lkey(key, sequence_); // Count the number of successive merges at the head @@ -1550,6 +1552,7 @@ class MemTableInserter : public WriteBatch::Handler { perform_merge = false; } else { // 3) Add value to memtable + 
assert(!concurrent_memtable_writes_); bool mem_res = mem->Add(sequence_, kTypeValue, key, new_value); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); @@ -1562,7 +1565,9 @@ class MemTableInserter : public WriteBatch::Handler { if (!perform_merge) { // Add merge operator to memtable - bool mem_res = mem->Add(sequence_, kTypeMerge, key, value); + bool mem_res = + mem->Add(sequence_, kTypeMerge, key, value, + concurrent_memtable_writes_, get_post_process_info(mem)); if (UNLIKELY(!mem_res)) { assert(seq_per_batch_); ret_status = Status::TryAgain("key+seq exists"); diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index cb880560e..7f2b20d89 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -124,6 +124,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { {false, false, true, false, true}, }; + for (auto& unordered_write : {true, false}) { for (auto& seq_per_batch : {true, false}) { for (auto& two_queues : {true, false}) { for (auto& allow_parallel : {true, false}) { @@ -133,15 +134,22 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { for (auto& write_group : write_scenarios) { Options options; options.create_if_missing = true; + options.unordered_write = unordered_write; options.allow_concurrent_memtable_write = allow_parallel; options.enable_pipelined_write = enable_pipelined_write; options.two_write_queues = two_queues; + // Skip unsupported combinations if (options.enable_pipelined_write && seq_per_batch) { - // This combination is not supported continue; } if (options.enable_pipelined_write && options.two_write_queues) { - // This combination is not supported + continue; + } + if (options.unordered_write && + !options.allow_concurrent_memtable_write) { + continue; + } + if (options.unordered_write && options.enable_pipelined_write) { continue; } @@ -358,8 +366,9 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { } } } -} -} + } + } + } } TEST_F(WriteCallbackTest, WriteCallBackTest) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index ed0709d22..5e75dd709 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -845,6 +845,8 @@ rocksdb_options_set_max_write_buffer_number_to_maintain(rocksdb_options_t*, int); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_enable_pipelined_write( rocksdb_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_options_set_unordered_write( + rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_subcompactions( rocksdb_options_t*, uint32_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_background_jobs( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a1071f62e..c8b4cc538 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -893,6 +893,31 @@ struct DBOptions { // Default: false bool enable_pipelined_write = false; + // Setting unordered_write to true trades higher write throughput with + // relaxing the immutability guarantee of snapshots. This violates the + // repeatability one expects from ::Get from a snapshot, as well as + // ::MultiGet and Iterator's consistent-point-in-time view property. + // If the application cannot tolerate the relaxed guarantees, it can implement + // its own mechanisms to work around that and yet benefit from the higher + // throughput. Using TransactionDB with WRITE_PREPARED write policy is one way + // to achieve immutable snapshots despite unordered_write. 
+ // + // By default, i.e., when it is false, rocksdb does not advance the sequence + // number for new snapshots unless all the writes with lower sequence numbers + // are already finished. This provides the immutability that we expect from + // snapshots. Moreover, since Iterator and MultiGet internally depend on + // snapshots, the snapshot immutability results in Iterator and MultiGet + // offering a consistent-point-in-time view. If set to true, although the + // Read-Your-Own-Write property is still provided, the snapshot immutability + // property is relaxed: writes issued after the snapshot is obtained (with + // larger sequence numbers) will still not be visible to reads from that + // snapshot; however, there might still be pending writes (with lower sequence + // numbers) that will change the state visible to the snapshot once they land + // in the memtable. + // + // Default: false + bool unordered_write = false; + // If true, allow multi-writers to update mem tables in parallel. // Only some memtable_factory-s support concurrent writes; currently it // is implemented only for SkipListFactory. Concurrent memtable writes diff --git a/options/db_options.cc b/options/db_options.cc index 83f1a18b0..e180238f4 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -67,6 +67,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) listeners(options.listeners), enable_thread_tracking(options.enable_thread_tracking), enable_pipelined_write(options.enable_pipelined_write), + unordered_write(options.unordered_write), allow_concurrent_memtable_write(options.allow_concurrent_memtable_write), enable_write_thread_adaptive_yield( options.enable_write_thread_adaptive_yield), @@ -185,6 +186,8 @@ void ImmutableDBOptions::Dump(Logger* log) const { enable_thread_tracking); ROCKS_LOG_HEADER(log, " Options.enable_pipelined_write: %d", enable_pipelined_write); + ROCKS_LOG_HEADER(log, " Options.unordered_write: %d", + unordered_write); ROCKS_LOG_HEADER(log, " Options.allow_concurrent_memtable_write: %d", allow_concurrent_memtable_write); ROCKS_LOG_HEADER(log, " Options.enable_write_thread_adaptive_yield: %d", diff --git a/options/db_options.h b/options/db_options.h index 8d0200362..67b26786f 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -60,6 +60,7 @@ struct ImmutableDBOptions { std::vector<std::shared_ptr<EventListener>> listeners; bool enable_thread_tracking; bool enable_pipelined_write; + bool unordered_write; bool allow_concurrent_memtable_write; bool enable_write_thread_adaptive_yield; uint64_t write_thread_max_yield_usec; diff --git a/options/options_helper.cc b/options/options_helper.cc index a973bbfde..c33c2be6f 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -103,6 +103,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.enable_thread_tracking = immutable_db_options.enable_thread_tracking; options.delayed_write_rate = mutable_db_options.delayed_write_rate; options.enable_pipelined_write = immutable_db_options.enable_pipelined_write; + options.unordered_write = immutable_db_options.unordered_write; options.allow_concurrent_memtable_write = immutable_db_options.allow_concurrent_memtable_write; options.enable_write_thread_adaptive_yield = @@ -1583,6 +1584,9 @@ std::unordered_map {"enable_pipelined_write", {offsetof(struct DBOptions, enable_pipelined_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, + {"unordered_write", + {offsetof(struct DBOptions, unordered_write), OptionType::kBoolean, + 
OptionVerificationType::kNormal, false, 0}}, {"allow_concurrent_memtable_write", {offsetof(struct DBOptions, allow_concurrent_memtable_write), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 2d6cc11c0..79a4fa814 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -279,6 +279,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "advise_random_on_open=true;" "fail_if_options_file_error=false;" "enable_pipelined_write=false;" + "unordered_write=false;" "allow_concurrent_memtable_write=true;" "wal_recovery_mode=kPointInTimeRecovery;" "enable_write_thread_adaptive_yield=true;" diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 47fe8e1b0..790a2c99e 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -227,7 +227,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( } Status BlockBasedTableFactory::SanitizeOptions( - const DBOptions& /*db_opts*/, const ColumnFamilyOptions& cf_opts) const { + const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { if (table_options_.index_type == BlockBasedTableOptions::kHashSearch && cf_opts.prefix_extractor == nullptr) { return Status::InvalidArgument( @@ -268,6 +268,12 @@ Status BlockBasedTableFactory::SanitizeOptions( "data_block_hash_table_util_ratio should be greater than 0 when " "data_block_index_type is set to kDataBlockBinaryAndHash"); } + if (db_opts.unordered_write && cf_opts.max_successive_merges > 0) { + // TODO(myabandeh): support it + return Status::InvalidArgument( "max_successive_merges larger than 0 is currently inconsistent with " "unordered_write"); + } return Status::OK(); } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index b2562f4e5..b806fff89 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -891,6 +891,9 @@ DEFINE_uint64(delayed_write_rate, 8388608u, DEFINE_bool(enable_pipelined_write, true, "Allow WAL and memtable writes to be pipelined"); +DEFINE_bool(unordered_write, false, + "Enable unordered_write, which trades snapshot guarantees for higher write throughput"); + DEFINE_bool(allow_concurrent_memtable_write, true, "Allow multi-writers to update mem tables in parallel."); @@ -3552,6 +3555,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.enable_write_thread_adaptive_yield = FLAGS_enable_write_thread_adaptive_yield; options.enable_pipelined_write = FLAGS_enable_pipelined_write; + options.unordered_write = FLAGS_unordered_write; options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec; options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec; options.rate_limit_delay_max_milliseconds = diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 8eb21777a..05973e83a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -221,9 +221,18 @@ Status TransactionDB::Open( std::vector* handles, TransactionDB** dbptr) { Status s; DB* db = nullptr; + if (txn_db_options.write_policy == WRITE_COMMITTED && + db_options.unordered_write) { + return Status::NotSupported( + "WRITE_COMMITTED is incompatible with unordered_writes"); + } + if (txn_db_options.write_policy == WRITE_UNPREPARED && + db_options.unordered_write) { + // TODO(lth): support it + return Status::NotSupported( + "WRITE_UNPREPARED is currently incompatible with 
unordered_writes"); + } - ROCKS_LOG_WARN(db_options.info_log, "Transaction write_policy is %" PRId32, - static_cast(txn_db_options.write_policy)); std::vector column_families_copy = column_families; std::vector compaction_enabled_cf_indices; DBOptions db_options_2pc = db_options; @@ -238,6 +247,9 @@ Status TransactionDB::Open( s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db, use_seq_per_batch, use_batch_per_txn); if (s.ok()) { + ROCKS_LOG_WARN(db->GetDBOptions().info_log, + "Transaction write_policy is %" PRId32, + static_cast(txn_db_options.write_policy)); s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles, dbptr); } diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 1a5bf2d66..997a5abe2 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -42,40 +42,48 @@ namespace rocksdb { INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED), - std::make_tuple(false, false, WRITE_UNPREPARED), - std::make_tuple(false, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite))); INSTANTIATE_TEST_CASE_P( DBAsBaseDB, TransactionStressTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED), - std::make_tuple(false, true, WRITE_COMMITTED), - std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED), - std::make_tuple(false, false, WRITE_UNPREPARED), - std::make_tuple(false, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite))); INSTANTIATE_TEST_CASE_P( StackableDBAsBaseDB, TransactionTest, - ::testing::Values(std::make_tuple(true, true, WRITE_COMMITTED), - std::make_tuple(true, true, WRITE_PREPARED), - std::make_tuple(true, true, WRITE_UNPREPARED))); + ::testing::Values( + std::make_tuple(true, true, WRITE_COMMITTED, kOrderedWrite), + std::make_tuple(true, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(true, true, WRITE_UNPREPARED, kOrderedWrite))); // MySQLStyleTransactionTest takes far too long for valgrind to run. 
#ifndef ROCKSDB_VALGRIND_RUN INSTANTIATE_TEST_CASE_P( MySQLStyleTransactionTest, MySQLStyleTransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false), - std::make_tuple(false, true, WRITE_COMMITTED, false), - std::make_tuple(false, false, WRITE_PREPARED, false), - std::make_tuple(false, false, WRITE_PREPARED, true), - std::make_tuple(false, true, WRITE_PREPARED, false), - std::make_tuple(false, true, WRITE_PREPARED, true), - std::make_tuple(false, false, WRITE_UNPREPARED, false), - std::make_tuple(false, false, WRITE_UNPREPARED, true), - std::make_tuple(false, true, WRITE_UNPREPARED, false), - std::make_tuple(false, true, WRITE_UNPREPARED, true))); + ::testing::Values( + std::make_tuple(false, false, WRITE_COMMITTED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_COMMITTED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, true), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, true), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, false), + std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite, true), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, false), + std::make_tuple(false, true, WRITE_UNPREPARED, kOrderedWrite, true))); #endif // ROCKSDB_VALGRIND_RUN TEST_P(TransactionTest, DoubleEmptyWrite) { @@ -5646,7 +5654,7 @@ TEST_P(TransactionTest, DuplicateKeys) { } // do_rollback } // do_prepare - { + if (!options.unordered_write) { // Also test with max_successive_merges > 0. max_successive_merges will not // affect our algorithm for duplicate key insertion but we add the test to // verify that. 
@@ -5697,6 +5705,7 @@ TEST_P(TransactionTest, DuplicateKeys) { std::unique_ptr comp_gc(new ThreeBytewiseComparator()); cf_options.comparator = comp_gc.get(); + cf_options.merge_operator = MergeOperators::CreateStringAppendOperator(); ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); delete cf_handle; std::vector cfds{ diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 33b2c51ea..b42548709 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -39,6 +39,8 @@ namespace rocksdb { // Return true if the ith bit is set in combination represented by comb bool IsInCombination(size_t i, size_t comb) { return comb & (size_t(1) << i); } +enum WriteOrdering : bool { kOrderedWrite, kUnorderedWrite }; + class TransactionTestBase : public ::testing::Test { public: TransactionDB* db; @@ -50,11 +52,13 @@ class TransactionTestBase : public ::testing::Test { bool use_stackable_db_; TransactionTestBase(bool use_stackable_db, bool two_write_queue, - TxnDBWritePolicy write_policy) + TxnDBWritePolicy write_policy, + WriteOrdering write_ordering) : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) { options.create_if_missing = true; options.max_write_buffer_number = 2; options.write_buffer_size = 4 * 1024; + options.unordered_write = write_ordering == kUnorderedWrite; options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); env = new FaultInjectionTestEnv(Env::Default()); @@ -352,6 +356,9 @@ class TransactionTestBase : public ::testing::Test { Transaction* txn; txn_db_options.write_policy = from_policy; + if (txn_db_options.write_policy == WRITE_COMMITTED) { + options.unordered_write = false; + } ReOpen(); for (int i = 0; i < 1024; i++) { @@ -400,6 +407,9 @@ class TransactionTestBase : public ::testing::Test { } // for i txn_db_options.write_policy = to_policy; + if (txn_db_options.write_policy == WRITE_COMMITTED) { + options.unordered_write = false; + } auto db_impl = reinterpret_cast(db->GetRootDB()); // Before upgrade/downgrade the WAL must be emptied if (empty_wal) { @@ -437,13 +447,14 @@ class TransactionTestBase : public ::testing::Test { } }; -class TransactionTest : public TransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { +class TransactionTest + : public TransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { public: TransactionTest() : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), - std::get<2>(GetParam())){}; + std::get<2>(GetParam()), std::get<3>(GetParam())){}; }; class TransactionStressTest : public TransactionTest {}; @@ -451,12 +462,12 @@ class TransactionStressTest : public TransactionTest {}; class MySQLStyleTransactionTest : public TransactionTestBase, virtual public ::testing::WithParamInterface< - std::tuple> { + std::tuple> { public: MySQLStyleTransactionTest() : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()), - std::get<2>(GetParam())), - with_slow_threads_(std::get<3>(GetParam())) { + std::get<2>(GetParam()), std::get<3>(GetParam())), + with_slow_threads_(std::get<4>(GetParam())) { if (with_slow_threads_ && (txn_db_options.write_policy == WRITE_PREPARED || txn_db_options.write_policy == WRITE_UNPREPARED)) { diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index f2f3f30e2..d5a03cd04 100644 --- 
a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -342,8 +342,10 @@ class WritePreparedTxnDBMock : public WritePreparedTxnDB { class WritePreparedTransactionTestBase : public TransactionTestBase { public: WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, - TxnDBWritePolicy write_policy) - : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){}; + TxnDBWritePolicy write_policy, + WriteOrdering write_ordering) + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, + write_ordering){}; protected: void UpdateTransactionDBOptions(size_t snapshot_cache_bits, @@ -518,26 +520,26 @@ class WritePreparedTransactionTestBase : public TransactionTestBase { class WritePreparedTransactionTest : public WritePreparedTransactionTestBase, virtual public ::testing::WithParamInterface< - std::tuple> { + std::tuple> { public: WritePreparedTransactionTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())){}; }; #ifndef ROCKSDB_VALGRIND_RUN class SnapshotConcurrentAccessTest : public WritePreparedTransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { + virtual public ::testing::WithParamInterface> { public: SnapshotConcurrentAccessTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())), - split_id_(std::get<3>(GetParam())), - split_cnt_(std::get<4>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())), + split_id_(std::get<4>(GetParam())), + split_cnt_(std::get<5>(GetParam())){}; protected: // A test is split into split_cnt_ tests, each identified with split_id_ where @@ -549,15 +551,15 @@ class SnapshotConcurrentAccessTest class SeqAdvanceConcurrentTest : public WritePreparedTransactionTestBase, - virtual public ::testing::WithParamInterface< - std::tuple> { + virtual public ::testing::WithParamInterface> { public: SeqAdvanceConcurrentTest() - : WritePreparedTransactionTestBase(std::get<0>(GetParam()), - std::get<1>(GetParam()), - std::get<2>(GetParam())), - split_id_(std::get<3>(GetParam())), - split_cnt_(std::get<4>(GetParam())){}; + : WritePreparedTransactionTestBase( + std::get<0>(GetParam()), std::get<1>(GetParam()), + std::get<2>(GetParam()), std::get<3>(GetParam())), + split_id_(std::get<4>(GetParam())), + split_cnt_(std::get<5>(GetParam())){}; protected: // A test is split into split_cnt_ tests, each identified with split_id_ where @@ -568,81 +570,152 @@ class SeqAdvanceConcurrentTest INSTANTIATE_TEST_CASE_P( WritePreparedTransactionTest, WritePreparedTransactionTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED), - std::make_tuple(false, true, WRITE_PREPARED))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite))); #ifndef ROCKSDB_VALGRIND_RUN INSTANTIATE_TEST_CASE_P( TwoWriteQueues, SnapshotConcurrentAccessTest, - ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 20), - std::make_tuple(false, true, 
WRITE_PREPARED, 1, 20), - std::make_tuple(false, true, WRITE_PREPARED, 2, 20), - std::make_tuple(false, true, WRITE_PREPARED, 3, 20), - std::make_tuple(false, true, WRITE_PREPARED, 4, 20), - std::make_tuple(false, true, WRITE_PREPARED, 5, 20), - std::make_tuple(false, true, WRITE_PREPARED, 6, 20), - std::make_tuple(false, true, WRITE_PREPARED, 7, 20), - std::make_tuple(false, true, WRITE_PREPARED, 8, 20), - std::make_tuple(false, true, WRITE_PREPARED, 9, 20), - std::make_tuple(false, true, WRITE_PREPARED, 10, 20), - std::make_tuple(false, true, WRITE_PREPARED, 11, 20), - std::make_tuple(false, true, WRITE_PREPARED, 12, 20), - std::make_tuple(false, true, WRITE_PREPARED, 13, 20), - std::make_tuple(false, true, WRITE_PREPARED, 14, 20), - std::make_tuple(false, true, WRITE_PREPARED, 15, 20), - std::make_tuple(false, true, WRITE_PREPARED, 16, 20), - std::make_tuple(false, true, WRITE_PREPARED, 17, 20), - std::make_tuple(false, true, WRITE_PREPARED, 18, 20), - std::make_tuple(false, true, WRITE_PREPARED, 19, 20))); + ::testing::Values( + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20), + + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20), + 
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20))); INSTANTIATE_TEST_CASE_P( OneWriteQueue, SnapshotConcurrentAccessTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 20), - std::make_tuple(false, false, WRITE_PREPARED, 1, 20), - std::make_tuple(false, false, WRITE_PREPARED, 2, 20), - std::make_tuple(false, false, WRITE_PREPARED, 3, 20), - std::make_tuple(false, false, WRITE_PREPARED, 4, 20), - std::make_tuple(false, false, WRITE_PREPARED, 5, 20), - std::make_tuple(false, false, WRITE_PREPARED, 6, 20), - std::make_tuple(false, false, WRITE_PREPARED, 7, 20), - std::make_tuple(false, false, WRITE_PREPARED, 8, 20), - std::make_tuple(false, false, WRITE_PREPARED, 9, 20), - std::make_tuple(false, false, WRITE_PREPARED, 10, 20), - std::make_tuple(false, false, WRITE_PREPARED, 11, 20), - std::make_tuple(false, false, WRITE_PREPARED, 12, 20), - std::make_tuple(false, false, WRITE_PREPARED, 13, 20), - std::make_tuple(false, false, WRITE_PREPARED, 14, 20), - std::make_tuple(false, false, WRITE_PREPARED, 15, 20), - std::make_tuple(false, false, WRITE_PREPARED, 16, 20), - std::make_tuple(false, false, WRITE_PREPARED, 17, 20), - std::make_tuple(false, false, WRITE_PREPARED, 18, 20), - std::make_tuple(false, false, WRITE_PREPARED, 19, 20))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20), + + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 0, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 1, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 2, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 3, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 4, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 5, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 6, 20), + 
std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 7, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 8, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 9, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 10, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 11, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 12, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 13, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 14, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 15, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 16, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 17, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 18, 20), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 19, + 20))); INSTANTIATE_TEST_CASE_P( TwoWriteQueues, SeqAdvanceConcurrentTest, - ::testing::Values(std::make_tuple(false, true, WRITE_PREPARED, 0, 10), - std::make_tuple(false, true, WRITE_PREPARED, 1, 10), - std::make_tuple(false, true, WRITE_PREPARED, 2, 10), - std::make_tuple(false, true, WRITE_PREPARED, 3, 10), - std::make_tuple(false, true, WRITE_PREPARED, 4, 10), - std::make_tuple(false, true, WRITE_PREPARED, 5, 10), - std::make_tuple(false, true, WRITE_PREPARED, 6, 10), - std::make_tuple(false, true, WRITE_PREPARED, 7, 10), - std::make_tuple(false, true, WRITE_PREPARED, 8, 10), - std::make_tuple(false, true, WRITE_PREPARED, 9, 10))); + ::testing::Values( + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10), + std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10), + std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10))); INSTANTIATE_TEST_CASE_P( OneWriteQueue, SeqAdvanceConcurrentTest, - ::testing::Values(std::make_tuple(false, false, WRITE_PREPARED, 0, 10), - std::make_tuple(false, false, WRITE_PREPARED, 1, 10), - std::make_tuple(false, false, WRITE_PREPARED, 2, 10), - std::make_tuple(false, false, WRITE_PREPARED, 3, 10), - std::make_tuple(false, false, WRITE_PREPARED, 4, 10), - std::make_tuple(false, false, WRITE_PREPARED, 5, 10), - std::make_tuple(false, false, WRITE_PREPARED, 6, 10), - std::make_tuple(false, false, 
WRITE_PREPARED, 7, 10), - std::make_tuple(false, false, WRITE_PREPARED, 8, 10), - std::make_tuple(false, false, WRITE_PREPARED, 9, 10))); + ::testing::Values( + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10), + std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10), + + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 0, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 1, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 2, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 3, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 4, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 5, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 6, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 7, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 8, 10), + std::make_tuple(false, false, WRITE_PREPARED, kUnorderedWrite, 9, 10))); #endif // ROCKSDB_VALGRIND_RUN TEST_P(WritePreparedTransactionTest, CommitMapTest) { diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 9aee33b07..914f3f581 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -20,7 +20,8 @@ class WriteUnpreparedTransactionTestBase : public TransactionTestBase { WriteUnpreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue, TxnDBWritePolicy write_policy) - : TransactionTestBase(use_stackable_db, two_write_queue, write_policy){} + : TransactionTestBase(use_stackable_db, two_write_queue, write_policy, + kOrderedWrite) {} }; class WriteUnpreparedTransactionTest
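
For reference, a minimal sketch of how an application could combine the new option with WRITE_PREPARED transactions, as suggested by the HISTORY.md entry and the option documentation above. This snippet is illustrative only and not part of the diff; the database path is an arbitrary placeholder.

#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.unordered_write = true;                  // the option added by this change
  options.allow_concurrent_memtable_write = true;  // required; see ValidateOptions()
  options.enable_pipelined_write = false;          // pipelined writes are incompatible

  // With the WRITE_PREPARED policy the commit sequence is managed by the
  // transaction layer, so snapshots keep their immutability guarantee even
  // though the base DB runs with unordered_write.
  rocksdb::TransactionDBOptions txn_db_options;
  txn_db_options.write_policy = rocksdb::WRITE_PREPARED;

  rocksdb::TransactionDB* db = nullptr;
  rocksdb::Status s = rocksdb::TransactionDB::Open(
      options, txn_db_options, "/tmp/unordered_write_example", &db);
  assert(s.ok());  // WRITE_COMMITTED would instead be rejected with NotSupported
  delete db;
  return 0;
}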
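Similarly, a small sketch of the effect of the ValidateOptions() checks added in db_impl_open.cc; the path is again a placeholder, and the combinations mirror the checks in this diff.

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.unordered_write = true;
  rocksdb::DB* db = nullptr;

  // Rejected: unordered_write needs concurrent memtable writes.
  options.allow_concurrent_memtable_write = false;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/unordered_write_validate", &db);
  assert(s.IsInvalidArgument());

  // Rejected: unordered_write cannot be combined with pipelined writes.
  options.allow_concurrent_memtable_write = true;
  options.enable_pipelined_write = true;
  s = rocksdb::DB::Open(options, "/tmp/unordered_write_validate", &db);
  assert(s.IsInvalidArgument());

  // Accepted: the compatible combination.
  options.enable_pipelined_write = false;
  s = rocksdb::DB::Open(options, "/tmp/unordered_write_validate", &db);
  assert(s.ok());
  delete db;
  return 0;
}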