fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
631 lines
19 KiB
631 lines
19 KiB
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include <array>
|
|
|
|
#include "db/blob/blob_index.h"
|
|
#include "db/db_test_util.h"
|
|
#include "port/stack_trace.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "utilities/fault_injection_env.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class DBBlobBasicTest : public DBTestBase {
|
|
protected:
|
|
DBBlobBasicTest()
|
|
: DBTestBase("db_blob_basic_test", /* env_do_fsync */ false) {}
|
|
};
|
|
|
|
// Writes one key with blob files enabled, flushes, and checks that the value
// is readable with default read options but that a cache-only read reports
// Incomplete.
TEST_F(DBBlobBasicTest, GetBlob) {
  constexpr char kKey[] = "key";
  constexpr char kBlobValue[] = "blob_value";

  Options opts = GetDefaultOptions();
  opts.min_blob_size = 0;
  opts.enable_blob_files = true;
  Reopen(opts);

  ASSERT_OK(Put(kKey, kBlobValue));
  ASSERT_OK(Flush());

  // Regular read: I/O is allowed, so the value must come back intact.
  ASSERT_EQ(Get(kKey), kBlobValue);

  // Try again with no I/O allowed. The table and the necessary blocks should
  // already be in their respective caches; however, the blob itself can only be
  // read from the blob file, so the read should return Incomplete.
  ReadOptions cache_only_options;
  cache_only_options.read_tier = kBlockCacheTier;

  PinnableSlice value;
  const Status s =
      db_->Get(cache_only_options, db_->DefaultColumnFamily(), kKey, &value);
  ASSERT_TRUE(s.IsIncomplete());
}
|
|
|
|
// Exercises MultiGet over a mix of one inlined value and two blob values,
// first with default read options and then restricted to the block cache tier.
TEST_F(DBBlobBasicTest, MultiGetBlobs) {
  constexpr size_t min_blob_size = 6;
  constexpr size_t num_keys = 3;

  // The first value is below the size limit and is thus stored inline; the
  // other two are stored separately as blobs.
  constexpr char first_key[] = "first_key";
  constexpr char first_value[] = "short";
  static_assert(sizeof(first_value) - 1 < min_blob_size,
                "first_value too long to be inlined");

  constexpr char second_key[] = "second_key";
  constexpr char second_value[] = "long_value";
  static_assert(sizeof(second_value) - 1 >= min_blob_size,
                "second_value too short to be stored as blob");

  constexpr char third_key[] = "third_key";
  constexpr char third_value[] = "other_long_value";
  static_assert(sizeof(third_value) - 1 >= min_blob_size,
                "third_value too short to be stored as blob");

  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = min_blob_size;
  Reopen(opts);

  ASSERT_OK(Put(first_key, first_value));
  ASSERT_OK(Put(second_key, second_value));
  ASSERT_OK(Put(third_key, third_value));
  ASSERT_OK(Flush());

  std::array<Slice, num_keys> lookup_keys{{first_key, second_key, third_key}};
  ReadOptions read_opts;

  // Pass 1: default read options, every key must be returned successfully.
  {
    std::array<PinnableSlice, num_keys> results;
    std::array<Status, num_keys> results_status;

    db_->MultiGet(read_opts, db_->DefaultColumnFamily(), num_keys,
                  lookup_keys.data(), results.data(), results_status.data());

    ASSERT_OK(results_status[0]);
    ASSERT_EQ(results[0], first_value);

    ASSERT_OK(results_status[1]);
    ASSERT_EQ(results[1], second_value);

    ASSERT_OK(results_status[2]);
    ASSERT_EQ(results[2], third_value);
  }

  // Pass 2: try again with no I/O allowed. The table and the necessary blocks
  // should already be in their respective caches. The first (inlined) value
  // should be successfully read; however, the two blob values could only be
  // read from the blob file, so for those the read should return Incomplete.
  read_opts.read_tier = kBlockCacheTier;

  {
    std::array<PinnableSlice, num_keys> results;
    std::array<Status, num_keys> results_status;

    db_->MultiGet(read_opts, db_->DefaultColumnFamily(), num_keys,
                  lookup_keys.data(), results.data(), results_status.data());

    ASSERT_OK(results_status[0]);
    ASSERT_EQ(results[0], first_value);

    ASSERT_TRUE(results_status[1].IsIncomplete());

    ASSERT_TRUE(results_status[2].IsIncomplete());
  }
}
|
|
|
|
// Writes a blob index entry whose payload is not a valid encoding and checks
// that reading the key back reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
  constexpr char kKey[] = "key";

  Options opts = GetDefaultOptions();
  opts.min_blob_size = 0;
  opts.enable_blob_files = true;
  Reopen(opts);

  // Fake a corrupt blob index.
  const std::string bogus_index("foobar");

  WriteBatch write_batch;
  ASSERT_OK(
      WriteBatchInternal::PutBlobIndex(&write_batch, 0, kKey, bogus_index));
  ASSERT_OK(db_->Write(WriteOptions(), &write_batch));
  ASSERT_OK(Flush());

  PinnableSlice value;
  const Status s =
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), kKey, &value);
  ASSERT_TRUE(s.IsCorruption());
}
|
|
|
|
// Writes a hand-crafted inlined TTL blob index and checks that reading the
// key back reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
  constexpr uint64_t min_blob_size = 10;

  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
  options.min_blob_size = min_blob_size;

  Reopen(options);

  constexpr char key[] = "key";
  constexpr char blob[] = "short";
  // Check the length of the `blob` payload (minus the terminating NUL), not
  // the size of the built-in type `short`, which would make this assertion
  // pass regardless of the payload.
  static_assert(sizeof(blob) - 1 < min_blob_size,
                "Blob too long to be inlined");

  // Fake an inlined TTL blob index.
  std::string blob_index;

  constexpr uint64_t expiration = 1234567890;

  BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);

  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  ASSERT_OK(Flush());

  PinnableSlice result;
  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
                  .IsCorruption());
}
|
|
|
|
// Writes a well-formed blob index that points at blob file number 1000, which
// this test never creates, and checks that reading the key reports Corruption.
TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
  constexpr char kKey[] = "key";
  constexpr uint64_t kBlobFileNumber = 1000;
  constexpr uint64_t kOffset = 1234;
  constexpr uint64_t kSize = 5678;

  Options opts = GetDefaultOptions();
  opts.min_blob_size = 0;
  opts.enable_blob_files = true;
  Reopen(opts);

  // Fake a blob index referencing a non-existent blob file.
  std::string encoded_index;
  BlobIndex::EncodeBlob(&encoded_index, kBlobFileNumber, kOffset, kSize,
                        kNoCompression);

  WriteBatch write_batch;
  ASSERT_OK(
      WriteBatchInternal::PutBlobIndex(&write_batch, 0, kKey, encoded_index));
  ASSERT_OK(db_->Write(WriteOptions(), &write_batch));
  ASSERT_OK(Flush());

  PinnableSlice value;
  const Status s =
      db_->Get(ReadOptions(), db_->DefaultColumnFamily(), kKey, &value);
  ASSERT_TRUE(s.IsCorruption());
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Records an IO trace while a blob is written, flushed and read back, then
// parses the trace and checks that operations on blob files were captured.
TEST_F(DBBlobBasicTest, GenerateIOTracing) {
  const std::string trace_path = dbname_ + "/io_trace_file";

  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  Reopen(opts);

  {
    // Create IO trace file
    std::unique_ptr<TraceWriter> writer;
    ASSERT_OK(NewFileTraceWriter(env_, EnvOptions(), trace_path, &writer));
    ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(writer)));

    constexpr char kKey[] = "key";
    constexpr char kBlobValue[] = "blob_value";

    ASSERT_OK(Put(kKey, kBlobValue));
    ASSERT_OK(Flush());
    ASSERT_EQ(Get(kKey), kBlobValue);

    ASSERT_OK(db_->EndIOTrace());
    ASSERT_OK(env_->FileExists(trace_path));
  }

  {
    // Parse trace file to check file operations related to blob files are
    // recorded.
    std::unique_ptr<TraceReader> file_reader;
    ASSERT_OK(NewFileTraceReader(env_, EnvOptions(), trace_path, &file_reader));
    IOTraceReader io_reader(std::move(file_reader));

    IOTraceHeader trace_header;
    ASSERT_OK(io_reader.ReadHeader(&trace_header));
    ASSERT_EQ(kMajorVersion,
              static_cast<int>(trace_header.rocksdb_major_version));
    ASSERT_EQ(kMinorVersion,
              static_cast<int>(trace_header.rocksdb_minor_version));

    // Read records until the reader stops returning OK, counting those whose
    // file name contains "blob".
    int blob_file_ops = 0;
    for (;;) {
      IOTraceRecord rec;
      const Status read_status = io_reader.ReadIOOp(&rec);
      if (!read_status.ok()) {
        break;
      }
      if (rec.file_name.find("blob") != std::string::npos) {
        ++blob_file_ops;
      }
    }

    // Assuming blob files will have Append, Close and then Read operations.
    ASSERT_GT(blob_file_ops, 2);
  }
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Flushes the same keys twice, deletes the blob file with the highest file
// number while the DB is closed, reopens with best_efforts_recovery, and
// checks which value of key "a" is visible afterwards.
TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
  constexpr int kNumTableFiles = 2;

  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  opts.create_if_missing = true;
  Reopen(opts);

  ASSERT_OK(dbfull()->DisableFileDeletions());

  // Each round writes keys "a" and "b" with a round-specific value and then
  // flushes.
  for (int round = 0; round < kNumTableFiles; ++round) {
    const std::string round_value = "value" + std::to_string(round);
    for (char ch = 'a'; ch < 'c'; ++ch) {
      ASSERT_OK(Put(std::string(1, ch), round_value));
    }
    ASSERT_OK(Flush());
  }

  Close();

  // Locate the blob file with the largest file number in the DB directory.
  std::vector<std::string> children;
  ASSERT_OK(env_->GetChildren(dbname_, &children));

  std::string newest_blob_path;
  uint64_t newest_blob_number = kInvalidBlobFileNumber;
  for (const auto& child : children) {
    uint64_t number = 0;
    FileType file_type;
    if (!ParseFileName(child, &number, /*info_log_name_prefix=*/"",
                       &file_type)) {
      continue;
    }
    if (file_type != kBlobFile) {
      continue;
    }
    if (number > newest_blob_number) {
      newest_blob_number = number;
      newest_blob_path = dbname_ + "/" + child;
    }
  }
  ASSERT_OK(env_->DeleteFile(newest_blob_path));

  opts.best_efforts_recovery = true;
  Reopen(opts);

  std::string recovered;
  ASSERT_OK(db_->Get(ReadOptions(), "a", &recovered));
  ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), recovered);
}
|
|
|
|
// Puts a base value and then applies two merge operands, flushing after each
// write, and checks that reads return the string-append merge of all three.
TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
  Options options = GetDefaultOptions();
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  options.enable_blob_files = true;
  options.min_blob_size = 0;

  Reopen(options);

  ASSERT_OK(Put("Key1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("Key1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Merge("Key1", "v3"));
  ASSERT_OK(Flush());

  // Verify the value returned through the DB::Get overload itself; previously
  // it was fetched but never compared against the expected result.
  std::string value;
  ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
  ASSERT_EQ(value, "v1,v2,v3");

  // Also verify through the test fixture's Get helper.
  ASSERT_EQ(Get("Key1"), "v1,v2,v3");
}
|
|
|
|
// MultiGet counterpart of the merge test: three keys receive a base Put and
// then two, one and zero merge operands respectively, with a flush after each
// batch of writes.
TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
  constexpr size_t num_keys = 3;

  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  opts.merge_operator = MergeOperators::CreateStringAppendOperator();
  Reopen(opts);

  // Base values for all three keys.
  ASSERT_OK(Put("Key0", "v0_0"));
  ASSERT_OK(Put("Key1", "v1_0"));
  ASSERT_OK(Put("Key2", "v2_0"));
  ASSERT_OK(Flush());

  // First merge operand for Key0 and Key1.
  ASSERT_OK(Merge("Key0", "v0_1"));
  ASSERT_OK(Merge("Key1", "v1_1"));
  ASSERT_OK(Flush());

  // Second merge operand for Key0 only.
  ASSERT_OK(Merge("Key0", "v0_2"));
  ASSERT_OK(Flush());

  std::array<Slice, num_keys> lookup_keys{{"Key0", "Key1", "Key2"}};
  std::array<PinnableSlice, num_keys> results;
  std::array<Status, num_keys> results_status;

  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
                lookup_keys.data(), results.data(), results_status.data());

  ASSERT_OK(results_status[0]);
  ASSERT_EQ(results[0], "v0_0,v0_1,v0_2");

  ASSERT_OK(results_status[1]);
  ASSERT_EQ(results[1], "v1_0,v1_1");

  ASSERT_OK(results_status[2]);
  ASSERT_EQ(results[2], "v2_0");
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Checks the blob-related DB properties (file count, live/total size, and the
// textual blob stats) after two flushes, and again after a delete followed by
// a full compaction.
TEST_F(DBBlobBasicTest, Properties) {
  Options opts = GetDefaultOptions();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  Reopen(opts);

  ASSERT_OK(Put("key1", "0000000000"));
  ASSERT_OK(Put("key2", "0000000000"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("key3", "0000000000"));
  ASSERT_OK(Flush());

  // num of files
  uint64_t num_blob_files = 0;
  EXPECT_TRUE(
      db_->GetIntProperty(DB::Properties::kNumBlobFiles, &num_blob_files));
  ASSERT_EQ(num_blob_files, 2);

  // size of live blob files
  uint64_t live_size = 0;
  EXPECT_TRUE(
      db_->GetIntProperty(DB::Properties::kLiveBlobFileSize, &live_size));

  // size of total blob files
  uint64_t total_size = 0;
  EXPECT_TRUE(
      db_->GetIntProperty(DB::Properties::kTotalBlobFileSize, &total_size));
  ASSERT_EQ(live_size, total_size);

  // Recompute the live size from the blob files of the current version and
  // compare it with what the property reported.
  auto* version_set = dbfull()->TEST_GetVersionSet();
  auto* current_version =
      version_set->GetColumnFamilySet()->GetDefault()->current();
  const auto& blob_files = current_version->storage_info()->GetBlobFiles();
  uint64_t expected_live_size = 0;
  for (const auto& entry : blob_files) {
    expected_live_size += entry.second->GetTotalBlobBytes();
  }
  ASSERT_EQ(live_size, expected_live_size);

  // estimate live data size
  std::string stats_before = "";
  EXPECT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &stats_before));
  EXPECT_TRUE(stats_before.size() > 0);

  // delete key2 to make some garbage
  ASSERT_OK(Delete("key2"));
  ASSERT_OK(Flush());
  constexpr Slice* begin = nullptr;
  constexpr Slice* end = nullptr;
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));

  std::string stats_after = "";
  EXPECT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &stats_after));
  std::cout << stats_before << stats_after << std::endl;

  {
    std::istringstream before_stream(stats_before);
    std::istringstream after_stream(stats_after);
    std::string before_line = "";
    std::string after_line = "";

    // skip the first line because it is the version info
    std::getline(before_stream, before_line);
    std::getline(after_stream, after_line);

    // The next two lines are expected to be identical in both snapshots...
    constexpr size_t kNumMatchingLines = 2;
    for (size_t i = 0; i < kNumMatchingLines; ++i) {
      std::getline(before_stream, before_line);
      std::getline(after_stream, after_line);
      ASSERT_EQ(before_line, after_line);
    }

    // ...while the line after them is expected to differ.
    std::getline(before_stream, before_line);
    std::getline(after_stream, after_line);
    ASSERT_TRUE(before_line != after_line);
  }
}
|
|
|
|
// Checks the total blob file size property while an iterator created before
// the last flush is still open.
TEST_F(DBBlobBasicTest, PropertiesMultiVersion) {
  Options options = GetDefaultOptions();
  options.enable_blob_files = true;
  options.min_blob_size = 0;
  Reopen(options);

  ASSERT_OK(Put("key1", "0000000000"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("key2", "0000000000"));
  ASSERT_OK(Flush());

  // create an iterator to make the current version alive
  //
  // The iterator is owned by a unique_ptr so that it is released even when
  // one of the ASSERT_* macros below returns from the test body early; with a
  // raw pointer and a trailing `delete` it would be leaked in that case.
  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
  ASSERT_OK(iter->status());
  ASSERT_OK(Put("key3", "0000000000"));
  ASSERT_OK(Flush());

  // size of total blob files
  uint64_t total_blob_file_size = 0;
  EXPECT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
                                  &total_blob_file_size));

  // total size equals to the current version's blob size because previous
  // version's files are duplicated and thus not counted
  auto* versions = dbfull()->TEST_GetVersionSet();
  auto* current = versions->GetColumnFamilySet()->GetDefault()->current();
  const auto& blob_files = current->storage_info()->GetBlobFiles();
  uint64_t current_v_blob_size = 0;
  for (const auto& pair : blob_files) {
    current_v_blob_size += pair.second->GetTotalBlobBytes();
  }
  ASSERT_EQ(current_v_blob_size, total_blob_file_size);
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
|
|
public testing::WithParamInterface<std::string> {
|
|
protected:
|
|
DBBlobBasicIOErrorTest() : sync_point_(GetParam()) {
|
|
fault_injection_env_.reset(new FaultInjectionTestEnv(env_));
|
|
}
|
|
~DBBlobBasicIOErrorTest() { Close(); }
|
|
|
|
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
|
|
std::string sync_point_;
|
|
};
|
|
|
|
// Run every DBBlobBasicIOErrorTest once per sync point name listed here.
INSTANTIATE_TEST_CASE_P(
    DBBlobBasicTest, DBBlobBasicIOErrorTest,
    ::testing::ValuesIn(std::vector<std::string>{
        "BlobFileReader::OpenFile:NewRandomAccessFile",
        "BlobFileReader::GetBlob:ReadFromFile"}));
