|
|
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
//
|
|
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
#include "db/db_test_util.h"
|
|
|
|
|
|
|
|
#include "db/forward_iterator.h"
|
|
|
|
#include "rocksdb/env_encryption.h"
|
|
|
|
#include "rocksdb/utilities/object_registry.h"
|
|
|
|
#include "util/random.h"
|
|
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
|
|
|
|
namespace {
|
|
|
|
// Asks `env` for the current time and returns whatever ends up in the
// output variable. The status of GetCurrentTime() is deliberately ignored;
// the variable is pre-seeded with an arbitrary fallback value.
int64_t MaybeCurrentTime(Env* env) {
  int64_t now = 1337346000;  // arbitrary fallback default
  static_cast<void>(env->GetCurrentTime(&now));
  return now;
}
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
// Special Env used to delay background operations
|
|
|
|
|
|
|
|
// Builds a SpecialEnv that wraps `base`. Every boolean switch starts out
// false, every counter starts at zero and no table-write callback is set.
SpecialEnv::SpecialEnv(Env* base)
    : EnvWrapper(base),
      maybe_starting_time_(MaybeCurrentTime(base)),
      rnd_(301),
      sleep_counter_(this),
      addon_time_(0),
      time_elapse_only_sleep_(false),
      no_slowdown_(false) {
  // Atomic switches, all cleared with release ordering.
  delay_sstable_sync_.store(false, std::memory_order_release);
  drop_writes_.store(false, std::memory_order_release);
  no_space_.store(false, std::memory_order_release);
  non_writable_.store(false, std::memory_order_release);
  manifest_sync_error_.store(false, std::memory_order_release);
  manifest_write_error_.store(false, std::memory_order_release);
  log_write_error_.store(false, std::memory_order_release);

  // Read accounting is off by default.
  count_random_reads_ = false;
  count_sequential_reads_ = false;

  // Atomic counters.
  random_file_open_counter_.store(0, std::memory_order_relaxed);
  delete_count_.store(0, std::memory_order_relaxed);
  num_open_wal_file_.store(0);

  // Remaining counters and knobs.
  log_write_slowdown_ = 0;
  bytes_written_ = 0;
  sync_counter_ = 0;
  non_writeable_rate_ = 0;
  new_writable_count_ = 0;
  non_writable_count_ = 0;
  table_write_callback_ = nullptr;
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
// File-level ROT13 cipher instance handed to CTREncryptionProvider when the
// DBTestBase constructor sets up an encrypted env (ENCRYPTED_ENV).
// NOTE(review): 16 is presumably the cipher block size in bytes -- confirm.
ROT13BlockCipher rot13Cipher_(16);
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
// Sets up the test fixture:
//   1. picks a base Env (Env::Default(), or the one named by TEST_ENV_URI),
//   2. optionally layers a MockEnv (MEM_ENV) and an encrypted env
//      (ENCRYPTED_ENV) on top of it,
//   3. wraps the result in a SpecialEnv with one LOW and one HIGH
//      background thread,
//   4. derives the DB / WAL / log directories from `path`, destroys any
//      leftover DB there and opens a fresh one.
DBTestBase::DBTestBase(const std::string path)
    : mem_env_(nullptr), encrypted_env_(nullptr), option_config_(kDefault) {
  Env* base_env = Env::Default();
#ifndef ROCKSDB_LITE
  const char* test_env_uri = getenv("TEST_ENV_URI");
  if (test_env_uri != nullptr) {
    Env* loaded_env = nullptr;
    Status load_status = Env::LoadEnv(test_env_uri, &loaded_env, &env_guard_);
    base_env = loaded_env;
    EXPECT_OK(load_status);
    EXPECT_NE(Env::Default(), base_env);
  }
#endif  // !ROCKSDB_LITE
  EXPECT_NE(nullptr, base_env);

  // `underlying` tracks the innermost-to-outermost env stack as it is built;
  // the last layer created is the one SpecialEnv wraps.
  Env* underlying = base_env;
  if (getenv("MEM_ENV")) {
    mem_env_ = new MockEnv(base_env);
    underlying = mem_env_;
  }
#ifndef ROCKSDB_LITE
  if (getenv("ENCRYPTED_ENV")) {
    encrypted_env_ =
        NewEncryptedEnv(underlying, new CTREncryptionProvider(rot13Cipher_));
    underlying = encrypted_env_;
  }
#endif  // !ROCKSDB_LITE
  env_ = new SpecialEnv(underlying);
  env_->SetBackgroundThreads(1, Env::LOW);
  env_->SetBackgroundThreads(1, Env::HIGH);

  dbname_ = test::PerThreadDBPath(env_, path);
  alternative_wal_dir_ = dbname_ + "/wal";
  alternative_db_log_dir_ = dbname_ + "/db_log_dir";

  auto options = CurrentOptions();
  options.env = env_;

  // First destroy with the alternative WAL dir configured ...
  auto wal_dir_options = options;
  wal_dir_options.wal_dir = alternative_wal_dir_;
  EXPECT_OK(DestroyDB(dbname_, wal_dir_options));
  // ... then once more without it, in case the alternative WAL dir was not
  // in use.
  EXPECT_OK(DestroyDB(dbname_, options));

  db_ = nullptr;
  Reopen(options);
  Random::GetTLSInstance()->Reset(0xdeadbeef);
}
|
|
|
|
|
|
|
|
// Tears the fixture down: resets SyncPoint state, closes the DB, destroys
// the on-disk database (unless KEEP_DB is set in the environment) and
// deletes the SpecialEnv.
// NOTE(review): mem_env_ and encrypted_env_, which the constructor may
// allocate, are not deleted here -- confirm whether that is intentional.
DBTestBase::~DBTestBase() {
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->DisableProcessing();
  sync_point->LoadDependency({});
  sync_point->ClearAllCallBacks();
  Close();

  Options options;
  // Register the primary path plus the "_2".."_4" sibling paths so that
  // DestroyDB receives all of them.
  for (const char* suffix : {"", "_2", "_3", "_4"}) {
    options.db_paths.emplace_back(dbname_ + suffix, 0);
  }
  options.env = env_;

  if (!getenv("KEEP_DB")) {
    EXPECT_OK(DestroyDB(dbname_, options));
  } else {
    printf("DB is still at %s\n", dbname_.c_str());
  }
  delete env_;
}
|
|
|
|
|
|
|
|
bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) {
|
|
|
|
#ifdef ROCKSDB_LITE
|
|
|
|
// These options are not supported in ROCKSDB_LITE
|
|
|
|
if (option_config == kHashSkipList ||
|
|
|
|
option_config == kPlainTableFirstBytePrefix ||
|
|
|
|
option_config == kPlainTableCappedPrefix ||
|
|
|
|
option_config == kPlainTableCappedPrefixNonMmap ||
|
|
|
|
option_config == kPlainTableAllBytesPrefix ||
|
|
|
|
option_config == kVectorRep || option_config == kHashLinkList ||
|
|
|
|
option_config == kUniversalCompaction ||
|
|
|
|
option_config == kUniversalCompactionMultiLevel ||
|
|
|
|
option_config == kUniversalSubcompactions ||
|
|
|
|
option_config == kFIFOCompaction ||
|
|
|
|
option_config == kConcurrentSkipList) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
|
|
if ((skip_mask & kSkipUniversalCompaction) &&
|
|
|
|
(option_config == kUniversalCompaction ||
|
|
|
|
option_config == kUniversalCompactionMultiLevel ||
|
|
|
|
option_config == kUniversalSubcompactions)) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipMergePut) && option_config == kMergePut) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipNoSeekToLast) &&
|
|
|
|
(option_config == kHashLinkList || option_config == kHashSkipList)) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipPlainTable) &&
|
|
|
|
(option_config == kPlainTableAllBytesPrefix ||
|
|
|
|
option_config == kPlainTableFirstBytePrefix ||
|
|
|
|
option_config == kPlainTableCappedPrefix ||
|
|
|
|
option_config == kPlainTableCappedPrefixNonMmap)) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipHashIndex) &&
|
|
|
|
(option_config == kBlockBasedTableWithPrefixHashIndex ||
|
|
|
|
option_config == kBlockBasedTableWithWholeKeyHashIndex)) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipFIFOCompaction) && option_config == kFIFOCompaction) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
if ((skip_mask & kSkipMmapReads) && option_config == kWalDirAndMmapReads) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch to a fresh database with the next option configuration to
|
|
|
|
// test. Return false if there are no more configurations to test.
|
|
|
|
bool DBTestBase::ChangeOptions(int skip_mask) {
|
|
|
|
for (option_config_++; option_config_ < kEnd; option_config_++) {
|
|
|
|
if (ShouldSkipOptions(option_config_, skip_mask)) {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (option_config_ >= kEnd) {
|
|
|
|
Destroy(last_options_);
|
|
|
|
return false;
|
|
|
|
} else {
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch between different compaction styles.
|
|
|
|
bool DBTestBase::ChangeCompactOptions() {
|
|
|
|
if (option_config_ == kDefault) {
|
|
|
|
option_config_ = kUniversalCompaction;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kUniversalCompaction) {
|
|
|
|
option_config_ = kUniversalCompactionMultiLevel;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kUniversalCompactionMultiLevel) {
|
|
|
|
option_config_ = kLevelSubcompactions;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
assert(options.max_subcompactions > 1);
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kLevelSubcompactions) {
|
|
|
|
option_config_ = kUniversalSubcompactions;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
assert(options.max_subcompactions > 1);
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch between different WAL settings
|
|
|
|
bool DBTestBase::ChangeWalOptions() {
|
|
|
|
if (option_config_ == kDefault) {
|
|
|
|
option_config_ = kDBLogDir;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
Destroy(options);
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kDBLogDir) {
|
|
|
|
option_config_ = kWalDirAndMmapReads;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
Destroy(options);
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kWalDirAndMmapReads) {
|
|
|
|
option_config_ = kRecycleLogFiles;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
Destroy(options);
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch between different filter policy
|
|
|
|
// Jump from kDefault to kFilter to kFullFilterWithNewTableReaderForCompactions
// to kPartitionedFilterWithNewTableReaderForCompactions
|
|
|
|
bool DBTestBase::ChangeFilterOptions() {
|
|
|
|
if (option_config_ == kDefault) {
|
|
|
|
option_config_ = kFilter;
|
|
|
|
} else if (option_config_ == kFilter) {
|
|
|
|
option_config_ = kFullFilterWithNewTableReaderForCompactions;
|
|
|
|
} else if (option_config_ == kFullFilterWithNewTableReaderForCompactions) {
|
|
|
|
option_config_ = kPartitionedFilterWithNewTableReaderForCompactions;
|
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
Destroy(last_options_);
|
|
|
|
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch between different DB options for file ingestion tests.
|
|
|
|
bool DBTestBase::ChangeOptionsForFileIngestionTest() {
|
|
|
|
if (option_config_ == kDefault) {
|
|
|
|
option_config_ = kUniversalCompaction;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kUniversalCompaction) {
|
|
|
|
option_config_ = kUniversalCompactionMultiLevel;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
options.create_if_missing = true;
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kUniversalCompactionMultiLevel) {
|
|
|
|
option_config_ = kLevelSubcompactions;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
assert(options.max_subcompactions > 1);
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kLevelSubcompactions) {
|
|
|
|
option_config_ = kUniversalSubcompactions;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
assert(options.max_subcompactions > 1);
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else if (option_config_ == kUniversalSubcompactions) {
|
|
|
|
option_config_ = kDirectIO;
|
|
|
|
Destroy(last_options_);
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
TryReopen(options);
|
|
|
|
return true;
|
|
|
|
} else {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return the current option configuration.
|
|
|
|
// Builds the Options for the current option_config_, starting from
// GetDefaultOptions() and applying `options_override`.
Options DBTestBase::CurrentOptions(
    const anon::OptionsOverride& options_override) const {
  const Options defaults = GetDefaultOptions();
  return GetOptions(option_config_, defaults, options_override);
}
|
|
|
|
|
|
|
|
// Builds the Options for the current option_config_, starting from the
// caller-supplied `default_options` and applying `options_override`.
Options DBTestBase::CurrentOptions(
    const Options& default_options,
    const anon::OptionsOverride& options_override) const {
  Options configured =
      GetOptions(option_config_, default_options, options_override);
  return configured;
}
|
|
|
|
|
|
|
|
// Returns the baseline Options every test configuration starts from:
// a default-constructed Options with a handful of size limits, the WAL
// recovery mode and the compaction priority overridden.
Options DBTestBase::GetDefaultOptions() {
  Options defaults;

  // Size and resource limits.
  defaults.write_buffer_size = 4090 * 4096;
  defaults.target_file_size_base = 2 * 1024 * 1024;
  defaults.max_bytes_for_level_base = 10 * 1024 * 1024;
  defaults.max_open_files = 5000;

  // Behavioural knobs.
  defaults.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
  defaults.compaction_pri = CompactionPri::kByCompensatedSize;

  return defaults;
}
|
|
|
|
|
|
|
|
// Translates an option configuration id into a concrete Options object.
//
// Starts from a copy of `default_options`, applies the tweaks that belong to
// `option_config` (unknown ids leave the defaults untouched), then applies
// `options_override` and finally forces env, create_if_missing and
// fail_if_options_file_error.
//
// Unless a plain-table configuration was selected, the table factory is
// replaced by a block-based table factory built from the locally
// accumulated BlockBasedTableOptions.
Options DBTestBase::GetOptions(
    int option_config, const Options& default_options,
    const anon::OptionsOverride& options_override) const {
  // this redundant copy is to minimize code change w/o having lint error.
  Options options = default_options;
  BlockBasedTableOptions table_options;
  bool set_block_based_table_factory = true;
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
    !defined(OS_AIX)
  // Drop any O_DIRECT callbacks left over from an earlier configuration;
  // kDirectIO re-installs its sync points below via
  // SetupSyncPointsToMockDirectIO().
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "NewRandomAccessFile:O_DIRECT");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "NewWritableFile:O_DIRECT");
#endif

  // mmap access is only enabled when the fixture reports it as supported.
  bool can_allow_mmap = IsMemoryMappedAccessSupported();
  switch (option_config) {
#ifndef ROCKSDB_LITE
    case kHashSkipList:
      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
      options.memtable_factory.reset(NewHashSkipListRepFactory(16));
      options.allow_concurrent_memtable_write = false;
      options.unordered_write = false;
      break;
    case kPlainTableFirstBytePrefix:
      options.table_factory.reset(new PlainTableFactory());
      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
      options.allow_mmap_reads = can_allow_mmap;
      options.max_sequential_skip_in_iterations = 999999;
      set_block_based_table_factory = false;
      break;
    case kPlainTableCappedPrefix:
      options.table_factory.reset(new PlainTableFactory());
      options.prefix_extractor.reset(NewCappedPrefixTransform(8));
      options.allow_mmap_reads = can_allow_mmap;
      options.max_sequential_skip_in_iterations = 999999;
      set_block_based_table_factory = false;
      break;
    case kPlainTableCappedPrefixNonMmap:
      options.table_factory.reset(new PlainTableFactory());
      options.prefix_extractor.reset(NewCappedPrefixTransform(8));
      options.allow_mmap_reads = false;
      options.max_sequential_skip_in_iterations = 999999;
      set_block_based_table_factory = false;
      break;
    case kPlainTableAllBytesPrefix:
      options.table_factory.reset(new PlainTableFactory());
      options.prefix_extractor.reset(NewNoopTransform());
      options.allow_mmap_reads = can_allow_mmap;
      options.max_sequential_skip_in_iterations = 999999;
      set_block_based_table_factory = false;
      break;
    case kVectorRep:
      options.memtable_factory.reset(new VectorRepFactory(100));
      options.allow_concurrent_memtable_write = false;
      options.unordered_write = false;
      break;
    case kHashLinkList:
      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
      options.memtable_factory.reset(
          NewHashLinkListRepFactory(4, 0, 3, true, 4));
      options.allow_concurrent_memtable_write = false;
      options.unordered_write = false;
      break;
    case kDirectIO: {
      options.use_direct_reads = true;
      options.use_direct_io_for_flush_and_compaction = true;
      options.compaction_readahead_size = 2 * 1024 * 1024;
      SetupSyncPointsToMockDirectIO();
      break;
    }
#endif  // ROCKSDB_LITE
    case kMergePut:
      options.merge_operator = MergeOperators::CreatePutOperator();
      break;
    case kFilter:
      table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
      break;
    case kFullFilterWithNewTableReaderForCompactions:
      table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
      options.new_table_reader_for_compaction_inputs = true;
      options.compaction_readahead_size = 10 * 1024 * 1024;
      break;
    case kPartitionedFilterWithNewTableReaderForCompactions:
      table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
      table_options.partition_filters = true;
      table_options.index_type =
          BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
      options.new_table_reader_for_compaction_inputs = true;
      options.compaction_readahead_size = 10 * 1024 * 1024;
      break;
    case kUncompressed:
      options.compression = kNoCompression;
      break;
    case kNumLevel_3:
      options.num_levels = 3;
      break;
    case kDBLogDir:
      options.db_log_dir = alternative_db_log_dir_;
      break;
    case kWalDirAndMmapReads:
      options.wal_dir = alternative_wal_dir_;
      // mmap reads should be orthogonal to WalDir setting, so we piggyback to
      // this option config to test mmap reads as well
      options.allow_mmap_reads = can_allow_mmap;
      break;
    case kManifestFileSize:
      options.max_manifest_file_size = 50;  // 50 bytes
      break;
    case kPerfOptions:
      options.soft_rate_limit = 2.0;
      options.delayed_write_rate = 8 * 1024 * 1024;
      options.report_bg_io_stats = true;
      // TODO(3.13) -- test more options
      break;
    case kUniversalCompaction:
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 1;
      break;
    case kUniversalCompactionMultiLevel:
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 8;
      break;
    case kCompressedBlockCache:
      options.allow_mmap_writes = can_allow_mmap;
      table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
      break;
    case kInfiniteMaxOpenFiles:
      options.max_open_files = -1;
      break;
    case kxxHashChecksum: {
      table_options.checksum = kxxHash;
      break;
    }
    case kxxHash64Checksum: {
      table_options.checksum = kxxHash64;
      break;
    }
    case kFIFOCompaction: {
      options.compaction_style = kCompactionStyleFIFO;
      break;
    }
    case kBlockBasedTableWithPrefixHashIndex: {
      table_options.index_type = BlockBasedTableOptions::kHashSearch;
      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
      break;
    }
    case kBlockBasedTableWithWholeKeyHashIndex: {
      table_options.index_type = BlockBasedTableOptions::kHashSearch;
      options.prefix_extractor.reset(NewNoopTransform());
      break;
    }
    case kBlockBasedTableWithPartitionedIndex: {
      table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
      options.prefix_extractor.reset(NewNoopTransform());
      break;
    }
    case kBlockBasedTableWithPartitionedIndexFormat4: {
      table_options.format_version = 4;
      // Format 4 changes the binary index format. Since partitioned index is a
      // super-set of simple indexes, we are also using kTwoLevelIndexSearch to
      // test this format.
      table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
      // The top-level index in partition filters are also affected by format 4.
      table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
      table_options.partition_filters = true;
      table_options.index_block_restart_interval = 8;
      break;
    }
    case kBlockBasedTableWithIndexRestartInterval: {
      table_options.index_block_restart_interval = 8;
      break;
    }
    case kOptimizeFiltersForHits: {
      options.optimize_filters_for_hits = true;
      set_block_based_table_factory = true;
      break;
    }
    case kRowCache: {
      options.row_cache = NewLRUCache(1024 * 1024);
      break;
    }
    case kRecycleLogFiles: {
      options.recycle_log_file_num = 2;
      break;
    }
    // The two subcompaction configurations raise max_subcompactions above 1;
    // ChangeCompactOptions() asserts on that.
    case kLevelSubcompactions: {
      options.max_subcompactions = 4;
      break;
    }
    case kUniversalSubcompactions: {
      options.compaction_style = kCompactionStyleUniversal;
      options.num_levels = 8;
      options.max_subcompactions = 4;
      break;
    }
    case kConcurrentSkipList: {
      options.allow_concurrent_memtable_write = true;
      options.enable_write_thread_adaptive_yield = true;
      break;
    }
    case kPipelinedWrite: {
      options.enable_pipelined_write = true;
      break;
    }
    case kConcurrentWALWrites: {
      // These options optimize the 2PC commit path
      options.two_write_queues = true;
      options.manual_wal_flush = true;
      break;
    }
    case kUnorderedWrite: {
      // NOTE(review): despite the configuration name, unordered_write is set
      // to false here -- confirm whether that is intentional.
      options.allow_concurrent_memtable_write = false;
      options.unordered_write = false;
      break;
    }

    default:
      break;
  }

  // A caller-supplied filter policy replaces whatever the configuration
  // chose, together with the related partitioning settings.
  if (options_override.filter_policy) {
    table_options.filter_policy = options_override.filter_policy;
    table_options.partition_filters = options_override.partition_filters;
    table_options.metadata_block_size = options_override.metadata_block_size;
  }
  if (set_block_based_table_factory) {
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  }
  options.env = env_;
  options.create_if_missing = true;
  options.fail_if_options_file_error = true;
  return options;
}
|
|
|
|
|
|
|
|
void DBTestBase::CreateColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
ColumnFamilyOptions cf_opts(options);
|
|
|
|
size_t cfi = handles_.size();
|
|
|
|
handles_.resize(cfi + cfs.size());
|
|
|
|
for (auto cf : cfs) {
|
|
|
|
Status s = db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]);
|
|
|
|
ASSERT_OK(s);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::CreateAndReopenWithCF(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
CreateColumnFamilies(cfs, options);
|
|
|
|
std::vector<std::string> cfs_plus_default = cfs;
|
|
|
|
cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
|
|
|
|
ReopenWithColumnFamilies(cfs_plus_default, options);
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const std::vector<Options>& options) {
|
|
|
|
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
|
|
|
|
const Options& options) {
|
|
|
|
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Closes the current DB and reopens it with the column families `cfs`,
// pairing cfs[i] with options[i]. options[0] also supplies the DB-wide
// options and is remembered in last_options_.
//
// `cfs` and `options` are expected to have the same length (checked with a
// non-fatal EXPECT_EQ). If `options` is empty or shorter than `cfs`, the
// function returns Status::InvalidArgument instead of reading past the end
// of `options`; in that case the DB stays closed.
//
// Returns the status of DB::Open otherwise.
Status DBTestBase::TryReopenWithColumnFamilies(
    const std::vector<std::string>& cfs, const std::vector<Options>& options) {
  Close();
  EXPECT_EQ(cfs.size(), options.size());
  // EXPECT_EQ does not stop execution, so guard the indexing below.
  if (options.empty() || options.size() < cfs.size()) {
    return Status::InvalidArgument(
        "TryReopenWithColumnFamilies: need one Options per column family");
  }
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.reserve(cfs.size());
  for (size_t i = 0; i < cfs.size(); ++i) {
    column_families.emplace_back(cfs[i], options[i]);
  }
  DBOptions db_opts = DBOptions(options[0]);
  last_options_ = options[0];
  return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
}
|
|
|
|
|
|
|
|
// Closes the current DB and reopens it with the column families `cfs`,
// giving every column family a copy of `options`. Returns the status of the
// per-column-family overload.
Status DBTestBase::TryReopenWithColumnFamilies(
    const std::vector<std::string>& cfs, const Options& options) {
  Close();
  // One copy of `options` per requested column family.
  const std::vector<Options> per_cf_options(cfs.size(), options);
  return TryReopenWithColumnFamilies(cfs, per_cf_options);
}
|
|
|
|
|
|
|
|
void DBTestBase::Reopen(const Options& options) {
|
|
|
|
ASSERT_OK(TryReopen(options));
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::Close() {
|
|
|
|
for (auto h : handles_) {
|
|
|
|
db_->DestroyColumnFamilyHandle(h);
|
|
|
|
}
|
|
|
|
handles_.clear();
|
|
|
|
delete db_;
|
|
|
|
db_ = nullptr;
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::DestroyAndReopen(const Options& options) {
|
|
|
|
// Destroy using last options
|
|
|
|
Destroy(last_options_);
|
|
|
|
ASSERT_OK(TryReopen(options));
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) {
|
|
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
|
|
if (delete_cf_paths) {
|
|
|
|
for (size_t i = 0; i < handles_.size(); ++i) {
|
|
|
|
ColumnFamilyDescriptor cfdescriptor;
|
|
|
|
handles_[i]->GetDescriptor(&cfdescriptor);
|
|
|
|
column_families.push_back(cfdescriptor);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Close();
|
|
|
|
ASSERT_OK(DestroyDB(dbname_, options, column_families));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Opens the database at dbname_ in read-only mode into db_ and returns the
// resulting status. Unlike TryReopen(), this does not call Close() first.
Status DBTestBase::ReadOnlyReopen(const Options& options) {
  Status open_status = DB::OpenForReadOnly(options, dbname_, &db_);
  return open_status;
}
|
|
|
|
|
|
|
|
// Closes the current DB, records `options` in last_options_ and opens the
// database at dbname_ with them. Returns the status of DB::Open.
Status DBTestBase::TryReopen(const Options& options) {
  Close();

  // Plain operator= on Options is unsafe here: it destroys the contained
  // std::shared_ptr members in creation order, whereas a destructor destroys
  // them in reverse creation order. One particular problem is that the cache
  // destructor might invoke callback functions that use Options members such
  // as statistics. To work around this, the table factory is released
  // manually first, which eventually clears the block cache, and only then
  // are the new options assigned.
  last_options_.table_factory.reset();
  last_options_ = options;

  Status open_status = DB::Open(options, dbname_, &db_);
  return open_status;
}
|
|
|
|
|
|
|
|
bool DBTestBase::IsDirectIOSupported() {
|
|
|
|
return test::IsDirectIOSupported(env_, dbname_);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool DBTestBase::IsMemoryMappedAccessSupported() const {
|
|
|
|
return (!encrypted_env_);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Flushes one column family with default FlushOptions. Index 0 goes through
// the DB's handle-less Flush overload; any other index selects handles_[cf].
Status DBTestBase::Flush(int cf) {
  if (cf != 0) {
    return db_->Flush(FlushOptions(), handles_[cf]);
  }
  return db_->Flush(FlushOptions());
}
|
|
|
|
|
|
|
|
// Flushes several column families in a single call with default
// FlushOptions. Each id in `cf_ids` is an index into handles_.
Status DBTestBase::Flush(const std::vector<int>& cf_ids) {
  std::vector<ColumnFamilyHandle*> cfhs;
  cfhs.reserve(cf_ids.size());
  for (const int id : cf_ids) {
    cfhs.push_back(handles_[id]);
  }
  return db_->Flush(FlushOptions(), cfhs);
}
|
|
|
|
|
|
|
|
// Writes k -> v with write options `wo`. Under the kMergePut configuration
// the write is issued as a Merge instead of a Put.
Status DBTestBase::Put(const Slice& k, const Slice& v, WriteOptions wo) {
  const bool write_as_merge = (option_config_ == kMergePut);
  return write_as_merge ? db_->Merge(wo, k, v) : db_->Put(wo, k, v);
}
|
|
|
|
|
|
|
|
// Writes k -> v into column family handles_[cf] with write options `wo`.
// Under the kMergePut configuration the write is issued as a Merge instead
// of a Put.
Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
                       WriteOptions wo) {
  const bool write_as_merge = (option_config_ == kMergePut);
  return write_as_merge ? db_->Merge(wo, handles_[cf], k, v)
                        : db_->Put(wo, handles_[cf], k, v);
}
|
|
|
|
|
|
|
|
// Issues a Merge of `v` onto key `k` with write options `wo`, using the DB's
// handle-less Merge overload.
Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
  Status merge_status = db_->Merge(wo, k, v);
  return merge_status;
}
|
|
|
|
|
|
|
|
// Issues a Merge of `v` onto key `k` in column family handles_[cf] with
// write options `wo`.
Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
                         WriteOptions wo) {
  Status merge_status = db_->Merge(wo, handles_[cf], k, v);
  return merge_status;
}
|
|
|
|
|
|
|
|
// Deletes key `k` with default WriteOptions, using the DB's handle-less
// Delete overload.
Status DBTestBase::Delete(const std::string& k) {
  Status delete_status = db_->Delete(WriteOptions(), k);
  return delete_status;
}
|
|
|
|
|
|
|
|
// Deletes key `k` from column family handles_[cf] with default WriteOptions.
Status DBTestBase::Delete(int cf, const std::string& k) {
  Status delete_status = db_->Delete(WriteOptions(), handles_[cf], k);
  return delete_status;
}
|
|
|
|
|
Support for SingleDelete()
Summary:
This patch fixes #7460559. It introduces SingleDelete as a new database
operation. This operation can be used to delete keys that were never
overwritten (no put following another put of the same key). If an overwritten
key is single deleted the behavior is undefined. Single deletion of a
non-existent key has no effect but multiple consecutive single deletions are
not allowed (see limitations).
In contrast to the conventional Delete() operation, the deletion entry is
removed along with the value when the two are lined up in a compaction. Note:
The semantics are similar to @igor's prototype that allowed to have this
behavior on the granularity of a column family (
https://reviews.facebook.net/D42093 ). This new patch, however, is more
aggressive when it comes to removing tombstones: It removes the SingleDelete
together with the value whenever there is no snapshot between them while the
older patch only did this when the sequence number of the deletion was older
than the earliest snapshot.
Most of the complex additions are in the Compaction Iterator, all other changes
should be relatively straightforward. The patch also includes basic support for
single deletions in db_stress and db_bench.
Limitations:
- Not compatible with cuckoo hash tables
- Single deletions cannot be used in combination with merges and normal
deletions on the same key (other keys are not affected by this)
- Consecutive single deletions are currently not allowed (an older version of
this patch supported this so it could be resurrected if needed)
Test Plan: make all check
Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor
Reviewed By: igor
Subscribers: maykov, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D43179
9 years ago
|
|
|
// Issues a SingleDelete for key `k` with default WriteOptions, using the DB
// overload that takes no column family handle.
Status DBTestBase::SingleDelete(const std::string& k) {
  WriteOptions write_opts;
  return db_->SingleDelete(write_opts, k);
}
|
|
|
|
|
|
|
|
// Issues a SingleDelete for key `k` in column family handles_[cf] with
// default WriteOptions.
Status DBTestBase::SingleDelete(int cf, const std::string& k) {
  WriteOptions write_opts;
  return db_->SingleDelete(write_opts, handles_[cf], k);
}
|
|
|
|
|
Added support for differential snapshots
Summary:
The motivation for this PR is to add to RocksDB support for differential (incremental) snapshots, i.e. a snapshot of the DB changes between two points in time (one can think of it as the diff between two sequence numbers, or the diff D which can be thought of as an SST file or just a set of KVs that can be applied to sequence number S1 to get the database to the state at sequence number S2).
This feature would be useful for various distributed storage layers built on top of RocksDB, as it should help reduce resources (time and network bandwidth) needed to recover and rebuild DB instances as replicas in the context of distributed storage.
From the API standpoint that would look like a client app requesting an iterator between (start seqnum) and the current DB state, and reading the "diff".
This is a very draft PR for initial review in the discussion on the approach, i'm going to rework some parts and keep updating the PR.
For now, what's done here according to initial discussions:
Preserving deletes:
- We want to be able to optionally preserve recent deletes for some defined period of time, so that if a delete came in recently and might need to be included in the next incremental snapshot it wouldn't get dropped by a compaction. This is done by adding a new param to Options (preserve deletes flag) and a new variable to DB Impl where we keep track of the sequence number after which we don't want to drop tombstones, even if they are otherwise eligible for deletion.
- I also added a new API call for clients to be able to advance this cutoff seqnum after which we drop deletes; i assume it's more flexible to let clients control this, since otherwise we'd need to keep some kind of timestamp < -- > seqnum mapping inside the DB, which sounds messy and painful to support. Clients could make use of it by periodically calling GetLatestSequenceNumber(), noting the timestamp, doing some calculation and figuring out by how much we need to advance the cutoff seqnum.
- Compaction codepath in compaction_iterator.cc has been modified to avoid dropping tombstones with seqnum > cutoff seqnum.
Iterator changes:
- couple params added to ReadOptions, to optionally allow client to request internal keys instead of user keys (so that client can get the latest value of a key, be it delete marker or a put), as well as min timestamp and min seqnum.
TableCache changes:
- I modified table_cache code to be able to quickly exclude SST files from the iterators heap if creation_time on the file is less than iter_start_ts as passed in ReadOptions. That would help a lot in some DB settings (like reading very recent data only or using FIFO compactions), but not so much for universal compaction with more or less long iterator time span.
What's left:
- Still looking at how to best plug that inside DBIter codepath. So far it seems that FindNextUserKeyInternal only parses values as UserKeys, and iter->key() call generally returns user key. Can we add new API to DBIter as internal_key(), and modify this internal method to optionally set saved_key_ to point to the full internal key? I don't need to store actual seqnum there, but I do need to store type.
Closes https://github.com/facebook/rocksdb/pull/2999
Differential Revision: D6175602
Pulled By: mikhail-antonov
fbshipit-source-id: c779a6696ee2d574d86c69cec866a3ae095aa900
7 years ago
|
|
|
// Forwards `sn` to DB::SetPreserveDeletesSequenceNumber() and returns the
// DB's answer unchanged.
bool DBTestBase::SetPreserveDeletesSequenceNumber(SequenceNumber sn) {
  const bool accepted = db_->SetPreserveDeletesSequenceNumber(sn);
  return accepted;
}
|
|
|
|
|
|
|
|
std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::string result;
|
|
|
|
Status s = db_->Get(options, k, &result);
|
|
|
|
if (s.IsNotFound()) {
|
|
|
|
result = "NOT_FOUND";
|
|
|
|
} else if (!s.ok()) {
|
|
|
|
result = s.ToString();
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string DBTestBase::Get(int cf, const std::string& k,
|
|
|
|
const Snapshot* snapshot) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::string result;
|
|
|
|
Status s = db_->Get(options, handles_[cf], k, &result);
|
|
|
|
if (s.IsNotFound()) {
|
|
|
|
result = "NOT_FOUND";
|
|
|
|
} else if (!s.ok()) {
|
|
|
|
result = s.ToString();
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::vector<std::string> DBTestBase::MultiGet(std::vector<int> cfs,
|
|
|
|
const std::vector<std::string>& k,
|
|
|
|
const Snapshot* snapshot,
|
|
|
|
const bool batched) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
std::vector<Slice> keys;
|
|
|
|
std::vector<std::string> result;
|
|
|
|
|
|
|
|
for (unsigned int i = 0; i < cfs.size(); ++i) {
|
|
|
|
handles.push_back(handles_[cfs[i]]);
|
|
|
|
keys.push_back(k[i]);
|
|
|
|
}
|
|
|
|
std::vector<Status> s;
|
|
|
|
if (!batched) {
|
|
|
|
s = db_->MultiGet(options, handles, keys, &result);
|
|
|
|
for (unsigned int i = 0; i < s.size(); ++i) {
|
|
|
|
if (s[i].IsNotFound()) {
|
|
|
|
result[i] = "NOT_FOUND";
|
|
|
|
} else if (!s[i].ok()) {
|
|
|
|
result[i] = s[i].ToString();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
std::vector<PinnableSlice> pin_values(cfs.size());
|
|
|
|
result.resize(cfs.size());
|
|
|
|
s.resize(cfs.size());
|
|
|
|
db_->MultiGet(options, cfs.size(), handles.data(), keys.data(),
|
|
|
|
pin_values.data(), s.data());
|
|
|
|
for (unsigned int i = 0; i < s.size(); ++i) {
|
|
|
|
if (s[i].IsNotFound()) {
|
|
|
|
result[i] = "NOT_FOUND";
|
|
|
|
} else if (!s[i].ok()) {
|
|
|
|
result[i] = s[i].ToString();
|
|
|
|
} else {
|
|
|
|
result[i].assign(pin_values[i].data(), pin_values[i].size());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
Introduce a new MultiGet batching implementation (#5011)
Summary:
This PR introduces a new MultiGet() API, with the underlying implementation grouping keys based on SST file and batching lookups in a file. The reason for the new API is twofold - the definition allows callers to allocate storage for status and values on stack instead of std::vector, as well as return values as PinnableSlices in order to avoid copying, and it keeps the original MultiGet() implementation intact while we experiment with batching.
Batching is useful when there is some spatial locality to the keys being queried, as well as larger batch sizes. The main benefits are due to -
1. Fewer function calls, especially to BlockBasedTableReader::MultiGet() and FullFilterBlockReader::KeysMayMatch()
2. Bloom filter cachelines can be prefetched, hiding the cache miss latency
The next step is to optimize the binary searches in the level_storage_info, index blocks and data blocks, since we could reduce the number of key comparisons if the keys are relatively close to each other. The batching optimizations also need to be extended to other formats, such as PlainTable and filter formats. This also needs to be added to db_stress.
Benchmark results from db_bench for various batch size/locality of reference combinations are given below. Locality was simulated by offsetting the keys in a batch by a stride length. Each SST file is about 8.6MB uncompressed and key/value size is 16/100 uncompressed. To focus on the cpu benefit of batching, the runs were single threaded and bound to the same cpu to eliminate interference from other system events. The results show a 10-25% improvement in micros/op from smaller to larger batch sizes (4 - 32).
Batch Sizes
1 | 2 | 4 | 8 | 16 | 32
Random pattern (Stride length 0)
4.158 | 4.109 | 4.026 | 4.05 | 4.1 | 4.074 - Get
4.438 | 4.302 | 4.165 | 4.122 | 4.096 | 4.075 - MultiGet (no batching)
4.461 | 4.256 | 4.277 | 4.11 | 4.182 | 4.14 - MultiGet (w/ batching)
Good locality (Stride length 16)
4.048 | 3.659 | 3.248 | 2.99 | 2.84 | 2.753
4.429 | 3.728 | 3.406 | 3.053 | 2.911 | 2.781
4.452 | 3.45 | 2.833 | 2.451 | 2.233 | 2.135
Good locality (Stride length 256)
4.066 | 3.786 | 3.581 | 3.447 | 3.415 | 3.232
4.406 | 4.005 | 3.644 | 3.49 | 3.381 | 3.268
4.393 | 3.649 | 3.186 | 2.882 | 2.676 | 2.62
Medium locality (Stride length 4096)
4.012 | 3.922 | 3.768 | 3.61 | 3.582 | 3.555
4.364 | 4.057 | 3.791 | 3.65 | 3.57 | 3.465
4.479 | 3.758 | 3.316 | 3.077 | 2.959 | 2.891
dbbench command used (on a DB with 4 levels, 12 million keys)-
TEST_TMPDIR=/dev/shm numactl -C 10 ./db_bench.tmp -use_existing_db=true -benchmarks="readseq,multireadrandom" -write_buffer_size=4194304 -target_file_size_base=4194304 -max_bytes_for_level_base=16777216 -num=12000000 -reads=12000000 -duration=90 -threads=1 -compression_type=none -cache_size=4194304000 -batch_size=32 -disable_auto_compactions=true -bloom_bits=10 -cache_index_and_filter_blocks=true -pin_l0_filter_and_index_blocks_in_cache=true -multiread_batched=true -multiread_stride=4
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5011
Differential Revision: D14348703
Pulled By: anand1976
fbshipit-source-id: 774406dab3776d979c809522a67bedac6c17f84b
6 years ago
|
|
|
std::vector<std::string> DBTestBase::MultiGet(const std::vector<std::string>& k,
|
|
|
|
const Snapshot* snapshot) {
|
|
|
|
ReadOptions options;
|
|
|
|
options.verify_checksums = true;
|
|
|
|
options.snapshot = snapshot;
|
|
|
|
std::vector<Slice> keys;
|
|
|
|
std::vector<std::string> result;
|
|
|
|
std::vector<Status> statuses(k.size());
|
|
|
|
std::vector<PinnableSlice> pin_values(k.size());
|
|
|
|
|
|
|
|
for (unsigned int i = 0; i < k.size(); ++i) {
|
|
|
|
keys.push_back(k[i]);
|
|
|
|
}
|
|
|
|
db_->MultiGet(options, dbfull()->DefaultColumnFamily(), keys.size(),
|
|
|
|
keys.data(), pin_values.data(), statuses.data());
|
|
|
|
result.resize(k.size());
|
|
|
|
for (auto iter = result.begin(); iter != result.end(); ++iter) {
|
|
|
|
iter->assign(pin_values[iter - result.begin()].data(),
|
|
|
|
pin_values[iter - result.begin()].size());
|
|
|
|
}
|
|
|
|
for (unsigned int i = 0; i < statuses.size(); ++i) {
|
|
|
|
if (statuses[i].IsNotFound()) {
|
|
|
|
result[i] = "NOT_FOUND";
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads key `k` from dbfull()->DefaultColumnFamily() into the caller-supplied
// PinnableSlice with checksum verification turned on, and returns the raw
// Status of the lookup.
Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
  ReadOptions read_opts;
  read_opts.verify_checksums = true;
  return dbfull()->Get(read_opts, dbfull()->DefaultColumnFamily(), k, v);
}
|
|
|
|
|
|
|
|
// Returns the DB's "rocksdb.num-snapshots" integer property. A failed lookup
// is reported through the non-fatal EXPECT_TRUE.
uint64_t DBTestBase::GetNumSnapshots() {
  // Zero-initialized so a failed lookup cannot return an indeterminate value.
  uint64_t int_num = 0;
  EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.num-snapshots", &int_num));
  return int_num;
}
|
|
|
|
|
|
|
|
// Returns the DB's "rocksdb.oldest-snapshot-time" integer property. A failed
// lookup is reported through the non-fatal EXPECT_TRUE.
uint64_t DBTestBase::GetTimeOldestSnapshots() {
  // Zero-initialized so a failed lookup cannot return an indeterminate value.
  uint64_t int_num = 0;
  EXPECT_TRUE(
      dbfull()->GetIntProperty("rocksdb.oldest-snapshot-time", &int_num));
  return int_num;
}
|
|
|
|
|
|
|
|
// Returns the DB's "rocksdb.oldest-snapshot-sequence" integer property. A
// failed lookup is reported through the non-fatal EXPECT_TRUE.
uint64_t DBTestBase::GetSequenceOldestSnapshots() {
  // Zero-initialized so a failed lookup cannot return an indeterminate value.
  uint64_t int_num = 0;
  EXPECT_TRUE(
      dbfull()->GetIntProperty("rocksdb.oldest-snapshot-sequence", &int_num));
  return int_num;
}
|
|
|
|
|
|
|
|
// Return a string that contains all key,value pairs in order,
|
|
|
|
// formatted like "(k1->v1)(k2->v2)".
|
|
|
|
std::string DBTestBase::Contents(int cf) {
|
|
|
|
std::vector<std::string> forward;
|
|
|
|
std::string result;
|
|
|
|
Iterator* iter = (cf == 0) ? db_->NewIterator(ReadOptions())
|
|
|
|
: db_->NewIterator(ReadOptions(), handles_[cf]);
|
|
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
|
|
std::string s = IterStatus(iter);
|
|
|
|
result.push_back('(');
|
|
|
|
result.append(s);
|
|
|
|
result.push_back(')');
|
|
|
|
forward.push_back(s);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check reverse iteration results are the reverse of forward results
|
|
|
|
unsigned int matched = 0;
|
|
|
|
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
|
|
|
|
EXPECT_LT(matched, forward.size());
|
|
|
|
EXPECT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
|
|
|
|
matched++;
|
|
|
|
}
|
|
|
|
EXPECT_EQ(matched, forward.size());
|
|
|
|
|
|
|
|
delete iter;
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
|
|
|
|
Arena arena;
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
InternalKeyComparator icmp(options.comparator);
|
|
|
|
ReadRangeDelAggregator range_del_agg(&icmp,
|
|
|
|
kMaxSequenceNumber /* upper_bound */);
|
|
|
|
ScopedArenaIterator iter;
|
|
|
|
if (cf == 0) {
|
|
|
|
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
|
|
|
kMaxSequenceNumber));
|
|
|
|
} else {
|
|
|
|
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
|
|
|
kMaxSequenceNumber, handles_[cf]));
|
|
|
|
}
|
|
|
|
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
|
|
|
|
iter->Seek(target.Encode());
|
|
|
|
std::string result;
|
|
|
|
if (!iter->status().ok()) {
|
|
|
|
result = iter->status().ToString();
|
|
|
|
} else {
|
|
|
|
result = "[ ";
|
|
|
|
bool first = true;
|
|
|
|
while (iter->Valid()) {
|
|
|
|
ParsedInternalKey ikey(Slice(), 0, kTypeValue);
|
|
|
|
if (!ParseInternalKey(iter->key(), &ikey)) {
|
|
|
|
result += "CORRUPTED";
|
|
|
|
} else {
|
|
|
|
if (!last_options_.comparator->Equal(ikey.user_key, user_key)) {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
if (!first) {
|
|
|
|
result += ", ";
|
|
|
|
}
|
|
|
|
first = false;
|
|
|
|
switch (ikey.type) {
|
|
|
|
case kTypeValue:
|
|
|
|
result += iter->value().ToString();
|
|
|
|
break;
|
|
|
|
case kTypeMerge:
|
|
|
|
// keep it the same as kTypeValue for testing kMergePut
|
|
|
|
result += iter->value().ToString();
|
|
|
|
break;
|
|
|
|
case kTypeDeletion:
|
|
|
|
result += "DEL";
|
|
|
|
break;
|
Support for SingleDelete()
Summary:
This patch fixes #7460559. It introduces SingleDelete as a new database
operation. This operation can be used to delete keys that were never
overwritten (no put following another put of the same key). If an overwritten
key is single deleted the behavior is undefined. Single deletion of a
non-existent key has no effect but multiple consecutive single deletions are
not allowed (see limitations).
In contrast to the conventional Delete() operation, the deletion entry is
removed along with the value when the two are lined up in a compaction. Note:
The semantics are similar to @igor's prototype that allowed to have this
behavior on the granularity of a column family (
https://reviews.facebook.net/D42093 ). This new patch, however, is more
aggressive when it comes to removing tombstones: It removes the SingleDelete
together with the value whenever there is no snapshot between them while the
older patch only did this when the sequence number of the deletion was older
than the earliest snapshot.
Most of the complex additions are in the Compaction Iterator, all other changes
should be relatively straightforward. The patch also includes basic support for
single deletions in db_stress and db_bench.
Limitations:
- Not compatible with cuckoo hash tables
- Single deletions cannot be used in combination with merges and normal
deletions on the same key (other keys are not affected by this)
- Consecutive single deletions are currently not allowed (and older version of
this patch supported this so it could be resurrected if needed)
Test Plan: make all check
Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor
Reviewed By: igor
Subscribers: maykov, dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D43179
9 years ago
|
|
|
case kTypeSingleDeletion:
|
|
|
|
result += "SDEL";
|
|
|
|
break;
|
|
|
|
default:
|
|
|
|
assert(false);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
iter->Next();
|
|
|
|
}
|
|
|
|
if (!first) {
|
|
|
|
result += " ";
|
|
|
|
}
|
|
|
|
result += "]";
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
int DBTestBase::NumSortedRuns(int cf) {
|
|
|
|
ColumnFamilyMetaData cf_meta;
|
|
|
|
if (cf == 0) {
|
|
|
|
db_->GetColumnFamilyMetaData(&cf_meta);
|
|
|
|
} else {
|
|
|
|
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
|
|
|
|
}
|
|
|
|
int num_sr = static_cast<int>(cf_meta.levels[0].files.size());
|
|
|
|
for (size_t i = 1U; i < cf_meta.levels.size(); i++) {
|
|
|
|
if (cf_meta.levels[i].files.size() > 0) {
|
|
|
|
num_sr++;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return num_sr;
|
|
|
|
}
|
|
|
|
|
|
|
|
uint64_t DBTestBase::TotalSize(int cf) {
|
|
|
|
ColumnFamilyMetaData cf_meta;
|
|
|
|
if (cf == 0) {
|
|
|
|
db_->GetColumnFamilyMetaData(&cf_meta);
|
|
|
|
} else {
|
|
|
|
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
|
|
|
|
}
|
|
|
|
return cf_meta.size;
|
|
|
|
}
|
|
|
|
|
|
|
|
uint64_t DBTestBase::SizeAtLevel(int level) {
|
|
|
|
std::vector<LiveFileMetaData> metadata;
|
|
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
|
|
uint64_t sum = 0;
|
|
|
|
for (const auto& m : metadata) {
|
|
|
|
if (m.level == level) {
|
|
|
|
sum += m.size;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return sum;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t DBTestBase::TotalLiveFiles(int cf) {
|
|
|
|
ColumnFamilyMetaData cf_meta;
|
|
|
|
if (cf == 0) {
|
|
|
|
db_->GetColumnFamilyMetaData(&cf_meta);
|
|
|
|
} else {
|
|
|
|
db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
|
|
|
|
}
|
|
|
|
size_t num_files = 0;
|
|
|
|
for (auto& level : cf_meta.levels) {
|
|
|
|
num_files += level.files.size();
|
|
|
|
}
|
|
|
|
return num_files;
|
|
|
|
}
|
|
|
|
|
|
|
|
size_t DBTestBase::CountLiveFiles() {
|
|
|
|
std::vector<LiveFileMetaData> metadata;
|
|
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
|
|
return metadata.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reads the "rocksdb.num-files-at-level<level>" property for column family
// `cf` and returns it parsed as an int. A failed property lookup is reported
// through the non-fatal EXPECT_TRUE.
int DBTestBase::NumTableFilesAtLevel(int level, int cf) {
  const std::string property_name =
      "rocksdb.num-files-at-level" + NumberToString(level);
  std::string value;
  if (cf != 0) {
    EXPECT_TRUE(db_->GetProperty(handles_[cf], property_name, &value));
  } else {
    // default cfd
    EXPECT_TRUE(db_->GetProperty(property_name, &value));
  }
  return atoi(value.c_str());
}
|
|
|
|
|
|
|
|
// Reads the "rocksdb.compression-ratio-at-level<level>" property for column
// family `cf` and returns it parsed with std::stod. A failed property lookup
// is reported through the non-fatal EXPECT_TRUE.
double DBTestBase::CompressionRatioAtLevel(int level, int cf) {
  const std::string property_name =
      "rocksdb.compression-ratio-at-level" + NumberToString(level);
  std::string value;
  if (cf != 0) {
    EXPECT_TRUE(db_->GetProperty(handles_[cf], property_name, &value));
  } else {
    // default cfd
    EXPECT_TRUE(db_->GetProperty(property_name, &value));
  }
  return std::stod(value);
}
|
|
|
|
|
|
|
|
int DBTestBase::TotalTableFiles(int cf, int levels) {
|
|
|
|
if (levels == -1) {
|
|
|
|
levels = (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
|
|
|
|
}
|
|
|
|
int result = 0;
|
|
|
|
for (int level = 0; level < levels; level++) {
|
|
|
|
result += NumTableFilesAtLevel(level, cf);
|
|
|
|
}
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Return spread of files per level
|
|
|
|
std::string DBTestBase::FilesPerLevel(int cf) {
|
|
|
|
int num_levels =
|
|
|
|
(cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
|
|
|
|
std::string result;
|
|
|
|
size_t last_non_zero_offset = 0;
|
|
|
|
for (int level = 0; level < num_levels; level++) {
|
|
|
|
int f = NumTableFilesAtLevel(level, cf);
|
|
|
|
char buf[100];
|
|
|
|
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
|
|
|
|
result += buf;
|
|
|
|
if (f > 0) {
|
|
|
|
last_non_zero_offset = result.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
result.resize(last_non_zero_offset);
|
|
|
|
return result;
|
|
|
|
}
|
|
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
|
|
|
|
size_t DBTestBase::CountFiles() {
|
|
|
|
std::vector<std::string> files;
|
|
|
|
env_->GetChildren(dbname_, &files);
|
|
|
|
|
|
|
|
std::vector<std::string> logfiles;
|
|
|
|
if (dbname_ != last_options_.wal_dir) {
|
|
|
|
env_->GetChildren(last_options_.wal_dir, &logfiles);
|
|
|
|
}
|
|
|
|
|
|
|
|
return files.size() + logfiles.size();
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns the DB's approximate size for the key range [start, limit) in
// column family `cf`, as computed by DB::GetApproximateSizes().
uint64_t DBTestBase::Size(const Slice& start, const Slice& limit, int cf) {
  Range r(start, limit);
  // Zero-initialized so the result is well defined even if the DB does not
  // write to it.
  uint64_t size = 0;
  if (cf == 0) {
    db_->GetApproximateSizes(&r, 1, &size);
  } else {
    // Query the column family that was actually requested; the previous code
    // always consulted handles_[1] for any non-default `cf`.
    db_->GetApproximateSizes(handles_[cf], &r, 1, &size);
  }
  return size;
}
|
|
|
|
|
|
|
|
void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit,
|
|
|
|
uint32_t target_path_id) {
|
|
|
|
CompactRangeOptions compact_options;
|
|
|
|
compact_options.target_path_id = target_path_id;
|
|
|
|
ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit) {
|
|
|
|
ASSERT_OK(
|
|
|
|
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::Compact(const Slice& start, const Slice& limit) {
|
|
|
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Do n memtable compactions, each of which produces an sstable
|
|
|
|
// covering the range [small,large].
|
|
|
|
void DBTestBase::MakeTables(int n, const std::string& small,
|
|
|
|
const std::string& large, int cf) {
|
|
|
|
for (int i = 0; i < n; i++) {
|
|
|
|
ASSERT_OK(Put(cf, small, "begin"));
|
|
|
|
ASSERT_OK(Put(cf, large, "end"));
|
|
|
|
ASSERT_OK(Flush(cf));
|
|
|
|
MoveFilesToLevel(n - i - 1, cf);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Prevent pushing of new sstables into deeper levels by adding
|
|
|
|
// tables that cover a specified range to all levels.
|
|
|
|
void DBTestBase::FillLevels(const std::string& smallest,
|
|
|
|
const std::string& largest, int cf) {
|
|
|
|
MakeTables(db_->NumberLevels(handles_[cf]), smallest, largest, cf);
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::MoveFilesToLevel(int level, int cf) {
|
|
|
|
for (int l = 0; l < level; ++l) {
|
|
|
|
if (cf > 0) {
|
|
|
|
dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]);
|
|
|
|
} else {
|
|
|
|
dbfull()->TEST_CompactRange(l, nullptr, nullptr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
void DBTestBase::DumpFileCounts(const char* label) {
|
|
|
|
fprintf(stderr, "---\n%s:\n", label);
|
|
|
|
fprintf(stderr, "maxoverlap: %" PRIu64 "\n",
|
|
|
|
dbfull()->TEST_MaxNextLevelOverlappingBytes());
|
|
|
|
for (int level = 0; level < db_->NumberLevels(); level++) {
|
|
|
|
int num = NumTableFilesAtLevel(level);
|
|
|
|
if (num > 0) {
|
|
|
|
fprintf(stderr, " level %3d : %d files\n", level, num);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
|
|
|
|
// Returns the DB's "rocksdb.sstables" property. The lookup result is not
// checked, so the string stays empty if GetProperty() does not fill it.
std::string DBTestBase::DumpSSTableList() {
  std::string sstables;
  db_->GetProperty("rocksdb.sstables", &sstables);
  return sstables;
}
|
|
|
|
|
|
|
|
// Lists the children of `path` through `env` and filters `*files` down to the
// names that ParseFileName() classifies as kTableFile. The GetChildren()
// status is not inspected.
void DBTestBase::GetSstFiles(Env* env, std::string path,
                             std::vector<std::string>* files) {
  env->GetChildren(path, files);

  // Takes the name by const reference: the predicate only inspects it, so
  // copying every file name was unnecessary.
  const auto is_not_table_file = [](const std::string& name) {
    uint64_t number;
    FileType type;
    return !(ParseFileName(name, &number, &type) && type == kTableFile);
  };
  files->erase(
      std::remove_if(files->begin(), files->end(), is_not_table_file),
      files->end());
}
|
|
|
|
|
|
|
|
// Returns how many table files GetSstFiles() finds under `path` using env_.
int DBTestBase::GetSstFileCount(std::string path) {
  std::vector<std::string> sst_files;
  DBTestBase::GetSstFiles(env_, path, &sst_files);
  const size_t sst_count = sst_files.size();
  return static_cast<int>(sst_count);
}
|
|
|
|
|
|
|
|
// this will generate non-overlapping files since it keeps increasing key_idx
|
|
|
|
void DBTestBase::GenerateNewFile(int cf, Random* rnd, int* key_idx,
|
|
|
|
bool nowait) {
|
|
|
|
for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
|
|
|
|
ASSERT_OK(Put(cf, Key(*key_idx), rnd->RandomString((i == 99) ? 1 : 990)));
|
|
|
|
(*key_idx)++;
|
|
|
|
}
|
|
|
|
if (!nowait) {
|
|
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
|
|
dbfull()->TEST_WaitForCompact();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// this will generate non-overlapping files since it keeps increasing key_idx
|
|
|
|
void DBTestBase::GenerateNewFile(Random* rnd, int* key_idx, bool nowait) {
|
|
|
|
for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
|
|
|
|
ASSERT_OK(Put(Key(*key_idx), rnd->RandomString((i == 99) ? 1 : 990)));
|
|
|
|
(*key_idx)++;
|
|
|
|
}
|
|
|
|
if (!nowait) {
|
|
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
|
|
dbfull()->TEST_WaitForCompact();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
const int DBTestBase::kNumKeysByGenerateNewRandomFile = 51;
|
|
|
|
|
|
|
|
void DBTestBase::GenerateNewRandomFile(Random* rnd, bool nowait) {
|
|
|
|
for (int i = 0; i < kNumKeysByGenerateNewRandomFile; i++) {
|
|
|
|
ASSERT_OK(Put("key" + rnd->RandomString(7), rnd->RandomString(2000)));
|
|
|
|
}
|
|
|
|
ASSERT_OK(Put("key" + rnd->RandomString(7), rnd->RandomString(200)));
|
|
|
|
if (!nowait) {
|
|
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
|
|
dbfull()->TEST_WaitForCompact();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Describes the iterator's current position: "key->value" when it is valid,
// "(invalid)" otherwise.
std::string DBTestBase::IterStatus(Iterator* iter) {
  if (!iter->Valid()) {
    return "(invalid)";
  }
  return iter->key().ToString() + "->" + iter->value().ToString();
}
|
|
|
|
|
|
|
|
// Returns CurrentOptions() with create_if_missing enabled and
// WAL_ttl_seconds set to 1000.
Options DBTestBase::OptionsForLogIterTest() {
  Options log_iter_opts = CurrentOptions();
  log_iter_opts.WAL_ttl_seconds = 1000;
  log_iter_opts.create_if_missing = true;
  return log_iter_opts;
}
|
|
|
|
|
|
|
|
// Returns a string made of `len` copies of the character `c`.
std::string DBTestBase::DummyString(size_t len, char c) {
  std::string filler(len, c);
  return filler;
}
|
|
|
|
|
|
|
|
void DBTestBase::VerifyIterLast(std::string expected_key, int cf) {
|
|
|
|
Iterator* iter;
|
|
|
|
ReadOptions ro;
|
|
|
|
if (cf == 0) {
|
|
|
|
iter = db_->NewIterator(ro);
|
|
|
|
} else {
|
|
|
|
iter = db_->NewIterator(ro, handles_[cf]);
|
|
|
|
}
|
|
|
|
iter->SeekToLast();
|
|
|
|
ASSERT_EQ(IterStatus(iter), expected_key);
|
|
|
|
delete iter;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Used to test InplaceUpdate
|
|
|
|
|
|
|
|
// If previous value is nullptr or delta is > than previous value,
|
|
|
|
// sets newValue with delta
|
|
|
|
// If previous value is not empty,
|
|
|
|
// updates previous value with 'b' string of previous value size - 1.
|
|
|
|
UpdateStatus DBTestBase::updateInPlaceSmallerSize(char* prevValue,
|
|
|
|
uint32_t* prevSize,
|
|
|
|
Slice delta,
|
|
|
|
std::string* newValue) {
|
|
|
|
if (prevValue == nullptr) {
|
|
|
|
*newValue = std::string(delta.size(), 'c');
|
|
|
|
return UpdateStatus::UPDATED;
|
|
|
|
} else {
|
|
|
|
*prevSize = *prevSize - 1;
|
|
|
|
std::string str_b = std::string(*prevSize, 'b');
|
|
|
|
memcpy(prevValue, str_b.c_str(), str_b.size());
|
|
|
|
return UpdateStatus::UPDATED_INPLACE;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If previous value is nullptr, sets *newValue to delta.size() copies of 'c'
// and returns UPDATED. Otherwise sets *prevSize to 1, writes a single 'b'
// into the previous value and returns UPDATED_INPLACE.
UpdateStatus DBTestBase::updateInPlaceSmallerVarintSize(char* prevValue,
                                                        uint32_t* prevSize,
                                                        Slice delta,
                                                        std::string* newValue) {
  if (prevValue != nullptr) {
    *prevSize = 1;
    const std::string replacement(*prevSize, 'b');
    memcpy(prevValue, replacement.c_str(), replacement.size());
    return UpdateStatus::UPDATED_INPLACE;
  }
  *newValue = std::string(delta.size(), 'c');
  return UpdateStatus::UPDATED;
}
|
|
|
|
|
|
|
|
// Ignores the previous value entirely: sets *newValue to delta.size() copies
// of 'c' and returns UPDATED.
UpdateStatus DBTestBase::updateInPlaceLargerSize(char* /*prevValue*/,
                                                 uint32_t* /*prevSize*/,
                                                 Slice delta,
                                                 std::string* newValue) {
  const size_t new_len = delta.size();
  newValue->assign(new_len, 'c');
  return UpdateStatus::UPDATED;
}
|
|
|
|
|
|
|
|
// Touches none of its arguments and always reports UPDATE_FAILED.
UpdateStatus DBTestBase::updateInPlaceNoAction(char* /*prevValue*/,
                                               uint32_t* /*prevSize*/,
                                               Slice /*delta*/,
                                               std::string* /*newValue*/) {
  const UpdateStatus outcome = UpdateStatus::UPDATE_FAILED;
  return outcome;
}
|
|
|
|
|
|
|
|
// Utility method to test InplaceUpdate
|
|
|
|
void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
|
|
|
|
Arena arena;
|
|
|
|
auto options = CurrentOptions();
|
|
|
|
InternalKeyComparator icmp(options.comparator);
|
|
|
|
ReadRangeDelAggregator range_del_agg(&icmp,
|
|
|
|
kMaxSequenceNumber /* upper_bound */);
|
|
|
|
// This should be defined after range_del_agg so that it destructs the
|
|
|
|
// assigned iterator before it range_del_agg is already destructed.
|
|
|
|
ScopedArenaIterator iter;
|
|
|
|
if (cf != 0) {
|
|
|
|
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
|
|
|
kMaxSequenceNumber, handles_[cf]));
|
|
|
|
} else {
|
|
|
|
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
|
|
|
|
kMaxSequenceNumber));
|
|
|
|
}
|
|
|
|
iter->SeekToFirst();
|
|
|
|
ASSERT_EQ(iter->status().ok(), true);
|
|
|
|
int seq = numValues;
|
|
|
|
while (iter->Valid()) {
|
|
|
|
ParsedInternalKey ikey;
|
|
|
|
ikey.clear();
|
|
|
|
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
|
|
|
|
|
|
|
|
// checks sequence number for updates
|
|
|
|
ASSERT_EQ(ikey.sequence, (unsigned)seq--);
|
|
|
|
iter->Next();
|
|
|
|
}
|
|
|
|
ASSERT_EQ(0, seq);
|
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::CopyFile(const std::string& source,
|
|
|
|
const std::string& destination, uint64_t size) {
|
|
|
|
const EnvOptions soptions;
|
|
|
|
std::unique_ptr<SequentialFile> srcfile;
|
|
|
|
ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
|
|
|
|
std::unique_ptr<WritableFile> destfile;
|
|
|
|
ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
|
|
|
|
|
|
|
|
if (size == 0) {
|
|
|
|
// default argument means copy everything
|
|
|
|
ASSERT_OK(env_->GetFileSize(source, &size));
|
|
|
|
}
|
|
|
|
|
|
|
|
char buffer[4096];
|
|
|
|
Slice slice;
|
|
|
|
while (size > 0) {
|
|
|
|
uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
|
|
|
|
ASSERT_OK(srcfile->Read(one, &slice, buffer));
|
|
|
|
ASSERT_OK(destfile->Append(slice));
|
|
|
|
size -= slice.size();
|
|
|
|
}
|
|
|
|
ASSERT_OK(destfile->Close());
|
|
|
|
}
|
|
|
|
|
|
|
|
std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles(
|
|
|
|
uint64_t* total_size) {
|
|
|
|
std::unordered_map<std::string, uint64_t> res;
|
|
|
|
|
|
|
|
if (total_size) {
|
|
|
|
*total_size = 0;
|
|
|
|
}
|
|
|
|
std::vector<std::string> files;
|
|
|
|
env_->GetChildren(dbname_, &files);
|
|
|
|
for (auto& file_name : files) {
|
|
|
|
uint64_t number;
|
|
|
|
FileType type;
|
|
|
|
std::string file_path = dbname_ + "/" + file_name;
|
|
|
|
if (ParseFileName(file_name, &number, &type) && type == kTableFile) {
|
|
|
|
uint64_t file_size = 0;
|
|
|
|
env_->GetFileSize(file_path, &file_size);
|
|
|
|
res[file_path] = file_size;
|
|
|
|
if (total_size) {
|
|
|
|
*total_size += file_size;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Returns the file numbers of all children of `path` (listed through `env`)
// that ParseFileName() classifies as kTableFile, in listing order. The
// GetChildren() status is not inspected.
std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
                                                      const std::string& path) {
  std::vector<std::string> children;
  std::vector<uint64_t> table_numbers;
  env->GetChildren(path, &children);
  for (const std::string& child : children) {
    uint64_t number;
    FileType type;
    if (!ParseFileName(child, &number, &type)) {
      continue;
    }
    if (type != kTableFile) {
      continue;
    }
    table_numbers.push_back(number);
  }
  return table_numbers;
}
|
|
|
|
|
|
|
|
void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
|
|
|
|
size_t* total_reads_res, bool tailing_iter,
|
|
|
|
std::map<std::string, Status> status) {
|
|
|
|
size_t total_reads = 0;
|
|
|
|
|
Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice>
This diff is stacked on top of D56493 and D56511
In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand
```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s
[master]
readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```
```
[Everything in Memtable | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s
[master]
readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s
readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s
readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s
readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s
readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]
[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s
readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s
[master]
readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s
readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
[FullMergeV2]
readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s
readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s
[master]
readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s
readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```
Test Plan: COMPILE_WITH_ASAN=1 make check -j64
Reviewers: yhchiang, andrewkr, sdong
Reviewed By: sdong
Subscribers: lovro, andrewkr, dhruba
Differential Revision: https://reviews.facebook.net/D57075
8 years ago
|
|
|
for (auto& kv : true_data) {
|
|
|
|
Status s = status[kv.first];
|
|
|
|
if (s.ok()) {
|
|
|
|
ASSERT_EQ(Get(kv.first), kv.second);
|
|
|
|
} else {
|
|
|
|
std::string value;
|
|
|
|
ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
|
|
|
|
}
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Normal Iterator
|
|
|
|
{
|
|
|
|
int iter_cnt = 0;
|
|
|
|
ReadOptions ro;
|
|
|
|
ro.total_order_seek = true;
|
|
|
|
Iterator* iter = db_->NewIterator(ro);
|
|
|
|
// Verify Iterator::Next()
|
|
|
|
iter_cnt = 0;
|
|
|
|
auto data_iter = true_data.begin();
|
|
|
|
Status s;
|
|
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
|
|
|
|
ASSERT_EQ(iter->key().ToString(), data_iter->first);
|
|
|
|
Status current_status = status[data_iter->first];
|
|
|
|
if (!current_status.ok()) {
|
|
|
|
s = current_status;
|
|
|
|
}
|
|
|
|
ASSERT_EQ(iter->status(), s);
|
|
|
|
if (current_status.ok()) {
|
|
|
|
ASSERT_EQ(iter->value().ToString(), data_iter->second);
|
|
|
|
}
|
|
|
|
iter_cnt++;
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
|
|
|
|
<< true_data.size();
|
|
|
|
delete iter;
|
|
|
|
|
|
|
|
// Verify Iterator::Prev()
|
|
|
|
// Use a new iterator to make sure its status is clean.
|
|
|
|
iter = db_->NewIterator(ro);
|
|
|
|
iter_cnt = 0;
|
|
|
|
s = Status::OK();
|
|
|
|
auto data_rev = true_data.rbegin();
|
|
|
|
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
|
|
|
|
ASSERT_EQ(iter->key().ToString(), data_rev->first);
|
|
|
|
Status current_status = status[data_rev->first];
|
|
|
|
if (!current_status.ok()) {
|
|
|
|
s = current_status;
|
|
|
|
}
|
|
|
|
ASSERT_EQ(iter->status(), s);
|
|
|
|
if (current_status.ok()) {
|
|
|
|
ASSERT_EQ(iter->value().ToString(), data_rev->second);
|
|
|
|
}
|
|
|
|
iter_cnt++;
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / "
|
|
|
|
<< true_data.size();
|
|
|
|
|
|
|
|
// Verify Iterator::Seek()
|
|
|
|
for (auto kv : true_data) {
|
|
|
|
iter->Seek(kv.first);
|
|
|
|
ASSERT_EQ(kv.first, iter->key().ToString());
|
|
|
|
ASSERT_EQ(kv.second, iter->value().ToString());
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
delete iter;
|
Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice>
This diff is stacked on top of D56493 and D56511
In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand
```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s
[master]
readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```
```
[Everything in Memtable | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s
[master]
readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s
readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s
readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s
readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s
readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]
[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s
readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s
[master]
readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s
readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
[FullMergeV2]
readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s
readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s
[master]
readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s
readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```
Test Plan: COMPILE_WITH_ASAN=1 make check -j64
Reviewers: yhchiang, andrewkr, sdong
Reviewed By: sdong
Subscribers: lovro, andrewkr, dhruba
Differential Revision: https://reviews.facebook.net/D57075
8 years ago
|
|
|
}
|
|
|
|
|
|
|
|
if (tailing_iter) {
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
// Tailing iterator
|
|
|
|
int iter_cnt = 0;
|
|
|
|
ReadOptions ro;
|
|
|
|
ro.tailing = true;
|
|
|
|
ro.total_order_seek = true;
|
|
|
|
Iterator* iter = db_->NewIterator(ro);
|
Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice>
This diff is stacked on top of D56493 and D56511
In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand
```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s
[master]
readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```
```
[Everything in Memtable | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s
[master]
readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s
readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s
readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s
readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s
readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]
[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s
readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s
[master]
readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s
readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
[FullMergeV2]
readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s
readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s
[master]
readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s
readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```
Test Plan: COMPILE_WITH_ASAN=1 make check -j64
Reviewers: yhchiang, andrewkr, sdong
Reviewed By: sdong
Subscribers: lovro, andrewkr, dhruba
Differential Revision: https://reviews.facebook.net/D57075
8 years ago
|
|
|
|
|
|
|
// Verify ForwardIterator::Next()
|
|
|
|
iter_cnt = 0;
|
|
|
|
auto data_iter = true_data.begin();
|
|
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
|
|
|
|
ASSERT_EQ(iter->key().ToString(), data_iter->first);
|
|
|
|
ASSERT_EQ(iter->value().ToString(), data_iter->second);
|
|
|
|
iter_cnt++;
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
|
|
|
|
<< true_data.size();
|
|
|
|
|
|
|
|
// Verify ForwardIterator::Seek()
|
|
|
|
for (auto kv : true_data) {
|
|
|
|
iter->Seek(kv.first);
|
|
|
|
ASSERT_EQ(kv.first, iter->key().ToString());
|
|
|
|
ASSERT_EQ(kv.second, iter->value().ToString());
|
|
|
|
total_reads++;
|
|
|
|
}
|
|
|
|
|
|
|
|
delete iter;
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
}
|
|
|
|
|
|
|
|
if (total_reads_res) {
|
|
|
|
*total_reads_res = total_reads;
|
|
|
|
}
|
Introduce FullMergeV2 (eliminate memcpy from merge operators)
Summary:
This diff update the code to pin the merge operator operands while the merge operation is done, so that we can eliminate the memcpy cost, to do that we need a new public API for FullMerge that replace the std::deque<std::string> with std::vector<Slice>
This diff is stacked on top of D56493 and D56511
In this diff we
- Update FullMergeV2 arguments to be encapsulated in MergeOperationInput and MergeOperationOutput which will make it easier to add new arguments in the future
- Replace std::deque<std::string> with std::vector<Slice> to pass operands
- Replace MergeContext std::deque with std::vector (based on a simple benchmark I ran https://gist.github.com/IslamAbdelRahman/78fc86c9ab9f52b1df791e58943fb187)
- Allow FullMergeV2 output to be an existing operand
```
[Everything in Memtable | 10K operands | 10 KB each | 1 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=10000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 0.607 micros/op 1648235 ops/sec; 16121.2 MB/s
readseq : 0.478 micros/op 2091546 ops/sec; 20457.2 MB/s
readseq : 0.252 micros/op 3972081 ops/sec; 38850.5 MB/s
readseq : 0.237 micros/op 4218328 ops/sec; 41259.0 MB/s
readseq : 0.247 micros/op 4043927 ops/sec; 39553.2 MB/s
[master]
readseq : 3.935 micros/op 254140 ops/sec; 2485.7 MB/s
readseq : 3.722 micros/op 268657 ops/sec; 2627.7 MB/s
readseq : 3.149 micros/op 317605 ops/sec; 3106.5 MB/s
readseq : 3.125 micros/op 320024 ops/sec; 3130.1 MB/s
readseq : 4.075 micros/op 245374 ops/sec; 2400.0 MB/s
```
```
[Everything in Memtable | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="mergerandom,readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --merge_keys=1000 --num=10000 --disable_auto_compactions --value_size=10240 --write_buffer_size=1000000000
[FullMergeV2]
readseq : 3.472 micros/op 288018 ops/sec; 2817.1 MB/s
readseq : 2.304 micros/op 434027 ops/sec; 4245.2 MB/s
readseq : 1.163 micros/op 859845 ops/sec; 8410.0 MB/s
readseq : 1.192 micros/op 838926 ops/sec; 8205.4 MB/s
readseq : 1.250 micros/op 800000 ops/sec; 7824.7 MB/s
[master]
readseq : 24.025 micros/op 41623 ops/sec; 407.1 MB/s
readseq : 18.489 micros/op 54086 ops/sec; 529.0 MB/s
readseq : 18.693 micros/op 53495 ops/sec; 523.2 MB/s
readseq : 23.621 micros/op 42335 ops/sec; 414.1 MB/s
readseq : 18.775 micros/op 53262 ops/sec; 521.0 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 1 operand per key]
[FullMergeV2]
$ DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
readseq : 14.741 micros/op 67837 ops/sec; 663.5 MB/s
readseq : 1.029 micros/op 971446 ops/sec; 9501.6 MB/s
readseq : 0.974 micros/op 1026229 ops/sec; 10037.4 MB/s
readseq : 0.965 micros/op 1036080 ops/sec; 10133.8 MB/s
readseq : 0.943 micros/op 1060657 ops/sec; 10374.2 MB/s
[master]
readseq : 16.735 micros/op 59755 ops/sec; 584.5 MB/s
readseq : 3.029 micros/op 330151 ops/sec; 3229.2 MB/s
readseq : 3.136 micros/op 318883 ops/sec; 3119.0 MB/s
readseq : 3.065 micros/op 326245 ops/sec; 3191.0 MB/s
readseq : 3.014 micros/op 331813 ops/sec; 3245.4 MB/s
```
```
[Everything in Block cache | 10K operands | 10 KB each | 10 operand per key]
DEBUG_LEVEL=0 make db_bench -j64 && ./db_bench --benchmarks="readseq,readseq,readseq,readseq,readseq" --merge_operator="max" --num=100000 --db="/dev/shm/merge-random-10-operands-10K-10KB" --cache_size=1000000000 --use_existing_db --disable_auto_compactions
[FullMergeV2]
readseq : 24.325 micros/op 41109 ops/sec; 402.1 MB/s
readseq : 1.470 micros/op 680272 ops/sec; 6653.7 MB/s
readseq : 1.231 micros/op 812347 ops/sec; 7945.5 MB/s
readseq : 1.091 micros/op 916590 ops/sec; 8965.1 MB/s
readseq : 1.109 micros/op 901713 ops/sec; 8819.6 MB/s
[master]
readseq : 27.257 micros/op 36687 ops/sec; 358.8 MB/s
readseq : 4.443 micros/op 225073 ops/sec; 2201.4 MB/s
readseq : 5.830 micros/op 171526 ops/sec; 1677.7 MB/s
readseq : 4.173 micros/op 239635 ops/sec; 2343.8 MB/s
readseq : 4.150 micros/op 240963 ops/sec; 2356.8 MB/s
```
Test Plan: COMPILE_WITH_ASAN=1 make check -j64
Reviewers: yhchiang, andrewkr, sdong
Reviewed By: sdong
Subscribers: lovro, andrewkr, dhruba
Differential Revision: https://reviews.facebook.net/D57075
8 years ago
|
|
|
}
|
|
|
|
|
|
|
|
void DBTestBase::VerifyDBInternal(
|
|
|
|
std::vector<std::pair<std::string, std::string>> true_data) {
|
|
|
|
Arena arena;
|
|
|
|
InternalKeyComparator icmp(last_options_.comparator);
|
|
|
|
ReadRangeDelAggregator range_del_agg(&icmp,
|
|
|
|
kMaxSequenceNumber /* upper_bound */);
|
|
|
|
auto iter =
|
|
|
|
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber);
|
|
|
|
iter->SeekToFirst();
|
|
|
|
for (auto p : true_data) {
|
|
|
|
ASSERT_TRUE(iter->Valid());
|
|
|
|
ParsedInternalKey ikey;
|
|
|
|
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
|
|
|
|
ASSERT_EQ(p.first, ikey.user_key);
|
|
|
|
ASSERT_EQ(p.second, iter->value());
|
|
|
|
iter->Next();
|
|
|
|
};
|
|
|
|
ASSERT_FALSE(iter->Valid());
|
|
|
|
iter->~InternalIterator();
|
|
|
|
}
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
|
|
|
|
// Counts the entries returned by db->GetLiveFilesMetaData() whose
// column_family_name equals `column_family_name`.
uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
    DB* db, std::string column_family_name) {
  std::vector<LiveFileMetaData> live_files;
  db->GetLiveFilesMetaData(&live_files);

  uint64_t matching = 0;
  for (const auto& meta : live_files) {
    if (meta.column_family_name == column_family_name) {
      ++matching;
    }
  }
  return matching;
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|