diff --git a/HISTORY.md b/HISTORY.md index c9e08547c..9f8b3f258 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,9 @@ * Since 6.15.0, `TransactionDB` returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. There are certain cases where range deletion can still be used on such DBs; see the API doc on `TransactionDB::DeleteRange()` for details. * `OptimisticTransactionDB` now returns error `Status`es from calls to `DeleteRange()` and calls to `Write()` where the `WriteBatch` contains a range deletion. Previously such operations may have succeeded while not providing the expected transactional guarantees. +### Public API Change +* Add new Append and PositionedAppend APIs to FileSystem to bring the data verification information (data checksum information) from upper layer (e.g., WritableFileWriter) to the storage layer. In this way, the customized FileSystem is able to verify the correctness of data being written to the storage on time. Add checksum_handoff_file_types to DBOptions. User can use this option to control which file types (Currently supported file tyes: kWALFile, kTableFile, kDescriptorFile.) should use the new Append and PositionedAppend APIs to handoff the verification information. Currently, RocksDB only use crc32c to calculate the checksum for write handoff. + ## 6.17.0 (01/15/2021) ### Behavior Changes * When verifying full file checksum with `DB::VerifyFileChecksums()`, we now fail with `Status::InvalidArgument` if the name of the checksum generator used for verification does not match the name of the checksum generator used for protecting the file when it was created. diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index ba1c7a740..722ed21b9 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -17,6 +17,7 @@ #include "file/writable_file_writer.h" #include "logging/logging.h" #include "options/cf_options.h" +#include "options/options_helper.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "test_util/sync_point.h" @@ -177,13 +178,13 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { assert(file); file->SetIOPriority(io_priority_); file->SetWriteLifeTimeHint(write_hint_); - + FileTypeSet tmp_set = immutable_cf_options_->checksum_handoff_file_types; Statistics* const statistics = immutable_cf_options_->statistics; - std::unique_ptr file_writer(new WritableFileWriter( std::move(file), blob_file_paths_->back(), *file_options_, clock_, nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, - immutable_cf_options_->file_checksum_gen_factory)); + immutable_cf_options_->file_checksum_gen_factory, + tmp_set.Contains(FileType::kBlobFile))); constexpr bool do_flush = false; diff --git a/db/builder.cc b/db/builder.cc index f277058b5..21ff3a800 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ #include "file/writable_file_writer.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_util.h" +#include "options/options_helper.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -149,13 +150,14 @@ Status BuildTable( file_checksum, file_checksum_func_name); return s; } + FileTypeSet tmp_set = ioptions.checksum_handoff_file_types; file->SetIOPriority(io_priority); file->SetWriteLifeTimeHint(write_hint); - file_writer.reset(new WritableFileWriter( std::move(file), fname, file_options, clock, io_tracer, ioptions.statistics, ioptions.listeners, - ioptions.file_checksum_gen_factory)); + ioptions.file_checksum_gen_factory, + tmp_set.Contains(FileType::kTableFile))); builder = NewTableBuilder( ioptions, mutable_cf_options, internal_comparator, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index f8b7c4967..2fd14675e 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -46,6 +46,7 @@ #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" +#include "options/options_helper.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -1734,6 +1735,7 @@ Status CompactionJob::OpenCompactionOutputFile( writable_file->SetIOPriority(Env::IOPriority::IO_LOW); writable_file->SetWriteLifeTimeHint(write_hint_); + FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; writable_file->SetPreallocationBlockSize(static_cast( sub_compact->compaction->OutputFilePreallocationSize())); const auto& listeners = @@ -1741,7 +1743,8 @@ Status CompactionJob::OpenCompactionOutputFile( sub_compact->outfile.reset(new WritableFileWriter( std::move(writable_file), fname, file_options_, clock_, io_tracer_, db_options_.statistics.get(), listeners, - db_options_.file_checksum_gen_factory.get())); + db_options_.file_checksum_gen_factory.get(), + tmp_set.Contains(FileType::kTableFile))); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 2480538f4..4091efb5f 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -22,6 +22,7 @@ #include "util/concurrent_task_limiter_impl.h" #include "util/random.h" #include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { @@ -6274,6 +6275,297 @@ TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) { db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption()); } +TEST_F(DBCompactionTest, CompactionWithChecksumHandoff1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 3; + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.checksum_handoff_file_types.Add(FileType::kTableFile); + Status s; + Reopen(options); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); + Reopen(options); + + // The hash does not match, compaction write fails + // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + // Since the file system returns IOStatus::Corruption, it is an + // unrecoverable error. + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), + ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); + Reopen(options); + + // The file system does not support checksum handoff. The check + // will be ignored. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + + // Each write will be similated as corrupted. + // Since the file system returns IOStatus::Corruption, it is an + // unrecoverable error. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", + [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), + ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError); + SyncPoint::GetInstance()->DisableProcessing(); + + Destroy(options); +} + +TEST_F(DBCompactionTest, CompactionWithChecksumHandoff2) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 3; + options.env = fault_fs_env.get(); + options.create_if_missing = true; + Status s; + Reopen(options); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); + Reopen(options); + + // options is not set, the checksum handoff will not be triggered + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); + Reopen(options); + + // The file system does not support checksum handoff. The check + // will be ignored. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + + // options is not set, the checksum handoff will not be triggered + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", + [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + + Destroy(options); +} + +TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 3; + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.checksum_handoff_file_types.Add(FileType::kDescriptorFile); + Status s; + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + Reopen(options); + + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); + Reopen(options); + + // The hash does not match, compaction write fails + // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + // Since the file system returns IOStatus::Corruption, it is mapped to + // kFatalError error. + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); +} + +TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest2) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 3; + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.checksum_handoff_file_types.Add(FileType::kDescriptorFile); + Status s; + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + Reopen(options); + + // The file system does not support checksum handoff. The check + // will be ignored. + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s, Status::OK()); + + // Each write will be similated as corrupted. + // Since the file system returns IOStatus::Corruption, it is mapped to + // kFatalError error. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put(Key(0), "value1")); + ASSERT_OK(Put(Key(2), "value2")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", + [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put(Key(1), "value3")); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError); + SyncPoint::GetInstance()->DisableProcessing(); + + Destroy(options); +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index a5e4f7a81..a03ec81e3 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -20,6 +20,7 @@ #include "util/cast_util.h" #include "util/mutexlock.h" #include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { @@ -525,6 +526,204 @@ TEST_F(DBFlushTest, FlushWithBlob) { #endif // ROCKSDB_LITE } +TEST_F(DBFlushTest, FlushWithChecksumHandoff1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_fs_env.get(); + options.checksum_handoff_file_types.Add(FileType::kTableFile); + Reopen(options); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + + // The hash does not match, write fails + // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + // Since the file system returns IOStatus::Corruption, it is an + // unrecoverable error. + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ASSERT_OK(Put("key3", "value3")); + ASSERT_OK(Put("key4", "value4")); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = Flush(); + ASSERT_EQ(s.severity(), + ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); + Reopen(options); + + // The file system does not support checksum handoff. The check + // will be ignored. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + ASSERT_OK(Put("key5", "value5")); + ASSERT_OK(Put("key6", "value6")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + + // Each write will be similated as corrupted. + // Since the file system returns IOStatus::Corruption, it is an + // unrecoverable error. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_fs->IngestDataCorruptionBeforeWrite(); + }); + ASSERT_OK(Put("key7", "value7")); + ASSERT_OK(Put("key8", "value8")); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), + ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError); + SyncPoint::GetInstance()->DisableProcessing(); + + Destroy(options); +} + +TEST_F(DBFlushTest, FlushWithChecksumHandoff2) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_fs_env.get(); + Reopen(options); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(Flush()); + + // options is not set, the checksum handoff will not be triggered + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ASSERT_OK(Put("key3", "value3")); + ASSERT_OK(Put("key4", "value4")); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); + Reopen(options); + + // The file system does not support checksum handoff. The check + // will be ignored. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + ASSERT_OK(Put("key5", "value5")); + ASSERT_OK(Put("key6", "value6")); + ASSERT_OK(Flush()); + + // options is not set, the checksum handoff will not be triggered + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) { + fault_fs->IngestDataCorruptionBeforeWrite(); + }); + ASSERT_OK(Put("key7", "value7")); + ASSERT_OK(Put("key8", "value8")); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Flush()); + SyncPoint::GetInstance()->DisableProcessing(); + + Destroy(options); +} + +TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest1) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_fs_env.get(); + options.checksum_handoff_file_types.Add(FileType::kDescriptorFile); + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + Reopen(options); + + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(Flush()); + + // The hash does not match, write fails + // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + // Since the file system returns IOStatus::Corruption, it is mapped to + // kFatalError error. + ASSERT_OK(Put("key3", "value3")); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", [&](void*) { + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + }); + ASSERT_OK(Put("key3", "value3")); + ASSERT_OK(Put("key4", "value4")); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError); + SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); +} + +TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) { + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + Options options = CurrentOptions(); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_fs_env.get(); + options.checksum_handoff_file_types.Add(FileType::kDescriptorFile); + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + Reopen(options); + // The file system does not support checksum handoff. The check + // will be ignored. + ASSERT_OK(Put("key5", "value5")); + ASSERT_OK(Put("key6", "value6")); + ASSERT_OK(Flush()); + + // Each write will be similated as corrupted. + // Since the file system returns IOStatus::Corruption, it is mapped to + // kFatalError error. + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", + [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); }); + ASSERT_OK(Put("key7", "value7")); + ASSERT_OK(Put("key8", "value8")); + SyncPoint::GetInstance()->EnableProcessing(); + Status s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError); + SyncPoint::GetInstance()->DisableProcessing(); + + Destroy(options); +} + class DBFlushTestBlobError : public DBFlushTest, public testing::WithParamInterface { public: diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b21d9e84d..fd63052de 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -289,11 +289,13 @@ Status DBImpl::NewDB(std::vector* new_filenames) { if (!s.ok()) { return s; } + FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types; file->SetPreallocationBlockSize( immutable_db_options_.manifest_preallocation_size); std::unique_ptr file_writer(new WritableFileWriter( std::move(file), manifest, file_options, clock_, io_tracer_, - nullptr /* stats */, immutable_db_options_.listeners)); + nullptr /* stats */, immutable_db_options_.listeners, nullptr, + tmp_set.Contains(FileType::kDescriptorFile))); log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); @@ -1487,9 +1489,11 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, lfile->SetPreallocationBlockSize(preallocate_block_size); const auto& listeners = immutable_db_options_.listeners; + FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types; std::unique_ptr file_writer(new WritableFileWriter( std::move(lfile), log_fname, opt_file_options, clock_, io_tracer_, - nullptr /* stats */, listeners)); + nullptr /* stats */, listeners, nullptr, + tmp_set.Contains(FileType::kWalFile))); *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 1cb0391b8..25803b332 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -14,6 +14,7 @@ #include "rocksdb/file_system.h" #include "test_util/sync_point.h" #include "utilities/fault_injection_env.h" +#include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { class DBWALTestBase : public DBTestBase { @@ -503,6 +504,87 @@ TEST_F(DBWALTest, RecoverWithBlobMultiSST) { ASSERT_EQ(l0_files.size(), blob_files.size()); } +TEST_F(DBWALTest, WALWithChecksumHandoff) { +#ifndef ROCKSDB_ASSERT_STATUS_CHECKED + if (mem_env_ || encrypted_env_) { + ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); + return; + } + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + do { + Options options = CurrentOptions(); + + options.checksum_handoff_file_types.Add(FileType::kWalFile); + options.env = fault_fs_env.get(); + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + CreateAndReopenWithCF({"pikachu"}, options); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v1", Get(1, "foo")); + ASSERT_EQ("v1", Get(1, "bar")); + + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Both value's should be present. + ASSERT_EQ("v2", Get(1, "bar")); + ASSERT_EQ("v2", Get(1, "foo")); + + writeOpt.disableWAL = true; + // This put, data is persisted by Flush + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + writeOpt.disableWAL = false; + // Data is persisted in the WAL + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3")); + // The hash does not match, write fails + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); + writeOpt.disableWAL = false; + ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); + + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Due to the write failure, Get should not find + ASSERT_NE("v3", Get(1, "foo")); + ASSERT_EQ("v3", Get(1, "zoo")); + ASSERT_EQ("v3", Get(1, "bar")); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + // Each write will be similated as corrupted. + fault_fs->IngestDataCorruptionBeforeWrite(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4")); + writeOpt.disableWAL = false; + ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_NE("v4", Get(1, "foo")); + ASSERT_NE("v4", Get(1, "bar")); + fault_fs->NoDataCorruptionBeforeWrite(); + + fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); + // The file system does not provide checksum method and verification. + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5")); + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5")); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + ASSERT_EQ("v5", Get(1, "foo")); + ASSERT_EQ("v5", Get(1, "bar")); + + Destroy(options); + } while (ChangeWalOptions()); +#endif // ROCKSDB_ASSERT_STATUS_CHECKED +} + class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { diff --git a/db/version_set.cc b/db/version_set.cc index 3c03117f8..88cf623c3 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -42,6 +42,7 @@ #include "monitoring/file_read_sample.h" #include "monitoring/perf_context_imp.h" #include "monitoring/persistent_stats_history.h" +#include "options/options_helper.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/write_buffer_manager.h" @@ -4119,10 +4120,11 @@ Status VersionSet::ProcessManifestWrites( if (io_s.ok()) { descriptor_file->SetPreallocationBlockSize( db_options_->manifest_preallocation_size); - + FileTypeSet tmp_set = db_options_->checksum_handoff_file_types; std::unique_ptr file_writer(new WritableFileWriter( std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_, - io_tracer_, nullptr, db_options_->listeners)); + io_tracer_, nullptr, db_options_->listeners, nullptr, + tmp_set.Contains(FileType::kDescriptorFile))); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); s = WriteCurrentStateToManifest(curr_state, wal_additions, diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index f344297a6..495d6b889 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -18,6 +18,7 @@ #include "port/port.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" +#include "util/crc32c.h" #include "util/random.h" #include "util/rate_limiter.h" @@ -395,6 +396,8 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { assert(!use_direct_io()); const char* src = data; size_t left = size; + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; while (left > 0) { size_t allowed; @@ -420,8 +423,16 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { #endif { auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_); - s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); + if (perform_data_verification_) { + Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info, + nullptr); + } else { + s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); + } SetPerfLevel(prev_perf_level); } #ifndef ROCKSDB_LITE @@ -451,6 +462,19 @@ void WritableFileWriter::UpdateFileChecksum(const Slice& data) { } } +// Currently, crc32c checksum is used to calculate the checksum value of the +// content in the input buffer for handoff. In the future, the checksum might be +// calculated from the existing crc32c checksums of the in WAl and Manifest +// records, or even SST file blocks. +// TODO: effectively use the existing checksum of the data being writing to +// generate the crc32c checksum instead of a raw calculation. +void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, + size_t size, + char* buf) { + uint32_t v_crc32c = crc32c::Extend(0, data, size); + EncodeFixed32(buf, v_crc32c); +} + // This flushes the accumulated data in the buffer. We pad data with zeros if // necessary to the whole page. // However, during automatic flushes padding would not be necessary. @@ -481,6 +505,8 @@ IOStatus WritableFileWriter::WriteDirect() { const char* src = buf_.BufferStart(); uint64_t write_offset = next_write_offset_; size_t left = buf_.CurrentSize(); + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; while (left > 0) { // Check how much is allowed @@ -501,8 +527,16 @@ IOStatus WritableFileWriter::WriteDirect() { start_ts = FileOperationInfo::StartNow(); } // direct writes must be positional - s = writable_file_->PositionedAppend(Slice(src, size), write_offset, - IOOptions(), nullptr); + if (perform_data_verification_) { + Crc32cHandoffChecksumCalculation(src, size, checksum_buf); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->PositionedAppend(Slice(src, size), write_offset, + IOOptions(), v_info, nullptr); + } else { + s = writable_file_->PositionedAppend(Slice(src, size), write_offset, + IOOptions(), nullptr); + } + if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s); diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 1b7b3cb4c..0556f7c77 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -118,6 +118,8 @@ class WritableFileWriter { bool ShouldNotifyListeners() const { return !listeners_.empty(); } void UpdateFileChecksum(const Slice& data); + void Crc32cHandoffChecksumCalculation(const char* data, size_t size, + char* buf); std::string file_name_; FSWritableFilePtr writable_file_; @@ -141,6 +143,7 @@ class WritableFileWriter { std::vector> listeners_; std::unique_ptr checksum_generator_; bool checksum_finalized_; + bool perform_data_verification_; public: WritableFileWriter( @@ -150,7 +153,8 @@ class WritableFileWriter { const std::shared_ptr& io_tracer = nullptr, Statistics* stats = nullptr, const std::vector>& listeners = {}, - FileChecksumGenFactory* file_checksum_gen_factory = nullptr) + FileChecksumGenFactory* file_checksum_gen_factory = nullptr, + bool perform_data_verification = false) : file_name_(_file_name), writable_file_(std::move(file), io_tracer, _file_name), clock_(clock), @@ -167,7 +171,8 @@ class WritableFileWriter { stats_(stats), listeners_(), checksum_generator_(nullptr), - checksum_finalized_(false) { + checksum_finalized_(false), + perform_data_verification_(perform_data_verification) { TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", reinterpret_cast(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); diff --git a/include/rocksdb/data_structure.h b/include/rocksdb/data_structure.h new file mode 100644 index 000000000..c9a4ebd82 --- /dev/null +++ b/include/rocksdb/data_structure.h @@ -0,0 +1,48 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include + +#include + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { + +// This is a data structure specifically designed as a "Set" for a +// pretty small scale of Enum structure. For now, it can support up +// to 64 element, and it is expandable in the future. +template +class SmallEnumSet { + public: + SmallEnumSet() : state_(0) {} + + ~SmallEnumSet() {} + + // Return true if the input enum is included in the "Set" (i.e., changes the + // internal scalar state successfully), otherwise, it will return false. + bool Add(const ENUM_TYPE value) { + static_assert(MAX_VALUE <= 63, "Size currently limited to 64"); + assert(value >= 0 && value <= MAX_VALUE); + uint64_t old_state = state_; + uint64_t tmp = 1; + state_ |= (tmp << value); + return old_state != state_; + } + + // Return true if the input enum is contained in the "Set". + bool Contains(const ENUM_TYPE value) { + static_assert(MAX_VALUE <= 63, "Size currently limited to 64"); + assert(value >= 0 && value <= MAX_VALUE); + uint64_t tmp = 1; + return state_ & (tmp << value); + } + + private: + uint64_t state_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 393b7647a..43f082d41 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -17,6 +17,7 @@ #pragma once #include + #include #include #include @@ -25,9 +26,11 @@ #include #include #include + #include "rocksdb/env.h" #include "rocksdb/io_status.h" #include "rocksdb/options.h" +#include "rocksdb/table.h" #include "rocksdb/thread_status.h" namespace ROCKSDB_NAMESPACE { @@ -97,16 +100,22 @@ struct FileOptions : EnvOptions { // to be issued for the file open/creation IOOptions io_options; - FileOptions() : EnvOptions() {} + // The checksum type that is used to calculate the checksum value for + // handoff during file writes. + ChecksumType handoff_checksum_type; + + FileOptions() : EnvOptions(), handoff_checksum_type(ChecksumType::kCRC32c) {} FileOptions(const DBOptions& opts) - : EnvOptions(opts) {} + : EnvOptions(opts), handoff_checksum_type(ChecksumType::kCRC32c) {} FileOptions(const EnvOptions& opts) - : EnvOptions(opts) {} + : EnvOptions(opts), handoff_checksum_type(ChecksumType::kCRC32c) {} FileOptions(const FileOptions& opts) - : EnvOptions(opts), io_options(opts.io_options) {} + : EnvOptions(opts), + io_options(opts.io_options), + handoff_checksum_type(opts.handoff_checksum_type) {} FileOptions& operator=(const FileOptions& opts) = default; }; @@ -740,10 +749,14 @@ class FSWritableFile { virtual IOStatus Append(const Slice& data, const IOOptions& options, IODebugContext* dbg) = 0; - // EXPERIMENTAL / CURRENTLY UNUSED - // Append data with verification information + // Append data with verification information. // Note that this API change is experimental and it might be changed in - // the future. Currently, RocksDB does not use this API. + // the future. Currently, RocksDB only generates crc32c based checksum for + // the file writes when the checksum handoff option is set. + // Expected behavior: if the handoff_checksum_type in FileOptions (currently, + // ChecksumType::kCRC32C is set as default) is not supported by this + // FSWritableFile, the information in DataVerificationInfo can be ignored + // (i.e. does not perform checksum verification). virtual IOStatus Append(const Slice& data, const IOOptions& options, const DataVerificationInfo& /* verification_info */, IODebugContext* dbg) { @@ -777,10 +790,14 @@ class FSWritableFile { return IOStatus::NotSupported("PositionedAppend"); } - // EXPERIMENTAL / CURRENTLY UNUSED // PositionedAppend data with verification information. // Note that this API change is experimental and it might be changed in - // the future. Currently, RocksDB does not use this API. + // the future. Currently, RocksDB only generates crc32c based checksum for + // the file writes when the checksum handoff option is set. + // Expected behavior: if the handoff_checksum_type in FileOptions (currently, + // ChecksumType::kCRC32C is set as default) is not supported by this + // FSWritableFile, the information in DataVerificationInfo can be ignored + // (i.e. does not perform checksum verification). virtual IOStatus PositionedAppend( const Slice& /* data */, uint64_t /* offset */, const IOOptions& /*options*/, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ab6262007..79cce2270 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -20,12 +20,14 @@ #include "rocksdb/advanced_options.h" #include "rocksdb/comparator.h" #include "rocksdb/compression_type.h" +#include "rocksdb/data_structure.h" #include "rocksdb/env.h" #include "rocksdb/file_checksum.h" #include "rocksdb/listener.h" #include "rocksdb/sst_partitioner.h" #include "rocksdb/types.h" #include "rocksdb/universal_compaction.h" +#include "rocksdb/version.h" #include "rocksdb/write_buffer_manager.h" #ifdef max @@ -57,6 +59,8 @@ class FileSystem; struct Options; struct DbPath; +using FileTypeSet = SmallEnumSet; + struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // The function recovers options to a previous version. Only 4.6 or later // versions are supported. @@ -1191,6 +1195,16 @@ struct DBOptions { // // Default: hostname std::string db_host_id = kHostnameForDbHostId; + + // Use this if your DB want to enable checksum handoff for specific file + // types writes. Make sure that the File_system you use support the + // crc32c checksum verification + // Currently supported file tyes: kWALFile, kTableFile, kDescriptorFile. + // NOTE: currently RocksDB only generates crc32c based checksum for the + // handoff. If the storage layer has different checksum support, user + // should enble this set as empty. Otherwise,it may cause unexpected + // write failures. + FileTypeSet checksum_handoff_file_types; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/cf_options.cc b/options/cf_options.cc index c436dd312..7e680fd88 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -846,7 +846,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()), sst_partitioner_factory(cf_options.sst_partitioner_factory), allow_data_in_errors(db_options.allow_data_in_errors), - db_host_id(db_options.db_host_id) {} + db_host_id(db_options.db_host_id), + checksum_handoff_file_types(db_options.checksum_handoff_file_types) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index c9e8f068f..7bbea71e4 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -126,6 +126,8 @@ struct ImmutableCFOptions { bool allow_data_in_errors; std::string db_host_id; + + FileTypeSet checksum_handoff_file_types; }; struct MutableCFOptions { diff --git a/options/db_options.cc b/options/db_options.cc index 3733d448c..8e587117b 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -136,6 +136,7 @@ static std::unordered_map std::shared_ptr statistics; std::vector db_paths; std::vector> listeners; + FileTypeSet checksum_handoff_file_types; */ {"advise_random_on_open", {offsetof(struct ImmutableDBOptions, advise_random_on_open), @@ -580,7 +581,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) max_bgerror_resume_count(options.max_bgerror_resume_count), bgerror_resume_retry_interval(options.bgerror_resume_retry_interval), allow_data_in_errors(options.allow_data_in_errors), - db_host_id(options.db_host_id) { + db_host_id(options.db_host_id), + checksum_handoff_file_types(options.checksum_handoff_file_types) { } void ImmutableDBOptions::Dump(Logger* log) const { diff --git a/options/db_options.h b/options/db_options.h index 42a58e256..a454e42b0 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -93,6 +93,7 @@ struct ImmutableDBOptions { uint64_t bgerror_resume_retry_interval; bool allow_data_in_errors; std::string db_host_id; + FileTypeSet checksum_handoff_file_types; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index 02139a62b..e4e8d4ee8 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -168,6 +168,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.bgerror_resume_retry_interval; options.db_host_id = immutable_db_options.db_host_id; options.allow_data_in_errors = immutable_db_options.allow_data_in_errors; + options.checksum_handoff_file_types = + immutable_db_options.checksum_handoff_file_types; return options; } diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 5e0d402fd..a1fc17514 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -227,6 +227,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { {offsetof(struct DBOptions, file_checksum_gen_factory), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, db_host_id), sizeof(std::string)}, + {offsetof(struct DBOptions, checksum_handoff_file_types), + sizeof(FileTypeSet)}, }; char* options_ptr = new char[sizeof(DBOptions)]; diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 8e4e264e2..b4d021ec3 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -183,8 +183,9 @@ Status SstFileWriter::Open(const std::string& file_path) { Rep* r = rep_.get(); Status s; std::unique_ptr sst_file; + FileOptions cur_file_opts(r->env_options); s = r->ioptions.env->GetFileSystem()->NewWritableFile( - file_path, r->env_options, &sst_file, nullptr); + file_path, cur_file_opts, &sst_file, nullptr); if (!s.ok()) { return s; } @@ -256,12 +257,13 @@ Status SstFileWriter::Open(const std::string& file_path) { r->column_family_name, unknown_level, 0 /* creation_time */, 0 /* oldest_key_time */, 0 /* target_file_size */, 0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id); - + FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types; r->file_writer.reset(new WritableFileWriter( std::move(sst_file), file_path, r->env_options, r->ioptions.env->GetSystemClock(), nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners, - r->ioptions.file_checksum_gen_factory)); + r->ioptions.file_checksum_gen_factory, + tmp_set.Contains(FileType::kTableFile))); // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful. diff --git a/util/slice_test.cc b/util/slice_test.cc index 9e8a8e340..4226768f3 100644 --- a/util/slice_test.cc +++ b/util/slice_test.cc @@ -3,9 +3,12 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include "rocksdb/slice.h" + #include "port/port.h" #include "port/stack_trace.h" -#include "rocksdb/slice.h" +#include "rocksdb/data_structure.h" +#include "rocksdb/types.h" #include "test_util/testharness.h" #include "test_util/testutil.h" @@ -154,6 +157,22 @@ TEST_F(PinnableSliceTest, Move) { ASSERT_EQ(2, res); } +// Unit test for SmallEnumSet +class SmallEnumSetTest : public testing::Test { + public: + SmallEnumSetTest() {} + ~SmallEnumSetTest() {} +}; + +TEST_F(SmallEnumSetTest, SmallSetTest) { + FileTypeSet fs; + ASSERT_TRUE(fs.Add(FileType::kIdentityFile)); + ASSERT_FALSE(fs.Add(FileType::kIdentityFile)); + ASSERT_TRUE(fs.Add(FileType::kInfoLogFile)); + ASSERT_TRUE(fs.Contains(FileType::kIdentityFile)); + ASSERT_FALSE(fs.Contains(FileType::kDBLockFile)); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index c2a5fb6b5..27509ab45 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -22,7 +22,10 @@ #include "env/composite_env_wrapper.h" #include "port/lang.h" #include "port/stack_trace.h" +#include "util/coding.h" +#include "util/crc32c.h" #include "util/random.h" +#include "util/xxhash.h" namespace ROCKSDB_NAMESPACE { @@ -53,6 +56,21 @@ std::pair TestFSGetDirAndName( return std::make_pair(dirname, fname); } +// Calculate the checksum of the data with corresponding checksum +// type. If name does not match, no checksum is returned. +void CalculateTypedChecksum(const ChecksumType& checksum_type, const char* data, + size_t size, std::string* checksum) { + if (checksum_type == ChecksumType::kCRC32c) { + uint32_t v_crc32c = crc32c::Extend(0, data, size); + PutFixed32(checksum, v_crc32c); + return; + } else if (checksum_type == ChecksumType::kxxHash) { + uint32_t v = XXH32(data, size, 0); + PutFixed32(checksum, v); + } + return; +} + IOStatus FSFileState::DropUnsyncedData() { buffer_.resize(0); return IOStatus::OK(); @@ -74,9 +92,11 @@ IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) { } TestFSWritableFile::TestFSWritableFile(const std::string& fname, + const FileOptions& file_opts, std::unique_ptr&& f, FaultInjectionTestFS* fs) : state_(fname), + file_opts_(file_opts), target_(std::move(f)), writable_file_opened_(true), fs_(fs) { @@ -103,6 +123,37 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions&, return io_s; } +// By setting the IngestDataCorruptionBeforeWrite(), the data corruption is +// simulated. +IOStatus TestFSWritableFile::Append( + const Slice& data, const IOOptions&, + const DataVerificationInfo& verification_info, IODebugContext*) { + MutexLock l(&mutex_); + if (!fs_->IsFilesystemActive()) { + return fs_->GetError(); + } + if (fs_->ShouldDataCorruptionBeforeWrite()) { + return IOStatus::Corruption("Data is corrupted!"); + } + + // Calculate the checksum + std::string checksum; + CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(), + data.size(), &checksum); + if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum && + checksum != verification_info.checksum.ToString()) { + std::string msg = "Data is corrupted! Origin data checksum: " + + verification_info.checksum.ToString() + + "current data checksum: " + checksum; + return IOStatus::Corruption(msg); + } + + state_.buffer_.append(data.data(), data.size()); + state_.pos_ += data.size(); + fs_->WritableFileAppended(state_); + return IOStatus::OK(); +} + IOStatus TestFSWritableFile::Close(const IOOptions& options, IODebugContext* dbg) { if (!fs_->IsFilesystemActive()) { @@ -249,7 +300,8 @@ IOStatus FaultInjectionTestFS::NewWritableFile( IOStatus io_s = target()->NewWritableFile(fname, file_opts, result, dbg); if (io_s.ok()) { - result->reset(new TestFSWritableFile(fname, std::move(*result), this)); + result->reset( + new TestFSWritableFile(fname, file_opts, std::move(*result), this)); // WritableFileWriter* file is opened // again then it will be truncated - so forget our saved state. UntrackFile(fname); @@ -273,7 +325,8 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile( } IOStatus io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg); if (io_s.ok()) { - result->reset(new TestFSWritableFile(fname, std::move(*result), this)); + result->reset( + new TestFSWritableFile(fname, file_opts, std::move(*result), this)); // WritableFileWriter* file is opened // again then it will be truncated - so forget our saved state. UntrackFile(fname); diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index fb18edbd6..7a8e46a6f 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -59,16 +59,15 @@ struct FSFileState { class TestFSWritableFile : public FSWritableFile { public: explicit TestFSWritableFile(const std::string& fname, + const FileOptions& file_opts, std::unique_ptr&& f, FaultInjectionTestFS* fs); virtual ~TestFSWritableFile(); virtual IOStatus Append(const Slice& data, const IOOptions&, IODebugContext*) override; - virtual IOStatus Append(const Slice& data, const IOOptions& options, - const DataVerificationInfo& /*verification_info*/, - IODebugContext* dbg) override { - return Append(data, options, dbg); - } + virtual IOStatus Append(const Slice& data, const IOOptions&, + const DataVerificationInfo& verification_info, + IODebugContext*) override; virtual IOStatus Truncate(uint64_t size, const IOOptions& options, IODebugContext* dbg) override { return target_->Truncate(size, options, dbg); @@ -98,6 +97,7 @@ class TestFSWritableFile : public FSWritableFile { private: FSFileState state_; + FileOptions file_opts_; std::unique_ptr target_; bool writable_file_opened_; FaultInjectionTestFS* fs_; @@ -174,7 +174,8 @@ class FaultInjectionTestFS : public FileSystemWrapper { filesystem_writable_(false), thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)), enable_write_error_injection_(false), - write_error_rand_(0) {} + write_error_rand_(0), + ingest_data_corruption_before_write_(false) {} virtual ~FaultInjectionTestFS() { error_.PermitUncheckedError(); } const char* Name() const override { return "FaultInjectionTestFS"; } @@ -291,6 +292,32 @@ class FaultInjectionTestFS : public FileSystemWrapper { error_ = io_error; } + // To simulate the data corruption before data is written in FS + void IngestDataCorruptionBeforeWrite() { + MutexLock l(&mutex_); + ingest_data_corruption_before_write_ = true; + } + + void NoDataCorruptionBeforeWrite() { + MutexLock l(&mutex_); + ingest_data_corruption_before_write_ = false; + } + + bool ShouldDataCorruptionBeforeWrite() { + MutexLock l(&mutex_); + return ingest_data_corruption_before_write_; + } + + void SetChecksumHandoffFuncType(const ChecksumType& func_type) { + MutexLock l(&mutex_); + checksum_handoff_func_tpye_ = func_type; + } + + const ChecksumType& GetChecksumHandoffFuncType() { + MutexLock l(&mutex_); + return checksum_handoff_func_tpye_; + } + // Specify what the operation, so we can inject the right type of error enum ErrorOperation : char { kRead = 0, @@ -432,6 +459,8 @@ class FaultInjectionTestFS : public FileSystemWrapper { Random write_error_rand_; int write_error_one_in_; std::vector write_error_allowed_types_; + bool ingest_data_corruption_before_write_; + ChecksumType checksum_handoff_func_tpye_; }; } // namespace ROCKSDB_NAMESPACE