diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index d552b1178..8acd60df8 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -113,159 +113,162 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { for (auto& allow_parallel : {true, false}) { for (auto& allow_batching : {true, false}) { - for (auto& write_group : write_scenarios) { - Options options; - options.create_if_missing = true; - options.allow_concurrent_memtable_write = allow_parallel; - - WriteOptions write_options; - ReadOptions read_options; - DB* db; - DBImpl* db_impl; - - ASSERT_OK(DB::Open(options, dbname, &db)); - - db_impl = dynamic_cast(db); - ASSERT_TRUE(db_impl); + for (auto& enable_WAL : {true, false}) { + for (auto& write_group : write_scenarios) { + Options options; + options.create_if_missing = true; + options.allow_concurrent_memtable_write = allow_parallel; + + ReadOptions read_options; + DB* db; + DBImpl* db_impl; + + ASSERT_OK(DB::Open(options, dbname, &db)); + + db_impl = dynamic_cast(db); + ASSERT_TRUE(db_impl); + + std::atomic threads_waiting(0); + std::atomic seq(db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { + uint64_t cur_threads_waiting = 0; + bool is_leader = false; + bool is_last = false; + + // who am i + do { + cur_threads_waiting = threads_waiting.load(); + is_leader = (cur_threads_waiting == 0); + is_last = (cur_threads_waiting == write_group.size() - 1); + } while (!threads_waiting.compare_exchange_strong( + cur_threads_waiting, cur_threads_waiting + 1)); + + // check my state + auto* writer = reinterpret_cast(arg); + + if (is_leader) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else { + ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); + } + + // (meta test) the first WriteOP should indeed be the first + // and the last should be the last (all others can be out of + // order) + if (is_leader) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.front().callback_.should_fail_); + } else if (is_last) { + ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == + !write_group.back().callback_.should_fail_); + } + + // wait for friends + while (threads_waiting.load() < write_group.size()) { + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { + // check my state + auto* writer = reinterpret_cast(arg); + + if (!allow_batching) { + // no batching so everyone should be a leader + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_GROUP_LEADER); + } else if (!allow_parallel) { + ASSERT_TRUE(writer->state == + WriteThread::State::STATE_COMPLETED); + } + }); + + std::atomic thread_num(0); + std::atomic dummy_key(0); + std::function write_with_callback_func = [&]() { + uint32_t i = thread_num.fetch_add(1); + Random rnd(i); + + // leaders gotta lead + while (i > 0 && threads_waiting.load() < 1) { + } - std::atomic threads_waiting(0); - std::atomic seq(db_impl->GetLatestSequenceNumber()); - ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0); + // loser has to lose + while (i == write_group.size() - 1 && + threads_waiting.load() < write_group.size() - 1) { + } - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:Wait", [&](void* arg) { - uint64_t cur_threads_waiting = 0; - bool is_leader = false; - bool is_last = false; + auto& write_op = write_group.at(i); + write_op.Clear(); + write_op.callback_.allow_batching_ = allow_batching; - // who am i + // insert some keys + for (uint32_t j = 0; j < rnd.Next() % 50; j++) { + // grab unique key + char my_key = 0; do { - cur_threads_waiting = threads_waiting.load(); - is_leader = (cur_threads_waiting == 0); - is_last = (cur_threads_waiting == write_group.size() - 1); - } while (!threads_waiting.compare_exchange_strong( - cur_threads_waiting, cur_threads_waiting + 1)); - - // check my state - auto* writer = reinterpret_cast(arg); - - if (is_leader) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else { - ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT); - } + my_key = dummy_key.load(); + } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1)); - // (meta test) the first WriteOP should indeed be the first - // and the last should be the last (all others can be out of - // order) - if (is_leader) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.front().callback_.should_fail_); - } else if (is_last) { - ASSERT_TRUE(writer->callback->Callback(nullptr).ok() == - !write_group.back().callback_.should_fail_); - } + string skey(5, my_key); + string sval(10, my_key); + write_op.Put(skey, sval); - // wait for friends - while (threads_waiting.load() < write_group.size()) { + if (!write_op.callback_.should_fail_) { + seq.fetch_add(1); } - }); - - rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) { - // check my state - auto* writer = reinterpret_cast(arg); - - if (!allow_batching) { - // no batching so everyone should be a leader - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_GROUP_LEADER); - } else if (!allow_parallel) { - ASSERT_TRUE(writer->state == - WriteThread::State::STATE_COMPLETED); - } - }); - - std::atomic thread_num(0); - std::atomic dummy_key(0); - std::function write_with_callback_func = [&]() { - uint32_t i = thread_num.fetch_add(1); - Random rnd(i); - - // leaders gotta lead - while (i > 0 && threads_waiting.load() < 1) { - } - - // loser has to lose - while (i == write_group.size() - 1 && - threads_waiting.load() < write_group.size() - 1) { - } - - auto& write_op = write_group.at(i); - write_op.Clear(); - write_op.callback_.allow_batching_ = allow_batching; - - // insert some keys - for (uint32_t j = 0; j < rnd.Next() % 50; j++) { - // grab unique key - char my_key = 0; - do { - my_key = dummy_key.load(); - } while (!dummy_key.compare_exchange_strong(my_key, my_key + 1)); + } - string skey(5, my_key); - string sval(10, my_key); - write_op.Put(skey, sval); + WriteOptions woptions; + woptions.disableWAL = !enable_WAL; + woptions.sync = enable_WAL; + Status s = db_impl->WriteWithCallback( + woptions, &write_op.write_batch_, &write_op.callback_); - if (!write_op.callback_.should_fail_) { - seq.fetch_add(1); + if (write_op.callback_.should_fail_) { + ASSERT_TRUE(s.IsBusy()); + } else { + ASSERT_OK(s); } - } + }; - WriteOptions woptions; - Status s = db_impl->WriteWithCallback( - woptions, &write_op.write_batch_, &write_op.callback_); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - if (write_op.callback_.should_fail_) { - ASSERT_TRUE(s.IsBusy()); - } else { - ASSERT_OK(s); + // do all the writes + std::vector threads; + for (uint32_t i = 0; i < write_group.size(); i++) { + threads.emplace_back(write_with_callback_func); + } + for (auto& t : threads) { + t.join(); } - }; - - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - - // do all the writes - std::vector threads; - for (uint32_t i = 0; i < write_group.size(); i++) { - threads.emplace_back(write_with_callback_func); - } - for (auto& t : threads) { - t.join(); - } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - // check for keys - string value; - for (auto& w : write_group) { - ASSERT_TRUE(w.callback_.was_called_); - for (auto& kvp : w.kvs_) { - if (w.callback_.should_fail_) { - ASSERT_TRUE( - db->Get(read_options, kvp.first, &value).IsNotFound()); - } else { - ASSERT_OK(db->Get(read_options, kvp.first, &value)); - ASSERT_EQ(value, kvp.second); + // check for keys + string value; + for (auto& w : write_group) { + ASSERT_TRUE(w.callback_.was_called_); + for (auto& kvp : w.kvs_) { + if (w.callback_.should_fail_) { + ASSERT_TRUE( + db->Get(read_options, kvp.first, &value).IsNotFound()); + } else { + ASSERT_OK(db->Get(read_options, kvp.first, &value)); + ASSERT_EQ(value, kvp.second); + } } } - } - ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); + ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber()); - delete db; - DestroyDB(dbname, options); + delete db; + DestroyDB(dbname, options); + } } } }