Support file ingestion in stress test (#4018)

Summary:
Once per `ingest_external_file_one_in` operations, uses SstFileWriter to create a file containing `ingest_external_file_width` consecutive keys. The file is named containing the thread ID to avoid clashes. The file is then added to the DB using `IngestExternalFile`.

We can't enable it by default in crash test because `nooverwritepercent` and `test_batches_snapshot` both must be zero for the DB's whole lifetime. Perhaps we should setup a separate test with that config as range deletion also requires it.
Closes https://github.com/facebook/rocksdb/pull/4018

Differential Revision: D8507698

Pulled By: ajkr

fbshipit-source-id: 1437ea26fd989349a9ce8b94117241c65e40f10f
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent 61d69d450d
commit 14cee194d6
  1. 111
      tools/db_stress.cc

@ -396,6 +396,14 @@ DEFINE_int32(checkpoint_one_in, 0,
"every N operations on average. 0 indicates CreateCheckpoint() " "every N operations on average. 0 indicates CreateCheckpoint() "
"is disabled."); "is disabled.");
DEFINE_int32(ingest_external_file_one_in, 0,
"If non-zero, then IngestExternalFile() will be called once for "
"every N operations on average. 0 indicates IngestExternalFile() "
"is disabled.");
DEFINE_int32(ingest_external_file_width, 1000,
"The width of the ingested external files.");
DEFINE_int32(compact_files_one_in, 0, DEFINE_int32(compact_files_one_in, 0,
"If non-zero, then CompactFiles() will be called once for every N " "If non-zero, then CompactFiles() will be called once for every N "
"operations on average. 0 indicates CompactFiles() is disabled."); "operations on average. 0 indicates CompactFiles() is disabled.");
@ -1901,6 +1909,11 @@ class StressTest {
} }
} }
if (FLAGS_ingest_external_file_one_in > 0 &&
thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) {
TestIngestExternalFile(thread, {rand_column_family}, {rand_key}, lock);
}
if (FLAGS_acquire_snapshot_one_in > 0 && if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
auto snapshot = db_->GetSnapshot(); auto snapshot = db_->GetSnapshot();
@ -1998,6 +2011,11 @@ class StressTest {
const std::vector<int64_t>& rand_keys, const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) = 0; std::unique_ptr<MutexLock>& lock) = 0;
virtual void TestIngestExternalFile(
ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) = 0;
// Given a key K, this creates an iterator which scans to K and then // Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations. // does a random sequence of Next/Prev operations.
virtual Status TestIterate(ThreadState* thread, virtual Status TestIterate(ThreadState* thread,
@ -2777,6 +2795,82 @@ class NonBatchedOpsStressTest : public StressTest {
return s; return s;
} }
#ifdef ROCKSDB_LITE
virtual void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */,
std::unique_ptr<MutexLock>& /* lock */) {
assert(false);
fprintf(stderr,
"RocksDB lite does not support "
"TestIngestExternalFile\n");
std::terminate();
}
#else
virtual void TestIngestExternalFile(
ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, std::unique_ptr<MutexLock>& lock) {
const std::string sst_filename =
FLAGS_db + "/." + ToString(thread->tid) + ".sst";
Status s;
if (FLAGS_env->FileExists(sst_filename).ok()) {
// Maybe we terminated abnormally before, so cleanup to give this file
// ingestion a clean slate
s = FLAGS_env->DeleteFile(sst_filename);
}
SstFileWriter sst_file_writer(EnvOptions(), options_);
if (s.ok()) {
s = sst_file_writer.Open(sst_filename);
}
int64_t key_base = rand_keys[0];
int column_family = rand_column_families[0];
std::vector<std::unique_ptr<MutexLock> > range_locks;
std::vector<uint32_t> values;
SharedState* shared = thread->shared;
// Grab locks, set pending state on expected values, and add keys
for (int64_t key = key_base;
s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
shared->GetMaxKey());
++key) {
if (key == key_base) {
range_locks.emplace_back(std::move(lock));
} else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(column_family, key)));
}
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
values.push_back(value_base);
shared->Put(column_family, key, value_base, true /* pending */);
char value[100];
size_t value_len = GenerateValue(value_base, value, sizeof(value));
auto key_str = Key(key);
s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
}
if (s.ok()) {
s = sst_file_writer.Finish();
}
if (s.ok()) {
s = db_->IngestExternalFile(column_families_[column_family],
{sst_filename}, IngestExternalFileOptions());
}
if (!s.ok()) {
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
std::terminate();
}
int64_t key = key_base;
for (int64_t value : values) {
shared->Put(column_family, key, value, false /* pending */);
++key;
}
}
#endif // ROCKSDB_LITE
bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
SharedState* shared, const std::string& value_from_db, SharedState* shared, const std::string& value_from_db,
Status s, bool strict = false) const { Status s, bool strict = false) const {
@ -2905,6 +2999,18 @@ class BatchedOpsStressTest : public StressTest {
"TestDeleteRange"); "TestDeleteRange");
} }
virtual void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */,
std::unique_ptr<MutexLock>& /* lock */) {
assert(false);
fprintf(stderr,
"BatchedOpsStressTest does not support "
"TestIngestExternalFile\n");
std::terminate();
}
// Given a key K, this gets values for "0"+K, "1"+K,..."9"+K // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
// in the same snapshot, and verifies that all the values are of the form // in the same snapshot, and verifies that all the values are of the form
// "0"+V, "1"+V,..."9"+V. // "0"+V, "1"+V,..."9"+V.
@ -3132,6 +3238,11 @@ int main(int argc, char** argv) {
"Error: nooverwritepercent must not be 100 when using merge operands"); "Error: nooverwritepercent must not be 100 when using merge operands");
exit(1); exit(1);
} }
if (FLAGS_ingest_external_file_one_in > 0 && FLAGS_nooverwritepercent > 0) {
fprintf(stderr,
"Error: nooverwritepercent must be 0 when using file ingestion\n");
exit(1);
}
// Choose a location for the test database if none given with --db=<path> // Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) { if (FLAGS_db.empty()) {

Loading…
Cancel
Save