|
|
|
|
// Writes one blob, then deactivates the fault injection file system when the
// parameterized sync point is hit and checks that Get reports an IOError.
TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
  // Start from the fixture's default options, like the other IO error tests
  // in this file, instead of a bare Options object.
  Options options = GetDefaultOptions();
  options.env = fault_injection_env_.get();
  options.enable_blob_files = true;
  options.min_blob_size = 0;

  Reopen(options);

  constexpr char key[] = "key";
  constexpr char blob_value[] = "blob_value";

  ASSERT_OK(Put(key, blob_value));

  ASSERT_OK(Flush());

  // When the sync point under test fires, turn the file system off so that
  // subsequent file operations fail with an IOError carrying the sync point
  // name.
  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  });
  SyncPoint::GetInstance()->EnableProcessing();

  PinnableSlice result;
  ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
                  .IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
|
|
|
|
// MultiGet counterpart of the IO error test: two blobs are written, the file
// system is deactivated at the parameterized sync point, and both lookups are
// expected to report an IOError.
TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) {
  constexpr size_t num_keys = 2;
  constexpr char first_key[] = "first_key";
  constexpr char first_value[] = "first_value";
  constexpr char second_key[] = "second_key";
  constexpr char second_value[] = "second_value";

  Options opts = GetDefaultOptions();
  opts.env = fault_injection_env_.get();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  Reopen(opts);

  ASSERT_OK(Put(first_key, first_value));
  ASSERT_OK(Put(second_key, second_value));
  ASSERT_OK(Flush());

  std::array<Slice, num_keys> lookup_keys{{first_key, second_key}};
  std::array<PinnableSlice, num_keys> results;
  std::array<Status, num_keys> results_status;

  // Deactivate the file system as soon as the sync point under test fires.
  auto* sync_points = SyncPoint::GetInstance();
  sync_points->SetCallBack(sync_point_, [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  });
  sync_points->EnableProcessing();

  db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
                lookup_keys.data(), results.data(), results_status.data());

  sync_points->DisableProcessing();
  sync_points->ClearAllCallBacks();

  ASSERT_TRUE(results_status[0].IsIOError());
  ASSERT_TRUE(results_status[1].IsIOError());
}
|
|
|
|
namespace {
|
|
|
|
class ReadBlobCompactionFilter : public CompactionFilter {
|
|
public:
|
|
ReadBlobCompactionFilter() = default;
|
|
const char* Name() const override {
|
|
return "rocksdb.compaction.filter.read.blob";
|
|
}
|
|
CompactionFilter::Decision FilterV2(
|
|
int /*level*/, const Slice& /*key*/, ValueType value_type,
|
|
const Slice& existing_value, std::string* new_value,
|
|
std::string* /*skip_until*/) const override {
|
|
if (value_type != CompactionFilter::ValueType::kValue) {
|
|
return CompactionFilter::Decision::kKeep;
|
|
}
|
|
assert(new_value);
|
|
new_value->assign(existing_value.data(), existing_value.size());
|
|
return CompactionFilter::Decision::kChangeValue;
|
|
}
|
|
};
|
|
|
|
} // anonymous namespace
|
|
|
|
// Runs a full compaction with ReadBlobCompactionFilter installed while the
// file system is deactivated at the parameterized sync point, and checks that
// CompactRange reports an IOError.
TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
  constexpr char kKey[] = "foo";
  constexpr char kBlobValue[] = "foo_blob_value";

  // The options only hold a raw pointer, so keep the filter owned here.
  std::unique_ptr<CompactionFilter> filter_owner(new ReadBlobCompactionFilter);

  Options opts = GetDefaultOptions();
  opts.env = fault_injection_env_.get();
  opts.enable_blob_files = true;
  opts.min_blob_size = 0;
  opts.create_if_missing = true;
  opts.compaction_filter = filter_owner.get();

  DestroyAndReopen(opts);

  ASSERT_OK(Put(kKey, kBlobValue));
  ASSERT_OK(Flush());

  // Deactivate the file system as soon as the sync point under test fires.
  auto* sync_points = SyncPoint::GetInstance();
  sync_points->SetCallBack(sync_point_, [this](void* /* arg */) {
    fault_injection_env_->SetFilesystemActive(false,
                                              Status::IOError(sync_point_));
  });
  sync_points->EnableProcessing();

  const Status s = db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
                                     /*end=*/nullptr);
  ASSERT_TRUE(s.IsIOError());

  sync_points->DisableProcessing();
  sync_points->ClearAllCallBacks();
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
// RegisterCustomObjects is called from main() before the tests run. When
// ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS is defined it is only
// declared here (with C linkage) and must be defined elsewhere; otherwise it
// is a no-op.
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" void RegisterCustomObjects(int argc, char** argv);
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
RegisterCustomObjects(argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|