diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b522062b1..eeeb8e3c6 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -2087,18 +2087,11 @@ class StressTest { const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); thread->stats.Start(); - for (uint64_t i = 0, prev_i = 0; i < FLAGS_ops_per_thread; i++) { + for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { if (thread->shared->HasVerificationFailedYet()) { break; } - // In case i is incremented more than once due to multiple operations, - // such as MultiGet or iterator seeks, check whether we have crossed - // the ops_per_open boundary in the previous iteration. If it did, - // then vote to reopen - if (i != 0 && - (i % ops_per_open == 0 || - i % ops_per_open < prev_i % ops_per_open)) { - { + if (open_cnt != 0) { thread->stats.FinishedSingleOp(); MutexLock l(thread->shared->GetMutex()); while (!thread->snapshot_queue.empty()) { @@ -2116,261 +2109,265 @@ class StressTest { } // Commenting this out as we don't want to reset stats on each open. // thread->stats.Start(); - } } - prev_i = i; - // Change Options - if (FLAGS_set_options_one_in > 0 && - thread->rand.OneIn(FLAGS_set_options_one_in)) { - SetOptions(thread); - } + for (uint64_t i = 0; i < ops_per_open; i++) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } - if (FLAGS_set_in_place_one_in > 0 && - thread->rand.OneIn(FLAGS_set_in_place_one_in)) { - options_.inplace_update_support ^= options_.inplace_update_support; - } + // Change Options + if (FLAGS_set_options_one_in > 0 && + thread->rand.OneIn(FLAGS_set_options_one_in)) { + SetOptions(thread); + } + + if (FLAGS_set_in_place_one_in > 0 && + thread->rand.OneIn(FLAGS_set_in_place_one_in)) { + options_.inplace_update_support ^= options_.inplace_update_support; + } - MaybeClearOneColumnFamily(thread); + MaybeClearOneColumnFamily(thread); #ifndef ROCKSDB_LITE - if (FLAGS_compact_files_one_in > 0 && - thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) { - auto* random_cf = - column_families_[thread->rand.Next() % FLAGS_column_families]; - rocksdb::ColumnFamilyMetaData cf_meta_data; - db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data); - - // Randomly compact up to three consecutive files from a level - const int kMaxRetry = 3; - for (int attempt = 0; attempt < kMaxRetry; ++attempt) { - size_t random_level = thread->rand.Uniform( - static_cast(cf_meta_data.levels.size())); - - const auto& files = cf_meta_data.levels[random_level].files; - if (files.size() > 0) { - size_t random_file_index = - thread->rand.Uniform(static_cast(files.size())); - if (files[random_file_index].being_compacted) { - // Retry as the selected file is currently being compacted - continue; - } + if (FLAGS_compact_files_one_in > 0 && + thread->rand.Uniform(FLAGS_compact_files_one_in) == 0) { + auto* random_cf = + column_families_[thread->rand.Next() % FLAGS_column_families]; + rocksdb::ColumnFamilyMetaData cf_meta_data; + db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data); + + // Randomly compact up to three consecutive files from a level + const int kMaxRetry = 3; + for (int attempt = 0; attempt < kMaxRetry; ++attempt) { + size_t random_level = thread->rand.Uniform( + static_cast(cf_meta_data.levels.size())); + + const auto& files = cf_meta_data.levels[random_level].files; + if (files.size() > 0) { + size_t random_file_index = + thread->rand.Uniform(static_cast(files.size())); + if (files[random_file_index].being_compacted) { + // Retry as the selected file is currently being compacted + continue; + } - std::vector input_files; - input_files.push_back(files[random_file_index].name); - if (random_file_index > 0 && - !files[random_file_index - 1].being_compacted) { - input_files.push_back(files[random_file_index - 1].name); - } - if (random_file_index + 1 < files.size() && - !files[random_file_index + 1].being_compacted) { - input_files.push_back(files[random_file_index + 1].name); - } + std::vector input_files; + input_files.push_back(files[random_file_index].name); + if (random_file_index > 0 && + !files[random_file_index - 1].being_compacted) { + input_files.push_back(files[random_file_index - 1].name); + } + if (random_file_index + 1 < files.size() && + !files[random_file_index + 1].being_compacted) { + input_files.push_back(files[random_file_index + 1].name); + } - size_t output_level = - std::min(random_level + 1, cf_meta_data.levels.size() - 1); - auto s = - db_->CompactFiles(CompactionOptions(), random_cf, input_files, - static_cast(output_level)); - if (!s.ok()) { - fprintf(stdout, "Unable to perform CompactFiles(): %s\n", - s.ToString().c_str()); - thread->stats.AddNumCompactFilesFailed(1); - } else { - thread->stats.AddNumCompactFilesSucceed(1); + size_t output_level = + std::min(random_level + 1, cf_meta_data.levels.size() - 1); + auto s = + db_->CompactFiles(CompactionOptions(), random_cf, input_files, + static_cast(output_level)); + if (!s.ok()) { + fprintf(stdout, "Unable to perform CompactFiles(): %s\n", + s.ToString().c_str()); + thread->stats.AddNumCompactFilesFailed(1); + } else { + thread->stats.AddNumCompactFilesSucceed(1); + } + break; } - break; } } - } #endif // !ROCKSDB_LITE - int64_t rand_key = GenerateOneKey(thread, i); - int rand_column_family = thread->rand.Next() % FLAGS_column_families; - std::string keystr = Key(rand_key); - Slice key = keystr; - std::unique_ptr lock; - if (ShouldAcquireMutexOnKey()) { - lock.reset(new MutexLock( - shared->GetMutexForKey(rand_column_family, rand_key))); - } + int64_t rand_key = GenerateOneKey(thread, i); + int rand_column_family = thread->rand.Next() % FLAGS_column_families; + std::string keystr = Key(rand_key); + Slice key = keystr; + std::unique_ptr lock; + if (ShouldAcquireMutexOnKey()) { + lock.reset(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } - auto column_family = column_families_[rand_column_family]; + auto column_family = column_families_[rand_column_family]; - if (FLAGS_compact_range_one_in > 0 && - thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) { - int64_t end_key_num; - if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) { - end_key_num = port::kMaxInt64; - } else { - end_key_num = FLAGS_compact_range_width + rand_key; - } - std::string end_key_buf = Key(end_key_num); - Slice end_key(end_key_buf); - - CompactRangeOptions cro; - cro.exclusive_manual_compaction = - static_cast(thread->rand.Next() % 2); - Status status = db_->CompactRange(cro, column_family, &key, &end_key); - if (!status.ok()) { - printf("Unable to perform CompactRange(): %s\n", - status.ToString().c_str()); + if (FLAGS_compact_range_one_in > 0 && + thread->rand.Uniform(FLAGS_compact_range_one_in) == 0) { + int64_t end_key_num; + if (port::kMaxInt64 - rand_key < FLAGS_compact_range_width) { + end_key_num = port::kMaxInt64; + } else { + end_key_num = FLAGS_compact_range_width + rand_key; + } + std::string end_key_buf = Key(end_key_num); + Slice end_key(end_key_buf); + + CompactRangeOptions cro; + cro.exclusive_manual_compaction = + static_cast(thread->rand.Next() % 2); + Status status = db_->CompactRange(cro, column_family, &key, &end_key); + if (!status.ok()) { + printf("Unable to perform CompactRange(): %s\n", + status.ToString().c_str()); + } } - } - std::vector rand_column_families = - GenerateColumnFamilies(FLAGS_column_families, rand_column_family); - - if (FLAGS_flush_one_in > 0 && - thread->rand.Uniform(FLAGS_flush_one_in) == 0) { - FlushOptions flush_opts; - std::vector cfhs; - std::for_each( - rand_column_families.begin(), rand_column_families.end(), - [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); - Status status = db_->Flush(flush_opts, cfhs); - if (!status.ok()) { - fprintf(stdout, "Unable to perform Flush(): %s\n", - status.ToString().c_str()); + std::vector rand_column_families = + GenerateColumnFamilies(FLAGS_column_families, rand_column_family); + + if (FLAGS_flush_one_in > 0 && + thread->rand.Uniform(FLAGS_flush_one_in) == 0) { + FlushOptions flush_opts; + std::vector cfhs; + std::for_each( + rand_column_families.begin(), rand_column_families.end(), + [this, &cfhs](int k) { cfhs.push_back(column_families_[k]); }); + Status status = db_->Flush(flush_opts, cfhs); + if (!status.ok()) { + fprintf(stdout, "Unable to perform Flush(): %s\n", + status.ToString().c_str()); + } } - } - std::vector rand_keys = GenerateKeys(rand_key); + std::vector rand_keys = GenerateKeys(rand_key); - if (FLAGS_ingest_external_file_one_in > 0 && - thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) { - TestIngestExternalFile(thread, rand_column_families, rand_keys, lock); - } + if (FLAGS_ingest_external_file_one_in > 0 && + thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) { + TestIngestExternalFile(thread, rand_column_families, rand_keys, lock); + } - if (FLAGS_backup_one_in > 0 && - thread->rand.Uniform(FLAGS_backup_one_in) == 0) { - Status s = TestBackupRestore(thread, rand_column_families, rand_keys); - if (!s.ok()) { - VerificationAbort(shared, "Backup/restore gave inconsistent state", - s); + if (FLAGS_backup_one_in > 0 && + thread->rand.Uniform(FLAGS_backup_one_in) == 0) { + Status s = TestBackupRestore(thread, rand_column_families, rand_keys); + if (!s.ok()) { + VerificationAbort(shared, "Backup/restore gave inconsistent state", + s); + } } - } - if (FLAGS_checkpoint_one_in > 0 && - thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) { - Status s = TestCheckpoint(thread, rand_column_families, rand_keys); - if (!s.ok()) { - VerificationAbort(shared, "Checkpoint gave inconsistent state", s); + if (FLAGS_checkpoint_one_in > 0 && + thread->rand.Uniform(FLAGS_checkpoint_one_in) == 0) { + Status s = TestCheckpoint(thread, rand_column_families, rand_keys); + if (!s.ok()) { + VerificationAbort(shared, "Checkpoint gave inconsistent state", s); + } } - } - if (FLAGS_acquire_snapshot_one_in > 0 && - thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { - auto snapshot = db_->GetSnapshot(); - ReadOptions ropt; - ropt.snapshot = snapshot; - std::string value_at; - // When taking a snapshot, we also read a key from that snapshot. We - // will later read the same key before releasing the snapshot and verify - // that the results are the same. - auto status_at = db_->Get(ropt, column_family, key, &value_at); - std::vector *key_vec = nullptr; - - if (FLAGS_compare_full_db_state_snapshot && - (thread->tid == 0)) { - key_vec = new std::vector(FLAGS_max_key); - // When `prefix_extractor` is set, seeking to beginning and scanning - // across prefixes are only supported with `total_order_seek` set. - ropt.total_order_seek = true; - std::unique_ptr iterator(db_->NewIterator(ropt)); - for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { - uint64_t key_val; - if (GetIntVal(iterator->key().ToString(), &key_val)) { - (*key_vec)[key_val] = true; + if (FLAGS_acquire_snapshot_one_in > 0 && + thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { + auto snapshot = db_->GetSnapshot(); + ReadOptions ropt; + ropt.snapshot = snapshot; + std::string value_at; + // When taking a snapshot, we also read a key from that snapshot. We + // will later read the same key before releasing the snapshot and verify + // that the results are the same. + auto status_at = db_->Get(ropt, column_family, key, &value_at); + std::vector *key_vec = nullptr; + + if (FLAGS_compare_full_db_state_snapshot && + (thread->tid == 0)) { + key_vec = new std::vector(FLAGS_max_key); + // When `prefix_extractor` is set, seeking to beginning and scanning + // across prefixes are only supported with `total_order_seek` set. + ropt.total_order_seek = true; + std::unique_ptr iterator(db_->NewIterator(ropt)); + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + uint64_t key_val; + if (GetIntVal(iterator->key().ToString(), &key_val)) { + (*key_vec)[key_val] = true; + } } } - } - ThreadState::SnapshotState snap_state = { - snapshot, rand_column_family, column_family->GetName(), - keystr, status_at, value_at, key_vec}; - thread->snapshot_queue.emplace( - std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), - snap_state); - } - while (!thread->snapshot_queue.empty() && - i >= thread->snapshot_queue.front().first) { - auto snap_state = thread->snapshot_queue.front().second; - assert(snap_state.snapshot); - // Note: this is unsafe as the cf might be dropped concurrently. But it - // is ok since unclean cf drop is cunnrently not supported by write - // prepared transactions. - Status s = - AssertSame(db_, column_families_[snap_state.cf_at], snap_state); - if (!s.ok()) { - VerificationAbort(shared, "Snapshot gave inconsistent state", s); + ThreadState::SnapshotState snap_state = { + snapshot, rand_column_family, column_family->GetName(), + keystr, status_at, value_at, key_vec}; + thread->snapshot_queue.emplace( + std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), + snap_state); + } + while (!thread->snapshot_queue.empty() && + i >= thread->snapshot_queue.front().first) { + auto snap_state = thread->snapshot_queue.front().second; + assert(snap_state.snapshot); + // Note: this is unsafe as the cf might be dropped concurrently. But it + // is ok since unclean cf drop is cunnrently not supported by write + // prepared transactions. + Status s = + AssertSame(db_, column_families_[snap_state.cf_at], snap_state); + if (!s.ok()) { + VerificationAbort(shared, "Snapshot gave inconsistent state", s); + } + db_->ReleaseSnapshot(snap_state.snapshot); + delete snap_state.key_vec; + thread->snapshot_queue.pop(); } - db_->ReleaseSnapshot(snap_state.snapshot); - delete snap_state.key_vec; - thread->snapshot_queue.pop(); - } - int prob_op = thread->rand.Uniform(100); - // Reset this in case we pick something other than a read op. We don't - // want to use a stale value when deciding at the beginning of the loop - // whether to vote to reopen - if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { - // OPERATION read - if (FLAGS_use_multiget) { - // Leave room for one more iteration of the loop with a single key - // batch. This is to ensure that each thread does exactly the same - // number of ops - int multiget_batch_size = static_cast( - std::min(static_cast(thread->rand.Uniform(64)), - FLAGS_ops_per_thread - i - 1)); - // If its the last iteration, ensure that multiget_batch_size is 1 - multiget_batch_size = std::max(multiget_batch_size, 1); - rand_keys = GenerateNKeys(thread, multiget_batch_size, i); - TestMultiGet(thread, read_opts, rand_column_families, rand_keys); - i += multiget_batch_size - 1; + int prob_op = thread->rand.Uniform(100); + // Reset this in case we pick something other than a read op. We don't + // want to use a stale value when deciding at the beginning of the loop + // whether to vote to reopen + if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { + // OPERATION read + if (FLAGS_use_multiget) { + // Leave room for one more iteration of the loop with a single key + // batch. This is to ensure that each thread does exactly the same + // number of ops + int multiget_batch_size = static_cast( + std::min(static_cast(thread->rand.Uniform(64)), + FLAGS_ops_per_thread - i - 1)); + // If its the last iteration, ensure that multiget_batch_size is 1 + multiget_batch_size = std::max(multiget_batch_size, 1); + rand_keys = GenerateNKeys(thread, multiget_batch_size, i); + TestMultiGet(thread, read_opts, rand_column_families, rand_keys); + i += multiget_batch_size - 1; + } else { + TestGet(thread, read_opts, rand_column_families, rand_keys); + } + } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { + // OPERATION prefix scan + // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are + // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will + // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same + // prefix + TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); + } else if (prefixBound <= prob_op && prob_op < writeBound) { + // OPERATION write + TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, + value, lock); + } else if (writeBound <= prob_op && prob_op < delBound) { + // OPERATION delete + TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); + } else if (delBound <= prob_op && prob_op < delRangeBound) { + // OPERATION delete range + TestDeleteRange(thread, write_opts, rand_column_families, rand_keys, + lock); } else { - TestGet(thread, read_opts, rand_column_families, rand_keys); + // OPERATION iterate + int num_seeks = static_cast( + std::min(static_cast(thread->rand.Uniform(4)), + FLAGS_ops_per_thread - i - 1)); + rand_keys = GenerateNKeys(thread, num_seeks, i); + i += num_seeks - 1; + TestIterate(thread, read_opts, rand_column_families, rand_keys); } - } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { - // OPERATION prefix scan - // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are - // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will - // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same - // prefix - TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); - } else if (prefixBound <= prob_op && prob_op < writeBound) { - // OPERATION write - TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, - value, lock); - } else if (writeBound <= prob_op && prob_op < delBound) { - // OPERATION delete - TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); - } else if (delBound <= prob_op && prob_op < delRangeBound) { - // OPERATION delete range - TestDeleteRange(thread, write_opts, rand_column_families, rand_keys, - lock); - } else { - // OPERATION iterate - int num_seeks = static_cast( - std::min(static_cast(thread->rand.Uniform(4)), - FLAGS_ops_per_thread - i - 1)); - rand_keys = GenerateNKeys(thread, num_seeks, i); - i += num_seeks - 1; - TestIterate(thread, read_opts, rand_column_families, rand_keys); - } - thread->stats.FinishedSingleOp(); + thread->stats.FinishedSingleOp(); #ifndef ROCKSDB_LITE - uint32_t tid = thread->tid; - assert(secondaries_.empty() || - static_cast(tid) < secondaries_.size()); - if (FLAGS_secondary_catch_up_one_in > 0 && - thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { - Status s = secondaries_[tid]->TryCatchUpWithPrimary(); - if (!s.ok()) { - VerificationAbort(shared, "Secondary instance failed to catch up", s); - break; + uint32_t tid = thread->tid; + assert(secondaries_.empty() || + static_cast(tid) < secondaries_.size()); + if (FLAGS_secondary_catch_up_one_in > 0 && + thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) { + Status s = secondaries_[tid]->TryCatchUpWithPrimary(); + if (!s.ok()) { + VerificationAbort(shared, "Secondary instance failed to catch up", s); + break; + } } - } #endif + } } while (!thread->snapshot_queue.empty()) { db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot);