|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#ifdef GFLAGS
|
|
|
|
#include "db_stress_tool/db_stress_common.h"
|
|
|
|
#include "rocksdb/utilities/transaction_db.h"
|
Account memory of FileMetaData in global memory limit (#9924)
Summary:
**Context/Summary:**
As revealed by heap profiling, allocation of `FileMetaData` for [newly created file added to a Version](https://github.com/facebook/rocksdb/pull/9924/files#diff-a6aa385940793f95a2c5b39cc670bd440c4547fa54fd44622f756382d5e47e43R774) can consume significant heap memory. This PR is to account that toward our global memory limit based on block cache capacity.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9924
Test Plan:
- Previous `make check` verified there are only 2 places where the memory of the allocated `FileMetaData` can be released
- New unit test `TEST_P(ChargeFileMetadataTestWithParam, Basic)`
- db bench (CPU cost of `charge_file_metadata` in write and compact)
- **write micros/op: -0.24%** : `TEST_TMPDIR=/dev/shm/testdb ./db_bench -benchmarks=fillseq -db=$TEST_TMPDIR -charge_file_metadata=1 (remove this option for pre-PR) -disable_auto_compactions=1 -write_buffer_size=100000 -num=4000000 | egrep 'fillseq'`
- **compact micros/op -0.87%** : `TEST_TMPDIR=/dev/shm/testdb ./db_bench -benchmarks=fillseq -db=$TEST_TMPDIR -charge_file_metadata=1 -disable_auto_compactions=1 -write_buffer_size=100000 -num=4000000 -numdistinct=1000 && ./db_bench -benchmarks=compact -db=$TEST_TMPDIR -use_existing_db=1 -charge_file_metadata=1 -disable_auto_compactions=1 | egrep 'compact'`
table 1 - write
#-run | (pre-PR) avg micros/op | std micros/op | (post-PR) micros/op | std micros/op | change (%)
-- | -- | -- | -- | -- | --
10 | 3.9711 | 0.264408 | 3.9914 | 0.254563 | 0.5111933721
20 | 3.83905 | 0.0664488 | 3.8251 | 0.0695456 | -0.3633711465
40 | 3.86625 | 0.136669 | 3.8867 | 0.143765 | 0.5289363078
80 | 3.87828 | 0.119007 | 3.86791 | 0.115674 | **-0.2673865734**
160 | 3.87677 | 0.162231 | 3.86739 | 0.16663 | **-0.2419539978**
table 2 - compact
#-run | (pre-PR) avg micros/op | std micros/op | (post-PR) micros/op | std micros/op | change (%)
-- | -- | -- | -- | -- | --
10 | 2,399,650.00 | 96,375.80 | 2,359,537.00 | 53,243.60 | -1.67
20 | 2,410,480.00 | 89,988.00 | 2,433,580.00 | 91,121.20 | 0.96
40 | 2.41E+06 | 121811 | 2.39E+06 | 131525 | **-0.96**
80 | 2.40E+06 | 134503 | 2.39E+06 | 108799 | **-0.78**
- stress test: `python3 tools/db_crashtest.py blackbox --charge_file_metadata=1 --cache_size=1` killed as normal
Reviewed By: ajkr
Differential Revision: D36055583
Pulled By: hx235
fbshipit-source-id: b60eab94707103cb1322cf815f05810ef0232625
3 years ago
|
|
|
#include "utilities/fault_injection_fs.h"
|
|
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class NonBatchedOpsStressTest : public StressTest {
|
|
|
|
public:
|
|
|
|
NonBatchedOpsStressTest() {}
|
|
|
|
|
|
|
|
virtual ~NonBatchedOpsStressTest() {}
|
|
|
|
|
|
|
|
void VerifyDb(ThreadState* thread) const override {
|
|
|
|
// This `ReadOptions` is for validation purposes. Ignore
|
|
|
|
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
|
|
|
|
ReadOptions options(FLAGS_verify_checksum, true);
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
4 years ago
|
|
|
std::string ts_str;
|
|
|
|
Slice ts;
|
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
ts_str = GenerateTimestampForRead();
|
|
|
|
ts = ts_str;
|
|
|
|
options.timestamp = &ts;
|
|
|
|
}
|
|
|
|
auto shared = thread->shared;
|
|
|
|
const int64_t max_key = shared->GetMaxKey();
|
|
|
|
const int64_t keys_per_thread = max_key / shared->GetNumThreads();
|
|
|
|
int64_t start = keys_per_thread * thread->tid;
|
|
|
|
int64_t end = start + keys_per_thread;
|
|
|
|
uint64_t prefix_to_use =
|
|
|
|
(FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
|
|
|
|
if (thread->tid == shared->GetNumThreads() - 1) {
|
|
|
|
end = max_key;
|
|
|
|
}
|
|
|
|
for (size_t cf = 0; cf < column_families_.size(); ++cf) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if (thread->rand.OneIn(4)) {
|
|
|
|
// 1/4 chance use iterator to verify this range
|
|
|
|
Slice prefix;
|
|
|
|
std::string seek_key = Key(start);
|
|
|
|
std::unique_ptr<Iterator> iter(
|
|
|
|
db_->NewIterator(options, column_families_[cf]));
|
|
|
|
iter->Seek(seek_key);
|
|
|
|
prefix = Slice(seek_key.data(), prefix_to_use);
|
|
|
|
for (auto i = start; i < end; i++) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
std::string from_db;
|
|
|
|
std::string keystr = Key(i);
|
|
|
|
Slice k = keystr;
|
|
|
|
Slice pfx = Slice(keystr.data(), prefix_to_use);
|
|
|
|
// Reseek when the prefix changes
|
|
|
|
if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
|
|
|
|
iter->Seek(k);
|
|
|
|
seek_key = keystr;
|
|
|
|
prefix = Slice(seek_key.data(), prefix_to_use);
|
|
|
|
}
|
|
|
|
Status s = iter->status();
|
|
|
|
if (iter->Valid()) {
|
|
|
|
Slice iter_key = iter->key();
|
|
|
|
if (iter->key().compare(k) > 0) {
|
|
|
|
s = Status::NotFound(Slice());
|
|
|
|
} else if (iter->key().compare(k) == 0) {
|
|
|
|
from_db = iter->value().ToString();
|
|
|
|
iter->Next();
|
|
|
|
} else if (iter_key.compare(k) < 0) {
|
|
|
|
VerificationAbort(shared, "An out of range key was found",
|
|
|
|
static_cast<int>(cf), i);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// The iterator found no value for the key in question, so do not
|
|
|
|
// move to the next item in the iterator
|
|
|
|
s = Status::NotFound();
|
|
|
|
}
|
|
|
|
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
|
|
|
|
s, true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (thread->rand.OneIn(3)) {
|
|
|
|
// 1/4 chance use Get to verify this range
|
|
|
|
for (auto i = start; i < end; i++) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
std::string from_db;
|
|
|
|
std::string keystr = Key(i);
|
|
|
|
Slice k = keystr;
|
|
|
|
Status s = db_->Get(options, column_families_[cf], k, &from_db);
|
|
|
|
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
|
|
|
|
s, true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if (thread->rand.OneIn(2)) {
|
|
|
|
// 1/4 chance use MultiGet to verify this range
|
|
|
|
for (auto i = start; i < end;) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
// Keep the batch size to some reasonable value
|
|
|
|
size_t batch_size = thread->rand.Uniform(128) + 1;
|
|
|
|
batch_size = std::min<size_t>(batch_size, end - i);
|
|
|
|
std::vector<std::string> keystrs(batch_size);
|
|
|
|
std::vector<Slice> keys(batch_size);
|
|
|
|
std::vector<PinnableSlice> values(batch_size);
|
|
|
|
std::vector<Status> statuses(batch_size);
|
|
|
|
for (size_t j = 0; j < batch_size; ++j) {
|
|
|
|
keystrs[j] = Key(i + j);
|
|
|
|
keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
|
|
|
|
}
|
|
|
|
db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
|
|
|
|
values.data(), statuses.data());
|
|
|
|
for (size_t j = 0; j < batch_size; ++j) {
|
|
|
|
Status s = statuses[j];
|
|
|
|
std::string from_db = values[j].ToString();
|
|
|
|
VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
|
|
|
|
from_db, s, true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
i += batch_size;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// 1/4 chance use GetMergeOperand to verify this range
|
|
|
|
// Start off with small size that will be increased later if necessary
|
|
|
|
std::vector<PinnableSlice> values(4);
|
|
|
|
GetMergeOperandsOptions merge_operands_info;
|
|
|
|
merge_operands_info.expected_max_number_of_operands =
|
|
|
|
static_cast<int>(values.size());
|
|
|
|
for (auto i = start; i < end; i++) {
|
|
|
|
if (thread->shared->HasVerificationFailedYet()) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
std::string from_db;
|
|
|
|
std::string keystr = Key(i);
|
|
|
|
Slice k = keystr;
|
|
|
|
int number_of_operands = 0;
|
|
|
|
Status s = db_->GetMergeOperands(options, column_families_[cf], k,
|
|
|
|
values.data(), &merge_operands_info,
|
|
|
|
&number_of_operands);
|
|
|
|
if (s.IsIncomplete()) {
|
|
|
|
// Need to resize values as there are more than values.size() merge
|
|
|
|
// operands on this key. Should only happen a few times when we
|
|
|
|
// encounter a key that had more merge operands than any key seen so
|
|
|
|
// far
|
|
|
|
values.resize(number_of_operands);
|
|
|
|
merge_operands_info.expected_max_number_of_operands =
|
|
|
|
static_cast<int>(number_of_operands);
|
|
|
|
s = db_->GetMergeOperands(options, column_families_[cf], k,
|
|
|
|
values.data(), &merge_operands_info,
|
|
|
|
&number_of_operands);
|
|
|
|
}
|
|
|
|
// Assumed here that GetMergeOperands always sets number_of_operand
|
|
|
|
if (number_of_operands) {
|
|
|
|
from_db = values[number_of_operands - 1].ToString();
|
|
|
|
}
|
|
|
|
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
|
|
|
|
s, true);
|
|
|
|
if (from_db.length()) {
|
|
|
|
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
|
|
|
|
from_db.data(), from_db.length());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
void ContinuouslyVerifyDb(ThreadState* thread) const override {
|
|
|
|
if (!cmp_db_) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
assert(cmp_db_);
|
|
|
|
assert(!cmp_cfhs_.empty());
|
|
|
|
Status s = cmp_db_->TryCatchUpWithPrimary();
|
|
|
|
if (!s.ok()) {
|
|
|
|
assert(false);
|
|
|
|
exit(1);
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto checksum_column_family = [](Iterator* iter,
|
|
|
|
uint32_t* checksum) -> Status {
|
|
|
|
assert(nullptr != checksum);
|
|
|
|
uint32_t ret = 0;
|
|
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
|
|
ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
|
|
|
|
ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
|
|
|
|
}
|
|
|
|
*checksum = ret;
|
|
|
|
return iter->status();
|
|
|
|
};
|
|
|
|
|
|
|
|
auto* shared = thread->shared;
|
|
|
|
assert(shared);
|
|
|
|
const int64_t max_key = shared->GetMaxKey();
|
|
|
|
ReadOptions read_opts(FLAGS_verify_checksum, true);
|
|
|
|
std::string ts_str;
|
|
|
|
Slice ts;
|
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
ts_str = GenerateTimestampForRead();
|
|
|
|
ts = ts_str;
|
|
|
|
read_opts.timestamp = &ts;
|
|
|
|
}
|
|
|
|
|
|
|
|
static Random64 rand64(shared->GetSeed());
|
|
|
|
|
|
|
|
{
|
|
|
|
uint32_t crc = 0;
|
|
|
|
std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
|
|
|
|
s = checksum_column_family(it.get(), &crc);
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "Computing checksum of default cf: %s\n",
|
|
|
|
s.ToString().c_str());
|
|
|
|
assert(false);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for (auto* handle : cmp_cfhs_) {
|
|
|
|
if (thread->rand.OneInOpt(3)) {
|
|
|
|
// Use Get()
|
|
|
|
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
|
|
|
|
std::string key_str = Key(key);
|
|
|
|
std::string value;
|
|
|
|
std::string key_ts;
|
|
|
|
s = cmp_db_->Get(read_opts, handle, key_str, &value,
|
|
|
|
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
|
|
|
|
s.PermitUncheckedError();
|
|
|
|
} else {
|
|
|
|
// Use range scan
|
|
|
|
std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
|
|
|
|
uint32_t rnd = (thread->rand.Next()) % 4;
|
|
|
|
if (0 == rnd) {
|
|
|
|
// SeekToFirst() + Next()*5
|
|
|
|
read_opts.total_order_seek = true;
|
|
|
|
iter->SeekToFirst();
|
|
|
|
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
|
|
|
|
}
|
|
|
|
} else if (1 == rnd) {
|
|
|
|
// SeekToLast() + Prev()*5
|
|
|
|
read_opts.total_order_seek = true;
|
|
|
|
iter->SeekToLast();
|
|
|
|
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
|
|
|
|
}
|
|
|
|
} else if (2 == rnd) {
|
|
|
|
// Seek() +Next()*5
|
|
|
|
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
|
|
|
|
std::string key_str = Key(key);
|
|
|
|
iter->Seek(key_str);
|
|
|
|
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// SeekForPrev() + Prev()*5
|
|
|
|
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
|
|
|
|
std::string key_str = Key(key);
|
|
|
|
iter->SeekForPrev(key_str);
|
|
|
|
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#else
|
|
|
|
void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
void MaybeClearOneColumnFamily(ThreadState* thread) override {
|
|
|
|
if (FLAGS_column_families > 1) {
|
|
|
|
if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
|
|
|
|
// drop column family and then create it again (can't drop default)
|
|
|
|
int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
|
|
|
|
std::string new_name =
|
|
|
|
std::to_string(new_column_family_name_.fetch_add(1));
|
|
|
|
{
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
|
|
|
fprintf(
|
|
|
|
stdout,
|
|
|
|
"[CF %d] Dropping and recreating column family. new name: %s\n",
|
|
|
|
cf, new_name.c_str());
|
|
|
|
}
|
|
|
|
thread->shared->LockColumnFamily(cf);
|
|
|
|
Status s = db_->DropColumnFamily(column_families_[cf]);
|
|
|
|
delete column_families_[cf];
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "dropping column family error: %s\n",
|
|
|
|
s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
|
|
|
|
&column_families_[cf]);
|
|
|
|
column_family_names_[cf] = new_name;
|
|
|
|
thread->shared->ClearColumnFamily(cf);
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "creating column family error: %s\n",
|
|
|
|
s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
thread->shared->UnlockColumnFamily(cf);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
bool ShouldAcquireMutexOnKey() const override { return true; }
|
|
|
|
|
|
|
|
bool IsStateTracked() const override { return true; }
|
|
|
|
|
|
|
|
Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
|
|
|
auto cfh = column_families_[rand_column_families[0]];
|
|
|
|
std::string key_str = Key(rand_keys[0]);
|
|
|
|
Slice key = key_str;
|
|
|
|
std::string from_db;
|
|
|
|
int error_count = 0;
|
|
|
|
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->EnableErrorInjection();
|
|
|
|
SharedState::ignore_read_error = false;
|
|
|
|
}
|
|
|
|
Status s = db_->Get(read_opts, cfh, key, &from_db);
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
error_count = fault_fs_guard->GetAndResetErrorCount();
|
|
|
|
}
|
|
|
|
if (s.ok()) {
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
if (error_count && !SharedState::ignore_read_error) {
|
|
|
|
// Grab mutex so multiple thread don't try to print the
|
|
|
|
// stack trace at the same time
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
|
|
|
fprintf(stderr, "Didn't get expected error from Get\n");
|
|
|
|
fprintf(stderr, "Callstack that injected the fault\n");
|
|
|
|
fault_fs_guard->PrintFaultBacktrace();
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// found case
|
|
|
|
thread->stats.AddGets(1, 1);
|
|
|
|
} else if (s.IsNotFound()) {
|
|
|
|
// not found case
|
|
|
|
thread->stats.AddGets(1, 0);
|
|
|
|
} else {
|
|
|
|
if (error_count == 0) {
|
|
|
|
// errors case
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
} else {
|
|
|
|
thread->stats.AddVerifiedErrors(1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->DisableErrorInjection();
|
|
|
|
}
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<Status> TestMultiGet(
|
|
|
|
ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
|
|
|
size_t num_keys = rand_keys.size();
|
|
|
|
std::vector<std::string> key_str;
|
|
|
|
std::vector<Slice> keys;
|
|
|
|
key_str.reserve(num_keys);
|
|
|
|
keys.reserve(num_keys);
|
|
|
|
std::vector<PinnableSlice> values(num_keys);
|
|
|
|
std::vector<Status> statuses(num_keys);
|
|
|
|
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
|
|
|
|
int error_count = 0;
|
|
|
|
// Do a consistency check between Get and MultiGet. Don't do it too
|
|
|
|
// often as it will slow db_stress down
|
|
|
|
bool do_consistency_check = thread->rand.OneIn(4);
|
|
|
|
|
|
|
|
ReadOptions readoptionscopy = read_opts;
|
|
|
|
if (do_consistency_check) {
|
|
|
|
readoptionscopy.snapshot = db_->GetSnapshot();
|
|
|
|
}
|
|
|
|
readoptionscopy.rate_limiter_priority =
|
|
|
|
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
|
|
|
|
|
|
|
|
// To appease clang analyzer
|
|
|
|
const bool use_txn = FLAGS_use_txn;
|
|
|
|
|
|
|
|
// Create a transaction in order to write some data. The purpose is to
|
|
|
|
// exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
|
|
|
|
// will be rolled back once MultiGet returns.
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn = nullptr;
|
|
|
|
if (use_txn) {
|
|
|
|
WriteOptions wo;
|
Rate-limit automatic WAL flush after each user write (#9607)
Summary:
**Context:**
WAL flush is currently not rate-limited by `Options::rate_limiter`. This PR is to provide rate-limiting to auto WAL flush, the one that automatically happen after each user write operation (i.e, `Options::manual_wal_flush == false`), by adding `WriteOptions::rate_limiter_options`.
Note that we are NOT rate-limiting WAL flush that do NOT automatically happen after each user write, such as `Options::manual_wal_flush == true + manual FlushWAL()` (rate-limiting multiple WAL flushes), for the benefits of:
- being consistent with [ReadOptions::rate_limiter_priority](https://github.com/facebook/rocksdb/blob/7.0.fb/include/rocksdb/options.h#L515)
- being able to turn off some WAL flush's rate-limiting but not all (e.g, turn off specific the WAL flush of a critical user write like a service's heartbeat)
`WriteOptions::rate_limiter_options` only accept `Env::IO_USER` and `Env::IO_TOTAL` currently due to an implementation constraint.
- The constraint is that we currently queue parallel writes (including WAL writes) based on FIFO policy which does not factor rate limiter priority into this layer's scheduling. If we allow lower priorities such as `Env::IO_HIGH/MID/LOW` and such writes specified with lower priorities occurs before ones specified with higher priorities (even just by a tiny bit in arrival time), the former would have blocked the latter, leading to a "priority inversion" issue and contradictory to what we promise for rate-limiting priority. Therefore we only allow `Env::IO_USER` and `Env::IO_TOTAL` right now before improving that scheduling.
A pre-requisite to this feature is to support operation-level rate limiting in `WritableFileWriter`, which is also included in this PR.
**Summary:**
- Renamed test suite `DBRateLimiterTest to DBRateLimiterOnReadTest` for adding a new test suite
- Accept `rate_limiter_priority` in `WritableFileWriter`'s private and public write functions
- Passed `WriteOptions::rate_limiter_options` to `WritableFileWriter` in the path of automatic WAL flush.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9607
Test Plan:
- Added new unit test to verify existing flush/compaction rate-limiting does not break, since `DBTest, RateLimitingTest` is disabled and current db-level rate-limiting tests focus on read only (e.g, `db_rate_limiter_test`, `DBTest2, RateLimitedCompactionReads`).
- Added new unit test `DBRateLimiterOnWriteWALTest, AutoWalFlush`
- `strace -ftt -e trace=write ./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -rate_limit_auto_wal_flush=1 -rate_limiter_bytes_per_sec=15 -rate_limiter_refill_period_us=1000000 -write_buffer_size=100000000 -disable_auto_compactions=1 -num=100`
- verified that WAL flush(i.e, system-call _write_) were chunked into 15 bytes and each _write_ was roughly 1 second apart
- verified the chunking disappeared when `-rate_limit_auto_wal_flush=0`
- crash test: `python3 tools/db_crashtest.py blackbox --disable_wal=0 --rate_limit_auto_wal_flush=1 --rate_limiter_bytes_per_sec=10485760 --interval=10` killed as normal
**Benchmarked on flush/compaction to ensure no performance regression:**
- compaction with rate-limiting (see table 1, avg over 1280-run): pre-change: **915635 micros/op**; post-change:
**907350 micros/op (improved by 0.106%)**
```
#!/bin/bash
TEST_TMPDIR=/dev/shm/testdb
START=1
NUM_DATA_ENTRY=8
N=10
rm -f compact_bmk_output.txt compact_bmk_output_2.txt dont_care_output.txt
for i in $(eval echo "{$START..$NUM_DATA_ENTRY}")
do
NUM_RUN=$(($N*(2**($i-1))))
for j in $(eval echo "{$START..$NUM_RUN}")
do
./db_bench --benchmarks=fillrandom -db=$TEST_TMPDIR -disable_auto_compactions=1 -write_buffer_size=6710886 > dont_care_output.txt && ./db_bench --benchmarks=compact -use_existing_db=1 -db=$TEST_TMPDIR -level0_file_num_compaction_trigger=1 -rate_limiter_bytes_per_sec=100000000 | egrep 'compact'
done > compact_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' compact_bmk_output.txt >> compact_bmk_output_2.txt
done
```
- compaction w/o rate-limiting (see table 2, avg over 640-run): pre-change: **822197 micros/op**; post-change: **823148 micros/op (regressed by 0.12%)**
```
Same as above script, except that -rate_limiter_bytes_per_sec=0
```
- flush with rate-limiting (see table 3, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench ): pre-change: **745752 micros/op**; post-change: **745331 micros/op (regressed by 0.06 %)**
```
#!/bin/bash
TEST_TMPDIR=/dev/shm/testdb
START=1
NUM_DATA_ENTRY=8
N=10
rm -f flush_bmk_output.txt flush_bmk_output_2.txt
for i in $(eval echo "{$START..$NUM_DATA_ENTRY}")
do
NUM_RUN=$(($N*(2**($i-1))))
for j in $(eval echo "{$START..$NUM_RUN}")
do
./db_bench -db=$TEST_TMPDIR -write_buffer_size=1048576000 -num=1000000 -rate_limiter_bytes_per_sec=100000000 -benchmarks=fillseq,flush | egrep 'flush'
done > flush_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' flush_bmk_output.txt >> flush_bmk_output_2.txt
done
```
- flush w/o rate-limiting (see table 4, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench): pre-change: **487512 micros/op**, post-change: **485856 micors/ops (improved by 0.34%)**
```
Same as above script, except that -rate_limiter_bytes_per_sec=0
```
| table 1 - compact with rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%)
-- | -- | -- | -- | -- | --
10 | 896978 | 16046.9 | 901242 | 15670.9 | 0.475373978
20 | 893718 | 15813 | 886505 | 17544.7 | -0.8070778478
40 | 900426 | 23882.2 | 894958 | 15104.5 | -0.6072681153
80 | 906635 | 21761.5 | 903332 | 23948.3 | -0.3643141948
160 | 898632 | 21098.9 | 907583 | 21145 | 0.9960695813
3.20E+02 | 905252 | 22785.5 | 908106 | 25325.5 | 0.3152713278
6.40E+02 | 905213 | 23598.6 | 906741 | 21370.5 | 0.1688000504
**1.28E+03** | **908316** | **23533.1** | **907350** | **24626.8** | **-0.1063506533**
average over #-run | 901896.25 | 21064.9625 | 901977.125 | 20592.025 | 0.008967217682
| table 2 - compact w/o rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%)
-- | -- | -- | -- | -- | --
10 | 811211 | 26996.7 | 807586 | 28456.4 | -0.4468627768
20 | 815465 | 14803.7 | 814608 | 28719.7 | -0.105093413
40 | 809203 | 26187.1 | 797835 | 25492.1 | -1.404839082
80 | 822088 | 28765.3 | 822192 | 32840.4 | 0.01265071379
160 | 821719 | 36344.7 | 821664 | 29544.9 | -0.006693285661
3.20E+02 | 820921 | 27756.4 | 821403 | 28347.7 | 0.05871454135
**6.40E+02** | **822197** | **28960.6** | **823148** | **30055.1** | **0.1156657103**
average over #-run | 8.18E+05 | 2.71E+04 | 8.15E+05 | 2.91E+04 | -0.25
| table 3 - flush with rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%)
-- | -- | -- | -- | -- | --
10 | 741721 | 11770.8 | 740345 | 5949.76 | -0.1855144994
20 | 735169 | 3561.83 | 743199 | 9755.77 | 1.09226586
40 | 743368 | 8891.03 | 742102 | 8683.22 | -0.1703059588
80 | 742129 | 8148.51 | 743417 | 9631.58| 0.1735547324
160 | 749045 | 9757.21 | 746256 | 9191.86 | -0.3723407806
**3.20E+02** | **745752** | **9819.65** | **745331** | **9840.62** | **-0.0564530836**
6.40E+02 | 749006 | 11080.5 | 748173 | 10578.7 | -0.1112140624
average over #-run | 743741.4286 | 9004.218571 | 744117.5714 | 9090.215714 | 0.05057441238
| table 4 - flush w/o rate-limiting|
#-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%)
-- | -- | -- | -- | -- | --
10 | 477283 | 24719.6 | 473864 | 12379 | -0.7163464863
20 | 486743 | 20175.2 | 502296 | 23931.3 | 3.195320734
40 | 482846 | 15309.2 | 489820 | 22259.5 | 1.444352858
80 | 491490 | 21883.1 | 490071 | 23085.7 | -0.2887139108
160 | 493347 | 28074.3 | 483609 | 21211.7 | -1.973864238
**3.20E+02** | **487512** | **21401.5** | **485856** | **22195.2** | **-0.3396839462**
6.40E+02 | 490307 | 25418.6 | 485435 | 22405.2 | -0.9936631539
average over #-run | 4.87E+05 | 2.24E+04 | 4.87E+05 | 2.11E+04 | 0.00E+00
Reviewed By: ajkr
Differential Revision: D34442441
Pulled By: hx235
fbshipit-source-id: 4790f13e1e5c0a95ae1d1cc93ffcf69dc6e78bdd
3 years ago
|
|
|
if (FLAGS_rate_limit_auto_wal_flush) {
|
|
|
|
wo.rate_limiter_priority = Env::IO_USER;
|
|
|
|
}
|
|
|
|
Status s = NewTxn(wo, &txn);
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
|
|
key_str.emplace_back(Key(rand_keys[i]));
|
|
|
|
keys.emplace_back(key_str.back());
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
if (use_txn) {
|
|
|
|
// With a 1 in 10 probability, insert the just added key in the batch
|
|
|
|
// into the transaction. This will create an overlap with the MultiGet
|
|
|
|
// keys and exercise some corner cases in the code
|
|
|
|
if (thread->rand.OneIn(10)) {
|
|
|
|
int op = thread->rand.Uniform(2);
|
|
|
|
Status s;
|
|
|
|
switch (op) {
|
|
|
|
case 0:
|
|
|
|
case 1: {
|
|
|
|
uint32_t value_base =
|
|
|
|
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
|
|
|
|
char value[100];
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
Slice v(value, sz);
|
|
|
|
if (op == 0) {
|
|
|
|
s = txn->Put(cfh, keys.back(), v);
|
|
|
|
} else {
|
|
|
|
s = txn->Merge(cfh, keys.back(), v);
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
case 2:
|
|
|
|
s = txn->Delete(cfh, keys.back());
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
assert(false);
|
|
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!use_txn) {
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->EnableErrorInjection();
|
|
|
|
SharedState::ignore_read_error = false;
|
|
|
|
}
|
|
|
|
db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
|
|
|
|
statuses.data());
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
error_count = fault_fs_guard->GetAndResetErrorCount();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
|
|
|
|
statuses.data());
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
|
|
|
|
if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
|
|
|
|
int stat_nok = 0;
|
|
|
|
for (const auto& s : statuses) {
|
|
|
|
if (!s.ok() && !s.IsNotFound()) {
|
|
|
|
stat_nok++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (stat_nok < error_count) {
|
|
|
|
// Grab mutex so multiple thread don't try to print the
|
|
|
|
// stack trace at the same time
|
|
|
|
MutexLock l(thread->shared->GetMutex());
|
|
|
|
fprintf(stderr, "Didn't get expected error from MultiGet. \n");
|
|
|
|
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
|
|
|
|
error_count, stat_nok);
|
|
|
|
fprintf(stderr, "Callstack that injected the fault\n");
|
|
|
|
fault_fs_guard->PrintFaultBacktrace();
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (fault_fs_guard) {
|
|
|
|
fault_fs_guard->DisableErrorInjection();
|
|
|
|
}
|
|
|
|
|
|
|
|
for (size_t i = 0; i < statuses.size(); ++i) {
|
|
|
|
Status s = statuses[i];
|
|
|
|
bool is_consistent = true;
|
|
|
|
// Only do the consistency check if no error was injected and MultiGet
|
|
|
|
// didn't return an unexpected error
|
|
|
|
if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
|
|
|
|
Status tmp_s;
|
|
|
|
std::string value;
|
|
|
|
|
|
|
|
if (use_txn) {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
} else {
|
|
|
|
tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
|
|
|
|
}
|
|
|
|
if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
|
|
|
|
fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (!s.ok() && tmp_s.ok()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (s.ok() && tmp_s.IsNotFound()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
|
|
|
|
is_consistent = false;
|
|
|
|
} else if (s.ok() && value != values[i].ToString()) {
|
|
|
|
fprintf(stderr, "MultiGet returned different results with key %s\n",
|
|
|
|
keys[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "MultiGet returned value %s\n",
|
|
|
|
values[i].ToString(true).c_str());
|
|
|
|
fprintf(stderr, "Get returned value %s\n", value.c_str());
|
|
|
|
is_consistent = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!is_consistent) {
|
|
|
|
fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
// Fail fast to preserve the DB state
|
|
|
|
thread->shared->SetVerificationFailure();
|
|
|
|
break;
|
|
|
|
} else if (s.ok()) {
|
|
|
|
// found case
|
|
|
|
thread->stats.AddGets(1, 1);
|
|
|
|
} else if (s.IsNotFound()) {
|
|
|
|
// not found case
|
|
|
|
thread->stats.AddGets(1, 0);
|
|
|
|
} else if (s.IsMergeInProgress() && use_txn) {
|
|
|
|
// With txn this is sometimes expected.
|
|
|
|
thread->stats.AddGets(1, 1);
|
|
|
|
} else {
|
|
|
|
if (error_count == 0) {
|
|
|
|
// errors case
|
|
|
|
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
} else {
|
|
|
|
thread->stats.AddVerifiedErrors(1);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if (readoptionscopy.snapshot) {
|
|
|
|
db_->ReleaseSnapshot(readoptionscopy.snapshot);
|
|
|
|
}
|
|
|
|
if (use_txn) {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
RollbackTxn(txn);
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
return statuses;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys) override {
|
|
|
|
auto cfh = column_families_[rand_column_families[0]];
|
|
|
|
std::string key_str = Key(rand_keys[0]);
|
|
|
|
Slice key = key_str;
|
|
|
|
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
|
|
|
|
|
|
|
|
std::string upper_bound;
|
|
|
|
Slice ub_slice;
|
|
|
|
ReadOptions ro_copy = read_opts;
|
|
|
|
// Get the next prefix first and then see if we want to set upper bound.
|
|
|
|
// We'll use the next prefix in an assertion later on
|
|
|
|
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
|
|
|
|
// For half of the time, set the upper bound to the next prefix
|
|
|
|
ub_slice = Slice(upper_bound);
|
|
|
|
ro_copy.iterate_upper_bound = &ub_slice;
|
|
|
|
}
|
|
|
|
|
|
|
|
Iterator* iter = db_->NewIterator(ro_copy, cfh);
|
|
|
|
unsigned long count = 0;
|
|
|
|
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
|
|
|
|
iter->Next()) {
|
|
|
|
++count;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
|
|
|
|
|
|
|
|
Status s = iter->status();
|
|
|
|
if (iter->status().ok()) {
|
|
|
|
thread->stats.AddPrefixes(1, count);
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
|
|
|
|
thread->stats.AddErrors(1);
|
|
|
|
}
|
|
|
|
delete iter;
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const ReadOptions& read_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys, char (&value)[100],
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
|
|
|
auto shared = thread->shared;
|
|
|
|
int64_t max_key = shared->GetMaxKey();
|
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
4 years ago
|
|
|
std::string write_ts_str;
|
|
|
|
Slice write_ts;
|
|
|
|
while (!shared->AllowsOverwrite(rand_key) &&
|
|
|
|
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
|
|
|
|
lock.reset();
|
|
|
|
rand_key = thread->rand.Next() % max_key;
|
|
|
|
rand_column_family = thread->rand.Next() % FLAGS_column_families;
|
|
|
|
lock.reset(
|
|
|
|
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
|
Add user-defined timestamps to db_stress (#8061)
Summary:
Add some basic test for user-defined timestamp to db_stress. Currently,
read with timestamp always tries to read using the current timestamp.
Due to the per-key timestamp-sequence ordering constraint, we only add timestamp-
related tests to the `NonBatchedOpsStressTest` since this test serializes accesses
to the same key and uses a file to cross-check data correctness.
The timestamp feature is not supported in a number of components, e.g. Merge, SingleDelete,
DeleteRange, CompactionFilter, Readonly instance, secondary instance, SST file ingestion, transaction,
etc. Therefore, db_stress should exit if user enables both timestamp and these features at the same
time. The (currently) incompatible features can be found in
`CheckAndSetOptionsForUserTimestamp`.
This PR also fixes a bug triggered when timestamp is enabled together with
`index_type=kBinarySearchWithFirstKey`. This bug fix will also be in another separate PR
with more unit tests coverage. Fixing it here because I do not want to exclude the index type
from crash test.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8061
Test Plan: make crash_test_with_ts
Reviewed By: jay-zhuang
Differential Revision: D27056282
Pulled By: riversand963
fbshipit-source-id: c3e00ad1023fdb9ebbdf9601ec18270c5e2925a9
4 years ago
|
|
|
if (FLAGS_user_timestamp_size > 0) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
|
|
|
}
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
3 years ago
|
|
|
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
|
|
|
|
write_ts_str = NowNanosStr();
|
|
|
|
write_ts = write_ts_str;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string key_str = Key(rand_key);
|
|
|
|
Slice key = key_str;
|
|
|
|
ColumnFamilyHandle* cfh = column_families_[rand_column_family];
|
|
|
|
|
|
|
|
if (FLAGS_verify_before_write) {
|
|
|
|
std::string key_str2 = Key(rand_key);
|
|
|
|
Slice k = key_str2;
|
|
|
|
std::string from_db;
|
|
|
|
Status s = db_->Get(read_opts, cfh, k, &from_db);
|
|
|
|
if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared,
|
|
|
|
from_db, s, true)) {
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
Slice v(value, sz);
|
|
|
|
shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
|
|
|
|
Status s;
|
|
|
|
if (FLAGS_use_merge) {
|
|
|
|
if (!FLAGS_use_txn) {
|
|
|
|
s = db_->Merge(write_opts, cfh, key, v);
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Merge(cfh, key, v);
|
|
|
|
if (s.ok()) {
|
Snapshots with user-specified timestamps (#9879)
Summary:
In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written
to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable.
It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can
do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps
and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29.
This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in
https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps.
Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps.
In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot`
object is created with the last published sequence number of the super-version. You can see that the reader actually
has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called,
an arbitrarily long period of time may have already elapsed since the last write, which is when the last published
sequence number is written.
This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is
exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction
commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will
ensure any two snapshots with timestamps should satisfy the following:
```
snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts
```
If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on
in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create
a snapshot with associated timestamp.
Code example
```cpp
// Create a timestamped snapshot when committing transaction.
txn->SetCommitTimestamp(100);
txn->SetSnapshotOnNextOperation();
txn->Commit();
// A wrapper API for convenience
Status Transaction::CommitAndTryCreateSnapshot(
std::shared_ptr<TransactionNotifier> notifier,
TxnTimestamp ts,
std::shared_ptr<const Snapshot>* ret);
// Create a timestamped snapshot if caller guarantees no concurrent writes
std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100);
```
The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with
other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp.
```cpp
// Return the timestamped snapshot correponding to given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest timestamped snapshot if present.
// Othersise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const;
// Return the latest timestamped snapshot if present.
std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const;
```
We also provide two additional APIs for stats collection and reporting purposes.
```cpp
Status TransactionDB::GetAllTimestampedSnapshots(
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
// Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`.
Status TransactionDB::GetTimestampedSnapshots(
TxnTimestamp ts_lb,
TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
```
To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release
timestamped snapshots whose timestamps are older than or equal to a given threshold.
```cpp
void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts);
```
Before shutdown, RocksDB will release all timestamped snapshots.
Comparison with user-defined timestamp and how they can be combined:
User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile
mapping between snapshots (sequence numbers) and timestamps.
Different internal keys with the same user key but different timestamps will be treated as different by compaction,
thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection.
In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in
this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent).
The timestamped snapshot supports the semantics of reading at an exact point in time.
Timestamped snapshots can also be used with user-defined timestamp.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879
Test Plan:
```
make check
TEST_TMPDIR=/dev/shm make crash_test_with_txn
```
Reviewed By: siying
Differential Revision: D35783919
Pulled By: riversand963
fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
3 years ago
|
|
|
s = CommitTxn(txn, thread);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
3 years ago
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->Put(write_opts, cfh, key, v);
|
|
|
|
} else {
|
|
|
|
s = db_->Put(write_opts, cfh, key, write_ts, v);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Put(cfh, key, v);
|
|
|
|
if (s.ok()) {
|
Snapshots with user-specified timestamps (#9879)
Summary:
In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written
to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable.
It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can
do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps
and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29.
This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in
https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps.
Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps.
In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot`
object is created with the last published sequence number of the super-version. You can see that the reader actually
has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called,
an arbitrarily long period of time may have already elapsed since the last write, which is when the last published
sequence number is written.
This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is
exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction
commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will
ensure any two snapshots with timestamps should satisfy the following:
```
snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts
```
If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on
in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create
a snapshot with associated timestamp.
Code example
```cpp
// Create a timestamped snapshot when committing transaction.
txn->SetCommitTimestamp(100);
txn->SetSnapshotOnNextOperation();
txn->Commit();
// A wrapper API for convenience
Status Transaction::CommitAndTryCreateSnapshot(
std::shared_ptr<TransactionNotifier> notifier,
TxnTimestamp ts,
std::shared_ptr<const Snapshot>* ret);
// Create a timestamped snapshot if caller guarantees no concurrent writes
std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100);
```
The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with
other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp.
```cpp
// Return the timestamped snapshot correponding to given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest timestamped snapshot if present.
// Othersise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const;
// Return the latest timestamped snapshot if present.
std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const;
```
We also provide two additional APIs for stats collection and reporting purposes.
```cpp
Status TransactionDB::GetAllTimestampedSnapshots(
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
// Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`.
Status TransactionDB::GetTimestampedSnapshots(
TxnTimestamp ts_lb,
TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
```
To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release
timestamped snapshots whose timestamps are older than or equal to a given threshold.
```cpp
void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts);
```
Before shutdown, RocksDB will release all timestamped snapshots.
Comparison with user-defined timestamp and how they can be combined:
User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile
mapping between snapshots (sequence numbers) and timestamps.
Different internal keys with the same user key but different timestamps will be treated as different by compaction,
thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection.
In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in
this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent).
The timestamped snapshot supports the semantics of reading at an exact point in time.
Timestamped snapshots can also be used with user-defined timestamp.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879
Test Plan:
```
make check
TEST_TMPDIR=/dev/shm make crash_test_with_txn
```
Reviewed By: siying
Differential Revision: D35783919
Pulled By: riversand963
fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
3 years ago
|
|
|
s = CommitTxn(txn, thread);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
}
|
|
|
|
shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
|
|
|
|
if (!s.ok()) {
|
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
thread->stats.AddBytesForWrites(1, sz);
|
|
|
|
PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
|
|
|
|
sz);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& /* lock */) override {
|
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
|
|
|
auto shared = thread->shared;
|
|
|
|
|
|
|
|
// OPERATION delete
|
|
|
|
std::string write_ts_str = NowNanosStr();
|
|
|
|
Slice write_ts = write_ts_str;
|
|
|
|
|
|
|
|
std::string key_str = Key(rand_key);
|
|
|
|
Slice key = key_str;
|
|
|
|
auto cfh = column_families_[rand_column_family];
|
|
|
|
|
|
|
|
// Use delete if the key may be overwritten and a single deletion
|
|
|
|
// otherwise.
|
|
|
|
Status s;
|
|
|
|
if (shared->AllowsOverwrite(rand_key)) {
|
|
|
|
shared->Delete(rand_column_family, rand_key, true /* pending */);
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
3 years ago
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->Delete(write_opts, cfh, key);
|
|
|
|
} else {
|
|
|
|
s = db_->Delete(write_opts, cfh, key, write_ts);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->Delete(cfh, key);
|
|
|
|
if (s.ok()) {
|
Snapshots with user-specified timestamps (#9879)
Summary:
In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written
to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable.
It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can
do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps
and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29.
This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in
https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps.
Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps.
In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot`
object is created with the last published sequence number of the super-version. You can see that the reader actually
has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called,
an arbitrarily long period of time may have already elapsed since the last write, which is when the last published
sequence number is written.
This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is
exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction
commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will
ensure any two snapshots with timestamps should satisfy the following:
```
snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts
```
If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on
in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create
a snapshot with associated timestamp.
Code example
```cpp
// Create a timestamped snapshot when committing transaction.
txn->SetCommitTimestamp(100);
txn->SetSnapshotOnNextOperation();
txn->Commit();
// A wrapper API for convenience
Status Transaction::CommitAndTryCreateSnapshot(
std::shared_ptr<TransactionNotifier> notifier,
TxnTimestamp ts,
std::shared_ptr<const Snapshot>* ret);
// Create a timestamped snapshot if caller guarantees no concurrent writes
std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100);
```
The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with
other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp.
```cpp
// Return the timestamped snapshot correponding to given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest timestamped snapshot if present.
// Othersise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const;
// Return the latest timestamped snapshot if present.
std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const;
```
We also provide two additional APIs for stats collection and reporting purposes.
```cpp
Status TransactionDB::GetAllTimestampedSnapshots(
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
// Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`.
Status TransactionDB::GetTimestampedSnapshots(
TxnTimestamp ts_lb,
TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
```
To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release
timestamped snapshots whose timestamps are older than or equal to a given threshold.
```cpp
void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts);
```
Before shutdown, RocksDB will release all timestamped snapshots.
Comparison with user-defined timestamp and how they can be combined:
User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile
mapping between snapshots (sequence numbers) and timestamps.
Different internal keys with the same user key but different timestamps will be treated as different by compaction,
thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection.
In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in
this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent).
The timestamped snapshot supports the semantics of reading at an exact point in time.
Timestamped snapshots can also be used with user-defined timestamp.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879
Test Plan:
```
make check
TEST_TMPDIR=/dev/shm make crash_test_with_txn
```
Reviewed By: siying
Differential Revision: D35783919
Pulled By: riversand963
fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
3 years ago
|
|
|
s = CommitTxn(txn, thread);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
shared->Delete(rand_column_family, rand_key, false /* pending */);
|
|
|
|
thread->stats.AddDeletes(1);
|
|
|
|
if (!s.ok()) {
|
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ &&
|
|
|
|
s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
|
|
|
|
if (!FLAGS_use_txn) {
|
Revise APIs related to user-defined timestamp (#8946)
Summary:
ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`.
Namely, `WriteOptions` should not include information about "what-to-write", but should just
include information about "how-to-write".
According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore,
this PR removes `WriteOptions::timestamp` for compliance.
After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set
of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and
`SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity
made me reconsider doing it in another PR (maybe).
For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take
extra `timestamp` information when writing to `WriteBatch`es.
These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list.
Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to
`WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps
allocated already and multiple timestamps can be updated.
The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp
size of the default column family. This will be used to allocate space when calling APIs that do not
specify a column family handle.
Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing
some assertions about timestamp to returning Status code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946
Test Plan:
make check
./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8
./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0
Make sure there is no perf regression by running the following
```
./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom
```
Before this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s
```
After this PR
```
DB path: [/dev/shm/rocksdb]
fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s
```
Reviewed By: ltamasi
Differential Revision: D33721359
Pulled By: riversand963
fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
3 years ago
|
|
|
if (FLAGS_user_timestamp_size == 0) {
|
|
|
|
s = db_->SingleDelete(write_opts, cfh, key);
|
|
|
|
} else {
|
|
|
|
s = db_->SingleDelete(write_opts, cfh, key, write_ts);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
Transaction* txn;
|
|
|
|
s = NewTxn(write_opts, &txn);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = txn->SingleDelete(cfh, key);
|
|
|
|
if (s.ok()) {
|
Snapshots with user-specified timestamps (#9879)
Summary:
In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written
to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable.
It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can
do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps
and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29.
This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in
https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps.
Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps.
In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot`
object is created with the last published sequence number of the super-version. You can see that the reader actually
has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called,
an arbitrarily long period of time may have already elapsed since the last write, which is when the last published
sequence number is written.
This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is
exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction
commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will
ensure any two snapshots with timestamps should satisfy the following:
```
snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts
```
If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on
in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create
a snapshot with associated timestamp.
Code example
```cpp
// Create a timestamped snapshot when committing transaction.
txn->SetCommitTimestamp(100);
txn->SetSnapshotOnNextOperation();
txn->Commit();
// A wrapper API for convenience
Status Transaction::CommitAndTryCreateSnapshot(
std::shared_ptr<TransactionNotifier> notifier,
TxnTimestamp ts,
std::shared_ptr<const Snapshot>* ret);
// Create a timestamped snapshot if caller guarantees no concurrent writes
std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100);
```
The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with
other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp.
```cpp
// Return the timestamped snapshot correponding to given timestamp. If ts is
// kMaxTxnTimestamp, then we return the latest timestamped snapshot if present.
// Othersise, we return the snapshot whose timestamp is equal to `ts`. If no
// such snapshot exists, then we return null.
std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const;
// Return the latest timestamped snapshot if present.
std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const;
```
We also provide two additional APIs for stats collection and reporting purposes.
```cpp
Status TransactionDB::GetAllTimestampedSnapshots(
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
// Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`.
Status TransactionDB::GetTimestampedSnapshots(
TxnTimestamp ts_lb,
TxnTimestamp ts_ub,
std::vector<std::shared_ptr<const Snapshot>>& snapshots) const;
```
To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release
timestamped snapshots whose timestamps are older than or equal to a given threshold.
```cpp
void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts);
```
Before shutdown, RocksDB will release all timestamped snapshots.
Comparison with user-defined timestamp and how they can be combined:
User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile
mapping between snapshots (sequence numbers) and timestamps.
Different internal keys with the same user key but different timestamps will be treated as different by compaction,
thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection.
In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in
this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent).
The timestamped snapshot supports the semantics of reading at an exact point in time.
Timestamped snapshots can also be used with user-defined timestamp.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879
Test Plan:
```
make check
TEST_TMPDIR=/dev/shm make crash_test_with_txn
```
Reviewed By: siying
Differential Revision: D35783919
Pulled By: riversand963
fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
3 years ago
|
|
|
s = CommitTxn(txn, thread);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
}
|
|
|
|
shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
|
|
|
|
thread->stats.AddSingleDeletes(1);
|
|
|
|
if (!s.ok()) {
|
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ &&
|
|
|
|
s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
|
|
|
// OPERATION delete range
|
|
|
|
std::vector<std::unique_ptr<MutexLock>> range_locks;
|
|
|
|
// delete range does not respect disallowed overwrites. the keys for
|
|
|
|
// which overwrites are disallowed are randomly distributed so it
|
|
|
|
// could be expensive to find a range where each key allows
|
|
|
|
// overwrites.
|
|
|
|
int64_t rand_key = rand_keys[0];
|
|
|
|
int rand_column_family = rand_column_families[0];
|
|
|
|
auto shared = thread->shared;
|
|
|
|
int64_t max_key = shared->GetMaxKey();
|
|
|
|
if (rand_key > max_key - FLAGS_range_deletion_width) {
|
|
|
|
lock.reset();
|
|
|
|
rand_key =
|
|
|
|
thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
|
|
|
|
range_locks.emplace_back(
|
|
|
|
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
|
|
|
|
} else {
|
|
|
|
range_locks.emplace_back(std::move(lock));
|
|
|
|
}
|
|
|
|
for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
|
|
|
|
if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
|
|
|
|
range_locks.emplace_back(new MutexLock(
|
|
|
|
shared->GetMutexForKey(rand_column_family, rand_key + j)));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
shared->DeleteRange(rand_column_family, rand_key,
|
|
|
|
rand_key + FLAGS_range_deletion_width,
|
|
|
|
true /* pending */);
|
|
|
|
|
|
|
|
std::string keystr = Key(rand_key);
|
|
|
|
Slice key = keystr;
|
|
|
|
auto cfh = column_families_[rand_column_family];
|
|
|
|
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
|
|
|
|
Slice end_key = end_keystr;
|
|
|
|
Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
|
|
|
|
if (!s.ok()) {
|
|
|
|
if (FLAGS_injest_error_severity >= 2) {
|
|
|
|
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
|
|
|
|
is_db_stopped_ = true;
|
|
|
|
} else if (!is_db_stopped_ ||
|
|
|
|
s.severity() < Status::Severity::kFatalError) {
|
|
|
|
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
int covered = shared->DeleteRange(rand_column_family, rand_key,
|
|
|
|
rand_key + FLAGS_range_deletion_width,
|
|
|
|
false /* pending */);
|
|
|
|
thread->stats.AddRangeDeletions(1);
|
|
|
|
thread->stats.AddCoveredByRangeDeletions(covered);
|
|
|
|
return s;
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifdef ROCKSDB_LITE
|
|
|
|
void TestIngestExternalFile(
|
|
|
|
ThreadState* /* thread */,
|
|
|
|
const std::vector<int>& /* rand_column_families */,
|
|
|
|
const std::vector<int64_t>& /* rand_keys */,
|
|
|
|
std::unique_ptr<MutexLock>& /* lock */) override {
|
|
|
|
assert(false);
|
|
|
|
fprintf(stderr,
|
|
|
|
"RocksDB lite does not support "
|
|
|
|
"TestIngestExternalFile\n");
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
#else
|
|
|
|
void TestIngestExternalFile(ThreadState* thread,
|
|
|
|
const std::vector<int>& rand_column_families,
|
|
|
|
const std::vector<int64_t>& rand_keys,
|
|
|
|
std::unique_ptr<MutexLock>& lock) override {
|
|
|
|
const std::string sst_filename =
|
|
|
|
FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
|
|
|
|
Status s;
|
|
|
|
if (db_stress_env->FileExists(sst_filename).ok()) {
|
|
|
|
// Maybe we terminated abnormally before, so cleanup to give this file
|
|
|
|
// ingestion a clean slate
|
|
|
|
s = db_stress_env->DeleteFile(sst_filename);
|
|
|
|
}
|
|
|
|
|
|
|
|
SstFileWriter sst_file_writer(EnvOptions(options_), options_);
|
|
|
|
if (s.ok()) {
|
|
|
|
s = sst_file_writer.Open(sst_filename);
|
|
|
|
}
|
|
|
|
int64_t key_base = rand_keys[0];
|
|
|
|
int column_family = rand_column_families[0];
|
|
|
|
std::vector<std::unique_ptr<MutexLock>> range_locks;
|
|
|
|
range_locks.reserve(FLAGS_ingest_external_file_width);
|
|
|
|
std::vector<int64_t> keys;
|
|
|
|
keys.reserve(FLAGS_ingest_external_file_width);
|
|
|
|
std::vector<uint32_t> values;
|
|
|
|
values.reserve(FLAGS_ingest_external_file_width);
|
|
|
|
SharedState* shared = thread->shared;
|
|
|
|
|
|
|
|
assert(FLAGS_nooverwritepercent < 100);
|
|
|
|
// Grab locks, set pending state on expected values, and add keys
|
|
|
|
for (int64_t key = key_base;
|
|
|
|
s.ok() && key < shared->GetMaxKey() &&
|
|
|
|
static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
|
|
|
|
++key) {
|
|
|
|
if (key == key_base) {
|
|
|
|
range_locks.emplace_back(std::move(lock));
|
|
|
|
} else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
|
|
|
|
range_locks.emplace_back(
|
|
|
|
new MutexLock(shared->GetMutexForKey(column_family, key)));
|
|
|
|
}
|
|
|
|
if (!shared->AllowsOverwrite(key)) {
|
|
|
|
// We could alternatively include `key` on the condition its current
|
|
|
|
// value is `DELETION_SENTINEL`.
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
keys.push_back(key);
|
|
|
|
|
|
|
|
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
|
|
|
|
values.push_back(value_base);
|
|
|
|
shared->Put(column_family, key, value_base, true /* pending */);
|
|
|
|
|
|
|
|
char value[100];
|
|
|
|
size_t value_len = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
auto key_str = Key(key);
|
|
|
|
s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
|
|
|
|
}
|
|
|
|
|
|
|
|
if (s.ok()) {
|
|
|
|
s = sst_file_writer.Finish();
|
|
|
|
}
|
|
|
|
if (s.ok()) {
|
|
|
|
s = db_->IngestExternalFile(column_families_[column_family],
|
|
|
|
{sst_filename}, IngestExternalFileOptions());
|
|
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
|
|
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
|
|
|
|
std::terminate();
|
|
|
|
}
|
|
|
|
for (size_t i = 0; i < keys.size(); ++i) {
|
|
|
|
shared->Put(column_family, keys[i], values[i], false /* pending */);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& /*opts*/,
|
|
|
|
SharedState* shared, const std::string& value_from_db,
|
|
|
|
const Status& s, bool strict = false) const {
|
|
|
|
if (shared->HasVerificationFailedYet()) {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
// compare value_from_db with the value in the shared state
|
|
|
|
uint32_t value_base = shared->Get(cf, key);
|
|
|
|
if (value_base == SharedState::UNKNOWN_SENTINEL) {
|
|
|
|
if (s.ok()) {
|
|
|
|
// Value exists in db, update state to reflect that
|
|
|
|
Slice slice(value_from_db);
|
|
|
|
value_base = GetValueBase(slice);
|
|
|
|
shared->Put(cf, key, value_base, false);
|
|
|
|
} else if (s.IsNotFound()) {
|
|
|
|
// Value doesn't exist in db, update state to reflect that
|
|
|
|
shared->SingleDelete(cf, key, false);
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if (value_base == SharedState::DELETION_SENTINEL && !strict) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (s.ok()) {
|
|
|
|
char value[kValueMaxLen];
|
|
|
|
if (value_base == SharedState::DELETION_SENTINEL) {
|
|
|
|
VerificationAbort(shared, "Unexpected value found", cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
size_t sz = GenerateValue(value_base, value, sizeof(value));
|
|
|
|
if (value_from_db.length() != sz) {
|
|
|
|
VerificationAbort(shared, "Length of value read is not equal", cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
if (memcmp(value_from_db.data(), value, sz) != 0) {
|
|
|
|
VerificationAbort(shared, "Contents of value read don't match", cf,
|
|
|
|
key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (value_base != SharedState::DELETION_SENTINEL) {
|
|
|
|
VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
void PrepareTxnDbOptions(SharedState* shared,
|
|
|
|
TransactionDBOptions& txn_db_opts) override {
|
|
|
|
txn_db_opts.rollback_deletion_type_callback =
|
|
|
|
[shared](TransactionDB*, ColumnFamilyHandle*, const Slice& key) {
|
|
|
|
assert(shared);
|
|
|
|
uint64_t key_num = 0;
|
|
|
|
bool ok = GetIntVal(key.ToString(), &key_num);
|
|
|
|
assert(ok);
|
|
|
|
(void)ok;
|
|
|
|
return !shared->AllowsOverwrite(key_num);
|
|
|
|
};
|
|
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
};
|
|
|
|
|
|
|
|
StressTest* CreateNonBatchedOpsStressTest() {
|
|
|
|
return new NonBatchedOpsStressTest();
|
|
|
|
}
|
|
|
|
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // GFLAGS
|