// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include #include "db/blob/blob_index.h" #include "db/db_test_util.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" #include "utilities/fault_injection_env.h" namespace ROCKSDB_NAMESPACE { class DBBlobBasicTest : public DBTestBase { protected: DBBlobBasicTest() : DBTestBase("/db_blob_basic_test", /* env_do_fsync */ false) {} }; TEST_F(DBBlobBasicTest, GetBlob) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); ASSERT_EQ(Get(key), blob_value); // Try again with no I/O allowed. The table and the necessary blocks should // already be in their respective caches; however, the blob itself can only be // read from the blob file, so the read should return Incomplete. ReadOptions read_options; read_options.read_tier = kBlockCacheTier; PinnableSlice result; ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result) .IsIncomplete()); } TEST_F(DBBlobBasicTest, MultiGetBlobs) { constexpr size_t min_blob_size = 6; Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = min_blob_size; Reopen(options); // Put then retrieve three key-values. The first value is below the size limit // and is thus stored inline; the other two are stored separately as blobs. constexpr size_t num_keys = 3; constexpr char first_key[] = "first_key"; constexpr char first_value[] = "short"; static_assert(sizeof(first_value) - 1 < min_blob_size, "first_value too long to be inlined"); ASSERT_OK(Put(first_key, first_value)); constexpr char second_key[] = "second_key"; constexpr char second_value[] = "long_value"; static_assert(sizeof(second_value) - 1 >= min_blob_size, "second_value too short to be stored as blob"); ASSERT_OK(Put(second_key, second_value)); constexpr char third_key[] = "third_key"; constexpr char third_value[] = "other_long_value"; static_assert(sizeof(third_value) - 1 >= min_blob_size, "third_value too short to be stored as blob"); ASSERT_OK(Put(third_key, third_value)); ASSERT_OK(Flush()); ReadOptions read_options; std::array keys{{first_key, second_key, third_key}}; { std::array values; std::array statuses; db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); ASSERT_OK(statuses[0]); ASSERT_EQ(values[0], first_value); ASSERT_OK(statuses[1]); ASSERT_EQ(values[1], second_value); ASSERT_OK(statuses[2]); ASSERT_EQ(values[2], third_value); } // Try again with no I/O allowed. The table and the necessary blocks should // already be in their respective caches. The first (inlined) value should be // successfully read; however, the two blob values could only be read from the // blob file, so for those the read should return Incomplete. read_options.read_tier = kBlockCacheTier; { std::array values; std::array statuses; db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); ASSERT_OK(statuses[0]); ASSERT_EQ(values[0], first_value); ASSERT_TRUE(statuses[1].IsIncomplete()); ASSERT_TRUE(statuses[2].IsIncomplete()); } } TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; // Fake a corrupt blob index. const std::string blob_index("foobar"); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) { constexpr uint64_t min_blob_size = 10; Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = min_blob_size; Reopen(options); constexpr char key[] = "key"; constexpr char blob[] = "short"; static_assert(sizeof(short) - 1 < min_blob_size, "Blob too long to be inlined"); // Fake an inlined TTL blob index. std::string blob_index; constexpr uint64_t expiration = 1234567890; BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; // Fake a blob index referencing a non-existent blob file. std::string blob_index; constexpr uint64_t blob_file_number = 1000; constexpr uint64_t offset = 1234; constexpr uint64_t size = 5678; BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size, kNoCompression); WriteBatch batch; ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index)); ASSERT_OK(db_->Write(WriteOptions(), &batch)); ASSERT_OK(Flush()); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsCorruption()); } #ifndef ROCKSDB_LITE TEST_F(DBBlobBasicTest, GenerateIOTracing) { Options options = GetDefaultOptions(); options.enable_blob_files = true; options.min_blob_size = 0; std::string trace_file = dbname_ + "/io_trace_file"; Reopen(options); { // Create IO trace file std::unique_ptr trace_writer; ASSERT_OK( NewFileTraceWriter(env_, EnvOptions(), trace_file, &trace_writer)); ASSERT_OK(db_->StartIOTrace(env_, TraceOptions(), std::move(trace_writer))); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); ASSERT_EQ(Get(key), blob_value); ASSERT_OK(db_->EndIOTrace()); ASSERT_OK(env_->FileExists(trace_file)); } { // Parse trace file to check file operations related to blob files are // recorded. std::unique_ptr trace_reader; ASSERT_OK( NewFileTraceReader(env_, EnvOptions(), trace_file, &trace_reader)); IOTraceReader reader(std::move(trace_reader)); IOTraceHeader header; ASSERT_OK(reader.ReadHeader(&header)); ASSERT_EQ(kMajorVersion, static_cast(header.rocksdb_major_version)); ASSERT_EQ(kMinorVersion, static_cast(header.rocksdb_minor_version)); // Read records. int blob_files_op_count = 0; Status status; while (true) { IOTraceRecord record; status = reader.ReadIOOp(&record); if (!status.ok()) { break; } if (record.file_name.find("blob") != std::string::npos) { blob_files_op_count++; } } // Assuming blob files will have Append, Close and then Read operations. ASSERT_GT(blob_files_op_count, 2); } } #endif // !ROCKSDB_LITE class DBBlobBasicIOErrorTest : public DBBlobBasicTest, public testing::WithParamInterface { protected: DBBlobBasicIOErrorTest() : sync_point_(GetParam()) { fault_injection_env_.reset(new FaultInjectionTestEnv(env_)); } ~DBBlobBasicIOErrorTest() { Close(); } std::unique_ptr fault_injection_env_; std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest, ::testing::ValuesIn(std::vector{ "BlobFileReader::OpenFile:NewRandomAccessFile", "BlobFileReader::GetBlob:ReadFromFile"})); TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) { Options options; options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr char key[] = "key"; constexpr char blob_value[] = "blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); PinnableSlice result; ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result) .IsIOError()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } TEST_P(DBBlobBasicIOErrorTest, MultiGetBlobs_IOError) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; Reopen(options); constexpr size_t num_keys = 2; constexpr char first_key[] = "first_key"; constexpr char first_value[] = "first_value"; ASSERT_OK(Put(first_key, first_value)); constexpr char second_key[] = "second_key"; constexpr char second_value[] = "second_value"; ASSERT_OK(Put(second_key, second_value)); ASSERT_OK(Flush()); std::array keys{{first_key, second_key}}; std::array values; std::array statuses; SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0], &values[0], &statuses[0]); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); ASSERT_TRUE(statuses[0].IsIOError()); ASSERT_TRUE(statuses[1].IsIOError()); } namespace { class ReadBlobCompactionFilter : public CompactionFilter { public: ReadBlobCompactionFilter() = default; const char* Name() const override { return "rocksdb.compaction.filter.read.blob"; } CompactionFilter::Decision FilterV2( int /*level*/, const Slice& /*key*/, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const override { if (value_type != CompactionFilter::ValueType::kValue) { return CompactionFilter::Decision::kKeep; } assert(new_value); new_value->assign(existing_value.data(), existing_value.size()); return CompactionFilter::Decision::kChangeValue; } }; } // anonymous namespace TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) { Options options = GetDefaultOptions(); options.env = fault_injection_env_.get(); options.enable_blob_files = true; options.min_blob_size = 0; options.create_if_missing = true; std::unique_ptr compaction_filter_guard( new ReadBlobCompactionFilter); options.compaction_filter = compaction_filter_guard.get(); DestroyAndReopen(options); constexpr char key[] = "foo"; constexpr char blob_value[] = "foo_blob_value"; ASSERT_OK(Put(key, blob_value)); ASSERT_OK(Flush()); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_->SetFilesystemActive(false, Status::IOError(sync_point_)); }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr, /*end=*/nullptr) .IsIOError()); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }