// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>
#include <cstdlib>
#include <functional>

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/wal_filter.h"

namespace rocksdb {

class DBTest2 : public DBTestBase {
 public:
  DBTest2() : DBTestBase("/db_test2") {}
};

class PrefixFullBloomWithReverseComparator
    : public DBTestBase,
      public ::testing::WithParamInterface<bool> {
 public:
  PrefixFullBloomWithReverseComparator()
      : DBTestBase("/prefix_bloom_reverse") {}
  virtual void SetUp() override { if_cache_filter_ = GetParam(); }
  bool if_cache_filter_;
};
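
// Verifies that prefix bloom filters keep working under a reverse bytewise
// comparator: seeks within a prefix must still find the stored keys, both
// when the filter block lives in a tiny block cache and when it is read
// directly (per the test parameter).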
TEST_P(PrefixFullBloomWithReverseComparator,
       PrefixFullBloomWithReverseComparator) {
  Options options = last_options_;
  options.comparator = ReverseBytewiseComparator();
  options.prefix_extractor.reset(NewCappedPrefixTransform(3));
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions bbto;
  if (if_cache_filter_) {
    bbto.no_block_cache = false;
    bbto.cache_index_and_filter_blocks = true;
    bbto.block_cache = NewLRUCache(1);
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar123", "foo"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "bar234", "foo2"));
  ASSERT_OK(dbfull()->Put(WriteOptions(), "foo123", "foo3"));

  dbfull()->Flush(FlushOptions());

  if (bbto.block_cache) {
    bbto.block_cache->EraseUnRefEntries();
  }

  unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  iter->Seek("bar345");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar234", iter->key().ToString());
  ASSERT_EQ("foo2", iter->value().ToString());
  iter->Next();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("bar123", iter->key().ToString());
  ASSERT_EQ("foo", iter->value().ToString());

  iter->Seek("foo234");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("foo123", iter->key().ToString());
  ASSERT_EQ("foo3", iter->value().ToString());

  iter->Seek("bar");
  ASSERT_OK(iter->status());
  ASSERT_TRUE(!iter->Valid());
}

INSTANTIATE_TEST_CASE_P(PrefixFullBloomWithReverseComparator,
                        PrefixFullBloomWithReverseComparator, testing::Bool());
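
// "rocksdb.iterator.super-version-number" reports the super version an
// iterator is pinned to: a new iterator created after a flush sees a larger
// number, while an existing iterator keeps its original one.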
TEST_F(DBTest2, IteratorPropertyVersionNumber) {
  Put("", "");
  Iterator* iter1 = db_->NewIterator(ReadOptions());
  std::string prop_value;
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  Put("", "");
  Flush();

  Iterator* iter2 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter2->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number2 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_GT(version_number2, version_number1);

  Put("", "");

  Iterator* iter3 = db_->NewIterator(ReadOptions());
  ASSERT_OK(
      iter3->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number3 =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));

  ASSERT_EQ(version_number2, version_number3);

  iter1->SeekToFirst();
  ASSERT_OK(
      iter1->GetProperty("rocksdb.iterator.super-version-number", &prop_value));
  uint64_t version_number1_new =
      static_cast<uint64_t>(std::atoi(prop_value.c_str()));
  ASSERT_EQ(version_number1, version_number1_new);

  delete iter1;
  delete iter2;
  delete iter3;
}
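
// Regression test: an index reader cached in the block cache must not be
// reused after the DB restarts, since it may reference an internal
// comparator that was destroyed along with the old table reader.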
TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  Put(1, "a", "begin");
  Put(1, "z", "end");
  ASSERT_OK(Flush(1));
  TryReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::string value;
  value = Get(1, "a");
}

#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
    : public DBTestBase,
      public testing::WithParamInterface<bool> {
 public:
  DBTestSharedWriteBufferAcrossCFs()
      : DBTestBase("/db_test_shared_write_buffer") {}
  void SetUp() override { use_old_interface_ = GetParam(); }
  bool use_old_interface_;
};
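
// With a 100KB shared limit (via the legacy db_write_buffer_size or a
// WriteBufferManager, per the test parameter), filling memtables past the
// limit must flush the column family holding the most data.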
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
  Options options = CurrentOptions();
  if (use_old_interface_) {
    options.db_write_buffer_size = 100000;  // this is the real limit
  } else {
    options.write_buffer_manager.reset(new WriteBufferManager(100000));
  }
  options.write_buffer_size = 500000;  // this is never hit
  CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);

  // Trigger a flush on CF "nikitich"
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(3, Key(1), DummyString(90000)));
  ASSERT_OK(Put(2, Key(2), DummyString(20000)));
  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(1));
  }

  // "dobrynia": 20KB
  // Flush 'dobrynia'
  ASSERT_OK(Put(3, Key(2), DummyString(40000)));
  ASSERT_OK(Put(2, Key(2), DummyString(70000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(1));
  }

  // "nikitich" still has data of 80KB
  // Inserting data in "dobrynia" triggers "nikitich" flushing.
  ASSERT_OK(Put(3, Key(2), DummyString(40000)));
  ASSERT_OK(Put(2, Key(2), DummyString(40000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // "dobrynia" still has 40KB
  ASSERT_OK(Put(1, Key(2), DummyString(20000)));
  ASSERT_OK(Put(0, Key(1), DummyString(10000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  // This should trigger no flush
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
  ASSERT_OK(Put(1, Key(2), DummyString(40000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
  // This should trigger a flush of "pikachu"
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(2));
  }

  // "default": 10KB, "dobrynia": 40KB
  // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
  // closure.
  ASSERT_OK(Put(3, Key(1), DummyString(1)));
  ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
                           options);
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
              static_cast<uint64_t>(3));
  }
}

INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
                        DBTestSharedWriteBufferAcrossCFs, ::testing::Bool());
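
// A WriteBufferManager shared between two DB instances enforces the 100KB
// limit across both: whichever side holds the most memtable data is flushed.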
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
  std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2";
  Options options = CurrentOptions();
  options.write_buffer_size = 500000;  // this is never hit
  options.write_buffer_manager.reset(new WriteBufferManager(100000));
  CreateAndReopenWithCF({"cf1", "cf2"}, options);

  ASSERT_OK(DestroyDB(dbname2, options));
  DB* db2 = nullptr;
  ASSERT_OK(DB::Open(options, dbname2, &db2));

  WriteOptions wo;

  // Trigger a flush on cf2
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  ASSERT_OK(Put(1, Key(1), DummyString(1)));
  ASSERT_OK(Put(2, Key(1), DummyString(90000)));

  // Insert to DB2
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));

  ASSERT_OK(Put(2, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(0));
  }

  // db2: 20KB
  ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000)));
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
  ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(1));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(1));
  }

  //
  // Inserting Data in db2 and db_ triggers flushing in db_.
  ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
  ASSERT_OK(Put(2, Key(2), DummyString(45000)));
  ASSERT_OK(Put(0, Key(1), DummyString(1)));
  dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
  dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
  static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
  {
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
              static_cast<uint64_t>(0));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
              static_cast<uint64_t>(2));
    ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
              static_cast<uint64_t>(1));
  }

  delete db2;
  ASSERT_OK(DestroyDB(dbname2, options));
}
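
// Helper for the WAL filter tests below: MultiGets both key sets and asserts
// that every key in the first set exists and every key in the second set is
// NotFound.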
namespace {
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
                          const std::vector<Slice>& keys_must_not_exist) {
  // Ensure that expected keys exist
  std::vector<std::string> values;
  if (keys_must_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_exist, &values);
    for (size_t i = 0; i < keys_must_exist.size(); i++) {
      ASSERT_OK(status_list[i]);
    }
  }

  // Ensure that given keys don't exist
  if (keys_must_not_exist.size() > 0) {
    std::vector<Status> status_list =
        db->MultiGet(ReadOptions(), keys_must_not_exist, &values);
    for (size_t i = 0; i < keys_must_not_exist.size(); i++) {
      ASSERT_TRUE(status_list[i].IsNotFound());
    }
  }
}

}  // namespace
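
// Replays the WAL through a filter and checks, for every processing option,
// that exactly the keys the option allows survive recovery.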
TEST_F(DBTest2, WalFilterTest) {
  class TestWalFilter : public WalFilter {
   private:
    // Processing option that is requested to be applied at the given index
    WalFilter::WalProcessingOption wal_processing_option_;
    // Index at which to apply wal_processing_option_
    // At other indexes default wal_processing_option::kContinueProcessing is
    // returned.
    size_t apply_option_at_record_index_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilter(WalFilter::WalProcessingOption wal_processing_option,
                  size_t apply_option_for_record_index)
        : wal_processing_option_(wal_processing_option),
          apply_option_at_record_index_(apply_option_for_record_index),
          current_record_index_(0) {}

    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
                                          WriteBatch* new_batch,
                                          bool* batch_changed) const override {
      WalFilter::WalProcessingOption option_to_return;

      if (current_record_index_ == apply_option_at_record_index_) {
        option_to_return = wal_processing_option_;
      } else {
        option_to_return = WalProcessingOption::kContinueProcessing;
      }

      // The filter is passed as a const object so that RocksDB does not
      // modify it; we mutate our own bookkeeping here on purpose and hence
      // cast the constness away.
      (const_cast<TestWalFilter*>(this)->current_record_index_)++;

      return option_to_return;
    }

    virtual const char* Name() const override { return "TestWalFilter"; }
  };

  // Create 3 batches with two keys each
  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  // Test with all WAL processing options
  for (int option = 0;
       option < static_cast<int>(
                    WalFilter::WalProcessingOption::kWalProcessingOptionMax);
       option++) {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // Write given keys in given batches
    for (size_t i = 0; i < batch_keys.size(); i++) {
      WriteBatch batch;
      for (size_t j = 0; j < batch_keys[i].size(); j++) {
        batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
      }
      dbfull()->Write(WriteOptions(), &batch);
    }

    WalFilter::WalProcessingOption wal_processing_option =
        static_cast<WalFilter::WalProcessingOption>(option);

    // Create a test filter that applies wal_processing_option at the record
    // at index apply_option_for_record_index
    size_t apply_option_for_record_index = 1;
    TestWalFilter test_wal_filter(wal_processing_option,
                                  apply_option_for_record_index);

    // Reopen database with option to use WAL filter
    options = OptionsForLogIterTest();
    options.wal_filter = &test_wal_filter;
    Status status =
        TryReopenWithColumnFamilies({"default", "pikachu"}, options);
    if (wal_processing_option ==
        WalFilter::WalProcessingOption::kCorruptedRecord) {
      assert(!status.ok());
      // In case of corruption we can turn off paranoid_checks to reopen
      // the database
      options.paranoid_checks = false;
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
    } else {
      assert(status.ok());
    }

    // Compute which keys we expect to be found
    // and which we expect not to be found after recovery.
    std::vector<Slice> keys_must_exist;
    std::vector<Slice> keys_must_not_exist;
    switch (wal_processing_option) {
      case WalFilter::WalProcessingOption::kCorruptedRecord:
      case WalFilter::WalProcessingOption::kContinueProcessing: {
        fprintf(stderr, "Testing with complete WAL processing\n");
        // we expect all records to be processed
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            keys_must_exist.push_back(Slice(batch_keys[i][j]));
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kIgnoreCurrentRecord: {
        fprintf(stderr,
                "Testing with ignoring record %" ROCKSDB_PRIszt " only\n",
                apply_option_for_record_index);
        // We expect the record with apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i == apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      case WalFilter::WalProcessingOption::kStopReplay: {
        fprintf(stderr,
                "Testing with stopping replay from record %" ROCKSDB_PRIszt
                "\n",
                apply_option_for_record_index);
        // We expect records beyond apply_option_for_record_index to be not
        // found.
        for (size_t i = 0; i < batch_keys.size(); i++) {
          for (size_t j = 0; j < batch_keys[i].size(); j++) {
            if (i >= apply_option_for_record_index) {
              keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
            } else {
              keys_must_exist.push_back(Slice(batch_keys[i][j]));
            }
          }
        }
        break;
      }
      default:
        assert(false);  // unhandled case
    }

    bool checked_after_reopen = false;

    while (true) {
      // Ensure that expected keys exist
      // and unexpected keys don't exist after recovery
      ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

      if (checked_after_reopen) {
        break;
      }

      // Reopen the database again to make sure previous log(s) are not used
      // (even if they were skipped); this time reopen without the WAL filter
      options = OptionsForLogIterTest();
      ReopenWithColumnFamilies({"default", "pikachu"}, options);

      checked_after_reopen = true;
    }
  }
}
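
// A WAL filter may rewrite a record into a new batch; here batches from a
// given index on are shrunk to a single key, and recovery must reflect that.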
TEST_F(DBTest2, WalFilterTestWithChangeBatch) {
  class ChangeBatchHandler : public WriteBatch::Handler {
   private:
    // Batch to insert keys in
    WriteBatch* new_write_batch_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Number of keys added to new batch
    size_t num_keys_added_;

   public:
    ChangeBatchHandler(WriteBatch* new_write_batch,
                       size_t num_keys_to_add_in_new_batch)
        : new_write_batch_(new_write_batch),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          num_keys_added_(0) {}
    virtual void Put(const Slice& key, const Slice& value) override {
      if (num_keys_added_ < num_keys_to_add_in_new_batch_) {
        new_write_batch_->Put(key, value);
        ++num_keys_added_;
      }
    }
  };

  class TestWalFilterWithChangeBatch : public WalFilter {
   private:
    // Index at which to start changing records
    size_t change_records_from_index_;
    // Number of keys to add in the new batch
    size_t num_keys_to_add_in_new_batch_;
    // Current record index, incremented with each record encountered.
    size_t current_record_index_;

   public:
    TestWalFilterWithChangeBatch(size_t change_records_from_index,
                                 size_t num_keys_to_add_in_new_batch)
        : change_records_from_index_(change_records_from_index),
          num_keys_to_add_in_new_batch_(num_keys_to_add_in_new_batch),
          current_record_index_(0) {}

    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
                                          WriteBatch* new_batch,
                                          bool* batch_changed) const override {
      if (current_record_index_ >= change_records_from_index_) {
        ChangeBatchHandler handler(new_batch, num_keys_to_add_in_new_batch_);
        batch.Iterate(&handler);
        *batch_changed = true;
      }

      // The filter is passed as a const object so that RocksDB does not
      // modify it; we mutate our own bookkeeping here on purpose and hence
      // cast the constness away.
      (const_cast<TestWalFilterWithChangeBatch*>(this)
           ->current_record_index_)++;

      return WalProcessingOption::kContinueProcessing;
    }

    virtual const char* Name() const override {
      return "TestWalFilterWithChangeBatch";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that changes batches starting from the record at
  // index change_records_from_index, keeping only a single key per batch
  size_t change_records_from_index = 1;
  size_t num_keys_to_add_in_new_batch = 1;
  TestWalFilterWithChangeBatch test_wal_filter_with_change_batch(
      change_records_from_index, num_keys_to_add_in_new_batch);

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_with_change_batch;
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  // Ensure that all keys exist before change_records_from_index_
  // and that after that index only a single key exists,
  // as our filter adds only a single key for each batch
  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      if (i >= change_records_from_index && j >= num_keys_to_add_in_new_batch) {
        keys_must_not_exist.push_back(Slice(batch_keys[i][j]));
      } else {
        keys_must_exist.push_back(Slice(batch_keys[i][j]));
      }
    }
  }

  bool checked_after_reopen = false;

  while (true) {
    // Ensure that expected keys exist
    // and unexpected keys don't exist after recovery
    ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);

    if (checked_after_reopen) {
      break;
    }

    // Reopen the database again to make sure previous log(s) are not used
    // (even if they were skipped); this time reopen without the WAL filter
    options = OptionsForLogIterTest();
    ReopenWithColumnFamilies({"default", "pikachu"}, options);

    checked_after_reopen = true;
  }
}
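
// A WAL filter that grows a batch is not supported: recovery must fail with
// NotSupported and leave the DB unaltered.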
TEST_F(DBTest2, WalFilterTestWithChangeBatchExtraKeys) {
  class TestWalFilterWithChangeBatchAddExtraKeys : public WalFilter {
   public:
    virtual WalProcessingOption LogRecord(const WriteBatch& batch,
                                          WriteBatch* new_batch,
                                          bool* batch_changed) const override {
      *new_batch = batch;
      new_batch->Put("key_extra", "value_extra");
      *batch_changed = true;
      return WalProcessingOption::kContinueProcessing;
    }

    virtual const char* Name() const override {
      return "WalFilterTestWithChangeBatchExtraKeys";
    }
  };

  std::vector<std::vector<std::string>> batch_keys(3);

  batch_keys[0].push_back("key1");
  batch_keys[0].push_back("key2");
  batch_keys[1].push_back("key3");
  batch_keys[1].push_back("key4");
  batch_keys[2].push_back("key5");
  batch_keys[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      batch.Put(handles_[0], batch_keys[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Create a test filter that would add extra keys
  TestWalFilterWithChangeBatchAddExtraKeys test_wal_filter_extra_keys;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_extra_keys;
  Status status = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.IsNotSupported());

  // Reopen without filter, now reopen should succeed - previous
  // attempt to open must not have altered the db.
  options = OptionsForLogIterTest();
  ReopenWithColumnFamilies({"default", "pikachu"}, options);

  std::vector<Slice> keys_must_exist;
  std::vector<Slice> keys_must_not_exist;  // empty vector

  for (size_t i = 0; i < batch_keys.size(); i++) {
    for (size_t j = 0; j < batch_keys[i].size(); j++) {
      keys_must_exist.push_back(Slice(batch_keys[i][j]));
    }
  }

  ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}
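
// The WAL filter receives per-column-family log numbers, letting it tell
// which records are still relevant to each column family after one of them
// has been flushed.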
TEST_F(DBTest2, WalFilterTestWithColumnFamilies) {
  class TestWalFilterWithColumnFamilies : public WalFilter {
   private:
    // column_family_id -> log_number map (provided to WALFilter)
    std::map<uint32_t, uint64_t> cf_log_number_map_;
    // column_family_name -> column_family_id map (provided to WALFilter)
    std::map<std::string, uint32_t> cf_name_id_map_;
    // column_family_id -> keys_found_in_wal map
    // We store keys that are applicable to the column_family
    // during recovery (i.e. aren't already flushed to SST file(s))
    // for verification against the keys we expect.
    std::map<uint32_t, std::vector<std::string>> cf_wal_keys_;

   public:
    virtual void ColumnFamilyLogNumberMap(
        const std::map<uint32_t, uint64_t>& cf_lognumber_map,
        const std::map<std::string, uint32_t>& cf_name_id_map) override {
      cf_log_number_map_ = cf_lognumber_map;
      cf_name_id_map_ = cf_name_id_map;
    }

    virtual WalProcessingOption LogRecordFound(
        unsigned long long log_number, const std::string& log_file_name,
        const WriteBatch& batch, WriteBatch* new_batch,
        bool* batch_changed) override {
      class LogRecordBatchHandler : public WriteBatch::Handler {
       private:
        const std::map<uint32_t, uint64_t>& cf_log_number_map_;
        std::map<uint32_t, std::vector<std::string>>& cf_wal_keys_;
        unsigned long long log_number_;

       public:
        LogRecordBatchHandler(
            unsigned long long current_log_number,
            const std::map<uint32_t, uint64_t>& cf_log_number_map,
            std::map<uint32_t, std::vector<std::string>>& cf_wal_keys)
            : cf_log_number_map_(cf_log_number_map),
              cf_wal_keys_(cf_wal_keys),
              log_number_(current_log_number) {}

        virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                             const Slice& /*value*/) override {
          auto it = cf_log_number_map_.find(column_family_id);
          assert(it != cf_log_number_map_.end());
          unsigned long long log_number_for_cf = it->second;
          // If the current record is applicable for column_family_id
          // (i.e. isn't flushed to SST file(s) for column_family_id)
          // add it to the cf_wal_keys_ map for verification.
          if (log_number_ >= log_number_for_cf) {
            cf_wal_keys_[column_family_id].push_back(
                std::string(key.data(), key.size()));
          }
          return Status::OK();
        }
      } handler(log_number, cf_log_number_map_, cf_wal_keys_);

      batch.Iterate(&handler);

      return WalProcessingOption::kContinueProcessing;
    }

    virtual const char* Name() const override {
      return "WalFilterTestWithColumnFamilies";
    }

    const std::map<uint32_t, std::vector<std::string>>& GetColumnFamilyKeys() {
      return cf_wal_keys_;
    }

    const std::map<std::string, uint32_t>& GetColumnFamilyNameIdMap() {
      return cf_name_id_map_;
    }
  };

  std::vector<std::vector<std::string>> batch_keys_pre_flush(3);

  batch_keys_pre_flush[0].push_back("key1");
  batch_keys_pre_flush[0].push_back("key2");
  batch_keys_pre_flush[1].push_back("key3");
  batch_keys_pre_flush[1].push_back("key4");
  batch_keys_pre_flush[2].push_back("key5");
  batch_keys_pre_flush[2].push_back("key6");

  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_pre_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_pre_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // Flush default column family
  db_->Flush(FlushOptions(), handles_[0]);

  // Do some more writes
  std::vector<std::vector<std::string>> batch_keys_post_flush(3);

  batch_keys_post_flush[0].push_back("key7");
  batch_keys_post_flush[0].push_back("key8");
  batch_keys_post_flush[1].push_back("key9");
  batch_keys_post_flush[1].push_back("key10");
  batch_keys_post_flush[2].push_back("key11");
  batch_keys_post_flush[2].push_back("key12");

  // Write given keys in given batches
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    WriteBatch batch;
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      batch.Put(handles_[0], batch_keys_post_flush[i][j], DummyString(1024));
      batch.Put(handles_[1], batch_keys_post_flush[i][j], DummyString(1024));
    }
    dbfull()->Write(WriteOptions(), &batch);
  }

  // On recovery we should only find the second batch applicable to default CF
  // but both batches applicable to pikachu CF

  // Create a test filter that records the keys it sees during recovery
  TestWalFilterWithColumnFamilies test_wal_filter_column_families;

  // Reopen database with option to use WAL filter
  options = OptionsForLogIterTest();
  options.wal_filter = &test_wal_filter_column_families;
  Status status =
      TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_TRUE(status.ok());

  // Verify that handles_[0] only has post_flush keys
  // while handles_[1] has pre- and post-flush keys
  auto cf_wal_keys = test_wal_filter_column_families.GetColumnFamilyKeys();
  auto name_id_map = test_wal_filter_column_families.GetColumnFamilyNameIdMap();
  size_t index = 0;
  auto keys_cf = cf_wal_keys[name_id_map[kDefaultColumnFamilyName]];
  // Default column family; only post_flush keys are expected
  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());

  index = 0;
  keys_cf = cf_wal_keys[name_id_map["pikachu"]];
  // Pikachu column family; all keys are expected
  for (size_t i = 0; i < batch_keys_pre_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_pre_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_pre_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }

  for (size_t i = 0; i < batch_keys_post_flush.size(); i++) {
    for (size_t j = 0; j < batch_keys_post_flush[i].size(); j++) {
      Slice key_from_the_log(keys_cf[index++]);
      Slice batch_key(batch_keys_post_flush[i][j]);
      ASSERT_TRUE(key_from_the_log.compare(batch_key) == 0);
    }
  }
  ASSERT_TRUE(index == keys_cf.size());
}
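
// Builds files with highly compressible, repetitive data and verifies that
// enabling a preset compression dictionary (max_dict_bytes) shrinks the
// total SST size for every supported compression type, while the data read
// back stays intact.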
TEST_F(DBTest2, PresetCompressionDict) {
  const size_t kBlockSizeBytes = 4 << 10;
  const size_t kL0FileBytes = 128 << 10;
  const size_t kApproxPerBlockOverheadBytes = 50;
  const int kNumL0Files = 5;

  Options options;
  options.arena_block_size = kBlockSizeBytes;
  options.compaction_style = kCompactionStyleUniversal;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes));
  options.num_levels = 2;
  options.target_file_size_base = kL0FileBytes;
  options.target_file_size_multiplier = 2;
  options.write_buffer_size = kL0FileBytes;
  BlockBasedTableOptions table_options;
  table_options.block_size = kBlockSizeBytes;
  std::vector<CompressionType> compression_types;
  if (Zlib_Supported()) {
    compression_types.push_back(kZlibCompression);
  }
#if LZ4_VERSION_NUMBER >= 10400  // r124+
  compression_types.push_back(kLZ4Compression);
  compression_types.push_back(kLZ4HCCompression);
#endif  // LZ4_VERSION_NUMBER >= 10400
#if ZSTD_VERSION_NUMBER >= 500  // v0.5.0+
  compression_types.push_back(kZSTDNotFinalCompression);
#endif  // ZSTD_VERSION_NUMBER >= 500

  for (auto compression_type : compression_types) {
    options.compression = compression_type;
    size_t prev_out_bytes = 0;
    for (int i = 0; i < 2; ++i) {
      // First iteration: compress without preset dictionary
      // Second iteration: compress with preset dictionary
      // To make sure the compression dictionary was actually used, we verify
      // the compressed size is smaller in the second iteration. Also in the
      // second iteration, verify the data we get out is the same data we put
      // in.
      if (i) {
        options.compression_opts.max_dict_bytes = kBlockSizeBytes;
      } else {
        options.compression_opts.max_dict_bytes = 0;
      }

      options.statistics = rocksdb::CreateDBStatistics();
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));
      CreateAndReopenWithCF({"pikachu"}, options);
      Random rnd(301);
      std::string seq_data =
          RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes);

      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      for (int j = 0; j < kNumL0Files; ++j) {
        for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) {
          ASSERT_OK(Put(1, Key(static_cast<int>(
                               j * (kL0FileBytes / kBlockSizeBytes) + k)),
                        seq_data));
        }
        dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
        ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1));
      }
      db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
      ASSERT_EQ(0, NumTableFilesAtLevel(0, 1));
      ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

      size_t out_bytes = 0;
      std::vector<std::string> files;
      GetSstFiles(dbname_, &files);
      for (const auto& file : files) {
        uint64_t curr_bytes;
        env_->GetFileSize(dbname_ + "/" + file, &curr_bytes);
        out_bytes += static_cast<size_t>(curr_bytes);
      }

      for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes);
           j++) {
        ASSERT_EQ(seq_data, Get(1, Key(static_cast<int>(j))));
      }
      if (i) {
        ASSERT_GT(prev_out_bytes, out_bytes);
      }
      prev_out_bytes = out_bytes;
      DestroyAndReopen(options);
    }
  }
}
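
// Listener that checks, on every compaction, that the output level was
// compressed with the algorithm the options call for: bottommost_compression
// at the last level, compression_per_level if set, and options.compression
// otherwise.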
class CompactionCompressionListener : public EventListener {
 public:
  explicit CompactionCompressionListener(Options* db_options)
      : db_options_(db_options) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    // Figure out last level with files
    int bottommost_level = 0;
    for (int level = 0; level < db->NumberLevels(); level++) {
      std::string files_at_level;
      ASSERT_TRUE(
          db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
                          &files_at_level));
      if (files_at_level != "0") {
        bottommost_level = level;
      }
    }

    if (db_options_->bottommost_compression != kDisableCompressionOption &&
        ci.output_level == bottommost_level && ci.output_level >= 2) {
      ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
    } else if (db_options_->compression_per_level.size() != 0) {
      ASSERT_EQ(ci.compression,
                db_options_->compression_per_level[ci.output_level]);
    } else {
      ASSERT_EQ(ci.compression, db_options_->compression);
    }
    max_level_checked = std::max(max_level_checked, ci.output_level);
  }

  int max_level_checked = 0;
  const Options* db_options_;
};

TEST_F(DBTest2, CompressionOptions) {
  if (!Zlib_Supported() || !Snappy_Supported()) {
    return;
  }

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 100;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 7;
  options.max_background_compactions = 1;
  options.base_background_compactions = 1;

  CompactionCompressionListener* listener =
      new CompactionCompressionListener(&options);
  options.listeners.emplace_back(listener);

  const int kKeySize = 5;
  const int kValSize = 20;
  Random rnd(301);

  for (int iter = 0; iter <= 2; iter++) {
    listener->max_level_checked = 0;

    if (iter == 0) {
      // Use different compression algorithms for different levels but
      // always use Zlib for bottommost level
      options.compression_per_level = {kNoCompression,     kNoCompression,
                                       kNoCompression,     kSnappyCompression,
                                       kSnappyCompression, kSnappyCompression,
                                       kZlibCompression};
      options.compression = kNoCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 1) {
      // Use Snappy except for bottommost level use ZLib
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kZlibCompression;
    } else if (iter == 2) {
      // Use Snappy everywhere
      options.compression_per_level = {};
      options.compression = kSnappyCompression;
      options.bottommost_compression = kDisableCompressionOption;
    }

    DestroyAndReopen(options);
    // Write 10 random files
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 5; j++) {
        ASSERT_OK(
            Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
      }
      ASSERT_OK(Flush());
      dbfull()->TEST_WaitForCompact();
    }

    // Make sure that we wrote enough to check all 7 levels
    ASSERT_EQ(listener->max_level_checked, 6);
  }
}
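
// Listener used by CompactionStall to count how many input files the L0
// compactions picked up while sync points held the compaction queue back.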
class CompactionStallTestListener : public EventListener {
 public:
  CompactionStallTestListener() : compacted_files_cnt_(0) {}

  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    ASSERT_EQ(ci.cf_name, "default");
    ASSERT_EQ(ci.base_input_level, 0);
    ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum);
    compacted_files_cnt_ += ci.input_files.size();
  }
  std::atomic<size_t> compacted_files_cnt_;
};

TEST_F(DBTest2, CompactionStall) {
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"},
       {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"},
       {"DBTest2::CompactionStall:2",
        "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  options.max_background_compactions = 40;
  CompactionStallTestListener* listener = new CompactionStallTestListener();
  options.listeners.emplace_back(listener);
  DestroyAndReopen(options);

  Random rnd(301);

  // 4 Files in L0
  for (int i = 0; i < 4; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:0");

  // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again
  // at DBTest2::CompactionStall:1
  rocksdb::SyncPoint::GetInstance()->ClearTrace();

  // Another 6 L0 files to trigger compaction again
  for (int i = 0; i < 6; i++) {
    for (int j = 0; j < 10; j++) {
      ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10)));
    }
    ASSERT_OK(Flush());
  }

  // Wait for another compaction to be triggered
  TEST_SYNC_POINT("DBTest2::CompactionStall:1");

  // Hold NotifyOnCompactionCompleted in the unlock mutex section
  TEST_SYNC_POINT("DBTest2::CompactionStall:2");

  dbfull()->TEST_WaitForCompact();
  ASSERT_LT(NumTableFilesAtLevel(0),
            options.level0_file_num_compaction_trigger);
  ASSERT_GT(listener->compacted_files_cnt_.load(),
            10 - options.level0_file_num_compaction_trigger);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

#endif  // ROCKSDB_LITE
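
// A snapshot taken on a fresh DB must stay valid even though it carries
// sequence number 0.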
TEST_F(DBTest2, FirstSnapshotTest) {
  Options options;
  options.write_buffer_size = 100000;  // Small write buffer
  options = CurrentOptions(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // This snapshot will have sequence number 0, which is the expected
  // behaviour.
  const Snapshot* s1 = db_->GetSnapshot();

  Put(1, "k1", std::string(100000, 'x'));  // Fill memtable
  Put(1, "k2", std::string(100000, 'y'));  // Trigger flush

  db_->ReleaseSnapshot(s1);
}
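
// With pin_l0_filter_and_index_blocks_in_cache, the table reader for an L0
// file takes ownership of its prefetched index and filter blocks instead of
// releasing them back to the block cache, so the LRU cache can never evict
// them and later reads avoid block-cache lock contention. Parameterized on
// whether max_open_files is infinite.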
class PinL0IndexAndFilterBlocksTest : public DBTestBase,
                                      public testing::WithParamInterface<bool> {
 public:
  PinL0IndexAndFilterBlocksTest() : DBTestBase("/db_pin_l0_index_bloom_test") {}
  virtual void SetUp() override { infinite_max_files_ = GetParam(); }

  void CreateTwoLevels(Options* options) {
    if (infinite_max_files_) {
      options->max_open_files = -1;
    }
    options->create_if_missing = true;
    options->statistics = rocksdb::CreateDBStatistics();
    BlockBasedTableOptions table_options;
    table_options.cache_index_and_filter_blocks = true;
    table_options.pin_l0_filter_and_index_blocks_in_cache = true;
    table_options.filter_policy.reset(NewBloomFilterPolicy(20));
    options->table_factory.reset(new BlockBasedTableFactory(table_options));
    CreateAndReopenWithCF({"pikachu"}, *options);

    Put(1, "a", "begin");
    Put(1, "z", "end");
    ASSERT_OK(Flush(1));
    // move this table to L1
    dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);

    // reset block cache
    table_options.block_cache = NewLRUCache(64 * 1024);
    options->table_factory.reset(NewBlockBasedTableFactory(table_options));
    TryReopenWithColumnFamilies({"default", "pikachu"}, *options);
    // create new table at L0
    Put(1, "a2", "begin2");
    Put(1, "z2", "end2");
    ASSERT_OK(Flush(1));

    table_options.block_cache->EraseUnRefEntries();
  }

  bool infinite_max_files_;
};
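
// After flushing a new L0 table with pinning enabled, its index and filter
// blocks are added to the block cache exactly once and then served from the
// pinned copies: subsequent reads change neither the miss nor hit counters.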
TEST_P(PinL0IndexAndFilterBlocksTest,
       IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) {
  Options options = CurrentOptions();
  if (infinite_max_files_) {
    options.max_open_files = -1;
  }
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.cache_index_and_filter_blocks = true;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.filter_policy.reset(NewBloomFilterPolicy(20));
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  CreateAndReopenWithCF({"pikachu"}, options);

  ASSERT_OK(Put(1, "key", "val"));
  // Create a new table.
  ASSERT_OK(Flush(1));

  // index/filter blocks added to block cache right after table creation.
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // only index/filter were added
  ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS));

  std::string value;
  // Miss and hit count should remain the same, they're all pinned.
  db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value);
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // Miss and hit count should remain the same, they're all pinned.
  value = Get(1, "key");
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
}
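
// With two levels, reads served by the pinned L0 blocks leave the cache
// counters untouched, while a read that goes to L1 misses on the filter and
// index blocks exactly once.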
TEST_P(PinL0IndexAndFilterBlocksTest,
       MultiLevelIndexAndFilterBlocksCachedWithPinning) {
  Options options = CurrentOptions();
  PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options);
  // get base cache values
  uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
  uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
  uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
  uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);

  std::string value;
  // this should be read from L0
  // so cache values don't change
  value = Get(1, "a2");
  ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // this should be read from L1
  // the file is opened, prefetching results in a cache filter miss
  // the block is loaded and added to the cache,
  // then the get results in a cache hit for L1
  // When we have infinite max_files, there is still a cache miss because
  // we have reset the block cache
  value = Get(1, "a");
  ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
}
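
// Aside: the ticker counters these tests read through TestGetTickerCount()
// come from the public Statistics interface, so an application can do the
// same bookkeeping. A minimal sketch (hypothetical application code):
//
//   std::shared_ptr<rocksdb::Statistics> stats =
//       rocksdb::CreateDBStatistics();
//   rocksdb::Options opts;
//   opts.statistics = stats;
//   // ... open the DB and serve some reads ...
//   uint64_t filter_miss =
//       stats->getTickerCount(rocksdb::BLOCK_CACHE_FILTER_MISS);
//   uint64_t index_hit =
//       stats->getTickerCount(rocksdb::BLOCK_CACHE_INDEX_HIT);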

TEST_P(PinL0IndexAndFilterBlocksTest, DisablePrefetchingNonL0IndexAndFilter) {
  Options options = CurrentOptions();
  PinL0IndexAndFilterBlocksTest::CreateTwoLevels(&options);

  // Get base cache values
  uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS);
  uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT);
  uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS);
  uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT);

  // Reopen database. If max_open_files is set to -1, table readers will be
  // preloaded. This will trigger a BlockBasedTable::Open() and prefetch
  // L0 index and filter. Level 1's prefetching is disabled in DB::Open().
  TryReopenWithColumnFamilies({"default", "pikachu"}, options);

  if (infinite_max_files_) {
    // After reopen, cache misses are increased by one because we read (and
    // only read) filter and index on L0
    ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    // If max_open_files is not -1, we do not preload table readers, so
    // there is no change.
    ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }
  std::string value;
  // this should be read from L0
  value = Get(1, "a2");
  // If max_open_files is -1, we have pinned index and filter in Rep, so
  // there will not be changes in index and filter misses or hits. If
  // max_open_files is not -1, Get() will open a TableReader and prefetch
  // index and filter.
  ASSERT_EQ(fm + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
  ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
  ASSERT_EQ(im + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
  ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));

  // this should be read from L1
  value = Get(1, "a");
  if (infinite_max_files_) {
    // In the infinite max files case, there's a cache miss when executing
    // Get() because the L1 index and filter were not prefetched before.
    ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  } else {
    // In this case, the miss count is increased by one in
    // BlockBasedTable::Open() because this is not the DB::Open() code path,
    // so we do prefetch L1's index and filter. The hit count is also
    // increased by one because Get() reads the index and filter from the
    // block cache, where the preceding Open() call put them.
    ASSERT_EQ(fm + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS));
    ASSERT_EQ(fh + 1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT));
    ASSERT_EQ(im + 2, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS));
    ASSERT_EQ(ih + 1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT));
  }
}
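
// To summarize the accounting this test checks: with max_open_files == -1,
// reopen prefetches (and pins) only L0's index and filter, costing one miss
// each, and the later L1 read pays one more miss inside Get(). With a
// finite max_open_files, nothing is prefetched at reopen; the L0 Get() pays
// one miss when its TableReader is opened, and the L1 Get() pays a miss in
// BlockBasedTable::Open() plus a hit when the read finds the freshly added
// blocks in the cache.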

INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest,
                        PinL0IndexAndFilterBlocksTest, ::testing::Bool());

#ifndef ROCKSDB_LITE
static void UniqueIdCallback(void* arg) {
  int* result = reinterpret_cast<int*>(arg);
  if (*result == -1) {
    *result = 0;
  }

  rocksdb::SyncPoint::GetInstance()->ClearTrace();
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}

class MockPersistentCache : public PersistentCache {
 public:
  explicit MockPersistentCache(const bool is_compressed, const size_t max_size)
      : is_compressed_(is_compressed), max_size_(max_size) {
    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
    rocksdb::SyncPoint::GetInstance()->SetCallBack(
        "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  }

  virtual ~MockPersistentCache() {}

  Status Insert(const Slice& page_key, const char* data,
                const size_t size) override {
    MutexLock _(&lock_);

    if (size_ > max_size_) {
      size_ -= data_.begin()->second.size();
      data_.erase(data_.begin());
    }

    data_.insert(std::make_pair(page_key.ToString(), std::string(data, size)));
    size_ += size;
    return Status::OK();
  }

  Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
                size_t* size) override {
    MutexLock _(&lock_);
    auto it = data_.find(page_key.ToString());
    if (it == data_.end()) {
      return Status::NotFound();
    }

    assert(page_key.ToString() == it->first);
    data->reset(new char[it->second.size()]);
    memcpy(data->get(), it->second.c_str(), it->second.size());
    *size = it->second.size();
    return Status::OK();
  }

  bool IsCompressed() override { return is_compressed_; }

  port::Mutex lock_;
  std::map<std::string, std::string> data_;
  const bool is_compressed_ = true;
  size_t size_ = 0;
  const size_t max_size_ = 10 * 1024;  // 10KiB
};
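
// A minimal sketch (hypothetical application code) of wiring a
// PersistentCache implementation such as the mock above into a DB. Note
// that the mock evicts from data_.begin() -- the smallest key -- once it is
// over capacity, which is enough for tests but not a realistic policy.
//
//   rocksdb::BlockBasedTableOptions bbt;
//   bbt.persistent_cache = std::make_shared<MockPersistentCache>(
//       /*is_compressed=*/true, /*max_size=*/10 * 1024);
//   rocksdb::Options opts;
//   opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbt));
//   rocksdb::DB* db = nullptr;
//   rocksdb::Status s = rocksdb::DB::Open(opts, "/tmp/pcache_example", &db);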

TEST_F(DBTest2, PersistentCache) {
  int num_iter = 80;

  Options options;
  options.write_buffer_size = 64 * 1024;  // small write buffer
  options.statistics = rocksdb::CreateDBStatistics();
  options = CurrentOptions(options);

  auto bsizes = {/*no block cache*/ 0, /*1M*/ 1 * 1024 * 1024};
  auto types = {/*compressed*/ 1, /*uncompressed*/ 0};
  for (auto bsize : bsizes) {
    for (auto type : types) {
      BlockBasedTableOptions table_options;
      table_options.persistent_cache.reset(
          new MockPersistentCache(type, 10 * 1024));
      table_options.no_block_cache = true;
      table_options.block_cache = bsize ? NewLRUCache(bsize) : nullptr;
      table_options.block_cache_compressed = nullptr;
      options.table_factory.reset(NewBlockBasedTableFactory(table_options));

      DestroyAndReopen(options);
      CreateAndReopenWithCF({"pikachu"}, options);
      // default column family doesn't have block cache
      Options no_block_cache_opts;
      no_block_cache_opts.statistics = options.statistics;
      no_block_cache_opts = CurrentOptions(no_block_cache_opts);
      BlockBasedTableOptions table_options_no_bc;
      table_options_no_bc.no_block_cache = true;
      no_block_cache_opts.table_factory.reset(
          NewBlockBasedTableFactory(table_options_no_bc));
      ReopenWithColumnFamilies(
          {"default", "pikachu"},
          std::vector<Options>({no_block_cache_opts, options}));

      Random rnd(301);

      // Write 80 values, each 1KB; values repeat in runs of four, giving a
      // high compression ratio
      ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
      std::vector<std::string> values;
      std::string str;
      for (int i = 0; i < num_iter; i++) {
        if (i % 4 == 0) {  // high compression ratio
          str = RandomString(&rnd, 1000);
        }
        values.push_back(str);
        ASSERT_OK(Put(1, Key(i), values[i]));
      }

      // flush all data from memtable so that reads are from block cache
      ASSERT_OK(Flush(1));

      for (int i = 0; i < num_iter; i++) {
        ASSERT_EQ(Get(1, Key(i)), values[i]);
      }

      auto hit = options.statistics->getTickerCount(PERSISTENT_CACHE_HIT);
      auto miss = options.statistics->getTickerCount(PERSISTENT_CACHE_MISS);

      ASSERT_GT(hit, 0);
      ASSERT_GT(miss, 0);
    }
  }
}

namespace {
void CountSyncPoint() {
  TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
}
}  // namespace

TEST_F(DBTest2, SyncPointMarker) {
  std::atomic<int> sync_point_called(0);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBTest2::MarkedPoint",
      [&](void* arg) { sync_point_called.fetch_add(1); });

  // The first dependency enforces that Marker is loaded before MarkedPoint.
  // The second checks that thread 1's MarkedPoint should be disabled here.
  // Execution order:
  // |   Thread 1   |  Thread 2   |
  // |              |   Marker    |
  // | MarkedPoint  |             |
  // | Thread1First |             |
  // |              | MarkedPoint |
  rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
      {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
      {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  std::function<void()> func1 = [&]() {
    CountSyncPoint();
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
  };

  std::function<void()> func2 = [&]() {
    TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
    CountSyncPoint();
  };

  auto thread1 = std::thread(func1);
  auto thread2 = std::thread(func2);
  thread1.join();
  thread2.join();

  // Callback is only executed once
  ASSERT_EQ(sync_point_called.load(), 1);
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
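
// Note on the mechanism: a plain LoadDependency() edge would only order the
// two executions of "DBTest2::MarkedPoint"; the marker pair additionally
// binds that point to the thread that executed
// "DBTest2::SyncPointMarker:Marker", so thread 1's MarkedPoint does not
// fire the callback at all. That is why the counter ends at 1 rather
// than 2.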

#endif  // ROCKSDB_LITE

class MergeOperatorPinningTest : public DBTest2,
                                 public testing::WithParamInterface<bool> {
 public:
  MergeOperatorPinningTest() { disable_block_cache_ = GetParam(); }

  bool disable_block_cache_;
};

INSTANTIATE_TEST_CASE_P(MergeOperatorPinningTest, MergeOperatorPinningTest,
                        ::testing::Bool());

#ifndef ROCKSDB_LITE
TEST_P(MergeOperatorPinningTest, OperandsMultiBlocks) {
  Options options = CurrentOptions();
  BlockBasedTableOptions table_options;
  table_options.block_size = 1;  // every block will contain one entry
  table_options.no_block_cache = disable_block_cache_;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.merge_operator = MergeOperators::CreateStringAppendTESTOperator();
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  const int kKeysPerFile = 10;
  const int kOperandsPerKeyPerFile = 7;
  const int kOperandSize = 100;
  // Files to write in L0 before compacting to lower level
  const int kFilesPerLevel = 3;

  Random rnd(301);
  std::map<std::string, std::string> true_data;
  int batch_num = 1;
  int lvl_to_fill = 4;
  int key_id = 0;
  while (true) {
    for (int j = 0; j < kKeysPerFile; j++) {
      std::string key = Key(key_id % 35);
      key_id++;
      for (int k = 0; k < kOperandsPerKeyPerFile; k++) {
        std::string val = RandomString(&rnd, kOperandSize);
        ASSERT_OK(db_->Merge(WriteOptions(), key, val));
        if (true_data[key].size() == 0) {
          true_data[key] = val;
        } else {
          true_data[key] += "," + val;
        }
      }
    }

    if (lvl_to_fill == -1) {
      // Keep last batch in memtable and stop
      break;
    }

    ASSERT_OK(Flush());
    if (batch_num % kFilesPerLevel == 0) {
      if (lvl_to_fill != 0) {
        MoveFilesToLevel(lvl_to_fill);
      }
      lvl_to_fill--;
    }
    batch_num++;
  }

  // 3 L0 files
  // 1 L1 file
  // 3 L2 files
  // 1 L3 file
  // 3 L4 files
  ASSERT_EQ(FilesPerLevel(), "3,1,3,1,3");

  // Verify Get()
  for (auto kv : true_data) {
    ASSERT_EQ(Get(kv.first), kv.second);
  }

  Iterator* iter = db_->NewIterator(ReadOptions());

  // Verify Iterator::Next()
  auto data_iter = true_data.begin();
  for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
    ASSERT_EQ(iter->key().ToString(), data_iter->first);
    ASSERT_EQ(iter->value().ToString(), data_iter->second);
  }
  ASSERT_EQ(data_iter, true_data.end());

  // Verify Iterator::Prev()
  auto data_rev = true_data.rbegin();
  for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
    ASSERT_EQ(iter->key().ToString(), data_rev->first);
    ASSERT_EQ(iter->value().ToString(), data_rev->second);
  }
  ASSERT_EQ(data_rev, true_data.rend());

  // Verify Iterator::Seek()
  for (auto kv : true_data) {
    iter->Seek(kv.first);
    ASSERT_EQ(kv.first, iter->key().ToString());
    ASSERT_EQ(kv.second, iter->value().ToString());
  }

  delete iter;
}
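
// Design note: block_size = 1 forces every entry into its own data block,
// so the operands accumulated for a single key are spread across many
// blocks and files. Running the test both with and without a block cache
// (the test parameter) exercises operand pinning for cache-backed and
// directly-read blocks alike.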

TEST_P(MergeOperatorPinningTest, Randomized) {
  do {
    Options options = CurrentOptions();
    options.merge_operator = MergeOperators::CreateMaxOperator();
    BlockBasedTableOptions table_options;
    table_options.no_block_cache = disable_block_cache_;
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    DestroyAndReopen(options);

    Random rnd(301);
    std::map<std::string, std::string> true_data;

    const int kTotalMerges = 10000;
    // Every key gets ~10 operands
    const int kKeyRange = kTotalMerges / 10;
    const int kOperandSize = 20;
    const int kNumPutBefore = kKeyRange / 10;  // 10% value
    const int kNumPutAfter = kKeyRange / 10;   // 10% overwrite
    const int kNumDelete = kKeyRange / 10;     // 10% delete

    // kNumPutBefore keys will have base values
    for (int i = 0; i < kNumPutBefore; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Do kTotalMerges merges
    for (int i = 0; i < kTotalMerges; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), key, value));

      if (true_data[key] < value) {
        true_data[key] = value;
      }
    }

    // Overwrite random kNumPutAfter keys
    for (int i = 0; i < kNumPutAfter; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      std::string value = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Put(WriteOptions(), key, value));

      true_data[key] = value;
    }

    // Delete random kNumDelete keys
    for (int i = 0; i < kNumDelete; i++) {
      std::string key = Key(rnd.Next() % kKeyRange);
      ASSERT_OK(db_->Delete(WriteOptions(), key));

      true_data.erase(key);
    }

    VerifyDBFromMap(true_data);

    // Skip HashCuckoo since it does not support merge operators
  } while (ChangeOptions(kSkipMergePut | kSkipHashCuckoo));
}
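
// The shadow map above tracks the expected value with plain std::string
// comparison. This is assumed to match CreateMaxOperator(), which keeps
// the lexicographically largest operand; byte-wise string comparison in
// C++ is the same ordering, and all operands here share one fixed
// kOperandSize.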

class MergeOperatorHook : public MergeOperator {
 public:
  explicit MergeOperatorHook(std::shared_ptr<MergeOperator> _merge_op)
      : merge_op_(_merge_op) {}

  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override {
    before_merge_();
    bool res = merge_op_->FullMergeV2(merge_in, merge_out);
    after_merge_();
    return res;
  }

  virtual const char* Name() const override { return merge_op_->Name(); }

  std::shared_ptr<MergeOperator> merge_op_;
  std::function<void()> before_merge_ = []() {};
  std::function<void()> after_merge_ = []() {};
};

TEST_P(MergeOperatorPinningTest, EvictCacheBeforeMerge) {
  Options options = CurrentOptions();

  auto merge_hook =
      std::make_shared<MergeOperatorHook>(MergeOperators::CreateMaxOperator());
  options.merge_operator = merge_hook;
  options.disable_auto_compactions = true;
  options.level0_slowdown_writes_trigger = (1 << 30);
  options.level0_stop_writes_trigger = (1 << 30);
  options.max_open_files = 20;
  BlockBasedTableOptions bbto;
  bbto.no_block_cache = disable_block_cache_;
  if (bbto.no_block_cache == false) {
    bbto.block_cache = NewLRUCache(64 * 1024 * 1024);
  } else {
    bbto.block_cache = nullptr;
  }
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const int kNumOperands = 30;
  const int kNumKeys = 1000;
  const int kOperandSize = 100;
  Random rnd(301);

  // 1000 keys; every key has 30 operands, and every operand is in a
  // different file
  std::map<std::string, std::string> true_data;
  for (int i = 0; i < kNumOperands; i++) {
    for (int j = 0; j < kNumKeys; j++) {
      std::string k = Key(j);
      std::string v = RandomString(&rnd, kOperandSize);
      ASSERT_OK(db_->Merge(WriteOptions(), k, v));

      true_data[k] = std::max(true_data[k], v);
    }
    ASSERT_OK(Flush());
  }

  std::vector<uint64_t> file_numbers = ListTableFiles(env_, dbname_);
  ASSERT_EQ(file_numbers.size(), kNumOperands);
  int merge_cnt = 0;

  // Code executed before merge operation
  merge_hook->before_merge_ = [&]() {
    // Evict all tables from cache before every merge operation
    for (uint64_t num : file_numbers) {
      TableCache::Evict(dbfull()->TEST_table_cache(), num);
    }
    // Decrease cache capacity to force all unreferenced blocks to be evicted
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(1);
    }
    merge_cnt++;
  };

  // Code executed after merge operation
  merge_hook->after_merge_ = [&]() {
    // Increase capacity again after doing the merge
    if (bbto.block_cache) {
      bbto.block_cache->SetCapacity(64 * 1024 * 1024);
    }
  };

  VerifyDBFromMap(true_data);
  ASSERT_EQ(merge_cnt, kNumKeys * 4 /* get + next + prev + seek */);

  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  VerifyDBFromMap(true_data);
}
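
// What this test demonstrates: before_merge_ drops every table from the
// table cache and shrinks the block cache to a single byte before each
// FullMergeV2() call, so an operand that was not pinned by the merging
// iterator would be backed by freed cache memory. Passing
// VerifyDBFromMap() under those conditions is the evidence that operands
// stay pinned for the duration of the merge.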
#endif  // ROCKSDB_LITE

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}